Hello,

I enabled some of the RocksDB metrics and I noticed some additional things. 
After changing the configuration YAML, I restarted the cluster with a 
savepoint, and I can see that it only used 5.6MB on disk. Consequently, after 
the job switched to running state, the new checkpoints were also a few MB in 
size. After running for 1 day, checkpoint size is now around 100MB. From the 
metrics I can see with the Prometheus reporter:

- All entries for num-live-versions show 1
- All entries for compaction-pending show 0
- Most entries for estimate-num-keys are in the range of 0 to 100, although I 
see a few with 151 coming from 
flink_taskmanager_job_task_operator__timer_state_event_window_timers_rocksdb_estimate_num_keys

Is compaction expected after only 100MB? I imagine not, but if the savepoint 
shows that the effective amount of data is so low, size growth still seems far 
too large. In fact, if I only look at the UI, Bytes Received for the relevant 
SubTasks is about 14MB, yet the latest checkpoint already shows a Data Size of 
75MB for said SubTasks.

Regards,
Alexis.

-----Original Message-----
From: Roman Khachatryan <ro...@apache.org> 
Sent: Mittwoch, 20. April 2022 10:37
To: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
Cc: user@flink.apache.org
Subject: Re: RocksDB's state size discrepancy with what's seen with state 
processor API

State Processor API works on a higher level and is not aware of any RocksDB 
specifics (in fact, it can be used with any backend).

Regards,
Roman

On Tue, Apr 19, 2022 at 10:52 PM Alexis Sarda-Espinosa

<alexis.sarda-espin...@microfocus.com> wrote:
>
> I can look into RocksDB metrics, I need to configure Prometheus at some point 
> anyway. However, going back to the original question, is there no way to gain 
> more insight into this with the state processor API? You've mentioned 
> potential issues (too many states, missing compaction) but, with my 
> admittedly limited understanding of the way RocksDB is used in Flink, I would 
> have thought that such things would be visible when using the state 
> processor. Is there no way for me to "parse" those MANIFEST files with some 
> of Flink's classes and get some more hints?
>
> Regards,
> Alexis.
>
> ________________________________
> From: Roman Khachatryan <ro...@apache.org>
> Sent: Tuesday, April 19, 2022 5:51 PM
> To: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
> Cc: user@flink.apache.org <user@flink.apache.org>
> Subject: Re: RocksDB's state size discrepancy with what's seen with 
> state processor API
>
> > I assume that when you say "new states", that is related to new descriptors 
> > with different names? Because, in the case of windowing for example, each 
> > window "instance" has its own scoped (non-global and keyed) state, but 
> > that's not regarded as a separate column family, is it?
> Yes, that's what I meant, and that's regarded as the same column family.
>
> Another possible reason is that SST files aren't being compacted and 
> that increases the MANIFEST file size.
> I'd check the total number of loaded SST files and the creation date 
> of the oldest one.
>
> You can also see whether there are any compactions running via RocksDB 
> metrics [1] [2] (a reporter needs to be configured [3]).
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/c
> onfig/#state-backend-rocksdb-metrics-num-running-compactions
> [2]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/c
> onfig/#state-backend-rocksdb-metrics-compaction-pending
> [3]
> https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/m
> etric_reporters/#reporters
>
> Regards,
> Roman
>
> On Tue, Apr 19, 2022 at 1:38 PM Alexis Sarda-Espinosa 
> <alexis.sarda-espin...@microfocus.com> wrote:
> >
> > Hi Roman,
> >
> > I assume that when you say "new states", that is related to new descriptors 
> > with different names? Because, in the case of windowing for example, each 
> > window "instance" has its own scoped (non-global and keyed) state, but 
> > that's not regarded as a separate column family, is it?
> >
> > For the 3 descriptors I mentioned before, they are only instantiated once 
> > and used like this:
> >
> > - Window list state: each call to process() executes 
> > context.windowState().getListState(...).get()
> > - Global map state: each call to process() executes 
> > context.globalState().getMapState(...)
> > - Global list state: within open(), runtimeContext.getListState(...) is 
> > executed once and used throughout the life of the operator.
> >
> > According to [1], the two ways of using global state should be equivalent.
> >
> > I will say that some of the operators instantiate the state descriptor in 
> > their constructors, i.e. before they are serialized to the TM, but the 
> > descriptors are Serializable, so I imagine that's not relevant.
> >
> > [1] https://stackoverflow.com/a/50510054/5793905
> >
> > Regards,
> > Alexis.
> >
> > -----Original Message-----
> > From: Roman Khachatryan <ro...@apache.org>
> > Sent: Dienstag, 19. April 2022 11:48
> > To: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
> > Cc: user@flink.apache.org
> > Subject: Re: RocksDB's state size discrepancy with what's seen with 
> > state processor API
> >
> > Hi Alexis,
> >
> > Thanks a lot for the information,
> >
> > MANIFEST files list RocksDB column families (among other info); ever 
> > growing size of these files might indicate that some new states are 
> > constantly being created.
> > Could you please confirm that the number of state names is constant?
> >
> > > Could you confirm if Flink's own operators could be creating state in 
> > > RocksDB? I assume the window operators save some information in the state 
> > > as well.
> > That's correct, window operators maintain a list of elements per window and 
> > a set of timers (timestamps). These states' names should be fixed 
> > (something like "window-contents" and "window-timers").
> >
> > > is that related to managed state used by my functions? Or does that 
> > > indicate size growth is elsewhere?
> > The same mechanism is used for both Flink internal state and operator 
> > state, so it's hard to say without at least knowing the state names.
> >
> >
> > Regards,
> > Roman
> >
> >
> > On Tue, Apr 12, 2022 at 2:06 PM Roman Khachatryan <ro...@apache.org> wrote:
> > >
> > > /shared folder contains keyed state that is shared among different 
> > > checkpoints [1]. Most of state should be shared in your case since 
> > > you're using keyed state and incremental checkpoints.
> > >
> > > When a checkpoint is loaded, the state that it shares with older 
> > > checkpoints is loaded as well. I suggested to load different 
> > > checkpoints (i.e. chk-* folders) and compare the numbers of 
> > > objects in their states. To prevent the job from discarding the 
> > > state, it can either be stopped for some time and then restarted 
> > > from the latest checkpoint; or the number of retained checkpoints 
> > > can be increased [2]. Copying isn't necessary.
> > >
> > > Besides that, you can also check state sizes of operator in Flink 
> > > Web UI (but not the sizes of individual states). If the operators 
> > > are chained then their combined state size will be shown. To 
> > > prevent this, you can disable chaining [3] (although this will 
> > > have performance impact).
> > >
> > > Individual checkpoint folders should be eventually removed (when 
> > > the checkpoint is subsumed). However, this is not guaranteed: if 
> > > there is any problem during deletion, it will be logged, but the 
> > > job will not fail.
> > >
> > > [1]
> > > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/stat
> > > e/ch
> > > eckpoints/#directory-structure
> > > [2]
> > > https://nightlies.apache.org/flink/flink-docs-master/docs/deployme
> > > nt/c onfig/#state-checkpoints-num-retained
> > > [3]
> > > https://nightlies.apache.org/flink/flink-docs-master/docs/dev/data
> > > stre am/operators/overview/#disable-chaining
> > >
> > > Regards,
> > > Roman
> > >
> > > On Tue, Apr 12, 2022 at 12:58 PM Alexis Sarda-Espinosa 
> > > <alexis.sarda-espin...@microfocus.com> wrote:
> > > >
> > > > Hi Roman,
> > > >
> > > > Maybe I'm misunderstanding the structure of the data within the 
> > > > checkpoint. You suggest comparing counts of objects in different 
> > > > checkpoints, I assume you mean copying my "checkpoints" folder at 
> > > > different times and comparing, not comparing different "chk-*" folders 
> > > > in the same snapshot, right?
> > > >
> > > > I haven't executed the processor program with a newer checkpoint, but I 
> > > > did look at the folder in the running system, and I noticed that most 
> > > > of the chk-* folders have remained unchanged, there's only 1 or 2 new 
> > > > folders corresponding to newer checkpoints. I would think this makes 
> > > > sense since the configuration specifies that only 1 completed 
> > > > checkpoint should be retained, but then why are the older chk-* folders 
> > > > still there? I did trigger a manual restart of the Flink cluster in the 
> > > > past (before starting the long-running test), but if my policy is to 
> > > > CLAIM the checkpoint, Flink's documentation states that it would be 
> > > > cleaned eventually.
> > > >
> > > > Moreover, just by looking at folder sizes with "du", I can see that 
> > > > most of the state is held in the "shared" folder, and that has grown 
> > > > for sure; I'm not sure what "shared" usually holds, but if that's 
> > > > what's growing, maybe I can rule out expired state staying around?. My 
> > > > pipeline doesn't use timers, although I guess Flink itself may use 
> > > > them. Is there any way I could get some insight into which operator 
> > > > holds larger states?
> > > >
> > > > Regards,
> > > > Alexis.
> > > >
> > > > -----Original Message-----
> > > > From: Roman Khachatryan <ro...@apache.org>
> > > > Sent: Dienstag, 12. April 2022 12:37
> > > > To: Alexis Sarda-Espinosa <alexis.sarda-espin...@microfocus.com>
> > > > Cc: user@flink.apache.org
> > > > Subject: Re: RocksDB's state size discrepancy with what's seen 
> > > > with state processor API
> > > >
> > > > Hi Alexis,
> > > >
> > > > Thanks a lot for sharing this. I think the program is correct.
> > > > Although it doesn't take timers into account; and to estimate the state 
> > > > size more accurately, you could also use the same serializers used by 
> > > > the job.
> > > > But maybe it makes more sense to compare the counts of objects in 
> > > > different checkpoints and see which state is growing.
> > > >
> > > > If the number of keys is small, compaction should eventually clean up 
> > > > the old values, given that the windows eventually expire. I think it 
> > > > makes sense to check that watermarks in all windows are making progress.
> > > >
> > > > Setting ExecutionEnvironment#setParallelism(1) shouldn't affect the 
> > > > results of the State Processor program.
> > > >
> > > > Regards,
> > > > Roman
> > > >
> > > > On Mon, Apr 11, 2022 at 12:28 PM Alexis Sarda-Espinosa 
> > > > <alexis.sarda-espin...@microfocus.com> wrote:
> > > > >
> > > > > Some additional information that I’ve gathered:
> > > > >
> > > > >
> > > > >
> > > > > The number of unique keys in the system is 10, and that is correctly 
> > > > > reflected in the state.
> > > > > TTL for global window state is set to update on read and write, but 
> > > > > the code has logic to remove old state based on event time.
> > > > > Not sure it’s relevant, but the Flink cluster does run with jemalloc 
> > > > > enabled.
> > > > > GitHub gist with the whole processor setup since it’s not too long:
> > > > > https://gist.github.com/asardaes/eaf21f18860ec39b325a40acef2db
> > > > > 678
> > > > >
> > > > >
> > > > >
> > > > > Relevant configuration entries (explicitly set, others are left with 
> > > > > defaults):
> > > > >
> > > > >
> > > > >
> > > > > state.backend: rocksdb
> > > > >
> > > > > state.backend.incremental: true
> > > > >
> > > > > execution.checkpointing.interval: 30 s
> > > > >
> > > > > execution.checkpointing.min-pause: 25 s
> > > > >
> > > > > execution.checkpointing.timeout: 5 min
> > > > >
> > > > > execution.savepoint-restore-mode: CLAIM
> > > > >
> > > > > execution.checkpointing.externalized-checkpoint-retention:
> > > > > RETAIN_ON_CANCELLATION
> > > > >
> > > > >
> > > > >
> > > > > Over the weekend, state size has grown to 1.23GB with the operators 
> > > > > referenced in the processor program taking 849MB, so I’m still pretty 
> > > > > puzzled. I thought it could be due to expired state being retained, 
> > > > > but I think that doesn’t make sense if I have finite keys, right?
> > > > >
> > > > >
> > > > >
> > > > > Regards,
> > > > >
> > > > > Alexis.
> > > > >
> > > > >
> > > > >
> > > > > From: Alexis Sarda-Espinosa 
> > > > > <alexis.sarda-espin...@microfocus.com>
> > > > > Sent: Samstag, 9. April 2022 01:39
> > > > > To: ro...@apache.org
> > > > > Cc: user@flink.apache.org
> > > > > Subject: Re: RocksDB's state size discrepancy with what's seen 
> > > > > with state processor API
> > > > >
> > > > >
> > > > >
> > > > > Hi Roman,
> > > > >
> > > > >
> > > > >
> > > > > Here's an example of a WindowReaderFunction:
> > > > >
> > > > >
> > > > >
> > > > >     public class StateReaderFunction extends 
> > > > > WindowReaderFunction<Pojo, Integer, String, TimeWindow> {
> > > > >
> > > > >         private static final ListStateDescriptor<Integer> LSD 
> > > > > = new ListStateDescriptor<>(
> > > > >
> > > > >                 "descriptorId",
> > > > >
> > > > >                 Integer.class
> > > > >
> > > > >         );
> > > > >
> > > > >
> > > > >
> > > > >         @Override
> > > > >
> > > > >         public void readWindow(String s, Context<TimeWindow> 
> > > > > context, Iterable<Pojo> elements, Collector<Integer> out) 
> > > > > throws Exception {
> > > > >
> > > > >             int count = 0;
> > > > >
> > > > >             for (Integer i :
> > > > > context.windowState().getListState(LSD).get()) {
> > > > >
> > > > >                 count++;
> > > > >
> > > > >             }
> > > > >
> > > > >             out.collect(count);
> > > > >
> > > > >         }
> > > > >
> > > > >     }
> > > > >
> > > > >
> > > > >
> > > > > That's for the operator that uses window state. The other readers do 
> > > > > something similar but with context.globalState(). That should provide 
> > > > > the number of state entries for each key+window combination, no? And 
> > > > > after collecting all results, I would get the number of state entries 
> > > > > across all keys+windows for an operator.
> > > > >
> > > > >
> > > > >
> > > > > And yes, I do mean ProcessWindowFunction.clear(). Therein I call 
> > > > > context.windowState().getListState(...).clear().
> > > > >
> > > > >
> > > > >
> > > > > Side note: in the state processor program I call 
> > > > > ExecutionEnvironment#setParallelism(1) even though my streaming job 
> > > > > runs with parallelism=4, this doesn't affect the result, does it?
> > > > >
> > > > >
> > > > >
> > > > > Regards,
> > > > >
> > > > > Alexis.
> > > > >
> > > > >
> > > > >
> > > > > ________________________________
> > > > >
> > > > > From: Roman Khachatryan <ro...@apache.org>
> > > > > Sent: Friday, April 8, 2022 11:06 PM
> > > > > To: Alexis Sarda-Espinosa 
> > > > > <alexis.sarda-espin...@microfocus.com>
> > > > > Cc: user@flink.apache.org <user@flink.apache.org>
> > > > > Subject: Re: RocksDB's state size discrepancy with what's seen 
> > > > > with state processor API
> > > > >
> > > > >
> > > > >
> > > > > Hi Alexis,
> > > > >
> > > > > If I understand correctly, the provided StateProcessor program 
> > > > > gives you the number of stream elements per operator. However, 
> > > > > you mentioned that these operators have collection-type states 
> > > > > (ListState and MapState). That means that per one entry there 
> > > > > can be an arbitrary number of state elements.
> > > > >
> > > > > Have you tried estimating the state sizes directly via 
> > > > > readKeyedState[1]?
> > > > >
> > > > > > The other operator does override and call clear()
> > > > > Just to make sure, you mean ProcessWindowFunction.clear() [2], right?
> > > > >
> > > > > [1]
> > > > > https://nightlies.apache.org/flink/flink-docs-release-1.14/api
> > > > > /jav
> > > > > a/or
> > > > > g/apache/flink/state/api/ExistingSavepoint.html#readKeyedState
> > > > > -jav
> > > > > a.la
> > > > > ng.String-org.apache.flink.state.api.functions.KeyedStateReade
> > > > > rFun
> > > > > ctio
> > > > > n-
> > > > >
> > > > > [2]
> > > > > https://nightlies.apache.org/flink/flink-docs-release-1.4/api/
> > > > > java
> > > > > /org
> > > > > /apache/flink/streaming/api/functions/windowing/ProcessWindowFunction.
> > > > > html#clear-org.apache.flink.streaming.api.functions.windowing.
> > > > > Proc
> > > > > essW
> > > > > indowFunction.Context-
> > > > >
> > > > > Regards,
> > > > > Roman
> > > > >
> > > > >
> > > > > On Fri, Apr 8, 2022 at 4:19 PM Alexis Sarda-Espinosa 
> > > > > <alexis.sarda-espin...@microfocus.com> wrote:
> > > > > >
> > > > > > Hello,
> > > > > >
> > > > > >
> > > > > >
> > > > > > I have a streaming job running on Flink 1.14.4 that uses managed 
> > > > > > state with RocksDB with incremental checkpoints as backend. I’ve 
> > > > > > been monitoring a dev environment that has been running for the 
> > > > > > last week and I noticed that state size and end-to-end duration 
> > > > > > have been increasing steadily. Currently, duration is 11 seconds 
> > > > > > and size is 917MB (as shown in the UI). The tasks with the largest 
> > > > > > state (614MB) come from keyed sliding windows. Some attributes of 
> > > > > > this job’s setup:
> > > > > >
> > > > > >
> > > > > >
> > > > > > Windows are 11 minutes in size.
> > > > > > Slide time is 1 minute.
> > > > > > Throughput is approximately 20 events per minute.
> > > > > >
> > > > > >
> > > > > >
> > > > > > I have 3 operators with these states:
> > > > > >
> > > > > >
> > > > > >
> > > > > > Window state with ListState<Integer> and no TTL.
> > > > > > Global window state with MapState<Long, List<String>> and a TTL of 
> > > > > > 1 hour (with cleanupInRocksdbCompactFilter(1000L)).
> > > > > > Global window state with ListState<Pojo> where the Pojo has an int 
> > > > > > and a long, a TTL of 1 hour, and configured with 
> > > > > > cleanupInRocksdbCompactFilter(1000L) as well.
> > > > > >
> > > > > >
> > > > > >
> > > > > > Both operators with global window state have logic to manually 
> > > > > > remove old state in addition to configured TTL. The other operator 
> > > > > > does override and call clear().
> > > > > >
> > > > > >
> > > > > >
> > > > > > I have now analyzed the checkpoint folder with the state processor 
> > > > > > API, and I’ll note here that I see 50 folders named chk-*** even 
> > > > > > though I don’t set state.checkpoints.num-retained and the default 
> > > > > > should be 1. I loaded the data from the folder with the highest chk 
> > > > > > number and I see that my operators have these amounts respectively:
> > > > > >
> > > > > >
> > > > > >
> > > > > > 10 entries
> > > > > > 80 entries
> > > > > > 200 entries
> > > > > >
> > > > > >
> > > > > >
> > > > > > I got those numbers with something like this:
> > > > > >
> > > > > >
> > > > > >
> > > > > > savepoint
> > > > > >
> > > > > >         
> > > > > > .window(SlidingEventTimeWindows.of(Time.minutes(11L),
> > > > > > Time.minutes(1L)))
> > > > > >
> > > > > >         .process(...)
> > > > > >
> > > > > >         .collect()
> > > > > >
> > > > > >         .parallelStream()
> > > > > >
> > > > > >         .reduce(0, Integer::sum);
> > > > > >
> > > > > >
> > > > > >
> > > > > > Where my WindowReaderFunction classes just count the number of 
> > > > > > entries in each call to readWindow.
> > > > > >
> > > > > >
> > > > > >
> > > > > > Those amounts cannot possibly account for 614MB, so what am I 
> > > > > > missing?
> > > > > >
> > > > > >
> > > > > >
> > > > > > Regards,
> > > > > >
> > > > > > Alexis.
> > > > > >
> > > > > >

Reply via email to