Hi Tao,

I think you just need an extra `isEmpty` variable that you maintain yourself (e.g., when restoring the job, check whether the list state is empty or not).
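To make the idea concrete, here is a minimal sketch in plain Java. It deliberately does not use the Flink API: `CountingListStore` is a hypothetical stand-in for the RocksDB-backed list state that only counts reads, and `BufferedJoin`, `onA`, and `onB` are made-up names. The sketch assumes the flag is kept per key in memory and that a key whose status is unknown (for example right after a restore) is checked once against the state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a keyed, RocksDB-backed list state.
// It is NOT Flink's ListState; it only counts how often get() is called.
class CountingListStore {
    private final Map<String, List<String>> data = new HashMap<>();
    int reads = 0;

    void add(String key, String value) {
        data.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    List<String> get(String key) {
        reads++; // every call models one access to the underlying state
        return data.getOrDefault(key, new ArrayList<>());
    }
}

// Hypothetical join logic that tracks emptiness outside the state backend.
class BufferedJoin {
    private final CountingListStore store;
    // true = known empty, false = known non-empty, absent = unknown
    // (assumption: unknown is the situation after a restore).
    private final Map<String, Boolean> knownEmpty = new HashMap<>();

    BufferedJoin(CountingListStore store) {
        this.store = store;
    }

    // Element from stream A: read the list only if it may contain something.
    List<String> onA(String key, String a) {
        List<String> out = new ArrayList<>();
        if (Boolean.TRUE.equals(knownEmpty.get(key))) {
            return out; // skip the state read entirely
        }
        List<String> buffered = store.get(key);
        knownEmpty.put(key, buffered.isEmpty()); // remember what we learned
        for (String b : buffered) {
            out.add(a + "+" + b);
        }
        return out;
    }

    // Element from stream B: buffer it and mark the key as non-empty.
    void onB(String key, String b) {
        store.add(key, b);
        knownEmpty.put(key, false);
    }
}
```

With this sketch, repeated A elements for a key that has no buffered B elements cost at most one read. Note that the in-memory map grows with the number of keys, so a real job would probably need to bound or clean it up.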
Also, I remember that the list state for RocksDB is not as performant as the map state when the state is large. Sometimes you can use a map state with some extra value states to simulate it.

Best,
Xingcan

On Mon, Nov 21, 2022 at 9:20 PM tao xiao <xiaotao...@gmail.com> wrote:

> any suggestion is highly appreciated
>
> On Tue, Nov 15, 2022 at 8:50 PM tao xiao <xiaotao...@gmail.com> wrote:
>
>> Hi team,
>>
>> I have a Flink job that joins two streams, let's say A and B streams,
>> followed by a key process function. In the key process function the job
>> inserts elements from B stream into a list state if the element from A
>> stream hasn't arrived yet. I am wondering if there is any way to skip the
>> liststate.get() that checks whether there are elements in the list state
>> when A stream arrives, to reduce the calls to the underlying state (RocksDB)
>>
>> Here is the code snippet
>>
>> keyfunction {
>>
>>   process(in, ctx, collector) {
>>     if (in is A stream)
>>       // any way to check if list state is empty so that we don't need to
>>       // call get()?
>>       for (b : liststate.get()) {
>>         .....
>>       }
>>
>>     if (in is B stream)
>>       liststate.add(in)
>>   }
>> }
>>
>> --
>> Regards,
>> Tao
>>
>
>
> --
> Regards,
> Tao
>