Hi Fabian,

thanks for the hints. I get the feeling, though, that I am on the wrong
track, given how much code I would need to write to implement such a
"blueprint" use case.

Would a join be simpler using the Table API? In the end it's the
classic Order & OrderPosition example, where the output is an
upsert stream. Would I get the expected behaviour (output elements on every
update on either side of the input stream)? I realize that my session
window approach wasn't driven by the requirements but by operational
aspects (state size), so using a concept like idle state retention time
would be a more natural fit.
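
To make the intended behaviour concrete, here is a minimal plain-Java
sketch of it. It is a model only and not Flink code: the class
UpsertJoinSketch, its methods, and the string row format are made up for
illustration, and whether a Table API join with idle state retention emits
exactly these rows would still need to be checked against the Flink
documentation. The sketch keeps the latest Order and OrderPositions per
order id, emits updated joined rows whenever either side changes, and drops
the state of keys that were idle longer than a retention time.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java model (NOT Flink API) of an Order/OrderPosition join that emits on every update. */
final class UpsertJoinSketch {

    /** Join state kept per order id (hypothetical helper type). */
    private static final class KeyState {
        String order;                                          // latest order payload, null until seen
        final Map<String, String> positions = new TreeMap<>(); // positionId -> latest payload
        long lastAccess;                                       // time of the last update for this key
    }

    private final Map<String, KeyState> state = new HashMap<>();
    private final long retention;

    UpsertJoinSketch(long retention) {
        this.retention = retention;
    }

    /** Order changed: every joined row of that order is emitted again (upserted). */
    List<String> onOrder(String orderId, String order, long now) {
        KeyState s = touch(orderId, now);
        s.order = order;
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, String> p : s.positions.entrySet()) {
            out.add(row(orderId, p.getKey(), order, p.getValue()));
        }
        return out;
    }

    /** Position changed: only the row keyed by (orderId, positionId) is upserted. */
    List<String> onPosition(String orderId, String positionId, String position, long now) {
        KeyState s = touch(orderId, now);
        s.positions.put(positionId, position);
        List<String> out = new ArrayList<>();
        if (s.order != null) { // inner join: nothing to emit before the order is known
            out.add(row(orderId, positionId, s.order, position));
        }
        return out;
    }

    /** Drops the state of keys that have been idle for longer than the retention time. */
    void expire(long now) {
        state.values().removeIf(s -> now - s.lastAccess > retention);
    }

    int keysInState() {
        return state.size();
    }

    private KeyState touch(String orderId, long now) {
        KeyState s = state.computeIfAbsent(orderId, k -> new KeyState());
        s.lastAccess = now;
        return s;
    }

    private static String row(String orderId, String positionId, String order, String position) {
        return orderId + "/" + positionId + " -> " + order + " | " + position;
    }
}
```

In this model, state size is bounded by the retention time rather than by a
window, which is the operational concern the session window was standing in
for.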

Thanks,

Johannes

On Mon, Jun 18, 2018 at 9:57 AM Fabian Hueske <[email protected]> wrote:

> Hi Johannes,
>
> EventTimeSessionWindows [1] use the EventTimeTrigger [2] as default
> trigger (see EventTimeSessionWindows.getDefaultTrigger()).
>
> I would take the EventTimeTrigger and extend it with early firing
> functionality.
> However, there are a few things to consider:
> * you need to be aware that session windows can be merged, i.e., two
> session windows A, B with gap 10: A [20,25), B [37, 45), will be merged
> when a record at 32 is received.
> * windows store all records in a list. For every firing, you need to
> iterate the full list and also track which records you joined already to
> avoid duplicates. Maybe you can migrate records from the window state into
> a custom state defined in a ProcessWindowFunction.
>
> Best, Fabian
>
> 2018-06-13 13:43 GMT+02:00 Johannes Schulte <[email protected]>:
>
>> Hi,
>>
>> I am joining two streams with a session window and want to emit a joined
>> (early) result for every element arriving on one of the streams.
>>
>> Currently the code looks like this:
>>
>> s1.join(s2)
>> .where(s1.id).equalTo(s2.id)
>> .window(EventTimeSessionWindows.withGap(Time.minutes(15)))
>> // trigger(?)
>> .apply(...custom code..)
>>
>> What I am missing is the right trigger à la "withEarlyFiring". Do I need
>> to implement my own trigger for this and, if yes, what kind of functionality
>> must be present to not break the session window semantics?
>>
>> Thanks in advance,
>>
>> Johannes
>>
>>
>
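
The session window merge mentioned in the quoted reply (two sessions with
gap 10 that are joined by a record at 32) can be sketched in plain Java as
follows. This is a model and not Flink's implementation: SessionMergeSketch
and its members are hypothetical names. It assumes that each record at
timestamp t is first assigned its own window [t, t + gap) and that windows
which overlap or touch are merged. Under that assumption the sessions from
the reply, built from records at 20, 25 and at 37, 45, are represented as
[20,35) and [37,55), i.e. including the trailing gap.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Plain-Java model (NOT Flink code) of how session windows merge. */
final class SessionMergeSketch {

    /** Half-open event-time interval [start, end). */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        /** True if the two intervals overlap or touch. */
        boolean intersects(Window other) {
            return start <= other.end && other.start <= end;
        }

        /** Smallest interval that covers both windows. */
        Window cover(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }

        @Override
        public String toString() {
            return "[" + start + "," + end + ")";
        }
    }

    /** Adds a record and returns the new, merged list of session windows. */
    static List<Window> add(List<Window> windows, long timestamp, long gap) {
        Window merged = new Window(timestamp, timestamp + gap); // window of the new record
        List<Window> result = new ArrayList<>();
        for (Window w : windows) {
            if (w.intersects(merged)) {
                merged = merged.cover(w); // absorb every session the record connects to
            } else {
                result.add(w);
            }
        }
        result.add(merged);
        result.sort(Comparator.comparingLong((Window w) -> w.start));
        return result;
    }
}
```

Feeding records at 20, 25, 37 and 45 with gap 10 leaves two separate
sessions; adding the record at 32 then collapses them into a single one. A
custom early-firing trigger has to cope with exactly this case, because its
per-window state belongs to windows that may disappear in a merge.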
