Repository: spark Updated Branches: refs/heads/master 4b55482ab -> 84e5da87e
http://git-wip-us.apache.org/repos/asf/spark/blob/84e5da87/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala index 6fe1079..066d47c 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/SorterSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.util.collection -import java.lang.{Float => JFloat} +import java.lang.{Float => JFloat, Integer => JInteger} import java.util.{Arrays, Comparator} import org.scalatest.FunSuite @@ -30,11 +30,15 @@ class SorterSuite extends FunSuite { val rand = new XORShiftRandom(123) val data0 = Array.tabulate[Int](10000) { i => rand.nextInt() } val data1 = data0.clone() + val data2 = data0.clone() Arrays.sort(data0) new Sorter(new IntArraySortDataFormat).sort(data1, 0, data1.length, Ordering.Int) + new Sorter(new KeyReuseIntArraySortDataFormat) + .sort(data2, 0, data2.length, Ordering[IntWrapper]) - data0.zip(data1).foreach { case (x, y) => assert(x === y) } + assert(data0.view === data1.view) + assert(data0.view === data2.view) } test("KVArraySorter") { @@ -61,10 +65,33 @@ class SorterSuite extends FunSuite { } } + /** Runs an experiment several times. */ + def runExperiment(name: String, skip: Boolean = false)(f: => Unit, prepare: () => Unit): Unit = { + if (skip) { + println(s"Skipped experiment $name.") + return + } + + val firstTry = org.apache.spark.util.Utils.timeIt(1)(f, Some(prepare)) + System.gc() + + var i = 0 + var next10: Long = 0 + while (i < 10) { + val time = org.apache.spark.util.Utils.timeIt(1)(f, Some(prepare)) + next10 += time + println(s"$name: Took $time ms") + i += 1 + } + + println(s"$name: ($firstTry ms first try, ${next10 / 10} ms average)") + } + /** * This provides a simple benchmark for comparing the Sorter with Java internal sorting. * Ideally these would be executed one at a time, each in their own JVM, so their listing - * here is mainly to have the code. + * here is mainly to have the code. Running multiple tests within the same JVM session would + * prevent JIT inlining overridden methods and hence hurt the performance. * * The goal of this code is to sort an array of key-value pairs, where the array physically * has the keys and values alternating. The basic Java sorts work only on the keys, so the @@ -72,96 +99,167 @@ class SorterSuite extends FunSuite { * those, while the Sorter approach can work directly on the input data format. * * Note that the Java implementation varies tremendously between Java 6 and Java 7, when - * the Java sort changed from merge sort to Timsort. + * the Java sort changed from merge sort to TimSort. */ - ignore("Sorter benchmark") { - - /** Runs an experiment several times. */ - def runExperiment(name: String)(f: => Unit): Unit = { - val firstTry = org.apache.spark.util.Utils.timeIt(1)(f) - System.gc() - - var i = 0 - var next10: Long = 0 - while (i < 10) { - val time = org.apache.spark.util.Utils.timeIt(1)(f) - next10 += time - println(s"$name: Took $time ms") - i += 1 - } - - println(s"$name: ($firstTry ms first try, ${next10 / 10} ms average)") - } - + ignore("Sorter benchmark for key-value pairs") { val numElements = 25000000 // 25 mil val rand = new XORShiftRandom(123) - val keys = Array.tabulate[JFloat](numElements) { i => - new JFloat(rand.nextFloat()) + // Test our key-value pairs where each element is a Tuple2[Float, Integer]. + + val kvTuples = Array.tabulate(numElements) { i => + (new JFloat(rand.nextFloat()), new JInteger(i)) } - // Test our key-value pairs where each element is a Tuple2[Float, Integer) - val kvTupleArray = Array.tabulate[AnyRef](numElements) { i => - (keys(i / 2): Float, i / 2: Int) + val kvTupleArray = new Array[AnyRef](numElements) + val prepareKvTupleArray = () => { + System.arraycopy(kvTuples, 0, kvTupleArray, 0, numElements) } - runExperiment("Tuple-sort using Arrays.sort()") { + runExperiment("Tuple-sort using Arrays.sort()")({ Arrays.sort(kvTupleArray, new Comparator[AnyRef] { override def compare(x: AnyRef, y: AnyRef): Int = - Ordering.Float.compare(x.asInstanceOf[(Float, _)]._1, y.asInstanceOf[(Float, _)]._1) + x.asInstanceOf[(JFloat, _)]._1.compareTo(y.asInstanceOf[(JFloat, _)]._1) }) - } + }, prepareKvTupleArray) // Test our Sorter where each element alternates between Float and Integer, non-primitive - val keyValueArray = Array.tabulate[AnyRef](numElements * 2) { i => - if (i % 2 == 0) keys(i / 2) else new Integer(i / 2) + + val keyValues = { + val data = new Array[AnyRef](numElements * 2) + var i = 0 + while (i < numElements) { + data(2 * i) = kvTuples(i)._1 + data(2 * i + 1) = kvTuples(i)._2 + i += 1 + } + data } + + val keyValueArray = new Array[AnyRef](numElements * 2) + val prepareKeyValueArray = () => { + System.arraycopy(keyValues, 0, keyValueArray, 0, numElements * 2) + } + val sorter = new Sorter(new KVArraySortDataFormat[JFloat, AnyRef]) - runExperiment("KV-sort using Sorter") { - sorter.sort(keyValueArray, 0, keys.length, new Comparator[JFloat] { - override def compare(x: JFloat, y: JFloat): Int = Ordering.Float.compare(x, y) + runExperiment("KV-sort using Sorter")({ + sorter.sort(keyValueArray, 0, numElements, new Comparator[JFloat] { + override def compare(x: JFloat, y: JFloat): Int = x.compareTo(y) }) + }, prepareKeyValueArray) + } + + /** + * Tests for sorting with primitive keys with/without key reuse. Java's Arrays.sort is used as + * reference, which is expected to be faster but it can only sort a single array. Sorter can be + * used to sort parallel arrays. + * + * Ideally these would be executed one at a time, each in their own JVM, so their listing + * here is mainly to have the code. Running multiple tests within the same JVM session would + * prevent JIT inlining overridden methods and hence hurt the performance. + */ + test("Sorter benchmark for primitive int array") { + val numElements = 25000000 // 25 mil + val rand = new XORShiftRandom(123) + + val ints = Array.fill(numElements)(rand.nextInt()) + val intObjects = { + val data = new Array[JInteger](numElements) + var i = 0 + while (i < numElements) { + data(i) = new JInteger(ints(i)) + i += 1 + } + data } - // Test non-primitive sort on float array - runExperiment("Java Arrays.sort()") { - Arrays.sort(keys, new Comparator[JFloat] { - override def compare(x: JFloat, y: JFloat): Int = Ordering.Float.compare(x, y) - }) + val intObjectArray = new Array[JInteger](numElements) + val prepareIntObjectArray = () => { + System.arraycopy(intObjects, 0, intObjectArray, 0, numElements) } - // Test primitive sort on float array - val primitiveKeys = Array.tabulate[Float](numElements) { i => rand.nextFloat() } - runExperiment("Java Arrays.sort() on primitive keys") { - Arrays.sort(primitiveKeys) + runExperiment("Java Arrays.sort() on non-primitive int array")({ + Arrays.sort(intObjectArray, new Comparator[JInteger] { + override def compare(x: JInteger, y: JInteger): Int = x.compareTo(y) + }) + }, prepareIntObjectArray) + + val intPrimitiveArray = new Array[Int](numElements) + val prepareIntPrimitiveArray = () => { + System.arraycopy(ints, 0, intPrimitiveArray, 0, numElements) } - } -} + runExperiment("Java Arrays.sort() on primitive int array")({ + Arrays.sort(intPrimitiveArray) + }, prepareIntPrimitiveArray) -/** Format to sort a simple Array[Int]. Could be easily generified and specialized. */ -class IntArraySortDataFormat extends SortDataFormat[Int, Array[Int]] { - override protected def getKey(data: Array[Int], pos: Int): Int = { - data(pos) + val sorterWithoutKeyReuse = new Sorter(new IntArraySortDataFormat) + runExperiment("Sorter without key reuse on primitive int array")({ + sorterWithoutKeyReuse.sort(intPrimitiveArray, 0, numElements, Ordering[Int]) + }, prepareIntPrimitiveArray) + + val sorterWithKeyReuse = new Sorter(new KeyReuseIntArraySortDataFormat) + runExperiment("Sorter with key reuse on primitive int array")({ + sorterWithKeyReuse.sort(intPrimitiveArray, 0, numElements, Ordering[IntWrapper]) + }, prepareIntPrimitiveArray) } +} - override protected def swap(data: Array[Int], pos0: Int, pos1: Int): Unit = { +abstract class AbstractIntArraySortDataFormat[K] extends SortDataFormat[K, Array[Int]] { + + override def swap(data: Array[Int], pos0: Int, pos1: Int): Unit = { val tmp = data(pos0) data(pos0) = data(pos1) data(pos1) = tmp } - override protected def copyElement(src: Array[Int], srcPos: Int, dst: Array[Int], dstPos: Int) { + override def copyElement(src: Array[Int], srcPos: Int, dst: Array[Int], dstPos: Int) { dst(dstPos) = src(srcPos) } /** Copy a range of elements starting at src(srcPos) to dest, starting at destPos. */ - override protected def copyRange(src: Array[Int], srcPos: Int, - dst: Array[Int], dstPos: Int, length: Int) { + override def copyRange(src: Array[Int], srcPos: Int, dst: Array[Int], dstPos: Int, length: Int) { System.arraycopy(src, srcPos, dst, dstPos, length) } /** Allocates a new structure that can hold up to 'length' elements. */ - override protected def allocate(length: Int): Array[Int] = { + override def allocate(length: Int): Array[Int] = { new Array[Int](length) } } + +/** Format to sort a simple Array[Int]. Could be easily generified and specialized. */ +class IntArraySortDataFormat extends AbstractIntArraySortDataFormat[Int] { + + override protected def getKey(data: Array[Int], pos: Int): Int = { + data(pos) + } +} + +/** Wrapper of Int for key reuse. */ +class IntWrapper(var key: Int = 0) extends Ordered[IntWrapper] { + + override def compare(that: IntWrapper): Int = { + Ordering.Int.compare(key, that.key) + } +} + +/** SortDataFormat for Array[Int] with reused keys. */ +class KeyReuseIntArraySortDataFormat extends AbstractIntArraySortDataFormat[IntWrapper] { + + override def newKey(): IntWrapper = { + new IntWrapper() + } + + override def getKey(data: Array[Int], pos: Int, reuse: IntWrapper): IntWrapper = { + if (reuse == null) { + new IntWrapper(data(pos)) + } else { + reuse.key = data(pos) + reuse + } + } + + override protected def getKey(data: Array[Int], pos: Int): IntWrapper = { + getKey(data, pos, null) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/84e5da87/project/MimaExcludes.scala ---------------------------------------------------------------------- diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index c58666a..95152b5 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -53,7 +53,9 @@ object MimaExcludes { "org.apache.spark.scheduler.MapStatus"), // TaskContext was promoted to Abstract class ProblemFilters.exclude[AbstractClassProblem]( - "org.apache.spark.TaskContext") + "org.apache.spark.TaskContext"), + ProblemFilters.exclude[IncompatibleTemplateDefProblem]( + "org.apache.spark.util.collection.SortDataFormat") ) ++ Seq( // Adding new methods to the JavaRDDLike trait: ProblemFilters.exclude[MissingMethodProblem]( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org