This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 79e148e  [SPARK-36705][SHUFFLE] Disable push based shuffle when IO 
encryption is enabled or serializer is not relocatable
79e148e is described below

commit 79e148ee934404b2f3a748847dc57f13b05dbc87
Author: Minchu Yang <miny...@minyang-mn3.linkedin.biz>
AuthorDate: Mon Sep 13 16:14:35 2021 -0500

    [SPARK-36705][SHUFFLE] Disable push based shuffle when IO encryption is enabled or serializer is not relocatable
    
    ### What changes were proposed in this pull request?
    
    Disable push-based shuffle when IO encryption is enabled or the serializer does not support relocation of serialized objects.
    
    ### Why are the changes needed?
    
    Push-based shuffle is currently incompatible with IO encryption and with serializers that do not support relocation of serialized objects.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added tests that check push-based shuffle is disabled when IO encryption is enabled or when a serializer that does not support relocation of serialized objects is used.
    
    Closes #33976 from rmcyang/SPARK-36705.
    
    Authored-by: Minchu Yang <miny...@minyang-mn3.linkedin.biz>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
    (cherry picked from commit 999473b1a5bad4ae2ae345df8abf018100c9d918)
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../main/scala/org/apache/spark/util/Utils.scala   | 30 +++++++++++++++++-----
 .../scala/org/apache/spark/util/UtilsSuite.scala   | 11 ++++++--
 2 files changed, 32 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala 
b/core/src/main/scala/org/apache/spark/util/Utils.scala
index b130789..a112214 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -70,7 +70,7 @@ import org.apache.spark.internal.config.UI._
 import org.apache.spark.internal.config.Worker._
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.network.util.JavaUtils
-import org.apache.spark.serializer.{DeserializationStream, 
SerializationStream, SerializerInstance}
+import org.apache.spark.serializer.{DeserializationStream, 
SerializationStream, Serializer, SerializerInstance}
 import org.apache.spark.status.api.v1.{StackTrace, ThreadStackTrace}
 import org.apache.spark.util.io.ChunkedByteBufferOutputStream
 
@@ -2591,14 +2591,30 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Push based shuffle can only be enabled when the application is submitted
-   * to run in YARN mode, with external shuffle service enabled
+   * Push-based shuffle is enabled only if PUSH_BASED_SHUFFLE_ENABLED is set and, unless
+   * IS_TESTING is set, all of: the application runs in YARN mode; the external shuffle
+   * service is enabled; IO encryption is disabled; and the serializer (such as
+   * KryoSerializer) supports relocation of serialized objects. A warning is logged only
+   * when push-based shuffle is requested but these conditions are not met.
    */
   def isPushBasedShuffleEnabled(conf: SparkConf): Boolean = {
-    conf.get(PUSH_BASED_SHUFFLE_ENABLED) &&
-      (conf.get(IS_TESTING).getOrElse(false) ||
-        (conf.get(SHUFFLE_SERVICE_ENABLED) &&
-          conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn"))
+    val pushBasedShuffleRequested = conf.get(PUSH_BASED_SHUFFLE_ENABLED)
+    // The serializer is instantiated via reflection, so only do it when the check is reached.
+    lazy val serializer = Utils.classForName(conf.get(SERIALIZER))
+      .getConstructor(classOf[SparkConf]).newInstance(conf).asInstanceOf[Serializer]
+    val canDoPushBasedShuffle = pushBasedShuffleRequested &&
+      (conf.get(IS_TESTING).getOrElse(false) ||
+        (conf.get(SHUFFLE_SERVICE_ENABLED) &&
+          conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn" &&
+          // TODO: [SPARK-36744] needs to support IO encryption for push-based shuffle
+          !conf.get(IO_ENCRYPTION_ENABLED) &&
+          serializer.supportsRelocationOfSerializedObjects))
+    if (pushBasedShuffleRequested && !canDoPushBasedShuffle) {
+      logWarning("Push-based shuffle can only be enabled when the application is submitted " +
+        "to run in YARN mode, with external shuffle service enabled, IO encryption disabled, " +
+        "and relocation of serialized objects supported.")
+    }
+    canDoPushBasedShuffle
   }
 
   /**
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index 095dbef..de8f4ce 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -1447,10 +1447,17 @@ class UtilsSuite extends SparkFunSuite with 
ResetSystemProperties with Logging {
     assert(Utils.isPushBasedShuffleEnabled(conf) === false)
     conf.set(SHUFFLE_SERVICE_ENABLED, true)
     conf.set(SparkLauncher.SPARK_MASTER, "yarn")
-    conf.set("spark.yarn.maxAttempts", "1")
+    conf.set("spark.yarn.maxAppAttempts", "1")
+    conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
     assert(Utils.isPushBasedShuffleEnabled(conf) === true)
-    conf.set("spark.yarn.maxAttempts", "2")
+    conf.set("spark.yarn.maxAppAttempts", "2")
     assert(Utils.isPushBasedShuffleEnabled(conf) === true)
+    conf.set(IO_ENCRYPTION_ENABLED, true)
+    assert(Utils.isPushBasedShuffleEnabled(conf) === false)
+    conf.set(IO_ENCRYPTION_ENABLED, false)
+    assert(Utils.isPushBasedShuffleEnabled(conf) === true)
+    conf.set(SERIALIZER, "org.apache.spark.serializer.JavaSerializer")
+    assert(Utils.isPushBasedShuffleEnabled(conf) === false)
   }
 }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to