Hi Fabian, thanks for the hints, though I somehow got the feeling that I am on the wrong track given how much code I would need to write for implementing a "blueprint" usecase.
Would a join be more simple using the Table API? In the end it's the classical Order & OrderPosition example, where the output is an upsert-stream. Would I get the expected behaviour (output elements on every update on either side of the input stream). I realize that my session window approach wasn't driven by the requirements but by operational aspects (state size), so using a concept like idle state retention time would be a more natural fit. Thanks, Johannes On Mon, Jun 18, 2018 at 9:57 AM Fabian Hueske <[email protected]> wrote: > Hi Johannes, > > EventTimeSessionWindows [1] use the EventTimeTrigger [2] as default > trigger (see EventTimeSessionWindows.getDefaultTrigger()). > > I would take the EventTimeTrigger and extend it with early firing > functionality. > However, there are a few things to consider > * you need to be aware that session window can be merged, i.e., two > session windows A, B with gap 10: A [20,25), B [37, 45), will be merged > when a record at 32 is received. > * windows store all records in a list. For every firing, you need to > iterate the full list and also track which records you joined already to > avoid duplicates. Maybe you can migrate records from the window state into > a custom state defined in a ProcessWindowFunction. > > Best, Fabian > > > > > > 2018-06-13 13:43 GMT+02:00 Johannes Schulte <[email protected]>: > >> Hi, >> >> I am joining two streams with a session window and want to emit a joined >> (early) result for every element arriving on one of the streams. >> >> Currently the code looks like this: >> >> s1.join(s2) >> .where(s1.id).equalTo(s2.id) >> .window(EventTimeSessionWindows.withGap(Time.minutes(15))) >> // trigger(?) >> .apply(...custom code..) >> >> What I am missing is the right trigger ala "withEarlyFiring" - do I need >> to implement my on trigger for this and if yes, what kind of functionality >> must be present to not break the session window semantics? >> >> Thanks in advance, >> >> Johannes >> >> >
