mridulm commented on code in PR #43627:
URL: https://github.com/apache/spark/pull/43627#discussion_r1386011282
##########
core/src/main/scala/org/apache/spark/SparkContext.scala:
##########
@@ -627,6 +631,7 @@ class SparkContext(config: SparkConf) extends Logging {
}
_ui.foreach(_.setAppId(_applicationId))
_env.blockManager.initialize(_applicationId)
+ _env.blockManager.setShuffleManager(shuffleManager)
FallbackStorage.registerBlockManagerIfNeeded(_env.blockManager.master,
_conf)
Review Comment:
The changes in this class are unrelated to the driver classpath, so we can
revert all of them here and instead move the logic into `SparkEnv.create`,
initializing `shuffleManager` there when running on the driver.
##########
core/src/main/scala/org/apache/spark/SparkEnv.scala:
##########
@@ -185,6 +191,10 @@ class SparkEnv (
releasePythonWorker(
pythonExec, workerModule, PythonWorkerFactory.defaultDaemonModule,
envVars, worker)
}
+
+ private[spark] def setShuffleManager(shuffleManager: ShuffleManager): Unit =
{
+ _shuffleManager = shuffleManager
+ }
Review Comment:
Instead of a setter, expose an initialization method.
On the driver, we should set the variable directly as part of `create`, while
on executors we call `initializeShuffleManager` after the classpath has been
fixed up (see the comment on `shuffleBlockGetterFn` below for details).
```suggestion
private[spark] def initiailzeShuffleManager(): Unit = {
Preconditions.checkState(null == _shuffleManager, "Shuffle manager
already initialized")
// Must not be driver
Preconditions.checkState(executorId != SparkContext.DRIVER_IDENTIFIER,
"Should not be called in driver")
_shuffleManager = ShuffleManager.create(_conf, false)
}
```
##########
core/src/main/scala/org/apache/spark/SparkEnv.scala:
##########
@@ -355,16 +370,6 @@ object SparkEnv extends Logging {
new MapOutputTrackerMasterEndpoint(
rpcEnv, mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))
- // Let the user specify short names for shuffle managers
- val shortShuffleMgrNames = Map(
- "sort" ->
classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
- "tungsten-sort" ->
classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
- val shuffleMgrName = conf.get(config.SHUFFLE_MANAGER)
- val shuffleMgrClass =
- shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT),
shuffleMgrName)
- val shuffleManager =
Utils.instantiateSerializerOrShuffleManager[ShuffleManager](
- shuffleMgrClass, conf, isDriver)
-
val memoryManager: MemoryManager = UnifiedMemoryManager(conf,
numUsableCores)
Review Comment:
Instead, do:
`val shuffleManager: ShuffleManager = if (isDriver) ShuffleManager.create(conf, true) else null`
and keep the rest of this method the same.
##########
core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala:
##########
@@ -81,9 +81,10 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
conf, securityMgr, serializerManager, "localhost", "localhost", 0, 1)
val memManager = memoryManager.getOrElse(UnifiedMemoryManager(conf,
numCores = 1))
Review Comment:
With the proposed changes, we can revert all changes to this file.
##########
core/src/main/scala/org/apache/spark/executor/Executor.scala:
##########
@@ -329,14 +329,22 @@ private[spark] class Executor(
}
updateDependencies(initialUserFiles, initialUserJars, initialUserArchives,
defaultSessionState)
- // Plugins need to load using a class loader that includes the executor's
user classpath.
- // Plugins also needs to be initialized after the heartbeater started
- // to avoid blocking to send heartbeat (see SPARK-32175).
+ // Plugins and shuffle managers need to load using a class loader that
includes the executor's
+ // user classpath. Plugins also needs to be initialized after the
heartbeater started
+ // to avoid blocking to send heartbeat (see SPARK-32175 and SPARK-45762).
private val plugins: Option[PluginContainer] =
Utils.withContextClassLoader(defaultSessionState.replClassLoader) {
PluginContainer(env, resources.asJava)
}
+ private val shuffleManager =
+ Utils.withContextClassLoader(defaultSessionState.replClassLoader) {
+ ShuffleManager.create(conf, true)
+ }
+
+ env.setShuffleManager(shuffleManager)
+ env.blockManager.setShuffleManager(shuffleManager)
Review Comment:
```suggestion
Utils.withContextClassLoader(defaultSessionState.replClassLoader) {
env.initiailzeShuffleManager()
}
```
I have not tested this, but I think this should work. If it does not, most
of my suggestions will need to be discarded :-)
##########
streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala:
##########
@@ -93,7 +93,8 @@ abstract class
BaseReceivedBlockHandlerSuite(enableEncryption: Boolean)
val blockManagerInfo = new mutable.HashMap[BlockManagerId,
BlockManagerInfo]()
blockManagerMaster = new
BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
Review Comment:
Here as well, revert all changes.
##########
core/src/main/scala/org/apache/spark/SparkEnv.scala:
##########
@@ -277,6 +287,11 @@ object SparkEnv extends Logging {
hostname, numCores, ioEncryptionKey, isLocal)
}
+ private def shuffleBlockGetterFn(shuffleId: Int, mapId: Long): Seq[BlockId]
= {
+ val env = SparkEnv.get
+ env.shuffleManager.shuffleBlockResolver.getBlocksForShuffle(shuffleId,
mapId)
+ }
+
Review Comment:
Drop this?
In `BlockManagerMasterEndpoint`:
* Change the constructor parameter to:
  `private val _shuffleManager: ShuffleManager,`
* And add a field:
  `private lazy val shuffleManager = Option(_shuffleManager).getOrElse(SparkEnv.get.shuffleManager)`
Do the same for `BlockManager` as well.
See also the comment on `create`.
##########
core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala:
##########
@@ -143,10 +143,11 @@ class BlockManagerSuite extends SparkFunSuite with
Matchers with PrivateMethodTe
None
}
Review Comment:
As with `BlockManagerReplicationSuite`, all changes here can be reverted as
well.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]