I replied on Stack Overflow and am cross-posting my answer here:

1) I believe you have a simple unit mismatch. The window gap duration of 600
is specified in seconds via Duration.standardSeconds[1], whereas new
Instant(long)[2] takes milliseconds. The 600 second gap is therefore
larger than the time interval of 700 millis, which causes the sessions to
be merged.
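
To make the unit mismatch concrete, here is a minimal sketch of the
arithmetic. It is not Beam or Joda-Time code: java.time.Duration stands in
for Joda's Duration so the snippet has no dependencies, and the class name,
method name and the timestamps 0 and 700 are hypothetical, chosen only for
illustration.

```java
import java.time.Duration;

// Stand-in sketch: java.time replaces Joda-Time here (an assumption made
// to keep the example dependency-free). The unit arithmetic is the same.
class GapUnitCheck {
    // Returns true when two element timestamps (epoch millis) are closer
    // together than the gap, i.e. their per-element session windows
    // [t, t + gap) overlap and would be merged into one session.
    static boolean sameSession(long t1Millis, long t2Millis, Duration gap) {
        return Math.abs(t2Millis - t1Millis) < gap.toMillis();
    }
}
```

With timestamps 0 and 700, sameSession returns true for
Duration.ofSeconds(600) (600000 millis) and false for
Duration.ofMillis(600), which is the difference between one merged session
and two separate ones.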

2) Sessions still use interval windows internally. You will need to compute
what the output window will be after all sessions are merged, based upon
your trigger strategy. By default, each element is assigned an
IntervalWindow(timestamp, gap duration)[3], and all overlapping windows
are merged[4] into a larger window. For example, if you had the windows
(start time, end time) [10, 14], [12, 18], [4, 14] for the same session
key, they would all be merged, producing a single [4, 18] window.
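
The merge in that example can be sketched in plain Java as follows. This is
only an illustration of the idea, not Beam's actual merging code: the class
and method names are hypothetical, windows are modelled as long[]{start,
end}, and I am assuming half-open intervals, so two windows that merely
touch are not treated as overlapping.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper, not part of the Beam SDK.
class SessionMergeSketch {
    // Merges overlapping windows, each given as {start, end}, and returns
    // the merged windows ordered by start time.
    static List<long[]> merge(List<long[]> windows) {
        List<long[]> sorted = new ArrayList<>(windows);
        sorted.sort(Comparator.comparingLong((long[] w) -> w[0]));

        List<long[]> merged = new ArrayList<>();
        for (long[] w : sorted) {
            long[] last = merged.isEmpty() ? null : merged.get(merged.size() - 1);
            if (last != null && w[0] < last[1]) {
                // Overlaps the previous window: extend its end if needed.
                last[1] = Math.max(last[1], w[1]);
            } else {
                // No overlap: start a new window (copy to avoid aliasing).
                merged.add(new long[] {w[0], w[1]});
            }
        }
        return merged;
    }
}
```

Passing [10, 14], [12, 18] and [4, 14] to merge yields a single window
[4, 18], matching the example above.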


  [1]:
http://joda-time.sourceforge.net/apidocs/org/joda/time/Duration.html#standardSeconds(long)
  [2]:
http://joda-time.sourceforge.net/apidocs/org/joda/time/Instant.html#Instant(long)
  [3]:
https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java#L59
  [4]:
https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java#L64

On Fri, Aug 24, 2018 at 12:57 AM Bart Aelterman <[email protected]>
wrote:

>
> Hi there,
>
> I am trying to implement a unit test for a pipeline that implements
> session windows. However I am not getting the results I expected. I posted
> the question on Stack Overflow:
> https://stackoverflow.com/questions/51994579/how-to-write-unit-tests-for-session-windows-in-a-beam-pipeline
>
> Sorry for cross posting.
>
> Thanks a lot,
>
> Bart Aelterman
>
>
>
