asfgit closed pull request #22617: [SPARK-25484][SQL][TEST] Refactor
ExternalAppendOnlyUnsafeRowArrayBenchmark
URL: https://github.com/apache/spark/pull/22617
This is a pull request merged from a forked repository. Because GitHub
hides the original diff of a foreign (forked) pull request once it is
merged, the diff is reproduced below for the sake of provenance:
diff --git
a/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt
b/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt
new file mode 100644
index 0000000000000..02c6b72f32216
--- /dev/null
+++ b/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt
@@ -0,0 +1,45 @@
+================================================================================================
+WITHOUT SPILL
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Array with 100000 rows: Best/Avg Time(ms) Rate(M/s) Per
Row(ns) Relative
+------------------------------------------------------------------------------------------------
+ArrayBuffer 6378 / 6550 16.1
62.3 1.0X
+ExternalAppendOnlyUnsafeRowArray 6196 / 6242 16.5
60.5 1.0X
+
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Array with 1000 rows: Best/Avg Time(ms) Rate(M/s) Per
Row(ns) Relative
+------------------------------------------------------------------------------------------------
+ArrayBuffer 11988 / 12027 21.9
45.7 1.0X
+ExternalAppendOnlyUnsafeRowArray 37480 / 37574 7.0
143.0 0.3X
+
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Array with 30000 rows: Best/Avg Time(ms) Rate(M/s) Per
Row(ns) Relative
+------------------------------------------------------------------------------------------------
+ArrayBuffer 23536 / 23538 20.9
47.9 1.0X
+ExternalAppendOnlyUnsafeRowArray 31275 / 31277 15.7
63.6 0.8X
+
+
+================================================================================================
+WITH SPILL
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Spilling with 1000 rows: Best/Avg Time(ms) Rate(M/s) Per
Row(ns) Relative
+------------------------------------------------------------------------------------------------
+UnsafeExternalSorter 29241 / 29279 9.0
111.5 1.0X
+ExternalAppendOnlyUnsafeRowArray 14309 / 14313 18.3
54.6 2.0X
+
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Spilling with 10000 rows: Best/Avg Time(ms) Rate(M/s) Per
Row(ns) Relative
+------------------------------------------------------------------------------------------------
+UnsafeExternalSorter 11 / 11 14.8
67.4 1.0X
+ExternalAppendOnlyUnsafeRowArray 9 / 9 17.6
56.8 1.2X
+
+
diff --git
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
index 611b2fc037f3d..e174dc6f31a46 100644
---
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
+++
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
@@ -20,24 +20,57 @@ package org.apache.spark.sql.execution
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{SparkConf, SparkContext, SparkEnv, TaskContext}
-import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
import org.apache.spark.internal.config
import org.apache.spark.memory.MemoryTestingUtils
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
-object ExternalAppendOnlyUnsafeRowArrayBenchmark {
+/**
+ * Benchmark ExternalAppendOnlyUnsafeRowArray.
+ * To run this benchmark:
+ * {{{
+ * 1. without sbt:
+ * bin/spark-submit --class <this class> --jars <spark core test jar>
<spark sql test jar>
+ * 2. build/sbt build/sbt ";project sql;set javaOptions
+ * in Test += \"-Dspark.memory.debugFill=false\";test:runMain <this
class>"
+ * 3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt ";project
sql;set javaOptions
+ * in Test += \"-Dspark.memory.debugFill=false\";test:runMain <this
class>"
+ * Results will be written to
+ * "benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt".
+ * }}}
+ */
+object ExternalAppendOnlyUnsafeRowArrayBenchmark extends BenchmarkBase {
- def testAgainstRawArrayBuffer(numSpillThreshold: Int, numRows: Int,
iterations: Int): Unit = {
+ private val conf = new SparkConf(false)
+ // Make the Java serializer write a reset instruction (TC_RESET) after
each object to test
+ // for a bug we had with bytes written past the last object in a batch
(SPARK-2792)
+ .set("spark.serializer.objectStreamReset", "1")
+ .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+
+ private def withFakeTaskContext(f: => Unit): Unit = {
+ val sc = new SparkContext("local", "test", conf)
+ val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
+ TaskContext.setTaskContext(taskContext)
+ f
+ sc.stop()
+ }
+
+ private def testRows(numRows: Int): Seq[UnsafeRow] = {
val random = new java.util.Random()
- val rows = (1 to numRows).map(_ => {
+ (1 to numRows).map(_ => {
val row = new UnsafeRow(1)
row.pointTo(new Array[Byte](64), 16)
row.setLong(0, random.nextLong())
row
})
+ }
- val benchmark = new Benchmark(s"Array with $numRows rows", iterations *
numRows)
+ def testAgainstRawArrayBuffer(numSpillThreshold: Int, numRows: Int,
iterations: Int): Unit = {
+ val rows = testRows(numRows)
+
+ val benchmark = new Benchmark(s"Array with $numRows rows", iterations *
numRows,
+ output = output)
// Internally, `ExternalAppendOnlyUnsafeRowArray` will create an
// in-memory buffer of size `numSpillThreshold`. This will mimic that
@@ -82,33 +115,19 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
}
}
- val conf = new SparkConf(false)
- // Make the Java serializer write a reset instruction (TC_RESET) after
each object to test
- // for a bug we had with bytes written past the last object in a batch
(SPARK-2792)
- conf.set("spark.serializer.objectStreamReset", "1")
- conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
-
- val sc = new SparkContext("local", "test", conf)
- val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
- TaskContext.setTaskContext(taskContext)
- benchmark.run()
- sc.stop()
+ withFakeTaskContext {
+ benchmark.run()
+ }
}
def testAgainstRawUnsafeExternalSorter(
numSpillThreshold: Int,
numRows: Int,
iterations: Int): Unit = {
+ val rows = testRows(numRows)
- val random = new java.util.Random()
- val rows = (1 to numRows).map(_ => {
- val row = new UnsafeRow(1)
- row.pointTo(new Array[Byte](64), 16)
- row.setLong(0, random.nextLong())
- row
- })
-
- val benchmark = new Benchmark(s"Spilling with $numRows rows", iterations *
numRows)
+ val benchmark = new Benchmark(s"Spilling with $numRows rows", iterations *
numRows,
+ output = output)
benchmark.addCase("UnsafeExternalSorter") { _: Int =>
var sum = 0L
@@ -158,80 +177,23 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
}
}
- val conf = new SparkConf(false)
- // Make the Java serializer write a reset instruction (TC_RESET) after
each object to test
- // for a bug we had with bytes written past the last object in a batch
(SPARK-2792)
- conf.set("spark.serializer.objectStreamReset", "1")
- conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
-
- val sc = new SparkContext("local", "test", conf)
- val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
- TaskContext.setTaskContext(taskContext)
- benchmark.run()
- sc.stop()
+ withFakeTaskContext {
+ benchmark.run()
+ }
}
- def main(args: Array[String]): Unit = {
-
- //
=========================================================================================
//
- // WITHOUT SPILL
- //
=========================================================================================
//
-
- val spillThreshold = 100 * 1000
-
- /*
- Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
-
- Array with 1000 rows: Best/Avg Time(ms) Rate(M/s)
Per Row(ns) Relative
-
------------------------------------------------------------------------------------------------
- ArrayBuffer 7821 / 7941 33.5
29.8 1.0X
- ExternalAppendOnlyUnsafeRowArray 8798 / 8819 29.8
33.6 0.9X
- */
- testAgainstRawArrayBuffer(spillThreshold, 1000, 1 << 18)
-
- /*
- Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
-
- Array with 30000 rows: Best/Avg Time(ms) Rate(M/s)
Per Row(ns) Relative
-
------------------------------------------------------------------------------------------------
- ArrayBuffer 19200 / 19206 25.6
39.1 1.0X
- ExternalAppendOnlyUnsafeRowArray 19558 / 19562 25.1
39.8 1.0X
- */
- testAgainstRawArrayBuffer(spillThreshold, 30 * 1000, 1 << 14)
-
- /*
- Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
-
- Array with 100000 rows: Best/Avg Time(ms) Rate(M/s)
Per Row(ns) Relative
-
------------------------------------------------------------------------------------------------
- ArrayBuffer 5949 / 6028 17.2
58.1 1.0X
- ExternalAppendOnlyUnsafeRowArray 6078 / 6138 16.8
59.4 1.0X
- */
- testAgainstRawArrayBuffer(spillThreshold, 100 * 1000, 1 << 10)
-
- //
=========================================================================================
//
- // WITH SPILL
- //
=========================================================================================
//
-
- /*
- Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
-
- Spilling with 1000 rows: Best/Avg Time(ms) Rate(M/s)
Per Row(ns) Relative
-
------------------------------------------------------------------------------------------------
- UnsafeExternalSorter 9239 / 9470 28.4
35.2 1.0X
- ExternalAppendOnlyUnsafeRowArray 8857 / 8909 29.6
33.8 1.0X
- */
- testAgainstRawUnsafeExternalSorter(100 * 1000, 1000, 1 << 18)
-
- /*
- Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
-
- Spilling with 10000 rows: Best/Avg Time(ms) Rate(M/s)
Per Row(ns) Relative
-
------------------------------------------------------------------------------------------------
- UnsafeExternalSorter 4 / 5 39.3
25.5 1.0X
- ExternalAppendOnlyUnsafeRowArray 5 / 6 29.8
33.5 0.8X
- */
- testAgainstRawUnsafeExternalSorter(
-
config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get, 10 *
1000, 1 << 4)
+ override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+ runBenchmark("WITHOUT SPILL") {
+ val spillThreshold = 100 * 1000
+ testAgainstRawArrayBuffer(spillThreshold, 100 * 1000, 1 << 10)
+ testAgainstRawArrayBuffer(spillThreshold, 1000, 1 << 18)
+ testAgainstRawArrayBuffer(spillThreshold, 30 * 1000, 1 << 14)
+ }
+
+ runBenchmark("WITH SPILL") {
+ testAgainstRawUnsafeExternalSorter(100 * 1000, 1000, 1 << 18)
+ testAgainstRawUnsafeExternalSorter(
+
config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get, 10 *
1000, 1 << 4)
+ }
}
}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]