Github user viirya commented on a diff in the pull request:
https://github.com/apache/spark/pull/21898#discussion_r205960497
--- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala ---
@@ -27,6 +27,33 @@ trait BarrierTaskContext extends TaskContext {
    * Sets a global barrier and waits until all tasks in this stage hit this barrier. Similar to
    * MPI_Barrier function in MPI, the barrier() function call blocks until all tasks in the same
    * stage have reached this routine.
+   *
+   * This function is expected to be called by EVERY tasks in the same barrier stage in the SAME
+   * pattern, otherwise you may get a SparkException. Some examples of misuses listed below:
+   * 1. Only call barrier() function on a subset of all the tasks in the same barrier stage, it
+   * shall lead to time out of the function call.
+   *   rdd.barrier().mapPartitions { (iter, context) =>
+   *     if (context.partitionId() == 0) {
+   *       // Do nothing.
+   *     } else {
+   *       context.barrier()
+   *     }
+   *     iter
+   *   }
+   *
+   * 2. Include barrier() function in a try-catch code block, this may lead to mismatched call of
+   * barrier function exception.
+   *   rdd.barrier().mapPartitions { (iter, context) =>
+   *     try {
+   *       // Do something that might throw an Exception.
+   *       doSomething()
+   *       context.barrier()
+   *     } catch {
+   *       case e: Exception => logWarning("...", e)
+   *     }
+   *     context.barrier()
--- End diff --
I guess there should not be another `context.barrier()` here?
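To make the mismatch the doc comment warns about concrete: in the quoted try-catch example, a task whose doSomething() throws reaches barrier() a different number of times than a task that succeeds. A standalone sketch (plain Scala, not Spark code; `BarrierCallCount` and `callsForTask` are hypothetical names introduced here) that just counts the would-be barrier() invocations:

```scala
// Hypothetical sketch: count how many times each task in the quoted
// try-catch example would invoke barrier(), depending on whether
// doSomething() throws. This does not use Spark at all.
object BarrierCallCount {
  def callsForTask(doSomethingThrows: Boolean): Int = {
    var calls = 0
    try {
      // Stand-in for doSomething() from the example.
      if (doSomethingThrows) throw new RuntimeException("boom")
      calls += 1 // the barrier() inside the try block
    } catch {
      case _: Exception => () // logWarning("...", e) in the original
    }
    calls += 1 // the barrier() after the try-catch block
    calls
  }

  def main(args: Array[String]): Unit = {
    val failing = callsForTask(doSomethingThrows = true)
    val succeeding = callsForTask(doSomethingThrows = false)
    println(s"failing=$failing succeeding=$succeeding")
    // Different counts across tasks is exactly the "mismatched call of
    // barrier function" situation the doc comment describes.
    assert(failing != succeeding)
  }
}
```

A failing task arrives at the barrier once, a succeeding task twice, so the tasks in the stage can never line up on the same barrier() call.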
---