Hi Alexis,

Thanks a lot for sharing this. I think the program is correct.
It doesn't take timers into account, though, and to estimate the
state size more accurately you could also use the same serializers
that the job uses.
But it probably makes more sense to compare the object counts across
different checkpoints and see which state is growing.
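
To make the comparison concrete, here is a minimal sketch of what I have
in mind. It assumes you have already run the State Processor program
against two checkpoints and collected one entry count per operator. The
class name, the operator labels, and the counts for the newer checkpoint
are hypothetical; only the 10/80/200 figures come from your message.

```java
import java.util.Map;
import java.util.TreeMap;

public class StateGrowth {

    /**
     * Returns, per operator, the newer count minus the older count.
     * An operator missing from one side is treated as having 0 entries there.
     */
    public static Map<String, Long> growth(Map<String, Long> older, Map<String, Long> newer) {
        Map<String, Long> delta = new TreeMap<>();
        for (String op : older.keySet()) {
            delta.put(op, newer.getOrDefault(op, 0L) - older.get(op));
        }
        for (String op : newer.keySet()) {
            delta.putIfAbsent(op, newer.get(op) - older.getOrDefault(op, 0L));
        }
        return delta;
    }

    public static void main(String[] args) {
        // Counts from the older checkpoint (as reported in the thread).
        Map<String, Long> older = Map.of(
                "window-list-state", 10L,
                "global-map-state", 80L,
                "global-list-state", 200L);
        // Hypothetical counts from a later checkpoint, for illustration only.
        Map<String, Long> newer = Map.of(
                "window-list-state", 10L,
                "global-map-state", 95L,
                "global-list-state", 200L);

        // A positive delta points at the state that keeps growing.
        growth(older, newer).forEach((op, d) -> System.out.println(op + ": " + d));
    }
}
```

If all deltas stay at zero while the checkpoint size keeps increasing,
the growth is probably not in the number of live entries.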

If the number of keys is small, compaction should eventually clean up
the old values, given that the windows eventually expire. I think it
makes sense to check that watermarks in all windows are making
progress.

Setting ExecutionEnvironment#setParallelism(1) shouldn't affect the
results of the State Processor program.
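
As a rough sanity check on how much data the sliding windows can hold
while they are still expiring normally, here is a back-of-envelope
sketch. It assumes that each element is assigned to size/slide
overlapping windows, that the size is a multiple of the slide, and that
there is no allowed lateness; the class and method names are
hypothetical. The inputs (11-minute windows, 1-minute slide, about 20
events per minute) are the figures from your first message.

```java
public class SlidingWindowEstimate {

    /** Number of overlapping sliding windows each element is assigned to. */
    public static long windowsPerElement(long sizeMinutes, long slideMinutes) {
        return sizeMinutes / slideMinutes;
    }

    /**
     * Upper bound on buffered element copies: every event from the last
     * window length, counted once per window it could still belong to.
     */
    public static long maxBufferedCopies(long eventsPerMinute, long sizeMinutes, long slideMinutes) {
        return eventsPerMinute * sizeMinutes * windowsPerElement(sizeMinutes, slideMinutes);
    }

    public static void main(String[] args) {
        System.out.println(windowsPerElement(11, 1));      // prints 11
        System.out.println(maxBufferedCopies(20, 11, 1));  // prints 2420
    }
}
```

A few thousand small elements should be far below hundreds of megabytes,
so if the watermarks are advancing, the remainder most likely consists of
old values that compaction has not removed yet.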


On Mon, Apr 11, 2022 at 12:28 PM Alexis Sarda-Espinosa
<alexis.sarda-espin...@microfocus.com> wrote:
> Some additional information that I’ve gathered:
> The number of unique keys in the system is 10, and that is correctly 
> reflected in the state.
> TTL for global window state is set to update on read and write, but the code 
> has logic to remove old state based on event time.
> Not sure it’s relevant, but the Flink cluster does run with jemalloc enabled.
> GitHub gist with the whole processor setup since it’s not too long: 
> https://gist.github.com/asardaes/eaf21f18860ec39b325a40acef2db678
> Relevant configuration entries (explicitly set, others are left with 
> defaults):
> state.backend: rocksdb
> state.backend.incremental: true
> execution.checkpointing.interval: 30 s
> execution.checkpointing.min-pause: 25 s
> execution.checkpointing.timeout: 5 min
> execution.savepoint-restore-mode: CLAIM
> execution.checkpointing.externalized-checkpoint-retention: 
> Over the weekend, state size has grown to 1.23GB with the operators 
> referenced in the processor program taking 849MB, so I’m still pretty 
> puzzled. I thought it could be due to expired state being retained, but I 
> think that doesn’t make sense if I have finite keys, right?
> Regards,
> Alexis.
> From: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
> Sent: Samstag, 9. April 2022 01:39
> To: ro...@apache.org
> Cc: user@flink.apache.org
> Subject: Re: RocksDB's state size discrepancy with what's seen with state 
> processor API
> Hi Roman,
> Here's an example of a WindowReaderFunction:
>     public class StateReaderFunction
>             extends WindowReaderFunction<Pojo, Integer, String, TimeWindow> {
>         private static final ListStateDescriptor<Integer> LSD =
>                 new ListStateDescriptor<>("descriptorId", Integer.class);
>         @Override
>         public void readWindow(String s, Context<TimeWindow> context,
>                 Iterable<Pojo> elements, Collector<Integer> out) throws Exception {
>             int count = 0;
>             for (Integer i : context.windowState().getListState(LSD).get()) {
>                 count++;
>             }
>             out.collect(count);
>         }
>     }
> That's for the operator that uses window state. The other readers do 
> something similar but with context.globalState(). That should provide the 
> number of state entries for each key+window combination, no? And after 
> collecting all results, I would get the number of state entries across all 
> keys+windows for an operator.
> And yes, I do mean ProcessWindowFunction.clear(). Therein I call 
> context.windowState().getListState(...).clear().
> Side note: in the state processor program I call 
> ExecutionEnvironment#setParallelism(1) even though my streaming job runs with 
> parallelism=4, this doesn't affect the result, does it?
> Regards,
> Alexis.
> ________________________________
> From: Roman Khachatryan <ro...@apache.org>
> Sent: Friday, April 8, 2022 11:06 PM
> To: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
> Cc: user@flink.apache.org <user@flink.apache.org>
> Subject: Re: RocksDB's state size discrepancy with what's seen with state 
> processor API
> Hi Alexis,
> If I understand correctly, the provided StateProcessor program gives
> you the number of stream elements per operator. However, you mentioned
> that these operators have collection-type states (ListState and
> MapState). That means that per one entry there can be an arbitrary
> number of state elements.
> Have you tried estimating the state sizes directly via readKeyedState[1]?
> > The other operator does override and call clear()
> Just to make sure, you mean ProcessWindowFunction.clear() [2], right?
> [1]
> https://nightlies.apache.org/flink/flink-docs-release-1.14/api/java/org/apache/flink/state/api/ExistingSavepoint.html#readKeyedState-java.lang.String-org.apache.flink.state.api.functions.KeyedStateReaderFunction-
> [2]
> https://nightlies.apache.org/flink/flink-docs-release-1.4/api/java/org/apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.html#clear-org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction.Context-
> Regards,
> Roman
> On Fri, Apr 8, 2022 at 4:19 PM Alexis Sarda-Espinosa
> <alexis.sarda-espin...@microfocus.com> wrote:
> >
> > Hello,
> >
> >
> >
> > I have a streaming job running on Flink 1.14.4 that uses managed state with 
> > RocksDB with incremental checkpoints as backend. I’ve been monitoring a dev 
> > environment that has been running for the last week and I noticed that 
> > state size and end-to-end duration have been increasing steadily. 
> > Currently, duration is 11 seconds and size is 917MB (as shown in the UI). 
> > The tasks with the largest state (614MB) come from keyed sliding windows. 
> > Some attributes of this job’s setup:
> >
> >
> >
> > Windows are 11 minutes in size.
> > Slide time is 1 minute.
> > Throughput is approximately 20 events per minute.
> >
> >
> >
> > I have 3 operators with these states:
> >
> >
> >
> > Window state with ListState<Integer> and no TTL.
> > Global window state with MapState<Long, List<String>> and a TTL of 1 hour 
> > (with cleanupInRocksdbCompactFilter(1000L)).
> > Global window state with ListState<Pojo> where the Pojo has an int and a 
> > long, a TTL of 1 hour, and configured with 
> > cleanupInRocksdbCompactFilter(1000L) as well.
> >
> >
> >
> > Both operators with global window state have logic to manually remove old 
> > state in addition to configured TTL. The other operator does override and 
> > call clear().
> >
> >
> >
> > I have now analyzed the checkpoint folder with the state processor API, and 
> > I’ll note here that I see 50 folders named chk-*** even though I don’t set 
> > state.checkpoints.num-retained and the default should be 1. I loaded the 
> > data from the folder with the highest chk number and I see that my 
> > operators have these amounts respectively:
> >
> >
> >
> > 10 entries
> > 80 entries
> > 200 entries
> >
> >
> >
> > I got those numbers with something like this:
> >
> >
> >
> > savepoint
> >         .window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)))
> >         .process(...)
> >         .collect()
> >         .parallelStream()
> >         .reduce(0, Integer::sum);
> >
> >
> >
> > Where my WindowReaderFunction classes just count the number of entries in 
> > each call to readWindow.
> >
> >
> >
> > Those amounts cannot possibly account for 614MB, so what am I missing?
> >
> >
> >
> > Regards,
> >
> > Alexis.
> >
> >
