This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 101dd6b [SPARK-37125][SQL] Support AnsiInterval radix sort 101dd6b is described below commit 101dd6bbff2491a608e1ab51541a120a1f08e942 Author: ulysses-you <ulyssesyo...@gmail.com> AuthorDate: Wed Oct 27 15:31:26 2021 +0800 [SPARK-37125][SQL] Support AnsiInterval radix sort ### What changes were proposed in this pull request? - Make `AnsiInterval` data type support radix sort in SQL. - Enhance the `SortSuite` by disabling radix. ### Why are the changes needed? Radix sort is faster than timsort; the benchmark result can be seen in `SortBenchmark`. Since the `AnsiInterval` data type is comparable: - `YearMonthIntervalType` -> int ordering - `DayTimeIntervalType` -> long ordering And we also support radix sort when the ordering column data type is int or long. So `AnsiInterval` radix sort can be supported. ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? - The data correctness should be ensured in `SortSuite` - Add a new benchmark Closes #34398 from ulysses-you/ansi-interval-sort. 
Authored-by: ulysses-you <ulyssesyo...@gmail.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../spark/sql/catalyst/expressions/SortOrder.scala | 6 +- .../AnsiIntervalSortBenchmark-jdk11-results.txt | 28 +++++++++ .../AnsiIntervalSortBenchmark-results.txt | 28 +++++++++ .../spark/sql/execution/SortPrefixUtils.scala | 5 +- .../org/apache/spark/sql/execution/SortSuite.scala | 17 +++-- .../benchmark/AnsiIntervalSortBenchmark.scala | 73 ++++++++++++++++++++++ 6 files changed, 146 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala index 9aef25c..8e6f076 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala @@ -132,7 +132,7 @@ object SortOrder { case class SortPrefix(child: SortOrder) extends UnaryExpression { val nullValue = child.child.dataType match { - case BooleanType | DateType | TimestampType | _: IntegralType => + case BooleanType | DateType | TimestampType | _: IntegralType | _: AnsiIntervalType => if (nullAsSmallest) Long.MinValue else Long.MaxValue case dt: DecimalType if dt.precision - dt.scale <= Decimal.MAX_LONG_DIGITS => if (nullAsSmallest) Long.MinValue else Long.MaxValue @@ -154,7 +154,7 @@ case class SortPrefix(child: SortOrder) extends UnaryExpression { private lazy val calcPrefix: Any => Long = child.child.dataType match { case BooleanType => (raw) => if (raw.asInstanceOf[Boolean]) 1 else 0 - case DateType | TimestampType | _: IntegralType => (raw) => + case DateType | TimestampType | _: IntegralType | _: AnsiIntervalType => (raw) => raw.asInstanceOf[java.lang.Number].longValue() case FloatType | DoubleType => (raw) => { val dVal = raw.asInstanceOf[java.lang.Number].doubleValue() @@ -198,7 +198,7 @@ case class SortPrefix(child: SortOrder) 
extends UnaryExpression { s"$input ? 1L : 0L" case _: IntegralType => s"(long) $input" - case DateType | TimestampType => + case DateType | TimestampType | _: AnsiIntervalType => s"(long) $input" case FloatType | DoubleType => s"$DoublePrefixCmp.computePrefix((double)$input)" diff --git a/sql/core/benchmarks/AnsiIntervalSortBenchmark-jdk11-results.txt b/sql/core/benchmarks/AnsiIntervalSortBenchmark-jdk11-results.txt new file mode 100644 index 0000000..004d9d8 --- /dev/null +++ b/sql/core/benchmarks/AnsiIntervalSortBenchmark-jdk11-results.txt @@ -0,0 +1,28 @@ +OpenJDK 64-Bit Server VM 11.0.13+8-LTS on Linux 5.8.0-1042-azure +Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz +year month interval one column: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +---------------------------------------------------------------------------------------------------------------------------- +year month interval one column enable radix 40092 40744 668 2.5 400.9 1.0X +year month interval one column disable radix 55178 55871 609 1.8 551.8 0.7X + +OpenJDK 64-Bit Server VM 11.0.13+8-LTS on Linux 5.8.0-1042-azure +Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz +year month interval two columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +----------------------------------------------------------------------------------------------------------------------------- +year month interval two columns enable radix 56855 57911 1497 1.8 568.5 1.0X +year month interval two columns disable radix 58694 59525 774 1.7 586.9 1.0X + +OpenJDK 64-Bit Server VM 11.0.13+8-LTS on Linux 5.8.0-1042-azure +Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz +day time interval one columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +--------------------------------------------------------------------------------------------------------------------------- +day time interval one columns enable radix 52460 52564 115 1.9 524.6 1.0X +day time interval one 
columns disable radix 56617 56967 505 1.8 566.2 0.9X + +OpenJDK 64-Bit Server VM 11.0.13+8-LTS on Linux 5.8.0-1042-azure +Intel(R) Xeon(R) Platinum 8171M CPU @ 2.60GHz +day time interval two columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +--------------------------------------------------------------------------------------------------------------------------- +day time interval two columns enable radix 57437 60961 770 1.7 574.4 1.0X +day time interval two columns disable radix 59075 60393 1153 1.7 590.7 1.0X + diff --git a/sql/core/benchmarks/AnsiIntervalSortBenchmark-results.txt b/sql/core/benchmarks/AnsiIntervalSortBenchmark-results.txt new file mode 100644 index 0000000..f2d8714 --- /dev/null +++ b/sql/core/benchmarks/AnsiIntervalSortBenchmark-results.txt @@ -0,0 +1,28 @@ +OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.8.0-1042-azure +Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz +year month interval one column: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +---------------------------------------------------------------------------------------------------------------------------- +year month interval one column enable radix 32543 33408 895 3.1 325.4 1.0X +year month interval one column disable radix 43452 44715 1124 2.3 434.5 0.7X + +OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.8.0-1042-azure +Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz +year month interval two columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +----------------------------------------------------------------------------------------------------------------------------- +year month interval two columns enable radix 47072 47193 168 2.1 470.7 1.0X +year month interval two columns disable radix 47212 47230 21 2.1 472.1 1.0X + +OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.8.0-1042-azure +Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz +day time interval one columns: Best Time(ms) Avg Time(ms) Stdev(ms) 
Rate(M/s) Per Row(ns) Relative +--------------------------------------------------------------------------------------------------------------------------- +day time interval one columns enable radix 34811 36177 1207 2.9 348.1 1.0X +day time interval one columns disable radix 47870 50210 NaN 2.1 478.7 0.7X + +OpenJDK 64-Bit Server VM 1.8.0_312-b07 on Linux 5.8.0-1042-azure +Intel(R) Xeon(R) Platinum 8272CL CPU @ 2.60GHz +day time interval two columns: Best Time(ms) Avg Time(ms) Stdev(ms) Rate(M/s) Per Row(ns) Relative +--------------------------------------------------------------------------------------------------------------------------- +day time interval two columns enable radix 50871 52438 NaN 2.0 508.7 1.0X +day time interval two columns disable radix 50320 51250 897 2.0 503.2 1.0X + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala index 2bd5cad..a1b093f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SortPrefixUtils.scala @@ -42,7 +42,8 @@ object SortPrefixUtils { sortOrder.dataType match { case StringType => stringPrefixComparator(sortOrder) case BinaryType => binaryPrefixComparator(sortOrder) - case BooleanType | ByteType | ShortType | IntegerType | LongType | DateType | TimestampType => + case BooleanType | ByteType | ShortType | IntegerType | LongType | DateType | TimestampType | + _: AnsiIntervalType => longPrefixComparator(sortOrder) case dt: DecimalType if dt.precision - dt.scale <= Decimal.MAX_LONG_DIGITS => longPrefixComparator(sortOrder) @@ -122,7 +123,7 @@ object SortPrefixUtils { def canSortFullyWithPrefix(sortOrder: SortOrder): Boolean = { sortOrder.dataType match { case BooleanType | ByteType | ShortType | IntegerType | LongType | DateType | - TimestampType | FloatType | DoubleType => + TimestampType | FloatType | DoubleType 
| _: AnsiIntervalType => true case dt: DecimalType if dt.precision <= Decimal.MAX_LONG_DIGITS => true diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala index 6a4f3f6..812fdba 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala @@ -22,6 +22,7 @@ import scala.util.Random import org.apache.spark.AccumulatorSuite import org.apache.spark.sql.{RandomDataGenerator, Row} import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ @@ -124,12 +125,16 @@ class SortSuite extends SparkPlanTest with SharedSparkSession { sparkContext.parallelize(Random.shuffle(inputData).map(v => Row(v))), StructType(StructField("a", dataType, nullable = true) :: Nil) ) - checkThatPlansAgree( - inputDf, - p => SortExec(sortOrder, global = true, p: SparkPlan, testSpillFrequency = 23), - ReferenceSort(sortOrder, global = true, _: SparkPlan), - sortAnswers = false - ) + Seq(true, false).foreach { enableRadix => + withSQLConf(SQLConf.RADIX_SORT_ENABLED.key -> enableRadix.toString) { + checkThatPlansAgree( + inputDf, + p => SortExec(sortOrder, global = true, p: SparkPlan, testSpillFrequency = 23), + ReferenceSort(sortOrder, global = true, _: SparkPlan), + sortAnswers = false + ) + } + } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AnsiIntervalSortBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AnsiIntervalSortBenchmark.scala new file mode 100644 index 0000000..0537527 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/AnsiIntervalSortBenchmark.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.benchmark + +import org.apache.spark.benchmark.Benchmark +import org.apache.spark.sql.internal.SQLConf + +/** + * Benchmark to measure performance for interval sort. + * To run this benchmark: + * {{{ + * 1. without sbt: + * bin/spark-submit --class <this class> --jars <spark core test jar> <sql core test jar> + * 2. build/sbt "sql/test:runMain <this class>" + * 3. generate result: + * SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "sql/test:runMain <this class>" + * Results will be written to "benchmarks/AnsiIntervalSortBenchmark-results.txt". 
+ * }}} + */ +object AnsiIntervalSortBenchmark extends SqlBasedBenchmark { + private val numRows = 100 * 1000 * 1000 + + private def radixBenchmark(name: String, cardinality: Long)(f: => Unit): Unit = { + val benchmark = new Benchmark(name, cardinality, output = output) + benchmark.addCase(s"$name enable radix", 3) { _ => + withSQLConf(SQLConf.RADIX_SORT_ENABLED.key -> "true") { + f + } + } + + benchmark.addCase(s"$name disable radix", 3) { _ => + withSQLConf(SQLConf.RADIX_SORT_ENABLED.key -> "false") { + f + } + } + benchmark.run() + } + + override def runBenchmarkSuite(mainArgs: Array[String]): Unit = { + val dt = spark.range(numRows).selectExpr("make_dt_interval(id % 24) as c1", "id as c2") + radixBenchmark("year month interval one column", numRows) { + dt.sortWithinPartitions("c1").select("c2").noop() + } + + radixBenchmark("year month interval two columns", numRows) { + dt.sortWithinPartitions("c1", "c2").select("c2").noop() + } + + val ym = spark.range(numRows).selectExpr("make_ym_interval(id % 2000) as c1", "id as c2") + radixBenchmark("day time interval one columns", numRows) { + ym.sortWithinPartitions("c1").select("c2").noop() + } + + radixBenchmark("day time interval two columns", numRows) { + ym.sortWithinPartitions("c1", "c2").select("c2").noop() + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org