Github user cloud-fan commented on a diff in the pull request:
https://github.com/apache/spark/pull/22112#discussion_r212632746
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
---
@@ -305,17 +306,19 @@ object ShuffleExchangeExec {
rdd
}
+ // round-robin function is order sensitive if we don't sort the input.
+ val orderSensitiveFunc = isRoundRobin && !SQLConf.get.sortBeforeRepartition
if (needToCopyObjectsBeforeShuffle(part)) {
- newRdd.mapPartitionsInternal { iter =>
+ newRdd.mapPartitionsWithIndexInternal((_, iter) => {
--- End diff --
I agree it's clearer. The problem is that we would need to build a framework
to declare the property of each map function, e.g.
1. FORCE_ORDER: even if the input order changes, the output data set and its
order will not change. (e.g. sort)
2. SAME_SET: even if the input order changes, the output data set will not
change, though its order may. (e.g. i => i + 1)
3. ORDER_SENSITIVE: if the input order changes, the output data set will be
different. (e.g. round robin)
There may be more types; I haven't dug into it yet. Since it's only used
here, maybe it's not worth doing now?
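
To make the three categories concrete, here is a minimal sketch in plain Scala with no Spark dependency. Every name in it (`OrderSensitivityDemo`, `OrderSensitivity`, `ForceOrder`, `SameSet`, `OrderSensitive`, `roundRobin`, `sorted`, `plusOne`) is hypothetical and only illustrates the idea; it is not an existing or proposed Spark API. Round robin is modeled as tagging each value with a partition id derived from its position, which is an assumption about what "output data set" means for that case.

```scala
// Hypothetical illustration only; none of these names exist in Spark.
object OrderSensitivityDemo {
  sealed trait OrderSensitivity
  // Output set and output order do not depend on input order (e.g. sort).
  case object ForceOrder extends OrderSensitivity
  // Output set does not depend on input order, but output order may (e.g. i => i + 1).
  case object SameSet extends OrderSensitivity
  // Output set itself depends on input order (e.g. round robin).
  case object OrderSensitive extends OrderSensitivity

  // FORCE_ORDER example: sorting normalizes any input order.
  def sorted(input: Seq[Int]): Seq[Int] = input.sorted

  // SAME_SET example: an element-wise map keeps the same multiset of results.
  def plusOne(input: Seq[Int]): Seq[Int] = input.map(_ + 1)

  // ORDER_SENSITIVE example: the partition id is derived from the element's
  // position, so reordering the input changes which partition a value lands in.
  def roundRobin(input: Seq[Int], numPartitions: Int): Seq[(Int, Int)] =
    input.zipWithIndex.map { case (value, pos) => (pos % numPartitions, value) }
}
```

Feeding the same values in two different orders, say `Seq(1, 2, 3, 4)` and `Seq(4, 3, 2, 1)`, shows the difference: `sorted` returns identical sequences, `plusOne` returns the same elements in a different order, and `roundRobin` assigns values to different partitions.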
---