I see your point. Are there lots of different keys in your test input? If that 
is the case, the CEP operator in 1.15.0 will not clean up some intermediate 
state (partial matches are cleaned up on timeout, but some computation 
states are leaked). This is fixed in Flink 1.16 (FLINK-31017) by Juntao Hu.
Best,
Biao

________________________________
From: Abhishek Singla <abhisheksingla...@gmail.com>
Sent: Monday, March 27, 2023 12:38:59 AM
To: Geng Biao <biaoge...@gmail.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Flink CEP Resource Utilisation Optimisation

Thanks, Geng, for the quick and actionable response.

I will definitely try this with Flink version >= 1.16.0 and get back with the 
observations.

Regarding the checkpoint size issue, my concern is if there is no more state, 
shouldn't the checkpoint size be way less than 2 GB? I mean I was expecting it 
to be only a few MBs. Is there something I am missing here?

Regards,
Abhishek Singla

On Sun, Mar 26, 2023 at 9:56 PM Geng Biao <biaoge...@gmail.com> wrote:

Hi Abhishek,



Thanks for sharing the experiment! As for the performance question, I believe 
you could try Flink CEP with version >= 1.16.0, which includes the 
optimization introduced in 
FLINK-23890<https://issues.apache.org/jira/browse/FLINK-23890>. This 
optimization greatly reduces the number of timer registrations, which can 
increase throughput significantly. In our own experiment, given the same 
parallelism settings, the same job in 1.16.0 requires much less CPU than in 
1.15.x (~100% -> ~30%). In fact, due to the implementation, the optimization 
should make CEP 10x better. If you must use Flink 1.15.0 for some reason, you 
may cherry-pick the relevant change and recompile the CEP library yourself. The 
change does not depend on any framework changes, so it may not take much 
effort.

As for the checkpoint size issue, the CEP operator stores intermediate matching 
results in state. So if there are no new events, there are no new partial 
matches and the CEP operator will not use more state.
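
To make the point about partial matches concrete, here is a toy model in plain Java of per-key partial-match state for an "A followed by B within a window" pattern. It is only a sketch, not Flink's NFA-based implementation: the class and method names are hypothetical, Event B handling is left out because the scenario has no Event B, and it assumes timestamps arrive in order per key.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

public class PartialMatchModel {
    private final long windowMillis;
    // key -> timestamps of Event A still waiting for an Event B
    private final Map<String, Deque<Long>> pending = new HashMap<>();

    public PartialMatchModel(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    // An Event A opens a new partial match for its key.
    public void onEventA(String key, long timestamp) {
        pending.computeIfAbsent(key, k -> new ArrayDeque<>()).addLast(timestamp);
    }

    // Advancing time drops partial matches older than the window and removes
    // per-key entries that became empty, so idle keys hold no state.
    public void advanceTime(long now) {
        Iterator<Map.Entry<String, Deque<Long>>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Deque<Long> starts = it.next().getValue();
            while (!starts.isEmpty() && now - starts.peekFirst() > windowMillis) {
                starts.pollFirst();
            }
            if (starts.isEmpty()) {
                it.remove();
            }
        }
    }

    public int pendingMatches() {
        return pending.values().stream().mapToInt(Deque::size).sum();
    }

    public int trackedKeys() {
        return pending.size();
    }

    public static void main(String[] args) {
        long tenMinutes = 10 * 60 * 1000L;
        PartialMatchModel model = new PartialMatchModel(tenMinutes);
        for (int i = 0; i < 1000; i++) {
            model.onEventA("key-" + i, i);
        }
        System.out.println("pending after burst: " + model.pendingMatches());
        // No new events arrive; time moves past the window for every match.
        model.advanceTime(999 + tenMinutes + 1);
        System.out.println("pending after timeout: " + model.pendingMatches());
    }
}
```

In this model, once the window has elapsed with no new events, both the pending matches and the per-key entries drop to zero, which is the behaviour one would expect from the state if nothing is leaked.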



Best,
Biao Geng



From: Abhishek Singla <abhisheksingla...@gmail.com>
Date: Sunday, March 26, 2023 at 11:58 PM
To: user@flink.apache.org <user@flink.apache.org>
Subject: Flink CEP Resource Utilisation Optimisation

Hi Team,

Flink Version: 1.15.0
Java Version: 1.8
Standalone Cluster
Task Manager: AWS EC2 of Instance Type c5n.4xlarge (vCPU 16, Memory 42 GB, 8 
slots per TM)
CEP Scenario: Kafka Event A followed by Kafka Event B within 10 mins
Throughput: 20k events per second for Event A, 0 for Kafka Event B
State Backend: FsStateBackend
Unaligned Checkpoints: Enabled
asynchronousSnapshots: true



While testing this scenario (Kafka Event A followed by Kafka Event B within 10 
mins) in a load environment, it took 20 TM nodes to achieve this throughput; 
otherwise, either CPU utilization would reach its peak or backpressure would be 
observed because the output buffers were full. The checkpoint size is only 
6.75 GB, and the state stored within the CEP operator would be much less, as we 
do unaligned checkpointing.
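
As a rough sanity check on these figures, the arithmetic below estimates how many partial matches are pending at steady state and what that implies per match. It assumes every Event A opens one partial match that lives for the full 10-minute window (since no Event B arrives) and, as an upper bound only, that the whole 6.75 GB checkpoint is CEP state; the class and method names are made up for illustration.

```java
public class CepStateEstimate {
    // Partial matches pending at steady state: each Event A opens one match
    // that stays open for the whole window, because no Event B ever arrives.
    public static long pendingMatches(long eventsPerSecond, long windowSeconds) {
        return eventsPerSecond * windowSeconds;
    }

    // Upper bound on bytes per pending match, assuming (hypothetically) that
    // the entire checkpoint consists of CEP partial-match state.
    public static double bytesPerMatch(double checkpointBytes, long pendingMatches) {
        return checkpointBytes / pendingMatches;
    }

    public static void main(String[] args) {
        long pending = pendingMatches(20_000, 10 * 60);
        double perMatch = bytesPerMatch(6.75e9, pending);
        System.out.println("pending partial matches: " + pending);
        System.out.printf("upper bound per match: %.1f bytes%n", perMatch);
    }
}
```

Under these assumptions there are about 12 million partial matches in flight, which puts the upper bound at a few hundred bytes per match.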

I am looking for some input on whether it should take this many resources to 
achieve this throughput and, if not, what the issue could be.



There was one more issue that I found: if the throughput of Event A goes to 
zero, the checkpoint size still stays around 2 GB even after hours. Is this 
expected?

Regards,
Abhishek Singla
