zhengruifeng commented on a change in pull request #27640: [SPARK-30667][CORE] Add all gather method to BarrierTaskContext
URL: https://github.com/apache/spark/pull/27640#discussion_r387492294
##########
File path: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala
##########
@@ -163,6 +155,73 @@ class BarrierTaskContext private[spark] (
timerTask.cancel()
timer.purge()
}
+ json
+ }
+
+ /**
+ * :: Experimental ::
+ * Sets a global barrier and waits until all tasks in this stage hit this barrier. Similar to
+ * MPI_Barrier function in MPI, the barrier() function call blocks until all tasks in the same
+ * stage have reached this routine.
+ *
+ * CAUTION! In a barrier stage, each task must have the same number of barrier() calls, in all
+ * possible code branches. Otherwise, you may get the job hanging or a SparkException after
+ * timeout. Some examples of '''misuses''' are listed below:
+ * 1. Only call barrier() function on a subset of all the tasks in the same barrier stage, it
+ * shall lead to timeout of the function call.
+ * {{{
+ * rdd.barrier().mapPartitions { iter =>
+ * val context = BarrierTaskContext.get()
+ * if (context.partitionId() == 0) {
+ * // Do nothing.
+ * } else {
+ * context.barrier()
+ * }
+ * iter
+ * }
+ * }}}
+ *
+ * 2. Include barrier() function in a try-catch code block, this may lead to timeout of the
+ * second function call.
+ * {{{
+ * rdd.barrier().mapPartitions { iter =>
+ * val context = BarrierTaskContext.get()
+ * try {
+ * // Do something that might throw an Exception.
+ * doSomething()
+ * context.barrier()
+ * } catch {
+ * case e: Exception => logWarning("...", e)
+ * }
+ * context.barrier()
+ * iter
+ * }
+ * }}}
+ */
+ @Experimental
+ @Since("2.4.0")
+ def barrier(): Unit = {
+ runBarrier(RequestMethod.BARRIER)
+ ()
+ }
+
+ /**
+ * :: Experimental ::
+ * Blocks until all tasks in the same stage have reached this routine. Each task passes in
+ * a message and returns with a list of all the messages passed in by each of those tasks.
+ *
+ * CAUTION! The allGather method requires the same precautions as the barrier method
+ *
+ * The message is type String rather than Array[Byte] because it is more convenient for
+ * the user at the cost of worse performance.
+ */
+ @Experimental
+ @Since("3.0.0")
+ def allGather(message: String): ArrayBuffer[String] = {
Review comment:
Just out of curiosity, why return an `ArrayBuffer[String]` instead of an
`Array[String]` here?
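To make the all-gather contract in the `allGather` Scaladoc concrete (every task contributes one message, blocks until all tasks have arrived, then sees every message), here is a minimal single-JVM sketch. It is not Spark's implementation: plain threads stand in for barrier tasks, and a `java.util.concurrent.CyclicBarrier` stands in for the barrier coordinator. `AllGatherSketch` and `simulateAllGather` are hypothetical names used only for illustration. Each simulated task receives its own fixed-size `Array[String]` copy, which is the return shape the comment above asks about.

```scala
import java.util.concurrent.CyclicBarrier

// Hypothetical local simulation of all-gather semantics (NOT Spark code).
// Threads play the role of barrier tasks; a CyclicBarrier plays the role
// of the barrier coordinator (assumption made for illustration only).
object AllGatherSketch {

  /** Runs one thread per message; each thread ends up with every message, in slot order. */
  def simulateAllGather(messages: Seq[String]): Seq[Array[String]] = {
    val n = messages.length
    require(n > 0, "need at least one task")
    val slots   = new Array[String](n)        // slot i holds task i's message
    val results = new Array[Array[String]](n) // what task i "returns"
    val barrier = new CyclicBarrier(n)        // releases only when all n threads arrive

    val threads = messages.zipWithIndex.map { case (msg, i) =>
      new Thread(() => {
        slots(i) = msg             // publish this task's message
        barrier.await()            // block until every task has published
        results(i) = slots.clone() // writes before await() are visible here; copy per task
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())      // join() makes each results(i) visible to the caller
    results.toSeq
  }

  def main(args: Array[String]): Unit = {
    val gathered = simulateAllGather(Seq("task-0", "task-1", "task-2"))
    gathered.zipWithIndex.foreach { case (all, i) =>
      println(s"task $i sees: ${all.mkString(", ")}")
    }
  }
}
```

Because each task gets a fresh copy, no caller can observe another caller's mutations; a plain `Array` is enough for that, whereas exposing a growable `ArrayBuffer` signals that appending to the gathered result is expected.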