prakharjain09 commented on a change in pull request #27864:
URL: https://github.com/apache/spark/pull/27864#discussion_r413844899
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -1761,6 +1775,58 @@ private[spark] class BlockManager(
blocksToRemove.size
}
+ def decommissionBlockManager(): Unit = {
+ if (!blockManagerDecommissioning) {
+ logInfo("Starting block manager decommissioning process")
+ blockManagerDecommissioning = true
+ decommissionManager = Some(new BlockManagerDecommissionManager)
+ decommissionManager.foreach(_.start())
+ } else {
+ logDebug(s"Block manager already in decommissioning state")
+ }
+ }
+
+ /**
+ * Tries to offload all cached RDD blocks from this BlockManager to peer BlockManagers
+ * Visible for testing
+ */
+ def offloadRddCacheBlocks(): Unit = {
+ val replicateBlocksInfo = master.getReplicateInfoForRDDBlocks(blockManagerId)
+
+ if (replicateBlocksInfo.nonEmpty) {
+ logInfo(s"Need to replicate ${replicateBlocksInfo.size} blocks " +
+ s"for block manager decommissioning")
+ }
+
+ // Maximum number of storage replication failure which replicateBlock can handle
+ // before giving up for one block
+ val maxReplicationFailures = conf.get(
+ config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
+
+ // TODO: We can sort these blocks based on some policy (LRU/blockSize etc)
+ // so that we end up prioritize them over each other
+ val blocksFailedReplication = replicateBlocksInfo.filterNot {
+ case ReplicateBlock(blockId, existingReplicas, maxReplicas) =>
+ val replicatedSuccessfully = replicateBlock(
Review comment:
@holdenk I'm not sure how this will behave when multiple executors on the
same host are decommissioning at the same time. If each of them replicates
its blocks in parallel, could that cause network congestion?
I have changed the code to do the replication in a thread pool of size 4.
Should we make the pool size configurable? Any suggestions?
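For discussion, here is a minimal sketch of the bounded-pool idea, assuming each block is replicated independently and we only need the list of blocks that failed. `ReplicateBlockInfo`, `offloadInParallel`, `maxAttemptsPerBlock` and the `replicate` callback are hypothetical stand-ins, not Spark's actual API; the callback plays the role of `replicateBlock` and returns `true` on success.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Hypothetical stand-in for Spark's ReplicateBlock message (illustrative only).
final case class ReplicateBlockInfo(blockId: String, existingReplicas: Seq[String], maxReplicas: Int)

object ParallelOffloadSketch {
  /**
   * Replicates all blocks with at most `numThreads` replications in flight
   * and returns the blocks that could not be replicated.
   * `replicate` is a hypothetical callback standing in for replicateBlock.
   */
  def offloadInParallel(
      blocks: Seq[ReplicateBlockInfo],
      numThreads: Int,
      maxAttemptsPerBlock: Int)(replicate: ReplicateBlockInfo => Boolean): Seq[ReplicateBlockInfo] = {
    require(numThreads > 0, "numThreads must be positive")
    // A fixed-size pool caps concurrent replications for this executor.
    val pool = Executors.newFixedThreadPool(numThreads)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val attempts: Seq[Future[Option[ReplicateBlockInfo]]] = blocks.map { info =>
        Future {
          // Try up to maxAttemptsPerBlock times; `exists` stops at the first success.
          val ok = (1 to maxAttemptsPerBlock).exists(_ => replicate(info))
          if (ok) None else Some(info)
        }
      }
      // Wait for every block, then keep only the ones that failed (input order is preserved).
      Await.result(Future.sequence(attempts), Duration.Inf).flatten
    } finally {
      pool.shutdown()
    }
  }
}
```

Note that a per-executor pool bounds concurrency per executor, not per host: with N executors decommissioning on one machine, up to 4N replications could still run at once. If the size were made configurable, it would presumably be a new config entry alongside `STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK` (any name for it would be hypothetical at this point).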