Ngone51 commented on a change in pull request #28370:
URL: https://github.com/apache/spark/pull/28370#discussion_r425798700
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -1829,7 +1901,58 @@ private[spark] class BlockManager(
data.dispose()
}
+ /**
+ * Class to handle block manager decommissioning retries.
+ * It creates a Thread to retry offloading all RDD cache blocks.
+ */
+ private class BlockManagerDecommissionManager(conf: SparkConf) {
+ @volatile private var stopped = false
+ private val sleepInterval = conf.get(
+ config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL)
+
+ private val blockReplicationThread = new Thread {
+ override def run(): Unit = {
+ var failures = 0
+ while (blockManagerDecommissioning
+ && !stopped
+ && !Thread.interrupted()
+ && failures < 20) {
+ try {
+ logDebug("Attempting to replicate all cached RDD blocks")
+ decommissionRddCacheBlocks()
+ logInfo("Attempt to replicate all cached blocks done")
+ Thread.sleep(sleepInterval)
+ } catch {
+ case _: InterruptedException =>
+ logInfo("Interrupted during migration, will not refresh
migrations.")
+ stopped = true
+ case NonFatal(e) =>
+ failures += 1
+ logError("Error occurred while trying to replicate cached RDD
blocks" +
+ s" for block manager decommissioning (failure count:
$failures)", e)
+ }
+ }
+ }
+ }
+ blockReplicationThread.setDaemon(true)
+ blockReplicationThread.setName("block-replication-thread")
Review comment:
Could we use a `Runnable` for the decommissioning logic and `ThreadUtils` to
execute it?
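Roughly like this (a minimal sketch; the field names and the shutdown handling are illustrative, not part of the PR):

```scala
// Minimal sketch: run the retry loop as a Runnable on a daemon
// single-thread executor from ThreadUtils instead of a raw Thread.
import java.util.concurrent.ExecutorService

import org.apache.spark.util.ThreadUtils

private val blockReplicationExecutor: ExecutorService =
  ThreadUtils.newDaemonSingleThreadExecutor("block-replication-thread")

private val blockReplicationRunnable = new Runnable {
  override def run(): Unit = {
    // ... same retry loop as in the diff above ...
  }
}

blockReplicationExecutor.submit(blockReplicationRunnable)

// stop() would then shut down the executor instead of interrupting the thread:
// blockReplicationExecutor.shutdownNow()
```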
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -1829,7 +1901,58 @@ private[spark] class BlockManager(
data.dispose()
}
+ /**
+ * Class to handle block manager decommissioning retries.
+ * It creates a Thread to retry offloading all RDD cache blocks.
+ */
+ private class BlockManagerDecommissionManager(conf: SparkConf) {
Review comment:
Do we really need a wrapper manager class here? It seems like overkill to me.
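For example (a sketch only; `decommissionStopped` is an illustrative name), the flag and thread could live directly on `BlockManager`:

```scala
// Illustrative sketch: inline the state instead of wrapping it in a class.
@volatile private var decommissionStopped = false

private val blockReplicationThread = new Thread(() => {
  // ... retry loop from the diff above ...
})
blockReplicationThread.setDaemon(true)
blockReplicationThread.setName("block-replication-thread")
```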
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -1551,30 +1555,36 @@ private[spark] class BlockManager(
}
/**
- * Called for pro-active replenishment of blocks lost due to executor failures
+ * Replicates a block to peer block managers based on existingReplicas and maxReplicas
*
* @param blockId blockId being replicated
* @param existingReplicas existing block managers that have a replica
* @param maxReplicas maximum replicas needed
+ * @param maxReplicationFailures number of replication failures to tolerate before
+ * giving up.
+ * @return whether block was successfully replicated or not
*/
def replicateBlock(
blockId: BlockId,
existingReplicas: Set[BlockManagerId],
- maxReplicas: Int): Unit = {
+ maxReplicas: Int,
+ maxReplicationFailures: Option[Int] = None): Boolean = {
logInfo(s"Using $blockManagerId to pro-actively replicate $blockId")
- blockInfoManager.lockForReading(blockId).foreach { info =>
+ blockInfoManager.lockForReading(blockId).forall { info =>
Review comment:
use `map`?
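For reference, the `Option` semantics at play here (a standalone sketch, not code from the PR):

```scala
// forall is vacuously true on None, so a missing block silently counts as
// a successful replication; map forces the empty case to be spelled out.
val info: Option[String] = None

val viaForall = info.forall(_ => false)            // true: None short-circuits to true
val viaMap = info.map(_ => false).getOrElse(true)  // true, but the default is explicit
```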
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -1829,7 +1901,58 @@ private[spark] class BlockManager(
data.dispose()
}
+ /**
+ * Class to handle block manager decommissioning retries.
+ * It creates a Thread to retry offloading all RDD cache blocks.
+ */
+ private class BlockManagerDecommissionManager(conf: SparkConf) {
+ @volatile private var stopped = false
+ private val sleepInterval = conf.get(
+ config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL)
+
+ private val blockReplicationThread = new Thread {
+ override def run(): Unit = {
+ var failures = 0
+ while (blockManagerDecommissioning
+ && !stopped
+ && !Thread.interrupted()
+ && failures < 20) {
+ try {
+ logDebug("Attempting to replicate all cached RDD blocks")
+ decommissionRddCacheBlocks()
Review comment:
Don't you need to set `stopped = true` here?
Or do you mean we need to run `decommissionRddCacheBlocks` multiple times? If
so, why does it need to run more than once? The set of RDD blocks shouldn't
change after decommissioning starts, should it?
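A sketch of the single-pass variant this question points at (illustrative only, reusing `stopped` and `failures` from the surrounding loop; whether one pass suffices is exactly what is being asked):

```scala
try {
  logDebug("Attempting to replicate all cached RDD blocks")
  decommissionRddCacheBlocks()
  logInfo("Attempt to replicate all cached blocks done")
  // Stop after one successful pass if RDD blocks cannot change anymore:
  stopped = true
} catch {
  case NonFatal(e) =>
    failures += 1
    logError(s"Error replicating cached RDD blocks (failure count: $failures)", e)
}
```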
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]