You are right. Building a large record might result in an OOME.
But I think, that would also happen with a regular SessionWindow,
RocksDBStatebackend, and a WindowFunction that immediately ships the
records it receives from the Iterable.
As far as I know, a SessionWindow stores all elements in an internal a
ListState. When before iterating over a ListState, the RocksDBStateBackend
completely deserializes the list into an ArrayList, i.e., all records of a
session would be on the heap already.

If this is a situation that you have to avoid, I think the only way to do
this is to implement the sessionization yourself with a ProcessFunction and
MapState (keyed on timestamp). This would be a non-trivial task though.

In case the SessionWindow + WindowFunction is OK for you (or you
implemented your own sessionization logic, e.g., in a ProcessFunction), you
could just forward the elements to a modified BucketingSink.
Your version of the BucketingSink would need to ensure that files are not
closed between checkpoints, i.e., only when a checkpoint barrier is
received, it may be closed. Right now, files are closed on timer and/or
file size.
Since all records of a session are emitted by a single WIndowFunction call,
these records won't be interrupted by a barrier. Hence, you'll have a
"consistent" state for all windows when a checkpoint is triggered.

I'm afraid, I'm not aware of a simpler solution for this use case.

Hope it helps, Fabian

2017-07-04 11:24 GMT+02:00 Niels Basjes <ni...@basjes.nl>:

> Hi Fabian,
>
> On Fri, Jun 30, 2017 at 6:27 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> If I understand your use case correctly, you'd like to hold back all
>> events of a session until it ends/timesout and then write all events out.
>> So, instead of aggregating per session (the common use case), you'd just
>> like to collect the event.
>>
>
> Yes, and I want to write the completed sessions into files. No aggregation
> or filtering at all.
> The idea is that our DataScience guys who want to analyze sessions have a
> much easier task of knowing for certain that they have 'a set of complete
> sessions'.
>
>
>> I would implement a simple WindowFunction that just forwards all events
>> that it receives from the iterator. Conceptually, the window will just
>> collect the events and emit them when the session ended/timedout.
>> Then you can add BucketingSink which writes out the events. I'm not sure
>> if the BucketingSInk supports buckets based on event-time though. Maybe you
>> would need to adapt it a bit to guarantee that all rows of the same session
>> are written to the same file.
>> Alternatively, the WindowFunction could also emit one large record which
>> is a List or Array of events belonging to the same session.
>>
>
> That last one was the idea I had.
> Have a window function that keeps the Window until finished, then output
> that with the eventtime of the 'end of the session' and use the bucketing
> sink to write those to disk.
>
> The problem (in my mind) that I have with this is that a single session
> with a LOT of events would bring the system to a halt because it can
> trigger OOM events.
>
> How should I handle those?
>
> Niels
>

Reply via email to