Re: Any way to improve list state get performance
Hi Tao,

I think you just need an extra `isEmpty` variable that you maintain properly (e.g., when restoring the job, check whether the list state is empty or not).

Also, as far as I remember, list state on RocksDB is not as performant as map state when the state is large. Sometimes you can use a map state with some extra value states to simulate it.

Best,
Xingcan

On Mon, Nov 21, 2022 at 9:20 PM tao xiao wrote:
> any suggestion is highly appreciated
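The `isEmpty` suggestion can be sketched in plain Java, without Flink on the classpath. This is only a model under stated assumptions: `CountingListStore` and `BufferedJoin` are hypothetical names, the store stands in for the RocksDB-backed list state and just counts reads, and the buffer is assumed to be drained when an A element arrives (the original snippet does not say). Because a plain field in a keyed function is shared by every key on the subtask, the flag is tracked per key here, and a key with no entry (for example after a restore) is treated as unknown and costs one read.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Stand-in for a RocksDB-backed list state; counts reads so the saving is visible. */
class CountingListStore {
    private final Map<String, List<String>> data = new HashMap<>();
    int reads = 0; // number of simulated backend get() calls

    void add(String key, String value) {
        data.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    List<String> get(String key) {
        reads++;
        return data.getOrDefault(key, List.of());
    }

    void clear(String key) {
        data.remove(key);
    }
}

/** Hypothetical join buffer: B elements wait in the store until an A element arrives. */
class BufferedJoin {
    final CountingListStore store = new CountingListStore();

    // In-memory flag per key: TRUE = known empty, FALSE = known non-empty,
    // no entry = unknown (e.g. right after a restore).
    private final Map<String, Boolean> knownEmpty = new HashMap<>();

    void onB(String key, String b) {
        store.add(key, b);
        knownEmpty.put(key, false);
    }

    List<String> onA(String key) {
        if (Boolean.TRUE.equals(knownEmpty.get(key))) {
            return List.of(); // known empty: skip the store read entirely
        }
        // Unknown or non-empty: read once, then remember the result.
        List<String> buffered = new ArrayList<>(store.get(key));
        store.clear(key); // assumption: the buffer is drained once A arrives
        knownEmpty.put(key, true);
        return buffered;
    }

    /** Models losing the in-memory flags on restart; the store itself survives. */
    void simulateRestore() {
        knownEmpty.clear();
    }
}
```

The trade-off in this sketch is memory: the flag map grows with the number of distinct keys seen, so a real job would need to bound or expire it.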
Re: Any way to improve list state get performance
Any suggestion is highly appreciated.

On Tue, Nov 15, 2022 at 8:50 PM tao xiao wrote:
> Hi team,
>
> I have a Flink job that joins two streams [...]

--
Regards,
Tao
Any way to improve list state get performance
Hi team,

I have a Flink job that joins two streams, say A and B, followed by a keyed process function. In the keyed process function, the job adds elements from stream B to a list state if the element from stream A hasn't arrived yet. Is there any way to skip the liststate.get() call that checks whether the list state contains elements when an element from stream A arrives, so as to reduce calls to the underlying state backend (RocksDB)?

Here is the code snippet:

keyfunction {

    process(in, ctx, collector) {
        if (in is A stream)
            // any way to check if the list state is empty so that we don't need to call get()?
            for (b : liststate.get()) {
                .
            }

        if (in is B stream)
            liststate.add(in)
    }
}

--
Regards,
Tao