Github user squito commented on a diff in the pull request:
https://github.com/apache/spark/pull/20203#discussion_r161884916
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSetBlacklist.scala ---
@@ -128,13 +130,17 @@ private[scheduler] class TaskSetBlacklist(val conf: SparkConf, val stageId: Int,
       }
       // Check if enough tasks have failed on the executor to blacklist it for the entire stage.
-      if (execFailures.numUniqueTasksWithFailures >= MAX_FAILURES_PER_EXEC_STAGE) {
+      val numFailures = execFailures.numUniqueTasksWithFailures
+      if (numFailures >= MAX_FAILURES_PER_EXEC_STAGE) {
         if (blacklistedExecs.add(exec)) {
           logInfo(s"Blacklisting executor ${exec} for stage $stageId")
           // This executor has been pushed into the blacklist for this stage. Let's check if it
           // pushes the whole node into the blacklist.
           val blacklistedExecutorsOnNode =
             execsWithFailuresOnNode.filter(blacklistedExecs.contains(_))
+          val now = clock.getTimeMillis()
+          listenerBus.post(
+            SparkListenerExecutorBlacklistedForStage(now, exec, numFailures, stageId, stageAttemptId))
           if (blacklistedExecutorsOnNode.size >= MAX_FAILED_EXEC_PER_NODE_STAGE) {
             if (blacklistedNodes.add(host)) {
               logInfo(s"Blacklisting ${host} for stage $stageId")
--- End diff --
If we're going to do this for executors, we should do it for nodes too. In
the UI, you'd just show for each executor that it was blacklisted for the
stage; I don't think you would need to distinguish whether it was blacklisted
because the entire node was blacklisted or because just that one executor was.
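
To make the suggestion concrete, here is a rough, self-contained sketch of what I mean. It is not the actual Spark code: `RecordingBus`, `StageBlacklistSketch`, both event case classes, and their fields are hypothetical stand-ins, and the node event in particular is an assumed counterpart to the executor event in the diff. The point is only that the node branch posts its own event, and that a UI-side check can treat "executor blacklisted" and "executor's node blacklisted" the same way.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for the listener events. Only an executor-level
// event appears in the diff; the node-level one is assumed for illustration.
sealed trait StageBlacklistEvent
case class ExecutorBlacklistedForStage(
    time: Long, executorId: String, taskFailures: Int,
    stageId: Int, stageAttemptId: Int) extends StageBlacklistEvent
case class NodeBlacklistedForStage(
    time: Long, hostId: String, executorFailures: Int,
    stageId: Int, stageAttemptId: Int) extends StageBlacklistEvent

// Minimal in-memory bus that just records what was posted (hypothetical).
class RecordingBus {
  val events = mutable.ArrayBuffer.empty[StageBlacklistEvent]
  def post(event: StageBlacklistEvent): Unit = events += event
}

// Simplified per-stage blacklist: thresholds and the clock are passed in.
class StageBlacklistSketch(
    bus: RecordingBus, stageId: Int, stageAttemptId: Int,
    maxFailuresPerExec: Int, maxFailedExecPerNode: Int, now: () => Long) {

  private val failedTasksByExec = mutable.HashMap.empty[String, mutable.Set[Int]]
  private val execsWithFailuresByNode = mutable.HashMap.empty[String, mutable.Set[String]]
  private val blacklistedExecs = mutable.HashSet.empty[String]
  private val blacklistedNodes = mutable.HashSet.empty[String]

  def recordFailure(host: String, exec: String, taskIndex: Int): Unit = {
    // Track unique failed tasks per executor, and which executors failed per host.
    val failedTasks = failedTasksByExec.getOrElseUpdate(exec, mutable.HashSet.empty[Int])
    failedTasks += taskIndex
    val execsOnHost = execsWithFailuresByNode.getOrElseUpdate(host, mutable.HashSet.empty[String])
    execsOnHost += exec

    val numFailures = failedTasks.size
    if (numFailures >= maxFailuresPerExec && blacklistedExecs.add(exec)) {
      bus.post(ExecutorBlacklistedForStage(now(), exec, numFailures, stageId, stageAttemptId))
      // Same treatment for the node: post an event when it is first blacklisted.
      val numBlacklistedOnNode = execsOnHost.count(blacklistedExecs.contains)
      if (numBlacklistedOnNode >= maxFailedExecPerNode && blacklistedNodes.add(host)) {
        bus.post(NodeBlacklistedForStage(
          now(), host, numBlacklistedOnNode, stageId, stageAttemptId))
      }
    }
  }

  // What the UI would show per executor: no distinction between the executor
  // itself being blacklisted and its whole node being blacklisted.
  def isExecutorBlacklistedForStage(host: String, exec: String): Boolean =
    blacklistedExecs.contains(exec) || blacklistedNodes.contains(host)
}
```

With both thresholds set to 2, two distinct task failures on each of two executors on the same host would produce two executor events followed by one node event, and any other executor on that host would then also show as blacklisted for the stage.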