Thanks, Fabian! This seems to be the way to go.

On Tue, Jun 19, 2018 at 12:18 PM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Johannes,
>
> You are right. You should approach the problem with the semantics that you
> need before thinking about optimizations such as state size.
>
> As of v1.5.0, the Table API / SQL offers two types of joins:
> 1) Windowed joins, where each record joins with the records of the other
> stream that fall into a time range, e.g., "(A.ts BETWEEN B.ts - 1 hour AND B.ts + 1 hour)".
> 2) Non-windowed joins, which support arbitrary join predicates but which
> fully materialize both inputs. As you mentioned, you can use idle state
> retention to remove records from state that have not been accessed for a
> certain time.
>
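For the Order & OrderPosition case, the semantics of a non-windowed join with idle state retention can be sketched as a small in-memory Java model. This is not Flink code, and `UpsertJoinModel` and its methods are hypothetical names. The model materializes both inputs, emits joined rows on every update from either side, and drops all state of an order id that has not been touched for longer than the retention time. Simplifications, all assumptions: expiry runs only when `expireIdleState` is called, a single retention time is used, and only the new joined rows are emitted, leaving the replacement of older results to an upsert sink keyed by order id and position id.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical, Flink-free model of a non-windowed join between an Order stream and an
 * OrderPosition stream: both inputs are fully materialized, every update on either side
 * emits joined rows, and keys that stay idle longer than the retention time are dropped.
 */
class UpsertJoinModel {
    private final long retentionMillis;
    private final Map<String, String> orders = new HashMap<>();                 // orderId -> latest order
    private final Map<String, Map<String, String>> positions = new HashMap<>(); // orderId -> (positionId -> latest position)
    private final Map<String, Long> lastAccess = new HashMap<>();               // orderId -> last time the key was touched

    UpsertJoinModel(long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    /** Upserts an order and emits it joined with every known position of that order. */
    List<String> onOrder(String orderId, String order, long now) {
        orders.put(orderId, order);
        lastAccess.put(orderId, now);
        List<String> out = new ArrayList<>();
        for (String pos : positions.getOrDefault(orderId, Collections.emptyMap()).values()) {
            out.add(order + "|" + pos);
        }
        return out;
    }

    /** Upserts a position and emits it joined with its order, if the order is known. */
    List<String> onPosition(String orderId, String positionId, String position, long now) {
        positions.computeIfAbsent(orderId, k -> new LinkedHashMap<>()).put(positionId, position);
        lastAccess.put(orderId, now);
        List<String> out = new ArrayList<>();
        String order = orders.get(orderId);
        if (order != null) {
            out.add(order + "|" + position);
        }
        return out;
    }

    /** Drops both sides' state for keys not accessed for longer than the retention time. */
    void expireIdleState(long now) {
        lastAccess.entrySet().removeIf(e -> {
            boolean idle = now - e.getValue() > retentionMillis;
            if (idle) {
                orders.remove(e.getKey());
                positions.remove(e.getKey());
            }
            return idle;
        });
    }

    /** Number of order ids that currently hold state. */
    int liveKeys() {
        return lastAccess.size();
    }
}
```

A position that arrives after its order's state has expired no longer finds a join partner, which is the trade-off idle state retention makes in exchange for bounded state.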
> Best, Fabian
>
> 2018-06-18 11:09 GMT+02:00 Johannes Schulte <johannes.schu...@gmail.com>:
>
>> Hi Fabian,
>>
>> thanks for the hints, though I get the feeling that I am on the wrong
>> track, given how much code I would need to write to implement a
>> "blueprint" use case.
>>
>> Would a join be simpler using the Table API? In the end it's the
>> classical Order & OrderPosition example, where the output is an
>> upsert stream. Would I get the expected behaviour, i.e., output elements on
>> every update on either input stream? I realize that my session
>> window approach wasn't driven by the requirements but by operational
>> aspects (state size), so using a concept like idle state retention time
>> would be a more natural fit.
>>
>> Thanks,
>>
>> Johannes
>>
>> On Mon, Jun 18, 2018 at 9:57 AM Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> Hi Johannes,
>>>
>>> EventTimeSessionWindows [1] uses EventTimeTrigger [2] as its default
>>> trigger (see EventTimeSessionWindows.getDefaultTrigger()).
>>>
>>> I would take the EventTimeTrigger and extend it with early firing
>>> functionality.
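To make the early-firing idea concrete without pulling in Flink, here is a minimal, Flink-free model of one window of a windowed join. Its trigger follows EventTimeTrigger's logic (register an event-time timer at the window's max timestamp, FIRE once the watermark reaches it) but returns FIRE instead of CONTINUE for every element. The class and method names are hypothetical. In real Flink code you would implement `Trigger<Object, TimeWindow>` (`onElement`, `onEventTime`, `onProcessingTime`, `clear`); because session windows merge, the trigger must also return true from `canMerge()` and re-register its timer in `onMerge()`, as EventTimeTrigger does. Since every firing re-evaluates the full window contents, the model also remembers which pairs it has already emitted.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical, Flink-free model of one join window whose trigger behaves like
 * EventTimeTrigger but additionally FIREs on every element (early firing).
 */
class EarlyFiringWindowModel {
    enum Result { CONTINUE, FIRE }

    private final long maxTimestamp;                     // last timestamp belonging to the window (end - 1)
    private final List<String> left = new ArrayList<>();
    private final List<String> right = new ArrayList<>();
    private final Set<String> emitted = new HashSet<>(); // pairs already sent downstream (assumes unique values)
    private boolean timerRegistered = false;             // stands in for an event-time timer at maxTimestamp

    EarlyFiringWindowModel(long maxTimestamp) {
        this.maxTimestamp = maxTimestamp;
    }

    /** Like EventTimeTrigger.onElement, except that it returns FIRE where that returns CONTINUE. */
    Result onElement(long watermark) {
        if (maxTimestamp <= watermark) {
            return Result.FIRE;      // late element: the window end has already passed
        }
        timerRegistered = true;      // registerEventTimeTimer(maxTimestamp) in real Flink code
        return Result.FIRE;          // early firing on every element
    }

    /** Like EventTimeTrigger.onEventTime: fires once the watermark reaches the window end. */
    Result onWatermark(long watermark) {
        if (timerRegistered && watermark >= maxTimestamp) {
            timerRegistered = false;
            return Result.FIRE;
        }
        return Result.CONTINUE;
    }

    List<String> addLeft(String value, long watermark) {
        left.add(value);
        return evaluate(onElement(watermark));
    }

    List<String> addRight(String value, long watermark) {
        right.add(value);
        return evaluate(onElement(watermark));
    }

    List<String> advanceWatermark(long watermark) {
        return evaluate(onWatermark(watermark));
    }

    /** On FIRE, iterates the full window contents and emits only pairs not emitted before. */
    private List<String> evaluate(Result result) {
        List<String> out = new ArrayList<>();
        if (result != Result.FIRE) {
            return out;
        }
        for (String l : left) {
            for (String r : right) {
                String pair = l + "|" + r;
                if (emitted.add(pair)) {
                    out.add(pair);
                }
            }
        }
        return out;
    }
}
```

Without the `emitted` set, every firing would re-emit all pairs seen so far, which is the duplicate problem described in the second bullet below.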
>>> However, there are a few things to consider:
>>> * Session windows can be merged. For example, two session windows
>>> A [20,25) and B [37,45) with a gap of 10 are merged into one when a
>>> record with timestamp 32 is received.
>>> * Windows store all records in a list. On every firing, you need to
>>> iterate over the full list and also track which records you have already
>>> joined, to avoid emitting duplicates. You might be able to migrate records
>>> from the window state into custom state defined in a ProcessWindowFunction.
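The merging case in the first bullet can be reproduced with a small model of session assignment, assuming A holds elements at 20 and 25 and B holds elements at 37 and 45. Each element with timestamp t opens the half-open window [t, t + gap), and windows that overlap or touch are merged. With a gap of 10 this yields [20,35) and [37,55); adding an element at 32 opens [32,42), which bridges both into [20,55). `SessionMergeModel` is a hypothetical name; the sketch is meant to mirror the behavior of Flink's event-time session windows, not to reproduce Flink's merging code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Hypothetical model of event-time session windows: one window per element, then merging. */
class SessionMergeModel {
    /** Half-open interval [start, end). */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        /** Overlapping or touching windows count as intersecting. */
        boolean intersects(Window other) {
            return start <= other.end && end >= other.start;
        }

        /** Smallest window covering both this and the other window. */
        Window cover(Window other) {
            return new Window(Math.min(start, other.start), Math.max(end, other.end));
        }

        @Override
        public String toString() {
            return "[" + start + "," + end + ")";
        }
    }

    /** Assigns [t, t + gap) to every timestamp and merges intersecting windows, sorted by start. */
    static List<Window> sessions(List<Long> timestamps, long gap) {
        List<Window> windows = new ArrayList<>();
        for (long t : timestamps) {
            windows.add(new Window(t, t + gap));
        }
        windows.sort(Comparator.comparingLong(w -> w.start));
        List<Window> merged = new ArrayList<>();
        for (Window w : windows) {
            int last = merged.size() - 1;
            if (last >= 0 && merged.get(last).intersects(w)) {
                merged.set(last, merged.get(last).cover(w));
            } else {
                merged.add(w);
            }
        }
        return merged;
    }
}
```

For example, `sessions(Arrays.asList(20L, 25L, 37L, 45L), 10)` gives `[[20,35), [37,55)]`, while adding `32L` to the input gives `[[20,55)]`. Any join results already emitted for A or B then belong to a single merged window, which is why the trigger and the deduplication state both have to cope with merges.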
>>>
>>> Best, Fabian
>>>
>>>
>>> 2018-06-13 13:43 GMT+02:00 Johannes Schulte <johannes.schu...@gmail.com>:
>>>
>>>> Hi,
>>>>
>>>> I am joining two streams with a session window and want to emit a
>>>> joined (early) result for every element arriving on one of the streams.
>>>>
>>>> Currently the code looks like this:
>>>>
>>>> s1.join(s2)
>>>>   .where(e -> e.id).equalTo(e -> e.id)  // key selectors, one per input
>>>>   .window(EventTimeSessionWindows.withGap(Time.minutes(15)))
>>>>   // .trigger(???)  <- which trigger gives early firings?
>>>>   .apply(/* ...custom code... */);
>>>>
>>>> What I am missing is the right trigger, à la "withEarlyFiring". Do I
>>>> need to implement my own trigger for this, and if so, what functionality
>>>> must it provide so that it does not break the session window semantics?
>>>>
>>>> Thanks in advance,
>>>>
>>>> Johannes
>>>>
>>>>
>>>
>
