GitHub user jiangxb1987 opened a pull request:
https://github.com/apache/spark/pull/20393
[SPARK-23207][SQL] Shuffle+Repartition on an RDD/DataFrame could lead to Data Loss
## What changes were proposed in this pull request?
Currently shuffle repartition uses RoundRobinPartitioning, and the generated
result is nondeterministic because the order of the input rows is not deterministic.
The bug can be triggered when a repartition call follows a shuffle (which leads
to non-deterministic row ordering), as in the pattern below:
upstream stage -> repartition stage -> result stage
(-> indicates a shuffle)
When an executor process goes down, some tasks of the repartition stage are
retried and produce rows in a different order, and the retried tasks of the
result stage then generate different data.
The following code returns 931532, instead of 1000000:
```
import scala.sys.process._
import org.apache.spark.TaskContext
val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
  x
}.repartition(200).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
    // Kill the executor JVM to force the tasks to be retried.
    throw new Exception("pkill -f java".!!)
  }
  x
}
res.distinct().count()
```
In this PR, we propose the most straightforward way to fix this problem:
perform a local sort before partitioning. Once the input row ordering is
deterministic, the function from rows to partitions is fully deterministic too.
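The intuition can be sketched outside of Spark. The helpers below are purely illustrative and are not the actual `ShuffleExchangeExec` code: with plain round-robin, a row's target partition depends on its position in the task's input, so a reordered retry moves rows; sorting locally by a deterministic key first pins each row's position and hence its partition.
```
// Hypothetical sketch: round-robin assigns a row to a partition based on its
// position in the task's input, so a different input order on retry moves rows.
def roundRobin(rows: Seq[String], numPartitions: Int): Map[String, Int] =
  rows.zipWithIndex.map { case (row, i) => row -> (i % numPartitions) }.toMap

// Sorting the rows by a deterministic key first fixes each row's position,
// so its target partition is the same on every task attempt.
def sortedRoundRobin(rows: Seq[String], numPartitions: Int): Map[String, Int] =
  roundRobin(rows.sortBy(_.hashCode), numPartitions)
```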
The downside of this approach is that the extra local sort slows down
repartition(), so we add a new config, `spark.sql.execution.sortBeforeRepartition`,
to control whether this patch is applied. The patch is enabled by default to be
safe-by-default, but users may turn it off manually to avoid the performance
regression.
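For example, a user who hits a noticeable slowdown could switch the behavior off for a session (using the config name introduced by this patch):
```
// Trade determinism-on-retry for the original repartition() performance
// by disabling the extra local sort (enabled by default in this patch).
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "false")
```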
This patch also changes the output row ordering of repartition(), which causes
a number of test cases to fail because they compare the results directly.
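As a rough illustration (not a quote from the actual test updates), such a test can be made order-insensitive by sorting the collected values before comparing:
```
// Compare values rather than row order, since repartition() no longer
// produces the same output ordering as before this patch.
val actual = spark.range(0, 100).repartition(10).collect().map(_.longValue).sorted
assert(actual.sameElements((0L until 100L).toArray))
```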
## How was this patch tested?
Add a unit test in ExchangeSuite.
With this patch (and `spark.sql.execution.sortBeforeRepartition` set to
`true`), the following query returns 1000000:
```
import scala.sys.process._
import org.apache.spark.TaskContext
spark.conf.set("spark.sql.execution.sortBeforeRepartition", "true")
val res = spark.range(0, 1000 * 1000, 1).repartition(200).map { x =>
  x
}.repartition(200).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 2) {
    throw new Exception("pkill -f java".!!)
  }
  x
}
res.distinct().count()
res7: Long = 1000000
```
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/jiangxb1987/spark shuffle-repartition
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/20393.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #20393
----
commit 7fd964e964d6385f263b45fc264871d16163772d
Author: Xingbo Jiang <xingbo.jiang@...>
Date: 2018-01-24T22:38:35Z
make RoundRobinPartitioning output deterministic.
----