Repository: spark
Updated Branches:
  refs/heads/master 116c581d2 -> d6e1958a2


[SPARK-23189][CORE][WEB UI] Reflect stage level blacklisting on executor tab

## What changes were proposed in this pull request?

The purpose of this PR is to reflect stage level blacklisting on the executor 
tab for the currently active stages.

After this change, the Status column of the executor tab shows one of the 
following labels (a minimal sketch of the selection logic follows the list):

- "Blacklisted" when the executor is blacklisted at the application level (the old flag)
- "Dead" when the executor is neither blacklisted nor active
- "Blacklisted in Stages: [...]" when the executor is active but there are 
active blacklisted stages for the executor; within the [] the comma separated 
IDs of the active stages are listed
- "Active" when the executor is active and there are no active blacklisted 
stages for the executor
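
A minimal Scala sketch of this label selection, for illustration only (the 
actual rendering is done by the `formatStatus` function in `executorspage.js`, 
shown in the diff below):

```scala
// Illustration of the Status label decision order; not the UI code itself.
def statusLabel(
    isActive: Boolean,
    isBlacklisted: Boolean,
    blacklistedInStages: Set[Int]): String = {
  if (isBlacklisted) {
    // Application level blacklisting takes precedence over everything else.
    "Blacklisted"
  } else if (!isActive) {
    "Dead"
  } else if (blacklistedInStages.isEmpty) {
    "Active"
  } else {
    // Active, but blacklisted for some currently running stages.
    s"Active (Blacklisted in Stages: [${blacklistedInStages.toSeq.sorted.mkString(", ")}])"
  }
}
```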

## How was this patch tested?

Both with unit tests and manually.

### Manual test

Spark was started as:

```bash
# local-cluster[2,1,1024]: two executors with 1 core and 1024 MB each.
# A single failed task blacklists the executor for the stage, while the
# application-level threshold stays high so only stage-level blacklisting kicks in.
bin/spark-shell --master "local-cluster[2,1,1024]" \
  --conf "spark.blacklist.enabled=true" \
  --conf "spark.blacklist.stage.maxFailedTasksPerExecutor=1" \
  --conf "spark.blacklist.application.maxFailedTasksPerExecutor=10"
```

And the job was:
```scala
import org.apache.spark.SparkEnv

val pairs = sc.parallelize(1 to 10000, 10).map { x =>
  if (SparkEnv.get.executorId.toInt == 0) throw new RuntimeException("Bad 
executor")
  else  {
    Thread.sleep(10)
    (x % 10, x)
  }
}

val all = pairs.cogroup(pairs)

all.collect()
```

UI screenshots taken while the job was running:

- One executor is blacklisted in both stages:

![One executor is blacklisted in two stages](https://issues.apache.org/jira/secure/attachment/12908314/multiple_stages_1.png)

- One stage completes while the other one is still running:

![One stage completes while the other is still running](https://issues.apache.org/jira/secure/attachment/12908315/multiple_stages_2.png)

- Both stages are completed:

![Both stages are completed](https://issues.apache.org/jira/secure/attachment/12908316/multiple_stages_3.png)

### Unit tests

In AppStatusListenerSuite.scala both node blacklisting for a stage and executor 
blacklisting for a stage are tested (a rough sketch of the executor-level check 
follows).
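
A rough sketch of the executor-level check, following the pattern visible in 
the diff below (the exact SparkListenerExecutorBlacklistedForStage field names 
are assumptions here):

```scala
// Post a stage-level blacklist event for one executor, then verify that the
// executor summary exposed through the status store lists that stage ID.
time += 1
listener.onExecutorBlacklistedForStage(SparkListenerExecutorBlacklistedForStage(
  time = time,
  executorId = execIds.head,
  taskFailures = 1,  // assumed field name for the per-stage failure count
  stageId = stages.head.stageId,
  stageAttemptId = stages.head.attemptId))

check[ExecutorSummaryWrapper](execIds.head) { exec =>
  assert(exec.info.blacklistedInStages === Set(stages.head.stageId))
}
```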

Author: “attilapiros” <piros.attila.zs...@gmail.com>

Closes #20408 from attilapiros/SPARK-23189.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d6e1958a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d6e1958a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d6e1958a

Branch: refs/heads/master
Commit: d6e1958a2472898e60bd013902c2f35111596e40
Parents: 116c581
Author: “attilapiros” <piros.attila.zs...@gmail.com>
Authored: Tue Feb 13 09:54:52 2018 -0600
Committer: Imran Rashid <iras...@cloudera.com>
Committed: Tue Feb 13 09:54:52 2018 -0600

----------------------------------------------------------------------
 .../org/apache/spark/ui/static/executorspage.js | 21 ++++++---
 .../apache/spark/status/AppStatusListener.scala | 49 +++++++++++++++-----
 .../org/apache/spark/status/LiveEntity.scala    |  7 ++-
 .../org/apache/spark/status/api/v1/api.scala    |  3 +-
 .../executor_list_json_expectation.json         |  3 +-
 .../executor_memory_usage_expectation.json      | 15 ++++--
 .../executor_node_blacklisting_expectation.json | 15 ++++--
 ...blacklisting_unblacklisting_expectation.json | 15 ++++--
 .../spark/status/AppStatusListenerSuite.scala   | 21 +++++++++
 9 files changed, 113 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d6e1958a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
----------------------------------------------------------------------
diff --git a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
index d430d8c..6717af3 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js
@@ -25,12 +25,18 @@ function getThreadDumpEnabled() {
     return threadDumpEnabled;
 }
 
-function formatStatus(status, type) {
+function formatStatus(status, type, row) {
+    if (row.isBlacklisted) {
+        return "Blacklisted";
+    }
+
     if (status) {
-        return "Active"
-    } else {
-        return "Dead"
+        if (row.blacklistedInStages.length == 0) {
+            return "Active"
+        }
+        return "Active (Blacklisted in Stages: [" + 
row.blacklistedInStages.join(", ") + "])";
     }
+    return "Dead"
 }
 
 jQuery.extend(jQuery.fn.dataTableExt.oSort, {
@@ -415,9 +421,10 @@ $(document).ready(function () {
                             }
                         },
                         {data: 'hostPort'},
-                        {data: 'isActive', render: function (data, type, row) {
-                            if (row.isBlacklisted) return "Blacklisted";
-                            else return formatStatus (data, type);
+                        {
+                            data: 'isActive',
+                            render: function (data, type, row) {
+                                return formatStatus (data, type, row);
                             }
                         },
                         {data: 'rddBlocks'},

http://git-wip-us.apache.org/repos/asf/spark/blob/d6e1958a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
index ab01cdd..79a17e2 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusListener.scala
@@ -213,11 +213,13 @@ private[spark] class AppStatusListener(
 
   override def onExecutorBlacklistedForStage(
       event: SparkListenerExecutorBlacklistedForStage): Unit = {
+    val now = System.nanoTime()
+
     Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage =>
-      val now = System.nanoTime()
-      val esummary = stage.executorSummary(event.executorId)
-      esummary.isBlacklisted = true
-      maybeUpdate(esummary, now)
+      setStageBlackListStatus(stage, now, event.executorId)
+    }
+    liveExecutors.get(event.executorId).foreach { exec =>
+      addBlackListedStageTo(exec, event.stageId, now)
     }
   }
 
@@ -226,16 +228,29 @@ private[spark] class AppStatusListener(
 
     // Implicitly blacklist every available executor for the stage associated with this node
     Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage =>
-      liveExecutors.values.foreach { exec =>
-        if (exec.hostname == event.hostId) {
-          val esummary = stage.executorSummary(exec.executorId)
-          esummary.isBlacklisted = true
-          maybeUpdate(esummary, now)
-        }
-      }
+      val executorIds = liveExecutors.values.filter(_.host == event.hostId).map(_.executorId).toSeq
+      setStageBlackListStatus(stage, now, executorIds: _*)
+    }
+    liveExecutors.values.filter(_.hostname == event.hostId).foreach { exec =>
+      addBlackListedStageTo(exec, event.stageId, now)
     }
   }
 
+  private def addBlackListedStageTo(exec: LiveExecutor, stageId: Int, now: Long): Unit = {
+    exec.blacklistedInStages += stageId
+    liveUpdate(exec, now)
+  }
+
+  private def setStageBlackListStatus(stage: LiveStage, now: Long, executorIds: String*): Unit = {
+    executorIds.foreach { executorId =>
+      val executorStageSummary = stage.executorSummary(executorId)
+      executorStageSummary.isBlacklisted = true
+      maybeUpdate(executorStageSummary, now)
+    }
+    stage.blackListedExecutors ++= executorIds
+    maybeUpdate(stage, now)
+  }
+
   override def onExecutorUnblacklisted(event: SparkListenerExecutorUnblacklisted): Unit = {
     updateBlackListStatus(event.executorId, false)
   }
@@ -594,12 +609,24 @@ private[spark] class AppStatusListener(
 
       stage.executorSummaries.values.foreach(update(_, now))
       update(stage, now, last = true)
+
+      val executorIdsForStage = stage.blackListedExecutors
+      executorIdsForStage.foreach { executorId =>
+        liveExecutors.get(executorId).foreach { exec =>
+          removeBlackListedStageFrom(exec, event.stageInfo.stageId, now)
+        }
+      }
     }
 
     appSummary = new AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1)
     kvstore.write(appSummary)
   }
 
+  private def removeBlackListedStageFrom(exec: LiveExecutor, stageId: Int, now: Long) = {
+    exec.blacklistedInStages -= stageId
+    liveUpdate(exec, now)
+  }
+
   override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = {
     // This needs to set fields that are already set by onExecutorAdded because the driver is
     // considered an "executor" in the UI, but does not have a SparkListenerExecutorAdded event.

http://git-wip-us.apache.org/repos/asf/spark/blob/d6e1958a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index d5f9e19..79e3f13 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -20,6 +20,7 @@ package org.apache.spark.status
 import java.util.Date
 import java.util.concurrent.atomic.AtomicInteger
 
+import scala.collection.immutable.{HashSet, TreeSet}
 import scala.collection.mutable.HashMap
 
 import com.google.common.collect.Interners
@@ -254,6 +255,7 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE
   var totalShuffleRead = 0L
   var totalShuffleWrite = 0L
   var isBlacklisted = false
+  var blacklistedInStages: Set[Int] = TreeSet()
 
   var executorLogs = Map[String, String]()
 
@@ -299,7 +301,8 @@ private class LiveExecutor(val executorId: String, _addTime: Long) extends LiveE
       Option(removeTime),
       Option(removeReason),
       executorLogs,
-      memoryMetrics)
+      memoryMetrics,
+      blacklistedInStages)
     new ExecutorSummaryWrapper(info)
   }
 
@@ -371,6 +374,8 @@ private class LiveStage extends LiveEntity {
 
   val executorSummaries = new HashMap[String, LiveExecutorStageSummary]()
 
+  var blackListedExecutors = new HashSet[String]()
+
   // Used for cleanup of tasks after they reach the configured limit. Not written to the store.
   @volatile var cleaning = false
   var savedTasks = new AtomicInteger(0)

http://git-wip-us.apache.org/repos/asf/spark/blob/d6e1958a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 550eac3..a333f1a 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -95,7 +95,8 @@ class ExecutorSummary private[spark](
     val removeTime: Option[Date],
     val removeReason: Option[String],
     val executorLogs: Map[String, String],
-    val memoryMetrics: Option[MemoryMetrics])
+    val memoryMetrics: Option[MemoryMetrics],
+    val blacklistedInStages: Set[Int])
 
 class MemoryMetrics private[spark](
     val usedOnHeapStorageMemory: Long,

http://git-wip-us.apache.org/repos/asf/spark/blob/d6e1958a/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
index 942e6d8..7bb8fe8 100644
--- a/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/executor_list_json_expectation.json
@@ -19,5 +19,6 @@
   "isBlacklisted" : false,
   "maxMemory" : 278302556,
   "addTime" : "2015-02-03T16:43:00.906GMT",
-  "executorLogs" : { }
+  "executorLogs" : { },
+  "blacklistedInStages" : [ ]
 } ]

http://git-wip-us.apache.org/repos/asf/spark/blob/d6e1958a/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json
index ed33c90..dd5b1dc 100644
--- a/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/executor_memory_usage_expectation.json
@@ -25,7 +25,8 @@
     "usedOffHeapStorageMemory" : 0,
     "totalOnHeapStorageMemory" : 384093388,
     "totalOffHeapStorageMemory" : 524288000
-  }
+  },
+  "blacklistedInStages" : [ ]
 }, {
   "id" : "3",
   "hostPort" : "172.22.0.167:51485",
@@ -56,7 +57,8 @@
     "usedOffHeapStorageMemory" : 0,
     "totalOnHeapStorageMemory" : 384093388,
     "totalOffHeapStorageMemory" : 524288000
-  }
+  },
+  "blacklistedInStages" : [ ]
 } ,{
   "id" : "2",
   "hostPort" : "172.22.0.167:51487",
@@ -87,7 +89,8 @@
     "usedOffHeapStorageMemory" : 0,
     "totalOnHeapStorageMemory" : 384093388,
     "totalOffHeapStorageMemory" : 524288000
-  }
+  },
+  "blacklistedInStages" : [ ]
 }, {
   "id" : "1",
   "hostPort" : "172.22.0.167:51490",
@@ -118,7 +121,8 @@
     "usedOffHeapStorageMemory": 0,
     "totalOnHeapStorageMemory": 384093388,
     "totalOffHeapStorageMemory": 524288000
-  }
+  },
+  "blacklistedInStages" : [ ]
 }, {
   "id" : "0",
   "hostPort" : "172.22.0.167:51491",
@@ -149,5 +153,6 @@
     "usedOffHeapStorageMemory" : 0,
     "totalOnHeapStorageMemory" : 384093388,
     "totalOffHeapStorageMemory" : 524288000
-  }
+  },
+  "blacklistedInStages" : [ ]
 } ]

http://git-wip-us.apache.org/repos/asf/spark/blob/d6e1958a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json
index 73519f1..3e55d3d 100644
--- a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_expectation.json
@@ -25,7 +25,8 @@
     "usedOffHeapStorageMemory" : 0,
     "totalOnHeapStorageMemory" : 384093388,
     "totalOffHeapStorageMemory" : 524288000
-  }
+  },
+  "blacklistedInStages" : [ ]
 }, {
   "id" : "3",
   "hostPort" : "172.22.0.167:51485",
@@ -56,7 +57,8 @@
     "usedOffHeapStorageMemory" : 0,
     "totalOnHeapStorageMemory" : 384093388,
     "totalOffHeapStorageMemory" : 524288000
-  }
+  },
+  "blacklistedInStages" : [ ]
 }, {
   "id" : "2",
   "hostPort" : "172.22.0.167:51487",
@@ -87,7 +89,8 @@
     "usedOffHeapStorageMemory" : 0,
     "totalOnHeapStorageMemory" : 384093388,
     "totalOffHeapStorageMemory" : 524288000
-  }
+  },
+  "blacklistedInStages" : [ ]
 }, {
   "id" : "1",
   "hostPort" : "172.22.0.167:51490",
@@ -118,7 +121,8 @@
     "usedOffHeapStorageMemory": 0,
     "totalOnHeapStorageMemory": 384093388,
     "totalOffHeapStorageMemory": 524288000
-  }
+  },
+  "blacklistedInStages" : [ ]
 }, {
   "id" : "0",
   "hostPort" : "172.22.0.167:51491",
@@ -149,5 +153,6 @@
     "usedOffHeapStorageMemory": 0,
     "totalOnHeapStorageMemory": 384093388,
     "totalOffHeapStorageMemory": 524288000
-  }
+  },
+  "blacklistedInStages" : [ ]
 } ]

http://git-wip-us.apache.org/repos/asf/spark/blob/d6e1958a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_unblacklisting_expectation.json
----------------------------------------------------------------------
diff --git a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_unblacklisting_expectation.json b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_unblacklisting_expectation.json
index 6931fea..e87f3e7 100644
--- a/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_unblacklisting_expectation.json
+++ b/core/src/test/resources/HistoryServerExpectations/executor_node_blacklisting_unblacklisting_expectation.json
@@ -19,7 +19,8 @@
   "isBlacklisted" : false,
   "maxMemory" : 384093388,
   "addTime" : "2016-11-15T23:20:38.836GMT",
-  "executorLogs" : { }
+  "executorLogs" : { },
+  "blacklistedInStages" : [ ]
 }, {
   "id" : "3",
   "hostPort" : "172.22.0.111:64543",
@@ -44,7 +45,8 @@
   "executorLogs" : {
     "stdout" : 
"http://172.22.0.111:64521/logPage/?appId=app-20161115172038-0000&executorId=3&logType=stdout";,
     "stderr" : 
"http://172.22.0.111:64521/logPage/?appId=app-20161115172038-0000&executorId=3&logType=stderr";
-  }
+  },
+  "blacklistedInStages" : [ ]
 }, {
   "id" : "2",
   "hostPort" : "172.22.0.111:64539",
@@ -69,7 +71,8 @@
   "executorLogs" : {
     "stdout" : 
"http://172.22.0.111:64519/logPage/?appId=app-20161115172038-0000&executorId=2&logType=stdout";,
     "stderr" : 
"http://172.22.0.111:64519/logPage/?appId=app-20161115172038-0000&executorId=2&logType=stderr";
-  }
+  },
+  "blacklistedInStages" : [ ]
 }, {
   "id" : "1",
   "hostPort" : "172.22.0.111:64541",
@@ -94,7 +97,8 @@
   "executorLogs" : {
     "stdout" : 
"http://172.22.0.111:64518/logPage/?appId=app-20161115172038-0000&executorId=1&logType=stdout";,
     "stderr" : 
"http://172.22.0.111:64518/logPage/?appId=app-20161115172038-0000&executorId=1&logType=stderr";
-  }
+  },
+  "blacklistedInStages" : [ ]
 }, {
   "id" : "0",
   "hostPort" : "172.22.0.111:64540",
@@ -119,5 +123,6 @@
   "executorLogs" : {
     "stdout" : 
"http://172.22.0.111:64517/logPage/?appId=app-20161115172038-0000&executorId=0&logType=stdout";,
     "stderr" : 
"http://172.22.0.111:64517/logPage/?appId=app-20161115172038-0000&executorId=0&logType=stderr";
-  }
+  },
+  "blacklistedInStages" : [ ]
 } ]

http://git-wip-us.apache.org/repos/asf/spark/blob/d6e1958a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
index b74d6ee..7495027 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusListenerSuite.scala
@@ -273,6 +273,10 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
       assert(exec.info.isBlacklistedForStage === expectedBlacklistedFlag)
     }
 
+    check[ExecutorSummaryWrapper](execIds.head) { exec =>
+      assert(exec.info.blacklistedInStages === Set(stages.head.stageId))
+    }
+
     // Blacklisting node for stage
     time += 1
     listener.onNodeBlacklistedForStage(SparkListenerNodeBlacklistedForStage(
@@ -439,6 +443,10 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
       assert(stage.info.numCompleteTasks === pending.size)
     }
 
+    check[ExecutorSummaryWrapper](execIds.head) { exec =>
+      assert(exec.info.blacklistedInStages === Set())
+    }
+
     // Submit stage 2.
     time += 1
     stages.last.submissionTime = Some(time)
@@ -453,6 +461,19 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
       assert(stage.info.submissionTime === Some(new Date(stages.last.submissionTime.get)))
     }
 
+    // Blacklisting node for stage
+    time += 1
+    listener.onNodeBlacklistedForStage(SparkListenerNodeBlacklistedForStage(
+      time = time,
+      hostId = "1.example.com",
+      executorFailures = 1,
+      stageId = stages.last.stageId,
+      stageAttemptId = stages.last.attemptId))
+
+    check[ExecutorSummaryWrapper](execIds.head) { exec =>
+      assert(exec.info.blacklistedInStages === Set(stages.last.stageId))
+    }
+
     // Start and fail all tasks of stage 2.
     time += 1
     val s2Tasks = createTasks(4, execIds)

