spark git commit: [SPARK-13465] Add a task failure listener to TaskContext
Repository: spark Updated Branches: refs/heads/branch-1.6 fedb81360 -> 1ce2c1235 [SPARK-13465] Add a task failure listener to TaskContext ## What changes were proposed in this pull request? TaskContext supports task completion callback, which gets called regardless of task failures. However, there is no way for the listener to know if there is an error. This patch adds a new listener that gets called when a task fails. ## How was this patch tested? New unit test case and integration test case covering the code path Author: Davies Liu. Closes #11478 from davies/add_failure_1.6. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1ce2c123 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1ce2c123 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1ce2c123 Branch: refs/heads/branch-1.6 Commit: 1ce2c123565630914cdcafe6549582843047d874 Parents: fedb813 Author: Davies Liu Authored: Thu Mar 3 08:43:38 2016 -0800 Committer: Davies Liu Committed: Thu Mar 3 08:43:38 2016 -0800 -- .../scala/org/apache/spark/TaskContext.scala| 28 +++- .../org/apache/spark/TaskContextImpl.scala | 33 -- .../scala/org/apache/spark/scheduler/Task.scala | 5 ++ .../spark/util/TaskCompletionListener.scala | 33 -- .../util/TaskCompletionListenerException.scala | 34 -- .../org/apache/spark/util/taskListeners.scala | 68 .../spark/JavaTaskCompletionListenerImpl.java | 39 --- .../spark/JavaTaskContextCompileCheck.java | 30 + .../spark/scheduler/TaskContextSuite.scala | 52 +-- project/MimaExcludes.scala | 4 +- 10 files changed, 205 insertions(+), 121 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/1ce2c123/core/src/main/scala/org/apache/spark/TaskContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala index af558d6..190c8ea 100644 --- a/core/src/main/scala/org/apache/spark/TaskContext.scala +++ 
b/core/src/main/scala/org/apache/spark/TaskContext.scala @@ -23,7 +23,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.executor.TaskMetrics import org.apache.spark.memory.TaskMemoryManager import org.apache.spark.metrics.source.Source -import org.apache.spark.util.TaskCompletionListener +import org.apache.spark.util.{TaskCompletionListener, TaskFailureListener} object TaskContext { @@ -108,6 +108,8 @@ abstract class TaskContext extends Serializable { * Adds a (Java friendly) listener to be executed on task completion. * This will be called in all situation - success, failure, or cancellation. * An example use is for HadoopRDD to register a callback to close the input stream. + * + * Exceptions thrown by the listener will result in failure of the task. */ def addTaskCompletionListener(listener: TaskCompletionListener): TaskContext @@ -115,8 +117,30 @@ abstract class TaskContext extends Serializable { * Adds a listener in the form of a Scala closure to be executed on task completion. * This will be called in all situations - success, failure, or cancellation. * An example use is for HadoopRDD to register a callback to close the input stream. + * + * Exceptions thrown by the listener will result in failure of the task. + */ + def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext = { +addTaskCompletionListener(new TaskCompletionListener { + override def onTaskCompletion(context: TaskContext): Unit = f(context) +}) + } + + /** + * Adds a listener to be executed on task failure. + * Operations defined here must be idempotent, as `onTaskFailure` can be called multiple times. */ - def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext + def addTaskFailureListener(listener: TaskFailureListener): TaskContext + + /** + * Adds a listener to be executed on task failure. + * Operations defined here must be idempotent, as `onTaskFailure` can be called multiple times. 
+ */ + def addTaskFailureListener(f: (TaskContext, Throwable) => Unit): TaskContext = { +addTaskFailureListener(new TaskFailureListener { + override def onTaskFailure(context: TaskContext, error: Throwable): Unit = f(context, error) +}) + } /** * Adds a callback function to be executed on task completion. An example use http://git-wip-us.apache.org/repos/asf/spark/blob/1ce2c123/core/src/main/scala/org/apache/spark/TaskContextImpl.scala -- diff --git
spark git commit: [SPARK-13465] Add a task failure listener to TaskContext
Repository: spark Updated Branches: refs/heads/master 0598a2b81 -> 391755dc6 [SPARK-13465] Add a task failure listener to TaskContext ## What changes were proposed in this pull request? TaskContext supports task completion callback, which gets called regardless of task failures. However, there is no way for the listener to know if there is an error. This patch adds a new listener that gets called when a task fails. ## How was this patch tested? New unit test case and integration test case covering the code path Author: Reynold Xin. Closes #11340 from rxin/SPARK-13465. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/391755dc Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/391755dc Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/391755dc Branch: refs/heads/master Commit: 391755dc6ed2e156b8df8a530ac8df6ed7ba7f8a Parents: 0598a2b Author: Reynold Xin Authored: Fri Feb 26 12:49:16 2016 -0800 Committer: Davies Liu Committed: Fri Feb 26 12:49:16 2016 -0800 -- .../scala/org/apache/spark/TaskContext.scala| 28 +++- .../org/apache/spark/TaskContextImpl.scala | 33 -- .../scala/org/apache/spark/scheduler/Task.scala | 5 ++ .../spark/util/TaskCompletionListener.scala | 33 -- .../util/TaskCompletionListenerException.scala | 34 -- .../org/apache/spark/util/taskListeners.scala | 68 .../spark/JavaTaskCompletionListenerImpl.java | 39 --- .../spark/JavaTaskContextCompileCheck.java | 30 + .../spark/scheduler/TaskContextSuite.scala | 44 - project/MimaExcludes.scala | 4 +- 10 files changed, 201 insertions(+), 117 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/391755dc/core/src/main/scala/org/apache/spark/TaskContext.scala -- diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala b/core/src/main/scala/org/apache/spark/TaskContext.scala index 9f49cf1..bfcacbf 100644 --- a/core/src/main/scala/org/apache/spark/TaskContext.scala +++ 
b/core/src/main/scala/org/apache/spark/TaskContext.scala @@ -23,7 +23,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.executor.TaskMetrics import org.apache.spark.memory.TaskMemoryManager import org.apache.spark.metrics.source.Source -import org.apache.spark.util.TaskCompletionListener +import org.apache.spark.util.{TaskCompletionListener, TaskFailureListener} object TaskContext { @@ -106,6 +106,8 @@ abstract class TaskContext extends Serializable { * Adds a (Java friendly) listener to be executed on task completion. * This will be called in all situation - success, failure, or cancellation. * An example use is for HadoopRDD to register a callback to close the input stream. + * + * Exceptions thrown by the listener will result in failure of the task. */ def addTaskCompletionListener(listener: TaskCompletionListener): TaskContext @@ -113,8 +115,30 @@ abstract class TaskContext extends Serializable { * Adds a listener in the form of a Scala closure to be executed on task completion. * This will be called in all situations - success, failure, or cancellation. * An example use is for HadoopRDD to register a callback to close the input stream. + * + * Exceptions thrown by the listener will result in failure of the task. */ - def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext + def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext = { +addTaskCompletionListener(new TaskCompletionListener { + override def onTaskCompletion(context: TaskContext): Unit = f(context) +}) + } + + /** + * Adds a listener to be executed on task failure. + * Operations defined here must be idempotent, as `onTaskFailure` can be called multiple times. + */ + def addTaskFailureListener(listener: TaskFailureListener): TaskContext + + /** + * Adds a listener to be executed on task failure. + * Operations defined here must be idempotent, as `onTaskFailure` can be called multiple times. 
+ */ + def addTaskFailureListener(f: (TaskContext, Throwable) => Unit): TaskContext = { +addTaskFailureListener(new TaskFailureListener { + override def onTaskFailure(context: TaskContext, error: Throwable): Unit = f(context, error) +}) + } /** * The ID of the stage that this task belong to. http://git-wip-us.apache.org/repos/asf/spark/blob/391755dc/core/src/main/scala/org/apache/spark/TaskContextImpl.scala -- diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala