venkata91 commented on a change in pull request #30164:
URL: https://github.com/apache/spark/pull/30164#discussion_r518291789



##########
File path: 
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
##########
@@ -360,6 +371,17 @@ class BlockManagerMasterEndpoint(
 
   }
 
+  private def addMergerLocation(blockManagerId: BlockManagerId): Unit = {
+    if (!shuffleMergerLocations.contains(blockManagerId.host) && !blockManagerId.isDriver) {
+      val shuffleServerId = BlockManagerId(blockManagerId.executorId, blockManagerId.host,
+        StorageUtils.externalShuffleServicePort(conf))
+      if (shuffleMergerLocations.size >= maxRetainedMergerLocations) {
+        shuffleMergerLocations -= shuffleMergerLocations.head._1

Review comment:
       This would be useful in a cloud-based deployment where nodes keep coming up and going down, so the number of external shuffle services in a cluster is not static.
   
   > Besides, the removed oldest merger may store more merged shuffle data than others...would be better if we could remove depends on merged shuffle data size...but this should be another topic though..
   
   Agreed. The first step is to have a pluggable API (https://issues.apache.org/jira/browse/SPARK-33329) so that we can provide specific implementations for cloud/on-prem deployments as well as for different scheduler backends. This is the basic implementation, but eventually we need load balancing between the shuffle mergers.
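   
   To make the load-balancing idea a bit more concrete, here is a rough Scala sketch (not part of this PR; `MergerInfo`, `mergedDataSize`, and `SizeAwareMergerTracker` are made-up names) of what a size-aware eviction policy could look like, i.e. evicting the merger that holds the least merged shuffle data rather than dropping the oldest entry:

```scala
import scala.collection.mutable

// Hypothetical bookkeeping record for a merger location and how much
// merged shuffle data it currently holds.
final case class MergerInfo(host: String, mergedDataSize: Long)

// Hypothetical tracker: keeps at most `maxRetained` merger locations and,
// when full, evicts the merger that currently holds the least merged data.
class SizeAwareMergerTracker(maxRetained: Int) {
  private val mergers = mutable.HashMap.empty[String, MergerInfo]

  def add(info: MergerInfo): Unit = {
    if (!mergers.contains(info.host)) {
      if (mergers.nonEmpty && mergers.size >= maxRetained) {
        // Evict the merger with the smallest amount of merged data so far,
        // instead of simply removing the head of the map.
        val smallest = mergers.values.minBy(_.mergedDataSize)
        mergers -= smallest.host
      }
      mergers(info.host) = info
    }
  }

  // Hosts currently tracked as merger locations.
  def hosts: Set[String] = mergers.keySet.toSet
}
```

   A real version would of course need the merged-data-size metric reported back from the shuffle services, which is why I think this belongs behind the pluggable API rather than in this PR.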



