This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new e6bebb66651 [SPARK-40660][CORE][SQL] Switch to XORShiftRandom to distribute elements e6bebb66651 is described below commit e6bebb66651a1ff06f821bd4ee2b7b52bd532c01 Author: Yuming Wang <yumw...@ebay.com> AuthorDate: Wed Oct 5 14:00:55 2022 +0800 [SPARK-40660][CORE][SQL] Switch to XORShiftRandom to distribute elements ### What changes were proposed in this pull request? This PR replaces `Random(hashing.byteswap32(index))` with `XORShiftRandom(index)` to distribute elements evenly across output partitions. ### Why are the changes needed? It seems that the distribution using `XORShiftRandom` is better. For example: 1. The number of output files has changed since SPARK-40407. [Some downstream projects](https://github.com/apache/iceberg/blob/c07f2aabc0a1d02f068ecf1514d2479c0fbdd3b0/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java#L578-L579) use repartition to determine the number of output files in the test. ``` bin/spark-shell --master "local[2]" spark.range(10).repartition(10).write.mode("overwrite").parquet("/tmp/spark/repartition") ``` Before this PR and after SPARK-40407, the number of output files is 8. After this PR or before SPARK-40407, the number of output files is 10. 2. The distribution using `XORShiftRandom` seems better. ```scala import java.util.Random import org.apache.spark.util.random.XORShiftRandom import scala.util.hashing def distribution(count: Int, partition: Int) = { println((1 to count).map(partitionId => new Random(partitionId).nextInt(partition)) .groupBy(f => f) .map(_._2.size).mkString(". ")) println((1 to count).map(partitionId => new Random(hashing.byteswap32(partitionId)).nextInt(partition)) .groupBy(f => f) .map(_._2.size).mkString(". ")) println((1 to count).map(partitionId => new XORShiftRandom(partitionId).nextInt(partition)) .groupBy(f => f) .map(_._2.size).mkString(". 
")) } distribution(200, 4) ``` The output: ``` 200 50. 60. 46. 44 55. 48. 43. 54 ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Unit test. Closes #38106 from wangyum/SPARK-40660. Authored-by: Yuming Wang <yumw...@ebay.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 5 ++--- .../spark/sql/execution/exchange/ShuffleExchangeExec.scala | 5 ++--- .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 14 +++++++++++++- .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 4 ++-- 4 files changed, 19 insertions(+), 9 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index d12804fc12b..18f3f87f30f 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -25,7 +25,6 @@ import scala.io.Codec import scala.language.implicitConversions import scala.ref.WeakReference import scala.reflect.{classTag, ClassTag} -import scala.util.hashing import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus import org.apache.hadoop.io.{BytesWritable, NullWritable, Text} @@ -50,7 +49,7 @@ import org.apache.spark.util.Utils import org.apache.spark.util.collection.{ExternalAppendOnlyMap, OpenHashMap, Utils => collectionUtils} import org.apache.spark.util.random.{BernoulliCellSampler, BernoulliSampler, PoissonSampler, - SamplingUtils} + SamplingUtils, XORShiftRandom} /** * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable, @@ -505,7 +504,7 @@ abstract class RDD[T: ClassTag]( if (shuffle) { /** Distributes elements evenly across output partitions, starting from a random partition. 
*/ val distributePartition = (index: Int, items: Iterator[T]) => { - var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions) + var position = new XORShiftRandom(index).nextInt(numPartitions) items.map { t => // Note that the hash code of the key will just be the key itself. The HashPartitioner // will mod it with the number of total partitions. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index 6f287028f74..806a048b244 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -17,11 +17,9 @@ package org.apache.spark.sql.execution.exchange -import java.util.Random import java.util.function.Supplier import scala.concurrent.Future -import scala.util.hashing import org.apache.spark._ import org.apache.spark.internal.config @@ -40,6 +38,7 @@ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType import org.apache.spark.util.MutablePair import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordComparator} +import org.apache.spark.util.random.XORShiftRandom /** * Common trait for all shuffle exchange implementations to facilitate pattern matching. @@ -307,7 +306,7 @@ object ShuffleExchangeExec { // end up being almost the same regardless of the index. substantially scrambling the // seed by hashing will help. Refer to SPARK-21782 for more details. 
val partitionId = TaskContext.get().partitionId() - var position = new Random(hashing.byteswap32(partitionId)).nextInt(numPartitions) + var position = new XORShiftRandom(partitionId).nextInt(numPartitions) (row: InternalRow) => { // The HashPartitioner will handle the `mod` by the number of partitions position += 1 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 7dc44b76e03..48b434fbb3a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql import java.io.{Externalizable, ObjectInput, ObjectOutput} import java.sql.{Date, Timestamp} +import org.apache.hadoop.fs.{Path, PathFilter} import org.scalatest.Assertions._ import org.scalatest.exceptions.TestFailedException import org.scalatest.prop.TableDrivenPropertyChecks._ @@ -2169,7 +2170,18 @@ class DatasetSuite extends QueryTest test("SPARK-40407: repartition should not result in severe data skew") { val df = spark.range(0, 100, 1, 50).repartition(4) val result = df.mapPartitions(iter => Iterator.single(iter.length)).collect() - assert(result.sorted.toSeq === Seq(19, 25, 25, 31)) + assert(result.sorted.toSeq === Seq(23, 25, 25, 27)) + } + + test("SPARK-40660: Switch to XORShiftRandom to distribute elements") { + withTempDir { dir => + spark.range(10).repartition(10).write.mode(SaveMode.Overwrite).parquet(dir.getCanonicalPath) + val fs = new Path(dir.getAbsolutePath).getFileSystem(spark.sessionState.newHadoopConf()) + val parquetFiles = fs.listStatus(new Path(dir.getAbsolutePath), new PathFilter { + override def accept(path: Path): Boolean = path.getName.endsWith("parquet") + }) + assert(parquetFiles.size === 10) + } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index fbaa6fea60b..81bce35a584 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -2127,8 +2127,8 @@ class AdaptiveQueryExecSuite withSQLConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "150") { // partition size [0,258,72,72,72] checkPartitionNumber("SELECT /*+ REBALANCE(c1) */ * FROM v", 2, 4) - // partition size [144,72,144,216,144] - checkPartitionNumber("SELECT /*+ REBALANCE */ * FROM v", 2, 6) + // partition size [144,72,144,72,72,144,72] + checkPartitionNumber("SELECT /*+ REBALANCE */ * FROM v", 6, 7) } // no skewed partition should be optimized --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org