Thanks Fabian! This seems to be the way to go.

On Tue, Jun 19, 2018 at 12:18 PM Fabian Hueske <fhue...@gmail.com> wrote:
> Hi Johannes,
>
> You are right. You should approach the problem with the semantics that you
> need before thinking about optimizations such as state size.
>
> The Table API / SQL offers (in v1.5.0) two types of joins:
> 1) Windowed joins, where each record joins with records in a time range of
> the other stream "(A.ts BETWEEN B.ts - 1 hour AND B.ts + 1 hour)"
> 2) Non-windowed joins, which support arbitrary join predicates but which
> fully materialize both inputs. As you mentioned, you can use idle state
> retention to remove records from state that have not been accessed for a
> certain time.
>
> Best, Fabian
>
> 2018-06-18 11:09 GMT+02:00 Johannes Schulte <johannes.schu...@gmail.com>:
>
>> Hi Fabian,
>>
>> thanks for the hints, though I somehow got the feeling that I am on the
>> wrong track given how much code I would need to write for implementing a
>> "blueprint" use case.
>>
>> Would a join be simpler using the Table API? In the end it's the
>> classical Order & OrderPosition example, where the output is an
>> upsert stream. Would I get the expected behaviour (output elements on every
>> update on either side of the input stream)? I realize that my session
>> window approach wasn't driven by the requirements but by operational
>> aspects (state size), so using a concept like idle state retention time
>> would be a more natural fit.
>>
>> Thanks,
>>
>> Johannes
>>
>> On Mon, Jun 18, 2018 at 9:57 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Johannes,
>>>
>>> EventTimeSessionWindows [1] use the EventTimeTrigger [2] as default
>>> trigger (see EventTimeSessionWindows.getDefaultTrigger()).
>>>
>>> I would take the EventTimeTrigger and extend it with early firing
>>> functionality.
>>> However, there are a few things to consider:
>>> * you need to be aware that session windows can be merged, i.e., two
>>> session windows A, B with gap 10: A [20,25), B [37, 45), will be merged
>>> when a record at 32 is received.
>>> * windows store all records in a list. For every firing, you need to
>>> iterate the full list and also track which records you joined already to
>>> avoid duplicates. Maybe you can migrate records from the window state into
>>> a custom state defined in a ProcessWindowFunction.
>>>
>>> Best, Fabian
>>>
>>> 2018-06-13 13:43 GMT+02:00 Johannes Schulte <johannes.schu...@gmail.com>:
>>>
>>>> Hi,
>>>>
>>>> I am joining two streams with a session window and want to emit a
>>>> joined (early) result for every element arriving on one of the streams.
>>>>
>>>> Currently the code looks like this:
>>>>
>>>> s1.join(s2)
>>>>     .where(s1.id).equalTo(s2.id)
>>>>     .window(EventTimeSessionWindows.withGap(Time.minutes(15)))
>>>>     // trigger(?)
>>>>     .apply(...custom code..)
>>>>
>>>> What I am missing is the right trigger à la "withEarlyFiring" - do I
>>>> need to implement my own trigger for this and if yes, what kind of
>>>> functionality must be present to not break the session window semantics?
>>>>
>>>> Thanks in advance,
>>>>
>>>> Johannes
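
The session merge case mentioned in the thread (A [20,25), B [37, 45), gap 10, record at 32) can be illustrated with a small plain-Java sketch. It does not use Flink. The class SessionMergeSketch and its add method are hypothetical names, and the sketch assumes the two intervals are read as spans of observed event timestamps. Flink itself represents a session window as [timestamp, timestamp + gap) and merges overlapping windows, so the exact numbers inside Flink may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not a Flink class.
final class SessionMergeSketch {

    /**
     * Adds one event timestamp to a list of sessions and merges every session
     * that is less than `gap` away from it. Each session is a two-element
     * array {first, last} holding its earliest and latest event timestamp.
     * Assumes the existing sessions are already at least `gap` apart.
     */
    static List<long[]> add(List<long[]> sessions, long ts, long gap) {
        long first = ts;
        long last = ts;
        List<long[]> result = new ArrayList<>();
        for (long[] s : sessions) {
            // The event belongs to this session if it is closer than `gap`
            // to the session's span on either side (or lies inside it).
            boolean withinGap = ts - s[1] < gap && s[0] - ts < gap;
            if (withinGap) {
                first = Math.min(first, s[0]);
                last = Math.max(last, s[1]);
            } else {
                result.add(s);
            }
        }
        // The new event, plus everything it bridged, forms one session.
        result.add(new long[] {first, last});
        return result;
    }

    public static void main(String[] args) {
        List<long[]> sessions = new ArrayList<>();
        sessions.add(new long[] {20, 25}); // A
        sessions.add(new long[] {37, 45}); // B
        for (long[] s : add(sessions, 32, 10)) {
            System.out.println(s[0] + ".." + s[1]); // prints 20..45
        }
    }
}
```

The record at 32 bridges both sessions, so two windows that already produced early results become one. This is why a custom early-firing trigger for a merging window assigner has to support merging (in Flink, Trigger.canMerge() and Trigger.onMerge()), and why any "already joined" bookkeeping has to be carried over to the merged window.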