jerrypeng commented on code in PR #38517:
URL: https://github.com/apache/spark/pull/38517#discussion_r1049012172
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala:
##########
@@ -727,18 +719,56 @@ class MicroBatchExecution(
withProgressLocked {
sinkCommitProgress = batchSinkProgress
- watermarkTracker.updateWatermark(lastExecution.executedPlan)
- reportTimeTaken("commitOffsets") {
- assert(commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark)),
- "Concurrent update to the commit log. Multiple streaming jobs detected for " +
- s"$currentBatchId")
- }
- committedOffsets ++= availableOffsets
+ markMicroBatchEnd()
}
logDebug(s"Completed batch ${currentBatchId}")
}
- /** Execute a function while locking the stream from making an progress */
+
+ /**
+ * Called at the start of the micro batch with given offsets. It takes care of offset
+ * checkpointing to offset log and any microbatch startup tasks.
+ */
+ protected def markMicroBatchStart(): Unit = {
+ assert(offsetLog.add(currentBatchId,
+ availableOffsets.toOffsetSeq(sources, offsetSeqMetadata)),
+ s"Concurrent update to the log. Multiple streaming jobs detected for $currentBatchId")
+ logInfo(s"Committed offsets for batch $currentBatchId. " +
+ s"Metadata ${offsetSeqMetadata.toString}")
+ }
+
+ /**
+ * Method called once after the planning is done and before the start of the microbatch execution.
+ * It can be used to perform any pre-execution tasks.
+ */
+ protected def markMicroBatchExecutionStart(): Unit = {}
+
+ /**
+ * Called after the microbatch has completed execution. It takes care of committing the offset
+ * to commit log and other bookkeeping.
+ */
+ protected def markMicroBatchEnd(): Unit = {
+ watermarkTracker.updateWatermark(lastExecution.executedPlan)
+ reportTimeTaken("commitOffsets") {
+ assert(commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark)),
+ "Concurrent update to the commit log. Multiple streaming jobs detected for " +
+ s"$currentBatchId")
+ }
+ committedOffsets ++= availableOffsets
+ }
+
+ protected def cleanUpLastExecutedMicroBatch(): Unit = {
+ if (currentBatchId != 0) {
+ val prevBatchOff = offsetLog.get(currentBatchId - 1)
+ if (prevBatchOff.isDefined) {
+ commitSources(prevBatchOff.get)
+ } else {
+ throw new IllegalStateException(s"batch ${currentBatchId - 1} doesn't exist")
+ }
+ }
+ }
+
+ /** Execute a function while locking the stream from making an progress */
Review Comment:
will fix
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]