Repository: spark
Updated Branches:
  refs/heads/master 4b55482ab -> 84e5da87e


http://git-wip-us.apache.org/repos/asf/spark/blob/84e5da87/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala
index 6fe1079..066d47c 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.util.collection
 
-import java.lang.{Float => JFloat}
+import java.lang.{Float => JFloat, Integer => JInteger}
 import java.util.{Arrays, Comparator}
 
 import org.scalatest.FunSuite
@@ -30,11 +30,15 @@ class SorterSuite extends FunSuite {
     val rand = new XORShiftRandom(123)
     val data0 = Array.tabulate[Int](10000) { i => rand.nextInt() }
     val data1 = data0.clone()
+    val data2 = data0.clone()
 
     Arrays.sort(data0)
     new Sorter(new IntArraySortDataFormat).sort(data1, 0, data1.length, Ordering.Int)
+    new Sorter(new KeyReuseIntArraySortDataFormat)
+      .sort(data2, 0, data2.length, Ordering[IntWrapper])
 
-    data0.zip(data1).foreach { case (x, y) => assert(x === y) }
+    assert(data0.view === data1.view)
+    assert(data0.view === data2.view)
   }
 
   test("KVArraySorter") {
@@ -61,10 +65,33 @@ class SorterSuite extends FunSuite {
     }
   }
 
+  /** Runs an experiment several times. */
+  def runExperiment(name: String, skip: Boolean = false)(f: => Unit, prepare: () => Unit): Unit = {
+    if (skip) {
+      println(s"Skipped experiment $name.")
+      return
+    }
+
+    val firstTry = org.apache.spark.util.Utils.timeIt(1)(f, Some(prepare))
+    System.gc()
+
+    var i = 0
+    var next10: Long = 0
+    while (i < 10) {
+      val time = org.apache.spark.util.Utils.timeIt(1)(f, Some(prepare))
+      next10 += time
+      println(s"$name: Took $time ms")
+      i += 1
+    }
+
+    println(s"$name: ($firstTry ms first try, ${next10 / 10} ms average)")
+  }
+
   /**
    * This provides a simple benchmark for comparing the Sorter with Java internal sorting.
    * Ideally these would be executed one at a time, each in their own JVM, so their listing
-   * here is mainly to have the code.
+   * here is mainly to have the code. Running multiple tests within the same JVM session would
+   * prevent the JIT from inlining overridden methods and hence hurt performance.
    *
    * The goal of this code is to sort an array of key-value pairs, where the array physically
    * has the keys and values alternating. The basic Java sorts work only on the keys, so the
@@ -72,96 +99,167 @@ class SorterSuite extends FunSuite {
    * those, while the Sorter approach can work directly on the input data format.
    *
    * Note that the Java implementation varies tremendously between Java 6 and Java 7, when
-   * the Java sort changed from merge sort to Timsort.
+   * the Java sort changed from merge sort to TimSort.
    */
-  ignore("Sorter benchmark") {
-
-    /** Runs an experiment several times. */
-    def runExperiment(name: String)(f: => Unit): Unit = {
-      val firstTry = org.apache.spark.util.Utils.timeIt(1)(f)
-      System.gc()
-
-      var i = 0
-      var next10: Long = 0
-      while (i < 10) {
-        val time = org.apache.spark.util.Utils.timeIt(1)(f)
-        next10 += time
-        println(s"$name: Took $time ms")
-        i += 1
-      }
-
-      println(s"$name: ($firstTry ms first try, ${next10 / 10} ms average)")
-    }
-
+  ignore("Sorter benchmark for key-value pairs") {
     val numElements = 25000000 // 25 mil
     val rand = new XORShiftRandom(123)
 
-    val keys = Array.tabulate[JFloat](numElements) { i =>
-      new JFloat(rand.nextFloat())
+    // Test our key-value pairs where each element is a Tuple2[Float, Integer].
+
+    val kvTuples = Array.tabulate(numElements) { i =>
+      (new JFloat(rand.nextFloat()), new JInteger(i))
     }
 
-    // Test our key-value pairs where each element is a Tuple2[Float, Integer)
-    val kvTupleArray = Array.tabulate[AnyRef](numElements) { i =>
-      (keys(i / 2): Float, i / 2: Int)
+    val kvTupleArray = new Array[AnyRef](numElements)
+    val prepareKvTupleArray = () => {
+      System.arraycopy(kvTuples, 0, kvTupleArray, 0, numElements)
     }
-    runExperiment("Tuple-sort using Arrays.sort()") {
+    runExperiment("Tuple-sort using Arrays.sort()")({
       Arrays.sort(kvTupleArray, new Comparator[AnyRef] {
         override def compare(x: AnyRef, y: AnyRef): Int =
-          Ordering.Float.compare(x.asInstanceOf[(Float, _)]._1, y.asInstanceOf[(Float, _)]._1)
+          x.asInstanceOf[(JFloat, _)]._1.compareTo(y.asInstanceOf[(JFloat, _)]._1)
       })
-    }
+    }, prepareKvTupleArray)
 
     // Test our Sorter where each element alternates between Float and Integer, non-primitive
-    val keyValueArray = Array.tabulate[AnyRef](numElements * 2) { i =>
-      if (i % 2 == 0) keys(i / 2) else new Integer(i / 2)
+
+    val keyValues = {
+      val data = new Array[AnyRef](numElements * 2)
+      var i = 0
+      while (i < numElements) {
+        data(2 * i) = kvTuples(i)._1
+        data(2 * i + 1) = kvTuples(i)._2
+        i += 1
+      }
+      data
     }
+
+    val keyValueArray = new Array[AnyRef](numElements * 2)
+    val prepareKeyValueArray = () => {
+      System.arraycopy(keyValues, 0, keyValueArray, 0, numElements * 2)
+    }
+
     val sorter = new Sorter(new KVArraySortDataFormat[JFloat, AnyRef])
-    runExperiment("KV-sort using Sorter") {
-      sorter.sort(keyValueArray, 0, keys.length, new Comparator[JFloat] {
-        override def compare(x: JFloat, y: JFloat): Int = Ordering.Float.compare(x, y)
+    runExperiment("KV-sort using Sorter")({
+      sorter.sort(keyValueArray, 0, numElements, new Comparator[JFloat] {
+        override def compare(x: JFloat, y: JFloat): Int = x.compareTo(y)
       })
+    }, prepareKeyValueArray)
+  }
+
+  /**
+   * Tests for sorting with primitive keys with/without key reuse. Java's Arrays.sort is used as
+   * a reference, which is expected to be faster but can only sort a single array. Sorter can be
+   * used to sort parallel arrays.
+   *
+   * Ideally these would be executed one at a time, each in their own JVM, so their listing
+   * here is mainly to have the code. Running multiple tests within the same JVM session would
+   * prevent the JIT from inlining overridden methods and hence hurt performance.
+   */
+  test("Sorter benchmark for primitive int array") {
+    val numElements = 25000000 // 25 mil
+    val rand = new XORShiftRandom(123)
+
+    val ints = Array.fill(numElements)(rand.nextInt())
+    val intObjects = {
+      val data = new Array[JInteger](numElements)
+      var i = 0
+      while (i < numElements) {
+        data(i) = new JInteger(ints(i))
+        i += 1
+      }
+      data
     }
 
-    // Test non-primitive sort on float array
-    runExperiment("Java Arrays.sort()") {
-      Arrays.sort(keys, new Comparator[JFloat] {
-        override def compare(x: JFloat, y: JFloat): Int = Ordering.Float.compare(x, y)
-      })
+    val intObjectArray = new Array[JInteger](numElements)
+    val prepareIntObjectArray = () => {
+      System.arraycopy(intObjects, 0, intObjectArray, 0, numElements)
     }
 
-    // Test primitive sort on float array
-    val primitiveKeys = Array.tabulate[Float](numElements) { i => rand.nextFloat() }
-    runExperiment("Java Arrays.sort() on primitive keys") {
-      Arrays.sort(primitiveKeys)
+    runExperiment("Java Arrays.sort() on non-primitive int array")({
+      Arrays.sort(intObjectArray, new Comparator[JInteger] {
+        override def compare(x: JInteger, y: JInteger): Int = x.compareTo(y)
+      })
+    }, prepareIntObjectArray)
+
+    val intPrimitiveArray = new Array[Int](numElements)
+    val prepareIntPrimitiveArray = () => {
+      System.arraycopy(ints, 0, intPrimitiveArray, 0, numElements)
     }
-  }
-}
 
+    runExperiment("Java Arrays.sort() on primitive int array")({
+      Arrays.sort(intPrimitiveArray)
+    }, prepareIntPrimitiveArray)
 
-/** Format to sort a simple Array[Int]. Could be easily generified and specialized. */
-class IntArraySortDataFormat extends SortDataFormat[Int, Array[Int]] {
-  override protected def getKey(data: Array[Int], pos: Int): Int = {
-    data(pos)
+    val sorterWithoutKeyReuse = new Sorter(new IntArraySortDataFormat)
+    runExperiment("Sorter without key reuse on primitive int array")({
+      sorterWithoutKeyReuse.sort(intPrimitiveArray, 0, numElements, Ordering[Int])
+    }, prepareIntPrimitiveArray)
+
+    val sorterWithKeyReuse = new Sorter(new KeyReuseIntArraySortDataFormat)
+    runExperiment("Sorter with key reuse on primitive int array")({
+      sorterWithKeyReuse.sort(intPrimitiveArray, 0, numElements, Ordering[IntWrapper])
+    }, prepareIntPrimitiveArray)
   }
+}
 
-  override protected def swap(data: Array[Int], pos0: Int, pos1: Int): Unit = {
+abstract class AbstractIntArraySortDataFormat[K] extends SortDataFormat[K, Array[Int]] {
+
+  override def swap(data: Array[Int], pos0: Int, pos1: Int): Unit = {
     val tmp = data(pos0)
     data(pos0) = data(pos1)
     data(pos1) = tmp
   }
 
-  override protected def copyElement(src: Array[Int], srcPos: Int, dst: Array[Int], dstPos: Int) {
+  override def copyElement(src: Array[Int], srcPos: Int, dst: Array[Int], dstPos: Int) {
     dst(dstPos) = src(srcPos)
   }
 
   /** Copy a range of elements starting at src(srcPos) to dest, starting at destPos. */
-  override protected def copyRange(src: Array[Int], srcPos: Int,
-                                   dst: Array[Int], dstPos: Int, length: Int) {
+  override def copyRange(src: Array[Int], srcPos: Int, dst: Array[Int], dstPos: Int, length: Int) {
     System.arraycopy(src, srcPos, dst, dstPos, length)
   }
 
   /** Allocates a new structure that can hold up to 'length' elements. */
-  override protected def allocate(length: Int): Array[Int] = {
+  override def allocate(length: Int): Array[Int] = {
     new Array[Int](length)
   }
 }
+
+/** Format to sort a simple Array[Int]. Could be easily generified and specialized. */
+class IntArraySortDataFormat extends AbstractIntArraySortDataFormat[Int] {
+
+  override protected def getKey(data: Array[Int], pos: Int): Int = {
+    data(pos)
+  }
+}
+
+/** Wrapper of Int for key reuse. */
+class IntWrapper(var key: Int = 0) extends Ordered[IntWrapper] {
+
+  override def compare(that: IntWrapper): Int = {
+    Ordering.Int.compare(key, that.key)
+  }
+}
+
+/** SortDataFormat for Array[Int] with reused keys. */
+class KeyReuseIntArraySortDataFormat extends AbstractIntArraySortDataFormat[IntWrapper] {
+
+  override def newKey(): IntWrapper = {
+    new IntWrapper()
+  }
+
+  override def getKey(data: Array[Int], pos: Int, reuse: IntWrapper): IntWrapper = {
+    if (reuse == null) {
+      new IntWrapper(data(pos))
+    } else {
+      reuse.key = data(pos)
+      reuse
+    }
+  }
+
+  override protected def getKey(data: Array[Int], pos: Int): IntWrapper = {
+    getKey(data, pos, null)
+  }
+}
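
The heart of the change above is the key-reuse hook: getKey(data, pos, reuse) lets a sort over a
primitive array fill a mutable wrapper such as IntWrapper instead of boxing a fresh key object for
every comparison. Below is a minimal, self-contained sketch of that pattern; the trait, class, and
method names are invented for illustration and are not the patch's API (the real pieces are
SortDataFormat, Sorter, and KeyReuseIntArraySortDataFormat in the diff).

// Illustrative sketch only, not part of this commit.
/** Mutable key wrapper so a sort can reuse one object instead of boxing a key per comparison. */
class MutableIntKey(var value: Int = 0) extends Ordered[MutableIntKey] {
  override def compare(that: MutableIntKey): Int = Integer.compare(value, that.value)
}

/** Minimal stand-in for a SortDataFormat-style interface with key reuse (names are made up). */
trait IntFormatSketch {
  def newKey(): MutableIntKey
  def getKey(data: Array[Int], pos: Int, reuse: MutableIntKey): MutableIntKey
  def swap(data: Array[Int], pos0: Int, pos1: Int): Unit
}

object KeyReuseSketch {
  val format: IntFormatSketch = new IntFormatSketch {
    override def newKey(): MutableIntKey = new MutableIntKey()
    override def getKey(data: Array[Int], pos: Int, reuse: MutableIntKey): MutableIntKey = {
      reuse.value = data(pos) // fill the caller's wrapper; no allocation on the comparison path
      reuse
    }
    override def swap(data: Array[Int], pos0: Int, pos1: Int): Unit = {
      val tmp = data(pos0)
      data(pos0) = data(pos1)
      data(pos1) = tmp
    }
  }

  /** Insertion sort that allocates exactly two key objects for the whole run. */
  def sort(data: Array[Int]): Unit = {
    val keyA = format.newKey()
    val keyB = format.newKey()
    var i = 1
    while (i < data.length) {
      var j = i
      while (j > 0 && format.getKey(data, j - 1, keyA).compare(format.getKey(data, j, keyB)) > 0) {
        format.swap(data, j - 1, j)
        j -= 1
      }
      i += 1
    }
  }

  def main(args: Array[String]): Unit = {
    val data = Array(3, 1, 2, 5, 4)
    sort(data)
    println(data.mkString(", ")) // 1, 2, 3, 4, 5
  }
}

With only two key objects alive for the entire sort, the comparison path allocates nothing, which
is the effect the "Sorter with key reuse on primitive int array" experiment above isolates against
the no-reuse Sorter and the Arrays.sort baselines.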

http://git-wip-us.apache.org/repos/asf/spark/blob/84e5da87/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index c58666a..95152b5 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -53,7 +53,9 @@ object MimaExcludes {
               "org.apache.spark.scheduler.MapStatus"),
             // TaskContext was promoted to Abstract class
             ProblemFilters.exclude[AbstractClassProblem](
-              "org.apache.spark.TaskContext")
+              "org.apache.spark.TaskContext"),
+            ProblemFilters.exclude[IncompatibleTemplateDefProblem](
+              "org.apache.spark.util.collection.SortDataFormat")
           ) ++ Seq(
             // Adding new methods to the JavaRDDLike trait:
             ProblemFilters.exclude[MissingMethodProblem](
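
Back in SorterSuite, the other methodological change threaded through the hunks above is that
runExperiment now takes a separate prepare closure, which re-copies the pristine input into the
working array before every timed run; sorting in place would otherwise leave later iterations
measuring a sort over already-sorted data. The first try is reported separately since it includes
JIT warm-up costs. Below is a rough, self-contained sketch of that harness shape, using plain
System.nanoTime rather than Spark's internal Utils.timeIt; the object and method names are invented
for illustration.

// Illustrative sketch only, not part of this commit.
object SortBenchHarnessSketch {

  /** Runs `prepare` outside the timed region, then times one run of `body` in milliseconds. */
  def timeOnce(prepare: () => Unit)(body: => Unit): Long = {
    prepare()
    val start = System.nanoTime()
    body
    (System.nanoTime() - start) / 1000000
  }

  /** Reports a first (warm-up) run separately, then the average of ten further runs. */
  def runExperiment(name: String)(body: => Unit, prepare: () => Unit): Unit = {
    val firstTry = timeOnce(prepare)(body)
    System.gc()
    var total = 0L
    var i = 0
    while (i < 10) {
      val t = timeOnce(prepare)(body)
      println(s"$name: Took $t ms")
      total += t
      i += 1
    }
    println(s"$name: ($firstTry ms first try, ${total / 10} ms average)")
  }

  def main(args: Array[String]): Unit = {
    val rand = new java.util.Random(123)
    val source = Array.fill(1000000)(rand.nextInt())
    val work = new Array[Int](source.length)
    runExperiment("Arrays.sort on primitive Int")(
      { java.util.Arrays.sort(work) },
      () => System.arraycopy(source, 0, work, 0, source.length))
  }
}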


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to