rmcyang commented on a change in pull request #34158:
URL: https://github.com/apache/spark/pull/34158#discussion_r720629743
##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -2603,17 +2603,19 @@ private[spark] object Utils extends Logging {
* - IO encryption disabled
* - serializer(such as KryoSerializer) supports relocation of serialized objects
*/
- def isPushBasedShuffleEnabled(conf: SparkConf): Boolean = {
+ def isPushBasedShuffleEnabled(conf: SparkConf,
+ isDriver: Boolean,
+ checkSerializer: Boolean = true): Boolean = {
val pushBasedShuffleEnabled = conf.get(PUSH_BASED_SHUFFLE_ENABLED)
if (pushBasedShuffleEnabled) {
- val serializer = Utils.classForName(conf.get(SERIALIZER)).getConstructor(classOf[SparkConf])
- .newInstance(conf).asInstanceOf[Serializer]
+ lazy val serializer = Option(SparkEnv.get).map(_.serializer)
+ .getOrElse(instantiateClassFromConf[Serializer](SERIALIZER, conf, isDriver))
val canDoPushBasedShuffle = conf.get(IS_TESTING).getOrElse(false) ||
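The new `lazy val serializer` line reuses the serializer already held by `SparkEnv.get` and only falls back to instantiating one from `SERIALIZER` when there is no environment. Here is a minimal sketch of that `Option(x).map(...).getOrElse(...)` pattern in isolation. `Env`, `Ser`, `SerializerLookup` and `instantiateFromConf` are hypothetical stand-ins invented for this example; no Spark API is called.

```scala
// Hypothetical stand-ins for this sketch: `Ser` plays the role of a serializer,
// `Env` the role of a runtime environment that may not exist yet.
final case class Ser(name: String)

final class Env(val serializer: Ser)

object SerializerLookup {
  // May be null, like a global environment accessor before initialization.
  @volatile var current: Env = null

  // Counts how many times the fallback path constructs a new serializer.
  var instantiations = 0

  // Placeholder for "instantiate the configured serializer class".
  def instantiateFromConf(className: String): Ser = {
    instantiations += 1
    Ser(className)
  }

  // Reuse the environment's serializer when there is one; otherwise build one.
  // Option(null) is None, and getOrElse takes its default by name,
  // so the fallback runs only when no environment exists.
  def resolve(className: String): Ser =
    Option(current).map(_.serializer).getOrElse(instantiateFromConf(className))
}
```

Because `getOrElse` evaluates its argument lazily, no extra serializer is constructed when an environment is already present.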
Review comment:
Thanks @JoshRosen.
Agree that defining a few helper variables makes it easier to understand and makes the final condition simpler. However, `(shuffleServiceOnYarn || isTesting) && ioEncryptionDisabled && serializerIsSupported` doesn't seem right: it breaks the following UTs in `DAGSchedulerSuite`:
```
- SPARK-32920: shuffle merge finalization *** FAILED ***
0 did not equal 2 (DAGSchedulerSuite.scala:3448)
- SPARK-32920: merger locations not empty *** FAILED ***
List() was empty (DAGSchedulerSuite.scala:3470)
- SPARK-32920: merger locations reuse from shuffle dependency *** FAILED ***
List() was empty (DAGSchedulerSuite.scala:3493)
- SPARK-32920: Ensure child stage should not start before all the parent stages are completed with shuffle merge finalized for all the parent stages *** FAILED ***
List() was empty (DAGSchedulerSuite.scala:3563)
- SPARK-32920: Merge results should be unregistered if the running stage is cancelled before shuffle merge is finalized *** FAILED ***
0 did not equal 2 (DAGSchedulerSuite.scala:3693)
- SPARK-32920: SPARK-35549: Merge results should not get registered after shuffle merge finalization *** FAILED ***
0 did not equal 1 (DAGSchedulerSuite.scala:3735)
- SPARK-32923: handle stage failure for indeterminate map stage with push-based shuffle *** FAILED ***
0 did not equal 2 (DAGSchedulerSuite.scala:3833)
```
Before the [first PR](https://github.com/apache/spark/pull/33976/files#diff-2ecc6aef4b0c50bbf146e6c0b3b8b2249375f06a83e2a224c7718cfc850c3af7L2605) for [SPARK-36705](https://issues.apache.org/jira/browse/SPARK-36705), the condition logic appears to have been `isTesting || (isShuffleService && isYarn)`, and the change proposed in this PR is actually equivalent to `isTesting || (isShuffleService && isYarn && ioEncryptionDisabled && serializerIsSupported)`.
So, combining this with your idea, I still think it should be `isTesting || (shuffleServiceOnYarn && ioEncryptionDisabled && serializerIsSupported)`.
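To make the difference between the two groupings concrete, here is a small truth-table check in plain Scala. The `Inputs` type and the function names `andEverything` and `testingShortCircuits` are made up for this sketch; the boolean fields mirror the helper variables discussed above. It enumerates all 16 input combinations and keeps the rows where the two formulas disagree.

```scala
// Compares the two candidate conditions from this thread over every combination
// of the four boolean inputs and keeps the rows where they disagree.
object ConditionCompare {
  final case class Inputs(
      isTesting: Boolean,
      shuffleServiceOnYarn: Boolean,
      ioEncryptionDisabled: Boolean,
      serializerIsSupported: Boolean)

  // Grouping that broke the DAGSchedulerSuite tests: testing only replaces the YARN check.
  def andEverything(i: Inputs): Boolean =
    (i.shuffleServiceOnYarn || i.isTesting) && i.ioEncryptionDisabled && i.serializerIsSupported

  // Grouping proposed in this comment: testing short-circuits every other check.
  def testingShortCircuits(i: Inputs): Boolean =
    i.isTesting || (i.shuffleServiceOnYarn && i.ioEncryptionDisabled && i.serializerIsSupported)

  // All 2^4 = 16 combinations of the inputs.
  val allInputs: Seq[Inputs] = for {
    t <- Seq(false, true)
    y <- Seq(false, true)
    e <- Seq(false, true)
    s <- Seq(false, true)
  } yield Inputs(t, y, e, s)

  // Rows where the two formulas give different answers.
  val differences: Seq[Inputs] =
    allInputs.filter(i => andEverything(i) != testingShortCircuits(i))
}
```

The formulas agree whenever `isTesting` is false. They disagree in six rows, all with `isTesting = true` and either IO encryption enabled or an unsupported serializer, and in each of those rows the `(shuffleServiceOnYarn || isTesting) && ...` grouping returns `false`. That is consistent with, though not proof of, the failures above if the suite runs in testing mode with a serializer that fails the relocation check.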
Another thing: when `checkSerializer` is `false`, the if-else should return `true` instead of `false`. The purpose here is to skip the `supportsRelocationOfSerializedObjects` check when we don't want to check the serializer, while keeping push-based shuffle enabled. So overall I think the logic here should be more like the following (this also doesn't fail the UTs above):
```
val canDoPushBasedShuffle = {
  val isTesting = conf.get(IS_TESTING).getOrElse(false)
  val isShuffleServiceAndYarn = conf.get(SHUFFLE_SERVICE_ENABLED) &&
    conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn"
  val serializerIsSupported = {
    if (checkSerializer) {
      Option(SparkEnv.get)
        .map(_.serializer)
        .getOrElse(instantiateSerializerFromConf[Serializer](SERIALIZER, conf, isDriver))
        .supportsRelocationOfSerializedObjects
    } else {
      // If there is no need to check the serializer, always set serializerIsSupported to true
      true
    }
  }
  // TODO: [SPARK-36744] needs to support IO encryption for push-based shuffle
  val ioEncryptionDisabled = !conf.get(IO_ENCRYPTION_ENABLED)
  isTesting || (isShuffleServiceAndYarn && ioEncryptionDisabled && serializerIsSupported)
}
```
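The proposal above depends on `SparkConf`, `SparkEnv` and Spark's config entries, so it cannot run on its own. The sketch below models the same decision with a hypothetical `PushShuffleSettings` case class and a caller-supplied serializer check, assuming the function returns `false` when push-based shuffle is disabled (that branch is not shown in the diff excerpt). One deliberate difference: it declares `serializerIsSupported` as a `lazy val`, so the serializer check is skipped entirely when `isTesting` short-circuits the condition, whereas the proposal above uses a plain `val`.

```scala
// Hypothetical, dependency-free model of the proposed canDoPushBasedShuffle logic.
// `PushShuffleSettings` stands in for the SparkConf entries read in the real code.
final case class PushShuffleSettings(
    pushBasedShuffleEnabled: Boolean,
    isTesting: Boolean,
    shuffleServiceEnabled: Boolean,
    master: String,
    ioEncryptionEnabled: Boolean)

object PushShuffleCheck {
  /**
   * @param serializerSupportsRelocation invoked only when the serializer must be checked
   */
  def isPushBasedShuffleEnabled(
      s: PushShuffleSettings,
      checkSerializer: Boolean,
      serializerSupportsRelocation: () => Boolean): Boolean = {
    // Assumption of this sketch: a disabled feature flag yields false outright.
    s.pushBasedShuffleEnabled && {
      val isShuffleServiceAndYarn = s.shuffleServiceEnabled && s.master == "yarn"
      // When checkSerializer is false the serializer is treated as supported,
      // so push-based shuffle stays enabled without consulting it.
      // lazy: not evaluated at all if isTesting already decides the result.
      lazy val serializerIsSupported =
        if (checkSerializer) serializerSupportsRelocation() else true
      val ioEncryptionDisabled = !s.ioEncryptionEnabled
      s.isTesting || (isShuffleServiceAndYarn && ioEncryptionDisabled && serializerIsSupported)
    }
  }
}
```

With `checkSerializer = false`, a YARN setup with the external shuffle service and no IO encryption keeps push-based shuffle enabled even if the serializer would fail the relocation check.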
--
This is an automated message from the Apache Git Service.