Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11804#discussion_r56732168
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
    @@ -74,20 +92,32 @@ class StreamExecution(
         override def run(): Unit = { runBatches() }
       }
     
    +  /**
    +   * A write-ahead-log that records the offsets that are present in each 
batch. In order to ensure
    +   * that a given batch will always consist of the same data, we write to 
this log *before* any
    +   * processing is done.  Thus, the Nth record in this log indicates data 
that is currently being
    +   * processed and the N-1th entry indicates which offsets have been 
durably committed to the sink.
    +   */
    +  val offsetLog = new HDFSMetadataLog[Offset](sqlContext, 
metadataDirectory("offsets"))
    +
       /** Whether the query is currently active or not */
       override def isActive: Boolean = state == ACTIVE
     
       /** Returns current status of all the sources. */
       override def sourceStatuses: Array[SourceStatus] = {
    -    sources.map(s => new SourceStatus(s.toString, 
streamProgress.get(s))).toArray
    +    sources.map(s => new SourceStatus(s.toString, 
availableOffsets.get(s))).toArray
       }
     
       /** Returns current status of the sink. */
    -  override def sinkStatus: SinkStatus = new SinkStatus(sink.toString, 
sink.currentOffset)
    +  override def sinkStatus: SinkStatus =
    +    new SinkStatus(sink.toString, 
committedOffsets.toCompositeOffset(sources))
     
       /** Returns the [[ContinuousQueryException]] if the query was terminated 
by an exception. */
       override def exception: Option[ContinuousQueryException] = 
Option(streamDeathCause)
     
    +  private def metadataDirectory(name: String): String =
    --- End diff --
    
    what name is this? 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to