I want to join two RDDs, R(K, V) and S(K, W), where R and S have exactly the same set of keys and every key is unique. The resulting RDD should look like (K, (V, W)). Both R and S are built with map() to create key-value pairs. What is the most efficient way to carry out this operation? Neither RDD fits in the driver's memory. Is there a way to use partitionBy() to optimize this?
I am using PySpark.
Answer:
You can simply use join() on the two pair RDDs:
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data1_ls = [
    (1, 'a'),
    (2, 'b')
]
data2_ls = [
    (1, 'c'),
    (2, 'd')
]
data1_rdd = spark.sparkContext.parallelize(data1_ls)
data2_rdd = spark.sparkContext.parallelize(data2_ls)
data1_rdd.join(data2_rdd).collect()
# [(1, ('a', 'c')), (2, ('b', 'd'))]
```
Answer:
Since R and S have exactly the same set of unique keys, there are several ways to do this without a join. However, do not dismiss join() too quickly: it may already handle this case efficiently. Instead, benchmark the alternatives on your own data to find out which one is fastest.
Here are a couple of ideas (untested):
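- Compute the union of the RDDs and then run groupByKey(). The resulting RDD should contain pairs of type (K, [V, W]):
```python
rdd1.union(rdd2).groupByKey()  # add .mapValues(list) if you collect() immediately after
```
Note that groupByKey() does not guarantee the order of the values within each group, so you may get [W, V] instead of [V, W]. The same (more or less) could probably be accomplished with rdd1.cogroup(rdd2), which keeps the values of each RDD in a separate iterable, i.e. (K, ([V], [W])); I am not sure it is more performant, though.
- You could sort the RDDs by their keys and then zip them. Finally, map each pair from ((K, V), (K, W)) to (K, (V, W)):
```python
rdd1.sortByKey().zip(rdd2.sortByKey()).map(lambda x: (x[0][0], (x[0][1], x[1][1])))
```
Be aware that zip() assumes both RDDs have the same number of partitions and the same number of elements in each partition. Two independent sortByKey() calls do not guarantee this, so this can fail at runtime.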
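I do not see the benefit of using partitionBy() here, given that there is no reason to repartition your RDDs: partitionBy() is itself a shuffle, so it only pays off if both RDDs are partitioned the same way once and then reused in several key-based operations. You could also try reduceByKey()/aggregateByKey()/... instead of groupByKey(), but I doubt the performance would be much different.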
