Re: Questions on Unbounded number of keys

2018-07-31 Thread ashish pok
Thanks Till, I will try to create an instance of the app with a smaller heap and get a couple of dumps as well. I should be OK to share those on Google Drive. - Ashish

Re: Questions on Unbounded number of keys

2018-07-31 Thread Till Rohrmann
Hi Ashish, FIRE_AND_PURGE should also clear the window state. Yes, by active windows I mean windows which have not been purged yet. Maybe Aljoscha knows more about why the window state is growing (I would not rule out a bug). Cheers, Till
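A toy model of the FIRE vs. FIRE_AND_PURGE distinction follows. It is a sketch only. The names ToyTriggerResult, Fire, FireAndPurge and ToyWindowBuffer are hypothetical stand-ins and are not Flink's Trigger/TriggerResult API. The model also tracks only buffered window contents, not trigger state or window metadata. It assumes one window per key and keys that never repeat. Under those assumptions, firing without purging leaves every key's contents buffered, while fire-and-purge drops them once the result is emitted.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for trigger outcomes; only the effect on buffered
// window contents is modelled (no trigger state, no window metadata).
sealed trait ToyTriggerResult
case object Fire extends ToyTriggerResult
case object FireAndPurge extends ToyTriggerResult

// Per-key buffer holding the contents of a single active window per key.
final class ToyWindowBuffer {
  private val contents = mutable.Map.empty[String, mutable.ArrayBuffer[Int]]

  def add(key: String, value: Int): Unit =
    contents.getOrElseUpdate(key, mutable.ArrayBuffer.empty[Int]) += value

  // Emits the window's sum; the buffered contents are dropped only on FireAndPurge.
  def onTrigger(key: String, result: ToyTriggerResult): Int = {
    val sum = contents.get(key).map(_.sum).getOrElse(0)
    if (result == FireAndPurge) contents -= key
    sum
  }

  def bufferedKeys: Int = contents.size
}

object FirePurgeDemo {
  // Each key receives two elements and is fired exactly once, like a key
  // that never repeats; returns how many keys still hold buffered contents.
  def remainingAfter(numKeys: Int, result: ToyTriggerResult): Int = {
    val buffer = new ToyWindowBuffer
    for (k <- 0 until numKeys) {
      val key = s"key-$k"
      buffer.add(key, 1)
      buffer.add(key, 2)
      buffer.onTrigger(key, result)
    }
    buffer.bufferedKeys
  }
}
```

With an unbounded key space, `remainingAfter(n, Fire)` grows linearly with `n`, while `remainingAfter(n, FireAndPurge)` stays at zero.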

Re: Questions on Unbounded number of keys

2018-07-31 Thread ashish pok
Hi Till, Keys are unbounded (a group of events has the same key, but that key doesn't repeat after it is fired, other than some odd delayed events). So basically there is one key that will be aligned to a window. When you say key space of active windows, does that include keys for windows that have already

Re: Questions on Unbounded number of keys

2018-07-31 Thread Till Rohrmann
Hi Ashish, the processing-time session windows need to store state in the state backend, and I assume that your key space of active windows is constantly growing. That could explain why you are seeing an ever-increasing memory footprint. But without knowing the input stream and what the UDFs do thi

Re: Questions on Unbounded number of keys

2018-07-30 Thread Fabian Hueske
Hi Chang, The state handle objects are not created per key but just once per function instance. Instead they route state accesses to the backend (JVM heap or RocksDB) for the currently active key. Best, Fabian
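The following minimal sketch in plain Scala illustrates the point about state handles. It uses a hypothetical ToyKeyedBackend and ToyValueState rather than Flink's real classes. A single handle object is created once, and each access goes to whichever key the runtime has set as current. The number of handles stays at one however many keys pass through. Only the entries inside the backend grow.

```scala
import scala.collection.mutable

// Hypothetical toy backend: one table keyed by (state name, key), plus the
// "current key" that the runtime sets before each element is processed.
final class ToyKeyedBackend {
  private val table = mutable.Map.empty[(String, String), String]
  private var currentKey: String = ""

  def setCurrentKey(key: String): Unit = currentKey = key
  def get(stateName: String): Option[String] = table.get((stateName, currentKey))
  def put(stateName: String, value: String): Unit = table.update((stateName, currentKey), value)
  def remove(stateName: String): Unit = table -= ((stateName, currentKey))
  def entries: Int = table.size
}

// Toy analogue of a ValueState handle: it holds no value itself, only its
// name, and forwards every access to the backend for the current key.
final class ToyValueState(name: String, backend: ToyKeyedBackend) {
  def value: Option[String] = backend.get(name)
  def update(v: String): Unit = backend.put(name, v)
  def clear(): Unit = backend.remove(name)
}

object ToyStateDemo {
  // Processes (sessionId, userId) events with ONE handle created up front;
  // returns how many entries the backend holds afterwards.
  def run(events: Seq[(String, String)]): Int = {
    val backend = new ToyKeyedBackend
    val userId = new ToyValueState("userId", backend) // created once per function instance
    for ((sessionId, uid) <- events) {
      backend.setCurrentKey(sessionId) // in this toy, the caller plays the runtime's role
      if (userId.value.isEmpty) userId.update(uid)
    }
    backend.entries
  }
}
```

For example, `ToyStateDemo.run(Seq(("s1", "alice"), ("s2", "bob"), ("s1", "carol")))` uses one handle but leaves two backend entries, one per session ID.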

Re: Questions on Unbounded number of keys

2018-07-30 Thread Chang Liu
Hi Andrey, Thanks for your reply. My question might be silly, but there is still one part I would like to fully understand. For example, in the following code: class MyFunction extends KeyedProcessFunction[String, Click, Click] { // keyed by Session ID lazy val userId: ValueState[String] =

Re: Questions on Unbounded number of keys

2018-07-28 Thread Ashish Pokharel
Andrey, Till, This doesn't jibe with what I have noticed (I fully acknowledge that I am still getting the hang of the framework). I sent a couple of notes on this in earlier threads. With this very simple processing, I am running into a slow creep-up of memory with unbounded keys, which eventually en

Re: Questions on Unbounded number of keys

2018-07-26 Thread Andrey Zagrebin
Hi Chang Liu, The unbounded nature of the stream, keyed or not, should not lead to running out of memory. Flink's parallel keyed operator instances are fixed in number (the parallelism), and each one just processes some range of the keys; in your example, it is a subrange of session IDs. The keyed processed element
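The routing of keys to a fixed set of instances can be sketched as follows. The sketch assumes a key-group scheme along the lines of Flink's KeyGroupRangeAssignment. Each key is hashed into one of maxParallelism key groups, and contiguous ranges of key groups are then mapped onto the parallel instances. It uses scala.util.hashing.MurmurHash3 as a stand-in hash, so the concrete assignments will not match a real Flink job. ToyKeyGroups and KeyRoutingDemo are hypothetical names.

```scala
import scala.util.hashing.MurmurHash3

// Sketch of key-group routing; MurmurHash3.stringHash is a stand-in for
// Flink's own hashing, so concrete assignments differ from a real job.
object ToyKeyGroups {
  // Every key lands in one of maxParallelism key groups.
  def keyGroupFor(key: String, maxParallelism: Int): Int =
    Math.floorMod(MurmurHash3.stringHash(key), maxParallelism)

  // Contiguous ranges of key groups are assigned to the fixed set of instances.
  def operatorIndexFor(keyGroup: Int, maxParallelism: Int, parallelism: Int): Int =
    keyGroup * parallelism / maxParallelism

  def instanceFor(key: String, maxParallelism: Int, parallelism: Int): Int =
    operatorIndexFor(keyGroupFor(key, maxParallelism), maxParallelism, parallelism)
}

object KeyRoutingDemo {
  // Routes numKeys distinct session IDs and collects the instance indices used.
  def instancesUsed(numKeys: Int, maxParallelism: Int, parallelism: Int): Set[Int] =
    (0 until numKeys)
      .map(i => ToyKeyGroups.instanceFor(s"session-$i", maxParallelism, parallelism))
      .toSet
}
```

However many distinct session IDs arrive, `instanceFor` only returns an index in `[0, parallelism)`, so the number of operator instances stays fixed. What can grow is the state each instance keeps for its keys.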

Re: Questions on Unbounded number of keys

2018-07-25 Thread Chang Liu
Hi Till, Thanks for your reply. But I think maybe I did not make my question clear. My question is not about whether the state within each keyed operator instance will run out of memory. My question is whether the unlimited keyed operator instances themselves will run out of memory. S

Re: Questions on Unbounded number of keys

2018-07-24 Thread Till Rohrmann
Hi Chang Liu, if you are dealing with an unlimited number of keys and keep state around for every key, then your state size will keep growing with the number of keys. If you are using the FsStateBackend, which keeps state in memory, you will eventually run into an OutOfMemoryError. One way to
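The sketch below shows why per-key state grows without bound when keys never repeat, and how periodically clearing idle keys bounds that growth. ToyExpiringState and StateGrowthDemo are hypothetical names. `expire()` stands in for a cleanup step, such as a timer in a KeyedProcessFunction that calls `clear()` on the state when it fires. It is not Flink code. The one-second idle timeout and the 100 ms cleanup interval are arbitrary assumptions.

```scala
import scala.collection.mutable

// Hypothetical per-key state store that remembers when each key was last touched.
final class ToyExpiringState(idleTimeoutMs: Long) {
  private val values = mutable.Map.empty[String, Long]   // key -> running count
  private val lastSeen = mutable.Map.empty[String, Long] // key -> last access time (ms)

  def increment(key: String, nowMs: Long): Unit = {
    values.update(key, values.getOrElse(key, 0L) + 1L)
    lastSeen.update(key, nowMs)
  }

  // Stands in for a cleanup timer: drops every key idle for longer than the timeout.
  def expire(nowMs: Long): Unit = {
    val stale = lastSeen.iterator.collect { case (k, t) if nowMs - t > idleTimeoutMs => k }.toList
    stale.foreach { k => values -= k; lastSeen -= k }
  }

  def size: Int = values.size
}

object StateGrowthDemo {
  // Each key is seen exactly once (like one-off session IDs), one key per millisecond;
  // with cleanup enabled, expire() runs every 100 ms of simulated time.
  def sizeAfter(numKeys: Int, cleanup: Boolean): Int = {
    val state = new ToyExpiringState(idleTimeoutMs = 1000L)
    for (t <- 0 until numKeys) {
      state.increment(s"session-$t", t.toLong)
      if (cleanup && t % 100 == 0) state.expire(t.toLong)
    }
    state.size
  }
}
```

With 10,000 one-off keys, the store holds all 10,000 entries without cleanup. With cleanup it holds only about the last second's worth, 1,100 entries.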

Questions on Unbounded number of keys

2018-07-24 Thread Chang Liu
Dear All, I have questions regarding keys. In general, the questions are: what happens if I do keyBy on an unlimited number of keys? How is Flink managing each KeyedStream under the hood? Will I get memory overflow, for example, if every KeyedStream associated with a specific key is t