Repository: spark
Updated Branches:
  refs/heads/master 92c2f00bd -> 39dfaf2fd


[SPARK-24519] Make the threshold for highly compressed map status configurable

**Problem**
MapStatus uses a hardcoded value of 2000 partitions to determine whether it 
should use a highly compressed map status. We should make this configurable so 
users can more easily tune their jobs without having to modify their code to 
change the number of partitions. Note: we can leave this as an 
internal/undocumented config for now, until we have more advice for users on 
how to set it.
Some of my reasoning:
A config gives you a way to change behavior without having to change code, 
redeploy the jar, and run again; you can simply change the config and rerun. 
It also allows for easier experimentation. Changing the number of partitions 
has other side effects, good or bad depending on the situation: it can 
increase the number of output files when you don't want it to, it affects the 
number of tasks needed and thus the number of executors that run in parallel, 
etc.
There have been talks at Spark Summits where speakers advised customers to 
increase their partition count to 2001. A search for "spark 2000 partitions" 
turns up various discussions of this number, which shows that people are 
modifying their code to account for it, so making it configurable seems like 
the better option.
Once we have more advice for users, we can expose this config and document it.

**What changes were proposed in this pull request?**
This change makes the hardcoded value mentioned above configurable as 
_SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS_, with a default value of 2000. 
Users can override it by setting the property 
_spark.shuffle.minNumPartitionsToHighlyCompress_.
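
For example, a user could raise the threshold as in the sketch below (purely 
illustrative; because the config is internal and undocumented, it is set by 
its string name, and the value 5000 is an arbitrary choice):

```scala
import org.apache.spark.SparkConf

// Illustrative only: raise the threshold so that jobs with up to 5000
// shuffle partitions keep using the regular CompressedMapStatus.
val conf = new SparkConf()
  .setAppName("shuffle-threshold-example")
  .set("spark.shuffle.minNumPartitionsToHighlyCompress", "5000")
```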

**How was this patch tested?**
I wrote a unit test that verifies the default value is 2000 and that an 
_IllegalArgumentException_ is thrown if the user sets a non-positive value. 
The test also checks that the highly compressed map status is used when the 
number of partitions is greater than 
_SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS_.
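
Note that the comparison is strict, which is why the folklore advice is 2001 
rather than 2000 partitions. A minimal sketch of the boundary semantics (a 
hypothetical helper, not the actual MapStatus code):

```scala
// Hypothetical helper illustrating the strict ">" comparison: a job
// with exactly `threshold` partitions still gets CompressedMapStatus.
def statusKind(numPartitions: Int, threshold: Int = 2000): String =
  if (numPartitions > threshold) "HighlyCompressedMapStatus"
  else "CompressedMapStatus"

assert(statusKind(2000) == "CompressedMapStatus")
assert(statusKind(2001) == "HighlyCompressedMapStatus")
```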

Author: Hieu Huynh <“hieu.hu...@oath.com”>

Closes #21527 from hthuynh2/spark_branch_1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39dfaf2f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39dfaf2f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39dfaf2f

Branch: refs/heads/master
Commit: 39dfaf2fd167cafc84ec9cc637c114ed54a331e3
Parents: 92c2f00
Author: Hieu Huynh <“hieu.hu...@oath.com”>
Authored: Fri Jun 22 09:16:14 2018 -0500
Committer: Thomas Graves <tgra...@apache.org>
Committed: Fri Jun 22 09:16:14 2018 -0500

----------------------------------------------------------------------
 .../apache/spark/internal/config/package.scala  |  7 +++++
 .../org/apache/spark/scheduler/MapStatus.scala  |  4 ++-
 .../apache/spark/scheduler/MapStatusSuite.scala | 28 ++++++++++++++++++++
 3 files changed, 38 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/39dfaf2f/core/src/main/scala/org/apache/spark/internal/config/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index a54b091..38a043c 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -552,4 +552,11 @@ package object config {
       .timeConf(TimeUnit.SECONDS)
       .createWithDefaultString("1h")
 
+  private[spark] val SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS =
+    ConfigBuilder("spark.shuffle.minNumPartitionsToHighlyCompress")
+      .internal()
+      .doc("Number of partitions to determine if MapStatus should use 
HighlyCompressedMapStatus")
+      .intConf
+      .checkValue(v => v > 0, "The value should be a positive integer.")
+      .createWithDefault(2000)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/39dfaf2f/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
index 2ec2f20..659694d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/MapStatus.scala
@@ -50,7 +50,9 @@ private[spark] sealed trait MapStatus {
 private[spark] object MapStatus {
 
   def apply(loc: BlockManagerId, uncompressedSizes: Array[Long]): MapStatus = {
-    if (uncompressedSizes.length > 2000) {
+    if (uncompressedSizes.length > Option(SparkEnv.get)
+      .map(_.conf.get(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS))
+      .getOrElse(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS.defaultValue.get)) {
       HighlyCompressedMapStatus(loc, uncompressedSizes)
     } else {
       new CompressedMapStatus(loc, uncompressedSizes)

http://git-wip-us.apache.org/repos/asf/spark/blob/39dfaf2f/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
index 2155a0f..354e638 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/MapStatusSuite.scala
@@ -188,4 +188,32 @@ class MapStatusSuite extends SparkFunSuite {
       assert(count === 3000)
     }
   }
+
+  test("SPARK-24519: HighlyCompressedMapStatus has configurable threshold") {
+    val conf = new SparkConf()
+    val env = mock(classOf[SparkEnv])
+    doReturn(conf).when(env).conf
+    SparkEnv.set(env)
+    val sizes = Array.fill[Long](500)(150L)
+    // Test default value
+    val status = MapStatus(null, sizes)
+    assert(status.isInstanceOf[CompressedMapStatus])
+    // Test Non-positive values
+    for (s <- -1 to 0) {
+      assertThrows[IllegalArgumentException] {
+        conf.set(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS, s)
+        val status = MapStatus(null, sizes)
+      }
+    }
+    // Test positive values
+    Seq(1, 100, 499, 500, 501).foreach { s =>
+      conf.set(config.SHUFFLE_MIN_NUM_PARTS_TO_HIGHLY_COMPRESS, s)
+      val status = MapStatus(null, sizes)
+      if (sizes.length > s) {
+        assert(status.isInstanceOf[HighlyCompressedMapStatus])
+      } else {
+        assert(status.isInstanceOf[CompressedMapStatus])
+      }
+    }
+  }
 }

