Re: Any way to improve list state get performance

2022-11-22 Thread Xingcan Cui
Hi Tao,

I think you just need an extra `isEmpty` variable that you maintain properly
(e.g., when restoring the job, check whether the list state is empty).
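
A minimal sketch of the flag idea, in plain Java rather than against the real Flink API. `CountingListStore` and `FlagGuardedJoin` are hypothetical names: the store only stands in for a keyed list state and counts reads so the saved calls are visible. Because a keyed function instance serves many keys, the sketch assumes the flag is tracked per key in memory, and it models the "check on restore" step by rebuilding the flags in the constructor.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for a keyed list state; counts reads to mimic costly backend calls. */
class CountingListStore {
    private final Map<String, List<String>> lists = new HashMap<>();
    int getCalls = 0;

    void add(String key, String value) {
        lists.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    List<String> get(String key) {
        getCalls++; // every read counts as one call to the backend
        return lists.getOrDefault(key, Collections.emptyList());
    }

    Set<String> keys() {
        return lists.keySet();
    }
}

/** Buffers B elements per key and skips the read when the in-memory flag says "empty". */
class FlagGuardedJoin {
    private final CountingListStore store;
    private final Set<String> nonEmptyKeys = new HashSet<>(); // per-key "not empty" flags

    FlagGuardedJoin(CountingListStore store) {
        this.store = store;
        // Models the restore step: rebuild the flags from whatever state already exists.
        nonEmptyKeys.addAll(store.keys());
    }

    /** Element from stream B: buffer it and remember that this key has data. */
    void onB(String key, String b) {
        store.add(key, b);
        nonEmptyKeys.add(key);
    }

    /** Element from stream A: only read the list when the flag says it is non-empty. */
    List<String> onA(String key) {
        if (!nonEmptyKeys.contains(key)) {
            return Collections.emptyList(); // no backend read needed
        }
        return store.get(key);
    }
}
```

In a real job the in-memory flags are lost on failure, so they would have to be rebuilt or treated as "unknown" after a restore; how to do that depends on the job and is not shown here.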

Also, as far as I remember, list state on RocksDB is not as performant as
map state when the state is large. Sometimes you can use a map state
with some extra value states to simulate it.
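
A rough sketch of that simulation, again in plain Java and not the real Flink API. `MapBackedList` is a hypothetical name; its `entries` map stands in for a map state keyed by element index and its `size` field stands in for a value state. In Flink both would be scoped to the current key automatically, which is not modelled here.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** A list emulated as an index -> element map plus a separate size counter. */
class MapBackedList<T> {
    private final Map<Long, T> entries = new HashMap<>(); // stand-in for a map state
    private long size = 0L;                               // stand-in for a value state

    /** Appends by writing a single new map entry and bumping the counter. */
    void add(T value) {
        entries.put(size, value);
        size++;
    }

    /** Emptiness check that touches only the counter, never the elements. */
    boolean isEmpty() {
        return size == 0L;
    }

    /** Reads the elements back in insertion order. */
    List<T> toList() {
        List<T> out = new ArrayList<>();
        for (long i = 0; i < size; i++) {
            out.add(entries.get(i));
        }
        return out;
    }

    /** Drops all elements and resets the counter. */
    void clear() {
        entries.clear();
        size = 0L;
    }
}
```

The point of the extra counter is that checking for emptiness becomes a small point lookup instead of a read of the whole list.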

Best,
Xingcan


Re: Any way to improve list state get performance

2022-11-21 Thread tao xiao
Any suggestions would be highly appreciated.


-- 
Regards,
Tao


Any way to improve list state get performance

2022-11-15 Thread tao xiao
Hi team,

I have a Flink job that joins two streams, let's say streams A and B,
followed by a keyed process function. In the keyed process function, the job
inserts elements from stream B into a list state if the element from stream A
hasn't arrived yet. I am wondering if there is any way to skip the
liststate.get() call that checks whether the list state contains elements
when stream A arrives, to reduce the number of calls to the underlying state
backend (RocksDB).

Here is the code snippet:

keyfunction {

  process(in, ctx, collector) {
    if (in is A stream) {
      // any way to check if the list state is empty so that we don't need to call get()?
      for (b : liststate.get()) {
        ...
      }
    }

    if (in is B stream) {
      liststate.add(in)
    }
  }
}


-- 
Regards,
Tao