jiangxb1987 commented on a change in pull request #27395: [SPARK-30667][CORE] 
Add allGather method to BarrierTaskContext
URL: https://github.com/apache/spark/pull/27395#discussion_r377393331
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala
 ##########
 @@ -153,27 +180,69 @@ private[spark] class BarrierCoordinator(
         }
         // Add the requester to array of RPCCallContexts pending for reply.
         requesters += requester
+        allGatherMessages += allGatherMessage
         logInfo(s"Barrier sync epoch $barrierEpoch from $barrierId received 
update from Task " +
-          s"$taskId, current progress: ${requesters.size}/$numTasks.")
-        if (maybeFinishAllRequesters(requesters, numTasks)) {
+          s"$taskAttemptId, current progress: ${requesters.size}/$numTasks.")
+        if (maybeFinishAllRequesters(requesters, numTasks, requestMethod)) {
           // Finished current barrier() call successfully, clean up 
ContextBarrierState and
           // increase the barrier epoch.
           logInfo(s"Barrier sync epoch $barrierEpoch from $barrierId received 
all updates from " +
             s"tasks, finished successfully.")
           barrierEpoch += 1
           requesters.clear()
+          allGatherMessages.clear()
           cancelTimerTask()
         }
       }
     }
 
+    def handleBarrierRequest(
+      requester: RpcCallContext,
+      request: BarrierRequestToSync
+    ): Unit = synchronized {
+      handleRequest(
+        requester,
+        request.numTasks,
+        request.stageId,
+        request.taskAttemptId,
+        request.barrierEpoch,
+        request.requestMethod
+      )
+    }
+
+    def handleAllGatherRequest(
+      requester: RpcCallContext,
+      request: AllGatherRequestToSync
+    ): Unit = synchronized {
+      handleRequest(
+        requester,
+        request.numTasks,
+        request.stageId,
+        request.taskAttemptId,
+        request.barrierEpoch,
+        request.requestMethod,
+        request.allGatherMessage
+      )
+    }
+
     // Finish all the blocking barrier sync requests from a stage attempt 
successfully if we
     // have received all the sync requests.
     private def maybeFinishAllRequesters(
         requesters: ArrayBuffer[RpcCallContext],
-        numTasks: Int): Boolean = {
+        numTasks: Int,
+        requestMethod: RequestMethod.Value): Boolean = {
 
 Review comment:
   We don't need `requestMethod`, instead we shall reuse `requestMethodToSync`

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to