This is an automated email from the ASF dual-hosted git repository.

jiangxb1987 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 9f34cf5  [SPARK-31651][CORE] Improve handling of the case where different barrier sync types are used in a single sync
9f34cf5 is described below

commit 9f34cf56e551c8bb678221cb8671432d4cb02ac0
Author: yi.wu <yi...@databricks.com>
AuthorDate: Mon May 18 23:54:41 2020 -0700

    [SPARK-31651][CORE] Improve handling of the case where different barrier sync types are used in a single sync
    
    ### What changes were proposed in this pull request?
    
    This PR improves handling of the case where different barrier sync types are used in a single sync:
    
    - use `clear` instead of `cleanupBarrierStage`
    
    - make sure all requesters fail with the "different barrier sync types" error (see the condensed sketch below)
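    
    Condensed, the new handling in `ContextBarrierState.handleRequest` looks roughly like this (condensed directly from the diff below; surrounding plumbing elided):
    
    ```scala
    // Collect the sync type of every request seen in this barrier attempt.
    requestMethods.add(curReqMethod)
    if (requestMethods.size > 1) {
      // More than one sync type means the tasks mixed barrier() and allGather().
      val error = new SparkException(s"Different barrier sync types found for the " +
        s"sync $barrierId: ${requestMethods.mkString(", ")}. Please use the " +
        s"same barrier sync type within a single sync.")
      // Fail the requesters collected so far plus the one being handled now, then
      // reset this state in place (clear()) instead of removing it from the
      // coordinator, and return before the epoch check can mask the real error.
      (requesters :+ requester).foreach(_.sendFailure(error))
      clear()
      return
    }
    ```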
    
    ### Why are the changes needed?
    
    Currently, we use `cleanupBarrierStage` to clean up a barrier stage when we detect the case of "different barrier sync types". But this leads to a problem: we could create a new `ContextBarrierState` for the same stage again if there are in-flight requests from tasks. As a result, those tasks will fail due to being killed instead of failing with the "different barrier sync types" error.
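    
    For context, here is a paraphrased sketch of the coordinator's state lookup (an assumption about the surrounding `BarrierCoordinator` code, not part of this patch; the helper name is illustrative). It shows why removing the state is unsafe:
    
    ```scala
    // Per-stage state lives in a ConcurrentHashMap and is created on demand, so
    // once cleanupBarrierStage removes the entry, an in-flight RequestToSync from
    // the same stage silently recreates a fresh ContextBarrierState, which later
    // fails by task killing rather than by the intended sync-type error.
    val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState]
    
    def handleRequestToSync(request: RequestToSync, requester: RpcCallContext): Unit = {
      val barrierId = ContextBarrierId(request.stageId, request.stageAttemptId)
      states.computeIfAbsent(barrierId,
        (key: ContextBarrierId) => new ContextBarrierState(key, request.numTasks))
        .handleRequest(requester, request)
    }
    ```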
    
    Besides, we don't properly handle the request that is currently being handled, as it will fail due to an epoch mismatch instead of the "different barrier sync types" error.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Updated an existing test.
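    
    The scenario the updated test covers has roughly the following shape (a hedged reconstruction; the suite's exact partition count and allGather message may differ):
    
    ```scala
    // Tasks in one barrier stage deliberately mix barrier() and allGather(),
    // and all of them must now fail with the new error message.
    val rdd = sc.makeRDD(1 to 10, 2)
    val rdd2 = rdd.barrier().mapPartitions { iter =>
      val context = BarrierTaskContext.get()
      if (context.partitionId() % 2 == 0) {
        context.barrier()
      } else {
        context.allGather("illustrative message")
      }
      iter
    }
    val error = intercept[SparkException] {
      rdd2.collect()
    }.getMessage
    assert(error.contains("Different barrier sync types found"))
    ```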
    
    Closes #28462 from Ngone51/impr_barrier_req.
    
    Authored-by: yi.wu <yi...@databricks.com>
    Signed-off-by: Xingbo Jiang <xingbo.ji...@databricks.com>
    (cherry picked from commit 653ca19b1f26155c2b7656e2ebbe227b18383308)
    Signed-off-by: Xingbo Jiang <xingbo.ji...@databricks.com>
---
 .../org/apache/spark/BarrierCoordinator.scala      | 30 +++++++++++-----------
 .../spark/scheduler/BarrierTaskContextSuite.scala  |  5 +---
 2 files changed, 16 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala b/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala
index 5663055..04faf7f 100644
--- a/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala
+++ b/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala
@@ -21,7 +21,7 @@ import java.util.{Timer, TimerTask}
 import java.util.concurrent.ConcurrentHashMap
 import java.util.function.Consumer
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, HashSet}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint}
@@ -106,9 +106,11 @@ private[spark] class BarrierCoordinator(
     // The messages will be replied to all tasks once sync finished.
     private val messages = Array.ofDim[String](numTasks)
 
-    // The request method which is called inside this barrier sync. All tasks should make sure
-    // that they're calling the same method within the same barrier sync phase.
-    private var requestMethod: RequestMethod.Value = _
+    // Request methods collected from tasks inside this barrier sync. All tasks should make sure
+    // that they're calling the same method within the same barrier sync phase. In other words,
+    // the size of requestMethods should always be 1 for a legitimate barrier sync. Otherwise,
+    // the barrier sync would fail if the size of requestMethods becomes greater than 1.
+    private val requestMethods = new HashSet[RequestMethod.Value]
 
     // A timer task that ensures we may timeout for a barrier() call.
     private var timerTask: TimerTask = null
@@ -141,17 +143,14 @@ private[spark] class BarrierCoordinator(
       val taskId = request.taskAttemptId
       val epoch = request.barrierEpoch
       val curReqMethod = request.requestMethod
-
-      if (requesters.isEmpty) {
-        requestMethod = curReqMethod
-      } else if (requestMethod != curReqMethod) {
-        requesters.foreach(
-          _.sendFailure(new SparkException(s"$barrierId tried to use requestMethod " +
-            s"`$curReqMethod` during barrier epoch $barrierEpoch, which does not match " +
-            s"the current synchronized requestMethod `$requestMethod`"
-          ))
-        )
-        cleanupBarrierStage(barrierId)
+      requestMethods.add(curReqMethod)
+      if (requestMethods.size > 1) {
+        val error = new SparkException(s"Different barrier sync types found for the " +
+          s"sync $barrierId: ${requestMethods.mkString(", ")}. Please use the " +
+          s"same barrier sync type within a single sync.")
+        (requesters :+ requester).foreach(_.sendFailure(error))
+        clear()
+        return
       }
 
      // Require the number of tasks is correctly set from the BarrierTaskContext.
@@ -184,6 +183,7 @@ private[spark] class BarrierCoordinator(
             s"tasks, finished successfully.")
           barrierEpoch += 1
           requesters.clear()
+          requestMethods.clear()
           cancelTimerTask()
         }
       }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
index 17bc339..b5614b2 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala
@@ -96,10 +96,7 @@ class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext with
     val error = intercept[SparkException] {
       rdd2.collect()
     }.getMessage
-    assert(
-      error.contains("does not match the current synchronized requestMethod") ||
-      error.contains("not properly killed")
-    )
+    assert(error.contains("Different barrier sync types found"))
   }
 
   test("successively sync with allGather and barrier") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
