This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new f3ede854d70 [SPARK-42592][SS][DOCS] Document how to perform chained time window aggregations
f3ede854d70 is described below

commit f3ede854d7082fd7d9ca4d28cea24aa976e10262
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Tue Feb 28 08:38:07 2023 +0900

    [SPARK-42592][SS][DOCS] Document how to perform chained time window aggregations

    ### What changes were proposed in this pull request?

    This PR proposes to document how to perform chained time window aggregations. Although the functionality is introduced as a way to perform chained time window aggregations, it can also be used more generally to apply operations which require a timestamp column to time-windowed data.

    ### Why are the changes needed?

    We didn't document the new functionality from SPARK-40925 in the guide doc. There was a doc change in SPARK-42105, but it only mentioned that the previous limitations were lifted.

    ### Does this PR introduce _any_ user-facing change?

    Yes, documentation change.

    ### How was this patch tested?

    Built the page via `SKIP_API=1 bundle exec jekyll serve --watch` and confirmed the rendering. Screenshot:

    <img width="611" alt="Screenshot 2023-02-28 8 32 24 AM" src="https://user-images.githubusercontent.com/1317309/221713232-3ea906ce-23f6-4293-82c0-de1e69ea1ee8.png">

    Closes #40188 from HeartSaVioR/SPARK-42592.

    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 docs/structured-streaming-programming-guide.md | 137 +++++++++++++++++++++++++
 1 file changed, 137 insertions(+)

diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 29b2620ad77..a9545d516fb 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1232,6 +1232,143 @@ local partition, doing partial aggregation can still increase the performance si
 You can enable `spark.sql.streaming.sessionWindow.merge.sessions.in.local.partition` to indicate
 Spark to perform partial aggregation.
 
+#### Representation of the time for time window
+
+In some use cases, it is necessary to extract a representation of the time for a time window, in order
+to apply operations which require a timestamp to the time-windowed data.
+One example is chained time window aggregations, where users want to define another time window on top
+of an existing time window. Say, someone wants to aggregate 5 minute time windows into a 1 hour
+tumbling time window.
+
+There are two ways to achieve this:
+
+1. Use the `window_time` SQL function with the time window column as a parameter
+2. Use the `window` SQL function with the time window column as a parameter
+
+The `window_time` function produces a timestamp which represents the time for the time window.
+Users can pass the result to the `window` function (or anywhere else a timestamp is required) to
+perform operations on the time window which require a timestamp.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
+import spark.implicits._
+
+val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
+
+// Group the data by window and word and compute the count of each group
+val windowedCounts = words.groupBy(
+  window($"timestamp", "10 minutes", "5 minutes"),
+  $"word"
+).count()
+
+// Group the windowed data by another window and word and compute the count of each group
+val anotherWindowedCounts = windowedCounts.groupBy(
+  window(window_time($"window"), "1 hour"),
+  $"word"
+).count()
+{% endhighlight %}
+
+</div>
+<div data-lang="java" markdown="1">
+
+{% highlight java %}
+Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
+
+// Group the data by window and word and compute the count of each group
+Dataset<Row> windowedCounts = words.groupBy(
+  functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
+  words.col("word")
+).count();
+
+// Group the windowed data by another window and word and compute the count of each group
+Dataset<Row> anotherWindowedCounts = windowedCounts.groupBy(
+  functions.window(functions.window_time(windowedCounts.col("window")), "1 hour"),
+  windowedCounts.col("word")
+).count();
+{% endhighlight %}
+
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }
+
+# Group the data by window and word and compute the count of each group
+windowedCounts = words.groupBy(
+    window(words.timestamp, "10 minutes", "5 minutes"),
+    words.word
+).count()
+
+# Group the windowed data by another window and word and compute the count of each group
+anotherWindowedCounts = windowedCounts.groupBy(
+    window(window_time(windowedCounts.window), "1 hour"),
+    windowedCounts.word
+).count()
+{% endhighlight %}
+
+</div>
+</div>
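+
+For illustration, here is a minimal end-to-end sketch of the first approach, assuming Spark 3.4 or
+later, a socket source on `localhost:9999` emitting lines of the form `<yyyy-MM-dd HH:mm:ss> <word>`,
+and a console sink in append mode. The watermark on the event-time column lets Spark finalize the
+5 minute windows before they feed the 1 hour aggregation:
+
+{% highlight scala %}
+import java.sql.Timestamp
+
+import org.apache.spark.sql.functions.{window, window_time}
+
+import spark.implicits._
+
+// Hypothetical source: each line is "<yyyy-MM-dd HH:mm:ss> <word>"
+val words = spark.readStream
+  .format("socket")
+  .option("host", "localhost")
+  .option("port", 9999)
+  .load()
+  .as[String]
+  .map { line =>
+    val Array(ts, word) = line.split(" ", 2)
+    (Timestamp.valueOf(ts), word)
+  }
+  .toDF("timestamp", "word")
+
+// First-level aggregation: count words per 5 minute tumbling window.
+// The watermark bounds how late data may arrive, so windows can be finalized.
+val windowedCounts = words
+  .withWatermark("timestamp", "10 minutes")
+  .groupBy(window($"timestamp", "5 minutes"), $"word")
+  .count()
+
+// Chained aggregation: derive a timestamp from each window via window_time,
+// then aggregate those windows again into 1 hour tumbling windows
+val hourlyCounts = windowedCounts
+  .groupBy(window(window_time($"window"), "1 hour"), $"word")
+  .sum("count")
+
+hourlyCounts.writeStream
+  .format("console")
+  .outputMode("append")
+  .start()
+{% endhighlight %}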
+
+The `window` function takes not only a timestamp column but also a time window column. This is very
+useful for cases where users want to apply chained time window aggregations.
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+
+{% highlight scala %}
+import spark.implicits._
+
+val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
+
+// Group the data by window and word and compute the count of each group
+val windowedCounts = words.groupBy(
+  window($"timestamp", "10 minutes", "5 minutes"),
+  $"word"
+).count()
+
+// Group the windowed data by another window and word and compute the count of each group
+val anotherWindowedCounts = windowedCounts.groupBy(
+  window($"window", "1 hour"),
+  $"word"
+).count()
+{% endhighlight %}
+
+</div>
+<div data-lang="java" markdown="1">
+
+{% highlight java %}
+Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
+
+// Group the data by window and word and compute the count of each group
+Dataset<Row> windowedCounts = words.groupBy(
+  functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
+  words.col("word")
+).count();
+
+// Group the windowed data by another window and word and compute the count of each group
+Dataset<Row> anotherWindowedCounts = windowedCounts.groupBy(
+  functions.window(windowedCounts.col("window"), "1 hour"),
+  windowedCounts.col("word")
+).count();
+{% endhighlight %}
+
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }
+
+# Group the data by window and word and compute the count of each group
+windowedCounts = words.groupBy(
+    window(words.timestamp, "10 minutes", "5 minutes"),
+    words.word
+).count()
+
+# Group the windowed data by another window and word and compute the count of each group
+anotherWindowedCounts = windowedCounts.groupBy(
+    window(windowedCounts.window, "1 hour"),
+    windowedCounts.word
+).count()
+{% endhighlight %}
+
+</div>
+</div>
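+
+For intuition on what `window_time` evaluates to, here is a small batch-mode sketch (a minimal
+illustration assuming `spark-shell` with Spark 3.4 or later; the input row is made up). The function
+returns the window's end minus one microsecond, i.e. the latest timestamp the window may contain:
+
+{% highlight scala %}
+import org.apache.spark.sql.functions.{col, window, window_time}
+
+import spark.implicits._
+
+val df = Seq(("2023-02-28 08:03:00", "spark"))
+  .toDF("timestamp", "word")
+  .withColumn("timestamp", col("timestamp").cast("timestamp"))
+
+// Inspect the window bounds alongside the derived representative timestamp
+df.groupBy(window(col("timestamp"), "5 minutes"), col("word"))
+  .count()
+  .select(col("window.start"), col("window.end"), window_time(col("window")))
+  .show(truncate = false)
+
+// window.start        = 2023-02-28 08:00:00
+// window.end          = 2023-02-28 08:05:00
+// window_time(window) = 2023-02-28 08:04:59.999999
+{% endhighlight %}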
+
 ##### Conditions for watermarking to clean aggregation state
 {:.no_toc}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org