Hi Ashish,

Did you use per-window state (also called partitioned state) in your Trigger? If so, you need to make sure that it is completely removed in the clear() method, because processing time timers won't fire once a window has been purged. So you cannot (fully) rely on timers to clean up per-window state.

Best,
Fabian
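For illustration, below is a minimal sketch of a custom Trigger on a GlobalWindow that keeps per-window (partitioned) state and removes all of it, plus its pending processing-time timer, in clear(). The class name, the bit-mask condition, and the timeout handling are invented for this example and are not the trigger discussed in this thread; the sketch also clears its own bookkeeping before returning FIRE_AND_PURGE, so nothing depends on exactly when the framework invokes clear().

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class TimeoutMaskTrigger<T> extends Trigger<T, GlobalWindow> {

    private final ValueStateDescriptor<Integer> maskDesc =
            new ValueStateDescriptor<>("mask", Integer.class);
    private final ValueStateDescriptor<Long> timerDesc =
            new ValueStateDescriptor<>("timeoutTimer", Long.class);

    private final int fireMask;
    private final long timeoutMs;

    public TimeoutMaskTrigger(int fireMask, long timeoutMs) {
        this.fireMask = fireMask;
        this.timeoutMs = timeoutMs;
    }

    @Override
    public TriggerResult onElement(T element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
        // Register a processing-time timeout once per key/window.
        ValueState<Long> timer = ctx.getPartitionedState(timerDesc);
        if (timer.value() == null) {
            long timeout = ctx.getCurrentProcessingTime() + timeoutMs;
            ctx.registerProcessingTimeTimer(timeout);
            timer.update(timeout);
        }

        ValueState<Integer> mask = ctx.getPartitionedState(maskDesc);
        int updated = (mask.value() == null ? 0 : mask.value()) | maskOf(element);
        if (updated == fireMask) {
            clear(window, ctx); // drop the bookkeeping before firing, so nothing is left behind
            return TriggerResult.FIRE_AND_PURGE;
        }
        mask.update(updated);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
        clear(window, ctx); // timeout path: emit whatever was accumulated and clean up
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
        // Remove every piece of partitioned state this trigger created and delete the pending
        // timer; anything left here stays around for the lifetime of the key.
        ValueState<Long> timer = ctx.getPartitionedState(timerDesc);
        if (timer.value() != null) {
            ctx.deleteProcessingTimeTimer(timer.value());
        }
        timer.clear();
        ctx.getPartitionedState(maskDesc).clear();
    }

    private int maskOf(T element) {
        return 1; // placeholder: derive a bit from the element in a real implementation
    }
}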
2018-05-14 9:34 GMT+02:00 Kostas Kloudas <k.klou...@data-artisans.com>:

> Hi Ashish,
>
> It would be helpful to share the code of your custom trigger for the first case.
> Without that, we cannot tell what state you create and how/when you update/clear it.
>
> Cheers,
> Kostas
>
> On May 14, 2018, at 1:04 AM, ashish pok <ashish...@yahoo.com> wrote:
>
> Hi Till,
>
> Thanks for getting back. I am sure that will fix the issue, but I feel it would potentially mask an issue. I have been going back and forth with Fabian on a use case where, for some of our highly transient datasets, it might make sense to just use memory-based state (except, of course, that data loss becomes an issue when apps occasionally hit a problem and the whole job restarts, or the app has to be taken down, etc. - i.e., essentially handling graceful shutdowns/restarts better). I was on the hook to create a business case and post it back to this forum (which I am hoping I can get around to at some point soon). Long story short, this is one of those datasets.
>
> States in this case are either fired and cleared normally or on processing timeout. So technically, unless there is a memory leak in app code, memory usage should plateau at a high point. What I was noticing was that memory would start to creep up ever so slowly.
>
> I couldn't tell exactly why heap utilization kept growing (ever so slowly, but it had an upward trend for sure) because the states should technically be cleared, if not as part of a reducing function then on timeout. After running for a couple of days, the app would then run into Java heap issues. So changing to RocksDB will probably fix the issue, but not necessarily the leak of states that should be cleared, IMO. Interestingly, I switched my app from using something like this:
>
>     WindowedStream<BasicFactTuple, String, GlobalWindow> windowedStats = statsStream
>         .keyBy(BasicFactTuple::getKey)
>         .window(GlobalWindows.create())
>         .trigger(BitwiseOrTrigger.of(60, AppConfigs.getWindowSize(5*60*1000)));
>
> to
>
>     DataStream<PlatformEvent> processStats = pipStatsStream
>         .keyBy(BasicFactTuple::getKey)
>         .process(new IfStatsReduceProcessFn(AppConfigs.getWindowSize(5*60*1000), 60));
>
> I basically moved the logic of the trigger into a process function over the weekend. Once I did that, the heap is completely stabilized. In the trigger implementation, I was using FIRE_AND_PURGE on the trigger condition or onProcessingTime, and in the process implementation I am using the .clear() method for the same.
>
> I seem to have solved the problem by using process, but I'd be interested to understand why the heap would creep up in the trigger scenario.
>
> Hope this makes sense,
>
> Ashish
>
> On Sunday, May 13, 2018, 4:06:59 PM EDT, Till Rohrmann <till.rohrm...@gmail.com> wrote:
>
> Hi Ashish,
>
> have you tried using Flink's RocksDBStateBackend? If your job accumulates state exceeding the available main memory, then you have to use a state backend which can spill to disk. The RocksDBStateBackend offers you exactly this functionality.
>
> Cheers,
> Till
>
> On Mon, Apr 30, 2018 at 3:54 PM, ashish pok <ashish...@yahoo.com> wrote:
>
> All,
>
> I am noticing heap utilization creeping up slowly in a couple of apps, which eventually leads to an OOM issue. The apps only have one process function that caches state. I did make sure I have a clear method invoked when events are collected normally, on exception, and on timeout.
>
> Are there any other best practices others follow for memory-backed state?
>
> Thanks,
>
> -- Ashish
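To round out the thread for readers, here is a minimal sketch of the pattern Ashish describes, i.e. moving the trigger logic into a keyed process function: keyed state plus a processing-time timer, with both the state and the timer cleared on the normal emit path and in onTimer(). The class and field names, the element type, and the bit-mask condition are simplified assumptions; this is not the actual IfStatsReduceProcessFn.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Stand-in for the process-function approach: input is (key, bit) pairs, output is the key
// once all expected bits have been seen or the timeout expires.
public class MaskReduceProcessFn extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {

    private final long timeoutMs;
    private final int fireMask;

    private transient ValueState<Integer> mask;
    private transient ValueState<Long> timer;

    public MaskReduceProcessFn(long timeoutMs, int fireMask) {
        this.timeoutMs = timeoutMs;
        this.fireMask = fireMask;
    }

    @Override
    public void open(Configuration parameters) {
        mask = getRuntimeContext().getState(new ValueStateDescriptor<>("mask", Integer.class));
        timer = getRuntimeContext().getState(new ValueStateDescriptor<>("timeoutTimer", Long.class));
    }

    @Override
    public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
        // Register a processing-time timeout once per key.
        if (timer.value() == null) {
            long timeout = ctx.timerService().currentProcessingTime() + timeoutMs;
            ctx.timerService().registerProcessingTimeTimer(timeout);
            timer.update(timeout);
        }

        int updated = (mask.value() == null ? 0 : mask.value()) | value.f1;
        if (updated == fireMask) {
            out.collect(ctx.getCurrentKey()); // normal emit path
            cleanup(ctx);                     // clear state and delete the pending timer
        } else {
            mask.update(updated);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
        out.collect(ctx.getCurrentKey()); // timeout path: emit whatever was accumulated
        cleanup(ctx);
    }

    private void cleanup(Context ctx) throws Exception {
        Long registered = timer.value();
        if (registered != null) {
            ctx.timerService().deleteProcessingTimeTimer(registered);
        }
        timer.clear();
        mask.clear();
    }
}

Wired up along the lines of the thread, e.g. stream.keyBy(t -> t.f0).process(new MaskReduceProcessFn(5*60*1000L, 60)).

Till's suggestion is a state-backend change along the following lines; the checkpoint URI is a placeholder, and on Flink releases newer than this thread the equivalent class is EmbeddedRocksDBStateBackend.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Working state goes to local RocksDB instances on disk instead of the JVM heap;
        // the checkpoint URI below is a placeholder.
        env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"));
        // ... define sources, keyBy/process, sinks, then call env.execute(...) as usual
    }
}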