holdenk commented on a change in pull request #31102:
URL: https://github.com/apache/spark/pull/31102#discussion_r555351231
##########
File path:
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -66,86 +66,90 @@ private[storage] class BlockManagerDecommissioner(
* the chance of migrating all shuffle blocks before the executor is forced
to exit.
*/
private class ShuffleMigrationRunnable(peer: BlockManagerId) extends
Runnable {
- @volatile var running = true
+ @volatile var keepRunning = true
+
+ def allowRetry(shuffleBlock: ShuffleBlockInfo, failureNum: Int): Boolean =
{
+ if (failureNum < maxReplicationFailuresForDecommission) {
+ logInfo(s"Add $shuffleBlock back to migration queue for " +
+ s"retry ($failureNum / $maxReplicationFailuresForDecommission)")
+ // The block needs to retry so we should not mark it as finished
+ shufflesToMigrate.add((shuffleBlock, failureNum))
+ } else {
+ logWarning(s"Give up migrating $shuffleBlock since it's been " +
+ s"failed for $maxReplicationFailuresForDecommission times")
+ false
+ }
+ }
+
override def run(): Unit = {
- var migrating: Option[(ShuffleBlockInfo, Int)] = None
- logInfo(s"Starting migration thread for ${peer}")
+ logInfo(s"Starting shuffle block migration thread for $peer")
// Once a block fails to transfer to an executor stop trying to transfer
more blocks
- try {
- while (running && !Thread.interrupted()) {
- migrating = Option(shufflesToMigrate.poll())
- migrating match {
- case None =>
- logDebug("Nothing to migrate")
- // Nothing to do right now, but maybe a transfer will fail or a
new block
- // will finish being committed.
- val SLEEP_TIME_SECS = 1
- Thread.sleep(SLEEP_TIME_SECS * 1000L)
- case Some((shuffleBlockInfo, retryCount)) =>
- if (retryCount < maxReplicationFailuresForDecommission) {
- val blocks =
bm.migratableResolver.getMigrationBlocks(shuffleBlockInfo)
- if (blocks.isEmpty) {
- logInfo(s"Ignore empty shuffle block $shuffleBlockInfo")
+ while (keepRunning) {
+ try {
+ val (shuffleBlockInfo, retryCount) = shufflesToMigrate.take()
Review comment:
This blocks and could leave us with stuck threads at exit time.
##########
File path:
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -66,86 +66,90 @@ private[storage] class BlockManagerDecommissioner(
* the chance of migrating all shuffle blocks before the executor is forced
to exit.
*/
private class ShuffleMigrationRunnable(peer: BlockManagerId) extends
Runnable {
- @volatile var running = true
+ @volatile var keepRunning = true
+
+ def allowRetry(shuffleBlock: ShuffleBlockInfo, failureNum: Int): Boolean =
{
+ if (failureNum < maxReplicationFailuresForDecommission) {
+ logInfo(s"Add $shuffleBlock back to migration queue for " +
+ s"retry ($failureNum / $maxReplicationFailuresForDecommission)")
+ // The block needs to retry so we should not mark it as finished
+ shufflesToMigrate.add((shuffleBlock, failureNum))
+ } else {
+ logWarning(s"Give up migrating $shuffleBlock since it's been " +
+ s"failed for $maxReplicationFailuresForDecommission times")
+ false
+ }
+ }
+
override def run(): Unit = {
- var migrating: Option[(ShuffleBlockInfo, Int)] = None
- logInfo(s"Starting migration thread for ${peer}")
+ logInfo(s"Starting shuffle block migration thread for $peer")
// Once a block fails to transfer to an executor stop trying to transfer
more blocks
- try {
- while (running && !Thread.interrupted()) {
- migrating = Option(shufflesToMigrate.poll())
- migrating match {
- case None =>
- logDebug("Nothing to migrate")
- // Nothing to do right now, but maybe a transfer will fail or a
new block
- // will finish being committed.
- val SLEEP_TIME_SECS = 1
- Thread.sleep(SLEEP_TIME_SECS * 1000L)
- case Some((shuffleBlockInfo, retryCount)) =>
- if (retryCount < maxReplicationFailuresForDecommission) {
- val blocks =
bm.migratableResolver.getMigrationBlocks(shuffleBlockInfo)
- if (blocks.isEmpty) {
- logInfo(s"Ignore empty shuffle block $shuffleBlockInfo")
+ while (keepRunning) {
Review comment:
Can you explain why you no longer check if the thread is interrupted?
##########
File path:
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -154,7 +158,7 @@ private[storage] class BlockManagerDecommissioner(
// Shuffles which are queued for migration & number of retries so far.
// Visible in storage for testing.
private[storage] val shufflesToMigrate =
- new java.util.concurrent.ConcurrentLinkedQueue[(ShuffleBlockInfo, Int)]()
+ new java.util.concurrent.LinkedBlockingQueue[(ShuffleBlockInfo, Int)]()
Review comment:
This seems like it's adding locking overhead compared to the
ConcurrentLinkedQueue; can you elaborate on this choice?
With ConcurrentLinkedQueue the producers have contention and the consumers
have no contention (from
https://www.baeldung.com/java-queue-linkedblocking-concurrentlinked ), whereas
with the LinkedBlockingQueue the consumers are going to have lock contention
more generally, I believe.
##########
File path:
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -166,67 +170,65 @@ private[storage] class BlockManagerDecommissioner(
private val migrationPeers =
mutable.HashMap[BlockManagerId, ShuffleMigrationRunnable]()
- private lazy val rddBlockMigrationExecutor =
- ThreadUtils.newDaemonSingleThreadExecutor("block-manager-decommission-rdd")
+ private val rddBlockMigrationExecutor =
+ if (conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) {
+
Some(ThreadUtils.newDaemonSingleThreadExecutor("block-manager-decommission-rdd"))
+ } else None
private val rddBlockMigrationRunnable = new Runnable {
val sleepInterval =
conf.get(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL)
override def run(): Unit = {
- assert(conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED))
- while (!stopped && !stoppedRDD && !Thread.interrupted()) {
- logInfo("Iterating on migrating from the block manager.")
- // Validate we have peers to migrate to.
- val peers = bm.getPeers(false)
- // If we have no peers give up.
- if (peers.isEmpty) {
- stopped = true
+ logInfo("Attempting to migrate all RDD blocks")
+ while (!stopped && !stoppedRDD) {
+ // Validate if we have peers to migrate to. Otherwise, give up
migration.
+ if (bm.getPeers(false).isEmpty) {
+ logWarning("No available peers to receive RDD blocks, stop
migration.")
stoppedRDD = true
- }
- try {
- val startTime = System.nanoTime()
- logDebug("Attempting to replicate all cached RDD blocks")
- rddBlocksLeft = decommissionRddCacheBlocks()
- lastRDDMigrationTime = startTime
- logInfo("Attempt to replicate all cached blocks done")
- logInfo(s"Waiting for ${sleepInterval} before refreshing
migrations.")
- Thread.sleep(sleepInterval)
- } catch {
- case e: InterruptedException =>
- logInfo("Interrupted during RDD migration, stopping")
- stoppedRDD = true
- case NonFatal(e) =>
- logError("Error occurred replicating RDD for block manager
decommissioning.",
- e)
- stoppedRDD = true
+ } else {
+ try {
+ val startTime = System.nanoTime()
+ logInfo("Attempting to migrate all cached RDD blocks")
+ rddBlocksLeft = decommissionRddCacheBlocks()
+ lastRDDMigrationTime = startTime
+ logInfo(s"Finished current round RDD blocks migration, " +
+ s"waiting for ${sleepInterval}ms before the next round
migration.")
+ Thread.sleep(sleepInterval)
+ } catch {
+ case _: InterruptedException if stopped =>
+ logInfo("Stop RDD blocks migration.")
+ case NonFatal(e) =>
+ logError("Error occurred during RDD blocks migration.", e)
+ stoppedRDD = true
+ }
}
}
}
}
- private lazy val shuffleBlockMigrationRefreshExecutor =
-
ThreadUtils.newDaemonSingleThreadExecutor("block-manager-decommission-shuffle")
+ private val shuffleBlockMigrationRefreshExecutor =
+ if (conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) {
+
Some(ThreadUtils.newDaemonSingleThreadExecutor("block-manager-decommission-shuffle"))
+ } else None
private val shuffleBlockMigrationRefreshRunnable = new Runnable {
val sleepInterval =
conf.get(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL)
override def run(): Unit = {
- assert(conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))
- while (!stopped && !stoppedShuffle && !Thread.interrupted()) {
+ logInfo("Attempting to migrate all shuffle blocks")
+ while (!stopped && !stoppedShuffle) {
Review comment:
Same concern about thread interruption here.
##########
File path:
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -166,67 +170,65 @@ private[storage] class BlockManagerDecommissioner(
private val migrationPeers =
mutable.HashMap[BlockManagerId, ShuffleMigrationRunnable]()
- private lazy val rddBlockMigrationExecutor =
- ThreadUtils.newDaemonSingleThreadExecutor("block-manager-decommission-rdd")
+ private val rddBlockMigrationExecutor =
+ if (conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) {
+
Some(ThreadUtils.newDaemonSingleThreadExecutor("block-manager-decommission-rdd"))
+ } else None
private val rddBlockMigrationRunnable = new Runnable {
val sleepInterval =
conf.get(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL)
override def run(): Unit = {
- assert(conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED))
- while (!stopped && !stoppedRDD && !Thread.interrupted()) {
- logInfo("Iterating on migrating from the block manager.")
- // Validate we have peers to migrate to.
- val peers = bm.getPeers(false)
- // If we have no peers give up.
- if (peers.isEmpty) {
- stopped = true
+ logInfo("Attempting to migrate all RDD blocks")
+ while (!stopped && !stoppedRDD) {
+ // Validate if we have peers to migrate to. Otherwise, give up
migration.
+ if (bm.getPeers(false).isEmpty) {
+ logWarning("No available peers to receive RDD blocks, stop
migration.")
stoppedRDD = true
- }
- try {
- val startTime = System.nanoTime()
- logDebug("Attempting to replicate all cached RDD blocks")
- rddBlocksLeft = decommissionRddCacheBlocks()
- lastRDDMigrationTime = startTime
- logInfo("Attempt to replicate all cached blocks done")
- logInfo(s"Waiting for ${sleepInterval} before refreshing
migrations.")
- Thread.sleep(sleepInterval)
- } catch {
- case e: InterruptedException =>
- logInfo("Interrupted during RDD migration, stopping")
- stoppedRDD = true
- case NonFatal(e) =>
- logError("Error occurred replicating RDD for block manager
decommissioning.",
- e)
- stoppedRDD = true
+ } else {
+ try {
+ val startTime = System.nanoTime()
+ logInfo("Attempting to migrate all cached RDD blocks")
+ rddBlocksLeft = decommissionRddCacheBlocks()
+ lastRDDMigrationTime = startTime
+ logInfo(s"Finished current round RDD blocks migration, " +
+ s"waiting for ${sleepInterval}ms before the next round
migration.")
+ Thread.sleep(sleepInterval)
+ } catch {
+ case _: InterruptedException if stopped =>
+ logInfo("Stop RDD blocks migration.")
+ case NonFatal(e) =>
+ logError("Error occurred during RDD blocks migration.", e)
+ stoppedRDD = true
+ }
}
}
}
}
- private lazy val shuffleBlockMigrationRefreshExecutor =
-
ThreadUtils.newDaemonSingleThreadExecutor("block-manager-decommission-shuffle")
+ private val shuffleBlockMigrationRefreshExecutor =
+ if (conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)) {
+
Some(ThreadUtils.newDaemonSingleThreadExecutor("block-manager-decommission-shuffle"))
+ } else None
private val shuffleBlockMigrationRefreshRunnable = new Runnable {
val sleepInterval =
conf.get(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL)
override def run(): Unit = {
- assert(conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED))
- while (!stopped && !stoppedShuffle && !Thread.interrupted()) {
+ logInfo("Attempting to migrate all shuffle blocks")
+ while (!stopped && !stoppedShuffle) {
try {
- logDebug("Attempting to replicate all shuffle blocks")
val startTime = System.nanoTime()
- shuffleBlocksLeft = refreshOffloadingShuffleBlocks()
+ shuffleBlocksLeft = refreshMigratableShuffleBlocks()
lastShuffleMigrationTime = startTime
- logInfo("Done starting workers to migrate shuffle blocks")
+ logInfo(s"Finished current round refreshing migratable shuffle
blocks, " +
+ s"waiting for ${sleepInterval}ms before the next round
refreshing.")
Thread.sleep(sleepInterval)
} catch {
- case e: InterruptedException =>
- logInfo("Interrupted during migration, will not refresh
migrations.")
- stoppedShuffle = true
+ case _: InterruptedException if stopped =>
Review comment:
...?
##########
File path:
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -166,67 +170,65 @@ private[storage] class BlockManagerDecommissioner(
private val migrationPeers =
mutable.HashMap[BlockManagerId, ShuffleMigrationRunnable]()
- private lazy val rddBlockMigrationExecutor =
- ThreadUtils.newDaemonSingleThreadExecutor("block-manager-decommission-rdd")
+ private val rddBlockMigrationExecutor =
+ if (conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) {
+
Some(ThreadUtils.newDaemonSingleThreadExecutor("block-manager-decommission-rdd"))
+ } else None
private val rddBlockMigrationRunnable = new Runnable {
val sleepInterval =
conf.get(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL)
override def run(): Unit = {
- assert(conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED))
- while (!stopped && !stoppedRDD && !Thread.interrupted()) {
- logInfo("Iterating on migrating from the block manager.")
- // Validate we have peers to migrate to.
- val peers = bm.getPeers(false)
- // If we have no peers give up.
- if (peers.isEmpty) {
- stopped = true
+ logInfo("Attempting to migrate all RDD blocks")
+ while (!stopped && !stoppedRDD) {
+ // Validate if we have peers to migrate to. Otherwise, give up
migration.
+ if (bm.getPeers(false).isEmpty) {
+ logWarning("No available peers to receive RDD blocks, stop
migration.")
stoppedRDD = true
- }
- try {
- val startTime = System.nanoTime()
- logDebug("Attempting to replicate all cached RDD blocks")
- rddBlocksLeft = decommissionRddCacheBlocks()
- lastRDDMigrationTime = startTime
- logInfo("Attempt to replicate all cached blocks done")
- logInfo(s"Waiting for ${sleepInterval} before refreshing
migrations.")
- Thread.sleep(sleepInterval)
- } catch {
- case e: InterruptedException =>
- logInfo("Interrupted during RDD migration, stopping")
- stoppedRDD = true
- case NonFatal(e) =>
- logError("Error occurred replicating RDD for block manager
decommissioning.",
- e)
- stoppedRDD = true
+ } else {
+ try {
+ val startTime = System.nanoTime()
+ logInfo("Attempting to migrate all cached RDD blocks")
+ rddBlocksLeft = decommissionRddCacheBlocks()
+ lastRDDMigrationTime = startTime
+ logInfo(s"Finished current round RDD blocks migration, " +
+ s"waiting for ${sleepInterval}ms before the next round
migration.")
+ Thread.sleep(sleepInterval)
+ } catch {
+ case _: InterruptedException if stopped =>
+ logInfo("Stop RDD blocks migration.")
+ case NonFatal(e) =>
+ logError("Error occurred during RDD blocks migration.", e)
+ stoppedRDD = true
+ }
}
}
}
}
- private lazy val shuffleBlockMigrationRefreshExecutor =
-
ThreadUtils.newDaemonSingleThreadExecutor("block-manager-decommission-shuffle")
+ private val shuffleBlockMigrationRefreshExecutor =
Review comment:
Why switch from a lazy val to an Option?
##########
File path:
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -166,67 +170,65 @@ private[storage] class BlockManagerDecommissioner(
private val migrationPeers =
mutable.HashMap[BlockManagerId, ShuffleMigrationRunnable]()
- private lazy val rddBlockMigrationExecutor =
- ThreadUtils.newDaemonSingleThreadExecutor("block-manager-decommission-rdd")
+ private val rddBlockMigrationExecutor =
Review comment:
I don't understand what this refactoring gives us.
##########
File path:
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -166,67 +170,65 @@ private[storage] class BlockManagerDecommissioner(
private val migrationPeers =
mutable.HashMap[BlockManagerId, ShuffleMigrationRunnable]()
- private lazy val rddBlockMigrationExecutor =
- ThreadUtils.newDaemonSingleThreadExecutor("block-manager-decommission-rdd")
+ private val rddBlockMigrationExecutor =
+ if (conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)) {
+
Some(ThreadUtils.newDaemonSingleThreadExecutor("block-manager-decommission-rdd"))
+ } else None
private val rddBlockMigrationRunnable = new Runnable {
val sleepInterval =
conf.get(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL)
override def run(): Unit = {
- assert(conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED))
- while (!stopped && !stoppedRDD && !Thread.interrupted()) {
- logInfo("Iterating on migrating from the block manager.")
- // Validate we have peers to migrate to.
- val peers = bm.getPeers(false)
- // If we have no peers give up.
- if (peers.isEmpty) {
- stopped = true
+ logInfo("Attempting to migrate all RDD blocks")
+ while (!stopped && !stoppedRDD) {
Review comment:
Similar concerns as above about not checking for thread interruption.
##########
File path:
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -361,37 +350,28 @@ private[storage] class BlockManagerDecommissioner(
stopped = true
}
try {
- rddBlockMigrationExecutor.shutdown()
- } catch {
- case e: Exception =>
- logError(s"Error during shutdown", e)
- }
- try {
- shuffleBlockMigrationRefreshExecutor.shutdown()
- } catch {
- case e: Exception =>
- logError(s"Error during shutdown", e)
- }
- try {
- stopOffloadingShuffleBlocks()
+ rddBlockMigrationExecutor.foreach(_.shutdownNow())
} catch {
- case e: Exception =>
- logError(s"Error during shutdown", e)
+ case NonFatal(e) =>
+ logError(s"Error during shutdown RDD block migration thread", e)
}
- logInfo("Forcing block migrations threads to stop")
- try {
- rddBlockMigrationExecutor.shutdownNow()
- } catch {
- case e: Exception =>
- logError(s"Error during shutdown", e)
- }
- try {
- shuffleBlockMigrationRefreshExecutor.shutdownNow()
- } catch {
- case e: Exception =>
- logError(s"Error during shutdown", e)
+ shuffleBlockMigrationRefreshExecutor.foreach { executor =>
+ try {
+ executor.shutdownNow()
+ } catch {
+ case NonFatal(e) =>
+ logError(s"Error during shutdown shuffle block refreshing thread", e)
+ }
+ try {
+ // invoke inside `foreach` so we can avoid initiating
+ // `shuffleMigrationPool` when it's unnecessary
+ stopMigratingShuffleBlocks()
Review comment:
What if you just check whether shuffle block migrations are configured?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]