Thanks, yes there were a lot of keys in the test input. In fact, every event has a unique key which is not repeated in subsequent events.
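For scale, here is a rough back-of-the-envelope sketch using only the figures quoted below in this thread (20k events/s for Event A, a 10 minute window, 20 TMs with 8 slots each, a 6.75 GB checkpoint). The function names are made up for illustration. It assumes decimal gigabytes, that every slot runs the CEP operator, and that the whole checkpoint is CEP state, none of which is confirmed here.

```python
# Back-of-the-envelope figures for the load test described in this thread.
# All helper names are hypothetical; the inputs are the numbers quoted below.

def keys_in_window(events_per_second, window_seconds):
    """Distinct keys seen within one pattern window, assuming every event
    carries a unique key (as stated above)."""
    return events_per_second * window_seconds


def bytes_per_key(checkpoint_bytes, keys):
    """Average checkpointed bytes per key, assuming the whole checkpoint
    is CEP state (an upper bound, not a measurement)."""
    return checkpoint_bytes / keys


def events_per_slot(events_per_second, task_managers, slots_per_tm):
    """Average per-slot event rate, assuming load is spread evenly
    across all slots."""
    return events_per_second / (task_managers * slots_per_tm)


if __name__ == "__main__":
    keys = keys_in_window(20_000, 10 * 60)
    print("keys within a 10 min window:", keys)
    print("avg bytes per key:", bytes_per_key(6.75e9, keys))
    print("events/s per slot:", events_per_slot(20_000, 20, 8))
```

Under those assumptions that is about 12 million keys per window and only around 125 events per second per slot, which is why the resource usage looked high to me.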
On Sun, Mar 26, 2023 at 10:26 PM Geng Biao <biaoge...@gmail.com> wrote:

> I see your point. Are there lots of different keys in your test input? If
> that is the case, the CEP operator in 1.15.0 will not clean up some
> intermediate state (partial matches are cleaned up on timeout, but some
> computation states are leaked). This is fixed in Flink 1.16 (FLINK-31017)
> by Juntao Hu.
>
> Best,
> Biao
>
> Get Outlook for iOS <https://aka.ms/o0ukef>
> ------------------------------
> *From:* Abhishek Singla <abhisheksingla...@gmail.com>
> *Sent:* Monday, March 27, 2023 12:38:59 AM
> *To:* Geng Biao <biaoge...@gmail.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Flink CEP Resource Utilisation Optimisation
>
> Thanks, Geng for the quick and actionable response.
>
> I will definitely try this with Flink version >= 1.16.0 and get back with
> the observations.
>
> Regarding the checkpoint size issue, my concern is: if there is no more
> state, shouldn't the checkpoint size be way less than 2 GB? I was
> expecting it to be only a few MBs. Is there something I am missing here?
>
> Regards,
> Abhishek Singla
>
> On Sun, Mar 26, 2023 at 9:56 PM Geng Biao <biaoge...@gmail.com> wrote:
>
> > Hi Abhishek,
> >
> > Thanks for sharing the experiment! As for the performance question, I
> > believe you could give Flink CEP with version >= 1.16.0 a try, which
> > includes the optimization introduced in FLINK-23890
> > <https://issues.apache.org/jira/browse/FLINK-23890>. This optimization
> > removes a lot of timer registrations, which can increase the throughput
> > significantly. In our own experiment, given the same parallelism
> > settings, the same job in 1.16.0 requires much less CPU than in 1.15.x
> > (~100% -> ~30%). In fact, due to the implementation, the optimization
> > should make CEP 10x better. If you must use Flink 1.15.0 for some
> > reason, you may cherry-pick the relevant change and recompile the CEP
> > library yourself. The change does not depend on any framework changes,
> > so it should not take much effort.
> >
> > As for the checkpoint size issue, the CEP operator stores intermediate
> > matching results in the state. So if there are no new events, there are
> > no new partial matches and the CEP operator will not use more state.
> >
> > Best,
> > Biao Geng
> >
> > *From:* Abhishek Singla <abhisheksingla...@gmail.com>
> > *Date:* Sunday, March 26, 2023 at 11:58 PM
> > *To:* user@flink.apache.org <user@flink.apache.org>
> > *Subject:* Flink CEP Resource Utilisation Optimisation
> >
> > Hi Team,
> >
> > *Flink Version:* 1.15.0
> > *Java Version:* 1.8
> > *Standalone Cluster*
> > *Task Manager:* AWS EC2 of Instance Type c5n.4xlarge (vCPU 16, Memory
> > 42 GB, 8 slots per TM)
> > *CEP Scenario:* Kafka Event A followed by Kafka Event B within 10 mins
> > *Throughput:* 20k events per second for Event A, 0 for Kafka Event B
> > *State Backend:* FsStateBackend
> > *Unaligned Checkpoints:* Enabled
> > *asynchronousSnapshots:* true
> >
> > While testing this scenario (Kafka Event A followed by Kafka Event B
> > within 10 mins) in the load environment, it took 20 TM nodes to achieve
> > this throughput; otherwise either CPU utilization would reach its peak
> > or backpressure would be observed because the output buffers were full.
> > The checkpoint size is only 6.75 GB; the state stored within the CEP
> > operator would be much less, as we use unaligned checkpoints.
> >
> > I am looking for some input on whether it really takes this many
> > resources to achieve this throughput and, if not, what the issue could
> > be here.
> >
> > I found one more issue: if the throughput of Event A goes to zero, the
> > checkpoint size still stays around 2 GB even after hours. Is this
> > expected?
> >
> > Regards,
> > Abhishek Singla