rmcyang commented on a change in pull request #34158:
URL: https://github.com/apache/spark/pull/34158#discussion_r720629743



##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -2603,17 +2603,19 @@ private[spark] object Utils extends Logging {
    *   - IO encryption disabled
    *   - serializer(such as KryoSerializer) supports relocation of serialized objects
    */
-  def isPushBasedShuffleEnabled(conf: SparkConf): Boolean = {
+  def isPushBasedShuffleEnabled(conf: SparkConf,
+      isDriver: Boolean,
+      checkSerializer: Boolean = true): Boolean = {
     val pushBasedShuffleEnabled = conf.get(PUSH_BASED_SHUFFLE_ENABLED)
     if (pushBasedShuffleEnabled) {
-      val serializer = Utils.classForName(conf.get(SERIALIZER)).getConstructor(classOf[SparkConf])
-        .newInstance(conf).asInstanceOf[Serializer]
+      lazy val serializer = Option(SparkEnv.get).map(_.serializer)
+        .getOrElse(instantiateClassFromConf[Serializer](SERIALIZER, conf, isDriver))
       val canDoPushBasedShuffle = conf.get(IS_TESTING).getOrElse(false) ||

Review comment:
       Thanks @JoshRosen.
   Agreed that defining a few helper variables makes the code easier to understand and simplifies the final condition. However, `(shuffleServiceOnYarn || isTesting) && ioEncryptionDisabled && serializerIsSupported` does not seem right: it breaks the following UTs in `DAGSchedulerSuite`:
   ```
   - SPARK-32920: shuffle merge finalization *** FAILED ***
     0 did not equal 2 (DAGSchedulerSuite.scala:3448)
   - SPARK-32920: merger locations not empty *** FAILED ***
     List() was empty (DAGSchedulerSuite.scala:3470)
   - SPARK-32920: merger locations reuse from shuffle dependency *** FAILED ***
     List() was empty (DAGSchedulerSuite.scala:3493)
   - SPARK-32920: Ensure child stage should not start before all the parent stages are completed with shuffle merge finalized for all the parent stages *** FAILED ***
     List() was empty (DAGSchedulerSuite.scala:3563)
   - SPARK-32920: Merge results should be unregistered if the running stage is cancelled before shuffle merge is finalized *** FAILED ***
     0 did not equal 2 (DAGSchedulerSuite.scala:3693)
   - SPARK-32920: SPARK-35549: Merge results should not get registered after shuffle merge finalization *** FAILED ***
     0 did not equal 1 (DAGSchedulerSuite.scala:3735)
   - SPARK-32923: handle stage failure for indeterminate map stage with push-based shuffle *** FAILED ***
     0 did not equal 2 (DAGSchedulerSuite.scala:3833)
   ```
   Before the [first PR](https://github.com/apache/spark/pull/33976/files#diff-2ecc6aef4b0c50bbf146e6c0b3b8b2249375f06a83e2a224c7718cfc850c3af7L2605) for [SPARK-36705](https://issues.apache.org/jira/browse/SPARK-36705), the condition appears to have been `isTesting || (isShuffleService && isYarn)`, and the change proposed in this PR is effectively equivalent to `isTesting || (isShuffleService && isYarn && ioEncryptionDisabled && serializerIsSupported)`.
   So, combining this with your idea, I still think it should be `isTesting || (shuffleServiceOnYarn && ioEncryptionDisabled && serializerIsSupported)`.
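
   One way to see why the two forms behave differently is to enumerate every combination of the four flags. The sketch below is a hypothetical, Spark-free model: the Boolean parameters only stand in for the helper values named above, and `PushShuffleConditions`, `proposed`, `suggested` and `disagreements` are illustrative names, not Spark APIs. In this model the two conditions agree whenever `isTesting` is false and differ only when `isTesting` is true but IO encryption is enabled or the serializer cannot relocate serialized objects, which is presumably the situation the failing `DAGSchedulerSuite` tests run into.

   ```scala
   // Hypothetical model of the two candidate conditions; no Spark classes involved.
   object PushShuffleConditions {
     // Form proposed in the review: testing only replaces the shuffle-service-on-YARN check.
     def proposed(isTesting: Boolean, onYarn: Boolean, ioEncDisabled: Boolean, serOk: Boolean): Boolean =
       (onYarn || isTesting) && ioEncDisabled && serOk

     // Pre-SPARK-36705 shape extended with the new checks: testing bypasses everything.
     def suggested(isTesting: Boolean, onYarn: Boolean, ioEncDisabled: Boolean, serOk: Boolean): Boolean =
       isTesting || (onYarn && ioEncDisabled && serOk)

     // Enumerate all 16 input combinations and keep those where the two forms disagree.
     def disagreements: Seq[(Boolean, Boolean, Boolean, Boolean)] = {
       val bools = Seq(false, true)
       for {
         t <- bools; y <- bools; e <- bools; s <- bools
         if proposed(t, y, e, s) != suggested(t, y, e, s)
       } yield (t, y, e, s)
     }
   }
   ```

   Every disagreeing tuple has `isTesting == true`, so the difference only shows up in test runs, which matches the set of suites that failed.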
   
   Another thing: when `checkSerializer` is `false`, the if-else should return `true` instead of `false`. The purpose here is to skip the `supportsRelocationOfSerializedObjects` check when we don't want to check the serializer, while keeping push-based shuffle enabled. So overall I think the logic should look more like the following (which also doesn't fail the UTs above):
   ```scala
         val canDoPushBasedShuffle = {
           val isTesting = conf.get(IS_TESTING).getOrElse(false)
           val isShuffleServiceAndYarn = conf.get(SHUFFLE_SERVICE_ENABLED) &&
               conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn"
           val serializerIsSupported = {
             if (checkSerializer) {
               Option(SparkEnv.get)
                 .map(_.serializer)
                 .getOrElse(instantiateSerializerFromConf[Serializer](SERIALIZER, conf, isDriver))
                 .supportsRelocationOfSerializedObjects
             } else {
               // If the serializer does not need to be checked, always treat it as supported
               true
             }
           }
           // TODO: [SPARK-36744] needs to support IO encryption for push-based shuffle
           val ioEncryptionDisabled = !conf.get(IO_ENCRYPTION_ENABLED)
           isTesting || (isShuffleServiceAndYarn && ioEncryptionDisabled && serializerIsSupported)
         }
   ```
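
   The `checkSerializer` branch and the `Option(SparkEnv.get)` fallback can be exercised in isolation. This is a minimal sketch under stated assumptions: `Ser` stands in for `org.apache.spark.serializer.Serializer`, the `envSerializer` option plays the role of `Option(SparkEnv.get).map(_.serializer)`, and `fromConf` replaces instantiating the serializer class named by the `SERIALIZER` config; `Ser`, `SerializerCheck` and `instantiations` are hypothetical names used only for illustration. It shows that with `checkSerializer = false` the helper returns `true` without building a serializer, and that the config-based fallback runs only when no environment serializer exists.

   ```scala
   // Hypothetical stand-in for Spark's Serializer; only the relevant method is modeled.
   trait Ser { def supportsRelocationOfSerializedObjects: Boolean }

   // Hypothetical model of the serializerIsSupported helper from the suggestion above.
   final class SerializerCheck(envSerializer: Option[Ser], fromConf: () => Ser) {
     var instantiations = 0 // counts how often the config-based fallback is invoked

     def serializerIsSupported(checkSerializer: Boolean): Boolean =
       if (checkSerializer) {
         envSerializer
           .getOrElse { instantiations += 1; fromConf() } // by-name: runs only if env is absent
           .supportsRelocationOfSerializedObjects
       } else {
         true // skip the check entirely so push-based shuffle stays enabled
       }
   }
   ```

   One design consideration: in the suggested block, `serializerIsSupported` is a strict `val`, so it is computed even when `isTesting` short-circuits the final `||`. Declaring it as a `lazy val` (as the diff already does for `serializer`) would defer that work until the condition actually needs it.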



