Ngone51 commented on issue #25235: [SPARK-28483][Core] Fix canceling a spark job using barrier mode but barrier tasks blocking on BarrierTaskContext.barrier()
URL: https://github.com/apache/spark/pull/25235#issuecomment-515773804
 
 
   How about this approach:
   
   In BarrierTaskContext:
    ```
    import java.util.concurrent.{CountDownLatch, TimeUnit}

    class BarrierTaskContext {
        private val latch = new CountDownLatch(1)

        // Block until deSync() is called; returns false if timeOut (the
        // configured barrier wait, in seconds) elapses first.
        def sync(): Boolean = latch.await(timeOut, TimeUnit.SECONDS)

        // Release a task blocked in sync().
        def deSync(): Unit = latch.countDown()

        def barrier(): Unit = {
            .....
            // 1) use send() rather than askSync() so we don't need to clean up the RPC request
            // 2) when all tasks' RequestToSync msgs have been received by the BarrierCoordinator,
            //    it replies to each executor via execBackendRef.send(DeSyncTask(taskId))
            barrierCoordinator.send(RequestToSync(execBackendRef, taskId, _, _, _, _))
            if (!sync()) {
                // time out
            }
        }
    }
    ```
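    
    For reference, a minimal sketch of the two messages this scheme assumes. `DeSyncTask` is new; the `RequestToSync` fields shown here (beyond the added `execBackendRef`) just mirror the existing message, so treat the exact names and field list as assumptions:
    
    ```
    import org.apache.spark.rpc.RpcEndpointRef

    // Existing request message, extended with the executor backend ref so that
    // the BarrierCoordinator can reply with send() instead of an askSync() response.
    private[spark] case class RequestToSync(
        execBackendRef: RpcEndpointRef,  // assumed new field: where to send DeSyncTask
        numTasks: Int,
        stageId: Int,
        stageAttemptId: Int,
        taskAttemptId: Long,
        barrierEpoch: Int)

    // Proposed one-way reply: tells the executor backend to release the latch
    // for the given task once all tasks in the barrier stage have checked in.
    private[spark] case class DeSyncTask(taskId: Long)
    ```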
   
   In CoarseGrainedExecutorBackend: 
    ```
    ....
    override def receive: PartialFunction[Any, Unit] = {
        case DeSyncTask(taskId) =>
            // look up the BarrierTaskContext running under taskId and invoke deSync()
        ...
    }
    ```
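    
    The `receive` case above implies the backend keeps a taskId-to-context mapping. A minimal sketch of that lookup; the registry object and its registration points are assumptions for illustration:
    
    ```
    import java.util.concurrent.ConcurrentHashMap

    // Hypothetical registry: a BarrierTaskContext registers itself before it
    // starts waiting in sync() and deregisters once the wait finishes.
    object BarrierContextRegistry {
        private val waiting = new ConcurrentHashMap[Long, BarrierTaskContext]()

        def register(taskId: Long, ctx: BarrierTaskContext): Unit =
            waiting.put(taskId, ctx)

        def deregister(taskId: Long): Unit =
            waiting.remove(taskId)

        // Called from CoarseGrainedExecutorBackend.receive on DeSyncTask(taskId).
        def deSync(taskId: Long): Unit = {
            val ctx = waiting.remove(taskId)
            if (ctx != null) ctx.deSync()
        }
    }
    ```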
   
   And when a task is killed, we could also invoke BarrierTaskContext.deSync() to release the blocked sync() call.
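   
   Here is a self-contained toy (not Spark code) showing why the latch covers both release paths: the DeSyncTask reply and the kill path each just count the latch down, while a task that nobody releases falls out of await() with false:
   
    ```
    import java.util.concurrent.{CountDownLatch, TimeUnit}

    object LatchDemo extends App {
        val latch = new CountDownLatch(1)

        // Simulates a barrier task blocked in sync(): await() returns true only
        // if the latch is counted down before the timeout.
        val task = new Thread(() => {
            val released = latch.await(5, TimeUnit.SECONDS)
            println(if (released) "barrier released" else "barrier timed out")
        })
        task.start()

        // Simulates either the DeSyncTask reply from the BarrierCoordinator or
        // the kill path: both simply count the latch down to unblock the task.
        Thread.sleep(1000)
        latch.countDown()
        task.join()
    }
    ```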
   
