agrawaldevesh commented on a change in pull request #29211:
URL: https://github.com/apache/spark/pull/29211#discussion_r459787138
##########
File path: core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
##########
@@ -668,7 +668,7 @@ private[deploy] class Worker(
finishedApps += id
maybeCleanupApplication(id)
- case DecommissionSelf =>
+ case WorkerDecommission(_, _) =>
Review comment:
nit: Should this be DecommissionWorker ? That sounds more like a command to me.
Whereas `WorkerDecommissioned` sounds like a state.
##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -80,6 +79,9 @@ private[spark] class CoarseGrainedExecutorBackend(
*/
private[executor] val taskResources = new mutable.HashMap[Long, Map[String, ResourceInformation]]
+ // Track our decommissioning status internally.
Review comment:
nit: I think the comment is superfluous: Boolean volatile var flag => got to be used for tracking something. And yeah it is internal.
##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -277,12 +283,53 @@ private[spark] class CoarseGrainedExecutorBackend(
if (executor != null) {
executor.decommission()
}
- logInfo("Done decommissioning self.")
+ // Shutdown the executor once all tasks are gone & any configured migrations completed.
+ // Detecting migrations completion doesn't need to be perfect and we want to minimize the
+ // overhead for executors that are not in decommissioning state as overall that will be
+ // more of the executors. For example, this will not catch a block which is already in
+ // the process of being put from a remote executor before migration starts. This trade-off
+ // is viewed as acceptable to minimize introduction of any new locking structures in critical
+ // code paths.
+
+ val shutdownThread = new Thread() {
+ var lastTaskRunningTime = System.nanoTime()
+ val sleep_time = 1000 // 1s
+
+ while (true) {
+ logInfo("Checking to see if we can shutdown.")
+ if (executor == null || executor.numRunningTasks == 0) {
+ if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+ logInfo("No running tasks, checking migrations")
+ val allBlocksMigrated = env.blockManager.lastMigrationInfo()
+ // We can only trust allBlocksMigrated boolean value if there were no tasks running
+ // since the start of computing it.
+ if (allBlocksMigrated._2 &&
+ (allBlocksMigrated._1 > lastTaskRunningTime)) {
+ logInfo("No running tasks, all blocks migrated, stopping.")
+ exitExecutor(0, "Finished decommissioning", notifyDriver = true)
+ } else {
+ logInfo("All blocks not yet migrated.")
+ }
+ } else {
+ logInfo("No running tasks, no block migration configured, stopping.")
+ exitExecutor(0, "Finished decommissioning", notifyDriver = true)
+ }
+ Thread.sleep(sleep_time)
Review comment:
nit: Can this Thread.sleep be unified with the one below ?
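For example (just a sketch of the structure I mean; the rest of the loop is cut off in this hunk, so the else branch is only a placeholder):
```scala
while (true) {
  logInfo("Checking to see if we can shutdown.")
  if (executor == null || executor.numRunningTasks == 0) {
    // ... the migration checks / exitExecutor calls from above ...
  } else {
    // ... whatever the branch below does today, minus its own sleep ...
  }
  // A single sleep shared by both branches, once per iteration.
  Thread.sleep(sleep_time)
}
```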
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -1822,6 +1822,14 @@ private[spark] class BlockManager(
}
}
+ /*
+ * Returns the last migration time and a boolean for if all blocks have been migrated.
Review comment:
"for if all" -> "denoting if all the" ?
##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -277,12 +283,53 @@ private[spark] class CoarseGrainedExecutorBackend(
if (executor != null) {
executor.decommission()
}
- logInfo("Done decommissioning self.")
+ // Shutdown the executor once all tasks are gone & any configured migrations completed.
+ // Detecting migrations completion doesn't need to be perfect and we want to minimize the
+ // overhead for executors that are not in decommissioning state as overall that will be
+ // more of the executors. For example, this will not catch a block which is already in
+ // the process of being put from a remote executor before migration starts. This trade-off
+ // is viewed as acceptable to minimize introduction of any new locking structures in critical
+ // code paths.
+
+ val shutdownThread = new Thread() {
+ var lastTaskRunningTime = System.nanoTime()
+ val sleep_time = 1000 // 1s
+
+ while (true) {
+ logInfo("Checking to see if we can shutdown.")
+ if (executor == null || executor.numRunningTasks == 0) {
+ if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+ logInfo("No running tasks, checking migrations")
+ val allBlocksMigrated = env.blockManager.lastMigrationInfo()
Review comment:
nit: How about unpacking the tuple into two named values to make it clear ? The _1 and _2 aren't very readable.
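For example (untested, and the names are just suggestions):
```scala
// Destructure the (timestamp, allMigrated) pair so the condition reads naturally.
val (lastMigrationTime, allBlocksMigrated) = env.blockManager.lastMigrationInfo()
// We can only trust allBlocksMigrated if no tasks have run since it was computed.
if (allBlocksMigrated && lastMigrationTime > lastTaskRunningTime) {
  logInfo("No running tasks, all blocks migrated, stopping.")
  exitExecutor(0, "Finished decommissioning", notifyDriver = true)
} else {
  logInfo("All blocks not yet migrated.")
}
```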
##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -277,12 +283,53 @@ private[spark] class CoarseGrainedExecutorBackend(
if (executor != null) {
executor.decommission()
}
- logInfo("Done decommissioning self.")
+ // Shutdown the executor once all tasks are gone & any configured migrations completed.
+ // Detecting migrations completion doesn't need to be perfect and we want to minimize the
+ // overhead for executors that are not in decommissioning state as overall that will be
+ // more of the executors. For example, this will not catch a block which is already in
+ // the process of being put from a remote executor before migration starts. This trade-off
+ // is viewed as acceptable to minimize introduction of any new locking structures in critical
+ // code paths.
+
+ val shutdownThread = new Thread() {
+ var lastTaskRunningTime = System.nanoTime()
+ val sleep_time = 1000 // 1s
+
+ while (true) {
+ logInfo("Checking to see if we can shutdown.")
+ if (executor == null || executor.numRunningTasks == 0) {
+ if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+ logInfo("No running tasks, checking migrations")
+ val allBlocksMigrated = env.blockManager.lastMigrationInfo()
+ // We can only trust allBlocksMigrated boolean value if there were no tasks running
+ // since the start of computing it.
+ if (allBlocksMigrated._2 &&
+ (allBlocksMigrated._1 > lastTaskRunningTime)) {
+ logInfo("No running tasks, all blocks migrated, stopping.")
+ exitExecutor(0, "Finished decommissioning", notifyDriver = true)
+ } else {
+ logInfo("All blocks not yet migrated.")
Review comment:
Should we print the number of leftover blocks to get a sense of the burndown ?
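e.g. if `lastMigrationInfo()` also exposed a count (the `numBlocksLeft` helper below is purely hypothetical), something like:
```scala
// numBlocksLeft() does not exist today -- it would have to be surfaced from the decommissioner.
logInfo(s"Still waiting on ${env.blockManager.numBlocksLeft()} blocks to migrate before stopping.")
```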
##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala
##########
@@ -136,4 +136,7 @@ private[spark] object CoarseGrainedClusterMessages {
// The message to check if `CoarseGrainedSchedulerBackend` thinks the executor is alive or not.
case class IsExecutorAlive(executorId: String) extends CoarseGrainedClusterMessage
+
+ // Used to ask an executor to decommission it's self.
Review comment:
it's self -> itself.
Can you perhaps also add that the executor sends this message to itself, or is that already clear ? The "Used to ask" implies to me that someone else asks the executor.
##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -38,6 +38,9 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
private val sparkConf = new SparkConf(false)
.set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, true)
.set(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED, true)
+ // Just replicate blocks as fast as we can during testing, there isn't another
+ // workload we need to worry about.
+ .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L)
Review comment:
Should this also be 10 seconds ?
"as fast as we can" does not usually jibe well with "interval of 10 seconds" :-P.
##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -277,12 +283,53 @@ private[spark] class CoarseGrainedExecutorBackend(
if (executor != null) {
executor.decommission()
}
- logInfo("Done decommissioning self.")
+ // Shutdown the executor once all tasks are gone & any configured migrations completed.
+ // Detecting migrations completion doesn't need to be perfect and we want to minimize the
+ // overhead for executors that are not in decommissioning state as overall that will be
+ // more of the executors. For example, this will not catch a block which is already in
+ // the process of being put from a remote executor before migration starts. This trade-off
+ // is viewed as acceptable to minimize introduction of any new locking structures in critical
+ // code paths.
+
+ val shutdownThread = new Thread() {
+ var lastTaskRunningTime = System.nanoTime()
+ val sleep_time = 1000 // 1s
+
+ while (true) {
+ logInfo("Checking to see if we can shutdown.")
+ if (executor == null || executor.numRunningTasks == 0) {
+ if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+ logInfo("No running tasks, checking migrations")
+ val allBlocksMigrated = env.blockManager.lastMigrationInfo()
+ // We can only trust allBlocksMigrated boolean value if there were no tasks running
+ // since the start of computing it.
+ if (allBlocksMigrated._2 &&
Review comment:
nit: i think this line needn't be split.
##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -277,12 +283,53 @@ private[spark] class CoarseGrainedExecutorBackend(
if (executor != null) {
executor.decommission()
}
- logInfo("Done decommissioning self.")
+ // Shutdown the executor once all tasks are gone & any configured migrations completed.
+ // Detecting migrations completion doesn't need to be perfect and we want to minimize the
+ // overhead for executors that are not in decommissioning state as overall that will be
+ // more of the executors. For example, this will not catch a block which is already in
+ // the process of being put from a remote executor before migration starts. This trade-off
+ // is viewed as acceptable to minimize introduction of any new locking structures in critical
+ // code paths.
+
+ val shutdownThread = new Thread() {
Review comment:
I think this should be a daemon thread ? Also can you name the thread so that it shows up in the jstack ?
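e.g. something like (the thread name is just an example):
```scala
// Give the thread a recognizable name for jstack, and make it a daemon so it can never
// keep the JVM alive on its own.
val shutdownThread = new Thread("wait-for-decommissioning") {
  override def run(): Unit = {
    // ... the polling loop from above ...
  }
}
shutdownThread.setDaemon(true)
shutdownThread.start()
```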
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -327,4 +357,28 @@ private[storage] class BlockManagerDecommissioner(
}
logInfo("Stopped storage decommissioner")
}
+
+ /*
+ * Returns the last migration time and a boolean for if all blocks have been migrated.
+ * If there are any tasks running since that time the boolean may be incorrect.
+ */
+ private[storage] def lastMigrationInfo(): (Long, Boolean) = {
+ if (stopped || (stoppedRDD && stoppedShuffle)) {
+ (System.nanoTime(), true)
Review comment:
I didn't follow why System.nanoTime is returned here.
##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -277,12 +283,53 @@ private[spark] class CoarseGrainedExecutorBackend(
if (executor != null) {
executor.decommission()
}
- logInfo("Done decommissioning self.")
+ // Shutdown the executor once all tasks are gone & any configured migrations completed.
+ // Detecting migrations completion doesn't need to be perfect and we want to minimize the
+ // overhead for executors that are not in decommissioning state as overall that will be
+ // more of the executors. For example, this will not catch a block which is already in
+ // the process of being put from a remote executor before migration starts. This trade-off
+ // is viewed as acceptable to minimize introduction of any new locking structures in critical
+ // code paths.
+
+ val shutdownThread = new Thread() {
+ var lastTaskRunningTime = System.nanoTime()
+ val sleep_time = 1000 // 1s
+
+ while (true) {
+ logInfo("Checking to see if we can shutdown.")
Review comment:
Should we log the number of running tasks ? Also, should we bound the number of loops here and then print what the current iteration is ?
##########
File path: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
##########
@@ -442,6 +442,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
case e: Exception =>
logError(s"Unexpected error during decommissioning ${e.toString}", e)
}
+ // Send decommission message to the executor (it could have originated on the executor
+ // but not necessarily.
Review comment:
Unterminated parenthesis.
To bring your point out more clearly, should we say: "Send a decommission message again to the executor" ? (the use of the word 'again')
##########
File path: core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
##########
@@ -277,12 +283,53 @@ private[spark] class CoarseGrainedExecutorBackend(
if (executor != null) {
executor.decommission()
}
- logInfo("Done decommissioning self.")
+ // Shutdown the executor once all tasks are gone & any configured migrations completed.
+ // Detecting migrations completion doesn't need to be perfect and we want to minimize the
+ // overhead for executors that are not in decommissioning state as overall that will be
+ // more of the executors. For example, this will not catch a block which is already in
+ // the process of being put from a remote executor before migration starts. This trade-off
+ // is viewed as acceptable to minimize introduction of any new locking structures in critical
+ // code paths.
+
+ val shutdownThread = new Thread() {
+ var lastTaskRunningTime = System.nanoTime()
+ val sleep_time = 1000 // 1s
+
+ while (true) {
+ logInfo("Checking to see if we can shutdown.")
+ if (executor == null || executor.numRunningTasks == 0) {
+ if (env.conf.get(STORAGE_DECOMMISSION_ENABLED)) {
+ logInfo("No running tasks, checking migrations")
+ val allBlocksMigrated = env.blockManager.lastMigrationInfo()
+ // We can only trust allBlocksMigrated boolean value if there were no tasks running
+ // since the start of computing it.
+ if (allBlocksMigrated._2 &&
+ (allBlocksMigrated._1 > lastTaskRunningTime)) {
+ logInfo("No running tasks, all blocks migrated, stopping.")
+ exitExecutor(0, "Finished decommissioning", notifyDriver = true)
Review comment:
Will exitExecutor "break" the loop or should we put in an explicit "break" right after it to exit the thread ? Both here and below.
##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -88,5 +92,6 @@ class BlockManagerDecommissionUnitSuite extends SparkFunSuite with Matchers {
} finally {
bmDecomManager.stop()
}
+ bmDecomManager.stop()
Review comment:
Why do we need this ? The finally block above should run, right ?
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -91,10 +98,12 @@ private[storage] class BlockManagerDecommissioner(
null)// class tag, we don't need for shuffle
logDebug(s"Migrated sub block ${blockId}")
}
- logInfo(s"Migrated ${shuffleBlockInfo} to ${peer}")
+ logDebug(s"Migrated ${shuffleBlockInfo} to ${peer}")
} else {
logError(s"Skipping block ${shuffleBlockInfo} because it has failed ${retryCount}")
}
+ logInfo(s"Migrated ${shuffleBlockInfo}")
Review comment:
Is it intentional that 'peer' was left out in this log message when moved from line 94 above ?
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -1822,6 +1822,14 @@ private[spark] class BlockManager(
}
}
+ /*
+ * Returns the last migration time and a boolean for if all blocks have been migrated.
+ * If there are any tasks running since that time the boolean may be incorrect.
+ */
+ private[spark] def lastMigrationInfo(): (Long, Boolean) = {
Review comment:
What do you think of directly exposing the "BlocksMigrated" and "BlocksToMigrate" instead of a single boolean ? I think it will provide more debugging value than a single flag.
i.e., instead of returning "BlocksMigrated >= BlocksToMigrate", return the two counters directly.
Or maybe this is a lot more work because you want to track this for both shuffle as well as persisted blocks.
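Roughly what I am picturing (names made up, not a concrete ask):
```scala
// Hypothetical richer result: expose the raw counters plus the timestamp of the last
// migration pass, instead of collapsing everything into a single boolean.
private[spark] case class MigrationProgress(
    lastMigrationTime: Long,
    blocksMigrated: Long,
    blocksToMigrate: Long) {
  def allBlocksMigrated: Boolean = blocksMigrated >= blocksToMigrate
}
```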
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -115,12 +124,19 @@ private[storage] class BlockManagerDecommissioner(
// Shuffles which are either in queue for migrations or migrated
private val migratingShuffles = mutable.HashSet[ShuffleBlockInfo]()
+ // Shuffles which have migrated. This used to know when we are "done", being done can change
+ // if a new shuffle file is created by a running task.
+ private val numMigratedShuffles = new AtomicInteger(0)
+
// Shuffles which are queued for migration & number of retries so far.
+ // Visible in storage for testing.
private[storage] val shufflesToMigrate =
new java.util.concurrent.ConcurrentLinkedQueue[(ShuffleBlockInfo, Int)]()
// Set if we encounter an error attempting to migrate and stop.
@volatile private var stopped = false
+ @volatile private var stoppedRDD = false
Review comment:
Should we initialize this to `!conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)` ?
This may obviate the change to `migrateBlock` below.
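i.e. initialize it roughly as:
```scala
// If RDD block migration is disabled we are effectively "stopped" for RDD blocks from the start.
@volatile private var stoppedRDD =
  !conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)
```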
##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionUnitSuite.scala
##########
@@ -20,7 +20,7 @@ package org.apache.spark.storage
import scala.concurrent.duration._
import org.mockito.{ArgumentMatchers => mc}
-import org.mockito.Mockito.{mock, times, verify, when}
+import org.mockito.Mockito.{atLeast => least, mock, times, verify, when}
Review comment:
Why is atLeast renamed as least ?
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -41,6 +42,12 @@ private[storage] class BlockManagerDecommissioner(
private val maxReplicationFailuresForDecommission =
conf.get(config.STORAGE_DECOMMISSION_MAX_REPLICATION_FAILURE_PER_BLOCK)
+ // Used for tracking if our migrations are complete.
+ @volatile private var lastRDDMigrationTime: Long = 0
+ @volatile private var lastShuffleMigrationTime: Long = 0
+ @volatile private var rddBlocksLeft: Boolean = true
+ @volatile private var shuffleBlocksLeft: Boolean = true
+
Review comment:
Should we fold all these into a separate class with mutable fields such that we can simply return a clone of it when `lastMigrationInfo` is called ?
I am not sure if it is worth the trouble, since it is only for info logging: in case we notice an executor that is not going away.
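Roughly what I had in mind (field names copied from the vars above; the class name is arbitrary):
```scala
// Mutable holder for the migration bookkeeping; lastMigrationInfo() could then just return
// a copy() of this instead of reading four separate @volatile vars.
private case class MigrationState(
    var lastRDDMigrationTime: Long = 0,
    var lastShuffleMigrationTime: Long = 0,
    var rddBlocksLeft: Boolean = true,
    var shuffleBlocksLeft: Boolean = true)
```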
##########
File path: core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
##########
@@ -59,7 +59,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS
.set(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED, shuffle)
// Just replicate blocks as fast as we can during testing, there isn't another
// workload we need to worry about.
- .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 1L)
+ .set(config.STORAGE_DECOMMISSION_REPLICATION_REATTEMPT_INTERVAL, 10L)
Review comment:
Why was this changed to 10 seconds ? If it is related to this PR (as opposed to just reducing flakiness), then please document that.
##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala
##########
@@ -133,22 +149,24 @@ private[storage] class BlockManagerDecommissioner(
override def run(): Unit = {
assert(conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED))
- while (!stopped && !Thread.interrupted()) {
+ while (!stopped && !stoppedRDD && !Thread.interrupted()) {
logInfo("Iterating on migrating from the block manager.")
try {
+ val startTime = System.nanoTime()
logDebug("Attempting to replicate all cached RDD blocks")
- decommissionRddCacheBlocks()
+ rddBlocksLeft = decommissionRddCacheBlocks()
+ lastRDDMigrationTime = startTime
logInfo("Attempt to replicate all cached blocks done")
logInfo(s"Waiting for ${sleepInterval} before refreshing migrations.")
Thread.sleep(sleepInterval)
} catch {
case e: InterruptedException =>
- logInfo("Interrupted during migration, will not refresh migrations.")
- stopped = true
+ logInfo("Interrupted during RDD migration, stopping")
Review comment:
Thanks for changing this to 'stopping'. "will not refresh migrations" was not clear. :-)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]