Hi, Wang.

What about simply increasing the parallelism to reduce the number of keys 
processed per subtask? Is the distribution of keys uneven? If so, you can 
use the DataStream API to manually implement some of the optimizations that 
Flink SQL applies.[1]




At 2024-03-29 21:49:42, "Lei Wang" <leiwang...@gmail.com> wrote:

Perhaps I can keyBy(Hash(originalKey) % 100000),
then in the KeyedProcessOperator use MapState instead of ValueState:
  MapState<OriginalKey, Boolean>  mapState

With about 10 billion distinct keys a day, that gives about 100000 
OriginalKey entries in each mapState.

Hope this will help
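
A minimal plain-Java sketch of the bucketing idea above, with no Flink dependency. The class name BucketedDedupSketch and the in-memory HashMap standing in for per-bucket MapState are assumptions made for illustration only; a real job would keep this in keyed state. One detail worth noting: in Java, `%` can return a negative value when the hash is negative, so the sketch uses Math.floorMod to keep bucket ids in [0, 100000).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Plain-Java simulation of "keyBy(hash % 100000) + MapState"; not Flink code.
class BucketedDedupSketch {
    // Bucket count taken from the message above.
    static final int NUM_BUCKETS = 100_000;

    // Hypothetical stand-in for keyed state: bucket id -> original keys seen.
    // In a real job this would be one MapState<OriginalKey, Boolean> per bucket.
    private final Map<Integer, Set<Long>> stateByBucket = new HashMap<>();

    // floorMod keeps the bucket id non-negative even when the hash is negative.
    static int bucketOf(long originalKey) {
        return Math.floorMod(Long.hashCode(originalKey), NUM_BUCKETS);
    }

    // Returns true the first time a key is seen, false for a duplicate.
    boolean firstSeen(long originalKey) {
        Set<Long> seen =
            stateByBucket.computeIfAbsent(bucketOf(originalKey), b -> new HashSet<>());
        return seen.add(originalKey);
    }
}
```

Two different original keys that land in the same bucket are still told apart, because the full key is stored inside the bucket's map.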

On Fri, Mar 29, 2024 at 9:24 PM Péter Váry <peter.vary.apa...@gmail.com> wrote:

Hi Lei,

Have you tried to make the key smaller, and store a list of found keys as a 

Let's make the operator key a hash of your original key, and store a list of 
the full keys in the state. You can play with your hash length to achieve the 
optimal number of keys.
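
To make "play with your hash length" concrete, here is a rough sizing sketch in plain Java. It assumes the hash spreads keys uniformly and uses the figure of about 10 billion distinct keys per day from the original question; the class and method names are hypothetical, and truncating Long.hashCode to its low bits is just one possible way to shorten the hash.

```java
// Back-of-envelope sizing for a truncated-hash operator key; not Flink code.
class HashLengthSizing {
    // Average number of full keys stored per operator key,
    // assuming a uniform hash: distinctKeys / 2^hashBits.
    static long avgKeysPerOperatorKey(long distinctKeys, int hashBits) {
        return distinctKeys >>> hashBits;
    }

    // Keep only the lowest hashBits bits of the hash (valid for 1..30 bits).
    static int operatorKey(long originalKey, int hashBits) {
        return Long.hashCode(originalKey) & ((1 << hashBits) - 1);
    }

    public static void main(String[] args) {
        long distinctKeys = 10_000_000_000L; // figure from the original question
        for (int bits : new int[] {16, 20, 24}) {
            System.out.println(bits + " bits: " + (1L << bits)
                + " operator keys, about "
                + avgKeysPerOperatorKey(distinctKeys, bits) + " full keys each");
        }
    }
}
```

A shorter hash means fewer operator keys but larger per-key state, so the right length depends on how the state backend handles each of those costs.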

I hope this helps,

On Fri, Mar 29, 2024, 09:08 Lei Wang <leiwang...@gmail.com> wrote:

I use the RocksDB state backend to record whether an element has appeared 
within the last day. Here is the code:

public class DedupFunction extends KeyedProcessFunction<Long, IN,OUT>  {

    private ValueState<Boolean> isExist;

    public void open(Configuration parameters) throws Exception {
        ValueStateDescriptor<Boolean> desc = new ........
        StateTtlConfig ttlConfig = 
        isExist = getRuntimeContext().getState(desc);

    public void processElement(IN in, .... ) {
        if(null == isExist.value()) {

Because the number of distinct keys is very large (about 10 billion per day), 
this operator has become a performance bottleneck.
How can I optimize its performance?

