venkata91 commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r518291789
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -360,6 +371,17 @@ class BlockManagerMasterEndpoint(
}
+  private def addMergerLocation(blockManagerId: BlockManagerId): Unit = {
+    if (!shuffleMergerLocations.contains(blockManagerId.host) && !blockManagerId.isDriver) {
+      val shuffleServerId = BlockManagerId(blockManagerId.executorId, blockManagerId.host,
+        StorageUtils.externalShuffleServicePort(conf))
+      if (shuffleMergerLocations.size >= maxRetainedMergerLocations) {
+        shuffleMergerLocations -= shuffleMergerLocations.head._1
Review comment:
This would be useful in a cloud-based deployment where nodes keep coming up and going
down, so the number of external shuffle services in the cluster is not static.
> Besides, the removed oldest merger may store more merged shuffle data than others...would be better if we could remove depends on merged shuffle data size...but this should be another topic though..
Agreed. The first step toward that is a pluggable API
(https://issues.apache.org/jira/browse/SPARK-33329), so that we can come up with
specific implementations for cloud and on-prem deployments as well as for different
scheduler backends. This is the basic implementation, but eventually we need load
balancing between the shuffle mergers.
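
To make the load-based idea a bit more concrete, here is a minimal, hypothetical sketch of what a size-aware retention policy could look like. It is not the PR's code: `MergerInfo`, `mergedBytes`, and `MergerLocationTracker` are made-up names for illustration, while the actual `BlockManagerMasterEndpoint` tracks `BlockManagerId`s keyed by host in a `LinkedHashMap` and evicts the head entry, as the diff above shows.

```scala
import scala.collection.mutable

// Hypothetical sketch only: a size-aware variant of the retention policy discussed above.
// MergerInfo and mergedBytes are illustrative stand-ins, not part of the actual PR.
case class MergerInfo(host: String, port: Int, mergedBytes: Long)

class MergerLocationTracker(maxRetained: Int) {
  // Insertion-ordered map keyed by host, mirroring the LinkedHashMap used in the PR.
  private val mergers = mutable.LinkedHashMap.empty[String, MergerInfo]

  def add(info: MergerInfo): Unit = {
    if (!mergers.contains(info.host)) {
      if (mergers.size >= maxRetained) {
        // Instead of dropping the oldest entry (head), drop the merger that currently
        // holds the least merged shuffle data, so hot mergers are retained.
        val coldest = mergers.values.minBy(_.mergedBytes)
        mergers -= coldest.host
      }
      mergers(info.host) = info
    }
  }

  def locations: Seq[MergerInfo] = mergers.values.toSeq
}

object MergerLocationTrackerExample extends App {
  val tracker = new MergerLocationTracker(maxRetained = 2)
  tracker.add(MergerInfo("host-a", 7337, mergedBytes = 10L << 20))
  tracker.add(MergerInfo("host-b", 7337, mergedBytes = 512L << 20))
  tracker.add(MergerInfo("host-c", 7337, mergedBytes = 0L))
  // host-a (least merged data among the retained mergers) is evicted;
  // host-b and host-c remain.
  println(tracker.locations.map(_.host).mkString(", "))
}
```

The only difference from the head-based eviction in the diff is the `minBy(_.mergedBytes)` selection; actually feeding merged-data sizes into that decision would need the pluggable API discussed in SPARK-33329.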