Hi Kenneth,

Thanks for the pointer. I wrote some tests that exercise the same code using 
PAssert, sending two events that get buffered by the code I mentioned 
originally. I noticed that the @OnTimer(“…”) method never got invoked. Is that a 
known issue with the TestPipeline?

— AC

> On Apr 11, 2017, at 10:28 AM, Ankur Chauhan wrote:
> 
> Thanks for confirming a hunch I had. I was considering doing that, but 
> the Javadoc saying "this feature is not implemented by any runner" sort of 
> put me off. 
> 
> Is there a more up-to-date list of similar in-progress features? If not, it 
> may be helpful to keep one. 
> 
> Thanks!
> Ankur Chauhan. 
> 
> On Apr 11, 2017, at 07:18, Kenneth Knowles wrote:
> 
>> Hi Ankur,
>> 
>> If I understand what you want, then what you need for this use case is an 
>> event-time timer that flushes the buffer when you are ready. You might set it 
>> for the end of the window, the window's GC time or, in the global window, just 
>> some later watermark.
>> 
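>>     new DoFn<...>() {
>>
>>       // Per-key buffer of events that cannot be emitted yet.
>>       @StateId("buffer")
>>       private final StateSpec<Object, BagState<Foo>> bufferSpec = StateSpecs.bag(...);
>>
>>       // Event-time timer: fires once the watermark passes the time it is set to.
>>       @TimerId("finallyCleanup")
>>       private final TimerSpec finallySpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>
>>       @ProcessElement
>>       public void process(@TimerId("finallyCleanup") Timer cleanupTimer) {
>>         // e.g. the end of the window or the window's GC time
>>         cleanupTimer.set(...);
>>       }
>>
>>       // Invoked when the timer fires: emit and clear the buffered events.
>>       @OnTimer("finallyCleanup")
>>       public void onFinallyCleanup(@StateId("buffer") BagState<Foo> buffered) {
>>         ...
>>       }
>>     }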
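The flush mechanism Kenn describes can be illustrated without Beam at all. The sketch below is a hypothetical, stdlib-only Java model; the class `EventTimeFlushModel` and its methods are made up for illustration and are not Beam API. `process` plays the role of `@ProcessElement` (buffer the element, set the timer), and `advanceWatermarkTo` plays the role of the runner advancing the watermark and invoking the `@OnTimer` callback. It assumes the timer fires once the watermark is strictly past its timestamp; it says nothing about how real runners schedule timers.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of one key's event-time timer: the timer fires once the
 * watermark passes its timestamp, and the callback drains the buffer.
 * Not Beam code and not how runners implement timers.
 */
public class EventTimeFlushModel {
    private final List<String> buffer = new ArrayList<>(); // stands in for BagState
    private Long timerTimestamp = null;                   // null means no timer is pending
    private long watermark = Long.MIN_VALUE;

    /** Like @ProcessElement: buffer the element and (re)set the cleanup timer. */
    public void process(String element, long flushAt) {
        buffer.add(element);
        timerTimestamp = flushAt; // in this model, re-setting replaces the pending timestamp
    }

    /** Advances the watermark; returns what the timer callback emits, if it fires. */
    public List<String> advanceWatermarkTo(long newWatermark) {
        watermark = Math.max(watermark, newWatermark); // watermarks never move backwards
        List<String> emitted = new ArrayList<>();
        if (timerTimestamp != null && watermark > timerTimestamp) {
            timerTimestamp = null;  // the timer fires once per set
            emitted.addAll(buffer); // like reading the BagState...
            buffer.clear();         // ...and clearing it
        }
        return emitted;
    }
}
```

The property that matters for a session that only ever sees bad input is that the flush is driven by watermark progress, not by the arrival of another element.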
>> 
>> This feature hasn't been blogged about or documented thoroughly, apart from a 
>> couple of examples in the DoFn Javadoc, but it has been available since 0.6.0.
>> 
>> Kenn
>> 
>> On Tue, Apr 11, 2017 at 3:03 AM, Ankur Chauhan wrote:
>> Hi,
>> 
>> I am attempting to do a seemingly simple task using the new state API. I 
>> have created a DoFn<KV<String, Event>, KV<String, Event>> that accepts events 
>> keyed by a particular ID (the session ID) and is meant to emit the same events 
>> keyed by sessionID/eventType. In the simple case this would be a normal DoFn, 
>> but there are always some events that are not as clean as we would like, so 
>> we need to save some state for the session and emit those events later, once 
>> cleanup is complete. For example:
>> 
>> Let’s say that the first few events are missing the eventType (or any other 
>> field). We would like to buffer those events until we get the first event 
>> with the eventType field set, and then use that information to emit the 
>> contents of the buffer (last observed eventType + original contents of the 
>> buffered events).
>> 
>> For this, my initial approach was to create a BagState<Event> that holds any 
>> buffered events. As more events come in, I either emit the input with 
>> modification, add the input to the buffer, or emit the buffered events along 
>> with the input.
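The three-way rule described above (emit with modification, buffer, or release the buffer along with the input) can be sketched in plain Java. This is a hypothetical model, not Beam code: the `Event` class is a minimal stand-in for the thread's type, the `buffer` list stands in for `BagState<Event>`, and the `lastType` field assumes a second piece of per-session state (for example a `ValueState<String>`) that the message does not mention. Outputs are rendered as `sessionId/eventType:payload` strings to mirror the sessionID/eventType keying.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical per-session model of the buffering rule; not Beam code. */
public class SessionBuffer {

    /** Minimal stand-in for the thread's Event type; eventType may be null. */
    public static final class Event {
        final String sessionId;
        final String eventType; // null when the field is missing
        final String payload;

        public Event(String sessionId, String eventType, String payload) {
            this.sessionId = sessionId;
            this.eventType = eventType;
            this.payload = payload;
        }
    }

    // Stands in for BagState<Event>: events still waiting for an eventType.
    private final List<Event> buffer = new ArrayList<>();
    // Assumed extra state: the last eventType seen in this session (null if none yet).
    private String lastType = null;

    /** Handles one input and returns the outputs as "sessionId/eventType:payload" strings. */
    public List<String> process(Event in) {
        List<String> out = new ArrayList<>();
        if (in.eventType != null) {
            // Good event: release the buffer stamped with its eventType, then emit it.
            lastType = in.eventType;
            for (Event b : buffer) {
                out.add(render(b.sessionId, lastType, b.payload));
            }
            buffer.clear();
            out.add(render(in.sessionId, lastType, in.payload));
        } else if (lastType != null) {
            // Missing eventType, but one is already known: emit with modification.
            out.add(render(in.sessionId, lastType, in.payload));
        } else {
            // Missing eventType and nothing known yet: hold the event back.
            buffer.add(in);
        }
        return out;
    }

    /** Number of events currently held back. */
    public int bufferedCount() {
        return buffer.size();
    }

    private static String render(String sessionId, String eventType, String payload) {
        return sessionId + "/" + eventType + ":" + payload;
    }

    public static void main(String[] args) {
        SessionBuffer fn = new SessionBuffer();
        System.out.println(fn.process(new Event("s1", null, "a")));    // []
        System.out.println(fn.process(new Event("s1", null, "b")));    // []
        System.out.println(fn.process(new Event("s1", "click", "c"))); // [s1/click:a, s1/click:b, s1/click:c]
        System.out.println(fn.process(new Event("s1", null, "d")));    // [s1/click:d]
    }
}
```

A session that only ever sees events without an eventType stays in the last branch forever, which is exactly the case that needs a timer-driven flush.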
>> 
>> While running my test, I found that if I never get a “good” input, i.e. the 
>> session contains only error inputs, I keep buffering input and never emit 
>> anything. My question is: how do I emit the buffered events when there is no 
>> more input?
>> 
>> — Ankur Chauhan
>> 
