JoshRosen commented on code in PR #49350:
URL: https://github.com/apache/spark/pull/49350#discussion_r1908127185
##########
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##########
@@ -667,10 +671,19 @@ class BlockManagerMasterEndpoint(
).map(_.flatten.toSeq)
}
- private def externalShuffleServiceIdOnHost(blockManagerId: BlockManagerId):
BlockManagerId = {
+ private def externalShuffleServiceIdOnHost(
+ blockManagerId: BlockManagerId,
+ blockManagerRef: RpcEndpointRef): BlockManagerId = {
+ val essPort = if (LocalSparkCluster.get.isDefined) {
+ blockManagerRef.askSync[Int](GetShuffleServicePort)
+ } else {
+ externalShuffleServicePort
+ }
// we need to keep the executor ID of the original executor to let the
shuffle service know
// which local directories should be used to look for the file
- BlockManagerId(blockManagerId.executorId, blockManagerId.host,
externalShuffleServicePort)
+ val essId = BlockManagerId(blockManagerId.executorId, blockManagerId.host,
essPort)
+ shuffleServiceIdSet.add(essId)
Review Comment:
Guard this with a `if (LocalSparkCluster.get.isDefined)` so that we don't
add to it?
But if we do that we can just factor it out into a `val isLocalClusterEss =
LocalSparkCluster.get.isDefined` and use it in both checks.
##########
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala:
##########
@@ -667,10 +671,19 @@ class BlockManagerMasterEndpoint(
).map(_.flatten.toSeq)
}
- private def externalShuffleServiceIdOnHost(blockManagerId: BlockManagerId):
BlockManagerId = {
+ private def externalShuffleServiceIdOnHost(
+ blockManagerId: BlockManagerId,
+ blockManagerRef: RpcEndpointRef): BlockManagerId = {
+ val essPort = if (LocalSparkCluster.get.isDefined) {
+ blockManagerRef.askSync[Int](GetShuffleServicePort)
+ } else {
+ externalShuffleServicePort
+ }
// we need to keep the executor ID of the original executor to let the
shuffle service know
// which local directories should be used to look for the file
- BlockManagerId(blockManagerId.executorId, blockManagerId.host,
externalShuffleServicePort)
+ val essId = BlockManagerId(blockManagerId.executorId, blockManagerId.host,
essPort)
+ shuffleServiceIdSet.add(essId)
Review Comment:
Guard this with a `if (LocalSparkCluster.get.isDefined)` so that we don't
add to it outside of local mode?
But if we do that we can just factor it out into a `val isLocalClusterEss =
LocalSparkCluster.get.isDefined` and use it in both checks.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]