Another option is to key the stream by the value that identifies a duplicate
(the hash, in your case) and use a simple function like this to de-duplicate
the stream elements:

public class DeduplicateElement extends KeyedProcessFunction<String, Event, Event> {

  private transient ValueState<Boolean> seen;

  @Override
  public void processElement(
      Event element,
      KeyedProcessFunction<String, Event, Event>.Context ctx,
      Collector<Event> out)
      throws Exception {
    if (seen.value() == null) {
      seen.update(true);
      out.collect(element);
    }
  }

  @Override
  public void open(OpenContext openContext) throws Exception {
    var ttlConfig = StateTtlConfig.newBuilder(Duration.ofHours(1))
        .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
        .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
        .build();
    ValueStateDescriptor<Boolean> descriptor =
        new ValueStateDescriptor<>("seen", Boolean.class);
    descriptor.enableTimeToLive(ttlConfig);
    seen = getRuntimeContext().getState(descriptor);

    super.open(openContext);
  }

}
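
To make the intended behaviour of the function above easier to reason about,
here is a minimal plain-Java model of it. It is only a sketch and not Flink
API: the class name TtlDedupModel and the explicit nowMillis parameter are
made up for illustration. It assumes one entry per key whose timestamp is
refreshed on every access (as with OnReadAndWrite), and it ignores the
ReturnExpiredIfNotCleanedUp nuance, where an expired entry that has not been
cleaned up yet can still be returned.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model of per-key "seen" state with a TTL (illustrative only, not Flink API). */
final class TtlDedupModel {
    private final long ttlMillis;
    // key (hash) -> timestamp of the last access; refreshed on every read and write
    private final Map<String, Long> lastAccess = new HashMap<>();

    TtlDedupModel(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Returns true if the element should be emitted: the hash is new or its entry has expired. */
    boolean firstSeen(String hash, long nowMillis) {
        Long last = lastAccess.get(hash);
        boolean live = last != null && nowMillis - last < ttlMillis;
        lastAccess.put(hash, nowMillis); // every access refreshes the timestamp
        return !live;
    }
}
```

Note that because every access refreshes the timestamp, a hash that keeps
arriving more often than the TTL never expires; it is only forgotten after a
full TTL period without any occurrence.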

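On the size-bound part of the question further down the thread: as far as I
know, state TTL only bounds the state by time, so a cap on the number of
hashes would have to be managed by hand. A rough sketch of just the eviction
logic in plain Java follows (BoundedSeenSet is a hypothetical helper, not a
Flink class, and how it would be stored in keyed state is left open here):

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: remembers at most maxSize hashes, evicting the oldest-added one first. */
final class BoundedSeenSet {
    private final Map<String, Boolean> seen;

    BoundedSeenSet(final int maxSize) {
        // accessOrder=false keeps insertion order, so the eldest entry is the oldest-added hash.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > maxSize;
            }
        };
    }

    /** Remembers the hash and returns true if it was not already remembered. */
    boolean add(String hash) {
        return seen.put(hash, Boolean.TRUE) == null;
    }

    int size() {
        return seen.size();
    }
}
```

Keeping a structure like this inside a single state value would mean
serializing it as a whole on each update with the RocksDB backend, which is
the same rewrite cost discussed for ListState.update below, so the per-key TTL
approach above is probably the simpler choice.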

On Wed, 4 Jun 2025 at 06:12, Owais Ansari <owaisansari1...@gmail.com> wrote:

> +user@flink.apache.org <user@flink.apache.org>
>
> On Wed, 4 Jun, 2025, 9:41 am Owais Ansari, <owaisansari1...@gmail.com>
> wrote:
>
>> TTL expires individual entries (map keys), not the entire state. For your
>> use case, MapState is a good option.
>>
>> On Wed, 4 Jun, 2025, 8:26 am Sachin Mittal, <sjmit...@gmail.com> wrote:
>>
>>> So my TTL config is like:
>>>
>>> StateTtlConfig.newBuilder(Duration.ofHours(1))
>>>     .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
>>>     .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
>>>     .build();
>>>
>>>
>>> The issue is that calling ListState.update every time would be expensive,
>>> since it rewrites the entire list of hashes on each call.
>>>
>>> I can use MapState, but how can I keep the map from growing beyond a
>>> certain size?
>>>
>>> The use case is that for a given key we get data every few seconds. Each
>>> record has a unique hash associated with it.
>>>
>>> We want to check whether a given hash is already in the state; if it is,
>>> we should not process that data again.
>>> At the same time, old hashes should expire after a certain TTL, so that
>>> the hash state doesn't grow beyond a certain size.
>>>
>>> I believe that even with MapState I would have to expire the entire map,
>>> and that there is no way to set a TTL for individual keys in the map.
>>>
>>> Please let me know if there is a better way to do this.
>>>
>>> Thanks
>>> Sachin
>>>
>>>
>>>
>>>
>>>
>>> On Fri, May 30, 2025 at 9:32 AM Zakelly Lan <zakelly....@gmail.com>
>>> wrote:
>>>
>>>> Hi Sachin,
>>>>
>>>> I assume you are using the RocksDB state backend. The TTL for ListState
>>>> is applied to each list entry if you are using `ListState.add`. However,
>>>> if you call `ListState.update`, the entire list is rewritten, so the TTL
>>>> of every entry is reset. Could you share your use case and the TTL config?
>>>> Another suggestion is to use MapState, so that you can manipulate each
>>>> entry freely without rewriting the entire list.
>>>>
>>>>
>>>> Best,
>>>> Zakelly
>>>>
>>>> On Fri, May 30, 2025 at 12:22 AM Sachin Mittal <sjmit...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> I think the TTL would be applied to the entire list.
>>>>> I would like the ListState to restrict the entries by size and
>>>>> automatically purge the oldest entries as new ones are added,
>>>>> something similar to a bounded list.
>>>>>
>>>>> Thanks
>>>>> Sachin
>>>>>
>>>>>
>>>>> On Thu, May 29, 2025 at 6:51 PM Sigalit Eliazov <e.siga...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>> I think you can achieve this by using StateTtlConfig to define the
>>>>>> TTL and enabling it on the ListStateDescriptor used to create the
>>>>>> ListState.
>>>>>>
>>>>>> thanks,
>>>>>> Sigalit
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, May 29, 2025 at 11:53 AM Sachin Mittal <sjmit...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> I am adding hashes of my elements to a list state to check for
>>>>>>> duplicates.
>>>>>>>
>>>>>>> To keep the list from growing without bound, I would like to limit the
>>>>>>> number of hashes in that list, or add some TTL config that expires
>>>>>>> entries in the list after a certain time.
>>>>>>>
>>>>>>> Is this possible using Flink constructs?
>>>>>>>
>>>>>>> If not, is there any other way I can achieve this?
>>>>>>>
>>>>>>> Thanks
>>>>>>> Sachin
>>>>>>>
>>>>>>>
