Hi Kenneth,

Thanks for the pointer. I wrote some tests that exercise the same code with PAssert, sending two events that get buffered by the code I mentioned originally. I noticed that the @OnTimer("…") method never got invoked. Is that a known issue with TestPipeline?
— AC

> On Apr 11, 2017, at 10:28 AM, Ankur Chauhan wrote:
>
> Thanks for confirming a hunch that I had. I was considering doing that, but
> the Javadoc saying "this feature is not implemented by any runner" sort of
> put me off.
>
> Is there a more up-to-date list of similar in-progress features? If not, it
> may be helpful to keep one.
>
> Thanks!
> Ankur Chauhan
>
> Sent from my iPhone
>
> On Apr 11, 2017, at 07:18, Kenneth Knowles wrote:
>
>> Hi Ankur,
>>
>> If I understand your desire, then what you need for such a use case is an
>> event-time timer, to flush when you are ready. You might choose the end of
>> the window, the window GC time or, in the global window, just some later
>> watermark.
>>
>> new DoFn<...>() {
>>
>>   @StateId("buffer")
>>   private final StateSpec<Object, BagState<Foo>> bufferSpec =
>>       StateSpecs.bag(...);
>>
>>   @TimerId("finallyCleanup")
>>   private final TimerSpec finallySpec =
>>       TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>
>>   @ProcessElement
>>   public void process(@TimerId("finallyCleanup") Timer cleanupTimer) {
>>     cleanupTimer.set(...);
>>   }
>>
>>   @OnTimer("finallyCleanup")
>>   public void onFinallyCleanup(@StateId("buffer") BagState<Foo> buffered) {
>>     ...
>>   }
>> }
>>
>> This feature hasn't been blogged about or documented thoroughly except for a
>> couple of examples in the DoFn Javadoc. But it has been available since 0.6.0.
>>
>> Kenn
>>
>> On Tue, Apr 11, 2017 at 3:03 AM, Ankur Chauhan wrote:
>>
>> Hi,
>>
>> I am attempting to do a seemingly simple task using the new state API. I
>> have created a DoFn<KV<String, Event>, KV<String, Event>> that accepts events
>> keyed by a particular id (session id) and intends to emit the same events
>> partitioned by sessionID/eventType.
>> In the simple case this would be a normal DoFn, but there is always a case
>> where some events are not as clean as we would like, and we need to save some
>> state for the session and then emit those events later when cleanup is
>> complete. For example:
>>
>> Let’s say that the first few events are missing the eventType (or any other
>> field), so we would like to buffer those events until we get the first event
>> with the eventType field set, and then use this information to emit the
>> contents of the buffer (last observed eventType + original contents of the
>> buffered events).
>>
>> For this, my initial approach involved creating a BagState<Event> that would
>> contain any buffered events. As more events came in, I would either emit the
>> input with modifications, add the input to the buffer, or emit the events in
>> the buffer along with the input.
>>
>> While running my test, I found that if I never get a “good” input, i.e. the
>> session contains only erroneous inputs, I keep buffering the input and never
>> emit anything. My question is: how do I emit these buffered events when there
>> is no more input?
>>
>> — Ankur Chauhan
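To make the mechanism discussed in this thread concrete, here is a toy, single-session model in plain Java. It is not Beam code and does not use the Beam API: `SessionBuffer`, `Event`, `flushDelay` and `advanceWatermark` are hypothetical names, a `List` stands in for `BagState<Event>`, a nullable `Long` stands in for the `finallyCleanup` event-time timer, and plain `long` values stand in for event-time timestamps. It covers the three cases Ankur describes (emit with a known type, buffer, or release the buffer when the first typed event arrives) plus Kenneth's suggestion of a timer that flushes the buffer once the watermark reaches it. In this simplified model, the timer callback runs only when the watermark is actually advanced to or past the timer's timestamp.

```java
import java.util.ArrayList;
import java.util.List;

// Toy, single-session model of the buffering DoFn discussed above.
// NOT Beam API: all names here are hypothetical stand-ins.
class SessionBuffer {

    // Hypothetical event; eventType == null marks a "dirty" event.
    static final class Event {
        final String eventType;
        final String payload;
        final long timestamp;

        Event(String eventType, String payload, long timestamp) {
            this.eventType = eventType;
            this.payload = payload;
            this.timestamp = timestamp;
        }

        Event withType(String type) {
            return new Event(type, payload, timestamp);
        }
    }

    private final List<Event> buffer = new ArrayList<>(); // stands in for BagState<Event>
    final List<Event> output = new ArrayList<>();         // stands in for the DoFn's output
    private final long flushDelay;                        // hypothetical timer offset
    private String lastEventType = null;
    private Long timerAt = null;                          // pending event-time timer, if any

    SessionBuffer(long flushDelay) {
        this.flushDelay = flushDelay;
    }

    // Analogue of @ProcessElement.
    void process(Event e) {
        if (e.eventType != null) {
            // A typed event: release the buffer with this type, then emit the input.
            lastEventType = e.eventType;
            for (Event b : buffer) {
                output.add(b.withType(lastEventType));
            }
            buffer.clear();
            output.add(e);
        } else if (lastEventType != null) {
            // Type already known for this session: fill it in and emit immediately.
            output.add(e.withType(lastEventType));
        } else {
            // No type seen yet: buffer the event and (re)set the cleanup timer.
            // Assumed to mirror re-setting a Beam Timer, which replaces the earlier time.
            buffer.add(e);
            timerAt = e.timestamp + flushDelay;
        }
    }

    // Analogue of the runner advancing the input watermark for this key.
    void advanceWatermark(long watermark) {
        if (timerAt != null && watermark >= timerAt) {
            timerAt = null;
            onFinallyCleanup();
        }
    }

    // Analogue of @OnTimer("finallyCleanup"): emit whatever is still buffered, unmodified.
    private void onFinallyCleanup() {
        output.addAll(buffer);
        buffer.clear();
    }
}
```

For example, with `flushDelay` set to 10, two untyped events at timestamps 1 and 2 produce no output until `advanceWatermark(12)`, at which point both are emitted unmodified; if a typed event arrives first instead, the buffered events are emitted with that type. In a real pipeline the runner scopes the equivalent state and timer per key and window, and the timer time would be chosen as Kenneth suggests (end of window, window GC time, or some later watermark).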
