JoshRosen commented on a change in pull request #34158:
URL: https://github.com/apache/spark/pull/34158#discussion_r720558670
##########
File path: core/src/main/scala/org/apache/spark/util/Utils.scala
##########
@@ -2603,17 +2603,19 @@ private[spark] object Utils extends Logging {
* - IO encryption disabled
* - serializer(such as KryoSerializer) supports relocation of serialized objects
*/
- def isPushBasedShuffleEnabled(conf: SparkConf): Boolean = {
+ def isPushBasedShuffleEnabled(conf: SparkConf,
+     isDriver: Boolean,
+     checkSerializer: Boolean = true): Boolean = {
  val pushBasedShuffleEnabled = conf.get(PUSH_BASED_SHUFFLE_ENABLED)
  if (pushBasedShuffleEnabled) {
-   val serializer = Utils.classForName(conf.get(SERIALIZER)).getConstructor(classOf[SparkConf])
-     .newInstance(conf).asInstanceOf[Serializer]
+   lazy val serializer = Option(SparkEnv.get).map(_.serializer)
+     .getOrElse(instantiateClassFromConf[Serializer](SERIALIZER, conf, isDriver))
    val canDoPushBasedShuffle = conf.get(IS_TESTING).getOrElse(false) ||
Review comment:
I think there may be a pre-existing bug here: when `conf.get(IS_TESTING)` is
true, we don't even check whether the serializer supports relocation.
Looking a few revisions back, it looks like the `IS_TESTING` check was added
to allow the push-based testing code path to be used in tests when Spark isn't
running on YARN:
https://github.com/apache/spark/blame/1a62e6a2c119df707f15101b03ecff0c3dee62f5/core/src/main/scala/org/apache/spark/util/Utils.scala#L2602-L2607
It looks like #33976 changed the precedence when adding the additional
checks involving IO encryption and the serializer.
I find the deeply paren-nested conditionals to be a bit tricky to understand
and suspect that's why this bug slipped through.
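To make the precedence issue concrete, here is a minimal boolean sketch (the names are illustrative stand-ins, not the real config lookups) of how the current `||` short-circuits past the serializer check, versus what the original `IS_TESTING` check seems to have intended:

```scala
object PrecedenceDemo {
  // Simplified stand-in for the current condition: IS_TESTING short-circuits
  // everything, including the serializer-relocation check.
  def current(isTesting: Boolean, isShuffleServiceAndYarn: Boolean,
      ioEncryptionDisabled: Boolean, serializerIsSupported: Boolean): Boolean =
    isTesting ||
      (isShuffleServiceAndYarn && ioEncryptionDisabled && serializerIsSupported)

  // What the original check seems to have intended: testing only relaxes the
  // YARN/shuffle-service requirement, not the other two checks.
  def intended(isTesting: Boolean, isShuffleServiceAndYarn: Boolean,
      ioEncryptionDisabled: Boolean, serializerIsSupported: Boolean): Boolean =
    (isShuffleServiceAndYarn || isTesting) &&
      ioEncryptionDisabled && serializerIsSupported

  def main(args: Array[String]): Unit = {
    // With IS_TESTING set and a serializer that does NOT support relocation:
    val buggy = current(isTesting = true, isShuffleServiceAndYarn = false,
      ioEncryptionDisabled = true, serializerIsSupported = false)
    val fixed = intended(isTesting = true, isShuffleServiceAndYarn = false,
      ioEncryptionDisabled = true, serializerIsSupported = false)
    println(s"current enables push-based shuffle: $buggy")   // true
    println(s"intended enables push-based shuffle: $fixed")  // false
  }
}
```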
As long as we're changing this code, what do you think about defining a few
helper variables so that the final condition is simpler? Maybe something like
this:
```scala
val canDoPushBasedShuffle = {
  val isTesting = conf.get(IS_TESTING).getOrElse(false)
  val isShuffleServiceAndYarn = conf.get(SHUFFLE_SERVICE_ENABLED) &&
    conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn"
  // `lazy` so we only instantiate a serializer when the other checks pass:
  lazy val serializerIsSupported = {
    if (checkSerializer) {
      Option(SparkEnv.get)
        .map(_.serializer)
        .getOrElse(instantiateClassFromConf[Serializer](SERIALIZER, conf, isDriver))
        .supportsRelocationOfSerializedObjects
    } else {
      // Callers that skip the check treat the serializer as supported.
      true
    }
  }
  val ioEncryptionDisabled = !conf.get(IO_ENCRYPTION_ENABLED)
  (isShuffleServiceAndYarn || isTesting) && ioEncryptionDisabled && serializerIsSupported
}
```
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -194,8 +195,8 @@ private[spark] class BlockManager(
val diskBlockManager = {
  // Only perform cleanup if an external service is not serving our shuffle files.
  val deleteFilesOnStop =
-   !externalShuffleServiceEnabled || executorId == SparkContext.DRIVER_IDENTIFIER
- new DiskBlockManager(conf, deleteFilesOnStop)
+   !externalShuffleServiceEnabled || isDriver
+ new DiskBlockManager(conf, deleteFilesOnStop, isDriver)
Review comment:
Nit: can you name the adjacent boolean parameters being passed here?
e.g.:
```suggestion
new DiskBlockManager(conf, deleteFilesOnStop = deleteFilesOnStop, isDriver = isDriver)
```
I think this suggestion applies to a couple of other call sites, too.
Naming the parameters helps avoid mix-ups of positional arguments, especially
when resolving merge conflicts in the future.
##########
File path: core/src/main/scala/org/apache/spark/MapOutputTracker.scala
##########
@@ -1126,7 +1126,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr
  val mergeStatuses: Map[Int, Array[MergeStatus]] =
    new ConcurrentHashMap[Int, Array[MergeStatus]]().asScala
- private val fetchMergeResult = Utils.isPushBasedShuffleEnabled(conf)
+ private lazy val fetchMergeResult = Utils.isPushBasedShuffleEnabled(conf, isDriver = false)
Review comment:
Maybe add a comment to explain why this is lazy?
```suggestion
// This must be lazy to ensure that it is initialized when the first task is run and not at
// executor startup time. At startup time, user-added libraries may not have been downloaded
// to the executor, causing `isPushBasedShuffleEnabled` to fail when it tries to instantiate
// a serializer. See the followup to SPARK-36705 for more details.
private lazy val fetchMergeResult = Utils.isPushBasedShuffleEnabled(conf, isDriver = false)
```
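A small self-contained sketch of why `lazy` helps here (the classpath failure is simulated and all names are hypothetical): an eager `val` would run its initializer at construction time and throw, while `lazy val` defers evaluation until first access.

```scala
object LazyDemo {
  // Simulates whether user-added jars have been fetched to the executor yet.
  var userLibsAvailable = false

  class Tracker {
    // An eager `val` here would call loadSerializerClass() during construction
    // (i.e. at "executor startup") and throw. `lazy val` defers the call until
    // the first access, by which point the jars have been downloaded.
    lazy val fetchMergeResult: Boolean = loadSerializerClass()

    private def loadSerializerClass(): Boolean = {
      if (!userLibsAvailable) {
        throw new ClassNotFoundException("user-provided serializer not on classpath yet")
      }
      true
    }
  }

  def main(args: Array[String]): Unit = {
    val tracker = new Tracker() // succeeds: the lazy val is not evaluated yet
    userLibsAvailable = true    // e.g. the first task has shipped the jars
    println(tracker.fetchMergeResult) // evaluated safely on first access
  }
}
```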
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]