Repository: spark
Updated Branches:
  refs/heads/master 278fa1eb3 -> 33791a8ce


[SPARK-18113] Use ask to replace askWithRetry in canCommit and make receiver 
idempotent.

## What changes were proposed in this pull request?

Method canCommit sends AskPermissionToCommitOutput using askWithRetry. If the
request times out, it is sent again, so AskPermissionToCommitOutput can be
received multiple times. Method canCommit should return the same value when
called multiple times by the same attempt.

In the implementation before this fix, method handleAskPermissionToCommit only
checks whether a committer is already registered, which is not enough. When a
worker retries AskPermissionToCommitOutput it will get CommitDeniedException,
and the task will then fail with reason TaskCommitDenied, which is not regarded
as a task failure (SPARK-11178), so TaskScheduler will reschedule this task
indefinitely.

This fix replaces `askWithRetry` with `ask` in `canCommit` and makes the
receiver idempotent.

## How was this patch tested?

Added a new unit test to OutputCommitCoordinatorSuite.

Author: jinxing <jinx...@meituan.com>

Closes #16503 from jinxing64/SPARK-18113.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/33791a8c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/33791a8c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/33791a8c

Branch: refs/heads/master
Commit: 33791a8ced61d1ffa09f68033d240f874fdb1593
Parents: 278fa1e
Author: jinxing <jinx...@meituan.com>
Authored: Wed Jan 18 10:47:22 2017 -0800
Committer: Marcelo Vanzin <van...@cloudera.com>
Committed: Wed Jan 18 10:47:22 2017 -0800

----------------------------------------------------------------------
 .../scheduler/OutputCommitCoordinator.scala      | 19 +++++++++++++++----
 .../scheduler/OutputCommitCoordinatorSuite.scala | 16 ++++++++++++++++
 2 files changed, 31 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/33791a8c/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala 
b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
index 7bed685..08d220b 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala
@@ -22,6 +22,7 @@ import scala.collection.mutable
 import org.apache.spark._
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, 
RpcEnv}
+import org.apache.spark.util.{RpcUtils, ThreadUtils}
 
 private sealed trait OutputCommitCoordinationMessage extends Serializable
 
@@ -88,7 +89,8 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, 
isDriver: Boolean)
     val msg = AskPermissionToCommitOutput(stage, partition, attemptNumber)
     coordinatorRef match {
       case Some(endpointRef) =>
-        endpointRef.askWithRetry[Boolean](msg)
+        ThreadUtils.awaitResult(endpointRef.ask[Boolean](msg),
+          RpcUtils.askRpcTimeout(conf).duration)
       case None =>
         logError(
           "canCommit called after coordinator was stopped (is SparkEnv 
shutdown in progress)?")
@@ -165,9 +167,18 @@ private[spark] class OutputCommitCoordinator(conf: 
SparkConf, isDriver: Boolean)
             authorizedCommitters(partition) = attemptNumber
             true
           case existingCommitter =>
-            logDebug(s"Denying attemptNumber=$attemptNumber to commit for 
stage=$stage, " +
-              s"partition=$partition; existingCommitter = $existingCommitter")
-            false
+            // Coordinator should be idempotent when receiving 
AskPermissionToCommit.
+            if (existingCommitter == attemptNumber) {
+              logWarning(s"Authorizing duplicate request to commit for " +
+                s"attemptNumber=$attemptNumber to commit for stage=$stage," +
+                s" partition=$partition; existingCommitter = 
$existingCommitter." +
+                s" This can indicate dropped network traffic.")
+              true
+            } else {
+              logDebug(s"Denying attemptNumber=$attemptNumber to commit for 
stage=$stage, " +
+                s"partition=$partition; existingCommitter = 
$existingCommitter")
+              false
+            }
         }
       case None =>
         logDebug(s"Stage $stage has completed, so not allowing attempt number 
$attemptNumber of" +

http://git-wip-us.apache.org/repos/asf/spark/blob/33791a8c/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
index 8c4e389..0c362b8 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala
@@ -189,6 +189,12 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite 
with BeforeAndAfter {
     assert(
       !outputCommitCoordinator.canCommit(stage, partition, 
nonAuthorizedCommitter + 3))
   }
+
+  test("Duplicate calls to canCommit from the authorized committer gets 
idempotent responses.") {
+    val rdd = sc.parallelize(Seq(1), 1)
+    sc.runJob(rdd, 
OutputCommitFunctions(tempDir.getAbsolutePath).callCanCommitMultipleTimes _,
+       0 until rdd.partitions.size)
+  }
 }
 
 /**
@@ -221,6 +227,16 @@ private case class OutputCommitFunctions(tempDirPath: 
String) {
       if (ctx.attemptNumber == 0) failingOutputCommitter else 
successfulOutputCommitter)
   }
 
+  // Receiver should be idempotent for AskPermissionToCommitOutput
+  def callCanCommitMultipleTimes(iter: Iterator[Int]): Unit = {
+    val ctx = TaskContext.get()
+    val canCommit1 = SparkEnv.get.outputCommitCoordinator
+      .canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())
+    val canCommit2 = SparkEnv.get.outputCommitCoordinator
+      .canCommit(ctx.stageId(), ctx.partitionId(), ctx.attemptNumber())
+    assert(canCommit1 && canCommit2)
+  }
+
   private def runCommitWithProvidedCommitter(
       ctx: TaskContext,
       iter: Iterator[Int],


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to