Hi team,

I have a Flink job that joins two streams, let's say A and B streams,
followed by a key process function. In the key process function the job
inserts elements from B stream to a list state if element from A stream
hasn't arrived yet. I am wondering if any way to skip the liststat.get() to
check if there are elements in the list state when A stream arrives to
reduce the call to underlying state (RocksDB)

Here is the code snippet

keyfunction {

process(in, ctx, collector) {
if (in is A stream)
// anyway to check if list state is empty so that we dont need to call
get()?
for (b : liststate.get()) {
.....
}

if (in is B stream)
liststate.add(in)


-- 
Regards,
Tao

Reply via email to