Thanks, yes there were a lot of keys in the test input. In fact, every
event has a unique key which is not repeated in subsequent events.
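
As a rough back-of-the-envelope check, the numbers from the original post quoted below (20k Event A per second, a 10-minute window, a 6.75 GB checkpoint, 20 TMs with 8 slots each) can be put together as in the sketch below. The function names are made up for illustration. It assumes every event starts its own partial match (one per unique key) and that the whole checkpoint is partial-match state, so the bytes-per-match figure is only an upper bound; in-flight buffers from unaligned checkpoints are not separated out.

```python
# Back-of-the-envelope sizing sketch; all helper names are hypothetical.

def live_partial_matches(events_per_sec: int, window_sec: int) -> int:
    """Partial matches alive at once if every event opens one (unique keys)."""
    return events_per_sec * window_sec


def bytes_per_match_upper_bound(checkpoint_bytes: float, matches: int) -> float:
    """Upper bound: assumes the whole checkpoint is partial-match state."""
    return checkpoint_bytes / matches


def events_per_slot(events_per_sec: int, task_managers: int, slots_per_tm: int) -> float:
    """Average load per slot, assuming events are spread evenly over all slots."""
    return events_per_sec / (task_managers * slots_per_tm)


if __name__ == "__main__":
    matches = live_partial_matches(20_000, 10 * 60)
    print("live partial matches:", matches)
    # GB is taken as 10**9 bytes here (an assumption).
    print("bytes per match (upper bound):", bytes_per_match_upper_bound(6.75 * 10**9, matches))
    print("events/s per slot:", events_per_slot(20_000, 20, 8))
```

With these inputs that is about 12 million live partial matches and roughly 125 events per second per slot, which is why the per-event cost in the CEP operator matters so much here.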

On Sun, Mar 26, 2023 at 10:26 PM Geng Biao <biaoge...@gmail.com> wrote:

> I see your point. Are there lots of different keys in your test input? If
> that is the case, the CEP operator in 1.15.0 will not clean up some
> intermediate state (partial matches are cleaned up on timeout, but some
> computation states are leaked). This is fixed in Flink 1.16 (FLINK-31017) by
> Juntao Hu.
> Best,
> Biao
>
> ------------------------------
> *From:* Abhishek Singla <abhisheksingla...@gmail.com>
> *Sent:* Monday, March 27, 2023 12:38:59 AM
> *To:* Geng Biao <biaoge...@gmail.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Flink CEP Resource Utilisation Optimisation
>
> Thanks, Geng, for the quick and actionable response.
>
> I will definitely try this with Flink version >= 1.16.0 and get back with
> the observations.
>
> Regarding the checkpoint size issue, my concern is if there is no more
> state, shouldn't the checkpoint size be way less than 2 GB? I mean I was
> expecting it to be only a few MBs. Is there something I am missing here?
>
> Regards,
> Abhishek Singla
>
> On Sun, Mar 26, 2023 at 9:56 PM Geng Biao <biaoge...@gmail.com> wrote:
>
> Hi Abhishek,
>
>
>
> Thanks for sharing the experiment! As for the performance question, I
> suggest trying Flink CEP with version >= 1.16.0, which includes the
> optimization introduced in FLINK-23890
> <https://issues.apache.org/jira/browse/FLINK-23890>. This optimization
> removes a large number of timer registrations, which can increase
> throughput significantly. In our own experiment, given the same parallelism
> settings, the same job on 1.16.0 required much less CPU than on 1.15.x
> (~100% -> ~30%). In fact, given how it is implemented, the optimization
> should make CEP 10x better. If you must use Flink 1.15.0 for some reason,
> you can cherry-pick the relevant change and recompile the CEP library
> yourself. The change does not depend on any framework changes, so it
> should not take much effort.
>
> As for the checkpoint size issue, the CEP operator stores intermediate
> matching results in state. So if there are no new events, there are no new
> partial matches and the CEP operator will not use more state.
>
>
>
> Best,
> Biao Geng
>
>
>
> *From: *Abhishek Singla <abhisheksingla...@gmail.com>
> *Date: *Sunday, March 26, 2023 at 11:58 PM
> *To: *user@flink.apache.org <user@flink.apache.org>
> *Subject: *Flink CEP Resource Utilisation Optimisation
>
> Hi Team,
>
> *Flink Version:* 1.15.0
> *Java Version:* 1.8
> *Standalone Cluster*
> *Task Manager:* AWS EC2 of Instance Type c5n.4xlarge (vCPU 16, Memory 42
> GB, 8 slots per TM)
> *CEP Scenario:* Kafka Event A followed by Kafka Event B within 10 mins
> *Throughput:* 20k events per second for Event A, 0 for Kafka Event B
> *State Backend:* FsStateBackend
> *Unaligned Checkpoints:* Enabled
> *asynchronousSnapshots:* true
>
>
>
> While testing this scenario (Kafka Event A followed by Kafka Event B within
> 10 mins) in a load environment, it took 20 TM nodes to achieve this
> throughput; otherwise either CPU utilization would reach its peak or
> backpressure would be observed because output buffers were full. The
> checkpoint size is only 6.75 GB, and the state stored within the CEP
> operator would be much less than that, as we use unaligned checkpointing.
>
>
> I am looking for some input on whether it should take this many resources
> to achieve this throughput and, if not, what the issue could be.
>
>
>
> There was one more issue that I found: if the throughput of Event A goes to
> zero, the checkpoint size still stays around 2 GB even after hours. Is
> this expected?
>
> Regards,
> Abhishek Singla
>
>
