Hi,

Since your state (150gb) seems to fit into memory (700gb), I would recommend
trying the HashMapStateBackend, unless you know that your state size is going
to increase a lot soon:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/#the-hashmapstatebackend
I'd guess you'll see a nice performance improvement.
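If you set the backend in code, the switch is roughly the following (a minimal
sketch against the 1.13 API, not your actual job; the class name and the
checkpoint path are placeholders, and the same thing can also be set via
state.backend: hashmap in flink-conf.yaml):

    import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class HeapBackendSetup {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Keep working state as objects on the JVM heap instead of in RocksDB.
            env.setStateBackend(new HashMapStateBackend());

            // Checkpoints still need durable storage; the path is a placeholder.
            env.getCheckpointConfig().setCheckpointStorage("s3://<your-bucket>/checkpoints");

            // ... build the rest of the job as before and call env.execute().
        }
    }

Keep in mind that HashMapStateBackend holds all live state on the heap and
snapshots the full state on every checkpoint (no incremental checkpoints), so
you'd want to size the TaskManager heap accordingly, which lines up with the
earlier suggestion in this thread of running fewer, larger TaskManagers.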
At the moment I have no idea where else to look for the issue you are
describing, but it seems there are a few things for you to try out to
optimize the resource allocation.

On Wed, Jun 16, 2021 at 7:23 PM Rommel Holmes <rommelhol...@gmail.com> wrote:

> Hi Xintong and Robert,
>
> Thanks for the reply.
>
> The checkpoint size for our job is 10-20GB since we are doing incremental
> checkpointing; if we do a savepoint, it can be as big as 150GB.
>
> 1) We will try to make the Flink instances bigger.
>
> 2) Thanks for the pointer, we will take a look.
>
> 3) We do have CPU and memory monitoring. When there is backpressure, the
> CPU load increases from 25% to 50% with a more spiky shape, but it is not
> at 100%. As for memory, we monitor (Heap.Committed - Heap.Used) per host;
> when backpressure happens, that value is still around 500MB on the host.
>
> What we observed is that when backpressure happens, the read-state-time
> slowness occurs on one of the hosts, across different task managers on
> that host. The read state time (a metric we created and measure) on that
> host shoots up from 0.x ms to 40-60 ms.
>
> We also observed that when this happens, the running compaction time for
> RocksDB on that host gets longer, from 1 minute to over 2 minutes, while
> the other hosts stay around 1 minute.
>
> We also observed that when this happens, the "size of the active and
> unflushed immutable memtables" metric increases more slowly than it did
> before the backpressure.
>
> I can provide more context if you are interested. We are still debugging
> this issue.
>
> Rommel
>
>
> On Wed, Jun 16, 2021 at 4:25 AM Robert Metzger <rmetz...@apache.org>
> wrote:
>
>> Hi Thomas,
>>
>> My gut feeling is that you can use the available resources more
>> efficiently.
>>
>> What's the size of a checkpoint for your job (you can see that from the
>> UI)?
>>
>> Given that your cluster has an aggregate of 64 * 12 = 768gb of memory
>> available, you might be able to do everything in memory (I might be off
>> by a few terabytes here, it all depends on your state size ;) ).
>>
>> 1. In my experience, it is usually more efficient to have a few large
>> Flink instances than many small ones. Maybe try to run 12 TaskManagers
>> (or 11 to make the JM fit) with 58gb of memory (the JM can stick to the
>> 7gb) and see how Flink behaves.
>>
>> 2. I'd say it's a try-and-see process, with a few educated guesses. Maybe
>> check out this:
>> https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-general-guidelines
>> to get some inspiration for making some "back of the napkin" calculations
>> on the sizing requirements.
>>
>> 3. Do you have some monitoring of CPU / memory / network usage in place?
>> It would be interesting to see what the metrics look like when everything
>> is ok vs. when the job is backpressured.
>>
>> Best,
>> Robert
>>
>>
>> On Wed, Jun 16, 2021 at 3:56 AM Xintong Song <tonysong...@gmail.com>
>> wrote:
>>
>>> Hi Thomas,
>>>
>>> It would be helpful if you could provide the jobmanager/taskmanager
>>> logs, and gc logs if possible.
>>>
>>> Additionally, you may consider monitoring the CPU/memory-related metrics
>>> [1] to see if there's anything abnormal when the problem is observed.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/metrics.html
>>>
>>>
>>> On Wed, Jun 16, 2021 at 8:11 AM Thomas Wang <w...@datability.io> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm trying to see if we have been given enough resources (i.e. CPU and
>>>> memory) on each task node to perform a deduplication job. Currently,
>>>> the job is not running very stably. What I have been observing is that
>>>> after a couple of days of running, we suddenly see backpressure on one
>>>> arbitrary ec2 instance in the cluster, and when that happens, we have
>>>> to give up the current state and restart the job with an empty state.
>>>> We can no longer take a savepoint, as it would time out after 10
>>>> minutes, which is understandable.
>>>>
>>>> Additional observations:
>>>>
>>>> When the backpressure happens, we see an increase in our state read
>>>> time (we are measuring it using a custom metric) from about 0.1
>>>> milliseconds to 40-60 milliseconds on that specific problematic ec2
>>>> instance. We tried to reboot that ec2 instance so that the
>>>> corresponding tasks would be assigned to a different ec2 instance, but
>>>> the problem persists.
>>>>
>>>> However, I'm not sure if this read time increase is a symptom or the
>>>> cause of the problem.
>>>>
>>>> Background about this deduplication job:
>>>>
>>>> We are doing sessionization with deduplication on an event stream,
>>>> keyed by a session key that is embedded in the event. The throughput of
>>>> the input stream is around 50k records per second. The
>>>> after-aggregation output is around 8k records per second.
>>>>
>>>> We are currently using the RocksDB state backend with SSD support, and
>>>> in the state we are storing session keys with a TTL of 1 week. Based on
>>>> the current throughput, this could become really huge. I assume RocksDB
>>>> would flush to disk as needed, but please correct me if I am wrong.
>>>>
>>>> Information about the cluster:
>>>>
>>>> I'm running on an AWS EMR cluster with 12 ec2 instances (r5d.2xlarge).
>>>> I'm using Flink 1.11.2 in Yarn session mode. Currently there is only 1
>>>> job running in the Yarn session.
>>>>
>>>> Questions:
>>>>
>>>> 1. Currently, I'm starting the Yarn session with 7g of memory for both
>>>> the Task Manager and the Job Manager, so that each Yarn container can
>>>> get 1 CPU. Is this setting reasonable based on your experience?
>>>>
>>>> Here is the command I used to start the Yarn cluster:
>>>>
>>>> export HADOOP_CLASSPATH=`hadoop classpath` &&
>>>> /usr/lib/flink/bin/yarn-session.sh -jm 7g -tm 7g --detached
>>>>
>>>> 2. Is there a scientific way to tell what's the right amount of
>>>> resources I should give to an arbitrary job? Or is this a try-and-see
>>>> kind of process?
>>>>
>>>> 3. Right now, I suspect resources are causing the job to run unstably,
>>>> but I'm not quite sure. Any other potential causes here? How should I
>>>> debug from here if resources are not the issue? Is there a way to
>>>> detect memory leaks?
>>>>
>>>> Thanks in advance!
>>>>
>>>> Thomas
>>>>
>
> --
> Yours
> Rommel
> *************************************
> I waited patiently for the LORD;
> he turned to me and heard my cry.
> He lifted me out of the slimy pit,
> out of the mud and mire;
> he set my feet on a rock
> and gave me a firm place to stand.
> *************************************
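
For reference, the one-week TTL keyed state that Thomas describes above would
look roughly like this in the DataStream API. This is a minimal sketch, not the
actual job code: the class name, the descriptor name "seen-session-keys", and
the compaction-filter setting of 1000 entries are all illustrative.

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;

    public class SessionKeyTtlSketch {
        public static void main(String[] args) {
            // One-week TTL on the dedup state, cleaned up during RocksDB compaction.
            StateTtlConfig ttlConfig = StateTtlConfig
                    .newBuilder(Time.days(7))
                    .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                    // Refresh the timestamp used for expiry checks after every
                    // 1000 entries processed by the compaction filter.
                    .cleanupInRocksdbCompactFilter(1000)
                    .build();

            // "seen-session-keys" is an illustrative name; in practice the descriptor
            // would be registered inside the keyed function that does the deduplication.
            ValueStateDescriptor<Boolean> seenDescriptor =
                    new ValueStateDescriptor<>("seen-session-keys", Boolean.class);
            seenDescriptor.enableTimeToLive(ttlConfig);
        }
    }

The compaction-filter cleanup drops expired session keys while RocksDB compacts
SST files, which helps keep the on-disk state close to the live one-week window
of keys.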