asfgit closed pull request #22617: [SPARK-25484][SQL][TEST] Refactor 
ExternalAppendOnlyUnsafeRowArrayBenchmark
URL: https://github.com/apache/spark/pull/22617
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt 
b/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt
new file mode 100644
index 0000000000000..02c6b72f32216
--- /dev/null
+++ b/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt
@@ -0,0 +1,45 @@
+================================================================================================
+WITHOUT SPILL
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Array with 100000 rows:                  Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+ArrayBuffer                                   6378 / 6550         16.1         
 62.3       1.0X
+ExternalAppendOnlyUnsafeRowArray              6196 / 6242         16.5         
 60.5       1.0X
+
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Array with 1000 rows:                    Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+ArrayBuffer                                 11988 / 12027         21.9         
 45.7       1.0X
+ExternalAppendOnlyUnsafeRowArray            37480 / 37574          7.0         
143.0       0.3X
+
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Array with 30000 rows:                   Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+ArrayBuffer                                 23536 / 23538         20.9         
 47.9       1.0X
+ExternalAppendOnlyUnsafeRowArray            31275 / 31277         15.7         
 63.6       0.8X
+
+
+================================================================================================
+WITH SPILL
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Spilling with 1000 rows:                 Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+UnsafeExternalSorter                        29241 / 29279          9.0         
111.5       1.0X
+ExternalAppendOnlyUnsafeRowArray            14309 / 14313         18.3         
 54.6       2.0X
+
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Spilling with 10000 rows:                Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+UnsafeExternalSorter                            11 /   11         14.8         
 67.4       1.0X
+ExternalAppendOnlyUnsafeRowArray                 9 /    9         17.6         
 56.8       1.2X
+
+
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
index 611b2fc037f3d..e174dc6f31a46 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
@@ -20,24 +20,57 @@ package org.apache.spark.sql.execution
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{SparkConf, SparkContext, SparkEnv, TaskContext}
-import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
 import org.apache.spark.internal.config
 import org.apache.spark.memory.MemoryTestingUtils
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
 
-object ExternalAppendOnlyUnsafeRowArrayBenchmark {
+/**
+ * Benchmark ExternalAppendOnlyUnsafeRowArray.
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt:
+ *      bin/spark-submit --class <this class> --jars <spark core test jar> 
<spark sql test jar>
+ *   2. build/sbt build/sbt ";project sql;set javaOptions
+ *        in Test += \"-Dspark.memory.debugFill=false\";test:runMain <this 
class>"
+ *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt ";project 
sql;set javaOptions
+ *        in Test += \"-Dspark.memory.debugFill=false\";test:runMain <this 
class>"
+ *      Results will be written to
+ *      "benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt".
+ * }}}
+ */
+object ExternalAppendOnlyUnsafeRowArrayBenchmark extends BenchmarkBase {
 
-  def testAgainstRawArrayBuffer(numSpillThreshold: Int, numRows: Int, 
iterations: Int): Unit = {
+  private val conf = new SparkConf(false)
+    // Make the Java serializer write a reset instruction (TC_RESET) after 
each object to test
+    // for a bug we had with bytes written past the last object in a batch 
(SPARK-2792)
+    .set("spark.serializer.objectStreamReset", "1")
+    .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+
+  private def withFakeTaskContext(f: => Unit): Unit = {
+    val sc = new SparkContext("local", "test", conf)
+    val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
+    TaskContext.setTaskContext(taskContext)
+    f
+    sc.stop()
+  }
+
+  private def testRows(numRows: Int): Seq[UnsafeRow] = {
     val random = new java.util.Random()
-    val rows = (1 to numRows).map(_ => {
+    (1 to numRows).map(_ => {
       val row = new UnsafeRow(1)
       row.pointTo(new Array[Byte](64), 16)
       row.setLong(0, random.nextLong())
       row
     })
+  }
 
-    val benchmark = new Benchmark(s"Array with $numRows rows", iterations * 
numRows)
+  def testAgainstRawArrayBuffer(numSpillThreshold: Int, numRows: Int, 
iterations: Int): Unit = {
+    val rows = testRows(numRows)
+
+    val benchmark = new Benchmark(s"Array with $numRows rows", iterations * 
numRows,
+      output = output)
 
     // Internally, `ExternalAppendOnlyUnsafeRowArray` will create an
     // in-memory buffer of size `numSpillThreshold`. This will mimic that
@@ -82,33 +115,19 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
       }
     }
 
-    val conf = new SparkConf(false)
-    // Make the Java serializer write a reset instruction (TC_RESET) after 
each object to test
-    // for a bug we had with bytes written past the last object in a batch 
(SPARK-2792)
-    conf.set("spark.serializer.objectStreamReset", "1")
-    conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
-
-    val sc = new SparkContext("local", "test", conf)
-    val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
-    TaskContext.setTaskContext(taskContext)
-    benchmark.run()
-    sc.stop()
+    withFakeTaskContext {
+      benchmark.run()
+    }
   }
 
   def testAgainstRawUnsafeExternalSorter(
       numSpillThreshold: Int,
       numRows: Int,
       iterations: Int): Unit = {
+    val rows = testRows(numRows)
 
-    val random = new java.util.Random()
-    val rows = (1 to numRows).map(_ => {
-      val row = new UnsafeRow(1)
-      row.pointTo(new Array[Byte](64), 16)
-      row.setLong(0, random.nextLong())
-      row
-    })
-
-    val benchmark = new Benchmark(s"Spilling with $numRows rows", iterations * 
numRows)
+    val benchmark = new Benchmark(s"Spilling with $numRows rows", iterations * 
numRows,
+      output = output)
 
     benchmark.addCase("UnsafeExternalSorter") { _: Int =>
       var sum = 0L
@@ -158,80 +177,23 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
       }
     }
 
-    val conf = new SparkConf(false)
-    // Make the Java serializer write a reset instruction (TC_RESET) after 
each object to test
-    // for a bug we had with bytes written past the last object in a batch 
(SPARK-2792)
-    conf.set("spark.serializer.objectStreamReset", "1")
-    conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
-
-    val sc = new SparkContext("local", "test", conf)
-    val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
-    TaskContext.setTaskContext(taskContext)
-    benchmark.run()
-    sc.stop()
+    withFakeTaskContext {
+      benchmark.run()
+    }
   }
 
-  def main(args: Array[String]): Unit = {
-
-    // 
=========================================================================================
 //
-    // WITHOUT SPILL
-    // 
=========================================================================================
 //
-
-    val spillThreshold = 100 * 1000
-
-    /*
-    Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
-
-    Array with 1000 rows:                    Best/Avg Time(ms)    Rate(M/s)   
Per Row(ns)   Relative
-    
------------------------------------------------------------------------------------------------
-    ArrayBuffer                                   7821 / 7941         33.5     
     29.8       1.0X
-    ExternalAppendOnlyUnsafeRowArray              8798 / 8819         29.8     
     33.6       0.9X
-    */
-    testAgainstRawArrayBuffer(spillThreshold, 1000, 1 << 18)
-
-    /*
-    Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
-
-    Array with 30000 rows:                   Best/Avg Time(ms)    Rate(M/s)   
Per Row(ns)   Relative
-    
------------------------------------------------------------------------------------------------
-    ArrayBuffer                                 19200 / 19206         25.6     
     39.1       1.0X
-    ExternalAppendOnlyUnsafeRowArray            19558 / 19562         25.1     
     39.8       1.0X
-    */
-    testAgainstRawArrayBuffer(spillThreshold, 30 * 1000, 1 << 14)
-
-    /*
-    Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
-
-    Array with 100000 rows:                  Best/Avg Time(ms)    Rate(M/s)   
Per Row(ns)   Relative
-    
------------------------------------------------------------------------------------------------
-    ArrayBuffer                                   5949 / 6028         17.2     
     58.1       1.0X
-    ExternalAppendOnlyUnsafeRowArray              6078 / 6138         16.8     
     59.4       1.0X
-    */
-    testAgainstRawArrayBuffer(spillThreshold, 100 * 1000, 1 << 10)
-
-    // 
=========================================================================================
 //
-    // WITH SPILL
-    // 
=========================================================================================
 //
-
-    /*
-    Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
-
-    Spilling with 1000 rows:                 Best/Avg Time(ms)    Rate(M/s)   
Per Row(ns)   Relative
-    
------------------------------------------------------------------------------------------------
-    UnsafeExternalSorter                          9239 / 9470         28.4     
     35.2       1.0X
-    ExternalAppendOnlyUnsafeRowArray              8857 / 8909         29.6     
     33.8       1.0X
-    */
-    testAgainstRawUnsafeExternalSorter(100 * 1000, 1000, 1 << 18)
-
-    /*
-    Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
-
-    Spilling with 10000 rows:                Best/Avg Time(ms)    Rate(M/s)   
Per Row(ns)   Relative
-    
------------------------------------------------------------------------------------------------
-    UnsafeExternalSorter                             4 /    5         39.3     
     25.5       1.0X
-    ExternalAppendOnlyUnsafeRowArray                 5 /    6         29.8     
     33.5       0.8X
-    */
-    testAgainstRawUnsafeExternalSorter(
-      
config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get, 10 * 
1000, 1 << 4)
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    runBenchmark("WITHOUT SPILL") {
+      val spillThreshold = 100 * 1000
+      testAgainstRawArrayBuffer(spillThreshold, 100 * 1000, 1 << 10)
+      testAgainstRawArrayBuffer(spillThreshold, 1000, 1 << 18)
+      testAgainstRawArrayBuffer(spillThreshold, 30 * 1000, 1 << 14)
+    }
+
+    runBenchmark("WITH SPILL") {
+      testAgainstRawUnsafeExternalSorter(100 * 1000, 1000, 1 << 18)
+      testAgainstRawUnsafeExternalSorter(
+        
config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get, 10 * 
1000, 1 << 4)
+    }
   }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to