dongjoon-hyun commented on a change in pull request #28818:
URL: https://github.com/apache/spark/pull/28818#discussion_r441178546
##########
File path: core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
##########
@@ -333,11 +335,26 @@ private[spark] class ExecutorMonitor(
}
override def onBlockUpdated(event: SparkListenerBlockUpdated): Unit = {
- if (!event.blockUpdatedInfo.blockId.isInstanceOf[RDDBlockId]) {
- return
- }
val exec = ensureExecutorIsTracked(event.blockUpdatedInfo.blockManagerId.executorId,
UNKNOWN_RESOURCE_PROFILE_ID)
+
+ // Check if it is a shuffle file, or RDD to pick the correct codepath for update
+ if (event.blockUpdatedInfo.blockId.isInstanceOf[ShuffleDataBlockId] && shuffleTrackingEnabled) {
+ /**
+ * The executor monitor keeps track of locations of cache and shuffle blocks and this can be
+ * used to decide which executor(s) Spark should shut down first. Since we move shuffle blocks
+ * around now, this wires it up so that it keeps track of them. We only do this for data blocks,
+ * as index and other blocks do not necessarily mean the entire block has been
+ * committed.
+ event.blockUpdatedInfo.blockId match {
+ case ShuffleDataBlockId(shuffleId, _, _) => exec.addShuffle(shuffleId)
Review comment:
Since we are touching `ExecutorMonitor`, when does the counterpart operation,
`exec.removeShuffle`, happen? In this PR, it seems that `executorsKilled` is
used. Is that enough?
cc @dbtsai
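A minimal Scala sketch of the bookkeeping in question, assuming a simplified model: the block ID case classes, `TrackedExecutor`, `MiniMonitor`, `onShuffleRemoved`, and `onExecutorKilled` are hypothetical stand-ins defined locally, not Spark's actual classes or methods. It follows the diff's comment by counting only shuffle data blocks (and only when shuffle tracking is enabled), and pairs `addShuffle` with a `removeShuffle` driven by a shuffle-removal hook rather than relying only on executor removal.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for Spark's block IDs; not the real org.apache.spark.storage classes.
sealed trait BlockId
final case class RDDBlockId(rddId: Int, splitIndex: Int) extends BlockId
final case class ShuffleDataBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId
final case class ShuffleIndexBlockId(shuffleId: Int, mapId: Long, reduceId: Int) extends BlockId

// Hypothetical per-executor tracker: what this executor holds that would be lost on removal.
final class TrackedExecutor {
  private val shuffleIds = mutable.Set.empty[Int]
  private val rddIds = mutable.Set.empty[Int]

  def addShuffle(id: Int): Unit = shuffleIds += id
  // The counterpart operation the question is about.
  def removeShuffle(id: Int): Unit = shuffleIds -= id
  def addRdd(id: Int): Unit = rddIds += id

  def shuffles: Set[Int] = shuffleIds.toSet
  def rdds: Set[Int] = rddIds.toSet
  def hasShuffleData: Boolean = shuffleIds.nonEmpty
}

final class MiniMonitor(shuffleTrackingEnabled: Boolean) {
  private val executors = mutable.Map.empty[String, TrackedExecutor]

  private def ensureTracked(execId: String): TrackedExecutor =
    executors.getOrElseUpdate(execId, new TrackedExecutor)

  def onBlockUpdated(execId: String, blockId: BlockId): Unit = {
    val exec = ensureTracked(execId)
    blockId match {
      // Only data blocks count: an index block alone does not mean the
      // whole shuffle output was committed on this executor.
      case ShuffleDataBlockId(shuffleId, _, _) if shuffleTrackingEnabled =>
        exec.addShuffle(shuffleId)
      case RDDBlockId(rddId, _) =>
        exec.addRdd(rddId)
      case _ =>
        () // index blocks, or shuffle data while tracking is disabled
    }
  }

  // Hypothetical hook, e.g. called when a shuffle is cleaned up on the driver,
  // so executors stop being pinned by shuffles that no longer exist.
  def onShuffleRemoved(shuffleId: Int): Unit =
    executors.values.foreach(_.removeShuffle(shuffleId))

  // Mirrors executor removal: the whole tracker goes away with the executor.
  def onExecutorKilled(execId: String): Unit = {
    executors.remove(execId)
    ()
  }

  def executor(execId: String): Option[TrackedExecutor] = executors.get(execId)
}

object MiniMonitorDemo {
  def main(args: Array[String]): Unit = {
    val monitor = new MiniMonitor(shuffleTrackingEnabled = true)
    monitor.onBlockUpdated("exec-1", ShuffleIndexBlockId(0, 0L, 0))
    println(monitor.executor("exec-1").map(_.shuffles)) // Some(Set())
    monitor.onBlockUpdated("exec-1", ShuffleDataBlockId(0, 0L, 0))
    println(monitor.executor("exec-1").map(_.shuffles)) // Some(Set(0))
    monitor.onShuffleRemoved(0)
    println(monitor.executor("exec-1").map(_.shuffles)) // Some(Set())
  }
}
```

In this model, if only `onExecutorKilled` existed, a shuffle that has since been cleaned up would keep counting against `exec-1` until the executor itself is removed, which is the gap the question is about.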