GitHub user zsxwing commented on a diff in the pull request:
https://github.com/apache/spark/pull/11804#discussion_r56866005
--- Diff:
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
---
@@ -52,13 +55,28 @@ class StreamExecution(
/** Minimum amount of time in between the start of each batch. */
private val minBatchTime = 10
- /** Tracks how much data we have processed from each input source. */
- private[sql] val streamProgress = new StreamProgress
+ /**
+ * Tracks how much data we have processed and committed to the sink or state store from each
+ * input source.
+ */
+ private[sql] val committedOffsets = new StreamProgress
--- End diff --
We need some protection when accessing committedOffsets, since StreamProgress
is not thread-safe. How about providing a getter method that is protected by
`synchronized` and returns a clone of `StreamProgress`? Since this is only used
in tests, we don't need to worry about the overhead of `clone`.
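A minimal sketch of the suggested pattern follows, under loudly stated assumptions: `StreamProgress` here is a hypothetical, simplified stand-in (string source names, `Long` offsets, a `copy()` method written for this example), not Spark's actual class, and `StreamExecutionSketch`, `commit`, and `getCommittedOffsets` are illustrative names. The point is that every write and the test-only read take the same lock, and the read hands back an independent copy.

```scala
import scala.collection.mutable

// Hypothetical, simplified stand-in for StreamProgress: a mutable,
// non-thread-safe map from source name to committed offset. Strings and
// Longs are used only to keep the sketch self-contained.
class StreamProgress {
  private val offsets = mutable.HashMap.empty[String, Long]

  def update(source: String, offset: Long): Unit = offsets(source) = offset
  def get(source: String): Option[Long] = offsets.get(source)

  // Returns an independent copy so callers never share the mutable map.
  def copy(): StreamProgress = {
    val c = new StreamProgress
    offsets.foreach { case (s, o) => c.update(s, o) }
    c
  }
}

// Hypothetical sketch: writers mutate committedOffsets only while holding
// this object's lock, and tests read it through a synchronized getter.
class StreamExecutionSketch {
  private val committedOffsets = new StreamProgress

  // Called by the stream execution thread after a batch commits.
  def commit(source: String, offset: Long): Unit = synchronized {
    committedOffsets.update(source, offset)
  }

  // Test-only accessor: the copy is taken under the same lock as writes,
  // so the caller gets a consistent snapshot it can inspect freely.
  def getCommittedOffsets: StreamProgress = synchronized {
    committedOffsets.copy()
  }
}
```

A snapshot taken before a later `commit` keeps its old offsets, so a test can assert on it without racing the execution thread. Copying on every read is wasteful in general, but as noted above it is acceptable for an accessor used only in tests.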