Repository: spark
Updated Branches:
  refs/heads/master e789000b8 -> 2117eea71


[SPARK-10710] Remove ability to disable spilling in core and SQL

It does not make much sense to set `spark.shuffle.spill` or
`spark.sql.planner.externalSort` to false: I believe that these configurations
were initially added as "escape hatches" to guard against bugs in the external
operators, but those operators are now mature and well-tested. In addition,
these configurations are no longer handled consistently: SQL's Tungsten
codepath ignores them and will continue to use spilling operators. Similarly,
Spark Core's `tungsten-sort` shuffle manager does not respect
`spark.shuffle.spill=false`.

This pull request removes these configurations, adds warnings at the
appropriate places, and deletes a large amount of code that was used only in
code paths that did not support spilling.
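
For illustration only (not part of the patch), a minimal Scala sketch of the user-facing effect; the config key and the SparkConf/RDD calls are standard Spark 1.x APIs, while the app name and dataset are made up for this example. Setting the removed flag now merely triggers the warning added below, and shuffle aggregation spills to disk as needed regardless:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("spill-flag-example")     // hypothetical app name
  .setMaster("local[2]")
  .set("spark.shuffle.spill", "false")  // ignored after this change; a warning is logged instead

val sc = new SparkContext(conf)
// Aggregation during this shuffle may still spill to disk when memory fills up.
sc.parallelize(1 to 1000000)
  .map(i => (i % 100, 1))
  .reduceByKey(_ + _)
  .count()
sc.stop()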

Author: Josh Rosen <joshro...@databricks.com>

Closes #8831 from JoshRosen/remove-ability-to-disable-spilling.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2117eea7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2117eea7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2117eea7

Branch: refs/heads/master
Commit: 2117eea71ece825fbc3797c8b38184ae221f5223
Parents: e789000
Author: Josh Rosen <joshro...@databricks.com>
Authored: Sat Sep 19 21:40:21 2015 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Sat Sep 19 21:40:21 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/Aggregator.scala     | 59 +++++---------------
 .../org/apache/spark/rdd/CoGroupedRDD.scala     | 40 ++++---------
 .../spark/shuffle/hash/HashShuffleManager.scala |  8 ++-
 .../spark/shuffle/sort/SortShuffleManager.scala | 10 +++-
 .../spark/util/collection/ExternalSorter.scala  |  6 --
 .../apache/spark/deploy/SparkSubmitSuite.scala  | 22 ++++----
 docs/configuration.md                           | 14 +----
 docs/sql-programming-guide.md                   |  7 ---
 python/pyspark/rdd.py                           | 25 +++------
 python/pyspark/shuffle.py                       | 30 ----------
 python/pyspark/tests.py                         | 13 +----
 .../scala/org/apache/spark/sql/SQLConf.scala    |  8 +--
 .../spark/sql/execution/SparkStrategies.scala   |  2 -
 .../apache/spark/sql/execution/commands.scala   |  9 +++
 .../org/apache/spark/sql/execution/sort.scala   | 30 +---------
 .../org/apache/spark/sql/SQLQuerySuite.scala    | 26 ++-------
 .../execution/RowFormatConvertersSuite.scala    |  2 +-
 .../apache/spark/sql/execution/SortSuite.scala  |  4 +-
 18 files changed, 81 insertions(+), 234 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2117eea7/core/src/main/scala/org/apache/spark/Aggregator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala
index 289aab9..7196e57 100644
--- a/core/src/main/scala/org/apache/spark/Aggregator.scala
+++ b/core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -18,7 +18,7 @@
 package org.apache.spark
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.collection.{AppendOnlyMap, ExternalAppendOnlyMap}
+import org.apache.spark.util.collection.ExternalAppendOnlyMap
 
 /**
  * :: DeveloperApi ::
@@ -34,59 +34,30 @@ case class Aggregator[K, V, C] (
     mergeValue: (C, V) => C,
     mergeCombiners: (C, C) => C) {
 
-  // When spilling is enabled sorting will happen externally, but not necessarily with an
-  // ExternalSorter.
-  private val isSpillEnabled = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)
-
   @deprecated("use combineValuesByKey with TaskContext argument", "0.9.0")
   def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] =
     combineValuesByKey(iter, null)
 
-  def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
-                         context: TaskContext): Iterator[(K, C)] = {
-    if (!isSpillEnabled) {
-      val combiners = new AppendOnlyMap[K, C]
-      var kv: Product2[K, V] = null
-      val update = (hadValue: Boolean, oldValue: C) => {
-        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
-      }
-      while (iter.hasNext) {
-        kv = iter.next()
-        combiners.changeValue(kv._1, update)
-      }
-      combiners.iterator
-    } else {
-      val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
-      combiners.insertAll(iter)
-      updateMetrics(context, combiners)
-      combiners.iterator
-    }
+  def combineValuesByKey(
+      iter: Iterator[_ <: Product2[K, V]],
+      context: TaskContext): Iterator[(K, C)] = {
+    val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
+    combiners.insertAll(iter)
+    updateMetrics(context, combiners)
+    combiners.iterator
   }
 
   @deprecated("use combineCombinersByKey with TaskContext argument", "0.9.0")
   def combineCombinersByKey(iter: Iterator[_ <: Product2[K, C]]) : Iterator[(K, C)] =
     combineCombinersByKey(iter, null)
 
-  def combineCombinersByKey(iter: Iterator[_ <: Product2[K, C]], context: TaskContext)
-    : Iterator[(K, C)] =
-  {
-    if (!isSpillEnabled) {
-      val combiners = new AppendOnlyMap[K, C]
-      var kc: Product2[K, C] = null
-      val update = (hadValue: Boolean, oldValue: C) => {
-        if (hadValue) mergeCombiners(oldValue, kc._2) else kc._2
-      }
-      while (iter.hasNext) {
-        kc = iter.next()
-        combiners.changeValue(kc._1, update)
-      }
-      combiners.iterator
-    } else {
-      val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
-      combiners.insertAll(iter)
-      updateMetrics(context, combiners)
-      combiners.iterator
-    }
+  def combineCombinersByKey(
+      iter: Iterator[_ <: Product2[K, C]],
+      context: TaskContext): Iterator[(K, C)] = {
+    val combiners = new ExternalAppendOnlyMap[K, C, C](identity, mergeCombiners, mergeCombiners)
+    combiners.insertAll(iter)
+    updateMetrics(context, combiners)
+    combiners.iterator
   }
 
   /** Update task metrics after populating the external map. */

http://git-wip-us.apache.org/repos/asf/spark/blob/2117eea7/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
index 7bad749..935c3ba 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoGroupedRDD.scala
@@ -26,7 +26,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark._
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.collection.{ExternalAppendOnlyMap, AppendOnlyMap, CompactBuffer}
+import org.apache.spark.util.collection.{CompactBuffer, ExternalAppendOnlyMap}
 import org.apache.spark.util.Utils
 import org.apache.spark.serializer.Serializer
 
@@ -128,8 +128,6 @@ class CoGroupedRDD[K: ClassTag](
   override val partitioner: Some[Partitioner] = Some(part)
 
   override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
-    val sparkConf = SparkEnv.get.conf
-    val externalSorting = sparkConf.getBoolean("spark.shuffle.spill", true)
     val split = s.asInstanceOf[CoGroupPartition]
     val numRdds = dependencies.length
 
@@ -150,34 +148,16 @@ class CoGroupedRDD[K: ClassTag](
         rddIterators += ((it, depNum))
     }
 
-    if (!externalSorting) {
-      val map = new AppendOnlyMap[K, CoGroupCombiner]
-      val update: (Boolean, CoGroupCombiner) => CoGroupCombiner = (hadVal, oldVal) => {
-        if (hadVal) oldVal else Array.fill(numRdds)(new CoGroup)
-      }
-      val getCombiner: K => CoGroupCombiner = key => {
-        map.changeValue(key, update)
-      }
-      rddIterators.foreach { case (it, depNum) =>
-        while (it.hasNext) {
-          val kv = it.next()
-          getCombiner(kv._1)(depNum) += kv._2
-        }
-      }
-      new InterruptibleIterator(context,
-        map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
-    } else {
-      val map = createExternalMap(numRdds)
-      for ((it, depNum) <- rddIterators) {
-        map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
-      }
-      context.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
-      context.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
-      context.internalMetricsToAccumulators(
-        InternalAccumulator.PEAK_EXECUTION_MEMORY).add(map.peakMemoryUsedBytes)
-      new InterruptibleIterator(context,
-        map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
+    val map = createExternalMap(numRdds)
+    for ((it, depNum) <- rddIterators) {
+      map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
     }
+    context.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
+    context.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
+    context.internalMetricsToAccumulators(
+      InternalAccumulator.PEAK_EXECUTION_MEMORY).add(map.peakMemoryUsedBytes)
+    new InterruptibleIterator(context,
+      map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
   }
 
   private def createExternalMap(numRdds: Int)

http://git-wip-us.apache.org/repos/asf/spark/blob/2117eea7/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
index c089088..0b46634 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleManager.scala
@@ -24,7 +24,13 @@ import org.apache.spark.shuffle._
  * A ShuffleManager using hashing, that creates one output file per reduce partition on each
  * mapper (possibly reusing these across waves of tasks).
  */
-private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager {
+private[spark] class HashShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
+
+  if (!conf.getBoolean("spark.shuffle.spill", true)) {
+    logWarning(
+      "spark.shuffle.spill was set to false, but this configuration is ignored 
as of Spark 1.6+." +
+        " Shuffle will continue to spill to disk when necessary.")
+  }
 
   private val fileShuffleBlockResolver = new FileShuffleBlockResolver(conf)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2117eea7/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index d7fab35..476cc1f 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -19,11 +19,17 @@ package org.apache.spark.shuffle.sort
 
 import java.util.concurrent.ConcurrentHashMap
 
-import org.apache.spark.{SparkConf, TaskContext, ShuffleDependency}
+import org.apache.spark.{Logging, SparkConf, TaskContext, ShuffleDependency}
 import org.apache.spark.shuffle._
 import org.apache.spark.shuffle.hash.HashShuffleReader
 
-private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager {
+private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager with Logging {
+
+  if (!conf.getBoolean("spark.shuffle.spill", true)) {
+    logWarning(
+      "spark.shuffle.spill was set to false, but this configuration is ignored 
as of Spark 1.6+." +
+        " Shuffle will continue to spill to disk when necessary.")
+  }
 
   private val indexShuffleBlockResolver = new IndexShuffleBlockResolver(conf)
   private val shuffleMapNumber = new ConcurrentHashMap[Int, Int]()

http://git-wip-us.apache.org/repos/asf/spark/blob/2117eea7/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 31230d5..2a30f75 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -116,8 +116,6 @@ private[spark] class ExternalSorter[K, V, C](
   private val ser = Serializer.getSerializer(serializer)
   private val serInstance = ser.newInstance()
 
-  private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true)
-
   // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
   private val fileBufferSize = conf.getSizeAsKb("spark.shuffle.file.buffer", "32k").toInt * 1024
 
@@ -229,10 +227,6 @@ private[spark] class ExternalSorter[K, V, C](
    * @param usingMap whether we're using a map or buffer as our current in-memory collection
    */
   private def maybeSpillCollection(usingMap: Boolean): Unit = {
-    if (!spillingEnabled) {
-      return
-    }
-
     var estimatedSize = 0L
     if (usingMap) {
       estimatedSize = map.estimateSize()

http://git-wip-us.apache.org/repos/asf/spark/blob/2117eea7/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 1110ca6..1fd470c 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -147,7 +147,7 @@ class SparkSubmitSuite
       "--archives", "archive1.txt,archive2.txt",
       "--num-executors", "6",
       "--name", "beauty",
-      "--conf", "spark.shuffle.spill=false",
+      "--conf", "spark.ui.enabled=false",
       "thejar.jar",
       "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
@@ -166,7 +166,7 @@ class SparkSubmitSuite
     mainClass should be ("org.apache.spark.deploy.yarn.Client")
     classpath should have length (0)
     sysProps("spark.app.name") should be ("beauty")
-    sysProps("spark.shuffle.spill") should be ("false")
+    sysProps("spark.ui.enabled") should be ("false")
     sysProps("SPARK_SUBMIT") should be ("true")
     sysProps.keys should not contain ("spark.jars")
   }
@@ -185,7 +185,7 @@ class SparkSubmitSuite
       "--archives", "archive1.txt,archive2.txt",
       "--num-executors", "6",
       "--name", "trill",
-      "--conf", "spark.shuffle.spill=false",
+      "--conf", "spark.ui.enabled=false",
       "thejar.jar",
       "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
@@ -206,7 +206,7 @@ class SparkSubmitSuite
     sysProps("spark.yarn.dist.archives") should include regex 
(".*archive1.txt,.*archive2.txt")
     sysProps("spark.jars") should include regex 
(".*one.jar,.*two.jar,.*three.jar,.*thejar.jar")
     sysProps("SPARK_SUBMIT") should be ("true")
-    sysProps("spark.shuffle.spill") should be ("false")
+    sysProps("spark.ui.enabled") should be ("false")
   }
 
   test("handles standalone cluster mode") {
@@ -229,7 +229,7 @@ class SparkSubmitSuite
       "--supervise",
       "--driver-memory", "4g",
       "--driver-cores", "5",
-      "--conf", "spark.shuffle.spill=false",
+      "--conf", "spark.ui.enabled=false",
       "thejar.jar",
       "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
@@ -253,9 +253,9 @@ class SparkSubmitSuite
     sysProps.keys should contain ("spark.driver.memory")
     sysProps.keys should contain ("spark.driver.cores")
     sysProps.keys should contain ("spark.driver.supervise")
-    sysProps.keys should contain ("spark.shuffle.spill")
+    sysProps.keys should contain ("spark.ui.enabled")
     sysProps.keys should contain ("spark.submit.deployMode")
-    sysProps("spark.shuffle.spill") should be ("false")
+    sysProps("spark.ui.enabled") should be ("false")
   }
 
   test("handles standalone client mode") {
@@ -266,7 +266,7 @@ class SparkSubmitSuite
       "--total-executor-cores", "5",
       "--class", "org.SomeClass",
       "--driver-memory", "4g",
-      "--conf", "spark.shuffle.spill=false",
+      "--conf", "spark.ui.enabled=false",
       "thejar.jar",
       "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
@@ -277,7 +277,7 @@ class SparkSubmitSuite
     classpath(0) should endWith ("thejar.jar")
     sysProps("spark.executor.memory") should be ("5g")
     sysProps("spark.cores.max") should be ("5")
-    sysProps("spark.shuffle.spill") should be ("false")
+    sysProps("spark.ui.enabled") should be ("false")
   }
 
   test("handles mesos client mode") {
@@ -288,7 +288,7 @@ class SparkSubmitSuite
       "--total-executor-cores", "5",
       "--class", "org.SomeClass",
       "--driver-memory", "4g",
-      "--conf", "spark.shuffle.spill=false",
+      "--conf", "spark.ui.enabled=false",
       "thejar.jar",
       "arg1", "arg2")
     val appArgs = new SparkSubmitArguments(clArgs)
@@ -299,7 +299,7 @@ class SparkSubmitSuite
     classpath(0) should endWith ("thejar.jar")
     sysProps("spark.executor.memory") should be ("5g")
     sysProps("spark.cores.max") should be ("5")
-    sysProps("spark.shuffle.spill") should be ("false")
+    sysProps("spark.ui.enabled") should be ("false")
   }
 
   test("handles confs with flag equivalents") {

http://git-wip-us.apache.org/repos/asf/spark/blob/2117eea7/docs/configuration.md
----------------------------------------------------------------------
diff --git a/docs/configuration.md b/docs/configuration.md
index 3700051..5ec097c 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -69,7 +69,7 @@ val sc = new SparkContext(new SparkConf())
 
 Then, you can supply configuration values at runtime:
 {% highlight bash %}
-./bin/spark-submit --name "My app" --master local[4] --conf 
spark.shuffle.spill=false
+./bin/spark-submit --name "My app" --master local[4] --conf 
spark.eventLog.enabled=false
   --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails 
-XX:+PrintGCTimeStamps" myApp.jar
 {% endhighlight %}
 
@@ -449,8 +449,8 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.shuffle.memoryFraction</code></td>
   <td>0.2</td>
   <td>
-    Fraction of Java heap to use for aggregation and cogroups during shuffles, if
-    <code>spark.shuffle.spill</code> is true. At any given time, the collective size of
+    Fraction of Java heap to use for aggregation and cogroups during shuffles.
+    At any given time, the collective size of
     all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will
     begin to spill to disk. If spills are often, consider increasing this value at the expense of
     <code>spark.storage.memoryFraction</code>.
@@ -484,14 +484,6 @@ Apart from these, the following properties are also available, and may be useful
   </td>
 </tr>
 <tr>
-  <td><code>spark.shuffle.spill</code></td>
-  <td>true</td>
-  <td>
-    If set to "true", limits the amount of memory used during reduces by 
spilling data out to disk.
-    This spilling threshold is specified by 
<code>spark.shuffle.memoryFraction</code>.
-  </td>
-</tr>
-<tr>
   <td><code>spark.shuffle.spill.compress</code></td>
   <td>true</td>
   <td>

http://git-wip-us.apache.org/repos/asf/spark/blob/2117eea7/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 82d4243..7ae9244 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1936,13 +1936,6 @@ that these options will be deprecated in future release as more optimizations ar
       Configures the number of partitions to use when shuffling data for joins or aggregations.
     </td>
   </tr>
-  <tr>
-    <td><code>spark.sql.planner.externalSort</code></td>
-    <td>true</td>
-    <td>
-      When true, performs sorts spilling to disk as needed otherwise sort each partition in memory.
-    </td>
-  </tr>
 </table>
 
 # Distributed SQL Engine

http://git-wip-us.apache.org/repos/asf/spark/blob/2117eea7/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index ab5aab1..73d7d9a 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -48,7 +48,7 @@ from pyspark.statcounter import StatCounter
 from pyspark.rddsampler import RDDSampler, RDDRangeSampler, RDDStratifiedSampler
 from pyspark.storagelevel import StorageLevel
 from pyspark.resultiterable import ResultIterable
-from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, \
+from pyspark.shuffle import Aggregator, ExternalMerger, \
     get_used_memory, ExternalSorter, ExternalGroupBy
 from pyspark.traceback_utils import SCCallSiteSync
 
@@ -580,12 +580,11 @@ class RDD(object):
         if numPartitions is None:
             numPartitions = self._defaultReducePartitions()
 
-        spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower() == 
"true")
         memory = 
_parse_memory(self.ctx._conf.get("spark.python.worker.memory", "512m"))
         serializer = self._jrdd_deserializer
 
         def sortPartition(iterator):
-            sort = ExternalSorter(memory * 0.9, serializer).sorted if spill else sorted
+            sort = ExternalSorter(memory * 0.9, serializer).sorted
             return iter(sort(iterator, key=lambda k_v: keyfunc(k_v[0]), reverse=(not ascending)))

         return self.partitionBy(numPartitions, partitionFunc).mapPartitions(sortPartition, True)
@@ -610,12 +609,11 @@ class RDD(object):
         if numPartitions is None:
             numPartitions = self._defaultReducePartitions()
 
-        spill = self._can_spill()
         memory = self._memory_limit()
         serializer = self._jrdd_deserializer
 
         def sortPartition(iterator):
-            sort = ExternalSorter(memory * 0.9, serializer).sorted if spill else sorted
+            sort = ExternalSorter(memory * 0.9, serializer).sorted
             return iter(sort(iterator, key=lambda kv: keyfunc(kv[0]), reverse=(not ascending)))
 
         if numPartitions == 1:
@@ -1770,13 +1768,11 @@ class RDD(object):
             numPartitions = self._defaultReducePartitions()
 
         serializer = self.ctx.serializer
-        spill = self._can_spill()
         memory = self._memory_limit()
         agg = Aggregator(createCombiner, mergeValue, mergeCombiners)
 
         def combineLocally(iterator):
-            merger = ExternalMerger(agg, memory * 0.9, serializer) \
-                if spill else InMemoryMerger(agg)
+            merger = ExternalMerger(agg, memory * 0.9, serializer)
             merger.mergeValues(iterator)
             return merger.items()
 
@@ -1784,8 +1780,7 @@ class RDD(object):
         shuffled = locally_combined.partitionBy(numPartitions)
 
         def _mergeCombiners(iterator):
-            merger = ExternalMerger(agg, memory, serializer) \
-                if spill else InMemoryMerger(agg)
+            merger = ExternalMerger(agg, memory, serializer)
             merger.mergeCombiners(iterator)
             return merger.items()
 
@@ -1824,9 +1819,6 @@ class RDD(object):
 
         return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions)
 
-    def _can_spill(self):
-        return self.ctx._conf.get("spark.shuffle.spill", "True").lower() == 
"true"
-
     def _memory_limit(self):
         return _parse_memory(self.ctx._conf.get("spark.python.worker.memory", 
"512m"))
 
@@ -1857,14 +1849,12 @@ class RDD(object):
             a.extend(b)
             return a
 
-        spill = self._can_spill()
         memory = self._memory_limit()
         serializer = self._jrdd_deserializer
         agg = Aggregator(createCombiner, mergeValue, mergeCombiners)
 
         def combine(iterator):
-            merger = ExternalMerger(agg, memory * 0.9, serializer) \
-                if spill else InMemoryMerger(agg)
+            merger = ExternalMerger(agg, memory * 0.9, serializer)
             merger.mergeValues(iterator)
             return merger.items()
 
@@ -1872,8 +1862,7 @@ class RDD(object):
         shuffled = locally_combined.partitionBy(numPartitions)
 
         def groupByKey(it):
-            merger = ExternalGroupBy(agg, memory, serializer)\
-                if spill else InMemoryMerger(agg)
+            merger = ExternalGroupBy(agg, memory, serializer)
             merger.mergeCombiners(it)
             return merger.items()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2117eea7/python/pyspark/shuffle.py
----------------------------------------------------------------------
diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index b8118bd..e974cda 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -131,36 +131,6 @@ class Merger(object):
         raise NotImplementedError
 
 
-class InMemoryMerger(Merger):
-
-    """
-    In memory merger based on in-memory dict.
-    """
-
-    def __init__(self, aggregator):
-        Merger.__init__(self, aggregator)
-        self.data = {}
-
-    def mergeValues(self, iterator):
-        """ Combine the items by creator and combiner """
-        # speed up attributes lookup
-        d, creator = self.data, self.agg.createCombiner
-        comb = self.agg.mergeValue
-        for k, v in iterator:
-            d[k] = comb(d[k], v) if k in d else creator(v)
-
-    def mergeCombiners(self, iterator):
-        """ Merge the combined items by mergeCombiner """
-        # speed up attributes lookup
-        d, comb = self.data, self.agg.mergeCombiners
-        for k, v in iterator:
-            d[k] = comb(d[k], v) if k in d else v
-
-    def items(self):
-        """ Return the merged items ad iterator """
-        return iter(self.data.items())
-
-
 def _compressed_serializer(self, serializer=None):
     # always use PickleSerializer to simplify implementation
     ser = PickleSerializer()

http://git-wip-us.apache.org/repos/asf/spark/blob/2117eea7/python/pyspark/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 647504c..f11aaf0 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -62,7 +62,7 @@ from pyspark.serializers import read_int, BatchedSerializer, MarshalSerializer,
     CloudPickleSerializer, CompressedSerializer, UTF8Deserializer, NoOpSerializer, \
     PairDeserializer, CartesianDeserializer, AutoBatchedSerializer, AutoSerializer, \
     FlattenedValuesSerializer
-from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, ExternalSorter
+from pyspark.shuffle import Aggregator, ExternalMerger, ExternalSorter
 from pyspark import shuffle
 from pyspark.profiler import BasicProfiler
 
@@ -95,17 +95,6 @@ class MergerTests(unittest.TestCase):
                               lambda x, y: x.append(y) or x,
                               lambda x, y: x.extend(y) or x)
 
-    def test_in_memory(self):
-        m = InMemoryMerger(self.agg)
-        m.mergeValues(self.data)
-        self.assertEqual(sum(sum(v) for k, v in m.items()),
-                         sum(xrange(self.N)))
-
-        m = InMemoryMerger(self.agg)
-        m.mergeCombiners(map(lambda x_y: (x_y[0], [x_y[1]]), self.data))
-        self.assertEqual(sum(sum(v) for k, v in m.items()),
-                         sum(xrange(self.N)))
-
     def test_small_dataset(self):
         m = ExternalMerger(self.agg, 1000)
         m.mergeValues(self.data)

http://git-wip-us.apache.org/repos/asf/spark/blob/2117eea7/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 9de75f4..b9fb90d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -330,11 +330,6 @@ private[spark] object SQLConf {
 
  // Options that control which operators can be chosen by the query planner.  These should be
   // considered hints and may be ignored by future versions of Spark SQL.
-  val EXTERNAL_SORT = booleanConf("spark.sql.planner.externalSort",
-    defaultValue = Some(true),
-    doc = "When true, performs sorts spilling to disk as needed otherwise sort 
each partition in" +
-      " memory.")
-
   val SORTMERGE_JOIN = booleanConf("spark.sql.planner.sortMergeJoin",
     defaultValue = Some(true),
     doc = "When true, use sort merge join (as opposed to hash join) by default 
for large joins.")
@@ -422,6 +417,7 @@ private[spark] object SQLConf {
 
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
+    val EXTERNAL_SORT = "spark.sql.planner.externalSort"
   }
 }
 
@@ -476,8 +472,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf {
 
   private[spark] def metastorePartitionPruning: Boolean = getConf(HIVE_METASTORE_PARTITION_PRUNING)
 
-  private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT)
-
   private[spark] def sortMergeJoinEnabled: Boolean = getConf(SORTMERGE_JOIN)
 
   private[spark] def codegenEnabled: Boolean = getConf(CODEGEN_ENABLED, getConf(TUNGSTEN_ENABLED))

http://git-wip-us.apache.org/repos/asf/spark/blob/2117eea7/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 5e40d77..41b215c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -312,8 +312,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       if (sqlContext.conf.unsafeEnabled && sqlContext.conf.codegenEnabled &&
         TungstenSort.supportsSchema(child.schema)) {
         execution.TungstenSort(sortExprs, global, child)
-      } else if (sqlContext.conf.externalSortEnabled) {
-        execution.ExternalSort(sortExprs, global, child)
       } else {
         execution.Sort(sortExprs, global, child)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/2117eea7/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 95209e6..af28e2d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -105,6 +105,15 @@ case class SetCommand(kv: Option[(String, Option[String])]) extends RunnableComm
       }
       (keyValueOutput, runFunc)
 
+    case Some((SQLConf.Deprecated.EXTERNAL_SORT, Some(value))) =>
+      val runFunc = (sqlContext: SQLContext) => {
+        logWarning(
+          s"Property ${SQLConf.Deprecated.EXTERNAL_SORT} is deprecated and 
will be ignored. " +
+            s"External sort will continue to be used.")
+        Seq(Row(SQLConf.Deprecated.EXTERNAL_SORT, "true"))
+      }
+      (keyValueOutput, runFunc)
+
     // Configures a single property.
     case Some((key, Some(value))) =>
       val runFunc = (sqlContext: SQLContext) => {
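
As a usage sketch (assuming a spark-shell-style `sqlContext` and a hypothetical registered table `logs` with an `id` column; neither is part of this patch): SET-ting the removed option now goes through the deprecation branch above, logs the warning, and reports the value as "true", while sorts keep using the spilling sort operators.

sqlContext.sql("SET spark.sql.planner.externalSort=false").show()
// -> logs the deprecation warning and returns ("spark.sql.planner.externalSort", "true")
sqlContext.sql("SELECT * FROM logs ORDER BY id").collect()
// -> still planned with the spilling Sort (or TungstenSort) operator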

http://git-wip-us.apache.org/repos/asf/spark/blob/2117eea7/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
index 40ef7c3..27f2624 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/sort.scala
@@ -31,38 +31,12 @@ import org.apache.spark.{SparkEnv, InternalAccumulator, TaskContext}
 // This file defines various sort operators.
 
////////////////////////////////////////////////////////////////////////////////////////////////////
 
-
-/**
- * Performs a sort on-heap.
- * @param global when true performs a global sort of all partitions by shuffling the data first
- *               if necessary.
- */
-case class Sort(
-    sortOrder: Seq[SortOrder],
-    global: Boolean,
-    child: SparkPlan)
-  extends UnaryNode {
-  override def requiredChildDistribution: Seq[Distribution] =
-    if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil
-
-  protected override def doExecute(): RDD[InternalRow] = attachTree(this, "sort") {
-    child.execute().mapPartitions( { iterator =>
-      val ordering = newOrdering(sortOrder, child.output)
-      iterator.map(_.copy()).toArray.sorted(ordering).iterator
-    }, preservesPartitioning = true)
-  }
-
-  override def output: Seq[Attribute] = child.output
-
-  override def outputOrdering: Seq[SortOrder] = sortOrder
-}
-
 /**
  * Performs a sort, spilling to disk as needed.
  * @param global when true performs a global sort of all partitions by shuffling the data first
  *               if necessary.
  */
-case class ExternalSort(
+case class Sort(
     sortOrder: Seq[SortOrder],
     global: Boolean,
     child: SparkPlan)
@@ -93,7 +67,7 @@ case class ExternalSort(
 }
 
 /**
- * Optimized version of [[ExternalSort]] that operates on binary data (implemented as part of
+ * Optimized version of [[Sort]] that operates on binary data (implemented as part of
  * Project Tungsten).
  *
  * @param global when true performs a global sort of all partitions by shuffling the data first

http://git-wip-us.apache.org/repos/asf/spark/blob/2117eea7/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index f998135..05b4127 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -581,28 +581,12 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
       mapData.collect().sortBy(_.data(1)).reverse.map(Row.fromTuple).toSeq)
   }
 
-  test("sorting") {
-    withSQLConf(SQLConf.EXTERNAL_SORT.key -> "false") {
-      sortTest()
-    }
-  }
-
   test("external sorting") {
-    withSQLConf(SQLConf.EXTERNAL_SORT.key -> "true") {
-      sortTest()
-    }
-  }
-
-  test("SPARK-6927 sorting with codegen on") {
-    withSQLConf(SQLConf.EXTERNAL_SORT.key -> "false",
-      SQLConf.CODEGEN_ENABLED.key -> "true") {
-      sortTest()
-    }
+    sortTest()
   }
 
   test("SPARK-6927 external sorting with codegen on") {
-    withSQLConf(SQLConf.EXTERNAL_SORT.key -> "true",
-      SQLConf.CODEGEN_ENABLED.key -> "true") {
+    withSQLConf(SQLConf.CODEGEN_ENABLED.key -> "true") {
       sortTest()
     }
   }
@@ -1731,10 +1715,8 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
   }
 
   test("external sorting updates peak execution memory") {
-    withSQLConf((SQLConf.EXTERNAL_SORT.key, "true")) {
-      AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") {
-        sortTest()
-      }
+    AccumulatorSuite.verifyPeakExecutionMemorySet(sparkContext, "external sort") {
+      sortTest()
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2117eea7/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
index 4492e37..5dc37e5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/RowFormatConvertersSuite.scala
@@ -32,7 +32,7 @@ class RowFormatConvertersSuite extends SparkPlanTest with SharedSQLContext {
     case c: ConvertToSafe => c
   }
 
-  private val outputsSafe = ExternalSort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
+  private val outputsSafe = Sort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
   assert(!outputsSafe.outputsUnsafeRows)
   private val outputsUnsafe = TungstenSort(Nil, false, PhysicalRDD(Seq.empty, null, "name"))
   assert(outputsUnsafe.outputsUnsafeRows)

http://git-wip-us.apache.org/repos/asf/spark/blob/2117eea7/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
index 3073d49..847c188 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala
@@ -36,13 +36,13 @@ class SortSuite extends SparkPlanTest with SharedSQLContext {
 
     checkAnswer(
       input.toDF("a", "b", "c"),
-      ExternalSort('a.asc :: 'b.asc :: Nil, global = true, _: SparkPlan),
+      Sort('a.asc :: 'b.asc :: Nil, global = true, _: SparkPlan),
       input.sortBy(t => (t._1, t._2)).map(Row.fromTuple),
       sortAnswers = false)
 
     checkAnswer(
       input.toDF("a", "b", "c"),
-      ExternalSort('b.asc :: 'a.asc :: Nil, global = true, _: SparkPlan),
+      Sort('b.asc :: 'a.asc :: Nil, global = true, _: SparkPlan),
       input.sortBy(t => (t._2, t._1)).map(Row.fromTuple),
       sortAnswers = false)
   }

