[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21898 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207846166 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -1930,6 +1930,12 @@ class SparkContext(config: SparkConf) extends Logging { Utils.tryLogNonFatalError { _executorAllocationManager.foreach(_.stop()) } +if (_dagScheduler != null) { --- End diff -- Add a comment --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207845712 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * For each barrier stage attempt, only at most one barrier() call can be active at any time, thus + * we can use (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is + * from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * all the requests for a group of `barrier()` calls are received. If the coordinator is unable to + * collect enough global sync requests within a configured time, fail all the requests and return + * an Exception with timeout message. + */ +private[spark] class BarrierCoordinator( +timeoutInSecs: Long, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private lazy val timer = new Timer("BarrierCoordinator barrier epoch increment timer") --- End diff -- Add a comment above this line? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207827294 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * For each barrier stage attempt, only at most one barrier() call can be active at any time, thus + * we can use (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is + * from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * all the requests for a group of `barrier()` calls are received. If the coordinator is unable to + * collect enough global sync requests within a configured time, fail all the requests and return + * an Exception with timeout message. + */ +private[spark] class BarrierCoordinator( +timeoutInSecs: Long, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private lazy val timer = new Timer("BarrierCoordinator barrier epoch increment timer") --- End diff -- I opened https://issues.apache.org/jira/browse/SPARK-25030 to track the issue. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207823603 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * For each barrier stage attempt, only at most one barrier() call can be active at any time, thus + * we can use (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is + * from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * all the requests for a group of `barrier()` calls are received. If the coordinator is unable to + * collect enough global sync requests within a configured time, fail all the requests and return + * an Exception with timeout message. + */ +private[spark] class BarrierCoordinator( +timeoutInSecs: Long, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private lazy val timer = new Timer("BarrierCoordinator barrier epoch increment timer") --- End diff -- This is certainly a potential bug in `SparkSubmit` and not related to the changes made in this PR, I don't feel it shall block this PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207822218 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -1930,6 +1930,12 @@ class SparkContext(config: SparkConf) extends Logging { Utils.tryLogNonFatalError { _executorAllocationManager.foreach(_.stop()) } +if (_dagScheduler != null) { --- End diff -- This is to fix https://github.com/apache/spark/pull/21898#issuecomment-410499090 ; previously, the LiveListenerBus was stopped before we stopped the DAGScheduler. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
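For context on the ordering being fixed here: if the listener bus goes down before a component that still needs it during shutdown, that component's cleanup work is silently lost. A self-contained toy sketch of the failure mode (plain Scala; these classes are illustrative stand-ins, not Spark's actual DAGScheduler or LiveListenerBus):

```scala
class Bus {
  @volatile private var stopped = false
  def post(msg: String): Unit =
    if (!stopped) println(s"delivered: $msg") else println(s"DROPPED: $msg")
  def stop(): Unit = stopped = true
}

class Scheduler(bus: Bus) {
  // On shutdown the scheduler still wants to publish/unregister on the bus.
  def stop(): Unit = bus.post("scheduler stopped, cleaning up barrier state")
}

object StopOrdering extends App {
  // Old order: the bus is already stopped, so the cleanup message is dropped.
  val bus1 = new Bus
  val sched1 = new Scheduler(bus1)
  bus1.stop()
  sched1.stop()

  // Fixed order (what the guarded `_dagScheduler.stop()` block achieves):
  // stop the scheduler while the bus is still running, then stop the bus.
  val bus2 = new Bus
  val sched2 = new Scheduler(bus2)
  sched2.stop()
  bus2.stop()
}
```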
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207821774 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala --- @@ -39,6 +44,22 @@ class BarrierTaskContext( extends TaskContextImpl(stageId, stageAttemptNumber, partitionId, taskAttemptId, attemptNumber, taskMemoryManager, localProperties, metricsSystem, taskMetrics) { + // Find the driver side RPCEndpointRef of the coordinator that handles all the barrier() calls. + private val barrierCoordinator: RpcEndpointRef = { +val env = SparkEnv.get +RpcUtils.makeDriverRef("barrierSync", env.conf, env.rpcEnv) + } + + private val timer = new Timer("Barrier task timer for barrier() calls.") + + // Local barrierEpoch that identify a barrier() call from current task, it shall be identical + // with the driver side epoch. + private var barrierEpoch = 0 + + // Number of tasks of the current barrier stage, a barrier() call must collect enough requests + // from different tasks within the same barrier stage attempt to succeed. + private lazy val numTasks = getTaskInfos().size --- End diff -- If we change it to a `def`, then we have to call `getTaskInfos()` every time; the current `lazy val` only calls `getTaskInfos()` once. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
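The trade-off can be shown with a minimal standalone sketch (plain Scala; `getTaskInfos()` here is just a counting stand-in, not the real TaskContext API):

```scala
object LazyValVsDef extends App {
  var calls = 0
  def getTaskInfos(): Seq[Int] = { calls += 1; Seq(1, 2, 3) } // stand-in for the real call

  // A `lazy val` evaluates its right-hand side once, on first access, and caches the result.
  lazy val numTasksCached: Int = getTaskInfos().size
  // A `def` re-evaluates its body on every access.
  def numTasksRecomputed: Int = getTaskInfos().size

  (1 to 3).foreach(_ => numTasksCached)
  println(s"calls after three reads of the lazy val: $calls") // 1

  (1 to 3).foreach(_ => numTasksRecomputed)
  println(s"calls after three more reads of the def: $calls") // 4
}
```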
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207814955 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * For each barrier stage attempt, only at most one barrier() call can be active at any time, thus + * we can use (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is + * from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * all the requests for a group of `barrier()` calls are received. If the coordinator is unable to + * collect enough global sync requests within a configured time, fail all the requests and return + * an Exception with timeout message. + */ +private[spark] class BarrierCoordinator( +timeoutInSecs: Long, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private lazy val timer = new Timer("BarrierCoordinator barrier epoch increment timer") --- End diff -- Will we identify the underlying reason before merging to master? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207758158 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -566,6 +579,9 @@ private[spark] class TaskSchedulerImpl( if (taskResultGetter != null) { taskResultGetter.stop() } +if (barrierCoordinator != null) { --- End diff -- maybe we should not use `lazy val`, but use `var` and control the initialization ourselves. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
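A sketch of the shape this suggestion points at (assumed structure, not the PR's actual TaskSchedulerImpl): hold the coordinator in a `var`, create it only when a barrier stage actually shows up, and let `stop()` test initialization explicitly instead of relying on whether a `lazy val` happened to be forced.

```scala
class Coordinator {
  def stop(): Unit = println("coordinator stopped")
}

class Scheduler {
  // Explicitly managed instead of a `lazy val`, so stop() can tell whether the
  // coordinator was ever created without forcing its creation.
  private var barrierCoordinator: Coordinator = null

  private def maybeInitBarrierCoordinator(): Unit = synchronized {
    if (barrierCoordinator == null) {
      barrierCoordinator = new Coordinator
    }
  }

  def submitBarrierStage(): Unit = {
    maybeInitBarrierCoordinator()
    // ... schedule the barrier tasks ...
  }

  def stop(): Unit = synchronized {
    if (barrierCoordinator != null) {
      barrierCoordinator.stop()
      barrierCoordinator = null
    }
  }
}
```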
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207758062 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -1930,6 +1930,12 @@ class SparkContext(config: SparkConf) extends Logging { Utils.tryLogNonFatalError { _executorAllocationManager.foreach(_.stop()) } +if (_dagScheduler != null) { --- End diff -- why this change? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207758063 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala --- @@ -80,7 +101,45 @@ class BarrierTaskContext( @Experimental @Since("2.4.0") def barrier(): Unit = { -// TODO SPARK-24817 implement global barrier. +val callSite = Utils.getCallSite() +logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) has entered " + + s"the global sync, current barrier epoch is $barrierEpoch.") +logTrace(s"Current callSite: $callSite") --- End diff -- or simpler: `logTrace("Current callSite: " + Utils.getCallSite())` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207758008 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala --- @@ -80,7 +101,45 @@ class BarrierTaskContext( @Experimental @Since("2.4.0") def barrier(): Unit = { -// TODO SPARK-24817 implement global barrier. +val callSite = Utils.getCallSite() +logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) has entered " + + s"the global sync, current barrier epoch is $barrierEpoch.") +logTrace(s"Current callSite: $callSite") --- End diff -- +1 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207757953 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala --- @@ -39,6 +44,22 @@ class BarrierTaskContext( extends TaskContextImpl(stageId, stageAttemptNumber, partitionId, taskAttemptId, attemptNumber, taskMemoryManager, localProperties, metricsSystem, taskMetrics) { + // Find the driver side RPCEndpointRef of the coordinator that handles all the barrier() calls. + private val barrierCoordinator: RpcEndpointRef = { +val env = SparkEnv.get +RpcUtils.makeDriverRef("barrierSync", env.conf, env.rpcEnv) + } + + private val timer = new Timer("Barrier task timer for barrier() calls.") + + // Local barrierEpoch that identify a barrier() call from current task, it shall be identical + // with the driver side epoch. + private var barrierEpoch = 0 + + // Number of tasks of the current barrier stage, a barrier() call must collect enough requests + // from different tasks within the same barrier stage attempt to succeed. + private lazy val numTasks = getTaskInfos().size --- End diff -- this can be a `def`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207744998 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -566,6 +579,9 @@ private[spark] class TaskSchedulerImpl( if (taskResultGetter != null) { taskResultGetter.stop() } +if (barrierCoordinator != null) { --- End diff -- Ah, I see. How about leaving a comment? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207744089 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -566,6 +579,9 @@ private[spark] class TaskSchedulerImpl( if (taskResultGetter != null) { taskResultGetter.stop() } +if (barrierCoordinator != null) { --- End diff -- It shall first get initialized and then get stopped. Do we have a better way to do this? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207744005 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -566,6 +579,9 @@ private[spark] class TaskSchedulerImpl( if (taskResultGetter != null) { taskResultGetter.stop() } +if (barrierCoordinator != null) { --- End diff -- What is the case if `barrierCoordinator == null`? `barrierCoordinator` is a `lazy val`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
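The subtlety behind this question: a null check on a `lazy val` forces its initializer, so the guard can never observe `null` and may instantiate the coordinator during shutdown only to stop it again. A standalone sketch:

```scala
object LazyValNullCheck extends App {
  var created = 0
  lazy val coordinator: AnyRef = { created += 1; new Object }

  println(s"before the check: created = $created") // 0
  // Referencing the lazy val inside the null check forces initialization.
  if (coordinator != null) {
    println(s"after the check: created = $created") // 1
  }
}
```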
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207743879 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -567,4 +567,14 @@ package object config { .intConf .checkValue(v => v > 0, "The value should be a positive integer.") .createWithDefault(2000) + + private[spark] val BARRIER_SYNC_TIMEOUT = +ConfigBuilder("spark.barrier.sync.timeout") + .doc("The timeout in seconds for each barrier() call from a barrier task. If the " + +"coordinator didn't receive all the sync messages from barrier tasks within the " + +"configed time, throw a SparkException to fail all the tasks. The default value is set " + +"to 31536000(3600 * 24 * 365) so the barrier() call shall wait for one year.") + .timeConf(TimeUnit.SECONDS) + .checkValue(v => v > 0, "The value should be a positive int value.") --- End diff -- nit: `positive int value` -> `positive time value`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207743781 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala --- @@ -80,7 +101,45 @@ class BarrierTaskContext( @Experimental @Since("2.4.0") def barrier(): Unit = { -// TODO SPARK-24817 implement global barrier. +val callSite = Utils.getCallSite() +logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) has entered " + + s"the global sync, current barrier epoch is $barrierEpoch.") +logTrace(s"Current callSite: $callSite") --- End diff -- How about the following since `Utils.getCallSite()` is not lightweight? ``` if (isTraceEnabled) { val callSite = Utils.getCallSite() logTrace(s"Current callSite: $callSite") } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
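The reason the earlier one-liner suggestion can be as cheap as this explicit guard is a by-name `msg: => String` parameter, the pattern Spark's `Logging` helpers use, which defers building the message (and the call-site lookup inside it) until the level is actually enabled. A standalone sketch of the mechanism, with illustrative names rather than Spark's API:

```scala
object LazyTraceLogging extends App {
  var traceEnabled = false

  def expensiveCallSite(): String = {
    println("computing call site...") // visible side effect shows when it runs
    "Foo.bar(Foo.scala:42)"
  }

  // By-name parameter: the argument expression is only evaluated if it is used.
  def logTrace(msg: => String): Unit = if (traceEnabled) println(s"TRACE $msg")

  logTrace("Current callSite: " + expensiveCallSite()) // nothing computed or printed
  traceEnabled = true
  logTrace("Current callSite: " + expensiveCallSite()) // computed and printed once
}
```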
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207743563 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * For each barrier stage attempt, only at most one barrier() call can be active at any time, thus + * we can use (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is + * from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * all the requests for a group of `barrier()` calls are received. If the coordinator is unable to + * collect enough global sync requests within a configured time, fail all the requests and return + * an Exception with timeout message. + */ +private[spark] class BarrierCoordinator( +timeoutInSecs: Long, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + // Listen to StageCompleted event, clear corresponding ContextBarrierState. + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + val barrierId = ContextBarrierId(stageInfo.stageId, stageInfo.attemptNumber) + // Clear ContextBarrierState from a finished stage attempt. + cleanupBarrierStage(barrierId) +} + } + + // Record all active stage attempts that make barrier() call(s), and the corresponding internal + // state. 
+ private val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + override def onStop(): Unit = { +try { + states.forEachValue(1, clearStateConsumer) + states.clear() + listenerBus.removeListener(listener) +} finally { + super.onStop() +} + } + + /** + * Provide the current state of a barrier() call. A state is created when a new stage attempt + * sends out a barrier() call, and recycled on stage completed. + * + * @param barrierId Identifier of the barrier stage that make a barrier() call. + * @param numTasks Number of tasks of the barrier stage, all barrier() calls from the stage shall + * collect `numTasks` requests to succeed. + */ + private class ContextBarrierState( + val barrierId: ContextBarrierId, + val numTasks: Int) { + +// There may be multiple barrier() calls from a barrier stage attempt, `barrierEpoch` is used +// to identify each barrier() call. It shall get increased when a barrier() call succeed, or +// reset when a barrier() call fail due to timeout. +private var barrierEpoch: Int = 0 + +// An array of RPCCallContexts for barrier tasks that are waiting for reply of a barrier() +// call. +
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207743465 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * For each barrier stage attempt, only at most one barrier() call can be active at any time, thus + * we can use (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is + * from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * all the requests for a group of `barrier()` calls are received. If the coordinator is unable to + * collect enough global sync requests within a configured time, fail all the requests and return + * an Exception with timeout message. + */ +private[spark] class BarrierCoordinator( +timeoutInSecs: Long, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + // Listen to StageCompleted event, clear corresponding ContextBarrierState. + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + val barrierId = ContextBarrierId(stageInfo.stageId, stageInfo.attemptNumber) + // Clear ContextBarrierState from a finished stage attempt. + cleanupBarrierStage(barrierId) +} + } + + // Record all active stage attempts that make barrier() call(s), and the corresponding internal + // state. 
+ private val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + override def onStop(): Unit = { +try { + states.forEachValue(1, clearStateConsumer) + states.clear() + listenerBus.removeListener(listener) +} finally { + super.onStop() +} + } + + /** + * Provide the current state of a barrier() call. A state is created when a new stage attempt + * sends out a barrier() call, and recycled on stage completed. + * + * @param barrierId Identifier of the barrier stage that make a barrier() call. + * @param numTasks Number of tasks of the barrier stage, all barrier() calls from the stage shall + * collect `numTasks` requests to succeed. + */ + private class ContextBarrierState( + val barrierId: ContextBarrierId, + val numTasks: Int) { + +// There may be multiple barrier() calls from a barrier stage attempt, `barrierEpoch` is used +// to identify each barrier() call. It shall get increased when a barrier() call succeed, or +// reset when a barrier() call fail due to timeout. --- End diff -- nit: `fail` -> `fails` --- - To unsubscribe, e-mail:
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207743456 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * For each barrier stage attempt, only at most one barrier() call can be active at any time, thus + * we can use (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is + * from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * all the requests for a group of `barrier()` calls are received. If the coordinator is unable to + * collect enough global sync requests within a configured time, fail all the requests and return + * an Exception with timeout message. + */ +private[spark] class BarrierCoordinator( +timeoutInSecs: Long, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + // Listen to StageCompleted event, clear corresponding ContextBarrierState. + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + val barrierId = ContextBarrierId(stageInfo.stageId, stageInfo.attemptNumber) + // Clear ContextBarrierState from a finished stage attempt. + cleanupBarrierStage(barrierId) +} + } + + // Record all active stage attempts that make barrier() call(s), and the corresponding internal + // state. 
+ private val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + override def onStop(): Unit = { +try { + states.forEachValue(1, clearStateConsumer) + states.clear() + listenerBus.removeListener(listener) +} finally { + super.onStop() +} + } + + /** + * Provide the current state of a barrier() call. A state is created when a new stage attempt + * sends out a barrier() call, and recycled on stage completed. + * + * @param barrierId Identifier of the barrier stage that make a barrier() call. + * @param numTasks Number of tasks of the barrier stage, all barrier() calls from the stage shall + * collect `numTasks` requests to succeed. + */ + private class ContextBarrierState( + val barrierId: ContextBarrierId, + val numTasks: Int) { + +// There may be multiple barrier() calls from a barrier stage attempt, `barrierEpoch` is used +// to identify each barrier() call. It shall get increased when a barrier() call succeed, or --- End diff -- nit: `succeed` ->`succeeds` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail:
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207743412 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * For each barrier stage attempt, only at most one barrier() call can be active at any time, thus + * we can use (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is + * from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * all the requests for a group of `barrier()` calls are received. If the coordinator is unable to + * collect enough global sync requests within a configured time, fail all the requests and return + * an Exception with timeout message. + */ +private[spark] class BarrierCoordinator( +timeoutInSecs: Long, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + // Listen to StageCompleted event, clear corresponding ContextBarrierState. + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + val barrierId = ContextBarrierId(stageInfo.stageId, stageInfo.attemptNumber) + // Clear ContextBarrierState from a finished stage attempt. + cleanupBarrierStage(barrierId) +} + } + + // Record all active stage attempts that make barrier() call(s), and the corresponding internal + // state. 
+ private val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + override def onStop(): Unit = { +try { + states.forEachValue(1, clearStateConsumer) + states.clear() + listenerBus.removeListener(listener) +} finally { + super.onStop() +} + } + + /** + * Provide the current state of a barrier() call. A state is created when a new stage attempt + * sends out a barrier() call, and recycled on stage completed. + * + * @param barrierId Identifier of the barrier stage that make a barrier() call. + * @param numTasks Number of tasks of the barrier stage, all barrier() calls from the stage shall + * collect `numTasks` requests to succeed. + */ + private class ContextBarrierState( + val barrierId: ContextBarrierId, + val numTasks: Int) { + +// There may be multiple barrier() calls from a barrier stage attempt, `barrierEpoch` is used +// to identify each barrier() call. It shall get increased when a barrier() call succeed, or +// reset when a barrier() call fail due to timeout. +private var barrierEpoch: Int = 0 + +// An array of RPCCallContexts for barrier tasks that are waiting for reply of a barrier() +// call. +
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207737051 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * Only one barrier() call shall happen on a barrier stage attempt at each time, we can use + * (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + // Listen to StageCompleted event, clear corresponding ContextBarrierState. + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + val barrierId = ContextBarrierId(stageInfo.stageId, stageInfo.attemptNumber) + // Clear ContextBarrierState from a finished stage attempt. + cleanupBarrierStage(barrierId) +} + } + + // Remember all active stage attempts that make barrier() call(s), and the corresponding + // internal state. + private val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Provide current state of a barrier() call, the state is created when a new stage attempt send + * out a barrier() call, and recycled on stage completed. + * + * @param barrierId Identifier of the barrier stage that make a barrier() call. 
+ * @param numTasks Number of tasks of the barrier stage, all barrier() calls from the stage shall + * collect `numTasks` requests to succeed. + */ + private class ContextBarrierState( + val barrierId: ContextBarrierId, + val numTasks: Int) { + +// There may be multiple barrier() calls from a barrier stage attempt, `barrierEpoch` is used +// to identify each barrier() call. It shall get increased when a barrier() call succeed, or +// reset when a barrier() call fail due to timeout. +private var barrierEpoch: Int = 0 + +// An array of RPCCallContexts for barrier tasks that are waiting for reply of a barrier() +// call. +private val requesters: ArrayBuffer[RpcCallContext] = new ArrayBuffer[RpcCallContext](numTasks) + +// A timer task that ensures we may timeout for a barrier() call. +private var timerTask: TimerTask = null + +// Init a TimerTask for a barrier() call. +private def initTimerTask(): Unit =
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207736706 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * Only one barrier() call shall happen on a barrier stage attempt at each time, we can use + * (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is from. --- End diff -- Actually, a barrier stage attempt can trigger multiple barrier() calls, but only one call can be active at any time. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
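The bookkeeping being described — several sequential barrier() calls per stage attempt, at most one active at a time, each sync round released only when all tasks have checked in — can be sketched with a toy single-process model (illustrative only; this is not the PR's ContextBarrierState and omits timeouts and RPC entirely):

```scala
import scala.collection.mutable.ArrayBuffer

// Toy state for one stage attempt: collect `numTasks` requests for the current
// epoch, then reply to all of them and advance the epoch for the next barrier().
class ToyBarrierState(numTasks: Int) {
  private var epoch = 0
  private val waiting = ArrayBuffer.empty[String => Unit]

  def handleRequest(task: String, requestEpoch: Int)(reply: String => Unit): Unit = synchronized {
    require(requestEpoch == epoch, s"stale request from $task: epoch $requestEpoch != $epoch")
    waiting += reply
    if (waiting.size == numTasks) {  // everyone has arrived: release the barrier
      waiting.foreach(_.apply(s"barrier epoch $epoch completed"))
      waiting.clear()
      epoch += 1                     // the next barrier() call uses the new epoch
    }
  }
}

object ToyBarrierDemo extends App {
  val state = new ToyBarrierState(numTasks = 2)
  state.handleRequest("task 0", requestEpoch = 0)(println)
  state.handleRequest("task 1", requestEpoch = 0)(println) // both replies print; epoch -> 1
}
```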
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207732505 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * Only one barrier() call shall happen on a barrier stage attempt at each time, we can use + * (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + // Listen to StageCompleted event, clear corresponding ContextBarrierState. + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + val barrierId = ContextBarrierId(stageInfo.stageId, stageInfo.attemptNumber) + // Clear ContextBarrierState from a finished stage attempt. + cleanupBarrierStage(barrierId) +} + } + + // Remember all active stage attempts that make barrier() call(s), and the corresponding + // internal state. + private val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Provide current state of a barrier() call, the state is created when a new stage attempt send + * out a barrier() call, and recycled on stage completed. + * + * @param barrierId Identifier of the barrier stage that make a barrier() call. 
+ * @param numTasks Number of tasks of the barrier stage, all barrier() calls from the stage shall + * collect `numTasks` requests to succeed. + */ + private class ContextBarrierState( + val barrierId: ContextBarrierId, + val numTasks: Int) { + +// There may be multiple barrier() calls from a barrier stage attempt, `barrierEpoch` is used +// to identify each barrier() call. It shall get increased when a barrier() call succeed, or +// reset when a barrier() call fail due to timeout. +private var barrierEpoch: Int = 0 + +// An array of RPCCallContexts for barrier tasks that are waiting for reply of a barrier() +// call. +private val requesters: ArrayBuffer[RpcCallContext] = new ArrayBuffer[RpcCallContext](numTasks) + +// A timer task that ensures we may timeout for a barrier() call. +private var timerTask: TimerTask = null + +// Init a TimerTask for a barrier() call. +private def initTimerTask(): Unit =
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207732072 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * Only one barrier() call shall happen on a barrier stage attempt at each time, we can use + * (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, --- End diff -- -> `timeoutInSecs` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207731918 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * Only one barrier() call shall happen on a barrier stage attempt at each time, we can use + * (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + // Listen to StageCompleted event, clear corresponding ContextBarrierState. + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + val barrierId = ContextBarrierId(stageInfo.stageId, stageInfo.attemptNumber) + // Clear ContextBarrierState from a finished stage attempt. + cleanupBarrierStage(barrierId) +} + } + + // Remember all active stage attempts that make barrier() call(s), and the corresponding + // internal state. + private val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Provide current state of a barrier() call, the state is created when a new stage attempt send + * out a barrier() call, and recycled on stage completed. + * + * @param barrierId Identifier of the barrier stage that make a barrier() call. 
+ * @param numTasks Number of tasks of the barrier stage, all barrier() calls from the stage shall + * collect `numTasks` requests to succeed. + */ + private class ContextBarrierState( + val barrierId: ContextBarrierId, + val numTasks: Int) { + +// There may be multiple barrier() calls from a barrier stage attempt, `barrierEpoch` is used +// to identify each barrier() call. It shall get increased when a barrier() call succeed, or +// reset when a barrier() call fail due to timeout. +private var barrierEpoch: Int = 0 + +// An array of RPCCallContexts for barrier tasks that are waiting for reply of a barrier() +// call. +private val requesters: ArrayBuffer[RpcCallContext] = new ArrayBuffer[RpcCallContext](numTasks) + +// A timer task that ensures we may timeout for a barrier() call. +private var timerTask: TimerTask = null + +// Init a TimerTask for a barrier() call. +private def initTimerTask(): Unit =
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207731620 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * Only one barrier() call shall happen on a barrier stage attempt at each time, we can use + * (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + // Listen to StageCompleted event, clear corresponding ContextBarrierState. + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + val barrierId = ContextBarrierId(stageInfo.stageId, stageInfo.attemptNumber) + // Clear ContextBarrierState from a finished stage attempt. + cleanupBarrierStage(barrierId) +} + } + + // Remember all active stage attempts that make barrier() call(s), and the corresponding + // internal state. + private val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Provide current state of a barrier() call, the state is created when a new stage attempt send --- End diff -- `Provide current state of a barrier() call, the state` -> `Provide the current state of a barrier() call. 
The state` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207731501 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * Only one barrier() call shall happen on a barrier stage attempt at each time, we can use + * (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + // Listen to StageCompleted event, clear corresponding ContextBarrierState. + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + val barrierId = ContextBarrierId(stageInfo.stageId, stageInfo.attemptNumber) + // Clear ContextBarrierState from a finished stage attempt. + cleanupBarrierStage(barrierId) +} + } + + // Remember all active stage attempts that make barrier() call(s), and the corresponding --- End diff -- `Remember` -> `Record` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207731274 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * Only one barrier() call shall happen on a barrier stage attempt at each time, we can use + * (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect --- End diff -- `doesn't collect` -> `is unable to collect` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207731107 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * Only one barrier() call shall happen on a barrier stage attempt at each time, we can use + * (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + // Listen to StageCompleted event, clear corresponding ContextBarrierState. + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + val barrierId = ContextBarrierId(stageInfo.stageId, stageInfo.attemptNumber) + // Clear ContextBarrierState from a finished stage attempt. + cleanupBarrierStage(barrierId) +} + } + + // Remember all active stage attempts that make barrier() call(s), and the corresponding + // internal state. + private val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Provide current state of a barrier() call, the state is created when a new stage attempt send + * out a barrier() call, and recycled on stage completed. + * + * @param barrierId Identifier of the barrier stage that make a barrier() call. 
+ * @param numTasks Number of tasks of the barrier stage, all barrier() calls from the stage shall + * collect `numTasks` requests to succeed. + */ + private class ContextBarrierState( + val barrierId: ContextBarrierId, + val numTasks: Int) { + +// There may be multiple barrier() calls from a barrier stage attempt, `barrierEpoch` is used +// to identify each barrier() call. It shall get increased when a barrier() call succeed, or +// reset when a barrier() call fail due to timeout. +private var barrierEpoch: Int = 0 + +// An array of RPCCallContexts for barrier tasks that are waiting for reply of a barrier() +// call. +private val requesters: ArrayBuffer[RpcCallContext] = new ArrayBuffer[RpcCallContext](numTasks) + +// A timer task that ensures we may timeout for a barrier() call. +private var timerTask: TimerTask = null + +// Init a TimerTask for a barrier() call. +private def initTimerTask(): Unit =
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207731198 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * Only one barrier() call shall happen on a barrier stage attempt at each time, we can use + * (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is from. --- End diff -- ```Scala Since only one barrier() call is triggered by each barrier stage attempt, we use (stageId, stageAttemptId) to identify the stage attempt which the barrier() call is from. ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207731249 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * Only one barrier() call shall happen on a barrier stage attempt at each time, we can use + * (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect --- End diff -- `received all the requests for a group of `barrier()` calls` -> `all the requests for a group of `barrier()` calls are received` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207731446 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * Only one barrier() call shall happen on a barrier stage attempt at each time, we can use + * (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. --- End diff -- `due to timeout` -> `and return a timeout exception` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207730912 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * Only one barrier() call shall happen on a barrier stage attempt at each time, we can use + * (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + // Listen to StageCompleted event, clear corresponding ContextBarrierState. + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + val barrierId = ContextBarrierId(stageInfo.stageId, stageInfo.attemptNumber) + // Clear ContextBarrierState from a finished stage attempt. + cleanupBarrierStage(barrierId) +} + } + + // Remember all active stage attempts that make barrier() call(s), and the corresponding + // internal state. + private val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Provide current state of a barrier() call, the state is created when a new stage attempt send + * out a barrier() call, and recycled on stage completed. + * + * @param barrierId Identifier of the barrier stage that make a barrier() call. 
+ * @param numTasks Number of tasks of the barrier stage, all barrier() calls from the stage shall + * collect `numTasks` requests to succeed. + */ + private class ContextBarrierState( + val barrierId: ContextBarrierId, + val numTasks: Int) { + +// There may be multiple barrier() calls from a barrier stage attempt, `barrierEpoch` is used +// to identify each barrier() call. It shall get increased when a barrier() call succeed, or +// reset when a barrier() call fail due to timeout. +private var barrierEpoch: Int = 0 + +// An array of RPCCallContexts for barrier tasks that are waiting for reply of a barrier() +// call. +private val requesters: ArrayBuffer[RpcCallContext] = new ArrayBuffer[RpcCallContext](numTasks) + +// A timer task that ensures we may timeout for a barrier() call. +private var timerTask: TimerTask = null + +// Init a TimerTask for a barrier() call. +private def initTimerTask(): Unit =
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207730852 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,225 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * Only one barrier() call shall happen on a barrier stage attempt at each time, we can use + * (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + // Listen to StageCompleted event, clear corresponding ContextBarrierState. + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + val barrierId = ContextBarrierId(stageInfo.stageId, stageInfo.attemptNumber) + // Clear ContextBarrierState from a finished stage attempt. + cleanupBarrierStage(barrierId) +} + } + + // Remember all active stage attempts that make barrier() call(s), and the corresponding + // internal state. + private val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Provide current state of a barrier() call, the state is created when a new stage attempt send + * out a barrier() call, and recycled on stage completed. + * + * @param barrierId Identifier of the barrier stage that make a barrier() call. 
+ * @param numTasks Number of tasks of the barrier stage, all barrier() calls from the stage shall + * collect `numTasks` requests to succeed. + */ + private class ContextBarrierState( + val barrierId: ContextBarrierId, + val numTasks: Int) { + +// There may be multiple barrier() calls from a barrier stage attempt, `barrierEpoch` is used +// to identify each barrier() call. It shall get increased when a barrier() call succeed, or +// reset when a barrier() call fail due to timeout. +private var barrierEpoch: Int = 0 + +// An array of RPCCallContexts for barrier tasks that are waiting for reply of a barrier() +// call. +private val requesters: ArrayBuffer[RpcCallContext] = new ArrayBuffer[RpcCallContext](numTasks) + +// A timer task that ensures we may timeout for a barrier() call. +private var timerTask: TimerTask = null + +// Init a TimerTask for a barrier() call. +private def initTimerTask(): Unit =
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207730025 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -567,4 +567,14 @@ package object config { .intConf .checkValue(v => v > 0, "The value should be a positive integer.") .createWithDefault(2000) + + private[spark] val BARRIER_SYNC_TIMEOUT = +ConfigBuilder("spark.barrier.sync.timeout") + .doc("The timeout in seconds for each barrier() call from a barrier task. If the " + +"coordinator didn't receive all the sync messages from barrier tasks within the " + +"configed time, throw a SparkException to fail all the tasks. The default value is set " + +"to 31536000(3600 * 24 * 365) so the barrier() call shall wait for one year.") + .intConf --- End diff -- `.timeConf(TimeUnit.SECONDS)`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
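[Editor's note] For reference, here is a minimal sketch of how the reviewer's `.timeConf(TimeUnit.SECONDS)` suggestion could be applied to the quoted config. The config key, doc wording, and positive-value check come from the quoted diff; the exact builder chain and the `"365d"` default string are illustrative assumptions, not a claim about what was ultimately merged.

```scala
// Sketch only: assumes this sits inside org.apache.spark.internal.config's package object,
// where ConfigBuilder is already in scope. The import would normally go at the top of the file.
import java.util.concurrent.TimeUnit

private[spark] val BARRIER_SYNC_TIMEOUT =
  ConfigBuilder("spark.barrier.sync.timeout")
    .doc("The timeout in seconds for each barrier() call from a barrier task. If the " +
      "coordinator didn't receive all the sync messages from barrier tasks within the " +
      "configured time, throw a SparkException to fail all the tasks. The default value " +
      "is set to one year so the barrier() call effectively waits indefinitely.")
    .timeConf(TimeUnit.SECONDS)  // lets users write values like "60s" or "1h"; stored as seconds
    .checkValue(v => v > 0, "The value should be a positive time value.")
    .createWithDefaultString("365d")  // hypothetical default; the quoted diff uses 31536000 seconds
```

With `timeConf`, `spark.barrier.sync.timeout=1h` and `spark.barrier.sync.timeout=3600` would both resolve to 3600 seconds, which is the usability gain behind the suggestion.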
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207723947 --- Diff: core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala --- @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.util.Random + +import org.apache.spark._ + +class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext { + + test("global sync by barrier() call") { +val conf = new SparkConf() + // Init local cluster here so each barrier task runs in a separated process, thus `barrier()` + // call is actually useful. + .setMaster("local-cluster[4, 1, 1024]") + .setAppName("test-cluster") +sc = new SparkContext(conf) +val rdd = sc.makeRDD(1 to 10, 4) +val rdd2 = rdd.barrier().mapPartitions { (it, context) => + // Sleep for a random time before global sync. + Thread.sleep(Random.nextInt(1000)) + context.barrier() + Seq(System.currentTimeMillis()).iterator +} + +val times = rdd2.collect() +// All the tasks shall finish global sync within a short time slot. +assert(times.max - times.min <= 1000) + } + + test("support multiple barrier() call within a single task") { +val conf = new SparkConf() + .setMaster("local-cluster[4, 1, 1024]") + .setAppName("test-cluster") +sc = new SparkContext(conf) +val rdd = sc.makeRDD(1 to 10, 4) +val rdd2 = rdd.barrier().mapPartitions { (it, context) => + // Sleep for a random time before global sync. + Thread.sleep(Random.nextInt(1000)) + context.barrier() + val time1 = System.currentTimeMillis() + // Sleep for a random time between two global syncs. + Thread.sleep(Random.nextInt(1000)) + context.barrier() + val time2 = System.currentTimeMillis() + Seq((time1, time2)).iterator +} + +val times = rdd2.collect() +// All the tasks shall finish the first round of global sync within a short time slot. +val times1 = times.map(_._1) +assert(times1.max - times1.min <= 1000) + +// All the tasks shall finish the second round of global sync within a short time slot. 
+val times2 = times.map(_._2) +assert(times2.max - times2.min <= 1000) + } + + test("throw exception on barrier() call timeout") { +val conf = new SparkConf() + .set("spark.barrier.sync.timeout", "1") + .set("spark.test.noStageRetry", "true") + .setMaster("local-cluster[4, 1, 1024]") + .setAppName("test-cluster") +sc = new SparkContext(conf) +val rdd = sc.makeRDD(1 to 10, 4) +val rdd2 = rdd.barrier().mapPartitions { (it, context) => + // Task 3 shall sleep 2000ms to ensure barrier() call timeout + if (context.taskAttemptId == 3) { +Thread.sleep(2000) + } + context.barrier() + it +} + +val error = intercept[SparkException] { + rdd2.collect() +}.getMessage +assert(error.contains("The coordinator didn't get all barrier sync requests")) +assert(error.contains("within 1s")) + } + + test("throw exception if barrier() call doesn't happen on every task") { +val conf = new SparkConf() + .set("spark.barrier.sync.timeout", "1") + .set("spark.test.noStageRetry", "true") + .setMaster("local-cluster[4, 1, 1024]") + .setAppName("test-cluster") +sc = new SparkContext(conf) +val rdd = sc.makeRDD(1 to 10, 4) +val rdd2 = rdd.barrier().mapPartitions { (it, context) => + if (context.taskAttemptId != 0) { +context.barrier() + } + it +} + +val error = intercept[SparkException] { + rdd2.collect() +}.getMessage +assert(error.contains("The
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207723925 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * Only one barrier() call shall happen on a barrier stage attempt at each time, we can use + * (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + // Listen to StageCompleted event, clear corresponding ContextBarrierState. + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + val barrierId = ContextBarrierId(stageInfo.stageId, stageInfo.attemptNumber) + // Clear ContextBarrierState from a finished stage attempt. + val barrierState = states.remove(barrierId) + if (barrierState != null) { +barrierState.clear() + } +} + } + + // Remember all active stage attempts that make barrier() call(s), and the corresponding + // internal state. + private val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Provide current state of a barrier() call, the state is created when a new stage attempt send + * out a barrier() call, and recycled on stage completed. 
+ * + * @param barrierId Identifier of the barrier stage that make a barrier() call. + * @param numTasks Number of tasks of the barrier stage, all barrier() calls from the stage shall + * collect `numTasks` requests to succeed. + */ + private class ContextBarrierState( + val barrierId: ContextBarrierId, + val numTasks: Int) { + +// There may be multiple barrier() calls from a barrier stage attempt, `barrierEpoch` is used +// to identify each barrier() call. It shall get increased when a barrier() call succeed, or +// reset when a barrier() call fail due to timeout. +private var barrierEpoch: Int = 0 + +// An array of RPCCallContexts for barrier tasks that are waiting for reply of a barrier() +// call. +private val requesters: ArrayBuffer[RpcCallContext] = new ArrayBuffer[RpcCallContext](numTasks) + +// A timer task that ensures we may timeout for a barrier() call. +private var timerTask: TimerTask = null
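[Editor's note] To make the contract described in the quoted ScalaDoc concrete — reply to every blocked requester once `numTasks` sync requests for the current `barrierEpoch` have arrived, otherwise fail them all when the timer fires — here is a self-contained, simplified sketch. The `Reply` trait and `BarrierStateSketch` class are hypothetical stand-ins for Spark's `RpcCallContext` and the PR's `ContextBarrierState`; the real class receives RPC messages on a `ThreadSafeRpcEndpoint` and differs in detail.

```scala
import java.util.{Timer, TimerTask}
import scala.collection.mutable.ArrayBuffer

// Minimal stand-in for RpcCallContext: just enough to unblock or fail a waiting task.
trait Reply {
  def reply(msg: Any): Unit
  def sendFailure(e: Throwable): Unit
}

// Hypothetical per-stage-attempt state: collects one request per task and replies to all of
// them once `numTasks` requests for the same barrier epoch have arrived, or fails them all
// if the timeout timer fires first.
class BarrierStateSketch(numTasks: Int, timeoutSecs: Long) {
  private var barrierEpoch = 0
  private val requesters = new ArrayBuffer[Reply](numTasks)
  private val timer = new Timer("barrier-timeout-sketch")
  private var timerTask: TimerTask = null

  def handleRequest(requester: Reply, epoch: Int): Unit = synchronized {
    if (epoch != barrierEpoch) {
      requester.sendFailure(new RuntimeException(s"Unexpected barrier epoch $epoch"))
    } else {
      if (requesters.isEmpty) {
        // First request of this epoch: arm the timeout for the whole barrier() round.
        timerTask = new TimerTask {
          override def run(): Unit = BarrierStateSketch.this.synchronized {
            requesters.foreach(_.sendFailure(new RuntimeException(
              s"The coordinator didn't get all barrier sync requests within ${timeoutSecs}s")))
            requesters.clear()
            barrierEpoch = -1  // poison the epoch so stragglers also fail fast
          }
        }
        timer.schedule(timerTask, timeoutSecs * 1000)
      }
      requesters += requester
      if (requesters.size == numTasks) {
        // Everyone checked in: unblock all tasks and advance to the next epoch.
        requesters.foreach(_.reply(()))
        requesters.clear()
        if (timerTask != null) timerTask.cancel()
        barrierEpoch += 1
      }
    }
  }
}
```

The one-timer-per-epoch design mirrors the intent of the quoted code: a `barrier()` round either completes for every task or fails for every task, so no requester is ever left blocked indefinitely.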
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207710500 --- Diff: core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala --- @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.util.Random + +import org.apache.spark._ + +class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext { + + test("global sync by barrier() call") { +val conf = new SparkConf() + // Init local cluster here so each barrier task runs in a separated process, thus `barrier()` + // call is actually useful. + .setMaster("local-cluster[4, 1, 1024]") + .setAppName("test-cluster") +sc = new SparkContext(conf) +val rdd = sc.makeRDD(1 to 10, 4) +val rdd2 = rdd.barrier().mapPartitions { (it, context) => + // Sleep for a random time before global sync. + Thread.sleep(Random.nextInt(1000)) + context.barrier() + Seq(System.currentTimeMillis()).iterator +} + +val times = rdd2.collect() +// All the tasks shall finish global sync within a short time slot. +assert(times.max - times.min <= 1000) + } + + test("support multiple barrier() call within a single task") { +val conf = new SparkConf() + .setMaster("local-cluster[4, 1, 1024]") + .setAppName("test-cluster") +sc = new SparkContext(conf) +val rdd = sc.makeRDD(1 to 10, 4) +val rdd2 = rdd.barrier().mapPartitions { (it, context) => + // Sleep for a random time before global sync. + Thread.sleep(Random.nextInt(1000)) + context.barrier() + val time1 = System.currentTimeMillis() + // Sleep for a random time between two global syncs. + Thread.sleep(Random.nextInt(1000)) + context.barrier() + val time2 = System.currentTimeMillis() + Seq((time1, time2)).iterator +} + +val times = rdd2.collect() +// All the tasks shall finish the first round of global sync within a short time slot. +val times1 = times.map(_._1) +assert(times1.max - times1.min <= 1000) + +// All the tasks shall finish the second round of global sync within a short time slot. 
+val times2 = times.map(_._2) +assert(times2.max - times2.min <= 1000) + } + + test("throw exception on barrier() call timeout") { +val conf = new SparkConf() + .set("spark.barrier.sync.timeout", "1") + .set("spark.test.noStageRetry", "true") + .setMaster("local-cluster[4, 1, 1024]") + .setAppName("test-cluster") +sc = new SparkContext(conf) +val rdd = sc.makeRDD(1 to 10, 4) +val rdd2 = rdd.barrier().mapPartitions { (it, context) => + // Task 3 shall sleep 2000ms to ensure barrier() call timeout + if (context.taskAttemptId == 3) { +Thread.sleep(2000) + } + context.barrier() + it +} + +val error = intercept[SparkException] { + rdd2.collect() +}.getMessage +assert(error.contains("The coordinator didn't get all barrier sync requests")) +assert(error.contains("within 1s")) + } + + test("throw exception if barrier() call doesn't happen on every task") { +val conf = new SparkConf() + .set("spark.barrier.sync.timeout", "1") + .set("spark.test.noStageRetry", "true") + .setMaster("local-cluster[4, 1, 1024]") + .setAppName("test-cluster") +sc = new SparkContext(conf) +val rdd = sc.makeRDD(1 to 10, 4) +val rdd2 = rdd.barrier().mapPartitions { (it, context) => + if (context.taskAttemptId != 0) { +context.barrier() + } + it +} + +val error = intercept[SparkException] { + rdd2.collect() +}.getMessage +assert(error.contains("The
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207710151 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * Only one barrier() call shall happen on a barrier stage attempt at each time, we can use + * (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + // Listen to StageCompleted event, clear corresponding ContextBarrierState. + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + val barrierId = ContextBarrierId(stageInfo.stageId, stageInfo.attemptNumber) + // Clear ContextBarrierState from a finished stage attempt. + val barrierState = states.remove(barrierId) + if (barrierState != null) { +barrierState.clear() + } +} + } + + // Remember all active stage attempts that make barrier() call(s), and the corresponding + // internal state. + private val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Provide current state of a barrier() call, the state is created when a new stage attempt send + * out a barrier() call, and recycled on stage completed. 
+ * + * @param barrierId Identifier of the barrier stage that make a barrier() call. + * @param numTasks Number of tasks of the barrier stage, all barrier() calls from the stage shall + * collect `numTasks` requests to succeed. + */ + private class ContextBarrierState( + val barrierId: ContextBarrierId, + val numTasks: Int) { + +// There may be multiple barrier() calls from a barrier stage attempt, `barrierEpoch` is used +// to identify each barrier() call. It shall get increased when a barrier() call succeed, or +// reset when a barrier() call fail due to timeout. +private var barrierEpoch: Int = 0 + +// An array of RPCCallContexts for barrier tasks that are waiting for reply of a barrier() +// call. +private val requesters: ArrayBuffer[RpcCallContext] = new ArrayBuffer[RpcCallContext](numTasks) + +// A timer task that ensures we may timeout for a barrier() call. +private var timerTask: TimerTask = null
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207704241 --- Diff: core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala --- @@ -0,0 +1,148 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.util.Random + +import org.apache.spark._ + +class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext { + + test("global sync by barrier() call") { +val conf = new SparkConf() + // Init local cluster here so each barrier task runs in a separated process, thus `barrier()` + // call is actually useful. + .setMaster("local-cluster[4, 1, 1024]") + .setAppName("test-cluster") +sc = new SparkContext(conf) +val rdd = sc.makeRDD(1 to 10, 4) +val rdd2 = rdd.barrier().mapPartitions { (it, context) => + // Sleep for a random time before global sync. + Thread.sleep(Random.nextInt(1000)) + context.barrier() + Seq(System.currentTimeMillis()).iterator +} + +val times = rdd2.collect() +// All the tasks shall finish global sync within a short time slot. +assert(times.max - times.min <= 1000) + } + + test("support multiple barrier() call within a single task") { +val conf = new SparkConf() + .setMaster("local-cluster[4, 1, 1024]") + .setAppName("test-cluster") +sc = new SparkContext(conf) +val rdd = sc.makeRDD(1 to 10, 4) +val rdd2 = rdd.barrier().mapPartitions { (it, context) => + // Sleep for a random time before global sync. + Thread.sleep(Random.nextInt(1000)) + context.barrier() + val time1 = System.currentTimeMillis() + // Sleep for a random time between two global syncs. + Thread.sleep(Random.nextInt(1000)) + context.barrier() + val time2 = System.currentTimeMillis() + Seq((time1, time2)).iterator +} + +val times = rdd2.collect() +// All the tasks shall finish the first round of global sync within a short time slot. +val times1 = times.map(_._1) +assert(times1.max - times1.min <= 1000) + +// All the tasks shall finish the second round of global sync within a short time slot. 
+val times2 = times.map(_._2) +assert(times2.max - times2.min <= 1000) + } + + test("throw exception on barrier() call timeout") { +val conf = new SparkConf() + .set("spark.barrier.sync.timeout", "1") + .set("spark.test.noStageRetry", "true") + .setMaster("local-cluster[4, 1, 1024]") + .setAppName("test-cluster") +sc = new SparkContext(conf) +val rdd = sc.makeRDD(1 to 10, 4) +val rdd2 = rdd.barrier().mapPartitions { (it, context) => + // Task 3 shall sleep 2000ms to ensure barrier() call timeout + if (context.taskAttemptId == 3) { +Thread.sleep(2000) + } + context.barrier() + it +} + +val error = intercept[SparkException] { + rdd2.collect() +}.getMessage +assert(error.contains("The coordinator didn't get all barrier sync requests")) +assert(error.contains("within 1s")) + } + + test("throw exception if barrier() call doesn't happen on every task") { +val conf = new SparkConf() + .set("spark.barrier.sync.timeout", "1") + .set("spark.test.noStageRetry", "true") + .setMaster("local-cluster[4, 1, 1024]") + .setAppName("test-cluster") +sc = new SparkContext(conf) +val rdd = sc.makeRDD(1 to 10, 4) +val rdd2 = rdd.barrier().mapPartitions { (it, context) => + if (context.taskAttemptId != 0) { +context.barrier() + } + it +} + +val error = intercept[SparkException] { + rdd2.collect() +}.getMessage +assert(error.contains("The
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207704198 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * Only one barrier() call shall happen on a barrier stage attempt at each time, we can use + * (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + // Listen to StageCompleted event, clear corresponding ContextBarrierState. + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + val barrierId = ContextBarrierId(stageInfo.stageId, stageInfo.attemptNumber) + // Clear ContextBarrierState from a finished stage attempt. + val barrierState = states.remove(barrierId) + if (barrierState != null) { +barrierState.clear() + } +} + } + + // Remember all active stage attempts that make barrier() call(s), and the corresponding + // internal state. + private val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Provide current state of a barrier() call, the state is created when a new stage attempt send + * out a barrier() call, and recycled on stage completed. 
+ * + * @param barrierId Identifier of the barrier stage that make a barrier() call. + * @param numTasks Number of tasks of the barrier stage, all barrier() calls from the stage shall + * collect `numTasks` requests to succeed. + */ + private class ContextBarrierState( + val barrierId: ContextBarrierId, + val numTasks: Int) { + +// There may be multiple barrier() calls from a barrier stage attempt, `barrierEpoch` is used +// to identify each barrier() call. It shall get increased when a barrier() call succeed, or +// reset when a barrier() call fail due to timeout. +private var barrierEpoch: Int = 0 + +// An array of RPCCallContexts for barrier tasks that are waiting for reply of a barrier() +// call. +private val requesters: ArrayBuffer[RpcCallContext] = new ArrayBuffer[RpcCallContext](numTasks) + +// A timer task that ensures we may timeout for a barrier() call. +private var timerTask: TimerTask = null
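The ContextBarrierState fields quoted above (a barrierEpoch counter, a requesters buffer sized to numTasks, and a cancellable timerTask) outline the coordinator's per-stage protocol: collect one sync request per task, reply to all of them once numTasks requests have arrived, and fail every pending request if the timer fires first. Below is a minimal, self-contained model of that protocol, a sketch only: it uses scala.concurrent.Promise where the real coordinator holds RpcCallContexts, and all names in it are illustrative rather than the PR's code.

```scala
import java.util.{Timer, TimerTask}
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Promise

// Simplified stand-in for ContextBarrierState: one instance per (stageId, stageAttemptId).
// A Promise[Unit] plays the role of the RpcCallContext the real coordinator replies to.
class BarrierStateSketch(numTasks: Int, timeoutSecs: Long) {
  private val timer = new Timer("barrier-timeout-sketch", true)
  private var barrierEpoch = 0
  private val waiters = new ArrayBuffer[Promise[Unit]](numTasks)
  private var timerTask: TimerTask = null

  def handleRequest(epoch: Int, waiter: Promise[Unit]): Unit = synchronized {
    if (epoch != barrierEpoch) {
      // A stale or future epoch means the caller is out of sync with the coordinator.
      waiter.failure(new IllegalStateException(s"expected epoch $barrierEpoch, got $epoch"))
    } else {
      if (waiters.isEmpty) {
        // First request of this epoch: arm a timeout that fails everyone still waiting.
        timerTask = new TimerTask {
          override def run(): Unit = BarrierStateSketch.this.synchronized {
            waiters.foreach(_.failure(new RuntimeException(
              s"barrier epoch $barrierEpoch timed out after $timeoutSecs seconds")))
            reset()
          }
        }
        timer.schedule(timerTask, timeoutSecs * 1000)
      }
      waiters += waiter
      if (waiters.size == numTasks) {
        // Everyone arrived: release all waiters and advance to the next epoch.
        waiters.foreach(_.success(()))
        barrierEpoch += 1
        reset()
      }
    }
  }

  private def reset(): Unit = {
    waiters.clear()
    if (timerTask != null) { timerTask.cancel(); timerTask = null }
  }
}
```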
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207704135 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * Only one barrier() call shall happen on a barrier stage attempt at each time, we can use + * (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + // Listen to StageCompleted event, clear corresponding ContextBarrierState. + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + val barrierId = ContextBarrierId(stageInfo.stageId, stageInfo.attemptNumber) + // Clear ContextBarrierState from a finished stage attempt. + val barrierState = states.remove(barrierId) + if (barrierState != null) { +barrierState.clear() + } +} + } + + // Remember all active stage attempts that make barrier() call(s), and the corresponding + // internal state. + private val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Provide current state of a barrier() call, the state is created when a new stage attempt send + * out a barrier() call, and recycled on stage completed. 
+ * + * @param barrierId Identifier of the barrier stage that make a barrier() call. + * @param numTasks Number of tasks of the barrier stage, all barrier() calls from the stage shall + * collect `numTasks` requests to succeed. + */ + private class ContextBarrierState( + val barrierId: ContextBarrierId, + val numTasks: Int) { + +// There may be multiple barrier() calls from a barrier stage attempt, `barrierEpoch` is used +// to identify each barrier() call. It shall get increased when a barrier() call succeed, or +// reset when a barrier() call fail due to timeout. +private var barrierEpoch: Int = 0 + +// An array of RPCCallContexts for barrier tasks that are waiting for reply of a barrier() +// call. +private val requesters: ArrayBuffer[RpcCallContext] = new ArrayBuffer[RpcCallContext](numTasks) + +// A timer task that ensures we may timeout for a barrier() call. +private var timerTask: TimerTask = null
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207704089 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * Only one barrier() call shall happen on a barrier stage attempt at each time, we can use + * (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + // Listen to StageCompleted event, clear corresponding ContextBarrierState. + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + val barrierId = ContextBarrierId(stageInfo.stageId, stageInfo.attemptNumber) + // Clear ContextBarrierState from a finished stage attempt. + val barrierState = states.remove(barrierId) + if (barrierState != null) { +barrierState.clear() + } +} + } + + // Remember all active stage attempts that make barrier() call(s), and the corresponding + // internal state. + private val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Provide current state of a barrier() call, the state is created when a new stage attempt send + * out a barrier() call, and recycled on stage completed. 
+ * + * @param barrierId Identifier of the barrier stage that make a barrier() call. + * @param numTasks Number of tasks of the barrier stage, all barrier() calls from the stage shall + * collect `numTasks` requests to succeed. + */ + private class ContextBarrierState( + val barrierId: ContextBarrierId, + val numTasks: Int) { + +// There may be multiple barrier() calls from a barrier stage attempt, `barrierEpoch` is used +// to identify each barrier() call. It shall get increased when a barrier() call succeed, or +// reset when a barrier() call fail due to timeout. +private var barrierEpoch: Int = 0 + +// An array of RPCCallContexts for barrier tasks that are waiting for reply of a barrier() +// call. +private val requesters: ArrayBuffer[RpcCallContext] = new ArrayBuffer[RpcCallContext](numTasks) + +// A timer task that ensures we may timeout for a barrier() call. +private var timerTask: TimerTask = null
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207704051 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * Only one barrier() call shall happen on a barrier stage attempt at each time, we can use + * (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + // Listen to StageCompleted event, clear corresponding ContextBarrierState. + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + val barrierId = ContextBarrierId(stageInfo.stageId, stageInfo.attemptNumber) + // Clear ContextBarrierState from a finished stage attempt. + val barrierState = states.remove(barrierId) + if (barrierState != null) { +barrierState.clear() + } +} + } + + // Remember all active stage attempts that make barrier() call(s), and the corresponding + // internal state. + private val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Provide current state of a barrier() call, the state is created when a new stage attempt send + * out a barrier() call, and recycled on stage completed. 
+ * + * @param barrierId Identifier of the barrier stage that make a barrier() call. + * @param numTasks Number of tasks of the barrier stage, all barrier() calls from the stage shall + * collect `numTasks` requests to succeed. + */ + private class ContextBarrierState( + val barrierId: ContextBarrierId, + val numTasks: Int) { + +// There may be multiple barrier() calls from a barrier stage attempt, `barrierEpoch` is used +// to identify each barrier() call. It shall get increased when a barrier() call succeed, or +// reset when a barrier() call fail due to timeout. +private var barrierEpoch: Int = 0 + +// An array of RPCCallContexts for barrier tasks that are waiting for reply of a barrier() +// call. +private val requesters: ArrayBuffer[RpcCallContext] = new ArrayBuffer[RpcCallContext](numTasks) + +// A timer task that ensures we may timeout for a barrier() call. +private var timerTask: TimerTask = null
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207703995 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * Only one barrier() call shall happen on a barrier stage attempt at each time, we can use + * (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + // Listen to StageCompleted event, clear corresponding ContextBarrierState. + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + val barrierId = ContextBarrierId(stageInfo.stageId, stageInfo.attemptNumber) + // Clear ContextBarrierState from a finished stage attempt. + val barrierState = states.remove(barrierId) --- End diff -- we can call `cleanupBarrierStage` here --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
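A sketch of the refactor this comment points at: pull the remove-and-clear into a `cleanupBarrierStage` helper, so the listener and any future cleanup path share one code path. The method name comes from the review comment, the body simply restates the two quoted lines, and both fragments are meant to slot into the BarrierCoordinator quoted above rather than stand alone.

```scala
// Hypothetical helper extracted from the listener body, per the review suggestion.
private def cleanupBarrierStage(barrierId: ContextBarrierId): Unit = {
  val barrierState = states.remove(barrierId)
  if (barrierState != null) {
    barrierState.clear()
  }
}

// The listener then only has to translate the event into a ContextBarrierId.
private val listener = new SparkListener {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val stageInfo = stageCompleted.stageInfo
    cleanupBarrierStage(ContextBarrierId(stageInfo.stageId, stageInfo.attemptNumber))
  }
}
```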
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207703930 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * Only one barrier() call shall happen on a barrier stage attempt at each time, we can use + * (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + // Listen to StageCompleted event, clear corresponding ContextBarrierState. + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + val barrierId = ContextBarrierId(stageInfo.stageId, stageInfo.attemptNumber) + // Clear ContextBarrierState from a finished stage attempt. + val barrierState = states.remove(barrierId) + if (barrierState != null) { +barrierState.clear() + } +} + } + + // Remember all active stage attempts that make barrier() call(s), and the corresponding + // internal state. + private val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Provide current state of a barrier() call, the state is created when a new stage attempt send + * out a barrier() call, and recycled on stage completed. 
+ * + * @param barrierId Identifier of the barrier stage that make a barrier() call. + * @param numTasks Number of tasks of the barrier stage, all barrier() calls from the stage shall + * collect `numTasks` requests to succeed. + */ + private class ContextBarrierState( + val barrierId: ContextBarrierId, + val numTasks: Int) { + +// There may be multiple barrier() calls from a barrier stage attempt, `barrierEpoch` is used +// to identify each barrier() call. It shall get increased when a barrier() call succeed, or +// reset when a barrier() call fail due to timeout. +private var barrierEpoch: Int = 0 + +// An array of RPCCallContexts for barrier tasks that are waiting for reply of a barrier() +// call. +private val requesters: ArrayBuffer[RpcCallContext] = new ArrayBuffer[RpcCallContext](numTasks) + +// A timer task that ensures we may timeout for a barrier() call. +private var timerTask: TimerTask = null
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207703895 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * Only one barrier() call shall happen on a barrier stage attempt at each time, we can use + * (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + // Listen to StageCompleted event, clear corresponding ContextBarrierState. + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + val barrierId = ContextBarrierId(stageInfo.stageId, stageInfo.attemptNumber) + // Clear ContextBarrierState from a finished stage attempt. + val barrierState = states.remove(barrierId) + if (barrierState != null) { +barrierState.clear() + } +} + } + + // Remember all active stage attempts that make barrier() call(s), and the corresponding + // internal state. + private val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Provide current state of a barrier() call, the state is created when a new stage attempt send + * out a barrier() call, and recycled on stage completed. 
+ * + * @param barrierId Identifier of the barrier stage that make a barrier() call. + * @param numTasks Number of tasks of the barrier stage, all barrier() calls from the stage shall + * collect `numTasks` requests to succeed. + */ + private class ContextBarrierState( + val barrierId: ContextBarrierId, + val numTasks: Int) { + +// There may be multiple barrier() calls from a barrier stage attempt, `barrierEpoch` is used +// to identify each barrier() call. It shall get increased when a barrier() call succeed, or +// reset when a barrier() call fail due to timeout. +private var barrierEpoch: Int = 0 + +// An array of RPCCallContexts for barrier tasks that are waiting for reply of a barrier() +// call. +private val requesters: ArrayBuffer[RpcCallContext] = new ArrayBuffer[RpcCallContext](numTasks) + +// A timer task that ensures we may timeout for a barrier() call. +private var timerTask: TimerTask = null
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207591774 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * Only one barrier() call shall happen on a barrier stage attempt at each time, we can use + * (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + // Listen to StageCompleted event, clear corresponding ContextBarrierState. + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + val barrierId = ContextBarrierId(stageInfo.stageId, stageInfo.attemptNumber) + // Clear ContextBarrierState from a finished stage attempt. + val barrierState = states.remove(barrierId) + if (barrierState != null) { +barrierState.clear() + } +} + } + + // Remember all active stage attempts that make barrier() call(s), and the corresponding + // internal state. + private val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Provide current state of a barrier() call, the state is created when a new stage attempt send + * out a barrier() call, and recycled on stage completed. 
+ * + * @param barrierId Identifier of the barrier stage that make a barrier() call. + * @param numTasks Number of tasks of the barrier stage, all barrier() calls from the stage shall + * collect `numTasks` requests to succeed. + */ + private class ContextBarrierState( + val barrierId: ContextBarrierId, + val numTasks: Int) { + +// There may be multiple barrier() calls from a barrier stage attempt, `barrierEpoch` is used +// to identify each barrier() call. It shall get increased when a barrier() call succeed, or +// reset when a barrier() call fail due to timeout. +private var barrierEpoch: Int = 0 + +// An array of RPCCallContexts for barrier tasks that are waiting for reply of a barrier() +// call. +private val requesters: ArrayBuffer[RpcCallContext] = new ArrayBuffer[RpcCallContext](numTasks) + +// A timer task that ensures we may timeout for a barrier() call. +private var timerTask: TimerTask = null +
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207590791 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * Only one barrier() call shall happen on a barrier stage attempt at each time, we can use + * (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + // Listen to StageCompleted event, clear corresponding ContextBarrierState. + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + val barrierId = ContextBarrierId(stageInfo.stageId, stageInfo.attemptNumber) + // Clear ContextBarrierState from a finished stage attempt. + val barrierState = states.remove(barrierId) + if (barrierState != null) { +barrierState.clear() + } +} + } + + // Remember all active stage attempts that make barrier() call(s), and the corresponding + // internal state. + private val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Provide current state of a barrier() call, the state is created when a new stage attempt send + * out a barrier() call, and recycled on stage completed. 
+ * + * @param barrierId Identifier of the barrier stage that make a barrier() call. + * @param numTasks Number of tasks of the barrier stage, all barrier() calls from the stage shall + * collect `numTasks` requests to succeed. + */ + private class ContextBarrierState( + val barrierId: ContextBarrierId, + val numTasks: Int) { + +// There may be multiple barrier() calls from a barrier stage attempt, `barrierEpoch` is used +// to identify each barrier() call. It shall get increased when a barrier() call succeed, or +// reset when a barrier() call fail due to timeout. +private var barrierEpoch: Int = 0 + +// An array of RPCCallContexts for barrier tasks that are waiting for reply of a barrier() +// call. +private val requesters: ArrayBuffer[RpcCallContext] = new ArrayBuffer[RpcCallContext](numTasks) + +// A timer task that ensures we may timeout for a barrier() call. +private var timerTask: TimerTask = null +
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207592141 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala --- @@ -80,7 +101,45 @@ class BarrierTaskContext( @Experimental @Since("2.4.0") def barrier(): Unit = { -// TODO SPARK-24817 implement global barrier. +val callSite = Utils.getCallSite() +logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) has entered " + + s"the global sync, current barrier epoch is $barrierEpoch.") +logTrace(s"Current callSite: $callSite") + +val startTime = System.currentTimeMillis() +val timerTask = new TimerTask { + override def run(): Unit = { +logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) waiting " + + s"under the global sync since $startTime, has been waiting for " + + s"${(System.currentTimeMillis() - startTime) / 1000} seconds, current barrier epoch " + + s"is $barrierEpoch.") + } +} +// Log the update of global sync every 60 seconds. +timer.schedule(timerTask, 60000, 60000) + +try { + barrierCoordinator.askSync[Unit]( +message = RequestToSync(numTasks, stageId, stageAttemptNumber, taskAttemptId, + barrierEpoch), +// Set a fixed timeout for RPC here, so users shall get a SparkException thrown by +// BarrierCoordinator on timeout, instead of RPCTimeoutException from the RPC framework. +timeout = new RpcTimeout(31536000 /** = 3600 * 24 * 365 */ seconds, "barrierTimeout")) --- End diff -- nit: "`/** ... */`" -> "`/* ... */`" (the former is for JavaDoc string) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
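The nit concerns only the comment delimiter inside the RpcTimeout argument: `/**` opens a doc comment, which documentation tooling picks up, while a plain `/*` block comment is ignored. Applied to the quoted call, the argument would read roughly as below; this is a sketch against the quoted diff, not the final patch.

```scala
barrierCoordinator.askSync[Unit](
  message = RequestToSync(numTasks, stageId, stageAttemptNumber, taskAttemptId, barrierEpoch),
  // Set a fixed, effectively infinite RPC timeout so users see a SparkException thrown by
  // BarrierCoordinator on barrier timeout rather than an RpcTimeoutException from the RPC layer.
  timeout = new RpcTimeout(31536000 /* = 3600 * 24 * 365 */ seconds, "barrierTimeout"))
```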
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207590284 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * Only one barrier() call shall happen on a barrier stage attempt at each time, we can use + * (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + // Listen to StageCompleted event, clear corresponding ContextBarrierState. + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + val barrierId = ContextBarrierId(stageInfo.stageId, stageInfo.attemptNumber) + // Clear ContextBarrierState from a finished stage attempt. + val barrierState = states.remove(barrierId) + if (barrierState != null) { +barrierState.clear() + } +} + } + + // Remember all active stage attempts that make barrier() call(s), and the corresponding + // internal state. + private val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Provide current state of a barrier() call, the state is created when a new stage attempt send + * out a barrier() call, and recycled on stage completed. 
+ * + * @param barrierId Identifier of the barrier stage that make a barrier() call. + * @param numTasks Number of tasks of the barrier stage, all barrier() calls from the stage shall + * collect `numTasks` requests to succeed. + */ + private class ContextBarrierState( + val barrierId: ContextBarrierId, + val numTasks: Int) { + +// There may be multiple barrier() calls from a barrier stage attempt, `barrierEpoch` is used +// to identify each barrier() call. It shall get increased when a barrier() call succeed, or +// reset when a barrier() call fail due to timeout. +private var barrierEpoch: Int = 0 + +// An array of RPCCallContexts for barrier tasks that are waiting for reply of a barrier() +// call. +private val requesters: ArrayBuffer[RpcCallContext] = new ArrayBuffer[RpcCallContext](numTasks) + +// A timer task that ensures we may timeout for a barrier() call. +private var timerTask: TimerTask = null +
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207590340 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * Only one barrier() call shall happen on a barrier stage attempt at each time, we can use + * (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + // Listen to StageCompleted event, clear corresponding ContextBarrierState. + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + val barrierId = ContextBarrierId(stageInfo.stageId, stageInfo.attemptNumber) + // Clear ContextBarrierState from a finished stage attempt. + val barrierState = states.remove(barrierId) + if (barrierState != null) { +barrierState.clear() + } +} + } + + // Remember all active stage attempts that make barrier() call(s), and the corresponding + // internal state. + private val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Provide current state of a barrier() call, the state is created when a new stage attempt send + * out a barrier() call, and recycled on stage completed. 
+ * + * @param barrierId Identifier of the barrier stage that make a barrier() call. + * @param numTasks Number of tasks of the barrier stage, all barrier() calls from the stage shall + * collect `numTasks` requests to succeed. + */ + private class ContextBarrierState( + val barrierId: ContextBarrierId, + val numTasks: Int) { + +// There may be multiple barrier() calls from a barrier stage attempt, `barrierEpoch` is used +// to identify each barrier() call. It shall get increased when a barrier() call succeed, or +// reset when a barrier() call fail due to timeout. +private var barrierEpoch: Int = 0 + +// An array of RPCCallContexts for barrier tasks that are waiting for reply of a barrier() +// call. +private val requesters: ArrayBuffer[RpcCallContext] = new ArrayBuffer[RpcCallContext](numTasks) + +// A timer task that ensures we may timeout for a barrier() call. +private var timerTask: TimerTask = null +
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207589726 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * Only one barrier() call shall happen on a barrier stage attempt at each time, we can use + * (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + // Listen to StageCompleted event, clear corresponding ContextBarrierState. + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + val barrierId = ContextBarrierId(stageInfo.stageId, stageInfo.attemptNumber) + // Clear ContextBarrierState from a finished stage attempt. + val barrierState = states.remove(barrierId) + if (barrierState != null) { +barrierState.clear() + } +} + } + + // Remember all active stage attempts that make barrier() call(s), and the corresponding + // internal state. + private val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Provide current state of a barrier() call, the state is created when a new stage attempt send + * out a barrier() call, and recycled on stage completed. 
+ * + * @param barrierId Identifier of the barrier stage that make a barrier() call. + * @param numTasks Number of tasks of the barrier stage, all barrier() calls from the stage shall + * collect `numTasks` requests to succeed. + */ + private class ContextBarrierState( + val barrierId: ContextBarrierId, + val numTasks: Int) { + +// There may be multiple barrier() calls from a barrier stage attempt, `barrierEpoch` is used +// to identify each barrier() call. It shall get increased when a barrier() call succeed, or +// reset when a barrier() call fail due to timeout. +private var barrierEpoch: Int = 0 + +// An array of RPCCallContexts for barrier tasks that are waiting for reply of a barrier() +// call. +private val requesters: ArrayBuffer[RpcCallContext] = new ArrayBuffer[RpcCallContext](numTasks) + +// A timer task that ensures we may timeout for a barrier() call. +private var timerTask: TimerTask = null +
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207444701 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala --- @@ -39,6 +44,17 @@ class BarrierTaskContext( extends TaskContextImpl(stageId, stageAttemptNumber, partitionId, taskAttemptId, attemptNumber, taskMemoryManager, localProperties, metricsSystem, taskMetrics) { + private val barrierCoordinator: RpcEndpointRef = { +val env = SparkEnv.get +RpcUtils.makeDriverRef("barrierSync", env.conf, env.rpcEnv) --- End diff -- It would be nice to define `"barrierSync"` as a constant. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
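One way to follow this suggestion is to hoist the endpoint name into a shared constant so the driver-side registration and the executor-side lookup cannot drift apart. The object and field names below are illustrative assumptions, not taken from the PR.

```scala
private[spark] object BarrierTaskContext {
  // Hypothetical shared constant; both SparkContext (when registering the BarrierCoordinator
  // endpoint) and BarrierTaskContext (when resolving it) would reference this single value.
  val BARRIER_COORDINATOR_ENDPOINT_NAME: String = "barrierSync"
}

// Executor side: resolve the driver endpoint by the shared constant instead of a string literal.
private val barrierCoordinator: RpcEndpointRef = {
  val env = SparkEnv.get
  RpcUtils.makeDriverRef(BarrierTaskContext.BARRIER_COORDINATOR_ENDPOINT_NAME, env.conf, env.rpcEnv)
}
```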
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207442439 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + // Remove internal data from a finished stage attempt. + cleanupSyncRequests(stageInfo.stageId, stageInfo.attemptNumber) + barrierEpochByStageIdAndAttempt.remove((stageInfo.stageId, stageInfo.attemptNumber)) +} + } + + // Epoch counter for each barrier (stage, attempt). + private val barrierEpochByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), AtomicInteger] + + // Remember all the blocking global sync requests for each barrier (stage, attempt). + private val syncRequestsByStageIdAndAttempt = +new ConcurrentHashMap[(Int, Int), ArrayBuffer[RpcCallContext]] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Get the array of [[RpcCallContext]]s that correspond to a barrier sync request from a stage + * attempt. 
+ */ + private def getOrInitSyncRequests( + stageId: Int, + stageAttemptId: Int, + numTasks: Int = 0): ArrayBuffer[RpcCallContext] = { +val requests = syncRequestsByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new ArrayBuffer[RpcCallContext](numTasks)) +if (requests == null) { + syncRequestsByStageIdAndAttempt.get((stageId, stageAttemptId)) +} else { + requests +} + } + + /** + * Clean up the array of [[RpcCallContext]]s that correspond to a barrier sync request from a + * stage attempt. + */ + private def cleanupSyncRequests(stageId: Int, stageAttemptId: Int): Unit = { +val requests = syncRequestsByStageIdAndAttempt.remove((stageId, stageAttemptId)) +if (requests != null) { + requests.clear() --- End diff -- Agree with @cloud-fan that this is not necessary. It only explicitly clears the ArrayBuffer object instead of the contexts. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
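Since the reviewers agree the explicit clear() buys nothing (the ArrayBuffer becomes unreachable once it leaves the map, and the blocked RpcCallContexts must be answered or failed elsewhere regardless), the cleanup can shrink to the remove plus the log line. A sketch against the quoted method, not the merged code:

```scala
private def cleanupSyncRequests(stageId: Int, stageAttemptId: Int): Unit = {
  // Dropping the map entry is sufficient; clearing the removed ArrayBuffer would be redundant.
  syncRequestsByStageIdAndAttempt.remove((stageId, stageAttemptId))
  logInfo(s"Removed all the pending barrier sync requests from Stage $stageId " +
    s"(Attempt $stageAttemptId).")
}
```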
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207444103 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + // Remove internal data from a finished stage attempt. + cleanupSyncRequests(stageInfo.stageId, stageInfo.attemptNumber) + barrierEpochByStageIdAndAttempt.remove((stageInfo.stageId, stageInfo.attemptNumber)) + cancelTimerTask(stageInfo.stageId, stageInfo.attemptNumber) +} + } + + // Epoch counter for each barrier (stage, attempt). + private val barrierEpochByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), Int] + + // Remember all the blocking global sync requests for each barrier (stage, attempt). + private val syncRequestsByStageIdAndAttempt = +new ConcurrentHashMap[(Int, Int), ArrayBuffer[RpcCallContext]] + + // Remember all the TimerTasks for each barrier (stage, attempt). + private val timerTaskByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), TimerTask] + + // Number of tasks for each stage. + private val numTasksByStage = new ConcurrentHashMap[Int, Int] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Get the array of [[RpcCallContext]]s that correspond to a barrier sync request from a stage + * attempt. 
+ */ + private def getOrInitSyncRequests( + stageId: Int, + stageAttemptId: Int, + numTasks: Int): ArrayBuffer[RpcCallContext] = { +syncRequestsByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new ArrayBuffer[RpcCallContext](numTasks)) +syncRequestsByStageIdAndAttempt.get((stageId, stageAttemptId)) + } + + /** + * Clean up the array of [[RpcCallContext]]s that correspond to a barrier sync request from a + * stage attempt. + */ + private def cleanupSyncRequests(stageId: Int, stageAttemptId: Int): Unit = { +val requests = syncRequestsByStageIdAndAttempt.remove((stageId, stageAttemptId)) +if (requests != null) { + requests.clear() +} +logInfo(s"Removed all the pending barrier sync requests from Stage $stageId (Attempt " + + s"$stageAttemptId).") + } + + /** + * Get the barrier epoch that correspond to a barrier sync request from a stage attempt. + */ + private def getOrInitBarrierEpoch(stageId: Int, stageAttemptId: Int):
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207443099 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + // Remove internal data from a finished stage attempt. + cleanupSyncRequests(stageInfo.stageId, stageInfo.attemptNumber) + barrierEpochByStageIdAndAttempt.remove((stageInfo.stageId, stageInfo.attemptNumber)) + cancelTimerTask(stageInfo.stageId, stageInfo.attemptNumber) +} + } + + // Epoch counter for each barrier (stage, attempt). + private val barrierEpochByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), Int] + + // Remember all the blocking global sync requests for each barrier (stage, attempt). + private val syncRequestsByStageIdAndAttempt = +new ConcurrentHashMap[(Int, Int), ArrayBuffer[RpcCallContext]] + + // Remember all the TimerTasks for each barrier (stage, attempt). + private val timerTaskByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), TimerTask] + + // Number of tasks for each stage. + private val numTasksByStage = new ConcurrentHashMap[Int, Int] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Get the array of [[RpcCallContext]]s that correspond to a barrier sync request from a stage + * attempt. 
+ */ + private def getOrInitSyncRequests( + stageId: Int, + stageAttemptId: Int, + numTasks: Int): ArrayBuffer[RpcCallContext] = { +syncRequestsByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new ArrayBuffer[RpcCallContext](numTasks)) +syncRequestsByStageIdAndAttempt.get((stageId, stageAttemptId)) + } + + /** + * Clean up the array of [[RpcCallContext]]s that correspond to a barrier sync request from a + * stage attempt. + */ + private def cleanupSyncRequests(stageId: Int, stageAttemptId: Int): Unit = { +val requests = syncRequestsByStageIdAndAttempt.remove((stageId, stageAttemptId)) +if (requests != null) { + requests.clear() +} +logInfo(s"Removed all the pending barrier sync requests from Stage $stageId (Attempt " + + s"$stageAttemptId).") + } + + /** + * Get the barrier epoch that correspond to a barrier sync request from a stage attempt. + */ + private def getOrInitBarrierEpoch(stageId: Int, stageAttemptId: Int):
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207444041 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + // Remove internal data from a finished stage attempt. + cleanupSyncRequests(stageInfo.stageId, stageInfo.attemptNumber) + barrierEpochByStageIdAndAttempt.remove((stageInfo.stageId, stageInfo.attemptNumber)) + cancelTimerTask(stageInfo.stageId, stageInfo.attemptNumber) +} + } + + // Epoch counter for each barrier (stage, attempt). + private val barrierEpochByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), Int] + + // Remember all the blocking global sync requests for each barrier (stage, attempt). + private val syncRequestsByStageIdAndAttempt = +new ConcurrentHashMap[(Int, Int), ArrayBuffer[RpcCallContext]] + + // Remember all the TimerTasks for each barrier (stage, attempt). + private val timerTaskByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), TimerTask] + + // Number of tasks for each stage. + private val numTasksByStage = new ConcurrentHashMap[Int, Int] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Get the array of [[RpcCallContext]]s that correspond to a barrier sync request from a stage + * attempt. 
+ */ + private def getOrInitSyncRequests( + stageId: Int, + stageAttemptId: Int, + numTasks: Int): ArrayBuffer[RpcCallContext] = { +syncRequestsByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new ArrayBuffer[RpcCallContext](numTasks)) +syncRequestsByStageIdAndAttempt.get((stageId, stageAttemptId)) + } + + /** + * Clean up the array of [[RpcCallContext]]s that correspond to a barrier sync request from a + * stage attempt. + */ + private def cleanupSyncRequests(stageId: Int, stageAttemptId: Int): Unit = { +val requests = syncRequestsByStageIdAndAttempt.remove((stageId, stageAttemptId)) +if (requests != null) { + requests.clear() +} +logInfo(s"Removed all the pending barrier sync requests from Stage $stageId (Attempt " + + s"$stageAttemptId).") + } + + /** + * Get the barrier epoch that correspond to a barrier sync request from a stage attempt. + */ + private def getOrInitBarrierEpoch(stageId: Int, stageAttemptId: Int):
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207442661 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + // Remove internal data from a finished stage attempt. + cleanupSyncRequests(stageInfo.stageId, stageInfo.attemptNumber) + barrierEpochByStageIdAndAttempt.remove((stageInfo.stageId, stageInfo.attemptNumber)) + cancelTimerTask(stageInfo.stageId, stageInfo.attemptNumber) +} + } + + // Epoch counter for each barrier (stage, attempt). + private val barrierEpochByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), Int] + + // Remember all the blocking global sync requests for each barrier (stage, attempt). + private val syncRequestsByStageIdAndAttempt = +new ConcurrentHashMap[(Int, Int), ArrayBuffer[RpcCallContext]] + + // Remember all the TimerTasks for each barrier (stage, attempt). + private val timerTaskByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), TimerTask] + + // Number of tasks for each stage. + private val numTasksByStage = new ConcurrentHashMap[Int, Int] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Get the array of [[RpcCallContext]]s that correspond to a barrier sync request from a stage + * attempt. 
+ */ + private def getOrInitSyncRequests( + stageId: Int, + stageAttemptId: Int, + numTasks: Int): ArrayBuffer[RpcCallContext] = { +syncRequestsByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new ArrayBuffer[RpcCallContext](numTasks)) +syncRequestsByStageIdAndAttempt.get((stageId, stageAttemptId)) + } + + /** + * Clean up the array of [[RpcCallContext]]s that correspond to a barrier sync request from a + * stage attempt. + */ + private def cleanupSyncRequests(stageId: Int, stageAttemptId: Int): Unit = { +val requests = syncRequestsByStageIdAndAttempt.remove((stageId, stageAttemptId)) +if (requests != null) { + requests.clear() +} +logInfo(s"Removed all the pending barrier sync requests from Stage $stageId (Attempt " + + s"$stageAttemptId).") + } + + /** + * Get the barrier epoch that correspond to a barrier sync request from a stage attempt. + */ + private def getOrInitBarrierEpoch(stageId: Int, stageAttemptId: Int):
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r20759 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,229 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + // Remove internal data from a finished stage attempt. + cleanupSyncRequests(stageInfo.stageId, stageInfo.attemptNumber) + barrierEpochByStageIdAndAttempt.remove((stageInfo.stageId, stageInfo.attemptNumber)) + cancelTimerTask(stageInfo.stageId, stageInfo.attemptNumber) +} + } + + // Epoch counter for each barrier (stage, attempt). + private val barrierEpochByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), Int] + + // Remember all the blocking global sync requests for each barrier (stage, attempt). + private val syncRequestsByStageIdAndAttempt = +new ConcurrentHashMap[(Int, Int), ArrayBuffer[RpcCallContext]] + + // Remember all the TimerTasks for each barrier (stage, attempt). + private val timerTaskByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), TimerTask] + + // Number of tasks for each stage. + private val numTasksByStage = new ConcurrentHashMap[Int, Int] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Get the array of [[RpcCallContext]]s that correspond to a barrier sync request from a stage + * attempt. 
+ */ + private def getOrInitSyncRequests( + stageId: Int, + stageAttemptId: Int, + numTasks: Int): ArrayBuffer[RpcCallContext] = { +syncRequestsByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new ArrayBuffer[RpcCallContext](numTasks)) +syncRequestsByStageIdAndAttempt.get((stageId, stageAttemptId)) + } + + /** + * Clean up the array of [[RpcCallContext]]s that correspond to a barrier sync request from a + * stage attempt. + */ + private def cleanupSyncRequests(stageId: Int, stageAttemptId: Int): Unit = { +val requests = syncRequestsByStageIdAndAttempt.remove((stageId, stageAttemptId)) +if (requests != null) { + requests.clear() +} +logInfo(s"Removed all the pending barrier sync requests from Stage $stageId (Attempt " + + s"$stageAttemptId).") + } + + /** + * Get the barrier epoch that correspond to a barrier sync request from a stage attempt. + */ + private def getOrInitBarrierEpoch(stageId: Int, stageAttemptId: Int):
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207413435 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + // Remove internal data from a finished stage attempt. + cleanupSyncRequests(stageInfo.stageId, stageInfo.attemptNumber) + barrierEpochByStageIdAndAttempt.remove((stageInfo.stageId, stageInfo.attemptNumber)) + cancelTimerTask(stageInfo.stageId, stageInfo.attemptNumber) +} + } + + // Epoch counter for each barrier (stage, attempt). + private val barrierEpochByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), Int] + + // Remember all the blocking global sync requests for each barrier (stage, attempt). + private val syncRequestsByStageIdAndAttempt = +new ConcurrentHashMap[(Int, Int), ArrayBuffer[RpcCallContext]] + + // Remember all the TimerTasks for each barrier (stage, attempt). + private val timerTaskByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), TimerTask] + + // Number of tasks for each stage. + private val numTasksByStage = new ConcurrentHashMap[Int, Int] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Get the array of [[RpcCallContext]]s that correspond to a barrier sync request from a stage + * attempt. 
+ */ + private def getOrInitSyncRequests( + stageId: Int, + stageAttemptId: Int, + numTasks: Int): ArrayBuffer[RpcCallContext] = { +val requests = syncRequestsByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), --- End diff --

~~~scala
syncRequestsByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId),
  new ArrayBuffer[RpcCallContext](numTasks))
syncRequestsByStageIdAndAttempt.get((stageId, stageAttemptId))
~~~

--- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207350823 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + // Remove internal data from a finished stage attempt. + cleanupSyncRequests(stageInfo.stageId, stageInfo.attemptNumber) + barrierEpochByStageIdAndAttempt.remove((stageInfo.stageId, stageInfo.attemptNumber)) + cancelTimerTask(stageInfo.stageId, stageInfo.attemptNumber) +} + } + + // Epoch counter for each barrier (stage, attempt). + private val barrierEpochByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), Int] + + // Remember all the blocking global sync requests for each barrier (stage, attempt). + private val syncRequestsByStageIdAndAttempt = +new ConcurrentHashMap[(Int, Int), ArrayBuffer[RpcCallContext]] + + // Remember all the TimerTasks for each barrier (stage, attempt). + private val timerTaskByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), TimerTask] + + // Number of tasks for each stage. + private val numTasksByStage = new ConcurrentHashMap[Int, Int] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Get the array of [[RpcCallContext]]s that correspond to a barrier sync request from a stage + * attempt. 
+ */ + private def getOrInitSyncRequests( + stageId: Int, + stageAttemptId: Int, + numTasks: Int): ArrayBuffer[RpcCallContext] = { +val requests = syncRequestsByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new ArrayBuffer[RpcCallContext](numTasks)) +if (requests == null) { + syncRequestsByStageIdAndAttempt.get((stageId, stageAttemptId)) +} else { + requests +} + } + + /** + * Clean up the array of [[RpcCallContext]]s that correspond to a barrier sync request from a + * stage attempt. + */ + private def cleanupSyncRequests(stageId: Int, stageAttemptId: Int): Unit = { +val requests = syncRequestsByStageIdAndAttempt.remove((stageId, stageAttemptId)) +if (requests != null) { + requests.clear() +} +logInfo(s"Removed all the pending barrier sync requests from Stage $stageId (Attempt " + + s"$stageAttemptId).") + } + + /** + * Get the barrier epoch that correspond to a barrier sync request from a
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207348484 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + // Remove internal data from a finished stage attempt. + cleanupSyncRequests(stageInfo.stageId, stageInfo.attemptNumber) + barrierEpochByStageIdAndAttempt.remove((stageInfo.stageId, stageInfo.attemptNumber)) + cancelTimerTask(stageInfo.stageId, stageInfo.attemptNumber) +} + } + + // Epoch counter for each barrier (stage, attempt). + private val barrierEpochByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), Int] + + // Remember all the blocking global sync requests for each barrier (stage, attempt). + private val syncRequestsByStageIdAndAttempt = +new ConcurrentHashMap[(Int, Int), ArrayBuffer[RpcCallContext]] + + // Remember all the TimerTasks for each barrier (stage, attempt). + private val timerTaskByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), TimerTask] + + // Number of tasks for each stage. + private val numTasksByStage = new ConcurrentHashMap[Int, Int] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Get the array of [[RpcCallContext]]s that correspond to a barrier sync request from a stage + * attempt. 
+ */ + private def getOrInitSyncRequests( + stageId: Int, + stageAttemptId: Int, + numTasks: Int): ArrayBuffer[RpcCallContext] = { +val requests = syncRequestsByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), --- End diff -- I think this code always allocates an `ArrayBuffer` regardless of whether a value already exists. If allocating an array is acceptable, the following code looks simpler:

```
val value = new ArrayBuffer[RpcCallContext](numTasks)
val requests = syncRequestsByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), value)
if (requests != null) {
  requests
} else {
  value
}
```

If allocating an array is not acceptable, we may need to use `computeIfAbsent` like this:

```
private val allocateArrayBuffer = new JFunction[Int, ArrayBuffer[RpcCallContext]] {
  override def apply(numTasks: Int): ArrayBuffer[RpcCallContext] =
    new ArrayBuffer[RpcCallContext](numTasks)
}

private def getOrInitSyncRequests(...): ArrayBuffer[RpcCallContext] = {
```
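The second snippet above is cut off in the archive. Purely as an illustration (the names mirror the quoted diff, but this sketch is not the PR's code), a self-contained `computeIfAbsent` variant could look like the following; note that the mapping function receives the map key, so `numTasks` has to be captured from the enclosing scope:

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.{Function => JFunction}

import scala.collection.mutable.ArrayBuffer

object BarrierSyncSketch {
  // Stand-in for org.apache.spark.rpc.RpcCallContext, only to keep the sketch self-contained.
  trait RpcCallContext

  private val syncRequestsByStageIdAndAttempt =
    new ConcurrentHashMap[(Int, Int), ArrayBuffer[RpcCallContext]]

  def getOrInitSyncRequests(
      stageId: Int,
      stageAttemptId: Int,
      numTasks: Int): ArrayBuffer[RpcCallContext] = {
    // computeIfAbsent allocates the buffer only when the key is missing, and the
    // check-and-insert is performed atomically by the ConcurrentHashMap.
    syncRequestsByStageIdAndAttempt.computeIfAbsent(
      (stageId, stageAttemptId),
      new JFunction[(Int, Int), ArrayBuffer[RpcCallContext]] {
        override def apply(key: (Int, Int)): ArrayBuffer[RpcCallContext] =
          new ArrayBuffer[RpcCallContext](numTasks)
      })
  }
}
```

Compared with the `putIfAbsent` pattern, this avoids allocating a throw-away buffer on calls where the entry already exists.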
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207296580 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContextImpl.scala --- @@ -39,8 +44,58 @@ private[spark] class BarrierTaskContextImpl( taskMemoryManager, localProperties, metricsSystem, taskMetrics) with BarrierTaskContext { - // TODO SPARK-24817 implement global barrier. - override def barrier(): Unit = {} + private val barrierCoordinator: RpcEndpointRef = { +val env = SparkEnv.get +RpcUtils.makeDriverRef("barrierSync", env.conf, env.rpcEnv) + } + + private val timer = new Timer("Barrier task timer for barrier() calls.") + + private var barrierEpoch = 0 + + private lazy val numTasks = getTaskInfos().size + + override def barrier(): Unit = { +val callSite = Utils.getCallSite() +logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) has entered " + + s"the global sync, current barrier epoch is $barrierEpoch.") +logTrace(s"Current callSite: $callSite") + +val startTime = System.currentTimeMillis() +val timerTask = new TimerTask { + override def run(): Unit = { +logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) waiting " + + s"under the global sync since $startTime, has been waiting for " + + s"${(System.currentTimeMillis() - startTime) / 1000} seconds, current barrier epoch " + + s"is $barrierEpoch.") + } +} +// Log the update of global sync every 60 seconds. +timer.schedule(timerTask, 6, 6) + +try { + barrierCoordinator.askSync[Unit]( +message = RequestToSync(numTasks, stageId, stageAttemptNumber, taskAttemptId, + barrierEpoch), +// Set a fixed timeout for RPC here, so users shall get a SparkException thrown by +// BarrierCoordinator on timeout, instead of RPCTimeoutException from the RPC framework. --- End diff -- The `FiniteDuration` used in `RPCTimeout` is limited to +-(2^63-1)ns (ca. 292 years) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
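To make the bound mentioned above concrete, a small illustration (not from the PR): `FiniteDuration` is backed by a signed 64-bit nanosecond count, so the longest representable timeout is (2^63 - 1) ns, roughly 292 years.

```scala
import scala.concurrent.duration._

object RpcTimeoutBoundSketch extends App {
  // The largest FiniteDuration: (2^63 - 1) nanoseconds, about 292 years.
  val maxFinite = Duration.fromNanos(Long.MaxValue)
  println(maxFinite.toDays)       // ~106751 days

  // Anything longer cannot be a FiniteDuration; an "effectively infinite"
  // timeout has to be expressed as Duration.Inf instead.
  val unbounded: Duration = Duration.Inf
  println(unbounded.isFinite)     // false
}
```

In practice any configurable barrier timeout fits comfortably within that bound.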
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207121392 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + // Remove internal data from a finished stage attempt. + cleanupSyncRequests(stageInfo.stageId, stageInfo.attemptNumber) + barrierEpochByStageIdAndAttempt.remove((stageInfo.stageId, stageInfo.attemptNumber)) +} + } + + // Epoch counter for each barrier (stage, attempt). + private val barrierEpochByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), AtomicInteger] + + // Remember all the blocking global sync requests for each barrier (stage, attempt). + private val syncRequestsByStageIdAndAttempt = +new ConcurrentHashMap[(Int, Int), ArrayBuffer[RpcCallContext]] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Get the array of [[RpcCallContext]]s that correspond to a barrier sync request from a stage + * attempt. 
+ */ + private def getOrInitSyncRequests( + stageId: Int, + stageAttemptId: Int, + numTasks: Int = 0): ArrayBuffer[RpcCallContext] = { +val requests = syncRequestsByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new ArrayBuffer[RpcCallContext](numTasks)) +if (requests == null) { + syncRequestsByStageIdAndAttempt.get((stageId, stageAttemptId)) +} else { + requests +} + } + + /** + * Clean up the array of [[RpcCallContext]]s that correspond to a barrier sync request from a + * stage attempt. + */ + private def cleanupSyncRequests(stageId: Int, stageAttemptId: Int): Unit = { +val requests = syncRequestsByStageIdAndAttempt.remove((stageId, stageAttemptId)) +if (requests != null) { + requests.clear() +} +logInfo(s"Removed all the pending barrier sync requests from Stage $stageId (Attempt " + + s"$stageAttemptId).") + } + + /** + * Get the barrier epoch that correspond to a barrier sync request from a stage attempt. + */ + private def getOrInitBarrierEpoch(stageId: Int, stageAttemptId: Int): AtomicInteger = { +val barrierEpoch = barrierEpochByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new AtomicInteger(0)) +if (barrierEpoch == null) { + barrierEpochByStageIdAndAttempt.get((stageId,
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207107551 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + // Remove internal data from a finished stage attempt. + cleanupSyncRequests(stageInfo.stageId, stageInfo.attemptNumber) + barrierEpochByStageIdAndAttempt.remove((stageInfo.stageId, stageInfo.attemptNumber)) +} + } + + // Epoch counter for each barrier (stage, attempt). + private val barrierEpochByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), AtomicInteger] + + // Remember all the blocking global sync requests for each barrier (stage, attempt). + private val syncRequestsByStageIdAndAttempt = +new ConcurrentHashMap[(Int, Int), ArrayBuffer[RpcCallContext]] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Get the array of [[RpcCallContext]]s that correspond to a barrier sync request from a stage + * attempt. 
+ */ + private def getOrInitSyncRequests( + stageId: Int, + stageAttemptId: Int, + numTasks: Int = 0): ArrayBuffer[RpcCallContext] = { +val requests = syncRequestsByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new ArrayBuffer[RpcCallContext](numTasks)) +if (requests == null) { + syncRequestsByStageIdAndAttempt.get((stageId, stageAttemptId)) +} else { + requests +} + } + + /** + * Clean up the array of [[RpcCallContext]]s that correspond to a barrier sync request from a + * stage attempt. + */ + private def cleanupSyncRequests(stageId: Int, stageAttemptId: Int): Unit = { +val requests = syncRequestsByStageIdAndAttempt.remove((stageId, stageAttemptId)) +if (requests != null) { + requests.clear() +} +logInfo(s"Removed all the pending barrier sync requests from Stage $stageId (Attempt " + + s"$stageAttemptId).") + } + + /** + * Get the barrier epoch that correspond to a barrier sync request from a stage attempt. + */ + private def getOrInitBarrierEpoch(stageId: Int, stageAttemptId: Int): AtomicInteger = { +val barrierEpoch = barrierEpochByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new AtomicInteger(0)) +if (barrierEpoch == null) { +
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207106350 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + // Remove internal data from a finished stage attempt. + cleanupSyncRequests(stageInfo.stageId, stageInfo.attemptNumber) + barrierEpochByStageIdAndAttempt.remove((stageInfo.stageId, stageInfo.attemptNumber)) +} + } + + // Epoch counter for each barrier (stage, attempt). + private val barrierEpochByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), AtomicInteger] + + // Remember all the blocking global sync requests for each barrier (stage, attempt). + private val syncRequestsByStageIdAndAttempt = +new ConcurrentHashMap[(Int, Int), ArrayBuffer[RpcCallContext]] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Get the array of [[RpcCallContext]]s that correspond to a barrier sync request from a stage + * attempt. 
+ */ + private def getOrInitSyncRequests( + stageId: Int, + stageAttemptId: Int, + numTasks: Int = 0): ArrayBuffer[RpcCallContext] = { +val requests = syncRequestsByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new ArrayBuffer[RpcCallContext](numTasks)) +if (requests == null) { + syncRequestsByStageIdAndAttempt.get((stageId, stageAttemptId)) +} else { + requests +} + } + + /** + * Clean up the array of [[RpcCallContext]]s that correspond to a barrier sync request from a + * stage attempt. + */ + private def cleanupSyncRequests(stageId: Int, stageAttemptId: Int): Unit = { +val requests = syncRequestsByStageIdAndAttempt.remove((stageId, stageAttemptId)) +if (requests != null) { + requests.clear() +} +logInfo(s"Removed all the pending barrier sync requests from Stage $stageId (Attempt " + + s"$stageAttemptId).") + } + + /** + * Get the barrier epoch that correspond to a barrier sync request from a stage attempt. + */ + private def getOrInitBarrierEpoch(stageId: Int, stageAttemptId: Int): AtomicInteger = { +val barrierEpoch = barrierEpochByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new AtomicInteger(0)) +if (barrierEpoch == null) { +
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207106000 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + // Remove internal data from a finished stage attempt. + cleanupSyncRequests(stageInfo.stageId, stageInfo.attemptNumber) + barrierEpochByStageIdAndAttempt.remove((stageInfo.stageId, stageInfo.attemptNumber)) +} + } + + // Epoch counter for each barrier (stage, attempt). + private val barrierEpochByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), AtomicInteger] + + // Remember all the blocking global sync requests for each barrier (stage, attempt). + private val syncRequestsByStageIdAndAttempt = +new ConcurrentHashMap[(Int, Int), ArrayBuffer[RpcCallContext]] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Get the array of [[RpcCallContext]]s that correspond to a barrier sync request from a stage + * attempt. 
+ */ + private def getOrInitSyncRequests( + stageId: Int, + stageAttemptId: Int, + numTasks: Int = 0): ArrayBuffer[RpcCallContext] = { +val requests = syncRequestsByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new ArrayBuffer[RpcCallContext](numTasks)) +if (requests == null) { + syncRequestsByStageIdAndAttempt.get((stageId, stageAttemptId)) +} else { + requests +} + } + + /** + * Clean up the array of [[RpcCallContext]]s that correspond to a barrier sync request from a + * stage attempt. + */ + private def cleanupSyncRequests(stageId: Int, stageAttemptId: Int): Unit = { +val requests = syncRequestsByStageIdAndAttempt.remove((stageId, stageAttemptId)) +if (requests != null) { + requests.clear() +} +logInfo(s"Removed all the pending barrier sync requests from Stage $stageId (Attempt " + + s"$stageAttemptId).") + } + + /** + * Get the barrier epoch that correspond to a barrier sync request from a stage attempt. + */ + private def getOrInitBarrierEpoch(stageId: Int, stageAttemptId: Int): AtomicInteger = { +val barrierEpoch = barrierEpochByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new AtomicInteger(0)) +if (barrierEpoch == null) { + barrierEpochByStageIdAndAttempt.get((stageId,
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207105381 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + // Remove internal data from a finished stage attempt. + cleanupSyncRequests(stageInfo.stageId, stageInfo.attemptNumber) + barrierEpochByStageIdAndAttempt.remove((stageInfo.stageId, stageInfo.attemptNumber)) +} + } + + // Epoch counter for each barrier (stage, attempt). + private val barrierEpochByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), AtomicInteger] + + // Remember all the blocking global sync requests for each barrier (stage, attempt). + private val syncRequestsByStageIdAndAttempt = +new ConcurrentHashMap[(Int, Int), ArrayBuffer[RpcCallContext]] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Get the array of [[RpcCallContext]]s that correspond to a barrier sync request from a stage + * attempt. 
+ */ + private def getOrInitSyncRequests( + stageId: Int, + stageAttemptId: Int, + numTasks: Int = 0): ArrayBuffer[RpcCallContext] = { +val requests = syncRequestsByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new ArrayBuffer[RpcCallContext](numTasks)) +if (requests == null) { + syncRequestsByStageIdAndAttempt.get((stageId, stageAttemptId)) +} else { + requests +} + } + + /** + * Clean up the array of [[RpcCallContext]]s that correspond to a barrier sync request from a + * stage attempt. + */ + private def cleanupSyncRequests(stageId: Int, stageAttemptId: Int): Unit = { +val requests = syncRequestsByStageIdAndAttempt.remove((stageId, stageAttemptId)) +if (requests != null) { + requests.clear() +} +logInfo(s"Removed all the pending barrier sync requests from Stage $stageId (Attempt " + + s"$stageAttemptId).") + } + + /** + * Get the barrier epoch that correspond to a barrier sync request from a stage attempt. + */ + private def getOrInitBarrierEpoch(stageId: Int, stageAttemptId: Int): AtomicInteger = { +val barrierEpoch = barrierEpochByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new AtomicInteger(0)) +if (barrierEpoch == null) { +
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207105004 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + // Remove internal data from a finished stage attempt. + cleanupSyncRequests(stageInfo.stageId, stageInfo.attemptNumber) + barrierEpochByStageIdAndAttempt.remove((stageInfo.stageId, stageInfo.attemptNumber)) +} + } + + // Epoch counter for each barrier (stage, attempt). + private val barrierEpochByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), AtomicInteger] + + // Remember all the blocking global sync requests for each barrier (stage, attempt). + private val syncRequestsByStageIdAndAttempt = +new ConcurrentHashMap[(Int, Int), ArrayBuffer[RpcCallContext]] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Get the array of [[RpcCallContext]]s that correspond to a barrier sync request from a stage + * attempt. 
+ */ + private def getOrInitSyncRequests( + stageId: Int, + stageAttemptId: Int, + numTasks: Int = 0): ArrayBuffer[RpcCallContext] = { +val requests = syncRequestsByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new ArrayBuffer[RpcCallContext](numTasks)) +if (requests == null) { + syncRequestsByStageIdAndAttempt.get((stageId, stageAttemptId)) +} else { + requests +} + } + + /** + * Clean up the array of [[RpcCallContext]]s that correspond to a barrier sync request from a + * stage attempt. + */ + private def cleanupSyncRequests(stageId: Int, stageAttemptId: Int): Unit = { +val requests = syncRequestsByStageIdAndAttempt.remove((stageId, stageAttemptId)) +if (requests != null) { + requests.clear() --- End diff -- This is just to be safe, in case the requests are held in other places, we can still GC the `RpcCallContext`s --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
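To illustrate jiangxb1987's point about `requests.clear()`: a minimal, self-contained sketch (the `FakeCallContext` class and object names are hypothetical stand-ins, not Spark code) showing that clearing the buffer drops its references to the contexts even when some other holder still reaches the buffer after it has been removed from the map.

```scala
import java.util.concurrent.ConcurrentHashMap

import scala.collection.mutable.ArrayBuffer

object ClearOnRemoveSketch {
  // Hypothetical stand-in for RpcCallContext, used only to illustrate reachability.
  final class FakeCallContext

  private val requestsByStageIdAndAttempt =
    new ConcurrentHashMap[(Int, Int), ArrayBuffer[FakeCallContext]]

  // Mirrors the getOrInit pattern from the quoted diff: putIfAbsent, then read back.
  private def getOrInit(stageId: Int, stageAttemptId: Int): ArrayBuffer[FakeCallContext] = {
    val prev = requestsByStageIdAndAttempt.putIfAbsent(
      (stageId, stageAttemptId), new ArrayBuffer[FakeCallContext])
    if (prev == null) requestsByStageIdAndAttempt.get((stageId, stageAttemptId)) else prev
  }

  private def cleanup(stageId: Int, stageAttemptId: Int): Unit = {
    val requests = requestsByStageIdAndAttempt.remove((stageId, stageAttemptId))
    if (requests != null) {
      // remove() alone makes the buffer unreachable only if nobody else holds it;
      // clear() additionally drops the buffer's references to each context, so the
      // contexts can be collected even when the buffer itself is still referenced.
      requests.clear()
    }
  }

  def main(args: Array[String]): Unit = {
    val buf = getOrInit(0, 0)
    buf += new FakeCallContext
    val otherHolder = buf          // simulate another component keeping the buffer
    cleanup(0, 0)
    println(otherHolder.isEmpty)   // true: the buffer survives, the contexts are released
  }
}
```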
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207098735 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala --- @@ -61,6 +61,9 @@ private[spark] trait TaskScheduler { */ def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean + // Kill all the running task attempts in a stage. + def killAllTaskAttempts(stageId: Int, interruptThread: Boolean, reason: String): Unit --- End diff -- IIRC killing all tasks is just the best effort, we can't guarantee the tasks are all killed. Shall we tolerate this in the barrier scheduling? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
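A hedged sketch of the best-effort semantics being discussed (this is not the actual scheduler code; the trait, return convention, and names below are illustrative assumptions): a kill request may fail to reach an executor, so a caller such as barrier scheduling cannot assume every attempt has stopped and needs a backstop like the coordinator-side timeout.

```scala
// Illustrative only: trait name and return convention are assumptions, not Spark internals.
trait BestEffortKillSketch {
  /** Returns true if the kill request could be delivered to the executor. */
  def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean

  /** Best effort: some attempts of the stage may keep running after this returns. */
  def killAllTaskAttempts(runningTaskIds: Seq[Long], reason: String): Seq[Long] = {
    val notDelivered = runningTaskIds.filterNot { taskId =>
      killTaskAttempt(taskId, interruptThread = true, reason = reason)
    }
    // Callers (e.g. barrier stage failure handling) should tolerate survivors here,
    // for instance by relying on a timeout rather than on "all tasks are dead".
    notDelivered
  }
}
```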
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207098100 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContextImpl.scala --- @@ -39,8 +44,58 @@ private[spark] class BarrierTaskContextImpl( taskMemoryManager, localProperties, metricsSystem, taskMetrics) with BarrierTaskContext { - // TODO SPARK-24817 implement global barrier. - override def barrier(): Unit = {} + private val barrierCoordinator: RpcEndpointRef = { +val env = SparkEnv.get +RpcUtils.makeDriverRef("barrierSync", env.conf, env.rpcEnv) + } + + private val timer = new Timer("Barrier task timer for barrier() calls.") + + private var barrierEpoch = 0 + + private lazy val numTasks = getTaskInfos().size + + override def barrier(): Unit = { +val callSite = Utils.getCallSite() +logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) has entered " + + s"the global sync, current barrier epoch is $barrierEpoch.") +logTrace(s"Current callSite: $callSite") + +val startTime = System.currentTimeMillis() +val timerTask = new TimerTask { + override def run(): Unit = { +logInfo(s"Task $taskAttemptId from Stage $stageId(Attempt $stageAttemptNumber) waiting " + + s"under the global sync since $startTime, has been waiting for " + + s"${(System.currentTimeMillis() - startTime) / 1000} seconds, current barrier epoch " + + s"is $barrierEpoch.") + } +} +// Log the update of global sync every 60 seconds. +timer.schedule(timerTask, 6, 6) + +try { + barrierCoordinator.askSync[Unit]( +message = RequestToSync(numTasks, stageId, stageAttemptNumber, taskAttemptId, + barrierEpoch), +// Set a fixed timeout for RPC here, so users shall get a SparkException thrown by +// BarrierCoordinator on timeout, instead of RPCTimeoutException from the RPC framework. --- End diff -- do we have a max timeout for the RPC framework? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
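On the fixed-timeout question: the intent in the quoted diff is that the coordinator-side timeout fires (and sends a SparkException) before the RPC framework's own ask timeout ever does. A minimal sketch of that ordering constraint, assuming nothing about Spark's actual config keys; the helper name and the 30-second slack are illustrative values, not taken from the PR.

```scala
import scala.concurrent.duration._

object BarrierTimeoutOrderingSketch {
  /** Client-side ask timeout chosen strictly larger than the coordinator timeout. */
  def clientAskTimeout(coordinatorTimeoutInSecs: Long): FiniteDuration =
    coordinatorTimeoutInSecs.seconds + 30.seconds

  def main(args: Array[String]): Unit = {
    val coordinatorTimeout = 3650.seconds
    val askTimeout = clientAskTimeout(coordinatorTimeout.toSeconds)
    // As long as this holds, tasks see the coordinator's SparkException, never a raw
    // RPC timeout; cloud-fan's question is whether the framework caps the ask timeout.
    println(askTimeout > coordinatorTimeout)  // true
  }
}
```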
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207097912 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + // Remove internal data from a finished stage attempt. + cleanupSyncRequests(stageInfo.stageId, stageInfo.attemptNumber) + barrierEpochByStageIdAndAttempt.remove((stageInfo.stageId, stageInfo.attemptNumber)) +} + } + + // Epoch counter for each barrier (stage, attempt). + private val barrierEpochByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), AtomicInteger] + + // Remember all the blocking global sync requests for each barrier (stage, attempt). + private val syncRequestsByStageIdAndAttempt = +new ConcurrentHashMap[(Int, Int), ArrayBuffer[RpcCallContext]] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Get the array of [[RpcCallContext]]s that correspond to a barrier sync request from a stage + * attempt. 
+ */ + private def getOrInitSyncRequests( + stageId: Int, + stageAttemptId: Int, + numTasks: Int = 0): ArrayBuffer[RpcCallContext] = { +val requests = syncRequestsByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new ArrayBuffer[RpcCallContext](numTasks)) +if (requests == null) { + syncRequestsByStageIdAndAttempt.get((stageId, stageAttemptId)) +} else { + requests +} + } + + /** + * Clean up the array of [[RpcCallContext]]s that correspond to a barrier sync request from a + * stage attempt. + */ + private def cleanupSyncRequests(stageId: Int, stageAttemptId: Int): Unit = { +val requests = syncRequestsByStageIdAndAttempt.remove((stageId, stageAttemptId)) +if (requests != null) { + requests.clear() +} +logInfo(s"Removed all the pending barrier sync requests from Stage $stageId (Attempt " + + s"$stageAttemptId).") + } + + /** + * Get the barrier epoch that correspond to a barrier sync request from a stage attempt. + */ + private def getOrInitBarrierEpoch(stageId: Int, stageAttemptId: Int): AtomicInteger = { +val barrierEpoch = barrierEpochByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new AtomicInteger(0)) +if (barrierEpoch == null) { + barrierEpochByStageIdAndAttempt.get((stageId,
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207097716 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + // Remove internal data from a finished stage attempt. + cleanupSyncRequests(stageInfo.stageId, stageInfo.attemptNumber) + barrierEpochByStageIdAndAttempt.remove((stageInfo.stageId, stageInfo.attemptNumber)) +} + } + + // Epoch counter for each barrier (stage, attempt). + private val barrierEpochByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), AtomicInteger] + + // Remember all the blocking global sync requests for each barrier (stage, attempt). + private val syncRequestsByStageIdAndAttempt = +new ConcurrentHashMap[(Int, Int), ArrayBuffer[RpcCallContext]] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Get the array of [[RpcCallContext]]s that correspond to a barrier sync request from a stage + * attempt. 
+ */ + private def getOrInitSyncRequests( + stageId: Int, + stageAttemptId: Int, + numTasks: Int = 0): ArrayBuffer[RpcCallContext] = { +val requests = syncRequestsByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new ArrayBuffer[RpcCallContext](numTasks)) +if (requests == null) { + syncRequestsByStageIdAndAttempt.get((stageId, stageAttemptId)) +} else { + requests +} + } + + /** + * Clean up the array of [[RpcCallContext]]s that correspond to a barrier sync request from a + * stage attempt. + */ + private def cleanupSyncRequests(stageId: Int, stageAttemptId: Int): Unit = { +val requests = syncRequestsByStageIdAndAttempt.remove((stageId, stageAttemptId)) +if (requests != null) { + requests.clear() +} +logInfo(s"Removed all the pending barrier sync requests from Stage $stageId (Attempt " + + s"$stageAttemptId).") + } + + /** + * Get the barrier epoch that correspond to a barrier sync request from a stage attempt. + */ + private def getOrInitBarrierEpoch(stageId: Int, stageAttemptId: Int): AtomicInteger = { +val barrierEpoch = barrierEpochByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new AtomicInteger(0)) +if (barrierEpoch == null) { + barrierEpochByStageIdAndAttempt.get((stageId,
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207097631 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + // Remove internal data from a finished stage attempt. + cleanupSyncRequests(stageInfo.stageId, stageInfo.attemptNumber) + barrierEpochByStageIdAndAttempt.remove((stageInfo.stageId, stageInfo.attemptNumber)) +} + } + + // Epoch counter for each barrier (stage, attempt). + private val barrierEpochByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), AtomicInteger] + + // Remember all the blocking global sync requests for each barrier (stage, attempt). + private val syncRequestsByStageIdAndAttempt = +new ConcurrentHashMap[(Int, Int), ArrayBuffer[RpcCallContext]] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Get the array of [[RpcCallContext]]s that correspond to a barrier sync request from a stage + * attempt. 
+ */ + private def getOrInitSyncRequests( + stageId: Int, + stageAttemptId: Int, + numTasks: Int = 0): ArrayBuffer[RpcCallContext] = { +val requests = syncRequestsByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new ArrayBuffer[RpcCallContext](numTasks)) +if (requests == null) { + syncRequestsByStageIdAndAttempt.get((stageId, stageAttemptId)) +} else { + requests +} + } + + /** + * Clean up the array of [[RpcCallContext]]s that correspond to a barrier sync request from a + * stage attempt. + */ + private def cleanupSyncRequests(stageId: Int, stageAttemptId: Int): Unit = { +val requests = syncRequestsByStageIdAndAttempt.remove((stageId, stageAttemptId)) +if (requests != null) { + requests.clear() +} +logInfo(s"Removed all the pending barrier sync requests from Stage $stageId (Attempt " + + s"$stageAttemptId).") + } + + /** + * Get the barrier epoch that correspond to a barrier sync request from a stage attempt. + */ + private def getOrInitBarrierEpoch(stageId: Int, stageAttemptId: Int): AtomicInteger = { +val barrierEpoch = barrierEpochByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new AtomicInteger(0)) +if (barrierEpoch == null) { + barrierEpochByStageIdAndAttempt.get((stageId,
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207097401 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + // Remove internal data from a finished stage attempt. + cleanupSyncRequests(stageInfo.stageId, stageInfo.attemptNumber) + barrierEpochByStageIdAndAttempt.remove((stageInfo.stageId, stageInfo.attemptNumber)) +} + } + + // Epoch counter for each barrier (stage, attempt). + private val barrierEpochByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), AtomicInteger] + + // Remember all the blocking global sync requests for each barrier (stage, attempt). + private val syncRequestsByStageIdAndAttempt = +new ConcurrentHashMap[(Int, Int), ArrayBuffer[RpcCallContext]] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Get the array of [[RpcCallContext]]s that correspond to a barrier sync request from a stage + * attempt. 
+ */ + private def getOrInitSyncRequests( + stageId: Int, + stageAttemptId: Int, + numTasks: Int = 0): ArrayBuffer[RpcCallContext] = { +val requests = syncRequestsByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new ArrayBuffer[RpcCallContext](numTasks)) +if (requests == null) { + syncRequestsByStageIdAndAttempt.get((stageId, stageAttemptId)) +} else { + requests +} + } + + /** + * Clean up the array of [[RpcCallContext]]s that correspond to a barrier sync request from a + * stage attempt. + */ + private def cleanupSyncRequests(stageId: Int, stageAttemptId: Int): Unit = { +val requests = syncRequestsByStageIdAndAttempt.remove((stageId, stageAttemptId)) +if (requests != null) { + requests.clear() +} +logInfo(s"Removed all the pending barrier sync requests from Stage $stageId (Attempt " + + s"$stageAttemptId).") + } + + /** + * Get the barrier epoch that correspond to a barrier sync request from a stage attempt. + */ + private def getOrInitBarrierEpoch(stageId: Int, stageAttemptId: Int): AtomicInteger = { +val barrierEpoch = barrierEpochByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new AtomicInteger(0)) +if (barrierEpoch == null) { + barrierEpochByStageIdAndAttempt.get((stageId,
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207097180 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + // Remove internal data from a finished stage attempt. + cleanupSyncRequests(stageInfo.stageId, stageInfo.attemptNumber) + barrierEpochByStageIdAndAttempt.remove((stageInfo.stageId, stageInfo.attemptNumber)) +} + } + + // Epoch counter for each barrier (stage, attempt). + private val barrierEpochByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), AtomicInteger] + + // Remember all the blocking global sync requests for each barrier (stage, attempt). + private val syncRequestsByStageIdAndAttempt = +new ConcurrentHashMap[(Int, Int), ArrayBuffer[RpcCallContext]] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Get the array of [[RpcCallContext]]s that correspond to a barrier sync request from a stage + * attempt. 
+ */ + private def getOrInitSyncRequests( + stageId: Int, + stageAttemptId: Int, + numTasks: Int = 0): ArrayBuffer[RpcCallContext] = { +val requests = syncRequestsByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new ArrayBuffer[RpcCallContext](numTasks)) +if (requests == null) { + syncRequestsByStageIdAndAttempt.get((stageId, stageAttemptId)) +} else { + requests +} + } + + /** + * Clean up the array of [[RpcCallContext]]s that correspond to a barrier sync request from a + * stage attempt. + */ + private def cleanupSyncRequests(stageId: Int, stageAttemptId: Int): Unit = { +val requests = syncRequestsByStageIdAndAttempt.remove((stageId, stageAttemptId)) +if (requests != null) { + requests.clear() +} +logInfo(s"Removed all the pending barrier sync requests from Stage $stageId (Attempt " + + s"$stageAttemptId).") + } + + /** + * Get the barrier epoch that correspond to a barrier sync request from a stage attempt. + */ + private def getOrInitBarrierEpoch(stageId: Int, stageAttemptId: Int): AtomicInteger = { +val barrierEpoch = barrierEpochByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new AtomicInteger(0)) +if (barrierEpoch == null) { + barrierEpochByStageIdAndAttempt.get((stageId,
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207097024 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + // Remove internal data from a finished stage attempt. + cleanupSyncRequests(stageInfo.stageId, stageInfo.attemptNumber) + barrierEpochByStageIdAndAttempt.remove((stageInfo.stageId, stageInfo.attemptNumber)) +} + } + + // Epoch counter for each barrier (stage, attempt). + private val barrierEpochByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), AtomicInteger] + + // Remember all the blocking global sync requests for each barrier (stage, attempt). + private val syncRequestsByStageIdAndAttempt = +new ConcurrentHashMap[(Int, Int), ArrayBuffer[RpcCallContext]] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Get the array of [[RpcCallContext]]s that correspond to a barrier sync request from a stage + * attempt. 
+ */ + private def getOrInitSyncRequests( + stageId: Int, + stageAttemptId: Int, + numTasks: Int = 0): ArrayBuffer[RpcCallContext] = { +val requests = syncRequestsByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new ArrayBuffer[RpcCallContext](numTasks)) +if (requests == null) { + syncRequestsByStageIdAndAttempt.get((stageId, stageAttemptId)) +} else { + requests +} + } + + /** + * Clean up the array of [[RpcCallContext]]s that correspond to a barrier sync request from a + * stage attempt. + */ + private def cleanupSyncRequests(stageId: Int, stageAttemptId: Int): Unit = { +val requests = syncRequestsByStageIdAndAttempt.remove((stageId, stageAttemptId)) +if (requests != null) { + requests.clear() +} +logInfo(s"Removed all the pending barrier sync requests from Stage $stageId (Attempt " + + s"$stageAttemptId).") + } + + /** + * Get the barrier epoch that correspond to a barrier sync request from a stage attempt. + */ + private def getOrInitBarrierEpoch(stageId: Int, stageAttemptId: Int): AtomicInteger = { +val barrierEpoch = barrierEpochByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new AtomicInteger(0)) +if (barrierEpoch == null) { + barrierEpochByStageIdAndAttempt.get((stageId,
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207096937 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + // Remove internal data from a finished stage attempt. + cleanupSyncRequests(stageInfo.stageId, stageInfo.attemptNumber) + barrierEpochByStageIdAndAttempt.remove((stageInfo.stageId, stageInfo.attemptNumber)) +} + } + + // Epoch counter for each barrier (stage, attempt). + private val barrierEpochByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), AtomicInteger] + + // Remember all the blocking global sync requests for each barrier (stage, attempt). + private val syncRequestsByStageIdAndAttempt = +new ConcurrentHashMap[(Int, Int), ArrayBuffer[RpcCallContext]] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Get the array of [[RpcCallContext]]s that correspond to a barrier sync request from a stage + * attempt. 
+ */ + private def getOrInitSyncRequests( + stageId: Int, + stageAttemptId: Int, + numTasks: Int = 0): ArrayBuffer[RpcCallContext] = { +val requests = syncRequestsByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new ArrayBuffer[RpcCallContext](numTasks)) +if (requests == null) { + syncRequestsByStageIdAndAttempt.get((stageId, stageAttemptId)) +} else { + requests +} + } + + /** + * Clean up the array of [[RpcCallContext]]s that correspond to a barrier sync request from a + * stage attempt. + */ + private def cleanupSyncRequests(stageId: Int, stageAttemptId: Int): Unit = { +val requests = syncRequestsByStageIdAndAttempt.remove((stageId, stageAttemptId)) +if (requests != null) { + requests.clear() +} +logInfo(s"Removed all the pending barrier sync requests from Stage $stageId (Attempt " + + s"$stageAttemptId).") + } + + /** + * Get the barrier epoch that correspond to a barrier sync request from a stage attempt. + */ + private def getOrInitBarrierEpoch(stageId: Int, stageAttemptId: Int): AtomicInteger = { +val barrierEpoch = barrierEpochByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new AtomicInteger(0)) +if (barrierEpoch == null) { + barrierEpochByStageIdAndAttempt.get((stageId,
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207096760 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + // Remove internal data from a finished stage attempt. + cleanupSyncRequests(stageInfo.stageId, stageInfo.attemptNumber) + barrierEpochByStageIdAndAttempt.remove((stageInfo.stageId, stageInfo.attemptNumber)) +} + } + + // Epoch counter for each barrier (stage, attempt). + private val barrierEpochByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), AtomicInteger] + + // Remember all the blocking global sync requests for each barrier (stage, attempt). + private val syncRequestsByStageIdAndAttempt = +new ConcurrentHashMap[(Int, Int), ArrayBuffer[RpcCallContext]] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Get the array of [[RpcCallContext]]s that correspond to a barrier sync request from a stage + * attempt. 
+ */ + private def getOrInitSyncRequests( + stageId: Int, + stageAttemptId: Int, + numTasks: Int = 0): ArrayBuffer[RpcCallContext] = { +val requests = syncRequestsByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new ArrayBuffer[RpcCallContext](numTasks)) +if (requests == null) { + syncRequestsByStageIdAndAttempt.get((stageId, stageAttemptId)) +} else { + requests +} + } + + /** + * Clean up the array of [[RpcCallContext]]s that correspond to a barrier sync request from a + * stage attempt. + */ + private def cleanupSyncRequests(stageId: Int, stageAttemptId: Int): Unit = { +val requests = syncRequestsByStageIdAndAttempt.remove((stageId, stageAttemptId)) +if (requests != null) { + requests.clear() +} +logInfo(s"Removed all the pending barrier sync requests from Stage $stageId (Attempt " + + s"$stageAttemptId).") + } + + /** + * Get the barrier epoch that correspond to a barrier sync request from a stage attempt. + */ + private def getOrInitBarrierEpoch(stageId: Int, stageAttemptId: Int): AtomicInteger = { +val barrierEpoch = barrierEpochByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new AtomicInteger(0)) +if (barrierEpoch == null) { + barrierEpochByStageIdAndAttempt.get((stageId,
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207096540 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + // Remove internal data from a finished stage attempt. + cleanupSyncRequests(stageInfo.stageId, stageInfo.attemptNumber) + barrierEpochByStageIdAndAttempt.remove((stageInfo.stageId, stageInfo.attemptNumber)) +} + } + + // Epoch counter for each barrier (stage, attempt). + private val barrierEpochByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), AtomicInteger] + + // Remember all the blocking global sync requests for each barrier (stage, attempt). + private val syncRequestsByStageIdAndAttempt = +new ConcurrentHashMap[(Int, Int), ArrayBuffer[RpcCallContext]] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Get the array of [[RpcCallContext]]s that correspond to a barrier sync request from a stage + * attempt. + */ + private def getOrInitSyncRequests( + stageId: Int, + stageAttemptId: Int, + numTasks: Int = 0): ArrayBuffer[RpcCallContext] = { --- End diff -- when will we use the default value `0`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
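Regarding the default `numTasks = 0`: `ArrayBuffer`'s constructor argument is only a hint for the initial backing-array size, so a zero hint affects pre-sizing, not correctness. A small standalone sketch (names are illustrative):

```scala
import scala.collection.mutable.ArrayBuffer

object InitialSizeHintSketch {
  def main(args: Array[String]): Unit = {
    val presized = new ArrayBuffer[Int](4)  // the caller knew numTasks up front
    val unsized  = new ArrayBuffer[Int](0)  // the default value in question
    (1 to 10).foreach { i => presized += i; unsized += i }
    // Both buffers grow on demand; the hint only avoids a few re-allocations.
    println(presized.length == unsized.length)  // true
  }
}
```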
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207096425 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * received all the requests for a group of `barrier()` calls. If the coordinator doesn't collect + * enough global sync requests within a configured time, fail all the requests due to timeout. + */ +private[spark] class BarrierCoordinator( +timeout: Int, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + private val listener = new SparkListener { +override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = { + val stageInfo = stageCompleted.stageInfo + // Remove internal data from a finished stage attempt. + cleanupSyncRequests(stageInfo.stageId, stageInfo.attemptNumber) + barrierEpochByStageIdAndAttempt.remove((stageInfo.stageId, stageInfo.attemptNumber)) +} + } + + // Epoch counter for each barrier (stage, attempt). + private val barrierEpochByStageIdAndAttempt = new ConcurrentHashMap[(Int, Int), AtomicInteger] + + // Remember all the blocking global sync requests for each barrier (stage, attempt). + private val syncRequestsByStageIdAndAttempt = +new ConcurrentHashMap[(Int, Int), ArrayBuffer[RpcCallContext]] + + override def onStart(): Unit = { +super.onStart() +listenerBus.addToStatusQueue(listener) + } + + /** + * Get the array of [[RpcCallContext]]s that correspond to a barrier sync request from a stage + * attempt. 
+ */ + private def getOrInitSyncRequests( + stageId: Int, + stageAttemptId: Int, + numTasks: Int = 0): ArrayBuffer[RpcCallContext] = { +val requests = syncRequestsByStageIdAndAttempt.putIfAbsent((stageId, stageAttemptId), + new ArrayBuffer[RpcCallContext](numTasks)) +if (requests == null) { + syncRequestsByStageIdAndAttempt.get((stageId, stageAttemptId)) +} else { + requests +} + } + + /** + * Clean up the array of [[RpcCallContext]]s that correspond to a barrier sync request from a + * stage attempt. + */ + private def cleanupSyncRequests(stageId: Int, stageAttemptId: Int): Unit = { +val requests = syncRequestsByStageIdAndAttempt.remove((stageId, stageAttemptId)) +if (requests != null) { + requests.clear() --- End diff -- is this needed? when we call `syncRequestsByStageIdAndAttempt.remove((stageId, stageAttemptId))`, the array buffer becomes dangling and will be GCed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r206972303 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala --- @@ -61,6 +61,9 @@ private[spark] trait TaskScheduler { */ def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean + // Kill all the running task attempts in a stage. + def killAllTaskAttempts(stageId: Int, interruptThread: Boolean, reason: String): Unit --- End diff -- Submitted #21943; however, we still need this here until #21943 is merged, because otherwise the test cases would fail. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r206709783 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} + +import scala.collection.mutable.{ArrayBuffer, HashMap} + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} + +class BarrierCoordinator( +timeout: Long, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + // Barrier epoch for each stage attempt, fail a sync request if the barrier epoch in the request + // mismatches the barrier epoch in the coordinator. + private val barrierEpochByStageIdAndAttempt = new HashMap[Int, HashMap[Int, Int]] + + // Any access to this should be synchronized. + private val syncRequestsByStageIdAndAttempt = +new HashMap[Int, HashMap[Int, ArrayBuffer[RpcCallContext]]] + + /** + * Get the array of [[RpcCallContext]]s that correspond to a barrier sync request from a stage + * attempt. + */ + private def getOrInitSyncRequests( + stageId: Int, + stageAttemptId: Int, + numTasks: Int = 0): ArrayBuffer[RpcCallContext] = synchronized { +val syncRequestsByStage = syncRequestsByStageIdAndAttempt + .getOrElseUpdate(stageId, new HashMap[Int, ArrayBuffer[RpcCallContext]]) +syncRequestsByStage.getOrElseUpdate(stageAttemptId, new ArrayBuffer[RpcCallContext](numTasks)) + } + + /** + * Clean up the array of [[RpcCallContext]]s that correspond to a barrier sync request from a + * stage attempt. + */ + private def cleanupSyncRequests(stageId: Int, stageAttemptId: Int): Unit = synchronized { +syncRequestsByStageIdAndAttempt.get(stageId).foreach { syncRequestByStage => + syncRequestByStage.get(stageAttemptId).foreach { syncRequests => +syncRequests.clear() + } + syncRequestByStage -= stageAttemptId + if (syncRequestByStage.isEmpty) { +syncRequestsByStageIdAndAttempt -= stageId + } + logInfo(s"Removed all the pending barrier sync requests from Stage $stageId(Attempt " + +s"$stageAttemptId).") +} + } + + /** + * Get the barrier epoch that correspond to a barrier sync request from a stage attempt. 
+ */ + private def getOrInitBarrierEpoch(stageId: Int, stageAttemptId: Int): Int = synchronized { +val barrierEpochByStage = barrierEpochByStageIdAndAttempt + .getOrElseUpdate(stageId, new HashMap[Int, Int]) +val barrierEpoch = barrierEpochByStage.getOrElseUpdate(stageAttemptId, 0) +logInfo(s"Current barrier epoch for Stage $stageId(Attempt $stageAttemptId) is $barrierEpoch.") +barrierEpoch + } + + /** + * Update the barrier epoch that correspond to a barrier sync request from a stage attempt. + */ + private def updateBarrierEpoch( + stageId: Int, + stageAttemptId: Int, + newBarrierEpoch: Int): Unit = synchronized { +val barrierEpochByStage = barrierEpochByStageIdAndAttempt + .getOrElseUpdate(stageId, new HashMap[Int, Int]) +barrierEpochByStage.put(stageAttemptId, newBarrierEpoch) +logInfo(s"Current barrier epoch for Stage $stageId(Attempt $stageAttemptId) is " + + s"$newBarrierEpoch.") + } + + /** + * Send failure to all the blocking barrier sync requests from a stage attempt with proper + * failure message. + */ + private def failAllSyncRequests( + syncRequests: ArrayBuffer[RpcCallContext], + message: String): Unit = { +syncRequests.foreach(_.sendFailure(new SparkException(message))) +
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r206716473 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContextImpl.scala --- @@ -39,8 +44,53 @@ private[spark] class BarrierTaskContextImpl( taskMemoryManager, localProperties, metricsSystem, taskMetrics) with BarrierTaskContext { - // TODO SPARK-24817 implement global barrier. - override def barrier(): Unit = {} + private val barrierCoordinator: RpcEndpointRef = { +val env = SparkEnv.get +RpcUtils.makeDriverRef("barrierSync", env.conf, env.rpcEnv) + } + + private val timer = new Timer("Barrier task timer for barrier() calls.") + + private var barrierEpoch = 0 --- End diff -- Using AtomicLong? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
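A minimal sketch of the `AtomicLong` alternative mengxr suggests (the class name below is hypothetical; the `BarrierTaskContextImpl` in the quoted diff keeps a plain `var`):

```scala
import java.util.concurrent.atomic.AtomicLong

// Sketch only: shows how the per-task barrier epoch could be tracked with an AtomicLong
// so that the increment after a successful barrier() is safe under concurrent access.
class BarrierEpochHolderSketch {
  private val barrierEpoch = new AtomicLong(0L)

  /** Bump the epoch once the coordinator has replied to this barrier() call. */
  def onSyncSucceeded(): Long = barrierEpoch.incrementAndGet()

  def currentEpoch: Long = barrierEpoch.get()
}
```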
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r206628185 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} + +import scala.collection.mutable.{ArrayBuffer, HashMap} + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} + +class BarrierCoordinator( +timeout: Long, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + // Barrier epoch for each stage attempt, fail a sync request if the barrier epoch in the request + // mismatches the barrier epoch in the coordinator. + private val barrierEpochByStageIdAndAttempt = new HashMap[Int, HashMap[Int, Int]] --- End diff -- +1. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
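The "+1" appears to endorse moving away from the nested per-stage `HashMap`s, and the later revision quoted earlier in this thread does exactly that: a single concurrent map keyed by `(stageId, stageAttemptId)`. A condensed sketch of that shape (object name is illustrative):

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

object FlattenedEpochMapSketch {
  // One flat concurrent map replaces HashMap[Int, HashMap[Int, Int]] and removes the
  // need to wrap every access in `synchronized`.
  private val barrierEpochByStageIdAndAttempt =
    new ConcurrentHashMap[(Int, Int), AtomicInteger]

  def getOrInitBarrierEpoch(stageId: Int, stageAttemptId: Int): AtomicInteger = {
    val prev = barrierEpochByStageIdAndAttempt.putIfAbsent(
      (stageId, stageAttemptId), new AtomicInteger(0))
    if (prev == null) barrierEpochByStageIdAndAttempt.get((stageId, stageAttemptId)) else prev
  }

  def main(args: Array[String]): Unit = {
    println(getOrInitBarrierEpoch(3, 0).incrementAndGet())  // 1
    println(getOrInitBarrierEpoch(3, 0).incrementAndGet())  // 2: the same counter is reused
  }
}
```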
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r206708836 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} + +import scala.collection.mutable.{ArrayBuffer, HashMap} + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} + +class BarrierCoordinator( +timeout: Long, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + // Barrier epoch for each stage attempt, fail a sync request if the barrier epoch in the request + // mismatches the barrier epoch in the coordinator. + private val barrierEpochByStageIdAndAttempt = new HashMap[Int, HashMap[Int, Int]] + + // Any access to this should be synchronized. + private val syncRequestsByStageIdAndAttempt = +new HashMap[Int, HashMap[Int, ArrayBuffer[RpcCallContext]]] + + /** + * Get the array of [[RpcCallContext]]s that correspond to a barrier sync request from a stage + * attempt. + */ + private def getOrInitSyncRequests( + stageId: Int, + stageAttemptId: Int, + numTasks: Int = 0): ArrayBuffer[RpcCallContext] = synchronized { +val syncRequestsByStage = syncRequestsByStageIdAndAttempt + .getOrElseUpdate(stageId, new HashMap[Int, ArrayBuffer[RpcCallContext]]) +syncRequestsByStage.getOrElseUpdate(stageAttemptId, new ArrayBuffer[RpcCallContext](numTasks)) + } + + /** + * Clean up the array of [[RpcCallContext]]s that correspond to a barrier sync request from a + * stage attempt. + */ + private def cleanupSyncRequests(stageId: Int, stageAttemptId: Int): Unit = synchronized { +syncRequestsByStageIdAndAttempt.get(stageId).foreach { syncRequestByStage => + syncRequestByStage.get(stageAttemptId).foreach { syncRequests => +syncRequests.clear() + } + syncRequestByStage -= stageAttemptId + if (syncRequestByStage.isEmpty) { +syncRequestsByStageIdAndAttempt -= stageId + } + logInfo(s"Removed all the pending barrier sync requests from Stage $stageId(Attempt " + +s"$stageAttemptId).") +} + } + + /** + * Get the barrier epoch that correspond to a barrier sync request from a stage attempt. 
+ */ + private def getOrInitBarrierEpoch(stageId: Int, stageAttemptId: Int): Int = synchronized { +val barrierEpochByStage = barrierEpochByStageIdAndAttempt + .getOrElseUpdate(stageId, new HashMap[Int, Int]) +val barrierEpoch = barrierEpochByStage.getOrElseUpdate(stageAttemptId, 0) +logInfo(s"Current barrier epoch for Stage $stageId(Attempt $stageAttemptId) is $barrierEpoch.") +barrierEpoch + } + + /** + * Update the barrier epoch that correspond to a barrier sync request from a stage attempt. + */ + private def updateBarrierEpoch( + stageId: Int, + stageAttemptId: Int, + newBarrierEpoch: Int): Unit = synchronized { +val barrierEpochByStage = barrierEpochByStageIdAndAttempt + .getOrElseUpdate(stageId, new HashMap[Int, Int]) +barrierEpochByStage.put(stageAttemptId, newBarrierEpoch) +logInfo(s"Current barrier epoch for Stage $stageId(Attempt $stageAttemptId) is " + + s"$newBarrierEpoch.") + } + + /** + * Send failure to all the blocking barrier sync requests from a stage attempt with proper + * failure message. + */ + private def failAllSyncRequests( + syncRequests: ArrayBuffer[RpcCallContext], + message: String): Unit = { +syncRequests.foreach(_.sendFailure(new SparkException(message))) +
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r206718449 --- Diff: core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala --- @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.util.Random + +import org.apache.spark._ + +class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext { + + test("global sync by barrier() call") { +val conf = new SparkConf() + .setMaster("local-cluster[4, 1, 1024]") + .setAppName("test-cluster") +sc = new SparkContext(conf) +val rdd = sc.makeRDD(1 to 10, 4) +val rdd2 = rdd.barrier().mapPartitions { (it, context) => + // Sleep for a random time before global sync. + Thread.sleep(Random.nextInt(1000)) + context.barrier() + Seq(System.currentTimeMillis()).iterator +} + +val times = rdd2.collect() +// All the tasks shall finish global sync within a short time slot. +assert(times.max - times.min <= 5) + } + + test("support multiple barrier() call within a single task") { +val conf = new SparkConf() + .setMaster("local-cluster[4, 1, 1024]") + .setAppName("test-cluster") +sc = new SparkContext(conf) +val rdd = sc.makeRDD(1 to 10, 4) +val rdd2 = rdd.barrier().mapPartitions { (it, context) => + // Sleep for a random time before global sync. + Thread.sleep(Random.nextInt(1000)) + context.barrier() + val time1 = System.currentTimeMillis() + // Sleep for a random time between two global syncs. + Thread.sleep(Random.nextInt(1000)) + context.barrier() + val time2 = System.currentTimeMillis() + Seq((time1, time2)).iterator +} + +val times = rdd2.collect() +// All the tasks shall finish the first round of global sync within a short time slot. +val times1 = times.map(_._1) +assert(times1.max - times1.min <= 5) + +// All the tasks shall finish the second round of global sync within a short time slot. 
+val times2 = times.map(_._2) +assert(times2.max - times2.min <= 5) + } + + test("throw exception on barrier() call timeout") { +val conf = new SparkConf() + .set("spark.barrier.sync.timeout", "100") + .set("spark.test.noStageRetry", "true") + .setMaster("local-cluster[4, 1, 1024]") + .setAppName("test-cluster") +sc = new SparkContext(conf) +val rdd = sc.makeRDD(1 to 10, 4) +val rdd2 = rdd.barrier().mapPartitions { (it, context) => + // Task 3 shall sleep 200ms to ensure barrier() call timeout + if (context.taskAttemptId() == 3) { +Thread.sleep(200) + } + context.barrier() + it +} + +val error = intercept[SparkException] { + rdd2.collect() +}.getMessage +assert(error.contains("The coordinator didn't get all barrier sync requests")) +assert(error.contains("within 100 ms")) + } + + test("throw exception if barrier() call doesn't happen on every task") { +val conf = new SparkConf() + .set("spark.barrier.sync.timeout", "100") + .set("spark.test.noStageRetry", "true") + .setMaster("local-cluster[4, 1, 1024]") + .setAppName("test-cluster") +sc = new SparkContext(conf) +val rdd = sc.makeRDD(1 to 10, 4) +val rdd2 = rdd.barrier().mapPartitions { (it, context) => + if (context.taskAttemptId() != 0) { +context.barrier() + } + it +} + +val error = intercept[SparkException] { + rdd2.collect() +}.getMessage +assert(error.contains("The coordinator didn't get all barrier sync requests")) +assert(error.contains("within 100 ms")) + } + + ignore("throw exception
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r206709039 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} + +import scala.collection.mutable.{ArrayBuffer, HashMap} + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} + +class BarrierCoordinator( +timeout: Long, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + // Barrier epoch for each stage attempt, fail a sync request if the barrier epoch in the request + // mismatches the barrier epoch in the coordinator. + private val barrierEpochByStageIdAndAttempt = new HashMap[Int, HashMap[Int, Int]] + + // Any access to this should be synchronized. + private val syncRequestsByStageIdAndAttempt = +new HashMap[Int, HashMap[Int, ArrayBuffer[RpcCallContext]]] + + /** + * Get the array of [[RpcCallContext]]s that correspond to a barrier sync request from a stage + * attempt. + */ + private def getOrInitSyncRequests( + stageId: Int, + stageAttemptId: Int, + numTasks: Int = 0): ArrayBuffer[RpcCallContext] = synchronized { +val syncRequestsByStage = syncRequestsByStageIdAndAttempt + .getOrElseUpdate(stageId, new HashMap[Int, ArrayBuffer[RpcCallContext]]) +syncRequestsByStage.getOrElseUpdate(stageAttemptId, new ArrayBuffer[RpcCallContext](numTasks)) + } + + /** + * Clean up the array of [[RpcCallContext]]s that correspond to a barrier sync request from a + * stage attempt. + */ + private def cleanupSyncRequests(stageId: Int, stageAttemptId: Int): Unit = synchronized { +syncRequestsByStageIdAndAttempt.get(stageId).foreach { syncRequestByStage => + syncRequestByStage.get(stageAttemptId).foreach { syncRequests => +syncRequests.clear() + } + syncRequestByStage -= stageAttemptId + if (syncRequestByStage.isEmpty) { +syncRequestsByStageIdAndAttempt -= stageId + } + logInfo(s"Removed all the pending barrier sync requests from Stage $stageId(Attempt " + +s"$stageAttemptId).") +} + } + + /** + * Get the barrier epoch that correspond to a barrier sync request from a stage attempt. 
+ */ + private def getOrInitBarrierEpoch(stageId: Int, stageAttemptId: Int): Int = synchronized { +val barrierEpochByStage = barrierEpochByStageIdAndAttempt + .getOrElseUpdate(stageId, new HashMap[Int, Int]) +val barrierEpoch = barrierEpochByStage.getOrElseUpdate(stageAttemptId, 0) +logInfo(s"Current barrier epoch for Stage $stageId(Attempt $stageAttemptId) is $barrierEpoch.") +barrierEpoch + } + + /** + * Update the barrier epoch that correspond to a barrier sync request from a stage attempt. + */ + private def updateBarrierEpoch( --- End diff -- Do we have a use case to set an arbitrary epoch number? If not, we might not want to define the method this way. It should be either increment or reset. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
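For illustration, roughly what the increment/reset shape suggested above could look like inside the BarrierCoordinator from the quoted diff. This is only a sketch, not the PR's final code; it assumes the same nested HashMap bookkeeping, and the helper names are made up:

    // Advance the barrier epoch of a stage attempt after a successful global sync.
    private def incrementBarrierEpoch(stageId: Int, stageAttemptId: Int): Unit = synchronized {
      val barrierEpochByStage = barrierEpochByStageIdAndAttempt
        .getOrElseUpdate(stageId, new HashMap[Int, Int])
      val newEpoch = barrierEpochByStage.getOrElse(stageAttemptId, 0) + 1
      barrierEpochByStage.put(stageAttemptId, newEpoch)
      logInfo(s"Barrier epoch for Stage $stageId (Attempt $stageAttemptId) advanced to $newEpoch.")
    }

    // Reset the barrier epoch, e.g. when the stage attempt completes or is aborted.
    private def resetBarrierEpoch(stageId: Int, stageAttemptId: Int): Unit = synchronized {
      barrierEpochByStageIdAndAttempt.get(stageId).foreach(_.put(stageAttemptId, 0))
    }

Exposing only these two operations keeps callers from ever writing an out-of-order epoch into the map.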
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r206649993 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} + +import scala.collection.mutable.{ArrayBuffer, HashMap} + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} + +class BarrierCoordinator( +timeout: Long, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + // Barrier epoch for each stage attempt, fail a sync request if the barrier epoch in the request + // mismatches the barrier epoch in the coordinator. + private val barrierEpochByStageIdAndAttempt = new HashMap[Int, HashMap[Int, Int]] + + // Any access to this should be synchronized. --- End diff -- Then shall we switch to Java's ConcurrentHashMap? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
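A minimal sketch of what the ConcurrentHashMap variant could look like inside the quoted BarrierCoordinator, keyed directly by (stageId, stageAttemptId). It only illustrates the suggestion, not what the PR ended up doing, and the field name is made up:

    import java.util.concurrent.ConcurrentHashMap
    import java.util.function.{Function => JFunction}

    import scala.collection.mutable.ArrayBuffer

    import org.apache.spark.rpc.RpcCallContext

    // Atomic get-or-create without synchronizing on the whole coordinator.
    private val syncRequestsByStageAndAttempt =
      new ConcurrentHashMap[(Int, Int), ArrayBuffer[RpcCallContext]]()

    private def getOrInitSyncRequests(
        stageId: Int,
        stageAttemptId: Int,
        numTasks: Int = 0): ArrayBuffer[RpcCallContext] = {
      syncRequestsByStageAndAttempt.computeIfAbsent(
        (stageId, stageAttemptId),
        new JFunction[(Int, Int), ArrayBuffer[RpcCallContext]] {
          override def apply(key: (Int, Int)): ArrayBuffer[RpcCallContext] =
            new ArrayBuffer[RpcCallContext](numTasks)
        })
    }

Note that only the map operations become thread safe this way; appending to a returned ArrayBuffer still needs synchronization on the buffer itself.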
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r206627185 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} + +import scala.collection.mutable.{ArrayBuffer, HashMap} + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} + +class BarrierCoordinator( --- End diff -- * make it package private * add ScalaDoc --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
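A sketch of the two requested changes (package-private visibility plus a ScalaDoc header); the doc comment wording below is only a summary of what the quoted diff does, not the PR's final text:

    /**
     * Coordinates global sync requests from barrier tasks. Each request is identified by
     * stageId + stageAttemptId + barrierEpoch; the coordinator replies to all pending
     * requests of a stage attempt once every task has called barrier(), and fails them
     * if they do not all arrive within the configured timeout.
     */
    private[spark] class BarrierCoordinator(
        timeout: Long,
        override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging {
      // body unchanged from the diff above
    }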
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r206718064 --- Diff: core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala --- @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.util.Random + +import org.apache.spark._ + +class BarrierTaskContextSuite extends SparkFunSuite with LocalSparkContext { + + test("global sync by barrier() call") { +val conf = new SparkConf() + .setMaster("local-cluster[4, 1, 1024]") + .setAppName("test-cluster") +sc = new SparkContext(conf) +val rdd = sc.makeRDD(1 to 10, 4) +val rdd2 = rdd.barrier().mapPartitions { (it, context) => + // Sleep for a random time before global sync. + Thread.sleep(Random.nextInt(1000)) + context.barrier() + Seq(System.currentTimeMillis()).iterator +} + +val times = rdd2.collect() +// All the tasks shall finish global sync within a short time slot. +assert(times.max - times.min <= 5) --- End diff -- 5ms seems too risky to me. Actually, 1 second is perhaps okay here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
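With the tolerance relaxed as suggested, the check would read roughly:

    val times = rdd2.collect()
    // All tasks leave the barrier at roughly the same time; allow up to 1 second of skew
    // rather than 5 ms so the test does not flake on slow or loaded machines.
    assert(times.max - times.min <= 1000)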
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r206710204 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala --- @@ -27,6 +27,33 @@ trait BarrierTaskContext extends TaskContext { * Sets a global barrier and waits until all tasks in this stage hit this barrier. Similar to * MPI_Barrier function in MPI, the barrier() function call blocks until all tasks in the same * stage have reached this routine. + * + * This function is expected to be called by EVERY tasks in the same barrier stage in the SAME + * pattern, otherwise you may get a SparkException. Some examples of misuses listed below: + * 1. Only call barrier() function on a subset of all the tasks in the same barrier stage, it + * shall lead to time out of the function call. + * rdd.barrier().mapPartitions { (iter, context) => --- End diff -- This won't be rendered correctly in ScalaDoc/JavaDoc. See https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/functions.scala#L160. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
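ScalaDoc renders example code only when it is wrapped in a {{{ ... }}} block (which is what the linked functions.scala does), so the misuse examples in the doc comment would need to be reformatted along these lines; the snippet below is just one of the misuse cases, rewritten for illustration:

    /**
     * Example of a misuse: only a subset of the tasks call barrier(), so the call times out.
     *
     * {{{
     *   rdd.barrier().mapPartitions { (iter, context) =>
     *     if (context.partitionId() == 0) {
     *       context.barrier()
     *     }
     *     iter
     *   }
     * }}}
     */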
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r206707616 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} + +import scala.collection.mutable.{ArrayBuffer, HashMap} + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} + +class BarrierCoordinator( +timeout: Long, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private val timer = new Timer("BarrierCoordinator barrier epoch increment timer") + + // Barrier epoch for each stage attempt, fail a sync request if the barrier epoch in the request + // mismatches the barrier epoch in the coordinator. + private val barrierEpochByStageIdAndAttempt = new HashMap[Int, HashMap[Int, Int]] --- End diff -- Also, how about using `AtomicLong` to remember the epoch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
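For illustration, the AtomicLong version of the epoch, assuming the epoch lives in some per-stage-attempt state inside the coordinator rather than in the nested map (a sketch with a made-up helper name):

    import java.util.concurrent.atomic.AtomicLong

    // Barrier epoch of one stage attempt. get() validates incoming sync requests,
    // incrementAndGet() advances the epoch after a successful sync, set(0) resets it,
    // all without an explicit synchronized block.
    private val barrierEpoch = new AtomicLong(0)

    private def advanceEpochAfterSync(): Long = barrierEpoch.incrementAndGet()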
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r206716080 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContextImpl.scala --- @@ -39,8 +44,53 @@ private[spark] class BarrierTaskContextImpl( taskMemoryManager, localProperties, metricsSystem, taskMetrics) with BarrierTaskContext { - // TODO SPARK-24817 implement global barrier. - override def barrier(): Unit = {} + private val barrierCoordinator: RpcEndpointRef = { +val env = SparkEnv.get +RpcUtils.makeDriverRef("barrierSync", env.conf, env.rpcEnv) + } + + private val timer = new Timer("Barrier task timer for barrier() calls.") + + private var barrierEpoch = 0 + + private lazy val numTasks = localProperties.getProperty("numTasks", "0").toInt --- End diff -- Shall we use `getTaskInfos().size` instead? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
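The change suggested here is small: instead of propagating a separate "numTasks" local property, the task count would come from the task infos the barrier task already receives. Roughly, and assuming getTaskInfos() is available on the barrier context as the comment implies:

    // Number of tasks in this barrier stage, derived from the scheduler-provided task infos
    // instead of a separately injected "numTasks" property.
    private lazy val numTasks = getTaskInfos().size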
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user mengxr commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r206716701 --- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala --- @@ -567,4 +567,14 @@ package object config { .intConf .checkValue(v => v > 0, "The value should be a positive integer.") .createWithDefault(2000) + + private[spark] val BARRIER_SYNC_TIMEOUT = +ConfigBuilder("spark.barrier.sync.timeout") + .doc("The timeout in milliseconds for each barrier() call from a barrier task. If the " + --- End diff -- +1 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org