GitHub user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/21898
  
    Here is what I mean:
    
    ~~~scala
    /** Identifies one attempt of a barrier stage: the key for its [[ContextBarrierState]]. */
    final case class ContextBarrierId(stageId: Int, stageAttemptId: Int)
    
    /**
     * Sync state for one barrier stage attempt (sketch).
     *
     * `handleRequest` and `clear` synchronize on this instance; the private timer helpers are
     * only called from inside them.
     *
     * @param numTasks number of requests that must arrive before the current epoch completes
     */
    class ContextBarrierState(val numTasks: Int) {
      // Epoch currently being collected; starts at 0 and is set to -1 by clear().
      private var epoch: Int = 0
      // Callers waiting for the current epoch to complete.
      private val requesters: ArrayBuffer[RpcCallContext] =
        new ArrayBuffer[RpcCallContext](numTasks)
      // A java.util.TimerTask is single-use: once scheduled or cancelled it can never be
      // scheduled again. A shared `val` would break the second epoch, so a fresh task is
      // created per epoch and the reference is dropped when it is cancelled.
      private var timerTask: Option[TimerTask] = None

      def handleRequest(requester: RpcCallContext, barrierEpoch: Int): Unit = synchronized {
        // TODO(sketch): body still to be written. Intended steps:
        //  1. start the timer (startTimer()) if this is the first request of the epoch
        //  2. "throw exception" -- presumably when barrierEpoch != epoch; the failure
        //     mechanism is still to be decided
        //  3. always check whether the number of requests equals numTasks; if so, reply to
        //     all requesters, clear `requesters`, cancelTimer() and increment `epoch`
      }

      // Creates a new timeout task for the current epoch and records it in `timerTask`.
      private def startTimer(): Unit = {
        val task = new TimerTask {
          // TODO(sketch): timeout handling (presumably fail the pending requesters).
          override def run(): Unit = ???
        }
        timerTask = Some(task)
        // TODO(sketch): schedule `task` on the coordinator's java.util.Timer.
      }

      // Cancels the active timeout task, if any, and forgets it so that the next
      // startTimer() call builds a new one.
      private def cancelTimer(): Unit = {
        timerTask.foreach(_.cancel())
        timerTask = None
      }

      def clear(): Unit = synchronized {
        // Epochs start at 0 and only grow, so presumably no valid request carries -1.
        epoch = -1
        requesters.clear()
        cancelTimer()
      }
    }
    
    // Barrier state for every live stage attempt, keyed by ContextBarrierId. A concurrent map,
    // presumably because requests can be handled from several threads -- confirm against the
    // endpoint's threading model.
    val states: ConcurrentHashMap[ContextBarrierId, ContextBarrierState] =
      new ConcurrentHashMap[ContextBarrierId, ContextBarrierState]()
    
    ... 
    
    case RequestToSync( .... ) =>
      val id = ContextBarrierId(...)
      // Take the state from putIfAbsent's return value instead of a second states.get(id):
      // the entry may be removed (e.g. by onStop's states.clear()) between the two calls,
      // in which case get would return null. putIfAbsent returns null when `fresh` was
      // inserted, and the existing state otherwise.
      val fresh = new ContextBarrierState(numTasks)
      val state = Option(states.putIfAbsent(id, fresh)).getOrElse(fresh)
      state.handleRequest(context, barrierEpoch)
    
    ...
    
    /**
     * Clears every barrier state, then empties the map, so no state is left holding
     * pending requesters or an active timer.
     */
    def onStop(): Unit = {
      import java.util.function.Consumer

      // ConcurrentHashMap.forEachValue has no single-argument overload: the first argument is
      // the parallelism threshold. Long.MaxValue keeps the traversal sequential on this thread.
      states.forEachValue(Long.MaxValue, new Consumer[ContextBarrierState] {
        override def accept(state: ContextBarrierState): Unit = state.clear()
      })
      states.clear()
    }
    
    ...
    ~~~


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

Reply via email to