Hello Till & Guowei,


Thanks for the replies! Here is a snippet of the window function:



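  // Flink 1.7 imports used by this snippet; idKeySelector(), Aggregator and
  // DataLayer are our own application classes, defined elsewhere.
  import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
  import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
  import static org.apache.flink.streaming.api.windowing.time.Time.seconds;

  SingleOutputStreamOperator<DataLayer> aggregatedStream = dataStream
                .keyBy(idKeySelector())
                .window(TumblingProcessingTimeWindows.of(seconds(15))) // 15 s processing-time windows
                .apply(new Aggregator())
                .name("Aggregator")
                .setParallelism(3);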
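For reference, a TumblingProcessingTimeWindows assigner puts each element into the 15-second window that contains the operator's current processing time. Here is a minimal plain-Java sketch of that assignment, assuming a window offset of 0. It follows the arithmetic of Flink's TimeWindow.getWindowStartWithOffset, but the class and method names are hypothetical and this is not Flink code.

```java
// Hypothetical helper, not part of Flink: computes which tumbling window
// a processing-time instant falls into, assuming a window offset of 0.
final class TumblingWindowMath {
    /** Start (inclusive) of the tumbling window that contains {@code timestamp}. */
    static long windowStart(long timestamp, long offset, long windowSize) {
        // Same arithmetic as Flink's TimeWindow.getWindowStartWithOffset.
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long size = 15_000L;           // seconds(15) expressed in milliseconds
        long now = 1_590_584_437_123L; // arbitrary processing-time instant (ms)
        long start = windowStart(now, 0L, size);
        // The element belongs to the half-open window [start, start + size).
        System.out.println("[" + start + ", " + (start + size) + ")");
    }
}
```

Every key uses the same window boundaries, because the boundaries depend only on the clock and the window size.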



Checkpoint interval: 2 seconds while the checkpoint size grew from 100KB to 100MB 
(we’ve since changed it to 5 minutes, which has slowed the checkpoint size growth)

Lateness allowed: 0

Watermarks: none are set. Do watermarks apply to processing time at all?

The set of keys processed in the stream is stable over time.
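On the watermark question: as far as I understand Flink 1.7, watermarks only drive event-time windows. With TumblingProcessingTimeWindows, the default ProcessingTimeTrigger registers a processing-time timer for the window's maxTimestamp() (window end minus 1 ms). For a processing-time window, that is also the point where the window operator clears the window's state. The plain-Java sketch below models only that firing condition. The class and method names are hypothetical, not Flink API.

```java
// Hypothetical model, not Flink code: a processing-time tumbling window fires
// once the operator's wall clock reaches the window's maxTimestamp (end - 1 ms).
// No watermark is involved anywhere in this decision.
final class ProcessingTimeFiring {
    static boolean shouldFire(long currentProcessingTime, long windowStart, long windowSize) {
        long maxTimestamp = windowStart + windowSize - 1; // last millisecond of the window
        return currentProcessingTime >= maxTimestamp;
    }

    public static void main(String[] args) {
        long size = 15_000L; // 15 s window [0, 15000)
        System.out.println(shouldFire(14_998L, 0L, size)); // false: window still open
        System.out.println(shouldFire(14_999L, 0L, size)); // true: maxTimestamp reached
    }
}
```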

The checkpoint size actually looks pretty stable now that the interval was 
increased. Is it possible that the short checkpoint interval prevented 
compaction?
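Here is the interval change in numbers. Assuming each checkpoint completes within its interval, the upper bound on checkpoints per day drops from 43,200 at 2 seconds to 288 at 5 minutes. The small hypothetical helper below does only that arithmetic. It does not model RocksDB compaction, and it cannot tell whether compaction was actually being held back.

```java
// Hypothetical back-of-the-envelope helper: upper bound on checkpoints per day
// for a given checkpoint interval (assumes each checkpoint finishes in time).
final class CheckpointCount {
    static final long MILLIS_PER_DAY = 24L * 60 * 60 * 1000;

    static long checkpointsPerDay(long intervalMillis) {
        return MILLIS_PER_DAY / intervalMillis;
    }

    public static void main(String[] args) {
        System.out.println(checkpointsPerDay(2_000L));   // 2 s interval   -> 43200
        System.out.println(checkpointsPerDay(300_000L)); // 5 min interval -> 288
    }
}
```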

Thanks!

-Matt


From: Till Rohrmann <trohrm...@apache.org>
Date: Wednesday, May 27, 2020 at 9:00 AM
To: Guowei Ma <guowei....@gmail.com>
Cc: "Wissman, Matt" <matt.wiss...@here.com>, "user@flink.apache.org" 
<user@flink.apache.org>
Subject: Re: Tumbling windows - increasing checkpoint size over time


Hi Matt,

Could you give us a bit more information about the windows you are using? I 
understand they are tumbling windows, but what's the window size? Do you allow 
late events? What's your checkpoint interval?

Are you using event time? If yes, how is the watermark generated?

You said that the number of events per window is more or less constant. Does 
this also apply to the size of the individual events?

Cheers,
Till

On Wed, May 27, 2020 at 1:21 AM Guowei Ma <guowei....@gmail.com> wrote:
Hi Matt,
The total size of the window operator's state is related to the
number of windows. For example, if you use keyBy + a tumbling window,
there is one window per key, so the number of windows equals the number of keys.
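This keys-times-windows point can be illustrated with a minimal in-memory model. It uses plain Java and hypothetical names, and it does not reflect how Flink's RocksDB backend actually stores state. Each (key, window) pair buffers its elements, as apply(WindowFunction) does, and is purged once the window's maxTimestamp passes. With three keys and 1,000 consecutive windows, the number of live windows never exceeds the number of keys. In this model, state size therefore tracks the key count, not the total number of windows processed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of keyed tumbling-window state, not Flink code.
final class KeyedWindowStateModel {
    private final long windowSize;
    // key -> (window start -> buffered elements)
    private final Map<String, Map<Long, List<String>>> state = new HashMap<>();

    KeyedWindowStateModel(long windowSize) {
        this.windowSize = windowSize;
    }

    /** Buffers an element in the window containing processingTime (offset 0). */
    void add(String key, String element, long processingTime) {
        long start = processingTime - (processingTime + windowSize) % windowSize;
        state.computeIfAbsent(key, k -> new HashMap<>())
             .computeIfAbsent(start, s -> new ArrayList<>())
             .add(element);
    }

    /** Fires and purges every window whose maxTimestamp is <= now; returns the count. */
    int advanceTo(long now) {
        int fired = 0;
        Iterator<Map<Long, List<String>>> perKey = state.values().iterator();
        while (perKey.hasNext()) {
            Map<Long, List<String>> windows = perKey.next();
            Iterator<Long> starts = windows.keySet().iterator();
            while (starts.hasNext()) {
                long start = starts.next();
                if (now >= start + windowSize - 1) { // window's maxTimestamp reached
                    starts.remove();                 // "fire" and drop buffered elements
                    fired++;
                }
            }
            if (windows.isEmpty()) {
                perKey.remove(); // no open window left for this key
            }
        }
        return fired;
    }

    /** Number of (key, window) pairs currently holding state. */
    int liveWindows() {
        int n = 0;
        for (Map<Long, List<String>> windows : state.values()) {
            n += windows.size();
        }
        return n;
    }

    public static void main(String[] args) {
        KeyedWindowStateModel model = new KeyedWindowStateModel(15_000L);
        String[] keys = {"a", "b", "c"};
        int totalFired = 0;
        // 1,000 consecutive 15 s windows, one element per key in each window.
        for (int w = 0; w < 1_000; w++) {
            long t = w * 15_000L + 1_000L; // 1 s into window w
            for (String k : keys) {
                model.add(k, "event-" + w, t);
            }
            if (model.liveWindows() > keys.length) {
                throw new IllegalStateException("live state exceeded key count");
            }
            totalFired += model.advanceTo(t + 14_000L); // clock passes window w's end
        }
        System.out.println(totalFired + " fired, " + model.liveWindows() + " live");
    }
}
```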
Hope this helps.
Best,
Guowei

Wissman, Matt <matt.wiss...@here.com> wrote on Wed, May 27, 2020 at 3:35 AM:
>
> Hello Flink Community,
>
> I’m running a Flink pipeline that uses a tumbling window and incremental
> checkpoints with RocksDB, backed by S3. The number of objects in the window is
> stable, but over time the checkpoint size grows seemingly without bound. Within
> the first few hours after bringing the Flink pipeline up, the checkpoint size is
> around 100KB, but after a week of operation it grows to around 100MB. The
> pipeline isn’t using any Flink state other than the state the window uses.
> I think this has something to do with RocksDB’s compaction, but shouldn’t
> the tumbling window state expire and be purged from the checkpoint?
>
> Flink Version 1.7.1
>
> Thanks!
>
> -Matt
