Github user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/11804#discussion_r56737797
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
---
@@ -142,22 +173,75 @@ class StreamExecution(
/**
* Populate the start offsets to start the execution at the current
offsets stored in the sink
- * (i.e. avoid reprocessing data that we have already processed).
+ * (i.e. avoid reprocessing data that we have already processed). This
function must be called
+ * before any processing occurs and will populate the following fields:
+ * - currentBatchId
+ * - committedOffsets
+ * - availableOffsets
*/
private def populateStartOffsets(): Unit = {
- sink.currentOffset match {
- case Some(c: CompositeOffset) =>
- val storedProgress = c.offsets
- val sources = logicalPlan collect {
- case StreamingRelation(source, _) => source
+ offsetLog.getLatest() match {
+ case Some((batchId, nextOffsets: CompositeOffset)) =>
+ logInfo(s"Resuming continuous query, starting with batch $batchId")
+ currentBatchId = batchId + 1
+ nextOffsets.toStreamProgress(sources, availableOffsets)
+ logDebug(s"Found possibly uncommitted offsets $availableOffsets")
+
+ offsetLog.get(batchId - 1).foreach {
+ case lastOffsets: CompositeOffset =>
+ lastOffsets.toStreamProgress(sources, committedOffsets)
+ logDebug(s"Resuming with committed offsets: $committedOffsets")
}
- assert(sources.size == storedProgress.size)
- sources.zip(storedProgress).foreach { case (source, offset) =>
- offset.foreach(streamProgress.update(source, _))
- }
case None => // We are starting this stream for the first time.
- case _ => throw new IllegalArgumentException("Expected composite
offset from sink")
+ logInfo(s"Starting new continuous query.")
+ currentBatchId = 0
+ commitAndConstructNextBatch()
+
+ case Some((_, offset)) =>
+ sys.error(s"Invalid offset $offset")
+ }
+ }
+
+ /**
+ * Returns true if there is any new data available to be processed.
+ */
+ def dataAvailable: Boolean = {
+ availableOffsets.exists {
+ case (source, available) =>
+ committedOffsets
+ .get(source)
+ .map(committed => committed < available)
+ .getOrElse(true)
+ }
+ }
+
+ /**
+ * Queries all of the sources to see if any new data is available. When
there is new data the
+ * batchId counter is incremented and a new log entry is written with
the newest offsets.
+ *
+ * Note that committing the offsets for a new batch implicitly marks the
previous batch as
+ * finished and thus this method should only be called when all
currently available data
+ * has been written to the sink.
+ */
+ def commitAndConstructNextBatch(): Boolean =
committedOffsets.synchronized {
--- End diff --
private. And why needs `committedOffsets.synchronized`?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]