Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/1338#issuecomment-50206155
  
    If you call `first()` on an RDD, we only perform as much computation as is 
necessary to return the first result.  If the RDD's lineage chain contains a 
shuffle stage, we have to run the entire map side of that shuffle stage 
before we can return the first result (e.g. if I call 
`someRDD.groupByKey().first()`).  The map-side shuffle data should be 
automatically materialized to disk, but I think we'd still pay the cost of 
re-computing the first partition of the post-shuffle stage when later 
transforming the full RDD.
    
    Here's a quick experiment in the shell (with `./bin/spark-shell --master 
"local-cluster[2,2,512]"`):
    
    ```scala
    scala> val myRDD = sc.parallelize(1 to 10).flatMap(x => (1 to 10).map(y => 
(x, y)))
    myRDD: org.apache.spark.rdd.RDD[(Int, Int)] = FlatMappedRDD[1] at flatMap 
at <console>:12
    
    scala> val grouped = myRDD.groupByKey(4)
    grouped: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = 
MappedValuesRDD[9] at groupByKey at <console>:14
    
    scala> grouped.first()
    14/07/25 14:14:09 INFO SparkContext: Starting job: first at <console>:17
    14/07/25 14:14:09 INFO DAGScheduler: Registering RDD 1 (flatMap at 
<console>:12)
    14/07/25 14:14:09 INFO DAGScheduler: Got job 3 (first at <console>:17) with 
1 output partitions (allowLocal=true)
    14/07/25 14:14:09 INFO DAGScheduler: Final stage: Stage 6(first at 
<console>:17)
    14/07/25 14:14:09 INFO DAGScheduler: Parents of final stage: List(Stage 7)
    14/07/25 14:14:09 INFO DAGScheduler: Missing parents: List(Stage 7)
    14/07/25 14:14:09 INFO DAGScheduler: Submitting Stage 7 (FlatMappedRDD[1] 
at flatMap at <console>:12), which has no missing parents
    14/07/25 14:14:09 INFO DAGScheduler: Submitting 4 missing tasks from Stage 
7 (FlatMappedRDD[1] at flatMap at <console>:12)
    // Run entire map side of the shuffle stage:
    14/07/25 14:14:09 INFO TaskSchedulerImpl: Adding task set 7.0 with 4 tasks
    14/07/25 14:14:09 INFO TaskSetManager: Starting task 0.0 in stage 7.0 (TID 
15, dhcp-47-206.eecs.berkeley.edu, PROCESS_LOCAL, 1717 bytes)
    14/07/25 14:14:09 INFO TaskSetManager: Starting task 1.0 in stage 7.0 (TID 
16, dhcp-47-206.eecs.berkeley.edu, PROCESS_LOCAL, 1717 bytes)
    14/07/25 14:14:09 INFO TaskSetManager: Starting task 2.0 in stage 7.0 (TID 
17, dhcp-47-206.eecs.berkeley.edu, PROCESS_LOCAL, 1717 bytes)
    14/07/25 14:14:09 INFO TaskSetManager: Starting task 3.0 in stage 7.0 (TID 
18, dhcp-47-206.eecs.berkeley.edu, PROCESS_LOCAL, 1717 bytes)
    14/07/25 14:14:09 INFO TaskSetManager: Finished task 2.0 in stage 7.0 (TID 
17) in 14 ms on dhcp-47-206.eecs.berkeley.edu (1/4)
    14/07/25 14:14:09 INFO TaskSetManager: Finished task 1.0 in stage 7.0 (TID 
16) in 15 ms on dhcp-47-206.eecs.berkeley.edu (2/4)
    14/07/25 14:14:09 INFO TaskSetManager: Finished task 0.0 in stage 7.0 (TID 
15) in 17 ms on dhcp-47-206.eecs.berkeley.edu (3/4)
    14/07/25 14:14:09 INFO TaskSetManager: Finished task 3.0 in stage 7.0 (TID 
18) in 16 ms on dhcp-47-206.eecs.berkeley.edu (4/4)
    14/07/25 14:14:09 INFO DAGScheduler: Stage 7 (flatMap at <console>:12) 
finished in 0.018 s
    14/07/25 14:14:09 INFO DAGScheduler: looking for newly runnable stages
    14/07/25 14:14:09 INFO TaskSchedulerImpl: Removed TaskSet 7.0, whose tasks 
have all completed, from pool
    14/07/25 14:14:09 INFO DAGScheduler: running: Set()
    14/07/25 14:14:09 INFO DAGScheduler: waiting: Set(Stage 6)
    14/07/25 14:14:09 INFO DAGScheduler: failed: Set()
    14/07/25 14:14:09 INFO DAGScheduler: Missing parents for Stage 6: List()
    14/07/25 14:14:09 INFO DAGScheduler: Submitting Stage 6 (MappedValuesRDD[9] 
at groupByKey at <console>:14), which is now runnable
    // Only ran a single post-shuffle task:
    14/07/25 14:14:09 INFO DAGScheduler: Submitting 1 missing tasks from Stage 
6 (MappedValuesRDD[9] at groupByKey at <console>:14)
    14/07/25 14:14:09 INFO TaskSchedulerImpl: Adding task set 6.0 with 1 tasks
    14/07/25 14:14:09 INFO TaskSetManager: Starting task 0.0 in stage 6.0 (TID 
19, dhcp-47-206.eecs.berkeley.edu, PROCESS_LOCAL, 1721 bytes)
    14/07/25 14:14:09 INFO MapOutputTrackerMasterActor: Asked to send map 
output locations for shuffle 3 to sp...@dhcp-47-206.eecs.berkeley.edu:53826
    14/07/25 14:14:09 INFO MapOutputTrackerMaster: Size of output statuses for 
shuffle 3 is 177 bytes
    14/07/25 14:14:10 INFO TaskSetManager: Finished task 0.0 in stage 6.0 (TID 
19) in 181 ms on dhcp-47-206.eecs.berkeley.edu (1/1)
    14/07/25 14:14:10 INFO DAGScheduler: Stage 6 (first at <console>:17) 
finished in 0.182 s
    14/07/25 14:14:10 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks 
have all completed, from pool
    14/07/25 14:14:10 INFO SparkContext: Job finished: first at <console>:17, 
took 0.208059 s
    res6: (Int, Iterable[Int]) = (4,ArrayBuffer(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    
    // Demonstration that the map side of the shuffle was automatically cached 
/ materialized:
    scala> grouped.collect().size
    14/07/25 14:16:03 INFO SparkContext: Starting job: collect at <console>:17
    14/07/25 14:16:03 INFO DAGScheduler: Got job 5 (collect at <console>:17) 
with 4 output partitions (allowLocal=false)
    14/07/25 14:16:03 INFO DAGScheduler: Final stage: Stage 10(collect at 
<console>:17)
    14/07/25 14:16:03 INFO DAGScheduler: Parents of final stage: List(Stage 11)
    14/07/25 14:16:03 INFO DAGScheduler: Missing parents: List()
    14/07/25 14:16:03 INFO DAGScheduler: Submitting Stage 10 
(MappedValuesRDD[9] at groupByKey at <console>:14), which has no missing parents
    14/07/25 14:16:03 INFO DAGScheduler: Submitting 4 missing tasks from Stage 
10 (MappedValuesRDD[9] at groupByKey at <console>:14)
    14/07/25 14:16:03 INFO TaskSchedulerImpl: Adding task set 10.0 with 4 tasks
    14/07/25 14:16:03 INFO TaskSetManager: Starting task 0.0 in stage 10.0 (TID 
21, dhcp-47-206.eecs.berkeley.edu, PROCESS_LOCAL, 1710 bytes)
    14/07/25 14:16:03 INFO TaskSetManager: Starting task 1.0 in stage 10.0 (TID 
22, dhcp-47-206.eecs.berkeley.edu, PROCESS_LOCAL, 1710 bytes)
    14/07/25 14:16:03 INFO TaskSetManager: Starting task 2.0 in stage 10.0 (TID 
23, dhcp-47-206.eecs.berkeley.edu, PROCESS_LOCAL, 1710 bytes)
    14/07/25 14:16:03 INFO TaskSetManager: Starting task 3.0 in stage 10.0 (TID 
24, dhcp-47-206.eecs.berkeley.edu, PROCESS_LOCAL, 1710 bytes)
    14/07/25 14:16:03 INFO TaskSetManager: Finished task 2.0 in stage 10.0 (TID 
23) in 26 ms on dhcp-47-206.eecs.berkeley.edu (1/4)
    14/07/25 14:16:03 INFO TaskSetManager: Finished task 0.0 in stage 10.0 (TID 
21) in 30 ms on dhcp-47-206.eecs.berkeley.edu (2/4)
    14/07/25 14:16:03 INFO TaskSetManager: Finished task 1.0 in stage 10.0 (TID 
22) in 32 ms on dhcp-47-206.eecs.berkeley.edu (3/4)
    14/07/25 14:16:03 INFO TaskSetManager: Finished task 3.0 in stage 10.0 (TID 
24) in 34 ms on dhcp-47-206.eecs.berkeley.edu (4/4)
    14/07/25 14:16:03 INFO DAGScheduler: Stage 10 (collect at <console>:17) 
finished in 0.036 s
    14/07/25 14:16:03 INFO TaskSchedulerImpl: Removed TaskSet 10.0, whose tasks 
have all completed, from pool
    14/07/25 14:16:03 INFO SparkContext: Job finished: collect at <console>:17, 
took 0.040781 s
    res8: Int = 10
    ```
    
    In the worst case I think we only re-run one additional post-shuffle task.  
In fact, `first()` is implemented in terms of `take(1)`, which pushes the limit 
of `1` into the post-shuffle stage, so we probably won't run the full task 
anyway.  So, I think it's fine to use `first()` here.

