GitHub user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11940#discussion_r57471001
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala ---
    @@ -101,6 +102,65 @@ class StreamExecution(
       private val offsetLog =
         new HDFSMetadataLog[CompositeOffset](sqlContext, checkpointFile("offsets"))
     
    +  /** A monitor to protect "uninterruptible" and "interrupted" */
    +  private val uninterruptibleLock = new Object
    +
    +  /**
    +   * Indicates if "microBatchThread" are in the uninterruptible status. If so, interrupting
    +   * "microBatchThread" will be deferred until "microBatchThread" enters into the interruptible
    +   * status.
    +   */
    +  @GuardedBy("uninterruptibleLock")
    +  private var uninterruptible = false
    +
    +  /**
    +   * Indicates if we should interrupt "microBatchThread" when we are leaving the uninterruptible
    +   * zone.
    +   */
    +  @GuardedBy("uninterruptibleLock")
    +  private var interrupted = false
    +
    +  /**
    +   * Interrupt "microBatchThread" if possible. If "microBatchThread" is in the uninterruptible
    +   * status. "microBatchThread" won't be interrupted until it enters into the interruptible status.
    +   */
    +  private def interruptMicroBatchThreadSafely(): Unit = {
    +    uninterruptibleLock.synchronized {
    +      if (uninterruptible) {
    +        interrupted = true
    +      } else {
    +        microBatchThread.interrupt()
    +      }
    +    }
    +  }
    +
    +  /**
    +   * Run `f` uninterruptibly in "microBatchThread". "microBatchThread" won't be interrupted before
    +   * returning from `f`.
    +   */
    +  private def runUninterruptiblyInMicroBatchThread[T](f: => T): T = {
    +    assert(Thread.currentThread() == microBatchThread)
    +    uninterruptibleLock.synchronized {
    +      uninterruptible = true
    +      // Clear the interrupted status if it's set.
    +      if (Thread.interrupted()) {
    --- End diff --
    
    `interrupted = Thread.interrupted()` is simpler.
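
To illustrate the suggestion, here is a minimal stand-alone sketch, not the code from the PR. `InterruptFlagSketch`, `recordWithIf`, `recordWithAssignment`, `flag`, and `reset` are hypothetical names, and the sketch assumes `interrupted` is always `false` when the uninterruptible section is entered. Under that assumption both forms leave the flag in the same state. If the flag could already be `true`, the direct assignment would overwrite it with `false`.

```scala
object InterruptFlagSketch {
  // Hypothetical stand-ins for the lock and flag in the diff.
  private val lock = new Object
  private var interrupted = false

  // Form used in the diff: the flag is only ever set to true here.
  def recordWithIf(): Unit = lock.synchronized {
    if (Thread.interrupted()) {
      interrupted = true
    }
  }

  // Suggested form: store the result of Thread.interrupted() directly.
  // Thread.interrupted() returns the current thread's interrupt status
  // and clears it, so the pending interrupt is remembered in the flag.
  def recordWithAssignment(): Unit = lock.synchronized {
    interrupted = Thread.interrupted()
  }

  def flag: Boolean = lock.synchronized { interrupted }

  def reset(): Unit = lock.synchronized { interrupted = false }

  def main(args: Array[String]): Unit = {
    // Mark the current thread as interrupted, then record and clear it.
    Thread.currentThread().interrupt()
    recordWithAssignment()
    println(flag)                                  // true
    println(Thread.currentThread().isInterrupted)  // false
  }
}
```

Whether the shorter form is safe in the real method depends on the flag being reset before the next call, which is not visible in the quoted part of the diff.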

