Hi Lei,

In addition to the valuable options suggested above, you could try
optimizing your partitioning function, since you know your data.
If possible, sample a subset of your data and/or check the key
distribution before redefining your partitioning function.
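
As a rough illustration of checking the key distribution on a sample, here is
a minimal plain-Java sketch. The class name KeySkewCheck, the sample values
and the modulo partitioner are all assumptions made for illustration; Flink's
real assignment goes through key groups, so treat the numbers as a hint about
skew only, not as the exact subtask assignment.

```java
import java.util.Arrays;
import java.util.List;

public class KeySkewCheck {

    /** Counts how many sampled records land in each of numPartitions partitions. */
    public static long[] histogram(List<Long> sampledKeys, int numPartitions) {
        long[] counts = new long[numPartitions];
        for (Long key : sampledKeys) {
            // Stand-in partitioner (assumption): hash modulo partition count.
            counts[Math.floorMod(Long.hashCode(key), numPartitions)]++;
        }
        return counts;
    }

    /** Ratio of the busiest partition to the mean load; 1.0 means perfectly even. */
    public static double skew(long[] counts) {
        long max = Arrays.stream(counts).max().orElse(0);
        double mean = Arrays.stream(counts).average().orElse(0);
        return mean == 0 ? 0 : max / mean;
    }

    public static void main(String[] args) {
        // Hypothetical sample: key 4 is "hot" and appears four times.
        List<Long> sample = List.of(1L, 2L, 3L, 4L, 4L, 4L, 4L, 8L);
        long[] counts = histogram(sample, 4);
        System.out.println(Arrays.toString(counts) + " skew=" + skew(counts));
        // prints: [5, 1, 1, 1] skew=2.5
    }
}
```

A skew ratio well above 1.0 on a representative sample would suggest that the
partitioning function, rather than the state backend, is worth revisiting
first.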

Regards,
Jeyhun

On Mon, Apr 1, 2024 at 4:00 AM Xuyang <xyzhong...@163.com> wrote:

> Hi, Wang.
>
> What about just increasing the parallelism to reduce the number of keys
> processed per parallel subtask? Is the distribution
> of keys uneven? If so, you can use the DataStream API to manually
> implement some of the optimization methods of Flink SQL.[1]
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/tuning/#split-distinct-aggregation
>
>
> --
>     Best!
>     Xuyang
>
>
> At 2024-03-29 21:49:42, "Lei Wang" <leiwang...@gmail.com> wrote:
>
> Perhaps I can keyBy(Hash(originalKey) % 100000),
> then in the KeyedProcessOperator use MapState instead of ValueState:
>   MapState<OriginalKey, Boolean> mapState
>
> There would be about 100000 OriginalKeys in each mapState.
>
> Hope this will help
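
The bucketing idea above can be sketched in plain Java, without any Flink
dependency, to show the intended state layout. Everything here is an
assumption for illustration: the class BucketedDedup, the nested HashMap that
stands in for one MapState per bucket key, and the explicit timestamp that
stands in for state TTL. It is not the Flink implementation itself.

```java
import java.util.HashMap;
import java.util.Map;

public class BucketedDedup {

    public static final int NUM_BUCKETS = 100_000;

    // bucket id -> (original key -> time first seen); models one MapState per bucket.
    private final Map<Integer, Map<Long, Long>> state = new HashMap<>();
    private final long ttlMillis;

    public BucketedDedup(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Models keyBy(Hash(originalKey) % 100000); floorMod keeps the result non-negative. */
    public static int bucketOf(long originalKey) {
        return Math.floorMod(Long.hashCode(originalKey), NUM_BUCKETS);
    }

    /** Returns true when the key has not been seen within the TTL, i.e. it should be emitted. */
    public boolean firstSeen(long originalKey, long nowMillis) {
        Map<Long, Long> bucket =
                state.computeIfAbsent(bucketOf(originalKey), b -> new HashMap<>());
        Long seenAt = bucket.get(originalKey);
        if (seenAt != null && nowMillis - seenAt < ttlMillis) {
            return false; // duplicate inside the TTL window
        }
        bucket.put(originalKey, nowMillis); // first occurrence, or previous entry expired
        return true;
    }

    public static void main(String[] args) {
        BucketedDedup dedup = new BucketedDedup(24L * 60 * 60 * 1000);
        System.out.println(dedup.firstSeen(42L, 0L));      // first occurrence
        System.out.println(dedup.firstSeen(42L, 1_000L));  // duplicate
        System.out.println(dedup.firstSeen(100_042L, 0L)); // same bucket, different key
    }
}
```

One caveat worth checking before adopting this: as far as I know, the RocksDB
backend stores each MapState entry as its own row, so this layout reduces the
number of Flink keys rather than the number of stored entries.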
>
> On Fri, Mar 29, 2024 at 9:24 PM Péter Váry <peter.vary.apa...@gmail.com>
> wrote:
>
>> Hi Lei,
>>
>> Have you tried to make the key smaller, and store a list of found keys as
>> a value?
>>
>> Let's make the operator key a hash of your original key, and store a list
>> of the full keys in the state. You can play with your hash length to
>> achieve the optimal number of keys.
>>
>> I hope this helps,
>> Peter
>>
>> On Fri, Mar 29, 2024, 09:08 Lei Wang <leiwang...@gmail.com> wrote:
>>
>>>
>>> I use the RocksDB state backend to store whether an element has appeared
>>> within the last day. Here is the code:
>>>
>>> public class DedupFunction extends KeyedProcessFunction<Long, IN, OUT> {
>>>
>>>     private ValueState<Boolean> isExist;
>>>
>>>     public void open(Configuration parameters) throws Exception {
>>>         ValueStateDescriptor<Boolean> desc = new ........
>>>         StateTtlConfig ttlConfig =
>>>             StateTtlConfig.newBuilder(Time.hours(24)).setUpdateType......
>>>         desc.enableTimeToLive(ttlConfig);
>>>         isExist = getRuntimeContext().getState(desc);
>>>     }
>>>
>>>     public void processElement(IN in, .... ) {
>>>         // Emit only the first occurrence of each key within the TTL window.
>>>         if (null == isExist.value()) {
>>>             out.collect(in);
>>>             isExist.update(true);
>>>         }
>>>     }
>>> }
>>>
>>> Because the number of distinct keys is very large (about 10 billion per
>>> day), this operator is a performance bottleneck.
>>> How can I optimize its performance?
>>>
>>> Thanks,
>>> Lei
>>>
>>>
>>
