Github user mengxr commented on the issue:
https://github.com/apache/spark/pull/21898
Here is what I mean:
~~~scala
case class ContextBarrierId(stageId: Int, stageAttemptId: Int)
class ContextBarrierState(val numTasks: Int) {
private var epoch: Int = 0
private val requesters: ArrayBuffer[RpcCallContext] = ...
private val timerTask: TimerTask = new TimerTask {
...
}
def handleRequest(requester: RpcCallContext, barrierEpoch: Int): Unit =
synchronized {
// start timer if this is the first request
// throw exception
// always check if requests = numTasks and if yes reply all, clean
// requesters, and increment counter
}
private def startTimer(): Unit
def clear(): Unit = synchronized {
// set epoch to -1
// clear requesters
// cancel timer if active
}
}
val states = new ConcurrentHashMap[ContextBarrierId, ContextBarrierState]
...
case RequestToSync( .... ) =>
val id = ContextBarrierId(...)
states.putIfAbsent(id, new ContextBarrierState(numTasks))
val state = states.get(id)
state.handleRequest(context, barrierEpoch)
...
def onStop() {
states.forEachValue(_.clear())
states.clear()
}
...
~~~
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]