This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new e5e2b914de6 [MINOR][SS][DOCS] Fix typos in the Scaladoc and make the semantic of getCurrentWatermarkMs explicit
e5e2b914de6 is described below

commit e5e2b914de6a498ae191bdb0d02308c5b6f13f15
Author: bartosz25 <bartkoniec...@yahoo.fr>
AuthorDate: Sat Jul 15 08:31:31 2023 -0500

    [MINOR][SS][DOCS] Fix typos in the Scaladoc and make the semantic of getCurrentWatermarkMs explicit

    ### What changes were proposed in this pull request?

    Improve the code comments:

    1. Rate micro-batch data source Scaladoc parameters aren't consistent with the options really supported by this data source.
    2. The `getCurrentWatermarkMs` has a special semantic for the 1st micro-batch when the watermark is not set yet. IMO, it should return `Option[Long]`, hence `None` instead of `0` for the first micro-batch, but since it's a breaking change, I preferred to add a note on that instead.

    ### Why are the changes needed?

    1. Avoid confusion while using the classes and methods.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    The tests weren't added because the change is only at the Scaladoc level. I affirm that the contribution is my original work and that I license the work to the project under the project's open source license.

    Closes #41988 from bartosz25/comments_fixes.
    Authored-by: bartosz25 <bartkoniec...@yahoo.fr>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../sql/execution/streaming/sources/RatePerMicroBatchProvider.scala | 4 ++--
 .../src/main/scala/org/apache/spark/sql/streaming/GroupState.scala | 5 +++++
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
index ccf8b0a7b92..41878a6a549 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
@@ -34,11 +34,11 @@ import org.apache.spark.sql.util.CaseInsensitiveStringMap
  *   with 0L.
  *
  * This source supports the following options:
- * - `rowsPerMicroBatch` (e.g. 100): How many rows should be generated per micro-batch.
+ * - `rowsPerBatch` (e.g. 100): How many rows should be generated per micro-batch.
  * - `numPartitions` (e.g. 10, default: Spark's default parallelism): The partition number for the
  *   generated rows.
  * - `startTimestamp` (e.g. 1000, default: 0): starting value of generated time
- * - `advanceMillisPerMicroBatch` (e.g. 1000, default: 1000): the amount of time being advanced in
+ * - `advanceMillisPerBatch` (e.g. 1000, default: 1000): the amount of time being advanced in
  *   generated time on each micro-batch.
  *
  * Unlike `rate` data source, this data source provides a consistent set of input rows per
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala
index 2c8f1db74f8..f08a2fd3cc5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/GroupState.scala
@@ -315,6 +315,11 @@ trait GroupState[S] extends LogicalGroupState[S] {
    *
    * @note In a streaming query, this can be called only when watermark is set before calling
    *       `[map/flatMap]GroupsWithState`. In a batch query, this method always returns -1.
+   * @note The watermark gets propagated in the end of each query. As a result, this method will
+   *       return 0 (1970-01-01T00:00:00) for the first micro-batch. If you use this value
+   *       as a part of the timestamp set in the `setTimeoutTimestamp`, it may lead to the
+   *       state expiring immediately in the next micro-batch, once the watermark gets the
+   *       real value from your data.
    */
   @throws[UnsupportedOperationException](
     "if watermark has not been set before in [map|flatMap]GroupsWithState")

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
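
The new `@note` on `getCurrentWatermarkMs` describes a timing pitfall that may be easier to see with numbers. The sketch below is a plain-Scala model, not Spark code: `timeoutFrom`, `isExpired` and `safeBase` are hypothetical helpers invented for illustration, the expiry rule is a simplification (state is treated as timed out once the watermark has passed the timeout timestamp), and falling back to the record's event time is only one possible workaround, not something the commit prescribes.

```scala
object WatermarkTimeoutSketch {
  // Hypothetical helper (not a Spark API): timeout = base timestamp + duration.
  def timeoutFrom(baseMs: Long, durationMs: Long): Long = baseMs + durationMs

  // Simplified model of event-time expiry: the state is considered timed out
  // once the watermark has moved past the timeout timestamp.
  def isExpired(timeoutMs: Long, watermarkMs: Long): Boolean = timeoutMs < watermarkMs

  // Hypothetical guard: while the watermark still reports 0 (first micro-batch),
  // use the record's own event time as the base instead.
  def safeBase(watermarkMs: Long, eventTimeMs: Long): Long =
    if (watermarkMs == 0L) eventTimeMs else watermarkMs

  def main(args: Array[String]): Unit = {
    val tenMinutesMs = 10 * 60 * 1000L
    val eventTimeMs = 1689427891000L              // illustrative event time (July 2023)
    val firstBatchWatermarkMs = 0L                // value described in the @note
    val nextBatchWatermarkMs = eventTimeMs - 60000L // assumed 1-minute watermark delay

    // Naive: the timeout is derived from the 0 watermark, so it lands in 1970.
    val naiveTimeout = timeoutFrom(firstBatchWatermarkMs, tenMinutesMs)
    println(isExpired(naiveTimeout, nextBatchWatermarkMs)) // true

    // Guarded: the timeout is derived from the event time instead.
    val guardedTimeout = timeoutFrom(safeBase(firstBatchWatermarkMs, eventTimeMs), tenMinutesMs)
    println(isExpired(guardedTimeout, nextBatchWatermarkMs)) // false
  }
}
```

In a real `[map/flatMap]GroupsWithState` function the same idea would apply to the value passed to `setTimeoutTimestamp`; whether event time is an acceptable base depends on the query's semantics.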