Hi, Wang.
What about simply increasing the parallelism, so that each parallel subtask processes fewer keys? Also, is the distribution of keys uneven? If so, you can use the DataStream API to manually implement some of the optimizations that Flink SQL applies, such as split distinct aggregation [1].

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#split-distinct-aggregation

--
Best!
Xuyang

On 2024-03-29 21:49:42, "Lei Wang" <leiwang...@gmail.com> wrote:

Perhaps I can keyBy(Hash(originalKey) % 100000), and then use a MapState instead of a ValueState in the keyed process operator:

    MapState<OriginalKey, Boolean> mapState

There would be about 100000 OriginalKeys in each MapState. Hope this will help.

On Fri, Mar 29, 2024 at 9:24 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:

Hi Lei,

Have you tried making the key smaller and storing a list of the found keys as the value? Make the operator key a hash of your original key, and store a list of the full keys in the state. You can tune the hash length to get the optimal number of keys.

I hope this helps,
Peter

On Fri, Mar 29, 2024, 09:08 Lei Wang <leiwang...@gmail.com> wrote:

I use the RocksDB state backend to store whether an element has appeared within the last day. Here is the code:

    public class DedupFunction extends KeyedProcessFunction<Long, IN, OUT> {

        // non-null once this key has been seen within the TTL window
        private ValueState<Boolean> isExist;

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<Boolean> desc = new ........
            StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.hours(24)).setUpdateType......
            desc.enableTimeToLive(ttlConfig);
            isExist = getRuntimeContext().getState(desc);
        }

        @Override
        public void processElement(IN in, .... ) throws Exception {
            // emit only the first occurrence of each key; expired state reads as null
            if (null == isExist.value()) {
                out.collect(in);
                isExist.update(true);
            }
        }
    }

Because the number of distinct keys is very large (about 10 billion per day), this operator is a performance bottleneck. How can I optimize its performance?

Thanks,
Lei
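The bucketing idea discussed in this thread (key the stream by a hash of the original key modulo 100000, and keep the original keys inside a per-bucket map with a 24-hour expiry) can be illustrated with a small, Flink-free Java sketch. It is only a simulation under stated assumptions: the class `BucketedDedup` and its methods are hypothetical, each inner `HashMap` stands in for the `MapState` of one bucket key, and the TTL is approximated by storing a first-seen timestamp per original key and checking it on access. It is not Flink's API and says nothing about actual throughput.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Flink-free simulation of "keyBy(hash % N) + MapState with TTL".
// Each inner HashMap plays the role of the MapState scoped to one bucket key.
public class BucketedDedup {
    private final int numBuckets;
    private final long ttlMillis;
    // bucket id -> (original key -> time the key was first seen)
    private final Map<Integer, Map<String, Long>> buckets = new HashMap<>();

    public BucketedDedup(int numBuckets, long ttlMillis) {
        this.numBuckets = numBuckets;
        this.ttlMillis = ttlMillis;
    }

    // Stand-in for keyBy(Hash(originalKey) % numBuckets).
    // Math.floorMod keeps the bucket id non-negative even for negative hash codes.
    public int bucketOf(String originalKey) {
        return Math.floorMod(originalKey.hashCode(), numBuckets);
    }

    // Returns true if the element should be emitted, i.e. its original key
    // has not been seen within the last ttlMillis.
    public boolean firstSeen(String originalKey, long nowMillis) {
        Map<String, Long> state =
                buckets.computeIfAbsent(bucketOf(originalKey), b -> new HashMap<>());
        Long seenAt = state.get(originalKey);
        if (seenAt != null && nowMillis - seenAt < ttlMillis) {
            return false; // duplicate inside the TTL window
        }
        // Timestamp is written only on a first sighting, mirroring the original
        // function, which calls update() only for keys it has not seen.
        state.put(originalKey, nowMillis);
        return true;
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000;
        BucketedDedup dedup = new BucketedDedup(100_000, day);
        System.out.println(dedup.firstSeen("user-42", 0));       // true
        System.out.println(dedup.firstSeen("user-42", 1_000));   // false
        System.out.println(dedup.firstSeen("user-42", day + 1)); // true
    }
}
```

In real Flink, state TTL on a `MapState` applies per map entry, and with the RocksDB backend each map entry is stored as its own RocksDB entry, so the per-key timestamp in the sketch corresponds to per-entry expiry rather than to expiring a whole bucket at once.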