Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207296580 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContextImpl.scala --- @@ -39,8 +44,58 @@ private[spark] class BarrierTaskContextImpl( taskMemoryManager, localProperties, metricsSystem, taskMetrics) with BarrierTaskContext { - // TODO SPARK-24817 implement global barrier. - override def barrier(): Unit = {} + private val barrierCoordinator: RpcEndpointRef = { + val env = SparkEnv.get + RpcUtils.makeDriverRef("barrierSync", env.conf, env.rpcEnv) + } + + private val timer = new Timer("Barrier task timer for barrier() calls.") + + private var barrierEpoch = 0 + + private lazy val numTasks = getTaskInfos().size + + override def barrier(): Unit = { + val callSite = Utils.getCallSite() + logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) has entered " + + s"the global sync, current barrier epoch is $barrierEpoch.") + logTrace(s"Current callSite: $callSite") + + val startTime = System.currentTimeMillis() + val timerTask = new TimerTask { + override def run(): Unit = { + logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) waiting " + + s"under the global sync since $startTime, has been waiting for " + + s"${(System.currentTimeMillis() - startTime) / 1000} seconds, current barrier epoch " + + s"is $barrierEpoch.") + } + } + // Log the update of global sync every 60 seconds. + timer.schedule(timerTask, 60000, 60000) + + try { + barrierCoordinator.askSync[Unit]( + message = RequestToSync(numTasks, stageId, stageAttemptNumber, taskAttemptId, + barrierEpoch), + // Set a fixed timeout for RPC here, so users shall get a SparkException thrown by + // BarrierCoordinator on timeout, instead of RPCTimeoutException from the RPC framework. --- End diff -- The `FiniteDuration` used in `RPCTimeout` is limited to +-(2^63-1)ns (ca. 292 years)
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org