HeartSaVioR opened a new pull request #33081:
URL: https://github.com/apache/spark/pull/33081


   ### What changes were proposed in this pull request?
   
   This PR proposes to support native session windows. Please refer to the 
comments/design doc in SPARK-10816 for more details on the rationale and 
design (they may be slightly outdated compared to the PR).
   
   The boundary of a "session window" is defined as [the timestamp of the 
start event ~ the timestamp of the last event + gap duration). Unlike a time 
window, a session window is dynamic: it can expand when a new input row is 
added to the session. To handle this expansion, Spark defines a session window 
per input row and "merges" windows whose boundaries overlap.
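   
   To make the definition above concrete, here is a minimal plain-Scala sketch 
(no Spark involved). It is not the code in this PR: `SessionWindowSketch`, 
`Session` and `sessionize` are hypothetical names, and treating two windows as 
overlapping only when the later one starts strictly before the earlier one ends 
is an assumption derived from the half-open boundary.
   
```scala
object SessionWindowSketch {
  // Half-open interval [start, end) in milliseconds. Hypothetical model class.
  final case class Session(start: Long, end: Long)

  // Give every event its own window [ts, ts + gap), then merge windows
  // whose boundaries overlap into a single session.
  def sessionize(timestamps: Seq[Long], gapMs: Long): List[Session] = {
    val perRow = timestamps.sorted.map(ts => Session(ts, ts + gapMs))
    perRow.foldLeft(List.empty[Session]) {
      // Assumption: overlap means the new window starts before the
      // current session ends (the end boundary is exclusive).
      case (cur :: done, w) if w.start < cur.end =>
        Session(cur.start, math.max(cur.end, w.end)) :: done
      // Otherwise the window opens a new session.
      case (acc, w) => w :: acc
    }.reverse
  }
}
```
   
   For example, `SessionWindowSketch.sessionize(Seq(0L, 4000L, 20000L), 10000L)` 
yields the sessions [0, 14000) and [20000, 30000): the second event expands the 
first session, while the third one arrives after the gap and starts a new one.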
   
   This PR uses two different approaches to merging session windows:
   
   1. merging session windows with Spark's aggregation logic (a variant of sort 
aggregation)
   2. updating session window for all rows bound to the same session, and 
applying aggregation logic afterwards
   
   The first approach is preferable because it outperforms the second, but it 
can only be used when merging session windows can be applied together with 
aggregation. That is not possible in all cases, so the second approach covers 
the remaining cases.
   
   This PR also optimizes the merging of input rows with existing sessions 
while retaining the order (group keys + start timestamp of session window), 
leveraging the fact that the number of existing sessions per group key won't 
be huge.
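   
   As a rough illustration of the idea of an order-preserving merge (not the 
implementation in this PR), the sketch below combines two sequences that are 
each already sorted by (group key, session start) in one linear pass, so the 
result needs no re-sort. `OrderedMergeSketch`, `Win` and `mergeSorted` are 
hypothetical names, and emitting the existing session first on a tie is an 
arbitrary choice.
   
```scala
object OrderedMergeSketch {
  // Hypothetical model: a session window [start, end) for one group key.
  final case class Win(key: String, start: Long, end: Long)

  // Ordering by (group key, start timestamp of the session window).
  private val order: Ordering[Win] = Ordering.by((w: Win) => (w.key, w.start))

  // Merge two lists that are each sorted by `order` into one sorted list.
  def mergeSorted(inputs: List[Win], existing: List[Win]): List[Win] = {
    @scala.annotation.tailrec
    def loop(a: List[Win], b: List[Win], acc: List[Win]): List[Win] =
      (a, b) match {
        case (Nil, rest) => acc.reverse ::: rest
        case (rest, Nil) => acc.reverse ::: rest
        case (x :: xs, y :: ys) =>
          // On a tie the existing session is emitted first (arbitrary).
          if (order.lteq(y, x)) loop(a, ys, y :: acc)
          else loop(xs, b, x :: acc)
      }
    loop(inputs, existing, Nil)
  }
}
```
   
   Because the output keeps the (group key, start) order, windows that may 
overlap end up next to each other and can be merged in a single scan.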
   
   The state format is versioned, so that we can introduce a new state format 
if we find a better one.
   
   ### Why are the changes needed?
   
   For now, to deal with sessionization, Spark requires end users to work with 
(flat)MapGroupsWithState directly, which has a few major drawbacks:
   
   1. (flat)MapGroupsWithState is a lower-level API, and end users have to code 
everything in detail to define session windows and merge windows
   2. built-in aggregate functions cannot be used, and end users have to 
implement aggregation themselves
   3. (flat)MapGroupsWithState is only available in Scala/Java.
   
   With native support for session windows, end users simply use 
"session_window" the same way they use "window" for tumbling/sliding windows, 
and can leverage built-in aggregate functions as well as UDAFs to define 
aggregations.
   
   Quoting the query example from the test suite:
   
   ```
       val inputData = MemoryStream[(String, Long)]
   
       // Split the lines into words, treat words as sessionId of events
       val events = inputData.toDF()
         .select($"_1".as("value"), $"_2".as("timestamp"))
         .withColumn("eventTime", $"timestamp".cast("timestamp"))
         .selectExpr("explode(split(value, ' ')) AS sessionId", "eventTime")
         .withWatermark("eventTime", "30 seconds")
   
       val sessionUpdates = events
         .groupBy(session_window($"eventTime", "10 seconds") as 'session, 'sessionId)
         .agg(count("*").as("numEvents"))
         .selectExpr("sessionId", "CAST(session.start AS LONG)", "CAST(session.end AS LONG)",
           "CAST(session.end AS LONG) - CAST(session.start AS LONG) AS durationMs",
           "numEvents")
   ```
   
   which does the same as StructuredSessionization (the native session window 
version is shorter and clearer, even ignoring the model classes).
   
   
https://github.com/apache/spark/blob/39542bb81f8570219770bb6533c077f44f6cbd2a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredSessionization.scala#L66-L105
 
   
   (It is worth noting that the code in StructuredSessionization only works 
with processing time. It doesn't consider that an old event can update the 
start time of an old session.)
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes. This PR brings a new feature that supports session windows in both 
batch and streaming queries, adding a new function "session_window" whose 
usage is similar to that of "window".
   
   ### How was this patch tested?
   
   New test suites. Also tested with benchmark code.

