This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 49c062b  [SPARK-25484][SQL][TEST] Refactor 
ExternalAppendOnlyUnsafeRowArrayBenchmark
49c062b is described below

commit 49c062b2e0487b13b732b18edde105e1f000c20d
Author: Peter Toth <peter.t...@gmail.com>
AuthorDate: Wed Jan 9 09:54:21 2019 -0800

    [SPARK-25484][SQL][TEST] Refactor ExternalAppendOnlyUnsafeRowArrayBenchmark
    
    ## What changes were proposed in this pull request?
    
    Refactor ExternalAppendOnlyUnsafeRowArrayBenchmark to use main method.
    
    ## How was this patch tested?
    
    Manually tested and regenerated results.
    Please note that `spark.memory.debugFill` setting has a huge impact on this 
benchmark. Since it is set to true by default when running the benchmark from 
SBT, we need to disable it:
    ```
    SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt ";project sql;set javaOptions in 
Test += \"-Dspark.memory.debugFill=false\";test:runMain 
org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArrayBenchmark"
    ```
    
    Closes #22617 from peter-toth/SPARK-25484.
    
    Lead-authored-by: Peter Toth <peter.t...@gmail.com>
    Co-authored-by: Peter Toth <pt...@hortonworks.com>
    Co-authored-by: Dongjoon Hyun <dongj...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 ...alAppendOnlyUnsafeRowArrayBenchmark-results.txt |  45 ++++++
 ...ExternalAppendOnlyUnsafeRowArrayBenchmark.scala | 158 ++++++++-------------
 2 files changed, 105 insertions(+), 98 deletions(-)

diff --git 
a/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt 
b/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt
new file mode 100644
index 0000000..02c6b72
--- /dev/null
+++ b/sql/core/benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt
@@ -0,0 +1,45 @@
+================================================================================================
+WITHOUT SPILL
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Array with 100000 rows:                  Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+ArrayBuffer                                   6378 / 6550         16.1         
 62.3       1.0X
+ExternalAppendOnlyUnsafeRowArray              6196 / 6242         16.5         
 60.5       1.0X
+
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Array with 1000 rows:                    Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+ArrayBuffer                                 11988 / 12027         21.9         
 45.7       1.0X
+ExternalAppendOnlyUnsafeRowArray            37480 / 37574          7.0         
143.0       0.3X
+
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Array with 30000 rows:                   Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+ArrayBuffer                                 23536 / 23538         20.9         
 47.9       1.0X
+ExternalAppendOnlyUnsafeRowArray            31275 / 31277         15.7         
 63.6       0.8X
+
+
+================================================================================================
+WITH SPILL
+================================================================================================
+
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Spilling with 1000 rows:                 Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+UnsafeExternalSorter                        29241 / 29279          9.0         
111.5       1.0X
+ExternalAppendOnlyUnsafeRowArray            14309 / 14313         18.3         
 54.6       2.0X
+
+OpenJDK 64-Bit Server VM 1.8.0_191-b12 on Linux 3.10.0-862.3.2.el7.x86_64
+Intel(R) Xeon(R) CPU E5-2670 v2 @ 2.50GHz
+Spilling with 10000 rows:                Best/Avg Time(ms)    Rate(M/s)   Per 
Row(ns)   Relative
+------------------------------------------------------------------------------------------------
+UnsafeExternalSorter                            11 /   11         14.8         
 67.4       1.0X
+ExternalAppendOnlyUnsafeRowArray                 9 /    9         17.6         
 56.8       1.2X
+
+
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
index 611b2fc..e174dc6 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ExternalAppendOnlyUnsafeRowArrayBenchmark.scala
@@ -20,24 +20,57 @@ package org.apache.spark.sql.execution
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.{SparkConf, SparkContext, SparkEnv, TaskContext}
-import org.apache.spark.benchmark.Benchmark
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
 import org.apache.spark.internal.config
 import org.apache.spark.memory.MemoryTestingUtils
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
 
-object ExternalAppendOnlyUnsafeRowArrayBenchmark {
+/**
+ * Benchmark ExternalAppendOnlyUnsafeRowArray.
+ * To run this benchmark:
+ * {{{
+ *   1. without sbt:
+ *      bin/spark-submit --class <this class> --jars <spark core test jar> 
<spark sql test jar>
+ *   2. build/sbt ";project sql;set javaOptions
+ *        in Test += \"-Dspark.memory.debugFill=false\";test:runMain <this 
class>"
+ *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt ";project 
sql;set javaOptions
+ *        in Test += \"-Dspark.memory.debugFill=false\";test:runMain <this 
class>"
+ *      Results will be written to
+ *      "benchmarks/ExternalAppendOnlyUnsafeRowArrayBenchmark-results.txt".
+ * }}}
+ */
+object ExternalAppendOnlyUnsafeRowArrayBenchmark extends BenchmarkBase {
 
-  def testAgainstRawArrayBuffer(numSpillThreshold: Int, numRows: Int, 
iterations: Int): Unit = {
+  private val conf = new SparkConf(false)
+    // Make the Java serializer write a reset instruction (TC_RESET) after 
each object to test
+    // for a bug we had with bytes written past the last object in a batch 
(SPARK-2792)
+    .set("spark.serializer.objectStreamReset", "1")
+    .set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
+
+  private def withFakeTaskContext(f: => Unit): Unit = {
+    val sc = new SparkContext("local", "test", conf)
+    val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
+    TaskContext.setTaskContext(taskContext)
+    f
+    sc.stop()
+  }
+
+  private def testRows(numRows: Int): Seq[UnsafeRow] = {
     val random = new java.util.Random()
-    val rows = (1 to numRows).map(_ => {
+    (1 to numRows).map(_ => {
       val row = new UnsafeRow(1)
       row.pointTo(new Array[Byte](64), 16)
       row.setLong(0, random.nextLong())
       row
     })
+  }
 
-    val benchmark = new Benchmark(s"Array with $numRows rows", iterations * 
numRows)
+  def testAgainstRawArrayBuffer(numSpillThreshold: Int, numRows: Int, 
iterations: Int): Unit = {
+    val rows = testRows(numRows)
+
+    val benchmark = new Benchmark(s"Array with $numRows rows", iterations * 
numRows,
+      output = output)
 
     // Internally, `ExternalAppendOnlyUnsafeRowArray` will create an
     // in-memory buffer of size `numSpillThreshold`. This will mimic that
@@ -82,33 +115,19 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
       }
     }
 
-    val conf = new SparkConf(false)
-    // Make the Java serializer write a reset instruction (TC_RESET) after 
each object to test
-    // for a bug we had with bytes written past the last object in a batch 
(SPARK-2792)
-    conf.set("spark.serializer.objectStreamReset", "1")
-    conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
-
-    val sc = new SparkContext("local", "test", conf)
-    val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
-    TaskContext.setTaskContext(taskContext)
-    benchmark.run()
-    sc.stop()
+    withFakeTaskContext {
+      benchmark.run()
+    }
   }
 
   def testAgainstRawUnsafeExternalSorter(
       numSpillThreshold: Int,
       numRows: Int,
       iterations: Int): Unit = {
+    val rows = testRows(numRows)
 
-    val random = new java.util.Random()
-    val rows = (1 to numRows).map(_ => {
-      val row = new UnsafeRow(1)
-      row.pointTo(new Array[Byte](64), 16)
-      row.setLong(0, random.nextLong())
-      row
-    })
-
-    val benchmark = new Benchmark(s"Spilling with $numRows rows", iterations * 
numRows)
+    val benchmark = new Benchmark(s"Spilling with $numRows rows", iterations * 
numRows,
+      output = output)
 
     benchmark.addCase("UnsafeExternalSorter") { _: Int =>
       var sum = 0L
@@ -158,80 +177,23 @@ object ExternalAppendOnlyUnsafeRowArrayBenchmark {
       }
     }
 
-    val conf = new SparkConf(false)
-    // Make the Java serializer write a reset instruction (TC_RESET) after 
each object to test
-    // for a bug we had with bytes written past the last object in a batch 
(SPARK-2792)
-    conf.set("spark.serializer.objectStreamReset", "1")
-    conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
-
-    val sc = new SparkContext("local", "test", conf)
-    val taskContext = MemoryTestingUtils.fakeTaskContext(SparkEnv.get)
-    TaskContext.setTaskContext(taskContext)
-    benchmark.run()
-    sc.stop()
+    withFakeTaskContext {
+      benchmark.run()
+    }
   }
 
-  def main(args: Array[String]): Unit = {
-
-    // 
=========================================================================================
 //
-    // WITHOUT SPILL
-    // 
=========================================================================================
 //
-
-    val spillThreshold = 100 * 1000
-
-    /*
-    Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
-
-    Array with 1000 rows:                    Best/Avg Time(ms)    Rate(M/s)   
Per Row(ns)   Relative
-    
------------------------------------------------------------------------------------------------
-    ArrayBuffer                                   7821 / 7941         33.5     
     29.8       1.0X
-    ExternalAppendOnlyUnsafeRowArray              8798 / 8819         29.8     
     33.6       0.9X
-    */
-    testAgainstRawArrayBuffer(spillThreshold, 1000, 1 << 18)
-
-    /*
-    Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
-
-    Array with 30000 rows:                   Best/Avg Time(ms)    Rate(M/s)   
Per Row(ns)   Relative
-    
------------------------------------------------------------------------------------------------
-    ArrayBuffer                                 19200 / 19206         25.6     
     39.1       1.0X
-    ExternalAppendOnlyUnsafeRowArray            19558 / 19562         25.1     
     39.8       1.0X
-    */
-    testAgainstRawArrayBuffer(spillThreshold, 30 * 1000, 1 << 14)
-
-    /*
-    Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
-
-    Array with 100000 rows:                  Best/Avg Time(ms)    Rate(M/s)   
Per Row(ns)   Relative
-    
------------------------------------------------------------------------------------------------
-    ArrayBuffer                                   5949 / 6028         17.2     
     58.1       1.0X
-    ExternalAppendOnlyUnsafeRowArray              6078 / 6138         16.8     
     59.4       1.0X
-    */
-    testAgainstRawArrayBuffer(spillThreshold, 100 * 1000, 1 << 10)
-
-    // 
=========================================================================================
 //
-    // WITH SPILL
-    // 
=========================================================================================
 //
-
-    /*
-    Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
-
-    Spilling with 1000 rows:                 Best/Avg Time(ms)    Rate(M/s)   
Per Row(ns)   Relative
-    
------------------------------------------------------------------------------------------------
-    UnsafeExternalSorter                          9239 / 9470         28.4     
     35.2       1.0X
-    ExternalAppendOnlyUnsafeRowArray              8857 / 8909         29.6     
     33.8       1.0X
-    */
-    testAgainstRawUnsafeExternalSorter(100 * 1000, 1000, 1 << 18)
-
-    /*
-    Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz
-
-    Spilling with 10000 rows:                Best/Avg Time(ms)    Rate(M/s)   
Per Row(ns)   Relative
-    
------------------------------------------------------------------------------------------------
-    UnsafeExternalSorter                             4 /    5         39.3     
     25.5       1.0X
-    ExternalAppendOnlyUnsafeRowArray                 5 /    6         29.8     
     33.5       0.8X
-    */
-    testAgainstRawUnsafeExternalSorter(
-      
config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get, 10 * 
1000, 1 << 4)
+  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+    runBenchmark("WITHOUT SPILL") {
+      val spillThreshold = 100 * 1000
+      testAgainstRawArrayBuffer(spillThreshold, 100 * 1000, 1 << 10)
+      testAgainstRawArrayBuffer(spillThreshold, 1000, 1 << 18)
+      testAgainstRawArrayBuffer(spillThreshold, 30 * 1000, 1 << 14)
+    }
+
+    runBenchmark("WITH SPILL") {
+      testAgainstRawUnsafeExternalSorter(100 * 1000, 1000, 1 << 18)
+      testAgainstRawUnsafeExternalSorter(
+        
config.SHUFFLE_SPILL_NUM_ELEMENTS_FORCE_SPILL_THRESHOLD.defaultValue.get, 10 * 
1000, 1 << 4)
+    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to