Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/22112#discussion_r212570426
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
---
@@ -305,17 +306,19 @@ object ShuffleExchangeExec {
rdd
}
+ // round-robin function is order sensitive if we don't sort the input.
+ val orderSensitiveFunc = isRoundRobin && !SQLConf.get.sortBeforeRepartition
if (needToCopyObjectsBeforeShuffle(part)) {
- newRdd.mapPartitionsInternal { iter =>
+ newRdd.mapPartitionsWithIndexInternal((_, iter) => {
--- End diff --
The `newRdd` is always IDEMPOTENT; the problem is `getPartitionKey`, which is what we are fixing here.
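To see why the partition key function, and not `newRdd`, is what breaks determinism, here is a simplified, hypothetical model in plain Scala with no Spark APIs. The names `roundRobinAssign` and `sortedRoundRobinAssign` are illustrative only, and Spark's real round-robin path differs in detail (for example, in how it picks the starting partition). The sketch shows that a round-robin key depends on iteration order, so a recomputed task that sees the same rows in a different order sends them to different partitions. Sorting first, which is what `sortBeforeRepartition` enables, makes the assignment depend only on row contents.

```scala
// Hypothetical, simplified model of round-robin partitioning (not Spark's actual code).
object RoundRobinOrderSensitivity {
  // Assigns each row a target partition by its position in the iteration order,
  // starting from a fixed offset. Rows are assumed distinct so the Map is well-defined.
  def roundRobinAssign(rows: Seq[String], numPartitions: Int, start: Int): Map[String, Int] =
    rows.zipWithIndex.map { case (row, i) => row -> (start + i) % numPartitions }.toMap

  // Sorting first makes the assignment a function of the row contents only,
  // so it no longer depends on the order in which the input is produced.
  def sortedRoundRobinAssign(rows: Seq[String], numPartitions: Int, start: Int): Map[String, Int] =
    roundRobinAssign(rows.sorted, numPartitions, start)

  def main(args: Array[String]): Unit = {
    val firstAttempt = Seq("a", "b", "c", "d")
    val retryAttempt = Seq("b", "a", "d", "c") // same rows, different order on recomputation

    // Unsorted: the same row lands in a different partition on retry.
    println(roundRobinAssign(firstAttempt, 2, 0) == roundRobinAssign(retryAttempt, 2, 0)) // false
    // Sorted: both attempts produce identical assignments.
    println(sortedRoundRobinAssign(firstAttempt, 2, 0) == sortedRoundRobinAssign(retryAttempt, 2, 0)) // true
  }
}
```

The input RDD can be perfectly idempotent (it always yields the same set of rows), yet the shuffle output still changes between attempts, because the order-dependent key function is applied on top of it.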