GitHub user mengxr commented on the issue:

https://github.com/apache/spark/pull/21898

Here is what I mean:

~~~scala
import java.util.TimerTask
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.rpc.RpcCallContext

case class ContextBarrierId(stageId: Int, stageAttemptId: Int)

class ContextBarrierState(val numTasks: Int) {
  private var epoch: Int = 0
  private val requesters: ArrayBuffer[RpcCallContext] = ...
  private val timerTask: TimerTask = new TimerTask { ... }

  def handleRequest(requester: RpcCallContext, barrierEpoch: Int): Unit = synchronized {
    // start the timer if this is the first request
    // throw an exception if barrierEpoch doesn't match the current epoch
    // always check whether requesters.size == numTasks; if so, reply to all,
    // clear requesters, and increment epoch
  }

  private def startTimer(): Unit

  def clear(): Unit = synchronized {
    // set epoch to -1
    // clear requesters
    // cancel the timer if active
  }
}

val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState]

...

case RequestToSync( .... ) =>
  val id = ContextBarrierId(...)
  states.putIfAbsent(id, new ContextBarrierState(numTasks))
  val state = states.get(id)
  state.handleRequest(context, barrierEpoch)

...

def onStop(): Unit = {
  // ConcurrentHashMap.forEachValue also takes a parallelism threshold,
  // so iterate over the values through the Scala converters instead
  states.values().asScala.foreach(_.clear())
  states.clear()
}

...
~~~
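To make the proposed state machine concrete, here is a minimal, Spark-free sketch of the same per-barrier design that fills in the commented steps. It rests on several assumptions not in the comment above: `Requester` is a hypothetical stand-in for `RpcCallContext`; an epoch mismatch is reported by failing that one requester rather than throwing; a timeout fails every pending requester without advancing the epoch; and `BarrierCoordinatorSketch`, `requestToSync`, and `timeoutMs` are illustrative names, not Spark APIs.

~~~scala
import java.util.{Timer, TimerTask}
import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for Spark's RpcCallContext (reply / sendFailure).
trait Requester {
  def reply(epoch: Int): Unit
  def fail(reason: String): Unit
}

case class ContextBarrierId(stageId: Int, stageAttemptId: Int)

class ContextBarrierState(val numTasks: Int, timer: Timer, timeoutMs: Long) {
  private var epoch: Int = 0
  private val requesters = ArrayBuffer.empty[Requester]
  private var timerTask: Option[TimerTask] = None

  def handleRequest(requester: Requester, barrierEpoch: Int): Unit = synchronized {
    if (barrierEpoch != epoch) {
      // Assumption: report the mismatch to this requester only.
      requester.fail(s"expected epoch $epoch but got $barrierEpoch")
    } else {
      if (requesters.isEmpty) startTimer() // first request of this round
      requesters += requester
      if (requesters.size == numTasks) {
        // Every task arrived: release them all and move to the next epoch.
        requesters.foreach(_.reply(epoch))
        requesters.clear()
        cancelTimer()
        epoch += 1
      }
    }
  }

  private def startTimer(): Unit = {
    val task = new TimerTask {
      override def run(): Unit = ContextBarrierState.this.synchronized {
        // Ignore a stale task that was cancelled or replaced by a newer round.
        if (timerTask.contains(this)) {
          requesters.foreach(_.fail(s"barrier epoch $epoch timed out"))
          requesters.clear()
          timerTask = None
        }
      }
    }
    timerTask = Some(task)
    timer.schedule(task, timeoutMs)
  }

  private def cancelTimer(): Unit = {
    timerTask.foreach(_.cancel())
    timerTask = None
  }

  def clear(): Unit = synchronized {
    epoch = -1 // any later request now fails the epoch check
    requesters.clear()
    cancelTimer()
  }
}

// Hypothetical coordinator playing the role of the RPC endpoint.
class BarrierCoordinatorSketch(timeoutMs: Long) {
  private val timer = new Timer("barrier-timer", true) // daemon thread
  private val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState]

  def requestToSync(id: ContextBarrierId, numTasks: Int, barrierEpoch: Int,
                    requester: Requester): Unit = {
    states.putIfAbsent(id, new ContextBarrierState(numTasks, timer, timeoutMs))
    states.get(id).handleRequest(requester, barrierEpoch)
  }

  def onStop(): Unit = {
    states.values().asScala.foreach(_.clear())
    states.clear()
    timer.cancel()
  }
}
~~~

With `numTasks = 2`, the first `requestToSync` call for a barrier only starts the timer; the second releases both callers with epoch 0 and advances the epoch to 1, so a late request still carrying epoch 0 is failed. Keeping all mutable state inside one `synchronized` object per `ContextBarrierId` means the concurrent map only has to handle lookup and creation, while the timer callback and incoming requests serialize on the same lock.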