You are right, building a large record might result in an OOME. But I think that would also happen with a regular SessionWindow, the RocksDBStateBackend, and a WindowFunction that immediately ships the records it receives from the Iterable. As far as I know, a SessionWindow stores all elements in an internal ListState. Before iterating over a ListState, the RocksDBStateBackend completely deserializes the list into an ArrayList, i.e., all records of a session would already be on the heap.
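Just to make sure we are talking about the same setup, this is roughly the forwarding WindowFunction I have in mind. It is only a sketch: Event, getSessionKey(), and the 30 minute gap are placeholders for your own types and settings.

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

DataStream<Event> events = ...; // source and timestamp assignment not shown

events
    .keyBy(new KeySelector<Event, String>() {
        @Override
        public String getKey(Event e) {
            return e.getSessionKey(); // placeholder for your session key
        }
    })
    .window(EventTimeSessionWindows.withGap(Time.minutes(30)))
    .apply(new WindowFunction<Event, Event, String, TimeWindow>() {
        @Override
        public void apply(String key, TimeWindow window,
                          Iterable<Event> input, Collector<Event> out) {
            // just forward every buffered event of the session;
            // with RocksDB the backing ListState has already been
            // deserialized onto the heap at this point
            for (Event e : input) {
                out.collect(e);
            }
        }
    });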
If this is a situation that you have to avoid, I think the only way to do so is to implement the sessionization yourself with a ProcessFunction and a MapState keyed on timestamp (see the sketch at the very end of this mail). This would be a non-trivial task though.

In case the SessionWindow + WindowFunction is OK for you (or you implemented your own sessionization logic, e.g., in a ProcessFunction), you could just forward the elements to a modified BucketingSink. Your version of the BucketingSink would need to ensure that files are not closed between checkpoints, i.e., a file may only be closed when a checkpoint barrier is received. Right now, files are closed based on a timer and/or file size. Since all records of a session are emitted by a single WindowFunction call, these records won't be interrupted by a barrier. Hence, you'll have a "consistent" state for all windows when a checkpoint is triggered.

I'm afraid I'm not aware of a simpler solution for this use case.

Hope it helps,
Fabian

2017-07-04 11:24 GMT+02:00 Niels Basjes <ni...@basjes.nl>:

> Hi Fabian,
>
> On Fri, Jun 30, 2017 at 6:27 PM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> If I understand your use case correctly, you'd like to hold back all
>> events of a session until it ends/times out and then write all events out.
>> So, instead of aggregating per session (the common use case), you'd just
>> like to collect the events.
>
> Yes, and I want to write the completed sessions into files. No aggregation
> or filtering at all.
> The idea is that our DataScience guys who want to analyze sessions have a
> much easier task of knowing for certain that they have 'a set of complete
> sessions'.
>
>> I would implement a simple WindowFunction that just forwards all events
>> that it receives from the iterator. Conceptually, the window will just
>> collect the events and emit them when the session ended/timed out.
>> Then you can add a BucketingSink which writes out the events. I'm not sure
>> if the BucketingSink supports buckets based on event time though. Maybe you
>> would need to adapt it a bit to guarantee that all rows of the same session
>> are written to the same file.
>> Alternatively, the WindowFunction could also emit one large record which
>> is a List or Array of events belonging to the same session.
>
> That last one was the idea I had:
> have a window function that keeps the window until finished, then output
> that with the event time of the 'end of the session' and use the bucketing
> sink to write those to disk.
>
> The problem (in my mind) that I have with this is that a single session
> with a LOT of events would bring the system to a halt because it can
> trigger OOM events.
>
> How should I handle those?
>
> Niels
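PS: here is a very rough sketch of the ProcessFunction + MapState approach I mentioned above. Event, the 30 minute gap, and the state names are placeholders, it assumes the stream has event timestamps assigned, and a real implementation would also have to deal with late data and with events that share the same timestamp (the MapState key):

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.util.Map;

public class SessionCollector extends ProcessFunction<Event, Event> {

    private static final long GAP = 30 * 60 * 1000L; // session gap in ms

    // one entry per event, keyed on the event timestamp; RocksDB can
    // iterate this incrementally instead of loading it completely
    private transient MapState<Long, Event> events;
    // timestamp of the last event seen for the current key
    private transient ValueState<Long> lastSeen;

    @Override
    public void open(Configuration parameters) {
        events = getRuntimeContext().getMapState(
            new MapStateDescriptor<>("session-events", Long.class, Event.class));
        lastSeen = getRuntimeContext().getState(
            new ValueStateDescriptor<>("last-seen", Long.class));
    }

    @Override
    public void processElement(Event value, Context ctx, Collector<Event> out) throws Exception {
        long ts = ctx.timestamp();
        events.put(ts, value);
        lastSeen.update(ts);
        // schedule a timer for the possible end of the session
        ctx.timerService().registerEventTimeTimer(ts + GAP);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Event> out) throws Exception {
        Long last = lastSeen.value();
        // only the timer of the last event marks the end of the session
        if (last != null && timestamp == last + GAP) {
            for (Map.Entry<Long, Event> entry : events.entries()) {
                out.collect(entry.getValue());
            }
            events.clear();
            lastSeen.clear();
        }
    }
}

You would apply it with stream.keyBy(...).process(new SessionCollector()) and could then feed its output into the adapted BucketingSink.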