holdenk commented on a change in pull request #27864:
URL: https://github.com/apache/spark/pull/27864#discussion_r414121587
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -1761,6 +1775,58 @@ private[spark] class BlockManager(
blocksToRemove.size
}
+ def decommissionBlockManager(): Unit = {
+ if (!blockManagerDecommissioning) {
+ logInfo("Starting block manager decommissioning process")
+ blockManagerDecommissioning = true
+ decommissionManager = Some(new BlockManagerDecommissionManager)
+ decommissionManager.foreach(_.start())
+ } else {
+ logDebug(s"Block manager already in decommissioning state")
+ }
+ }
+
+ /**
+   * Tries to offload all cached RDD blocks from this BlockManager to peer BlockManagers
+ * Visible for testing
+ */
+ def offloadRddCacheBlocks(): Unit = {
+     val replicateBlocksInfo = master.getReplicateInfoForRDDBlocks(blockManagerId)
+
+ if (replicateBlocksInfo.nonEmpty) {
+ logInfo(s"Need to replicate ${replicateBlocksInfo.size} blocks " +
+ s"for block manager decommissioning")
+ }
+
+     // Maximum number of storage replication failures which replicateBlock can handle
+ // before giving up for one block
+ val maxReplicationFailures = conf.get(
+ config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
+
+     // TODO: We can sort these blocks based on some policy (LRU/blockSize etc)
+     // so that some blocks are prioritized over others
+ val blocksFailedReplication = replicateBlocksInfo.filterNot {
+ case ReplicateBlock(blockId, existingReplicas, maxReplicas) =>
+ val replicatedSuccessfully = replicateBlock(
Review comment:
Network congestion is certainly a possibility. For now, I think this strikes a
good balance between simple code and avoiding hanging all transfers when one
target host is slow. We can revisit this in the future if it turns out we need
more control in production environments. Sound good?
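To make the per-block failure budget concrete, here is a minimal Scala sketch, not the PR's code: `OffloadSketch`, `offload`, and the `replicate` callback are hypothetical names, and blocks are plain strings instead of `BlockId`s. Each block gets at most `maxFailures` attempts (the role the diff gives to `STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK`), and a block that exhausts its budget is reported as failed rather than holding up the remaining blocks.

```scala
object OffloadSketch {
  /**
   * Hypothetical stand-in for the diff's replicateBlock loop: each block gets at
   * most `maxFailures` attempts via `replicate`, and the blocks that never
   * succeed are returned, mirroring `blocksFailedReplication`.
   */
  def offload(blocks: Seq[String], maxFailures: Int)(replicate: String => Boolean): Seq[String] =
    blocks.filterNot { blockId =>
      // `exists` short-circuits, so retries stop at the first successful attempt.
      Iterator.range(0, maxFailures).exists(_ => replicate(blockId))
    }

  def main(args: Array[String]): Unit = {
    // Hypothetical peer behaviour: replication of rdd_0_1 always fails.
    var attempts = Map.empty[String, Int]
    val failed = offload(Seq("rdd_0_0", "rdd_0_1", "rdd_0_2"), maxFailures = 3) { id =>
      attempts = attempts.updated(id, attempts.getOrElse(id, 0) + 1)
      id != "rdd_0_1"
    }
    println(failed)   // List(rdd_0_1)
    println(attempts) // rdd_0_1 is tried 3 times, the other blocks once each
  }
}
```

Blocks are still handled one after another in this sketch: the failure budget keeps a persistently failing peer from stalling the whole offload, but a peer that is slow yet eventually succeeds still delays the blocks queued behind it, which is the trade-off accepted above.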