Repository: spark
Updated Branches:
  refs/heads/master 25c2776dd -> 22f3d3334


[SPARK-23389][CORE] When the shuffle dependency specifies aggregation and `dependency.mapSideCombine = false`, we should be able to use serialized sorting.

## What changes were proposed in this pull request?
When the shuffle dependency specifies an aggregator but `dependency.mapSideCombine = false`, no aggregation or sorting is needed on the map side, so we should be able to use serialized sorting.
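
For illustration, here is a minimal sketch of a job that hits this path (not part of the patch; the job and names are hypothetical, but `combineByKeyWithClassTag` and its `mapSideCombine` parameter are the existing API, and Kryo is chosen because the serialized path also requires a relocatable serializer):

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object NoMapSideCombineExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("no-map-side-combine")
      .setMaster("local[2]")
      // Kryo supports relocation of serialized objects, which the
      // serialized (unsafe) shuffle path also requires.
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)

    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

    // An aggregator is defined, but mapSideCombine = false: the map side
    // only partitions and writes records, so after this change the shuffle
    // qualifies for serialized sorting.
    val sums = pairs.combineByKeyWithClassTag[Int](
      (v: Int) => v,                  // createCombiner
      (c: Int, v: Int) => c + v,      // mergeValue
      (c1: Int, c2: Int) => c1 + c2,  // mergeCombiners
      new HashPartitioner(2),
      mapSideCombine = false)

    sums.collect().foreach(println)
    sc.stop()
  }
}
```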

## How was this patch tested?
Existing unit tests.

Author: liuxian <liu.xi...@zte.com.cn>

Closes #20576 from 10110346/mapsidecombine.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/22f3d333
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/22f3d333
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/22f3d333

Branch: refs/heads/master
Commit: 22f3d3334c85c042c6e90f5a02f308d7cd1c1498
Parents: 25c2776
Author: liuxian <liu.xi...@zte.com.cn>
Authored: Thu Mar 1 14:28:28 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Thu Mar 1 14:28:28 2018 +0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/Dependency.scala   |  3 +++
 .../spark/shuffle/BlockStoreShuffleReader.scala    |  1 -
 .../spark/shuffle/sort/SortShuffleManager.scala    |  6 +++---
 .../spark/shuffle/sort/SortShuffleWriter.scala     |  2 --
 .../shuffle/sort/SortShuffleManagerSuite.scala     | 17 +++++++++--------
 5 files changed, 15 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/22f3d333/core/src/main/scala/org/apache/spark/Dependency.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala
index ca52eca..9ea6d2f 100644
--- a/core/src/main/scala/org/apache/spark/Dependency.scala
+++ b/core/src/main/scala/org/apache/spark/Dependency.scala
@@ -76,6 +76,9 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
     val mapSideCombine: Boolean = false)
   extends Dependency[Product2[K, V]] {
 
+  if (mapSideCombine) {
+    require(aggregator.isDefined, "Map-side combine without Aggregator specified!")
+  }
   override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
 
   private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
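
Hoisting the `require` into the constructor means an inconsistent dependency now fails fast at creation time rather than deep inside a reader or writer. A minimal sketch of the new failure mode (the RDD and app name here are placeholders):

```scala
import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("dep-check").setMaster("local"))
val pairRdd = sc.parallelize(Seq((1, 1)))

// mapSideCombine = true without an aggregator now trips the constructor's
// require(...), throwing IllegalArgumentException:
//   "Map-side combine without Aggregator specified!"
val dep = new ShuffleDependency[Int, Int, Int](
  pairRdd,
  new HashPartitioner(2),
  aggregator = None,
  mapSideCombine = true)
```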

http://git-wip-us.apache.org/repos/asf/spark/blob/22f3d333/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
index 0562d45..edd6971 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/BlockStoreShuffleReader.scala
@@ -90,7 +90,6 @@ private[spark] class BlockStoreShuffleReader[K, C](
         dep.aggregator.get.combineValuesByKey(keyValuesIterator, context)
       }
     } else {
-      require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
       interruptibleIter.asInstanceOf[Iterator[Product2[K, C]]]
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/22f3d333/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index bfb4dc6..d9fad64 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -188,9 +188,9 @@ private[spark] object SortShuffleManager extends Logging {
       log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +
         s"${dependency.serializer.getClass.getName}, does not support object relocation")
       false
-    } else if (dependency.aggregator.isDefined) {
-      log.debug(
-        s"Can't use serialized shuffle for shuffle $shufId because an aggregator is defined")
+    } else if (dependency.mapSideCombine) {
+      log.debug(s"Can't use serialized shuffle for shuffle $shufId because we need to do " +
+        s"map-side aggregation")
       false
     } else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
       log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +
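
Paraphrased, the eligibility check now reads roughly as follows (a simplified sketch, not the verbatim method body):

```scala
// Simplified sketch of SortShuffleManager.canUseSerializedShuffle after this
// change: the mere presence of an aggregator no longer disqualifies a
// dependency; only an actual map-side combine does.
def canUseSerializedShuffleSketch(dep: ShuffleDependency[_, _, _]): Boolean = {
  if (!dep.serializer.supportsRelocationOfSerializedObjects) {
    false  // the serializer cannot relocate serialized bytes
  } else if (dep.mapSideCombine) {
    false  // map-side aggregation needs deserialized records on the map side
  } else {
    // MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE is 2^24 in SortShuffleManager
    dep.partitioner.numPartitions <= MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE
  }
}
```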

http://git-wip-us.apache.org/repos/asf/spark/blob/22f3d333/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 636b88e..274399b 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -50,7 +50,6 @@ private[spark] class SortShuffleWriter[K, V, C](
   /** Write a bunch of records to this task's output */
   override def write(records: Iterator[Product2[K, V]]): Unit = {
     sorter = if (dep.mapSideCombine) {
-      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
       new ExternalSorter[K, V, C](
         context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
     } else {
@@ -107,7 +106,6 @@ private[spark] object SortShuffleWriter {
   def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
     // We cannot bypass sorting if we need to do map-side aggregation.
     if (dep.mapSideCombine) {
-      require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
       false
     } else {
       val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
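
On the writer side the guard is simply dropped, because `ShuffleDependency` itself now enforces the invariant; the remaining decision is roughly (again a sketch, not the verbatim source):

```scala
// Simplified sketch of SortShuffleWriter.shouldBypassMergeSort after this
// change: mapSideCombine now implies an aggregator by construction, so the
// require(...) here was redundant.
def shouldBypassMergeSortSketch(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
  if (dep.mapSideCombine) {
    false  // map-side aggregation cannot bypass sorting
  } else {
    val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
    dep.partitioner.numPartitions <= bypassMergeThreshold
  }
}
```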

http://git-wip-us.apache.org/repos/asf/spark/blob/22f3d333/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleManagerSuite.scala
index 55cebe7..f29dac9 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleManagerSuite.scala
@@ -85,6 +85,14 @@ class SortShuffleManagerSuite extends SparkFunSuite with Matchers {
       mapSideCombine = false
     )))
 
+    // We support serialized shuffle if we do not need to do map-side aggregation
+    assert(canUseSerializedShuffle(shuffleDep(
+      partitioner = new HashPartitioner(2),
+      serializer = kryo,
+      keyOrdering = None,
+      aggregator = Some(mock(classOf[Aggregator[Any, Any, Any]])),
+      mapSideCombine = false
+    )))
   }
 
   test("unsupported shuffle dependencies for serialized shuffle") {
@@ -111,14 +119,7 @@ class SortShuffleManagerSuite extends SparkFunSuite with Matchers {
       mapSideCombine = false
     )))
 
-    // We do not support shuffles that perform aggregation
-    assert(!canUseSerializedShuffle(shuffleDep(
-      partitioner = new HashPartitioner(2),
-      serializer = kryo,
-      keyOrdering = None,
-      aggregator = Some(mock(classOf[Aggregator[Any, Any, Any]])),
-      mapSideCombine = false
-    )))
+    // We do not support serialized shuffle if we need to do map-side aggregation
     assert(!canUseSerializedShuffle(shuffleDep(
       partitioner = new HashPartitioner(2),
       serializer = kryo,

