Github user jiangxb1987 commented on a diff in the pull request:
https://github.com/apache/spark/pull/21898#discussion_r207821774
--- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala ---
@@ -39,6 +44,22 @@ class BarrierTaskContext(
extends TaskContextImpl(stageId, stageAttemptNumber, partitionId,
taskAttemptId, attemptNumber,
taskMemoryManager, localProperties, metricsSystem, taskMetrics) {
+ // Find the driver side RPCEndpointRef of the coordinator that handles all the barrier() calls.
+ private val barrierCoordinator: RpcEndpointRef = {
+ val env = SparkEnv.get
+ RpcUtils.makeDriverRef("barrierSync", env.conf, env.rpcEnv)
+ }
+
+ private val timer = new Timer("Barrier task timer for barrier() calls.")
+
+ // Local barrierEpoch that identify a barrier() call from current task, it shall be identical
+ // with the driver side epoch.
+ private var barrierEpoch = 0
+
+ // Number of tasks of the current barrier stage, a barrier() call must collect enough requests
+ // from different tasks within the same barrier stage attempt to succeed.
+ private lazy val numTasks = getTaskInfos().size
--- End diff --
If we change it to a `def`, then we have to call `getTaskInfos()` on every access;
the current `lazy val` calls `getTaskInfos()` only once and caches the result.
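
For illustration only (not code from the PR), a minimal Scala sketch of the trade-off being discussed; the object and member names below are hypothetical stand-ins for `getTaskInfos()`:

```scala
// A lazy val evaluates its initializer once on first access and caches the result;
// a def re-runs its body on every call.
object LazyValVsDef {
  private var fetchCount = 0

  // Stand-in for an expensive lookup such as getTaskInfos().
  private def fetchTaskInfos(): Seq[String] = {
    fetchCount += 1
    Seq("task-0", "task-1")
  }

  // Evaluated on first access only, then cached.
  lazy val numTasksLazy: Int = fetchTaskInfos().size

  // Evaluated on every access.
  def numTasksDef: Int = fetchTaskInfos().size

  def main(args: Array[String]): Unit = {
    val a = numTasksLazy + numTasksLazy // fetchTaskInfos() runs once
    val b = numTasksDef + numTasksDef   // fetchTaskInfos() runs twice more
    println(s"a = $a, b = $b, fetchCount = $fetchCount") // fetchCount = 3
  }
}
```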
---