Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21698#discussion_r200044231
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
    @@ -461,9 +464,12 @@ abstract class RDD[T: ClassTag](
           } : Iterator[(Int, T)]
     
           // include a shuffle step so that our upstream tasks are still distributed
    +      val recomputeOnFailure =
    +        conf.getBoolean("spark.shuffle.recomputeAllPartitionsOnRepartitionFailure", true)
           new CoalescedRDD(
    -        new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
    -        new HashPartitioner(numPartitions)),
    +        new ShuffledRDD[Int, T, T](
    +          mapPartitionsWithIndex(distributePartition, recomputeOnFailure),
    --- End diff ---
    
    To avoid changing the existing `mapPartitionsWithIndex`, we can create a `MapPartitionsRDD` directly here.
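    
    For reference, a rough sketch of what that could look like inside `coalesce` (the `recomputeOnFailure` constructor parameter on `MapPartitionsRDD` is hypothetical and closure cleaning is omitted; the surrounding `CoalescedRDD`/`ShuffledRDD` construction stays as it is today):
    
        val recomputeOnFailure =
          conf.getBoolean("spark.shuffle.recomputeAllPartitionsOnRepartitionFailure", true)
        new CoalescedRDD(
          new ShuffledRDD[Int, T, T](
            // build the MapPartitionsRDD directly instead of adding a new
            // parameter to the public mapPartitionsWithIndex API
            new MapPartitionsRDD[(Int, T), T](
              this,
              (_: TaskContext, index: Int, iter: Iterator[T]) => distributePartition(index, iter),
              preservesPartitioning = false,
              recomputeOnFailure = recomputeOnFailure),  // hypothetical new constructor flag
            new HashPartitioner(numPartitions)),
          numPartitions,
          partitionCoalescer).values
    
    That way only the internal `MapPartitionsRDD` needs to know about the new flag, and the public RDD API is left untouched.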

