srowen commented on a change in pull request #27640: [SPARK-30667][CORE] Add 
all gather method to BarrierTaskContext
URL: https://github.com/apache/spark/pull/27640#discussion_r393029589
 
 

 ##########
 File path: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
 ##########
 @@ -163,6 +155,73 @@ class BarrierTaskContext private[spark] (
       timerTask.cancel()
       timer.purge()
     }
+    json
+  }
+
+  /**
+   * :: Experimental ::
+   * Sets a global barrier and waits until all tasks in this stage hit this 
barrier. Similar to
+   * MPI_Barrier function in MPI, the barrier() function call blocks until all 
tasks in the same
+   * stage have reached this routine.
+   *
+   * CAUTION! In a barrier stage, each task must have the same number of 
barrier() calls, in all
+   * possible code branches. Otherwise, you may get the job hanging or a 
SparkException after
+   * timeout. Some examples of '''misuses''' are listed below:
+   * 1. Only call barrier() function on a subset of all the tasks in the same 
barrier stage, it
+   * shall lead to timeout of the function call.
+   * {{{
+   *   rdd.barrier().mapPartitions { iter =>
+   *       val context = BarrierTaskContext.get()
+   *       if (context.partitionId() == 0) {
+   *           // Do nothing.
+   *       } else {
+   *           context.barrier()
+   *       }
+   *       iter
+   *   }
+   * }}}
+   *
+   * 2. Include barrier() function in a try-catch code block, this may lead to 
timeout of the
+   * second function call.
+   * {{{
+   *   rdd.barrier().mapPartitions { iter =>
+   *       val context = BarrierTaskContext.get()
+   *       try {
+   *           // Do something that might throw an Exception.
+   *           doSomething()
+   *           context.barrier()
+   *       } catch {
+   *           case e: Exception => logWarning("...", e)
+   *       }
+   *       context.barrier()
+   *       iter
+   *   }
+   * }}}
+   */
+  @Experimental
+  @Since("2.4.0")
+  def barrier(): Unit = {
+    runBarrier(RequestMethod.BARRIER)
+    ()
+  }
+
+  /**
+   * :: Experimental ::
+   * Blocks until all tasks in the same stage have reached this routine. Each 
task passes in
+   * a message and returns with a list of all the messages passed in by each 
of those tasks.
+   *
+   * CAUTION! The allGather method requires the same precautions as the 
barrier method
+   *
+   * The message is type String rather than Array[Byte] because it is more 
convenient for
+   * the user at the cost of worse performance.
+   */
+  @Experimental
+  @Since("3.0.0")
+  def allGather(message: String): ArrayBuffer[String] = {
 
 Review comment:
   Fair point; why not just `Seq`?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to