Hi Abhishek,

Thanks for sharing the experiment! As for the performance question, I suggest 
trying Flink CEP with version >= 1.16.0, which includes the optimization 
introduced in 
FLINK-23890<https://issues.apache.org/jira/browse/FLINK-23890>. This 
optimization removes a large number of timer registrations, which can increase 
throughput significantly. In our own experiment, with the same parallelism 
settings, the same job in 1.16.0 required much less CPU than in 1.15.x 
(~100% -> ~30%). In fact, given how it is implemented, the optimization should 
make CEP 10x better. If you must use Flink 1.15.0 for some reason, you can 
cherry-pick the relevant change and recompile the CEP library yourself. The 
change does not depend on any framework changes, so it should not take much 
effort.
As for the checkpoint size issue, the CEP operator stores intermediate matching 
results in state. So if there are no new events, there are no new partial 
matches and the CEP operator will not use more state.

Best,
Biao Geng

From: Abhishek Singla <abhisheksingla...@gmail.com>
Date: Sunday, March 26, 2023 at 11:58 PM
To: user@flink.apache.org <user@flink.apache.org>
Subject: Flink CEP Resource Utilisation Optimisation
Hi Team,

Flink Version: 1.15.0
Java Version: 1.8
Standalone Cluster
Task Manager: AWS EC2 instance type c5n.4xlarge (16 vCPU, 42 GB memory, 8 
slots per TM)
CEP Scenario: Kafka Event A followed by Kafka Event B within 10 mins
Throughput: 20k events per second for Event A, 0 for Kafka Event B
State Backend: FsStateBackend
Unaligned Checkpoints: Enabled
asynchronousSnapshots: true

While testing this scenario (Kafka Event A followed by Kafka Event B within 
10 mins) in our load environment, it took 20 TM nodes to achieve this 
throughput; otherwise, either CPU utilization would reach its peak or 
backpressure would be observed because the output buffers were full. The 
checkpoint size is only 6.75 GB, and the state stored within the CEP operator 
would be much smaller than that, since we use unaligned checkpoints.
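
To put the figures above in perspective, here is a rough back-of-the-envelope 
sketch in Java. It is not a measurement and not part of the job itself. It 
assumes that the job's parallelism uses every slot of the 20 TMs, that each 
Event A starts exactly one partial match which is kept until the 10 minute 
window expires (no Event B ever arrives), and that 1 GB means 10^9 bytes. The 
class and method names are made up for illustration.

```java
// Rough sizing arithmetic from the numbers in this thread.
// All names are hypothetical; the assumptions are listed in the comments.
public class CepSizing {

    // Total task slots, assuming every TM offers the same number of slots.
    static long totalSlots(long taskManagers, long slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    // Average input rate per slot, assuming an even key distribution.
    static double eventsPerSecondPerSlot(long eventsPerSecond, long slots) {
        return (double) eventsPerSecond / slots;
    }

    // Partial matches held at steady state, assuming one partial match per
    // Event A that lives for the whole pattern window.
    static long pendingPartialMatches(long eventsPerSecond, long windowSeconds) {
        return eventsPerSecond * windowSeconds;
    }

    // Upper bound on bytes per partial match. With unaligned checkpoints the
    // checkpoint also contains in-flight buffers, so real state is smaller.
    static double bytesPerPartialMatch(double checkpointBytes, long partialMatches) {
        return checkpointBytes / partialMatches;
    }

    public static void main(String[] args) {
        long slots = totalSlots(20, 8);
        long pending = pendingPartialMatches(20_000, 10 * 60);
        System.out.println("slots = " + slots);
        System.out.println("events/s per slot = " + eventsPerSecondPerSlot(20_000, slots));
        System.out.println("pending partial matches = " + pending);
        System.out.println("bytes per partial match (upper bound) = "
                + bytesPerPartialMatch(6.75e9, pending));
    }
}
```

Under these assumptions the cluster has 160 slots, so each slot handles only 
about 125 events per second, while roughly 12 million partial matches are 
pending at any time, at no more than about 562.5 bytes each.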

I am looking for some input on whether it should take this many resources to 
achieve this throughput, and if not, what the issue could be.

There is one more issue that I found: if the throughput of Event A goes to 
zero, the checkpoint size still stays around 2 GB even after hours. Is this 
expected?

Regards,
Abhishek Singla
