jiangxb1987 commented on a change in pull request #27395: [SPARK-30667][CORE] 
Add allGather method to BarrierTaskContext
URL: https://github.com/apache/spark/pull/27395#discussion_r378460869
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala
 ##########
 @@ -130,9 +140,31 @@ private[spark] class BarrierCoordinator(
 
     // Process the global sync request. The barrier() call succeed if 
collected enough requests
     // within a configured time, otherwise fail all the pending requests.
-    def handleRequest(requester: RpcCallContext, request: RequestToSync): Unit 
= synchronized {
+    def handleRequest(
+      requester: RpcCallContext,
+      request: RequestToSync
+    ): Unit = synchronized {
       val taskId = request.taskAttemptId
       val epoch = request.barrierEpoch
+      val requestMethod = request.requestMethod
+      val partitionId = request.partitionId
+      val allGatherMessage = request match {
+        case ag: AllGatherRequestToSync => ag.allGatherMessage
+        case _ => ""
+      }
+
+      if (requesters.size == 0) {
+        requestMethodToSync = requestMethod
+      }
+
+      if (requestMethodToSync != requestMethod) {
 
 Review comment:
   We should also call `cleanupBarrierStage()` here.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to