spark git commit: [SPARK-23816][CORE] Killed tasks should ignore FetchFailures.

2018-04-09 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 bf1dabede -> 0f2aabc6b


[SPARK-23816][CORE] Killed tasks should ignore FetchFailures.

SPARK-19276 ensured that FetchFailures do not get swallowed by other
layers of exception handling, but it also meant that a killed task could
look like a fetch failure.  This is particularly a problem with
speculative execution, where we expect to kill tasks as they are reading
shuffle data.  The fix is to ensure that we always check for killed
tasks first.
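
The reordering works because Scala tries catch clauses top to bottom, like
a pattern match. Below is a minimal self-contained sketch of the new
ordering (not the Spark code: TaskKilledException here is a local stand-in
for org.apache.spark.TaskKilledException, and handle/runTask/fetchFailed
are illustrative names):

import scala.util.control.NonFatal

object KillBeforeFetchFailSketch {
  // Local stand-in for org.apache.spark.TaskKilledException.
  class TaskKilledException(val reason: String) extends RuntimeException(reason)

  // Mirrors the reordered catch block: both kill checks precede the
  // fetch-failure catch-all, so a kill can never be misreported.
  def handle(runTask: () => Unit,
             reasonIfKilled: Option[String],
             fetchFailed: Boolean): String =
    try {
      runTask()
      "FINISHED"
    } catch {
      case t: TaskKilledException =>
        s"KILLED: ${t.reason}"
      case _: InterruptedException | NonFatal(_) if reasonIfKilled.isDefined =>
        s"KILLED: ${reasonIfKilled.get}"
      // Only a task that was NOT killed falls through to this clause.
      case t: Throwable if fetchFailed && NonFatal(t) =>
        "FAILED: FetchFailed"
    }

  def main(args: Array[String]): Unit = {
    // A speculative duplicate killed while reading shuffle data is now
    // reported as KILLED, not as a fetch failure.
    println(handle(() => throw new TaskKilledException("speculation"),
      reasonIfKilled = Some("speculation"), fetchFailed = true))
  }
}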

Added a new unit test which fails before the fix, and ran it 1k times to
check for flakiness. The full suite of tests was run on Jenkins.

Author: Imran Rashid 

Closes #20987 from squito/SPARK-23816.

(cherry picked from commit 10f45bb8233e6ac838dd4f053052c8556f5b54bd)
Signed-off-by: Marcelo Vanzin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0f2aabc6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0f2aabc6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0f2aabc6

Branch: refs/heads/branch-2.3
Commit: 0f2aabc6bc64d2b5d46e59525111bd95fcd73610
Parents: bf1dabe
Author: Imran Rashid 
Authored: Mon Apr 9 11:31:21 2018 -0700
Committer: Marcelo Vanzin 
Committed: Mon Apr 9 11:31:39 2018 -0700

--
 .../org/apache/spark/executor/Executor.scala| 26 +++---
 .../apache/spark/executor/ExecutorSuite.scala   | 92 
 2 files changed, 88 insertions(+), 30 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0f2aabc6/core/src/main/scala/org/apache/spark/executor/Executor.scala
--
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 2c3a8ef..a9c31c7 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -480,6 +480,19 @@ private[spark] class Executor(
         execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
 
       } catch {
+        case t: TaskKilledException =>
+          logInfo(s"Executor killed $taskName (TID $taskId), reason: ${t.reason}")
+          setTaskFinishedAndClearInterruptStatus()
+          execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(t.reason)))
+
+        case _: InterruptedException | NonFatal(_) if
+            task != null && task.reasonIfKilled.isDefined =>
+          val killReason = task.reasonIfKilled.getOrElse("unknown reason")
+          logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason")
+          setTaskFinishedAndClearInterruptStatus()
+          execBackend.statusUpdate(
+            taskId, TaskState.KILLED, ser.serialize(TaskKilled(killReason)))
+
         case t: Throwable if hasFetchFailure && !Utils.isFatalError(t) =>
           val reason = task.context.fetchFailed.get.toTaskFailedReason
           if (!t.isInstanceOf[FetchFailedException]) {
@@ -494,19 +507,6 @@ private[spark] class Executor(
           setTaskFinishedAndClearInterruptStatus()
           execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
 
-        case t: TaskKilledException =>
-          logInfo(s"Executor killed $taskName (TID $taskId), reason: ${t.reason}")
-          setTaskFinishedAndClearInterruptStatus()
-          execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(t.reason)))
-
-        case _: InterruptedException | NonFatal(_) if
-            task != null && task.reasonIfKilled.isDefined =>
-          val killReason = task.reasonIfKilled.getOrElse("unknown reason")
-          logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason")
-          setTaskFinishedAndClearInterruptStatus()
-          execBackend.statusUpdate(
-            taskId, TaskState.KILLED, ser.serialize(TaskKilled(killReason)))
-
         case CausedBy(cDE: CommitDeniedException) =>
           val reason = cDE.toTaskCommitDeniedReason
           setTaskFinishedAndClearInterruptStatus()

http://git-wip-us.apache.org/repos/asf/spark/blob/0f2aabc6/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala 
b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 105a178..1a7bebe 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -22,6 +22,7 @@ import java.lang.Thread.UncaughtExceptionHandler
 import java.nio.ByteBuffer
 

spark git commit: [SPARK-23816][CORE] Killed tasks should ignore FetchFailures.

2018-04-09 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master d81f29eca -> 10f45bb82


[SPARK-23816][CORE] Killed tasks should ignore FetchFailures.

SPARK-19276 ensured that FetchFailures do not get swallowed by other
layers of exception handling, but it also meant that a killed task could
look like a fetch failure.  This is particularly a problem with
speculative execution, where we expect to kill tasks as they are reading
shuffle data.  The fix is to ensure that we always check for killed
tasks first.
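
As a complement to the description above, here is a self-contained demo
(not Spark code; names are illustrative) of the Scala semantics the fix
relies on: catch clauses are tried in declaration order, so a broad
fetch-failure clause placed first swallows a more specific kill:

object CatchOrderDemo {
  class TaskKilledException(val reason: String) extends RuntimeException(reason)

  // fetchFailureFirst = true emulates the old clause order, false the fixed one.
  def report(fetchFailureFirst: Boolean): String =
    try {
      throw new TaskKilledException("another attempt succeeded")
    } catch {
      // Emulates the broad "case t: Throwable if hasFetchFailure" clause.
      case _: Throwable if fetchFailureFirst =>
        "FAILED: FetchFailed"
      case t: TaskKilledException =>
        s"KILLED: ${t.reason}"
    }

  def main(args: Array[String]): Unit = {
    println(report(fetchFailureFirst = true))   // FAILED: FetchFailed (the bug)
    println(report(fetchFailureFirst = false))  // KILLED: another attempt succeeded
  }
}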

Added a new unit test which fails before the fix, and ran it 1k times to
check for flakiness. The full suite of tests was run on Jenkins.

Author: Imran Rashid 

Closes #20987 from squito/SPARK-23816.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/10f45bb8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/10f45bb8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/10f45bb8

Branch: refs/heads/master
Commit: 10f45bb8233e6ac838dd4f053052c8556f5b54bd
Parents: d81f29e
Author: Imran Rashid 
Authored: Mon Apr 9 11:31:21 2018 -0700
Committer: Marcelo Vanzin 
Committed: Mon Apr 9 11:31:21 2018 -0700

--
 .../org/apache/spark/executor/Executor.scala| 26 +++---
 .../apache/spark/executor/ExecutorSuite.scala   | 92 
 2 files changed, 88 insertions(+), 30 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/10f45bb8/core/src/main/scala/org/apache/spark/executor/Executor.scala
--
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index dcec3ec..c325222 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -480,6 +480,19 @@ private[spark] class Executor(
         execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
 
       } catch {
+        case t: TaskKilledException =>
+          logInfo(s"Executor killed $taskName (TID $taskId), reason: ${t.reason}")
+          setTaskFinishedAndClearInterruptStatus()
+          execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(t.reason)))
+
+        case _: InterruptedException | NonFatal(_) if
+            task != null && task.reasonIfKilled.isDefined =>
+          val killReason = task.reasonIfKilled.getOrElse("unknown reason")
+          logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason")
+          setTaskFinishedAndClearInterruptStatus()
+          execBackend.statusUpdate(
+            taskId, TaskState.KILLED, ser.serialize(TaskKilled(killReason)))
+
         case t: Throwable if hasFetchFailure && !Utils.isFatalError(t) =>
           val reason = task.context.fetchFailed.get.toTaskFailedReason
           if (!t.isInstanceOf[FetchFailedException]) {
@@ -494,19 +507,6 @@ private[spark] class Executor(
           setTaskFinishedAndClearInterruptStatus()
           execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
 
-        case t: TaskKilledException =>
-          logInfo(s"Executor killed $taskName (TID $taskId), reason: ${t.reason}")
-          setTaskFinishedAndClearInterruptStatus()
-          execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled(t.reason)))
-
-        case _: InterruptedException | NonFatal(_) if
-            task != null && task.reasonIfKilled.isDefined =>
-          val killReason = task.reasonIfKilled.getOrElse("unknown reason")
-          logInfo(s"Executor interrupted and killed $taskName (TID $taskId), reason: $killReason")
-          setTaskFinishedAndClearInterruptStatus()
-          execBackend.statusUpdate(
-            taskId, TaskState.KILLED, ser.serialize(TaskKilled(killReason)))
-
         case CausedBy(cDE: CommitDeniedException) =>
           val reason = cDE.toTaskCommitDeniedReason
           setTaskFinishedAndClearInterruptStatus()

http://git-wip-us.apache.org/repos/asf/spark/blob/10f45bb8/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
--
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala 
b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 105a178..1a7bebe 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -22,6 +22,7 @@ import java.lang.Thread.UncaughtExceptionHandler
 import java.nio.ByteBuffer
 import java.util.Properties
 import java.util.concurrent.{CountDownLatch, TimeUnit}
+import