JoshRosen commented on a change in pull request #34158:
URL: https://github.com/apache/spark/pull/34158#discussion_r720558670
##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -2603,17 +2603,19 @@ private[spark] object Utils extends Logging {
    *   - IO encryption disabled
    *   - serializer(such as KryoSerializer) supports relocation of serialized objects
    */
-  def isPushBasedShuffleEnabled(conf: SparkConf): Boolean = {
+  def isPushBasedShuffleEnabled(conf: SparkConf,
+      isDriver: Boolean,
+      checkSerializer: Boolean = true): Boolean = {
     val pushBasedShuffleEnabled = conf.get(PUSH_BASED_SHUFFLE_ENABLED)
     if (pushBasedShuffleEnabled) {
-      val serializer = Utils.classForName(conf.get(SERIALIZER)).getConstructor(classOf[SparkConf])
-        .newInstance(conf).asInstanceOf[Serializer]
+      lazy val serializer = Option(SparkEnv.get).map(_.serializer)
+        .getOrElse(instantiateClassFromConf[Serializer](SERIALIZER, conf, isDriver))
       val canDoPushBasedShuffle = conf.get(IS_TESTING).getOrElse(false) ||
Review comment:
I think there may be a pre-existing bug in this code: when `conf.get(IS_TESTING)` is true, we don't even check whether the serializer supports relocation.
Looking a few revisions back, it looks like the `IS_TESTING` check was added so that the push-based shuffle code path could be exercised in tests when Spark isn't running on YARN:
https://github.com/apache/spark/blame/1a62e6a2c119df707f15101b03ecff0c3dee62f5/core/src/main/scala/org/apache/spark/util/Utils.scala#L2602-L2607
It looks like #33976 then changed the effective precedence when it added the additional checks for IO encryption and the serializer.
I find the deeply nested, parenthesized conditionals a bit tricky to follow, and I suspect that's why this bug slipped through.
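To make the precedence point concrete, the condition currently has roughly this shape (paraphrased from the diff above, not an exact quote of the file):
```scala
// Because IS_TESTING sits on the left of a top-level `||`, a true value
// short-circuits the whole parenthesized group, so neither the
// serializer-relocation check nor the IO-encryption check ever runs.
val canDoPushBasedShuffle = conf.get(IS_TESTING).getOrElse(false) ||
  (conf.get(SHUFFLE_SERVICE_ENABLED) &&
    conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn" &&
    !conf.get(IO_ENCRYPTION_ENABLED) &&
    serializer.supportsRelocationOfSerializedObjects)
```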
As long as we're changing this code, what do you think about defining a few
helper variables so that the final condition is simpler? Maybe something like
this:
```scala
val canDoPushBasedShuffle = {
  val isTesting = conf.get(IS_TESTING).getOrElse(false)
  val isShuffleServiceAndYarn = conf.get(SHUFFLE_SERVICE_ENABLED) &&
    conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn"
  // Lazy so the serializer is only instantiated if the final condition actually needs it.
  lazy val serializerIsSupported = {
    if (checkSerializer) {
      Option(SparkEnv.get)
        .map(_.serializer)
        .getOrElse(instantiateClassFromConf[Serializer](SERIALIZER, conf, isDriver))
        .supportsRelocationOfSerializedObjects
    } else {
      // Callers that opt out of the serializer check shouldn't have push-based
      // shuffle disabled because of it.
      true
    }
  }
  val ioEncryptionDisabled = !conf.get(IO_ENCRYPTION_ENABLED)
  (isShuffleServiceAndYarn || isTesting) && ioEncryptionDisabled && serializerIsSupported
}
```
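(Side note, purely illustrative: with the new signature, a call site that wants to skip the serializer check would look something like the hypothetical snippet below; the real call sites are whatever this PR already updates.)
```scala
// Hypothetical call site, only to illustrate the new parameters.
val pushEnabled = Utils.isPushBasedShuffleEnabled(conf, isDriver = false, checkSerializer = false)
```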
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.