This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f3ede854d70 [SPARK-42592][SS][DOCS] Document how to perform chained time window aggregations
f3ede854d70 is described below

commit f3ede854d7082fd7d9ca4d28cea24aa976e10262
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Tue Feb 28 08:38:07 2023 +0900

    [SPARK-42592][SS][DOCS] Document how to perform chained time window aggregations
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to document how to perform chained time window aggregations. Although the functionality is introduced as a way to perform chained time window aggregations, it can also be used more generally to apply any operation which requires a timestamp column to time window data.
    
    ### Why are the changes needed?
    
    We didn't document the new functionality from SPARK-40925 in the guide doc. There was a doc change in SPARK-42105, but it only mentioned the removal of limitations.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, documentation change.
    
    ### How was this patch tested?
    
    Created a page via `SKIP_API=1 bundle exec jekyll serve --watch` and confirmed.
    Screenshot:
    
    <img width="611" alt="스크린샷 2023-02-28 오전 8 32 24" 
src="https://user-images.githubusercontent.com/1317309/221713232-3ea906ce-23f6-4293-82c0-de1e69ea1ee8.png";>
    
    Closes #40188 from HeartSaVioR/SPARK-42592.
    
    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 docs/structured-streaming-programming-guide.md | 137 +++++++++++++++++++++++++
 1 file changed, 137 insertions(+)

diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md
index 29b2620ad77..a9545d516fb 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -1232,6 +1232,143 @@ local partition, doing partial aggregation can still increase the performance si
 
You can enable `spark.sql.streaming.sessionWindow.merge.sessions.in.local.partition` to instruct Spark to perform partial aggregation.
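
+As a minimal illustration (a sketch of one way to set it, not the only way), the option above could be enabled on the session before starting the query:
+
+{% highlight scala %}
+// Enable partial merging of session windows within local partitions before shuffle.
+spark.conf.set("spark.sql.streaming.sessionWindow.merge.sessions.in.local.partition", "true")
+{% endhighlight %}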
 
+#### Representation of the time for time window
+
+In some use cases, it is necessary to extract a timestamp representation of the time window, so that operations requiring a timestamp can be applied to the time-windowed data.
+One example is chained time window aggregations, where users want to define another time window on top of an existing time window. Say, someone wants to aggregate 5 minute time windows into a 1 hour tumbling time window.
+
+There are two ways to achieve this:
+
+1. Use the `window_time` SQL function with the time window column as a parameter
+2. Use the `window` SQL function with the time window column as a parameter
+
+The `window_time` function produces a timestamp which represents the time for the time window.
+Users can pass the result to the `window` function (or anywhere else requiring a timestamp) to perform operations on the time window which require a timestamp.
+
+<div class="codetabs">
+<div data-lang="scala"  markdown="1">
+
+{% highlight scala %}
+import spark.implicits._
+
+val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
+
+// Group the data by window and word and compute the count of each group
+val windowedCounts = words.groupBy(
+  window($"timestamp", "10 minutes", "5 minutes"),
+  $"word"
+).count()
+
+// Group the windowed data by another window and word and compute the count of each group
+val anotherWindowedCounts = windowedCounts.groupBy(
+  window(window_time($"window"), "1 hour"),
+  $"word"
+).count()
+{% endhighlight %}
+
+</div>
+<div data-lang="java"  markdown="1">
+
+{% highlight java %}
+Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
+
+// Group the data by window and word and compute the count of each group
+Dataset<Row> windowedCounts = words.groupBy(
+  functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
+  words.col("word")
+).count();
+
+// Group the windowed data by another window and word and compute the count of each group
+Dataset<Row> anotherWindowedCounts = windowedCounts.groupBy(
+  functions.window(functions.window_time(windowedCounts.col("window")), "1 hour"),
+  windowedCounts.col("word")
+).count();
+{% endhighlight %}
+
+</div>
+<div data-lang="python"  markdown="1">
+{% highlight python %}
+words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }
+
+# Group the data by window and word and compute the count of each group
+windowedCounts = words.groupBy(
+    window(words.timestamp, "10 minutes", "5 minutes"),
+    words.word
+).count()
+
+# Group the windowed data by another window and word and compute the count of each group
+anotherWindowedCounts = windowedCounts.groupBy(
+    window(window_time(windowedCounts.window), "1 hour"),
+    windowedCounts.word
+).count()
+{% endhighlight %}
+
+</div>
+</div>
+
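+The result of `window_time` does not have to be used for grouping. As a minimal sketch (the alias `windowTime` is illustrative), the representative timestamp can also simply be projected out of the time window column of the aggregation above:
+
+{% highlight scala %}
+// Project the representative timestamp out of the time window column of
+// `windowedCounts` from the example above.
+val windowTimes = windowedCounts.select(
+  $"window",
+  window_time($"window").as("windowTime"),
+  $"word",
+  $"count"
+)
+{% endhighlight %}
+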
+The `window` function takes not only a timestamp column but also a time window column. This is very useful for cases where users want to apply chained time window aggregations.
+
+<div class="codetabs">
+<div data-lang="scala"  markdown="1">
+
+{% highlight scala %}
+import spark.implicits._
+
+val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
+
+// Group the data by window and word and compute the count of each group
+val windowedCounts = words.groupBy(
+  window($"timestamp", "10 minutes", "5 minutes"),
+  $"word"
+).count()
+
+// Group the windowed data by another window and word and compute the count of each group
+val anotherWindowedCounts = windowedCounts.groupBy(
+  window($"window", "1 hour"),
+  $"word"
+).count()
+{% endhighlight %}
+
+</div>
+<div data-lang="java"  markdown="1">
+
+{% highlight java %}
+Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
+
+// Group the data by window and word and compute the count of each group
+Dataset<Row> windowedCounts = words.groupBy(
+  functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
+  words.col("word")
+).count();
+
+// Group the windowed data by another window and word and compute the count of each group
+Dataset<Row> anotherWindowedCounts = windowedCounts.groupBy(
+  functions.window("window", "1 hour"),
+  windowedCounts.col("word")
+).count();
+{% endhighlight %}
+
+</div>
+<div data-lang="python"  markdown="1">
+{% highlight python %}
+words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }
+
+# Group the data by window and word and compute the count of each group
+windowedCounts = words.groupBy(
+    window(words.timestamp, "10 minutes", "5 minutes"),
+    words.word
+).count()
+
+# Group the windowed data by another window and word and compute the count of each group
+anotherWindowedCounts = windowedCounts.groupBy(
+    window(windowedCounts.window, "1 hour"),
+    windowedCounts.word
+).count()
+{% endhighlight %}
+
+</div>
+</div>
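+
+When the above is run as a streaming query, a watermark is generally required so that Spark can bound the state kept by both aggregations. A minimal sketch under that assumption (the 10 minutes delay is illustrative, not prescriptive):
+
+{% highlight scala %}
+import spark.implicits._
+
+val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
+
+// Define a watermark on the event-time column so Spark can bound the state
+// kept for both levels of aggregation.
+val windowedCounts = words
+  .withWatermark("timestamp", "10 minutes")
+  .groupBy(window($"timestamp", "10 minutes", "5 minutes"), $"word")
+  .count()
+
+// The time window column produced above carries the event time forward, which
+// is what allows the chained aggregation to be cleaned up by the same watermark.
+val anotherWindowedCounts = windowedCounts
+  .groupBy(window($"window", "1 hour"), $"word")
+  .count()
+{% endhighlight %}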
+
 ##### Conditions for watermarking to clean aggregation state
 {:.no_toc}
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
