Hi, I’m trying to understand the toDebugString feature and how to get insights
out of it. It seems a little confusing. Could anybody please help me understand
it?
So, I have the following example. The first number is an id and the second is
the actual data, which for the sake of the example is just the number of the RDD.
RDD 1: (1, 1), (2, 1), (3, 1)
RDD 2: (3, 2), (4, 2), (5, 2)
RDD 3: (2, 3), (3, 3), (5, 3)
The RDD number can be thought of as recency, so the result must contain the most
recent value for each id.
The expected result is: (1, 1), (2, 3), (3, 3), (4, 2), (5, 3).
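To pin down the intended semantics, here is a minimal single-machine sketch in plain Java (no Spark involved). Each RDD is modelled as a list of (id, value) pairs, and the datasets are applied oldest first so that a later value overwrites an earlier one for the same id. The class name LatestWins and the RDD1..RDD3 constants are illustrative only.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LatestWins {
    // The three example datasets as (id, value) pairs; the value is the RDD number.
    static final List<int[]> RDD1 = List.of(new int[]{1, 1}, new int[]{2, 1}, new int[]{3, 1});
    static final List<int[]> RDD2 = List.of(new int[]{3, 2}, new int[]{4, 2}, new int[]{5, 2});
    static final List<int[]> RDD3 = List.of(new int[]{2, 3}, new int[]{3, 3}, new int[]{5, 3});

    // Applies the datasets oldest first, so a later put overwrites an earlier value for the same id.
    @SafeVarargs
    static Map<Integer, Integer> latest(List<int[]>... datasetsOldestFirst) {
        Map<Integer, Integer> result = new TreeMap<>(); // sorted by id for readable output
        for (List<int[]> dataset : datasetsOldestFirst) {
            for (int[] pair : dataset) {
                result.put(pair[0], pair[1]); // pair[0] = id, pair[1] = value
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(latest(RDD1, RDD2, RDD3)); // {1=1, 2=3, 3=3, 4=2, 5=3}
    }
}
```

This sequential version only works because the datasets are applied in a fixed order; passing them in a different order gives a different answer, which is exactly the ordering concern for a distributed version.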
This can be achieved in different ways. The actual code is a bit more
complex, but the ideas are:
1. rdd1.union(rdd2).union(rdd3).reduceByKey(v2);
2. rdd1.fullOuterJoin(rdd2).reduceByKey(v2).fullOuterJoin(rdd3).reduceByKey(v2);
3. rdd1.cogroup(rdd2).reduceByKey(v2).cogroup(rdd3).reduceByKey(v2); // cogroup can take more than one other RDD at once, but this is just an example.
(reduceByKey(v2) is shorthand for reduceByKey((v1, v2) -> v2), i.e. keep the second of the two values.)
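Approach 2 can be modelled the same way. The sketch below is plain Java, not Spark's fullOuterJoin: for every id present on either side it keeps the right-hand (newer) value when there is one and falls back to the left-hand value otherwise, which is what the step after each join is meant to achieve. The Optionals mirror the (Optional, Optional) pairs a full outer join produces; the names OuterJoinModel and joinPreferRight are hypothetical.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

public class OuterJoinModel {
    // Full outer join of two id->value maps, resolving each id to the newer (right) value if present.
    static Map<Integer, Integer> joinPreferRight(Map<Integer, Integer> older, Map<Integer, Integer> newer) {
        Set<Integer> ids = new TreeSet<>(older.keySet());
        ids.addAll(newer.keySet()); // outer join: ids from either side
        Map<Integer, Integer> result = new TreeMap<>();
        for (Integer id : ids) {
            Optional<Integer> left = Optional.ofNullable(older.get(id));
            Optional<Integer> right = Optional.ofNullable(newer.get(id));
            result.put(id, right.orElseGet(left::get)); // at least one side is always present
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> rdd1 = Map.of(1, 1, 2, 1, 3, 1);
        Map<Integer, Integer> rdd2 = Map.of(3, 2, 4, 2, 5, 2);
        Map<Integer, Integer> rdd3 = Map.of(2, 3, 3, 3, 5, 3);
        // Join 1 with 2 first, then the result with 3, as in approach 2.
        System.out.println(joinPreferRight(joinPreferRight(rdd1, rdd2), rdd3)); // {1=1, 2=3, 3=3, 4=2, 5=3}
    }
}
```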
So, the toDebugString output looks like this:
union
(12) ShuffledRDD[8] at reduceByKey at DatasetsTest.java:67 []
 +-(12) UnionRDD[7] at union at DatasetsTest.java:67 []
    |   UnionRDD[6] at union at DatasetsTest.java:67 []
    |   MapPartitionsRDD[3] at mapToPair at DatasetsTest.java:63 []
    |   ParallelCollectionRDD[0] at parallelize at DatasetsTest.java:60 []
    |   MapPartitionsRDD[4] at mapToPair at DatasetsTest.java:64 []
    |   ParallelCollectionRDD[1] at parallelize at DatasetsTest.java:61 []
    |   MapPartitionsRDD[5] at mapToPair at DatasetsTest.java:65 []
    |   ParallelCollectionRDD[2] at parallelize at DatasetsTest.java:62 []
cogroup
(4) MapPartitionsRDD[16] at mapToPair at DatasetsTest.java:84 []
 |  MapPartitionsRDD[15] at cogroup at DatasetsTest.java:84 []
 |  MapPartitionsRDD[14] at cogroup at DatasetsTest.java:84 []
 |  CoGroupedRDD[13] at cogroup at DatasetsTest.java:84 []
 +-(4) MapPartitionsRDD[12] at mapToPair at DatasetsTest.java:84 []
 |  |  MapPartitionsRDD[11] at cogroup at DatasetsTest.java:84 []
 |  |  MapPartitionsRDD[10] at cogroup at DatasetsTest.java:84 []
 |  |  CoGroupedRDD[9] at cogroup at DatasetsTest.java:84 []
 |  +-(4) MapPartitionsRDD[3] at mapToPair at DatasetsTest.java:63 []
 |  |  |  ParallelCollectionRDD[0] at parallelize at DatasetsTest.java:60 []
 |  +-(4) MapPartitionsRDD[4] at mapToPair at DatasetsTest.java:64 []
 |     |  ParallelCollectionRDD[1] at parallelize at DatasetsTest.java:61 []
 +-(4) MapPartitionsRDD[5] at mapToPair at DatasetsTest.java:65 []
    |  ParallelCollectionRDD[2] at parallelize at DatasetsTest.java:62 []
join
(4) MapPartitionsRDD[18] at mapToPair at DatasetsTest.java:85 []
 |  MapPartitionsRDD[17] at fullOuterJoin at DatasetsTest.java:85 []
 |  MapPartitionsRDD[16] at fullOuterJoin at DatasetsTest.java:85 []
 |  MapPartitionsRDD[15] at fullOuterJoin at DatasetsTest.java:85 []
 |  CoGroupedRDD[14] at fullOuterJoin at DatasetsTest.java:85 []
 +-(4) MapPartitionsRDD[13] at mapToPair at DatasetsTest.java:85 []
 |  |  MapPartitionsRDD[12] at fullOuterJoin at DatasetsTest.java:85 []
 |  |  MapPartitionsRDD[11] at fullOuterJoin at DatasetsTest.java:85 []
 |  |  MapPartitionsRDD[10] at fullOuterJoin at DatasetsTest.java:85 []
 |  |  CoGroupedRDD[9] at fullOuterJoin at DatasetsTest.java:85 []
 |  +-(4) MapPartitionsRDD[3] at mapToPair at DatasetsTest.java:64 []
 |  |  |  ParallelCollectionRDD[0] at parallelize at DatasetsTest.java:61 []
 |  +-(4) MapPartitionsRDD[4] at mapToPair at DatasetsTest.java:65 []
 |     |  ParallelCollectionRDD[1] at parallelize at DatasetsTest.java:62 []
 +-(4) MapPartitionsRDD[5] at mapToPair at DatasetsTest.java:66 []
    |  ParallelCollectionRDD[2] at parallelize at DatasetsTest.java:63 []
The problem is that I don’t really understand what will happen. My current
understanding is that the numbers in parentheses (12 and 4) are the number of
partitions to process. Different indentation means different stages. A
different stage means a different shuffle, which in turn means the data will be
sent over the network. So, based on this, I assume there will be just one
network hop for union and 4 network hops for both join and cogroup.
Is my understanding correct?
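One way to do the counting mechanically: in toDebugString output, a line containing "+-(" followed by a partition count marks an RDD reached through a shuffle dependency, while lines prefixed only with "|" are narrow dependencies in the same stage as the line above. The hypothetical helper below just counts those markers in the text; it is a plain-Java sketch that reads the string, not part of Spark's API, and it counts shuffle dependencies printed in the lineage rather than measuring network traffic. The union output contains one such marker; the cogroup and join outputs each contain four.

```java
public class ShuffleMarkers {
    // First lines of the union output above (leading spaces are part of the tree prefix).
    static final String UNION_EXAMPLE = String.join("\n",
            "(12) ShuffledRDD[8] at reduceByKey at DatasetsTest.java:67 []",
            " +-(12) UnionRDD[7] at union at DatasetsTest.java:67 []",
            "    |   UnionRDD[6] at union at DatasetsTest.java:67 []",
            "    |   MapPartitionsRDD[3] at mapToPair at DatasetsTest.java:63 []",
            "    |   ParallelCollectionRDD[0] at parallelize at DatasetsTest.java:60 []");

    // Counts lines that start a shuffle dependency: toDebugString prints "+-(" before
    // the partition count of each RDD reached through a shuffle.
    static long countShuffleMarkers(String debugString) {
        return debugString.lines()
                .filter(line -> line.contains("+-("))
                .count();
    }

    public static void main(String[] args) {
        System.out.println(countShuffleMarkers(UNION_EXAMPLE)); // 1
    }
}
```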
If it is, then union is the fastest way of doing what I need, but it doesn’t
guarantee ordering. So it might make sense to first transform each RDD so that
every value is tagged with its RDD number, then do the union; that would still
be faster than the other solutions.
Is my conclusion here correct?
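The tagging idea can be sketched like this, again in plain Java rather than Spark. Each value is paired with its RDD number before the union, and the reduce function keeps the pair with the larger number. Assuming each id appears at most once per RDD, that function is associative and commutative, so the result no longer depends on the order in which pairs arrive, which is the property a reduceByKey over a union needs. The names TaggedUnion, Tagged, tag and reduceLatest are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

public class TaggedUnion {
    // A value tagged with the number of the RDD it came from (higher = more recent).
    static final class Tagged {
        final int recency;
        final int value;
        Tagged(int recency, int value) { this.recency = recency; this.value = value; }
    }

    // Order-independent merge: keep whichever value is more recent
    // (assumes an id never appears twice within the same RDD).
    static Tagged newer(Tagged a, Tagged b) {
        return a.recency >= b.recency ? a : b;
    }

    // Tags every (id, value) pair of one dataset with its recency number.
    static List<Map.Entry<Integer, Tagged>> tag(int recency, int[][] pairs) {
        List<Map.Entry<Integer, Tagged>> out = new ArrayList<>();
        for (int[] p : pairs) {
            out.add(Map.entry(p[0], new Tagged(recency, p[1])));
        }
        return out;
    }

    // Models reduceByKey(newer) followed by dropping the tag, over the unioned pairs.
    static Map<Integer, Integer> reduceLatest(List<Map.Entry<Integer, Tagged>> unioned) {
        Map<Integer, Tagged> reduced = new TreeMap<>();
        for (Map.Entry<Integer, Tagged> e : unioned) {
            reduced.merge(e.getKey(), e.getValue(), TaggedUnion::newer);
        }
        Map<Integer, Integer> result = new TreeMap<>();
        reduced.forEach((id, t) -> result.put(id, t.value));
        return result;
    }

    // The "union" of the three tagged example datasets.
    static List<Map.Entry<Integer, Tagged>> exampleUnion() {
        List<Map.Entry<Integer, Tagged>> all = new ArrayList<>();
        all.addAll(tag(1, new int[][]{{1, 1}, {2, 1}, {3, 1}}));
        all.addAll(tag(2, new int[][]{{3, 2}, {4, 2}, {5, 2}}));
        all.addAll(tag(3, new int[][]{{2, 3}, {3, 3}, {5, 3}}));
        return all;
    }

    public static void main(String[] args) {
        List<Map.Entry<Integer, Tagged>> all = exampleUnion();
        Collections.shuffle(all, new Random(42)); // simulate an arbitrary arrival order
        System.out.println(reduceLatest(all)); // {1=1, 2=3, 3=3, 4=2, 5=3} regardless of order
    }
}
```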
One more question: even though I do not change the keys in fullOuterJoin and
cogroup, it appears that Spark nevertheless reshuffles. Why does it do so?
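For context on this question, here is a plain-Java model of the rule Spark's HashPartitioner uses to place a key: null keys go to partition 0, any other key to the non-negative remainder of its hashCode divided by the number of partitions. If two datasets are partitioned by the same rule with the same partition count, equal keys land at the same partition index, so matching records are already co-located; whether Spark can skip the shuffle depends on it knowing each RDD's partitioner, which this sketch does not model. The class HashPlacement and its methods are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class HashPlacement {
    // Same placement rule as Spark's HashPartitioner: null -> 0, otherwise non-negative hashCode mod n.
    static int partitionFor(Object key, int numPartitions) {
        return key == null ? 0 : Math.floorMod(key.hashCode(), numPartitions);
    }

    // Groups keys by the partition index they would be assigned to.
    static Map<Integer, List<Integer>> place(List<Integer> keys, int numPartitions) {
        Map<Integer, List<Integer>> byPartition = new TreeMap<>();
        for (Integer k : keys) {
            byPartition.computeIfAbsent(partitionFor(k, numPartitions), p -> new ArrayList<>()).add(k);
        }
        return byPartition;
    }

    public static void main(String[] args) {
        // ids of RDD 1 and RDD 2 from the example, placed into 4 partitions;
        // id 3 ends up in partition 3 on both sides.
        System.out.println(place(List.of(1, 2, 3), 4)); // {1=[1], 2=[2], 3=[3]}
        System.out.println(place(List.of(3, 4, 5), 4)); // {0=[4], 1=[5], 3=[3]}
    }
}
```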
Thanks in advance.
--
Eugene Morozov