Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/21698#discussion_r200044231
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -461,9 +464,12 @@ abstract class RDD[T: ClassTag](
      } : Iterator[(Int, T)]
      // include a shuffle step so that our upstream tasks are still distributed
+     val recomputeOnFailure =
+       conf.getBoolean("spark.shuffle.recomputeAllPartitionsOnRepartitionFailure", true)
      new CoalescedRDD(
-       new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
-       new HashPartitioner(numPartitions)),
+       new ShuffledRDD[Int, T, T](
+         mapPartitionsWithIndex(distributePartition, recomputeOnFailure),
--- End diff ---
To avoid changing the existing `mapPartitionsWithIndex`, we can create a `MapPartitionsRDD` directly here.
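
For reference, a minimal sketch of what that might look like inside `coalesce()`, assuming `MapPartitionsRDD` is given an extra constructor flag to carry `recomputeOnFailure` (that fourth argument is an assumption for illustration, not the current constructor signature):

```scala
// Sketch only: build the MapPartitionsRDD directly instead of adding a
// parameter to the public mapPartitionsWithIndex.
val cleanedF = sc.clean(distributePartition)
new CoalescedRDD(
  new ShuffledRDD[Int, T, T](
    new MapPartitionsRDD[(Int, T), T](
      this,
      // adapt (index, iter) => iter to MapPartitionsRDD's (context, index, iter) shape
      (_: TaskContext, index: Int, iter: Iterator[T]) => cleanedF(index, iter),
      preservesPartitioning = false,
      recomputeOnFailure),  // hypothetical extra flag, not in today's constructor
    new HashPartitioner(numPartitions)),
  numPartitions,
  partitionCoalescer).values
```

This keeps the signature change confined to the private `MapPartitionsRDD` instead of the user-facing transformation.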
---