lucasbru opened a new pull request #30262:
URL: https://github.com/apache/spark/pull/30262


   The current implementation of the `DAGScheduler` exhibits exponential runtime on DAGs containing many `PartitionerAwareUnions`. The cause appears to be a mutual recursion between `PartitionerAwareUnion.getPreferredLocations` and `DAGScheduler.getPreferredLocs`.
   
   A minimal example reproducing the issue:
   
   ```
   import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

   object Example extends App {
     val partitioner = new HashPartitioner(2)
     val sc = new SparkContext(new SparkConf().setAppName("").setMaster("local[*]"))
     val rdd1 = sc.emptyRDD[(Int, Int)].partitionBy(partitioner)
     val rdd2 = (1 to 30).map(_ => rdd1)
     val rdd3 = rdd2.reduce(_ union _)
     rdd3.collect()
   }
   ```
   
   The whole app should take around one second to complete, as no actual work is done. However, it takes longer to submit the job than I am willing to wait.
   
   The underlying problem is that the traversal restarts at each `PartitionerAwareUnion` with no memoization: `DAGScheduler.getPreferredLocs` recurses into the union's `getPreferredLocations`, which in turn calls back into `DAGScheduler.getPreferredLocs` for every parent. As a result, each node of the DAG is visited exponentially many times, with the work doubling at every union in the chain.
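   The blowup can be illustrated outside Spark with a toy model (a hypothetical sketch, not Spark code): in the left-deep chain of unions built by `reduce(_ union _)`, each level causes its parent chain to be traversed twice, once through the union's own preferred-locations computation and once through the scheduler's recursion into the dependency, so the work doubles at every level:

   ```
   // Toy model (hypothetical, not Spark code): count how often the bottom of a
   // chain of n unions is revisited when every level re-traverses the level
   // below twice and nothing is memoized.
   object RecursionBlowup {
     // Unmemoized: each level triggers two full traversals of the level below,
     // giving 2^(n+1) - 1 visits in total.
     def visitsWithoutMemo(n: Int): Long =
       if (n == 0) 1L
       else 1L + 2L * visitsWithoutMemo(n - 1)

     // With memoization (or by asking the parent directly), each level is
     // computed exactly once.
     def visitsWithMemo(n: Int): Long = n + 1L

     def main(args: Array[String]): Unit = {
       println(visitsWithoutMemo(20)) // 2^21 - 1 = 2097151
       println(visitsWithMemo(20))    // 21
     }
   }
   ```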
   
   In this pull request, I propose calling `rdd.getPreferredLocations` instead of `DAGScheduler.getPreferredLocs` inside `PartitionerAwareUnion`.
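   A sketch of the idea (the exact method names in Spark's internals may differ; `currentPreferredLocations` is used here for illustration):

   ```
   private def currentPreferredLocations(rdd: RDD[_], part: Partition): Seq[String] = {
     // Before: restart the scheduler's traversal for every parent partition,
     // which re-enters DAGScheduler.getPreferredLocs and causes the blowup.
     // rdd.context.getPreferredLocs(rdd, part.index).map(_.host)

     // After: ask the parent RDD directly, so the traversal stays local.
     rdd.preferredLocations(part)
   }
   ```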


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
