GitHub user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/19926#discussion_r157041705
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
---
@@ -231,14 +201,14 @@ class StreamExecution(
* processing is done. Thus, the Nth record in this log indicated data that is currently being
* processed and the N-1th entry indicates which offsets have been durably committed to the sink.
*/
- val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))
+ def offsetLog: OffsetSeqLog
/**
* A log that records the batch ids that have completed. This is used to check if a batch was
* fully processed, and its output was committed to the sink, hence no need to process it again.
* This is used (for instance) during restart, to help identify which batch to run next.
*/
- val batchCommitLog = new BatchCommitLog(sparkSession, checkpointFile("commits"))
--- End diff --
Let's keep `batchCommitLog` and `offsetLog` in the base class, since both
subclasses need to initialize them. We can also rename `batchCommitLog` to
`commitLog` to make it more general.
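
A rough sketch of what I mean, not the actual patch. The log classes here are simplified stand-ins that only take a path (the real ones also take a `SparkSession`), and `StreamExecutionSketch`, `MicroBatchSketch` and `ContinuousSketch` are hypothetical names used only for illustration:

```scala
// Simplified stand-ins for the real log classes; assumed here to need only a path.
class OffsetSeqLog(val path: String)
class CommitLog(val path: String)

// The base class owns both logs, so subclasses inherit them instead of
// each declaring and initializing its own copy.
abstract class StreamExecutionSketch(checkpointRoot: String) {
  // Resolves a file name under the checkpoint root.
  protected def checkpointFile(name: String): String = s"$checkpointRoot/$name"

  val offsetLog = new OffsetSeqLog(checkpointFile("offsets"))

  // Renamed from batchCommitLog so the name is not tied to batches.
  val commitLog = new CommitLog(checkpointFile("commits"))
}

// Hypothetical subclasses: neither needs to touch the logs.
class MicroBatchSketch(root: String) extends StreamExecutionSketch(root)
class ContinuousSketch(root: String) extends StreamExecutionSketch(root)
```

With this shape, the abstract `def offsetLog: OffsetSeqLog` in the diff is unnecessary, and the checkpoint layout (`offsets`, `commits`) stays defined in one place.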
---