Another option is to key the stream and use a simple function like this to de-dupe each stream element:
import java.time.Duration;

import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class DeduplicateElement extends KeyedProcessFunction<String, Event, Event> {

    // Per-key flag: null until the first element for this key has been seen.
    private transient ValueState<Boolean> seen;

    @Override
    public void processElement(
            Event element,
            KeyedProcessFunction<String, Event, Event>.Context ctx,
            Collector<Event> out) throws Exception {
        // Emit only the first element per key; later ones are dropped
        // until the TTL removes the flag.
        if (seen.value() == null) {
            seen.update(true);
            out.collect(element);
        }
    }

    @Override
    public void open(OpenContext openContext) throws Exception {
        // Same TTL settings as in the thread below. OnReadAndWrite refreshes
        // the TTL on every read, and ReturnExpiredIfNotCleanedUp can still
        // return an expired flag; NeverReturnExpired hides it immediately.
        var ttlConfig = StateTtlConfig.newBuilder(Duration.ofHours(1))
                .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
                .build();
        ValueStateDescriptor<Boolean> descriptor =
                new ValueStateDescriptor<>("seen", Boolean.class);
        descriptor.enableTimeToLive(ttlConfig);
        seen = getRuntimeContext().getState(descriptor);
        super.open(openContext);
    }
}

On Wed, 4 Jun 2025 at 06:12, Owais Ansari <owaisansari1...@gmail.com> wrote:

> +user@flink.apache.org <user@flink.apache.org>
>
> On Wed, 4 Jun, 2025, 9:41 am Owais Ansari, <owaisansari1...@gmail.com>
> wrote:
>
>> It expires the individual key and not the entire state. For your use case
>> Map state is a good option.
>>
>> On Wed, 4 Jun, 2025, 8:26 am Sachin Mittal, <sjmit...@gmail.com> wrote:
>>
>>> So my TTL config is like:
>>>
>>> StateTtlConfig.newBuilder(Duration.ofHours(1))
>>>     .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
>>>     .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
>>>     .build();
>>>
>>> The issue is that if I use ListState.update every time, it would be an
>>> expensive process to update the entire list of hashes every time.
>>>
>>> I can use Map State, but how can I control the size of this map state so
>>> that it does not grow beyond a certain point?
>>>
>>> Use case is that for a given key we get data every few seconds. Each
>>> data has a unique hash associated with it.
>>>
>>> We want to check if a certain hash is in the state; if so, it should not
>>> process that data again.
>>> At the same time old hashes expire beyond a certain ttl, so that the
>>> hashes state doesn't grow beyond a size.
>>>
>>> I believe even with map state I have to expire the entire map and there
>>> is no way to set ttl for individual keys in the map.
>>>
>>> Please let me know if there is a better way to do this.
>>>
>>> Thanks
>>> Sachin
>>>
>>> On Fri, May 30, 2025 at 9:32 AM Zakelly Lan <zakelly....@gmail.com>
>>> wrote:
>>>
>>>> Hi Sachin,
>>>>
>>>> I assume you are using the rocksdb state backend. The TTL for ListState
>>>> is applied to each list entry if you are using `ListState.add`. However,
>>>> if you do ListState.update, the entire list is rewritten, so the ttl is
>>>> updated. Could you share your use case and the ttl config?
>>>> Another suggestion is to use the Map State, so that you can
>>>> manipulate each entry freely without rewriting the entire list.
>>>>
>>>> Best,
>>>> Zakelly
>>>>
>>>> On Fri, May 30, 2025 at 12:22 AM Sachin Mittal <sjmit...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> I think ttl would be applied for the entire list,
>>>>> I would like the ListState to restrict the entries by size and
>>>>> automatically purge older added entries as new ones get added.
>>>>> Something similar to a bounded list.
>>>>>
>>>>> Thanks
>>>>> Sachin
>>>>>
>>>>> On Thu, May 29, 2025 at 6:51 PM Sigalit Eliazov <e.siga...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> hi,
>>>>>> i think you can achieve this by using StateTtlConfig to define the
>>>>>> ttl, and add ListStateDescriptor to the ListState definition.
>>>>>>
>>>>>> thanks,
>>>>>> Sigalit
>>>>>>
>>>>>> On Thu, May 29, 2025 at 11:53 AM Sachin Mittal <sjmit...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> I am adding some hashes of my elements in my list state to check for
>>>>>>> dedups.
>>>>>>>
>>>>>>> Now to not have an infinitely growing list, I would like to limit the
>>>>>>> size of hashes in that list to say a number or just add some TTL config
>>>>>>> which would expire the entries in the list beyond a certain time.
>>>>>>>
>>>>>>> Is this something possible using Flink constructs?
>>>>>>>
>>>>>>> If not, is there any way I can achieve this?
>>>>>>>
>>>>>>> Thanks
>>>>>>> Sachin
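
On the size limit asked about in the quoted thread ("something similar to a bounded list"): here is a minimal sketch of the eviction logic in plain Java. It is not Flink managed state and would not be checkpointed; `BoundedSeenSet` and `firstSeen` are hypothetical names used only for illustration. It relies on `LinkedHashMap.removeEldestEntry` to drop the oldest-inserted hash once a cap is exceeded. Porting the idea to MapState would, as far as I know, need extra bookkeeping (for example an insertion timestamp as the map value), since I would not assume MapState iterates in insertion order.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: remembers at most maxEntries hashes, oldest-inserted evicted first. */
final class BoundedSeenSet {

    private final Map<String, Boolean> seen;

    BoundedSeenSet(final int maxEntries) {
        if (maxEntries <= 0) {
            throw new IllegalArgumentException("maxEntries must be positive");
        }
        // Insertion-ordered map; removeEldestEntry is consulted after every put.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > maxEntries;
            }
        };
    }

    /** Returns true if the hash is not currently remembered, and remembers it. */
    boolean firstSeen(String hash) {
        if (seen.containsKey(hash)) {
            return false; // duplicate while still within the bound
        }
        seen.put(hash, Boolean.TRUE); // may evict the oldest entry
        return true;
    }

    int size() {
        return seen.size();
    }
}
```

With a cap of 2, offering "a", "b", "c" evicts "a", so a later "a" is treated as new again. That is the trade-off of a size bound compared with a TTL: a duplicate can slip through once its hash has been pushed out.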