Hi,

Since your state (~150 GB) seems to fit into memory (~700 GB in aggregate), I would
recommend trying the HashMapStateBackend:
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/state/state_backends/#the-hashmapstatebackend
(unless you know that your state size is going to grow a lot soon).
I would expect a nice performance improvement from that change.
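
If you want to try it, the switch is just a state backend plus checkpoint storage
setting. A minimal sketch for 1.13 (the checkpoint path is a placeholder):

import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Keep the working state as objects on the JVM heap instead of in RocksDB.
env.setStateBackend(new HashMapStateBackend());

// Checkpoints still go to a durable location (placeholder path).
env.getCheckpointConfig().setCheckpointStorage("s3://<your-bucket>/checkpoints");

Keep in mind that heap state is stored as Java objects, so 150 GB of serialized
state can take noticeably more than 150 GB of heap.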

At the moment I have no idea where else to look for the issue you are
describing, but there are a few things you can try to optimize the
resource allocation.

On Wed, Jun 16, 2021 at 7:23 PM Rommel Holmes <rommelhol...@gmail.com>
wrote:

> Hi, Xintong and Robert
>
> Thanks for the reply.
>
> The checkpoint size for our job is 10-20 GB since we are doing incremental
> checkpointing; if we take a savepoint, it can be as big as 150 GB.
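>
> For reference, enabling incremental checkpoints with the RocksDB backend on
> Flink 1.11 looks roughly like this (a minimal sketch; the checkpoint URI is a
> placeholder):
>
> import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> // 'true' enables incremental checkpoints, so each checkpoint only uploads
> // new/changed SST files instead of a full snapshot (the constructor declares IOException).
> env.setStateBackend(new RocksDBStateBackend("s3://<your-bucket>/checkpoints", true));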
>
> 1) We will try to make Flink instance bigger.
> 2) Thanks for the pointer, we will take a look.
>
> 3) We do have CPU and memory monitoring. When backpressure happens, the CPU
> load increases from 25% to 50% and becomes more spiky, but it does not reach 100%.
> As for memory, we monitor (Heap.Committed - Heap.Used) per host; when
> backpressure happened, there was still around 500 MB free on the host.
>
> What we observed is that when backpressure happened, the state read slowness
> was confined to one of the hosts, across the different task managers on
> that host. The state read time (a custom metric we created) on that
> host shoots up from ~0.x ms to 40-60 ms.
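>
> In case the context helps: that metric is essentially a histogram wrapped around
> the keyed state read inside the dedup function, something along the lines of the
> sketch below (the exact structure and all class/field/metric names here are only
> illustrative):
>
> import com.codahale.metrics.SlidingWindowReservoir;
> import org.apache.flink.api.common.functions.RichFlatMapFunction;
> import org.apache.flink.api.common.state.ValueState;
> import org.apache.flink.api.common.state.ValueStateDescriptor;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.dropwizard.metrics.DropwizardHistogramWrapper;
> import org.apache.flink.metrics.Histogram;
> import org.apache.flink.util.Collector;
>
> // Used on a stream keyed by the session key.
> public class DedupWithReadTimer extends RichFlatMapFunction<String, String> {
>
>     private transient ValueState<Boolean> seen;
>     private transient Histogram readStateTime;
>
>     @Override
>     public void open(Configuration parameters) {
>         seen = getRuntimeContext().getState(
>                 new ValueStateDescriptor<>("seen", Boolean.class));
>         // Requires flink-metrics-dropwizard on the classpath.
>         readStateTime = getRuntimeContext().getMetricGroup().histogram(
>                 "readStateTimeNanos",
>                 new DropwizardHistogramWrapper(
>                         new com.codahale.metrics.Histogram(new SlidingWindowReservoir(500))));
>     }
>
>     @Override
>     public void flatMap(String sessionKey, Collector<String> out) throws Exception {
>         long start = System.nanoTime();
>         Boolean alreadySeen = seen.value();   // the RocksDB read being timed
>         readStateTime.update(System.nanoTime() - start);
>         if (alreadySeen == null) {
>             seen.update(true);
>             out.collect(sessionKey);
>         }
>     }
> }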
>
> We also observed that when this happens, the running compaction time for
> RocksDB on that host gets longer, from about 1 minute to over 2 minutes, while
> the other hosts stay around 1 minute.
>
> We also observed that when this happens, the size of the active and unflushed
> immutable memtables metric increases more slowly than it did before the
> backpressure.
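>
> As a side note for anyone following along: the RocksDB native metrics we look at
> (memtable sizes, pending/running compactions, etc.) are disabled by default and
> have to be switched on in the Flink configuration, for example:
>
> state.backend.rocksdb.metrics.num-running-compactions: true
> state.backend.rocksdb.metrics.compaction-pending: true
> state.backend.rocksdb.metrics.cur-size-active-mem-table: true
> state.backend.rocksdb.metrics.size-all-mem-tables: true
>
> The exact key names are worth double-checking against the docs for your Flink
> version.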
>
> I can provide more context if you are interested. We are still debugging
> this issue.
>
> Rommel
>
>
>
>
>
> On Wed, Jun 16, 2021 at 4:25 AM Robert Metzger <rmetz...@apache.org>
> wrote:
>
>> Hi Thomas,
>>
>> My gut feeling is that you can use the available resources more
>> efficiently.
>>
>> What's the size of a checkpoint for your job (you can see that from the
>> UI)?
>>
>> Given that your cluster has an aggregate of 64 * 12 = 768 GB of memory
>> available, you might be able to do everything in memory (I might be off by
>> a few terabytes here, it all depends on your state size ;) )
>>
>> 1. In my experience, it is usually more efficient to have a few large
>> Flink instances than many small ones. Maybe try to run 12 TaskManagers (or
>> 11 to leave room for the JM) with 58 GB of memory each (the JM can stick to
>> its 7 GB) and see how Flink behaves.
>>
>> 2. I'd say it's a try-and-see process, with a few educated guesses. Maybe
>> check out this:
>> https://www.ververica.com/blog/how-to-size-your-apache-flink-cluster-general-guidelines
>> to get some inspiration for making some "back of the napkin" calculations
>> on the sizing requirements.
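>>
>> As a purely illustrative example of such a calculation: at 50k events/s with a
>> 1-week TTL you keep roughly 50,000 * 604,800 ~ 30 billion events' worth of keys
>> per week; if roughly 16% of those are distinct session keys (the 8k/50k output
>> ratio) and each key costs, say, ~30 bytes of serialized state (a made-up number),
>> that's on the order of 30e9 * 0.16 * 30 bytes ~ 145 GB of state to plan for.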
>>
>> 3. Do you have some monitoring of CPU / memory / network usage in place?
>> It would be interesting to see what the metrics look like when
>> everything is ok vs. when the job is backpressured.
>>
>> Best,
>> Robert
>>
>>
>> On Wed, Jun 16, 2021 at 3:56 AM Xintong Song <tonysong...@gmail.com>
>> wrote:
>>
>>> Hi Thomas,
>>>
>>> It would be helpful if you can provide the jobmanager/taskmanager logs,
>>> and gc logs if possible.
>>>
>>> Additionally, you may consider monitoring the CPU/memory related metrics
>>> [1] and see if there's anything abnormal when the problem is observed.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/monitoring/metrics.html
>>>
>>>
>>>
>>> On Wed, Jun 16, 2021 at 8:11 AM Thomas Wang <w...@datability.io> wrote:
>>>
>>>> Hi,
>>>>
>>>> I'm trying to see if we have given enough resources (i.e. CPU and
>>>> memory) to each task node to perform a deduplication job. Currently, the
>>>> job is not running very stably. What I have been observing is that after a
>>>> couple of days of running, we suddenly see backpressure on one
>>>> arbitrary ec2 instance in the cluster, and when that happens, we have
>>>> to give up the current state and restart the job with an empty state. We
>>>> can no longer take a savepoint as it would time out after 10 minutes, which is
>>>> understandable.
>>>>
>>>> Additional Observations
>>>>
>>>> When the backpressure happens, we see an increase in our state read
>>>> time (we are measuring it using a custom metric) from about 0.1
>>>> milliseconds to 40-60 milliseconds on that specific problematic ec2
>>>> instance. We tried rebooting that ec2 instance so that the corresponding
>>>> tasks would be assigned to a different ec2 instance, but the problem
>>>> persisted.
>>>>
>>>> However, I’m not sure if this read time increase is a symptom or the
>>>> cause of the problem.
>>>>
>>>> Background about this deduplication job:
>>>>
>>>> We are doing sessionization with deduplication on an event stream, keyed by a
>>>> session key that is embedded in the event. The throughput of the input
>>>> stream is around 50k records per second. The output after aggregation is
>>>> around 8k records per second.
>>>>
>>>> We are currently using the RocksDB state backend with SSD support, and in
>>>> the state we are storing session keys with a TTL of 1 week. Based on the
>>>> current throughput, this could become really huge. I assume RocksDB would
>>>> flush to disk as needed, but please correct me if I am wrong.
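>>>>
>>>> For completeness, the TTL is configured on the state descriptor. A minimal
>>>> sketch (names are illustrative; the compaction-filter cleanup shown here is
>>>> one of several cleanup options):
>>>>
>>>> import org.apache.flink.api.common.state.StateTtlConfig;
>>>> import org.apache.flink.api.common.state.ValueStateDescriptor;
>>>> import org.apache.flink.api.common.time.Time;
>>>>
>>>> StateTtlConfig ttlConfig = StateTtlConfig
>>>>         .newBuilder(Time.days(7))
>>>>         .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>>>>         .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>>>>         // Let RocksDB drop expired entries during compaction.
>>>>         .cleanupInRocksdbCompactFilter(1000)
>>>>         .build();
>>>>
>>>> ValueStateDescriptor<Boolean> seenKeys =
>>>>         new ValueStateDescriptor<>("seen-session-keys", Boolean.class);
>>>> seenKeys.enableTimeToLive(ttlConfig);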
>>>>
>>>> Information about the cluster:
>>>>
>>>> I'm running on an AWS EMR cluster with 12 ec2 instances (r5d.2xlarge).
>>>> I'm using Flink 1.11.2 in Yarn session mode. Currently there is only 1 job
>>>> running in the Yarn session.
>>>>
>>>> Questions:
>>>>
>>>> 1. Currently, I'm starting the yarn session with 7g of memory for both the
>>>> Task Manager and the Job Manager, so that each Yarn container gets
>>>> 1 CPU. Is this setting reasonable based on your experience?
>>>>
>>>> Here is the command I used to start the Yarn cluster:
>>>>
>>>> export HADOOP_CLASSPATH=`hadoop classpath` &&
>>>> /usr/lib/flink/bin/yarn-session.sh -jm 7g -tm 7g --detached
>>>>
>>>> 2. Is there a scientific way to tell what's the right amount of
>>>> resources I should give to an arbitrary job? Or is this a try-and-see
>>>> kind of process?
>>>>
>>>> 3. Right now, I suspect resources are causing the job to run unstably,
>>>> but I'm not quite sure. Are there any other potential causes here? How should
>>>> I debug from here if resources are not the issue? Is there a way to detect
>>>> memory leaks?
>>>>
>>>> Thanks in advance!
>>>>
>>>> Thomas
>>>>
>>>>
>
> --
>      Yours
>      Rommel
>
