Github user JoshRosen commented on the pull request:
https://github.com/apache/spark/pull/1338#issuecomment-50206155
If you call `first()` on an RDD, we only perform as much computation as
necessary to return the first result. If the RDD's lineage chain contains a
shuffle stage, we have to run the entire map side of that shuffle before we
can return the first result (e.g. if I call
`someRDD.groupByKey().first()`). The map-side shuffle data should be
automatically materialized to disk, but I think we'd still pay the cost of
re-computing the first partition of the post-shuffle stage when we later
compute the full RDD.
Here's a quick experiment in the shell (with `./bin/spark-shell --master
"local-cluster[2,2,512]"`):
```scala
scala> val myRDD = sc.parallelize(1 to 10).flatMap(x => (1 to 10).map(y =>
(x, y)))
myRDD: org.apache.spark.rdd.RDD[(Int, Int)] = FlatMappedRDD[1] at flatMap
at <console>:12
scala> val grouped = myRDD.groupByKey(4)
grouped: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] =
MappedValuesRDD[9] at groupByKey at <console>:14
scala> grouped.first()
14/07/25 14:14:09 INFO SparkContext: Starting job: first at <console>:17
14/07/25 14:14:09 INFO DAGScheduler: Registering RDD 1 (flatMap at
<console>:12)
14/07/25 14:14:09 INFO DAGScheduler: Got job 3 (first at <console>:17) with
1 output partitions (allowLocal=true)
14/07/25 14:14:09 INFO DAGScheduler: Final stage: Stage 6(first at
<console>:17)
14/07/25 14:14:09 INFO DAGScheduler: Parents of final stage: List(Stage 7)
14/07/25 14:14:09 INFO DAGScheduler: Missing parents: List(Stage 7)
14/07/25 14:14:09 INFO DAGScheduler: Submitting Stage 7 (FlatMappedRDD[1]
at flatMap at <console>:12), which has no missing parents
14/07/25 14:14:09 INFO DAGScheduler: Submitting 4 missing tasks from Stage
7 (FlatMappedRDD[1] at flatMap at <console>:12)
// Run entire map side of the shuffle stage:
14/07/25 14:14:09 INFO TaskSchedulerImpl: Adding task set 7.0 with 4 tasks
14/07/25 14:14:09 INFO TaskSetManager: Starting task 0.0 in stage 7.0 (TID
15, dhcp-47-206.eecs.berkeley.edu, PROCESS_LOCAL, 1717 bytes)
14/07/25 14:14:09 INFO TaskSetManager: Starting task 1.0 in stage 7.0 (TID
16, dhcp-47-206.eecs.berkeley.edu, PROCESS_LOCAL, 1717 bytes)
14/07/25 14:14:09 INFO TaskSetManager: Starting task 2.0 in stage 7.0 (TID
17, dhcp-47-206.eecs.berkeley.edu, PROCESS_LOCAL, 1717 bytes)
14/07/25 14:14:09 INFO TaskSetManager: Starting task 3.0 in stage 7.0 (TID
18, dhcp-47-206.eecs.berkeley.edu, PROCESS_LOCAL, 1717 bytes)
14/07/25 14:14:09 INFO TaskSetManager: Finished task 2.0 in stage 7.0 (TID
17) in 14 ms on dhcp-47-206.eecs.berkeley.edu (1/4)
14/07/25 14:14:09 INFO TaskSetManager: Finished task 1.0 in stage 7.0 (TID
16) in 15 ms on dhcp-47-206.eecs.berkeley.edu (2/4)
14/07/25 14:14:09 INFO TaskSetManager: Finished task 0.0 in stage 7.0 (TID
15) in 17 ms on dhcp-47-206.eecs.berkeley.edu (3/4)
14/07/25 14:14:09 INFO TaskSetManager: Finished task 3.0 in stage 7.0 (TID
18) in 16 ms on dhcp-47-206.eecs.berkeley.edu (4/4)
14/07/25 14:14:09 INFO DAGScheduler: Stage 7 (flatMap at <console>:12)
finished in 0.018 s
14/07/25 14:14:09 INFO DAGScheduler: looking for newly runnable stages
14/07/25 14:14:09 INFO TaskSchedulerImpl: Removed TaskSet 7.0, whose tasks
have all completed, from pool
14/07/25 14:14:09 INFO DAGScheduler: running: Set()
14/07/25 14:14:09 INFO DAGScheduler: waiting: Set(Stage 6)
14/07/25 14:14:09 INFO DAGScheduler: failed: Set()
14/07/25 14:14:09 INFO DAGScheduler: Missing parents for Stage 6: List()
14/07/25 14:14:09 INFO DAGScheduler: Submitting Stage 6 (MappedValuesRDD[9]
at groupByKey at <console>:14), which is now runnable
// Only ran a single post-shuffle task:
14/07/25 14:14:09 INFO DAGScheduler: Submitting 1 missing tasks from Stage
6 (MappedValuesRDD[9] at groupByKey at <console>:14)
14/07/25 14:14:09 INFO TaskSchedulerImpl: Adding task set 6.0 with 1 tasks
14/07/25 14:14:09 INFO TaskSetManager: Starting task 0.0 in stage 6.0 (TID
19, dhcp-47-206.eecs.berkeley.edu, PROCESS_LOCAL, 1721 bytes)
14/07/25 14:14:09 INFO MapOutputTrackerMasterActor: Asked to send map
output locations for shuffle 3 to [email protected]:53826
14/07/25 14:14:09 INFO MapOutputTrackerMaster: Size of output statuses for
shuffle 3 is 177 bytes
14/07/25 14:14:10 INFO TaskSetManager: Finished task 0.0 in stage 6.0 (TID
19) in 181 ms on dhcp-47-206.eecs.berkeley.edu (1/1)
14/07/25 14:14:10 INFO DAGScheduler: Stage 6 (first at <console>:17)
finished in 0.182 s
14/07/25 14:14:10 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks
have all completed, from pool
14/07/25 14:14:10 INFO SparkContext: Job finished: first at <console>:17,
took 0.208059 s
res6: (Int, Iterable[Int]) = (4,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
// Demonstration that the map side of the shuffle was automatically cached / materialized:
scala> grouped.collect().size
14/07/25 14:16:03 INFO SparkContext: Starting job: collect at <console>:17
14/07/25 14:16:03 INFO DAGScheduler: Got job 5 (collect at <console>:17)
with 4 output partitions (allowLocal=false)
14/07/25 14:16:03 INFO DAGScheduler: Final stage: Stage 10(collect at
<console>:17)
14/07/25 14:16:03 INFO DAGScheduler: Parents of final stage: List(Stage 11)
14/07/25 14:16:03 INFO DAGScheduler: Missing parents: List()
14/07/25 14:16:03 INFO DAGScheduler: Submitting Stage 10
(MappedValuesRDD[9] at groupByKey at <console>:14), which has no missing parents
14/07/25 14:16:03 INFO DAGScheduler: Submitting 4 missing tasks from Stage
10 (MappedValuesRDD[9] at groupByKey at <console>:14)
14/07/25 14:16:03 INFO TaskSchedulerImpl: Adding task set 10.0 with 4 tasks
14/07/25 14:16:03 INFO TaskSetManager: Starting task 0.0 in stage 10.0 (TID
21, dhcp-47-206.eecs.berkeley.edu, PROCESS_LOCAL, 1710 bytes)
14/07/25 14:16:03 INFO TaskSetManager: Starting task 1.0 in stage 10.0 (TID
22, dhcp-47-206.eecs.berkeley.edu, PROCESS_LOCAL, 1710 bytes)
14/07/25 14:16:03 INFO TaskSetManager: Starting task 2.0 in stage 10.0 (TID
23, dhcp-47-206.eecs.berkeley.edu, PROCESS_LOCAL, 1710 bytes)
14/07/25 14:16:03 INFO TaskSetManager: Starting task 3.0 in stage 10.0 (TID
24, dhcp-47-206.eecs.berkeley.edu, PROCESS_LOCAL, 1710 bytes)
14/07/25 14:16:03 INFO TaskSetManager: Finished task 2.0 in stage 10.0 (TID
23) in 26 ms on dhcp-47-206.eecs.berkeley.edu (1/4)
14/07/25 14:16:03 INFO TaskSetManager: Finished task 0.0 in stage 10.0 (TID
21) in 30 ms on dhcp-47-206.eecs.berkeley.edu (2/4)
14/07/25 14:16:03 INFO TaskSetManager: Finished task 1.0 in stage 10.0 (TID
22) in 32 ms on dhcp-47-206.eecs.berkeley.edu (3/4)
14/07/25 14:16:03 INFO TaskSetManager: Finished task 3.0 in stage 10.0 (TID
24) in 34 ms on dhcp-47-206.eecs.berkeley.edu (4/4)
14/07/25 14:16:03 INFO DAGScheduler: Stage 10 (collect at <console>:17)
finished in 0.036 s
14/07/25 14:16:03 INFO TaskSchedulerImpl: Removed TaskSet 10.0, whose tasks
have all completed, from pool
14/07/25 14:16:03 INFO SparkContext: Job finished: collect at <console>:17,
took 0.040781 s
res8: Int = 10
```
In the worst case I think we only re-run one additional post-shuffle task.
In fact, `first()` is implemented in terms of `take(1)`, which pushes the limit
of `1` into the post-shuffle stage, so we probably won't even run that task to
completion. So I think it's fine to use `first()` here.