Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11030#discussion_r52082863
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
    @@ -55,9 +59,89 @@ class StreamExecution(
       private val sources =
         logicalPlan.collect { case s: StreamingRelation => s.source }
     
    -  // Start the execution at the current offsets stored in the sink. (i.e. 
avoid reprocessing data
    -  // that we have already processed).
    -  {
    +  /** The current internal state of this query; starts as INITIALIZED and 
becomes ACTIVE once the stream is running. */
    +  @volatile
    +  private var state: State = INITIALIZED
    +
    +  @volatile
    +  private[sql] var lastExecution: QueryExecution = null
    +
    +  @volatile
    +  private[sql] var streamDeathCause: ContinuousQueryException = null
    +
    +  /** The thread that runs the micro-batches of this stream. */
    +  private[sql] val microBatchThread = new Thread(s"stream execution thread 
for $name") {
    +    override def run(): Unit = { runBatches() }
    +  }
    +
    +  /** Whether the query is currently active or not */
    +  override def isActive: Boolean = state == ACTIVE
    +
    +  /** Returns current status of all the sources. */
    +  override def sourceStatuses: Array[SourceStatus] = {
    +    sources.map(s => new SourceStatus(s.toString, 
streamProgress.get(s))).toArray
    +  }
    +
    +  /** Returns current status of the sink. */
    +  override def sinkStatus: SinkStatus = new SinkStatus(sink.toString, 
sink.currentOffset)
    +
    +  /** Returns the [[ContinuousQueryException]] if the query was terminated 
by an exception. */
    +  override def exception: Option[ContinuousQueryException] = 
Option(streamDeathCause)
    +
    +  /**
    +   * Starts the execution. This returns only after the thread has started 
and [[QueryStarted]] event
    +   * has been posted to all the listeners.
    +   */
    +  private[sql] def start(): Unit = {
    +    microBatchThread.setDaemon(true)
    +    microBatchThread.start()
    +    startLatch.await()  // Wait until thread has started and QueryStarted 
event has been posted
    +  }
    +
    +  /**
    +   * Repeatedly attempts to run batches as data arrives.
    +   *
    +   * Note that this method ensures that [[QueryStarted]] and 
[[QueryTerminated]] events are posted
    +   * so that listeners are guaranteed to get former event before the 
latter. Furthermore, this
    +   * method also ensures that [[QueryStarted]] event is posted before the 
`start()` method returns.
    +   */
    +  private def runBatches(): Unit = {
    +    try {
    +      // Mark ACTIVE and then post the event. QueryStarted event is 
synchronously sent to listeners,
    +      // so must mark this as ACTIVE first.
    +      state = ACTIVE
    +      postEvent(new QueryStarted(this)) // Assumption: Does not throw 
exception.
    +
    +      // Unblock starting thread, so that
    --- End diff --
    
    so that...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and you would like it to be, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to