Well, you can only performance-test it beforehand in different scenarios with 
different configurations. 

I am not sure what exactly your state holds (e.g. how many objects), but if 
it consists of Java objects then 3 times the estimated state size might be a 
little low (it also depends on how you initially measured the state size), 
although Flink optimizes this as well. Nevertheless, something like RocksDB 
is probably a better solution for larger states.
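
For a rough feel of the numbers in the setup you describe below, here is a 
minimal back-of-the-envelope sketch in Python. It assumes the 60 GB is split 
evenly across the 10 TMs and that the heap cutoff is simply the larger of 
(ratio * container size) and a 600 MB minimum. It ignores network buffers and 
off-heap managed memory. The function name and the simplified model are my 
own illustration, not Flink's actual calculation.

```python
def slot_memory_estimate(total_memory_mb, task_managers, total_slots,
                         cutoff_ratio, cutoff_min_mb=600):
    """Rough per-slot heap estimate for a YARN deployment.

    Simplified model (an assumption, not Flink's exact formula):
    heap = container size - max(cutoff_min_mb, container size * cutoff_ratio)
    """
    # Memory per TaskManager container, assuming an even split.
    container_mb = total_memory_mb / task_managers
    # Part of the container kept off the JVM heap (room for RocksDB etc.).
    cutoff_mb = max(cutoff_min_mb, container_mb * cutoff_ratio)
    heap_mb = container_mb - cutoff_mb
    slots_per_tm = total_slots // task_managers
    return {
        "container_mb": container_mb,
        "cutoff_mb": cutoff_mb,
        "heap_mb": heap_mb,
        "slots_per_tm": slots_per_tm,
        "heap_per_slot_mb": heap_mb / slots_per_tm,
    }


# Numbers from the thread: 60 GB total, 10 TMs, 60 slots, cutoff ratio 0.3.
estimate = slot_memory_estimate(60 * 1024, 10, 60, 0.3)
for name, value in estimate.items():
    print(f"{name}: {round(value, 1)}")
```

Under these assumptions the heap share per slot comes out at roughly 0.7 GB 
rather than the 1 GB you were aiming for, with the rest of each container 
left outside the heap.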

> On 29. Oct 2017, at 21:15, Ashish Pokharel <ashish...@yahoo.com> wrote:
> 
> Hi Till,
> 
> I got the same feedback from Robert Metzger over on Stack Overflow. I have 
> switched my app to use RocksDB and yes, it did stabilize the app :) 
> 
> However, I am still struggling with how to map out my TM and JM memory, 
> number of slots per TM, etc. Currently I am using 60 slots with 10 TMs and 60 
> GB of total cluster memory. The idea was to distribute the state and have 
> approx. 1 GB of memory per slot. I have also changed the 
> containerized.heap-cutoff-ratio config to 0.3 to allow a little room for 
> RocksDB (RocksDB is using the basic pre-defined configs optimized for 
> spinning disks, but we do have SSDs on our Prod machines that we can 
> leverage in future too) and set taskmanager.memory.off-heap to true. It 
> feels more experimental at this point than an exact science :) If there are 
> any further guidelines on how we can plan for this as we open up the flood 
> gates to heavy continuous streams, that would be great.
> 
> Thanks again,
> 
> Ashish
> 
>> On Oct 27, 2017, at 8:45 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>> 
>> Hi Ashish,
>> 
>> what you are describing should be a good use case for Flink and it should be 
>> able to run your program.
>> 
>> When you are seeing a GC overhead limit exceeded error, it means that 
>> Flink or your program is creating too many or too large objects, filling up 
>> the memory in a short time. I would recommend checking your user program to 
>> see whether you can avoid unnecessary object instantiations and whether it 
>> is possible to reuse created objects.
>> 
>> Concerning Flink's state backends, the memory state backend is currently not 
>> able to spill to disk. Also, the managed memory is only relevant for 
>> DataSet/batch programs and not for streaming programs. Therefore, I would 
>> recommend that you try out the RocksDB state backend, which is able to 
>> gracefully spill to disk if the state size should grow too large. 
>> Consequently, you don't have to adjust the managed memory settings because 
>> they currently don't have an effect on streaming programs. 
>> 
>> My gut feeling is that switching to the RocksDBStateBackend could already 
>> solve your problems. If this should not be the case, then please let me know 
>> again.
>> 
>> Cheers,
>> Till
>> 
>>> On Fri, Oct 27, 2017 at 5:27 AM, Ashish Pokharel <ashish...@yahoo.com> 
>>> wrote:
>>> Hi Everyone,
>>> 
>>> We have hit a roadblock moving an app to Production scale and were hoping 
>>> to get some guidance. The application is a pretty common use case in stream 
>>> processing but does require maintaining a large number of keyed states. We 
>>> are processing 2 streams - one of which is a daily burst (normally around 
>>> 50 mil but could go up to 100 mil in a one-hour burst) and the other is a 
>>> constant stream of around 70-80 mil per hour. We are doing a low-level join 
>>> using a CoProcess function between the two keyed streams. The CoProcess 
>>> function needs to refresh (upsert) state from the daily burst stream and 
>>> decorate the constantly streaming data with values from the state built 
>>> from the bursty stream. All of the logic is working pretty well in a 
>>> standalone Dev environment. We are throwing about 500k events of bursty 
>>> traffic for state and about 2-3 mil of data stream. We have 1 TM with 16GB 
>>> memory, 1 JM with 8 GB memory and 16 slots (1 per core on the server) on 
>>> the server. We have been taking savepoints in case we need to restart the 
>>> app with code changes etc. The app does seem to recover from state very 
>>> well too. Based on the savepoints, the total volume of state in the 
>>> production flow should be around 25-30GB.
>>> 
>>> At this point, however, we are trying to deploy the app at production 
>>> scale. The app also has a flag that can be set at startup time to ignore 
>>> the data stream so we can simply initialize state. So basically we are 
>>> trying to see if we can initialize the state first and take a savepoint as 
>>> a test. At this point we are using 10 TMs with 4 slots and 8GB memory each 
>>> (the idea was to allocate around 3 times the estimated state size to start 
>>> with) but TMs keep getting killed by YARN with a GC Overhead Limit Exceeded 
>>> error. We have gone through quite a few blogs/docs on Flink Managed Memory, 
>>> off-heap vs heap memory, disk spill-over, state backends etc. We did try to 
>>> tweak the managed-memory configs in multiple ways (off/on heap, fraction, 
>>> network buffers etc) but can’t seem to figure out a good way to fine-tune 
>>> the app to avoid issues. Ideally, we would hold state in memory (we do have 
>>> enough capacity in the Production environment for it) for performance 
>>> reasons and spill over to disk (which I believe Flink should provide out of 
>>> the box?). It feels like 3x the anticipated state volume in cluster memory 
>>> should have been enough to just initialize state. So instead of just 
>>> continuing to increase memory (which may or may not help as the error is 
>>> regarding GC overhead) we wanted to get some input from experts on best 
>>> practices and an approach to plan this application better.
>>> 
>>> Appreciate your input in advance!
>> 
> 
