Hi, I’m trying to understand the toDebugString feature and how to get insights 
out of it. It seems a little confusing. Can anybody please help me understand 
it?

So, I have the following example. The first number is the id, the second is the 
actual data, but for the sake of the example it’s the number of the RDD.
RDD 1: (1, 1), (2, 1), (3, 1)
RDD 2: (3, 2), (4, 2), (5, 2)
RDD 3: (2, 3), (3, 3), (5, 3)

The RDD number can be thought of as recency, so I have to keep the most recent 
data for each id in the result.
The result has to be: (1, 1), (2, 3), (3, 3), (4, 2), (5, 3).
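
To make the target concrete, here is a small plain-Java sketch (no Spark 
involved; the class and method names are made up for this mail). It assumes the 
inputs are applied strictly oldest to newest, so a later value simply 
overwrites an earlier one for the same id:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, only meant to pin down the expected result.
class LatestWins {
    // Applies the inputs oldest to newest; each pair is {id, data}.
    static Map<Integer, Integer> merge(List<List<int[]>> rddsOldestFirst) {
        Map<Integer, Integer> result = new TreeMap<>();
        for (List<int[]> rdd : rddsOldestFirst) {
            for (int[] pair : rdd) {
                result.put(pair[0], pair[1]); // newer data replaces older data
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<int[]> rdd1 = Arrays.asList(new int[]{1, 1}, new int[]{2, 1}, new int[]{3, 1});
        List<int[]> rdd2 = Arrays.asList(new int[]{3, 2}, new int[]{4, 2}, new int[]{5, 2});
        List<int[]> rdd3 = Arrays.asList(new int[]{2, 3}, new int[]{3, 3}, new int[]{5, 3});
        // prints {1=1, 2=3, 3=3, 4=2, 5=3}
        System.out.println(merge(Arrays.asList(rdd1, rdd2, rdd3)));
    }
}
```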

It’s possible to achieve this in different ways. The actual code is a little 
more complex, but the ideas are:
1. rdd1.union(rdd2).union(rdd3).reduceByKey(v2);
2. rdd1.fullOuterJoin(rdd2).reduceByKey(v2).fullOuterJoin(rdd3).reduceByKey(v2);
3. rdd1.cogroup(rdd2).reduceByKey(v2).cogroup(rdd3).reduceByKey(v2); // cogroup 
can group more than two RDDs at once, but this is just an example.

So, the toDebugString output looks like this:
union
(12) ShuffledRDD[8] at reduceByKey at DatasetsTest.java:67 []
 +-(12) UnionRDD[7] at union at DatasetsTest.java:67 []
    |   UnionRDD[6] at union at DatasetsTest.java:67 []
    |   MapPartitionsRDD[3] at mapToPair at DatasetsTest.java:63 []
    |   ParallelCollectionRDD[0] at parallelize at DatasetsTest.java:60 []
    |   MapPartitionsRDD[4] at mapToPair at DatasetsTest.java:64 []
    |   ParallelCollectionRDD[1] at parallelize at DatasetsTest.java:61 []
    |   MapPartitionsRDD[5] at mapToPair at DatasetsTest.java:65 []
    |   ParallelCollectionRDD[2] at parallelize at DatasetsTest.java:62 []

cogroup
(4) MapPartitionsRDD[16] at mapToPair at DatasetsTest.java:84 []
 |  MapPartitionsRDD[15] at cogroup at DatasetsTest.java:84 []
 |  MapPartitionsRDD[14] at cogroup at DatasetsTest.java:84 []
 |  CoGroupedRDD[13] at cogroup at DatasetsTest.java:84 []
 +-(4) MapPartitionsRDD[12] at mapToPair at DatasetsTest.java:84 []
 |  |  MapPartitionsRDD[11] at cogroup at DatasetsTest.java:84 []
 |  |  MapPartitionsRDD[10] at cogroup at DatasetsTest.java:84 []
 |  |  CoGroupedRDD[9] at cogroup at DatasetsTest.java:84 []
 |  +-(4) MapPartitionsRDD[3] at mapToPair at DatasetsTest.java:63 []
 |  |  |  ParallelCollectionRDD[0] at parallelize at DatasetsTest.java:60 []
 |  +-(4) MapPartitionsRDD[4] at mapToPair at DatasetsTest.java:64 []
 |     |  ParallelCollectionRDD[1] at parallelize at DatasetsTest.java:61 []
 +-(4) MapPartitionsRDD[5] at mapToPair at DatasetsTest.java:65 []
    |  ParallelCollectionRDD[2] at parallelize at DatasetsTest.java:62 []

join
(4) MapPartitionsRDD[18] at mapToPair at DatasetsTest.java:85 []
 |  MapPartitionsRDD[17] at fullOuterJoin at DatasetsTest.java:85 []
 |  MapPartitionsRDD[16] at fullOuterJoin at DatasetsTest.java:85 []
 |  MapPartitionsRDD[15] at fullOuterJoin at DatasetsTest.java:85 []
 |  CoGroupedRDD[14] at fullOuterJoin at DatasetsTest.java:85 []
 +-(4) MapPartitionsRDD[13] at mapToPair at DatasetsTest.java:85 []
 |  |  MapPartitionsRDD[12] at fullOuterJoin at DatasetsTest.java:85 []
 |  |  MapPartitionsRDD[11] at fullOuterJoin at DatasetsTest.java:85 []
 |  |  MapPartitionsRDD[10] at fullOuterJoin at DatasetsTest.java:85 []
 |  |  CoGroupedRDD[9] at fullOuterJoin at DatasetsTest.java:85 []
 |  +-(4) MapPartitionsRDD[3] at mapToPair at DatasetsTest.java:64 []
 |  |  |  ParallelCollectionRDD[0] at parallelize at DatasetsTest.java:61 []
 |  +-(4) MapPartitionsRDD[4] at mapToPair at DatasetsTest.java:65 []
 |     |  ParallelCollectionRDD[1] at parallelize at DatasetsTest.java:62 []
 +-(4) MapPartitionsRDD[5] at mapToPair at DatasetsTest.java:66 []
    |  ParallelCollectionRDD[2] at parallelize at DatasetsTest.java:63 []


The problem is that I don’t really understand what will happen. My current 
understanding is that the numbers in parentheses (12 and 4) are the number of 
partitions to process. A different indentation level means a different stage. 
A different stage means a shuffle, which in turn means the data will be sent 
over the network. So, based on this, I assume that there will be just one 
network hop for union and 4 network hops for both join and cogroup. 
Is my understanding correct?
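
To show how I arrive at those numbers, here is a plain-Java sketch (no Spark; 
the class and method names are made up) that counts the lines opening a new 
indentation level, i.e. the ones containing "+-(". Treating each such line as 
one shuffle is exactly the assumption I am asking about, not something I have 
confirmed:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper: counts "+-(" markers in toDebugString output.
class DebugStringIndent {
    // ASSUMPTION: every "+-(n)" line starts a new stage behind a shuffle.
    static long countBoundaries(List<String> lines) {
        return lines.stream().filter(line -> line.contains("+-(")).count();
    }

    public static void main(String[] args) {
        // First lines of the union output from this mail.
        List<String> union = Arrays.asList(
            "(12) ShuffledRDD[8] at reduceByKey at DatasetsTest.java:67 []",
            " +-(12) UnionRDD[7] at union at DatasetsTest.java:67 []",
            "    |   UnionRDD[6] at union at DatasetsTest.java:67 []");
        // The indented lines of the cogroup output from this mail.
        List<String> cogroup = Arrays.asList(
            "(4) MapPartitionsRDD[16] at mapToPair at DatasetsTest.java:84 []",
            " +-(4) MapPartitionsRDD[12] at mapToPair at DatasetsTest.java:84 []",
            " |  +-(4) MapPartitionsRDD[3] at mapToPair at DatasetsTest.java:63 []",
            " |  +-(4) MapPartitionsRDD[4] at mapToPair at DatasetsTest.java:64 []",
            " +-(4) MapPartitionsRDD[5] at mapToPair at DatasetsTest.java:65 []");
        System.out.println("union:   " + countBoundaries(union));   // 1
        System.out.println("cogroup: " + countBoundaries(cogroup)); // 4
    }
}
```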

If it is, then union is the fastest way of doing what I need, but it doesn’t 
guarantee ordering. So it might make sense to first transform the RDDs into 
RDDs tagged with some number, then do the union, and that would still be faster 
than the other solutions.
Is my conclusion here correct?
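
The tagging idea, sketched in plain Java rather than Spark (the class name and 
the {id, recency, data} layout are made up for illustration). The merge 
function plays the role of what I would pass to reduceByKey: it keeps the 
record with the higher recency tag, so the order in which records arrive should 
no longer matter:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model of "tag with recency, union, then reduce by key".
class TaggedUnion {
    // Each record is {id, recency, data}; returns id -> data of the most recent record.
    static Map<Integer, Integer> reduce(List<int[]> union) {
        Map<Integer, int[]> best = new TreeMap<>();
        for (int[] rec : union) {
            // Keep whichever record carries the higher recency tag.
            best.merge(rec[0], rec, (a, b) -> a[1] >= b[1] ? a : b);
        }
        Map<Integer, Integer> result = new TreeMap<>();
        for (Map.Entry<Integer, int[]> e : best.entrySet()) {
            result.put(e.getKey(), e.getValue()[2]);
        }
        return result;
    }

    public static void main(String[] args) {
        // In my example the data happens to equal the RDD number, so tag == data.
        List<int[]> union = new ArrayList<>(Arrays.asList(
            new int[]{1, 1, 1}, new int[]{2, 1, 1}, new int[]{3, 1, 1},
            new int[]{3, 2, 2}, new int[]{4, 2, 2}, new int[]{5, 2, 2},
            new int[]{2, 3, 3}, new int[]{3, 3, 3}, new int[]{5, 3, 3}));
        Collections.shuffle(union); // arrival order is deliberately random
        // prints {1=1, 2=3, 3=3, 4=2, 5=3}
        System.out.println(reduce(union));
    }
}
```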

One more question. Even though I do not change the keys in fullOuterJoin and 
cogroup, it appears that Spark reshuffles nevertheless. Why does it do so?

Thanks in advance.

--
Eugene Morozov
fathers...@list.ru



