GitHub user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/11804#discussion_r56868269
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
---
@@ -142,22 +173,75 @@ class StreamExecution(
/**
* Populate the start offsets to start the execution at the current offsets stored in the sink
- * (i.e. avoid reprocessing data that we have already processed).
+ * (i.e. avoid reprocessing data that we have already processed). This function must be called
+ * before any processing occurs and will populate the following fields:
+ * - currentBatchId
+ * - committedOffsets
+ * - availableOffsets
*/
private def populateStartOffsets(): Unit = {
- sink.currentOffset match {
- case Some(c: CompositeOffset) =>
- val storedProgress = c.offsets
- val sources = logicalPlan collect {
- case StreamingRelation(source, _) => source
+ offsetLog.getLatest() match {
+ case Some((batchId, nextOffsets: CompositeOffset)) =>
+ logInfo(s"Resuming continuous query, starting with batch $batchId")
+ currentBatchId = batchId + 1
+ nextOffsets.toStreamProgress(sources, availableOffsets)
+ logDebug(s"Found possibly uncommitted offsets $availableOffsets")
+
+ offsetLog.get(batchId - 1).foreach {
+ case lastOffsets: CompositeOffset =>
+ lastOffsets.toStreamProgress(sources, committedOffsets)
+ logDebug(s"Resuming with committed offsets: $committedOffsets")
}
- assert(sources.size == storedProgress.size)
- sources.zip(storedProgress).foreach { case (source, offset) =>
- offset.foreach(streamProgress.update(source, _))
- }
case None => // We are starting this stream for the first time.
- case _ => throw new IllegalArgumentException("Expected composite offset from sink")
+ logInfo(s"Starting new continuous query.")
+ currentBatchId = 0
+ commitAndConstructNextBatch()
+
+ case Some((_, offset)) =>
--- End diff --
How about changing the type of `offsetLog` to `HDFSMetadataLog[CompositeOffset]`? Then `getLatest()` would return `Option[(Long, CompositeOffset)]` and you wouldn't need this extra case branch.
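To illustrate the suggestion, here is a minimal Scala sketch, assuming simplified stand-in types rather than Spark's real classes: `Offset`, `LongOffset`, `CompositeOffset`, `InMemoryMetadataLog` and `ResumeSketch` are all hypothetical, and the log only mimics the `get(batchId)` / `getLatest()` shape used in the diff. Because the log is parameterized on `CompositeOffset`, matching on `Some`/`None` is already exhaustive, so no fallback branch for other offset types is needed. The batch-id arithmetic mirrors the diff above.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for Spark's offset types (not the real classes).
trait Offset
case class LongOffset(value: Long) extends Offset
case class CompositeOffset(offsets: Seq[Option[Offset]]) extends Offset

// Hypothetical in-memory log keyed by batch id. It only mimics the
// get/getLatest shape used in the diff, not HDFSMetadataLog's persistence.
class InMemoryMetadataLog[T] {
  private val entries = mutable.TreeMap.empty[Long, T]

  def add(batchId: Long, metadata: T): Unit = entries(batchId) = metadata

  def get(batchId: Long): Option[T] = entries.get(batchId)

  // Highest batch id and its metadata, if any batch has been logged.
  def getLatest(): Option[(Long, T)] = entries.lastOption
}

object ResumeSketch {
  // Returns (currentBatchId, availableOffsets, committedOffsets).
  def populateStartOffsets(
      offsetLog: InMemoryMetadataLog[CompositeOffset])
      : (Long, Option[CompositeOffset], Option[CompositeOffset]) =
    offsetLog.getLatest() match {
      // The element type is statically CompositeOffset, so Some/None is
      // exhaustive: no `case Some((_, offset))` fallback is required.
      case Some((batchId, nextOffsets)) =>
        // Same arithmetic as the diff: the latest entry supplies the
        // possibly uncommitted offsets, the previous one the committed ones.
        (batchId + 1, Some(nextOffsets), offsetLog.get(batchId - 1))
      case None =>
        // First run. The code under review also constructs the first batch here.
        (0L, None, None)
    }
}
```

Returning a tuple instead of mutating `currentBatchId`, `committedOffsets` and `availableOffsets` keeps the sketch testable in isolation; the method under review populates those fields in place.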