GitHub user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207592141 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala --- @@ -80,7 +101,45 @@ class BarrierTaskContext( @Experimental @Since("2.4.0") def barrier(): Unit = { - // TODO SPARK-24817 implement global barrier. + val callSite = Utils.getCallSite() + logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) has entered " + + s"the global sync, current barrier epoch is $barrierEpoch.") + logTrace(s"Current callSite: $callSite") + + val startTime = System.currentTimeMillis() + val timerTask = new TimerTask { + override def run(): Unit = { + logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) waiting " + + s"under the global sync since $startTime, has been waiting for " + + s"${(System.currentTimeMillis() - startTime) / 1000} seconds, current barrier epoch " + + s"is $barrierEpoch.") + } + } + // Log the update of global sync every 60 seconds. + timer.schedule(timerTask, 60000, 60000) + + try { + barrierCoordinator.askSync[Unit]( + message = RequestToSync(numTasks, stageId, stageAttemptNumber, taskAttemptId, + barrierEpoch), + // Set a fixed timeout for RPC here, so users shall get a SparkException thrown by + // BarrierCoordinator on timeout, instead of RPCTimeoutException from the RPC framework. + timeout = new RpcTimeout(31536000 /** = 3600 * 24 * 365 */ seconds, "barrierTimeout")) --- End diff -- nit: "`/** ... */`" -> "`/* ... */`" (the former opens a Javadoc/Scaladoc documentation comment; use a plain block comment here)
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org