spark git commit: [SPARK-13465] Add a task failure listener to TaskContext

2016-03-03 Thread davies
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 fedb81360 -> 1ce2c1235


[SPARK-13465] Add a task failure listener to TaskContext

## What changes were proposed in this pull request?

TaskContext supports task completion callback, which gets called regardless of 
task failures. However, there is no way for the listener to know if there is an 
error. This patch adds a new listener that gets called when a task fails.

## How was this patch tested?

New unit test case and integration test case covering the code path

Author: Davies Liu 

Closes #11478 from davies/add_failure_1.6.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1ce2c123
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1ce2c123
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1ce2c123

Branch: refs/heads/branch-1.6
Commit: 1ce2c123565630914cdcafe6549582843047d874
Parents: fedb813
Author: Davies Liu 
Authored: Thu Mar 3 08:43:38 2016 -0800
Committer: Davies Liu 
Committed: Thu Mar 3 08:43:38 2016 -0800

--
 .../scala/org/apache/spark/TaskContext.scala| 28 +++-
 .../org/apache/spark/TaskContextImpl.scala  | 33 --
 .../scala/org/apache/spark/scheduler/Task.scala |  5 ++
 .../spark/util/TaskCompletionListener.scala | 33 --
 .../util/TaskCompletionListenerException.scala  | 34 --
 .../org/apache/spark/util/taskListeners.scala   | 68 
 .../spark/JavaTaskCompletionListenerImpl.java   | 39 ---
 .../spark/JavaTaskContextCompileCheck.java  | 30 +
 .../spark/scheduler/TaskContextSuite.scala  | 52 +--
 project/MimaExcludes.scala  |  4 +-
 10 files changed, 205 insertions(+), 121 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1ce2c123/core/src/main/scala/org/apache/spark/TaskContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala 
b/core/src/main/scala/org/apache/spark/TaskContext.scala
index af558d6..190c8ea 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -23,7 +23,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.memory.TaskMemoryManager
 import org.apache.spark.metrics.source.Source
-import org.apache.spark.util.TaskCompletionListener
+import org.apache.spark.util.{TaskCompletionListener, TaskFailureListener}
 
 
 object TaskContext {
@@ -108,6 +108,8 @@ abstract class TaskContext extends Serializable {
* Adds a (Java friendly) listener to be executed on task completion.
* This will be called in all situation - success, failure, or cancellation.
* An example use is for HadoopRDD to register a callback to close the input 
stream.
+   *
+   * Exceptions thrown by the listener will result in failure of the task.
*/
   def addTaskCompletionListener(listener: TaskCompletionListener): TaskContext
 
@@ -115,8 +117,30 @@ abstract class TaskContext extends Serializable {
* Adds a listener in the form of a Scala closure to be executed on task 
completion.
* This will be called in all situations - success, failure, or cancellation.
* An example use is for HadoopRDD to register a callback to close the input 
stream.
+   *
+   * Exceptions thrown by the listener will result in failure of the task.
+   */
+  def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext = {
+addTaskCompletionListener(new TaskCompletionListener {
+  override def onTaskCompletion(context: TaskContext): Unit = f(context)
+})
+  }
+
+  /**
+   * Adds a listener to be executed on task failure.
+   * Operations defined here must be idempotent, as `onTaskFailure` can be 
called multiple times.
*/
-  def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext
+  def addTaskFailureListener(listener: TaskFailureListener): TaskContext
+
+  /**
+   * Adds a listener to be executed on task failure.
+   * Operations defined here must be idempotent, as `onTaskFailure` can be 
called multiple times.
+   */
+  def addTaskFailureListener(f: (TaskContext, Throwable) => Unit): TaskContext 
= {
+addTaskFailureListener(new TaskFailureListener {
+  override def onTaskFailure(context: TaskContext, error: Throwable): Unit 
= f(context, error)
+})
+  }
 
   /**
* Adds a callback function to be executed on task completion. An example use

http://git-wip-us.apache.org/repos/asf/spark/blob/1ce2c123/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
--
diff --git 

spark git commit: [SPARK-13465] Add a task failure listener to TaskContext

2016-02-26 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master 0598a2b81 -> 391755dc6


[SPARK-13465] Add a task failure listener to TaskContext

## What changes were proposed in this pull request?

TaskContext supports task completion callback, which gets called regardless of 
task failures. However, there is no way for the listener to know if there is an 
error. This patch adds a new listener that gets called when a task fails.

## How was this patch tested?
New unit test case and integration test case covering the code path

Author: Reynold Xin 

Closes #11340 from rxin/SPARK-13465.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/391755dc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/391755dc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/391755dc

Branch: refs/heads/master
Commit: 391755dc6ed2e156b8df8a530ac8df6ed7ba7f8a
Parents: 0598a2b
Author: Reynold Xin 
Authored: Fri Feb 26 12:49:16 2016 -0800
Committer: Davies Liu 
Committed: Fri Feb 26 12:49:16 2016 -0800

--
 .../scala/org/apache/spark/TaskContext.scala| 28 +++-
 .../org/apache/spark/TaskContextImpl.scala  | 33 --
 .../scala/org/apache/spark/scheduler/Task.scala |  5 ++
 .../spark/util/TaskCompletionListener.scala | 33 --
 .../util/TaskCompletionListenerException.scala  | 34 --
 .../org/apache/spark/util/taskListeners.scala   | 68 
 .../spark/JavaTaskCompletionListenerImpl.java   | 39 ---
 .../spark/JavaTaskContextCompileCheck.java  | 30 +
 .../spark/scheduler/TaskContextSuite.scala  | 44 -
 project/MimaExcludes.scala  |  4 +-
 10 files changed, 201 insertions(+), 117 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/391755dc/core/src/main/scala/org/apache/spark/TaskContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/TaskContext.scala 
b/core/src/main/scala/org/apache/spark/TaskContext.scala
index 9f49cf1..bfcacbf 100644
--- a/core/src/main/scala/org/apache/spark/TaskContext.scala
+++ b/core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -23,7 +23,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.memory.TaskMemoryManager
 import org.apache.spark.metrics.source.Source
-import org.apache.spark.util.TaskCompletionListener
+import org.apache.spark.util.{TaskCompletionListener, TaskFailureListener}
 
 
 object TaskContext {
@@ -106,6 +106,8 @@ abstract class TaskContext extends Serializable {
* Adds a (Java friendly) listener to be executed on task completion.
* This will be called in all situation - success, failure, or cancellation.
* An example use is for HadoopRDD to register a callback to close the input 
stream.
+   *
+   * Exceptions thrown by the listener will result in failure of the task.
*/
   def addTaskCompletionListener(listener: TaskCompletionListener): TaskContext
 
@@ -113,8 +115,30 @@ abstract class TaskContext extends Serializable {
* Adds a listener in the form of a Scala closure to be executed on task 
completion.
* This will be called in all situations - success, failure, or cancellation.
* An example use is for HadoopRDD to register a callback to close the input 
stream.
+   *
+   * Exceptions thrown by the listener will result in failure of the task.
*/
-  def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext
+  def addTaskCompletionListener(f: (TaskContext) => Unit): TaskContext = {
+addTaskCompletionListener(new TaskCompletionListener {
+  override def onTaskCompletion(context: TaskContext): Unit = f(context)
+})
+  }
+
+  /**
+   * Adds a listener to be executed on task failure.
+   * Operations defined here must be idempotent, as `onTaskFailure` can be 
called multiple times.
+   */
+  def addTaskFailureListener(listener: TaskFailureListener): TaskContext
+
+  /**
+   * Adds a listener to be executed on task failure.
+   * Operations defined here must be idempotent, as `onTaskFailure` can be 
called multiple times.
+   */
+  def addTaskFailureListener(f: (TaskContext, Throwable) => Unit): TaskContext 
= {
+addTaskFailureListener(new TaskFailureListener {
+  override def onTaskFailure(context: TaskContext, error: Throwable): Unit 
= f(context, error)
+})
+  }
 
   /**
* The ID of the stage that this task belong to.

http://git-wip-us.apache.org/repos/asf/spark/blob/391755dc/core/src/main/scala/org/apache/spark/TaskContextImpl.scala
--
diff --git a/core/src/main/scala/org/apache/spark/TaskContextImpl.scala