Github user JoshRosen commented on the pull request:

https://github.com/apache/spark/pull/1338#issuecomment-50206155

If you call `first()` on an RDD, we only perform as much computation as necessary to return the first result. If the RDD's lineage chain contains a shuffle stage, we have to perform the entire map side of that shuffle stage before we can return the first result (e.g. if I call `someRDD.groupByKey().first()`). The map-side shuffle data should be automatically materialized to disk, but I think we'd still pay the cost of re-computing the first partition of the post-shuffle stage when transforming the full RDD.

Here's a quick experiment in the shell (with `./bin/spark-shell --master "local-cluster[2,2,512]"`):

```scala
scala> val myRDD = sc.parallelize(1 to 10).flatMap(x => (1 to 10).map(y => (x, y)))
myRDD: org.apache.spark.rdd.RDD[(Int, Int)] = FlatMappedRDD[1] at flatMap at <console>:12

scala> val grouped = myRDD.groupByKey(4)
grouped: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = MappedValuesRDD[9] at groupByKey at <console>:14

scala> grouped.first()
14/07/25 14:14:09 INFO SparkContext: Starting job: first at <console>:17
14/07/25 14:14:09 INFO DAGScheduler: Registering RDD 1 (flatMap at <console>:12)
14/07/25 14:14:09 INFO DAGScheduler: Got job 3 (first at <console>:17) with 1 output partitions (allowLocal=true)
14/07/25 14:14:09 INFO DAGScheduler: Final stage: Stage 6(first at <console>:17)
14/07/25 14:14:09 INFO DAGScheduler: Parents of final stage: List(Stage 7)
14/07/25 14:14:09 INFO DAGScheduler: Missing parents: List(Stage 7)
14/07/25 14:14:09 INFO DAGScheduler: Submitting Stage 7 (FlatMappedRDD[1] at flatMap at <console>:12), which has no missing parents
14/07/25 14:14:09 INFO DAGScheduler: Submitting 4 missing tasks from Stage 7 (FlatMappedRDD[1] at flatMap at <console>:12)
// Run entire map side of the shuffle stage:
14/07/25 14:14:09 INFO TaskSchedulerImpl: Adding task set 7.0 with 4 tasks
14/07/25 14:14:09 INFO TaskSetManager: Starting task 0.0 in stage 7.0 (TID 15, dhcp-47-206.eecs.berkeley.edu, PROCESS_LOCAL, 1717 bytes)
14/07/25 14:14:09 INFO TaskSetManager: Starting task 1.0 in stage 7.0 (TID 16, dhcp-47-206.eecs.berkeley.edu, PROCESS_LOCAL, 1717 bytes)
14/07/25 14:14:09 INFO TaskSetManager: Starting task 2.0 in stage 7.0 (TID 17, dhcp-47-206.eecs.berkeley.edu, PROCESS_LOCAL, 1717 bytes)
14/07/25 14:14:09 INFO TaskSetManager: Starting task 3.0 in stage 7.0 (TID 18, dhcp-47-206.eecs.berkeley.edu, PROCESS_LOCAL, 1717 bytes)
14/07/25 14:14:09 INFO TaskSetManager: Finished task 2.0 in stage 7.0 (TID 17) in 14 ms on dhcp-47-206.eecs.berkeley.edu (1/4)
14/07/25 14:14:09 INFO TaskSetManager: Finished task 1.0 in stage 7.0 (TID 16) in 15 ms on dhcp-47-206.eecs.berkeley.edu (2/4)
14/07/25 14:14:09 INFO TaskSetManager: Finished task 0.0 in stage 7.0 (TID 15) in 17 ms on dhcp-47-206.eecs.berkeley.edu (3/4)
14/07/25 14:14:09 INFO TaskSetManager: Finished task 3.0 in stage 7.0 (TID 18) in 16 ms on dhcp-47-206.eecs.berkeley.edu (4/4)
14/07/25 14:14:09 INFO DAGScheduler: Stage 7 (flatMap at <console>:12) finished in 0.018 s
14/07/25 14:14:09 INFO DAGScheduler: looking for newly runnable stages
14/07/25 14:14:09 INFO TaskSchedulerImpl: Removed TaskSet 7.0, whose tasks have all completed, from pool
14/07/25 14:14:09 INFO DAGScheduler: running: Set()
14/07/25 14:14:09 INFO DAGScheduler: waiting: Set(Stage 6)
14/07/25 14:14:09 INFO DAGScheduler: failed: Set()
14/07/25 14:14:09 INFO DAGScheduler: Missing parents for Stage 6: List()
14/07/25 14:14:09 INFO DAGScheduler: Submitting Stage 6 (MappedValuesRDD[9] at groupByKey at <console>:14), which is now runnable
// Only ran a single post-shuffle task:
14/07/25 14:14:09 INFO DAGScheduler: Submitting 1 missing tasks from Stage 6 (MappedValuesRDD[9] at groupByKey at <console>:14)
14/07/25 14:14:09 INFO TaskSchedulerImpl: Adding task set 6.0 with 1 tasks
14/07/25 14:14:09 INFO TaskSetManager: Starting task 0.0 in stage 6.0 (TID 19, dhcp-47-206.eecs.berkeley.edu, PROCESS_LOCAL, 1721 bytes)
14/07/25 14:14:09 INFO MapOutputTrackerMasterActor: Asked to send map output locations for shuffle 3 to sp...@dhcp-47-206.eecs.berkeley.edu:53826
14/07/25 14:14:09 INFO MapOutputTrackerMaster: Size of output statuses for shuffle 3 is 177 bytes
14/07/25 14:14:10 INFO TaskSetManager: Finished task 0.0 in stage 6.0 (TID 19) in 181 ms on dhcp-47-206.eecs.berkeley.edu (1/1)
14/07/25 14:14:10 INFO DAGScheduler: Stage 6 (first at <console>:17) finished in 0.182 s
14/07/25 14:14:10 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool
14/07/25 14:14:10 INFO SparkContext: Job finished: first at <console>:17, took 0.208059 s
res6: (Int, Iterable[Int]) = (4,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))

// Demonstration that the map side of the shuffle was automatically cached / materialized:
scala> grouped.collect().size
14/07/25 14:16:03 INFO SparkContext: Starting job: collect at <console>:17
14/07/25 14:16:03 INFO DAGScheduler: Got job 5 (collect at <console>:17) with 4 output partitions (allowLocal=false)
14/07/25 14:16:03 INFO DAGScheduler: Final stage: Stage 10(collect at <console>:17)
14/07/25 14:16:03 INFO DAGScheduler: Parents of final stage: List(Stage 11)
14/07/25 14:16:03 INFO DAGScheduler: Missing parents: List()
14/07/25 14:16:03 INFO DAGScheduler: Submitting Stage 10 (MappedValuesRDD[9] at groupByKey at <console>:14), which has no missing parents
14/07/25 14:16:03 INFO DAGScheduler: Submitting 4 missing tasks from Stage 10 (MappedValuesRDD[9] at groupByKey at <console>:14)
14/07/25 14:16:03 INFO TaskSchedulerImpl: Adding task set 10.0 with 4 tasks
14/07/25 14:16:03 INFO TaskSetManager: Starting task 0.0 in stage 10.0 (TID 21, dhcp-47-206.eecs.berkeley.edu, PROCESS_LOCAL, 1710 bytes)
14/07/25 14:16:03 INFO TaskSetManager: Starting task 1.0 in stage 10.0 (TID 22, dhcp-47-206.eecs.berkeley.edu, PROCESS_LOCAL, 1710 bytes)
14/07/25 14:16:03 INFO TaskSetManager: Starting task 2.0 in stage 10.0 (TID 23, dhcp-47-206.eecs.berkeley.edu, PROCESS_LOCAL, 1710 bytes)
14/07/25 14:16:03 INFO TaskSetManager: Starting task 3.0 in stage 10.0 (TID 24, dhcp-47-206.eecs.berkeley.edu, PROCESS_LOCAL, 1710 bytes)
14/07/25 14:16:03 INFO TaskSetManager: Finished task 2.0 in stage 10.0 (TID 23) in 26 ms on dhcp-47-206.eecs.berkeley.edu (1/4)
14/07/25 14:16:03 INFO TaskSetManager: Finished task 0.0 in stage 10.0 (TID 21) in 30 ms on dhcp-47-206.eecs.berkeley.edu (2/4)
14/07/25 14:16:03 INFO TaskSetManager: Finished task 1.0 in stage 10.0 (TID 22) in 32 ms on dhcp-47-206.eecs.berkeley.edu (3/4)
14/07/25 14:16:03 INFO TaskSetManager: Finished task 3.0 in stage 10.0 (TID 24) in 34 ms on dhcp-47-206.eecs.berkeley.edu (4/4)
14/07/25 14:16:03 INFO DAGScheduler: Stage 10 (collect at <console>:17) finished in 0.036 s
14/07/25 14:16:03 INFO TaskSchedulerImpl: Removed TaskSet 10.0, whose tasks have all completed, from pool
14/07/25 14:16:03 INFO SparkContext: Job finished: collect at <console>:17, took 0.040781 s
res8: Int = 10
```

In the worst case, I think we only re-run one additional post-shuffle task. In fact, `first()` is implemented in terms of `take(1)`, which pushes the limit of `1` into the post-shuffle stage, so we probably won't run the full task anyway. So, I think it's fine to use `first()` here.
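For readers following along: the `take(1)` early-termination behavior described above can be sketched outside Spark with plain lazy iterators. This is a minimal, hypothetical illustration of the idea (partitions modeled as thunks, a counter tracking how many are forced), not Spark's actual `take` implementation:

```scala
// Sketch: take(n) pulls from successive "partitions" and stops as soon as
// n elements have been collected, so later partitions are never computed.
var computed = 0

// Four lazy "partitions" of ten elements each; forcing one bumps `computed`.
val partitions: Seq[() => Iterator[Int]] =
  (0 until 4).map { p =>
    () => { computed += 1; (p * 10 until p * 10 + 10).iterator }
  }

def take(n: Int): Seq[Int] = {
  var buf = Vector.empty[Int]
  val parts = partitions.iterator
  // Stop scanning partitions once n elements are buffered.
  while (buf.size < n && parts.hasNext)
    buf = buf ++ parts.next()().take(n - buf.size)
  buf
}

println(take(1))  // first() == take(1): only partition 0 is forced
println(computed) // 1 of 4 partitions evaluated
```

The same mechanism is why the experiment above only submits one task for Stage 6: the limit is satisfied by the first post-shuffle partition, so the other three are left unevaluated.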