Hi,

I am attempting to do a seemingly simple task using the new State API. I have 
created a DoFn<KV<String, Event>, KV<String, Event>> that accepts events keyed 
by a particular id (session id) and is meant to emit the same events keyed by 
sessionID/eventType. In the simple case this would be a normal DoFn, but there 
are always cases where some events are not as clean as we would like, so we 
need to save some state for the session and emit those events later, once 
cleanup is complete. For example:

Let’s say that the first few events are missing the eventType (or any other 
field). We would like to buffer those events until we get the first event with 
the eventType field set, and then use that information to emit the contents of 
the buffer as (last observed eventType + original contents of the buffered 
events).

For this, my initial approach was to create a BagState<Event> that holds any 
buffered events. As more events come in, I either emit the input with 
modification, add the input to the buffer, or emit the events in the buffer 
together with the input.
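
The three cases above can be sketched in plain Java. This is only a rough 
model of the logic, not the actual DoFn: a List stands in for the 
BagState<Event>, the Event class and its payload/eventType fields are 
hypothetical, and the "sessionId/eventType:payload" output format is made up 
for illustration. It also assumes that an event missing its eventType, 
arriving after a good one has been seen, is emitted right away with the last 
observed eventType.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of the buffering logic; no Beam dependency.
class SessionBufferSketch {

    // Hypothetical stand-in for Event: a payload plus an optional eventType.
    static final class Event {
        final String payload;
        final String eventType; // null when the field is missing

        Event(String payload, String eventType) {
            this.payload = payload;
            this.eventType = eventType;
        }
    }

    private final String sessionId;
    // Stand-in for the per-session BagState<Event>.
    private final List<Event> buffer = new ArrayList<>();
    // Last eventType seen for this session; null until a "good" event arrives.
    private String lastEventType;

    SessionBufferSketch(String sessionId) {
        this.sessionId = sessionId;
    }

    // Handles one input and returns whatever should be emitted for it,
    // formatted as "sessionId/eventType:payload".
    List<String> process(Event input) {
        List<String> out = new ArrayList<>();
        if (input.eventType != null) {
            // Good input: flush the buffer with this eventType, then emit the input.
            lastEventType = input.eventType;
            for (Event buffered : buffer) {
                out.add(format(lastEventType, buffered));
            }
            buffer.clear();
            out.add(format(lastEventType, input));
        } else if (lastEventType != null) {
            // Missing eventType, but one is already known: emit with modification.
            out.add(format(lastEventType, input));
        } else {
            // Missing eventType and nothing known yet: keep buffering.
            buffer.add(input);
        }
        return out;
    }

    private String format(String eventType, Event e) {
        return sessionId + "/" + eventType + ":" + e.payload;
    }

    public static void main(String[] args) {
        SessionBufferSketch session = new SessionBufferSketch("s1");
        System.out.println(session.process(new Event("a", null)));    // []
        System.out.println(session.process(new Event("b", "click"))); // [s1/click:a, s1/click:b]
        System.out.println(session.process(new Event("c", null)));    // [s1/click:c]
    }
}
```

A session made up only of events without an eventType never leaves the last 
branch, so nothing is ever returned for it.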

While running my test, I found that if I never get a “good” input, i.e. the 
session contains only error inputs, I keep buffering the input and never emit 
anything. My question is: how do I emit the buffered events when there is no 
more input?

— Ankur Chauhan
