Replied to the SO but cross posted the answer I put: 1) I believe you have a simple unit issue. The window gap duration of 600 is being specified in seconds Duration.standardSeconds[1] yet new Instant(long)[2] uses milliseconds which means that the 600 second gap is larger then the time interval of 700 millis causing the sessions to be merged.
2) Sessions still use interval windows internally. You will need to compute what the output window would be after all sessions are merged based upon your trigger strategy. By default, a session window uses the IntervalWindow(timestamp, gap duration)[3], and merges all overlapping windows[4] to create a larger window. For example, if you had the windows (start time, end time), [10, 14], [12, 18], [4, 14] for the same session key, they would all be merged producing a single [4, 18] window. [1]: http://joda-time.sourceforge.net/apidocs/org/joda/time/Duration.html#standardSeconds(long) [2]: http://joda-time.sourceforge.net/apidocs/org/joda/time/Instant.html#Instant(long) [3]: https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java#L59 [4]: https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java#L64 On Fri, Aug 24, 2018 at 12:57 AM Bart Aelterman <[email protected]> wrote: > > Hi there, > > I am trying to implement a unit test for a pipeline that implements > session windows. However I am not getting the results I expected. I posted > the question on Stack Overflow: > https://stackoverflow.com/questions/51994579/how-to-write-unit-tests-for-session-windows-in-a-beam-pipeline > > Sorry for cross posting. > > Thanks a lot, > > Bart Aelterman > > >
