Ngone51 commented on a change in pull request #27395: [SPARK-30667][CORE] Add 
allGather method to BarrierTaskContext
URL: https://github.com/apache/spark/pull/27395#discussion_r378605743
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
 ##########
 @@ -59,49 +67,31 @@ class BarrierTaskContext private[spark] (
   // from different tasks within the same barrier stage attempt to succeed.
   private lazy val numTasks = getTaskInfos().size
 
-  /**
-   * :: Experimental ::
-   * Sets a global barrier and waits until all tasks in this stage hit this 
barrier. Similar to
-   * MPI_Barrier function in MPI, the barrier() function call blocks until all 
tasks in the same
-   * stage have reached this routine.
-   *
-   * CAUTION! In a barrier stage, each task must have the same number of 
barrier() calls, in all
-   * possible code branches. Otherwise, you may get the job hanging or a 
SparkException after
-   * timeout. Some examples of '''misuses''' are listed below:
-   * 1. Only call barrier() function on a subset of all the tasks in the same 
barrier stage, it
-   * shall lead to timeout of the function call.
-   * {{{
-   *   rdd.barrier().mapPartitions { iter =>
-   *       val context = BarrierTaskContext.get()
-   *       if (context.partitionId() == 0) {
-   *           // Do nothing.
-   *       } else {
-   *           context.barrier()
-   *       }
-   *       iter
-   *   }
-   * }}}
-   *
-   * 2. Include barrier() function in a try-catch code block, this may lead to 
timeout of the
-   * second function call.
-   * {{{
-   *   rdd.barrier().mapPartitions { iter =>
-   *       val context = BarrierTaskContext.get()
-   *       try {
-   *           // Do something that might throw an Exception.
-   *           doSomething()
-   *           context.barrier()
-   *       } catch {
-   *           case e: Exception => logWarning("...", e)
-   *       }
-   *       context.barrier()
-   *       iter
-   *   }
-   * }}}
-   */
-  @Experimental
-  @Since("2.4.0")
-  def barrier(): Unit = {
+  private def getRequestToSync(
+    numTasks: Int,
+    stageId: Int,
+    stageAttemptNumber: Int,
+    taskAttemptId: Long,
+    barrierEpoch: Int,
+    partitionId: Int,
+    requestMethod: RequestMethod.Value,
+    allGatherMessage: String
+  ): RequestToSync = {
+    requestMethod match {
+      case RequestMethod.BARRIER =>
+        return BarrierRequestToSync(numTasks, stageId, stageAttemptNumber, 
taskAttemptId,
+          barrierEpoch, partitionId, requestMethod)
 
 Review comment:
    nit: `return` is unnecessary in Scala.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to