Thanks.
I have a few follow-up questions regarding ProcessFunction. I think that the core should take care of any synchronization issues between calls to processElement and onTimer in the case of a keyed stream, but tests do not seem to suggest that. I have two specific questions.

1. Are calls to processElement(..) single-threaded if scoped to a key? As in, on a keyed stream, is there a way that two or more threads can process more than one element of a single key at the same time? Would I have to synchronize this construction?

    OUT accumulator = accumulatorState.value();
    if (accumulator == null) {
        accumulator = acc.createNew();
    }

2. Can concurrent calls to onTimer(..) and processElement(..) happen for the same key? I intend to clean up state, but I see NullPointerExceptions thrown in onTimer(..), and I presume it is because processElement and onTimer are executed on two separate threads, with onTimer removing the state (clear()) after another thread has registered a timer (in processElement).

    if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on Race Conditions
        accumulatorState.clear();
    }

PS. This is the full code.

    @Override
    public void processElement(IN event, Context context, Collector<OUT> out) throws Exception {
        TimerService timerService = context.timerService();
        if (context.timestamp() > timerService.currentWatermark()) {
            OUT accumulator = accumulatorState.value();
            if (accumulator == null) {
                accumulator = acc.createNew();
            }
            accumulator.setLastModified(context.timestamp());
            accumulatorState.update(accumulator);
            timerService.registerEventTimeTimer(context.timestamp() + gap);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext context, Collector<OUT> out) throws Exception {
        OUT accumulator = accumulatorState.value();
        if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on Race Conditions
            accumulatorState.clear();
        }
    }

On Thu, Dec 21, 2017 at 3:49 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> That's correct.
> Removal of timers is not supported in ProcessFunction. Not sure why this is supported for Triggers.
> The common workaround for ProcessFunctions is to register multiple timers and have a ValueState that stores the valid timestamp on which the onTimer method should be executed.
> When a timer fires and calls onTimer(), the method first checks whether the timestamp is the correct one and leaves the method if that is not the case.
> If you want to fire on the next watermark, another trick is to register multiple timers on (currentWatermark + 1). Since there is only one timer per timestamp, there is only one timer, which gets continuously overwritten. The timer is called when the watermark is advanced.
>
> On the performance of the timer service: AFAIK, all methods that work with some kind of timer use this service, so there is not much choice.
>
> 2017-12-20 22:36 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>
>> And that further raises the question: how performant is the Timer Service? I tried to peruse the architecture behind it but could not find a definite clue. Is it a scheduled service, and if yes, how many threads, etc.?
>>
>> On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>
>>> Makes sense. Did a first stab at using ProcessFunction. The TimerService exposed by the Context does not have a way to remove a timer. Is it primarily because a priority queue is the storage and removal from a PriorityQueue is expensive? The TriggerContext does expose a version that has removal abilities, so I was wondering why this dissonance...
>>>
>>> On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Vishal,
>>>>
>>>> it is not guaranteed that add() and onElement() receive the same object, and even if they do, it is not guaranteed that a mutation of the object in onElement() has an effect. The object might have been serialized and stored in RocksDB.
>>>> Hence, elements should not be modified in onElement().
>>>>
>>>> Have you considered implementing the operation completely in a ProcessFunction instead of a session window?
>>>> This might be more code, but it is easier to design and reason about because there is no interaction of window assigner, trigger, and window function.
>>>>
>>>> 2017-12-18 20:49 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>
>>>>> I guess https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362
>>>>>
>>>>> is where we could fashion what is emitted. Again, for us it seems natural to use the WM to materialize micro-batches with "approximate" order (and no, I am not a fan of Spark micro-batches :)). Any pointers as to how we could write an implementation that allows for "up till WM emission" through a trigger on a session window would be very helpful. In essence, I believe that it is crucial for any "funnel" analysis.
>>>>>
>>>>> Something like https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L346
>>>>>
>>>>> I know I am simplifying this and there has to be more to it...
>>>>>
>>>>> On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> The Trigger in this case would be some count-based Trigger... Again, the motive is to keep the state lean, as we desire to search for patterns, sorted on event time, in the incoming sessionized (and thus of nondeterministic length) stream...
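The workaround Fabian describes above (keep the one valid timestamp in keyed state and let onTimer() return early for any other timer) can be sketched in plain Java without a Flink dependency. This is a simulation under stated assumptions, not Flink code: GapCleanupSketch, advanceWatermark and the maps standing in for ValueState and the TimerService are hypothetical, and everything runs on one thread because the Flink documentation for ProcessFunction timers states that onTimer() and processElement() invocations are synchronized. The simulation also points to one possible cause of the NullPointerException in the code at the top of the thread that needs no concurrency at all: an out-of-order element (still ahead of the watermark) moves lastModified backwards, the earlier timer then matches and clears the state, and the later timer reads null.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Plain-Java simulation, not Flink code: the maps stand in for a keyed
// ValueState<Long> and for the event-time TimerService. All names are hypothetical.
class GapCleanupSketch {
    private final long gap;
    // key -> the only timer timestamp that is allowed to clean up that key's state
    private final Map<String, Long> validTimestamp = new HashMap<>();
    // timestamp -> keys; a set per timestamp mimics "one timer per key and timestamp"
    private final TreeMap<Long, TreeSet<String>> timers = new TreeMap<>();
    private long watermark = Long.MIN_VALUE;
    int cleanups = 0;
    int staleTimers = 0;

    GapCleanupSketch(long gap) {
        this.gap = gap;
    }

    void processElement(String key, long timestamp) {
        if (timestamp <= watermark) {
            return; // late element: skipped, like the timestamp check in the original code
        }
        long candidate = timestamp + gap;
        Long current = validTimestamp.get(key);
        // Never move the deadline backwards when an out-of-order element arrives.
        long deadline = (current == null) ? candidate : Math.max(current, candidate);
        validTimestamp.put(key, deadline);
        timers.computeIfAbsent(candidate, t -> new TreeSet<>()).add(key);
    }

    void onTimer(String key, long timestamp) {
        Long deadline = validTimestamp.get(key);
        // Null: an earlier timer already cleaned up. Mismatch: a newer element moved the
        // deadline. Either way this timer is stale, so return instead of dereferencing.
        if (deadline == null || deadline != timestamp) {
            staleTimers++;
            return;
        }
        validTimestamp.remove(key);
        cleanups++;
    }

    // Single-threaded driver: timers fire in timestamp order once the watermark passes them.
    void advanceWatermark(long newWatermark) {
        watermark = newWatermark;
        while (!timers.isEmpty() && timers.firstKey() <= newWatermark) {
            Map.Entry<Long, TreeSet<String>> due = timers.pollFirstEntry();
            for (String key : due.getValue()) {
                onTimer(key, due.getKey());
            }
        }
    }

    boolean hasState(String key) {
        return validTimestamp.containsKey(key);
    }

    public static void main(String[] args) {
        GapCleanupSketch fn = new GapCleanupSketch(10);
        fn.processElement("a", 5); // deadline 15
        fn.processElement("a", 3); // out of order: registers 13, deadline stays 15
        fn.advanceWatermark(20);   // timer 13 is stale, timer 15 cleans up
        // prints "1 cleanup(s), 1 stale timer(s)"
        System.out.println(fn.cleanups + " cleanup(s), " + fn.staleTimers + " stale timer(s)");
    }
}
```

The two changes relative to the original onTimer() are taking the maximum deadline and treating a null state as a stale timer; in a real ProcessFunction the same guard would read the ValueState and return when it is null.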
>>>>>>
>>>>>> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>
>>>>>>> For example, this would have worked perfectly if it did not complain about MergeableWindow and state. The Session class in this encapsulates the trim-up-to-watermark behavior (a reduce call after telling it the current WM) we desire:
>>>>>>>
>>>>>>> public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>>>>>>
>>>>>>>     private static final ValueStateDescriptor<Session> sessionState =
>>>>>>>             new ValueStateDescriptor<>("session", Session.class);
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public void process(String key, Context context, Iterable<Event> elements, Collector<Session> out) throws Exception {
>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>         Session s = session.value() != null ? session.value() : new Session();
>>>>>>>         for (Event e : elements) {
>>>>>>>             s.add(e);
>>>>>>>         }
>>>>>>>         s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>>>>>>         s.reduce();
>>>>>>>         out.collect(s);
>>>>>>>         session.update(s);
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public void clear(Context context) {
>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>         session.clear();
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello Fabian, thank you for the response.
>>>>>>>>
>>>>>>>> I think that does not work, as it is the WM of the window operator that is needed to make deterministic decisions, rather than that of an operator that precedes the window. This is doable using a ProcessWindowFunction with state, but only in the case of non-mergeable windows.
>>>>>>>>
>>>>>>>> The best API option, I think, is a TimeBaseTrigger that fires on every configured progression of the WM, and a Window implementation that materializes *only data up till that WM* (it might have more data, but that data has event time greater than the WM). I am not sure we have that built-in option, and thus was asking for access to the current WM of the window operator, to allow us to handle the "*only data up till that WM*" range retrieval using some custom data structure.
>>>>>>>>
>>>>>>>> Regards.
>>>>>>>>
>>>>>>>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Vishal,
>>>>>>>>>
>>>>>>>>> the Trigger is not designed to augment records but just to control when a window is evaluated.
>>>>>>>>> I would recommend using a ProcessFunction to enrich records with the current watermark before passing them into the window operator.
>>>>>>>>> The context object of the processElement() method gives access to the current watermark and timestamp of a record.
>>>>>>>>>
>>>>>>>>> Please note that watermarks are not deterministic but may depend on the order in which parallel inputs are consumed by an operator.
>>>>>>>>>
>>>>>>>>> Best, Fabian
>>>>>>>>>
>>>>>>>>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> An addendum
>>>>>>>>>>
>>>>>>>>>> Is the element reference IN in onElement(IN element, ..) in Trigger<IN, ..> the same as the one provided to add(IN value) in Accumulator<IN, ..>? It seems that any mutation to IN in onElement() is not visible to the Accumulator that is carrying it as a previous element reference, albeit in the next invocation of add().
>>>>>>>>>> This seems to happen only in distributed mode, which makes sense only if these references point to different objects.
>>>>>>>>>>
>>>>>>>>>> The pipeline
>>>>>>>>>>
>>>>>>>>>> .keyBy(keySelector)
>>>>>>>>>> .window(EventTimeSessionWindows.<IN>withGap(gap))
>>>>>>>>>> .trigger(new CountBasedWMAugmentationTrigger<IN, TimeWindow>(triggerCount))
>>>>>>>>>> .aggregate(
>>>>>>>>>>         new AggregateFunction<IN, ACC, OUT>() {
>>>>>>>>>>
>>>>>>>>>>             @Override
>>>>>>>>>>             public ACC createAccumulator() {
>>>>>>>>>>                 ACC newInstance = (ACC) accumulator.clone();
>>>>>>>>>>                 newInstance.resetLocal();
>>>>>>>>>>                 return newInstance;
>>>>>>>>>>             }
>>>>>>>>>>
>>>>>>>>>>             @Override
>>>>>>>>>>             public void add(IN value, ACC accumulator) {
>>>>>>>>>>                 /** This method is called before onElement of the Trigger and keeps the reference to the last IN **/
>>>>>>>>>>                 accumulator.add(value);
>>>>>>>>>>             }
>>>>>>>>>>             .....
>>>>>>>>>>
>>>>>>>>>> The Trigger
>>>>>>>>>>
>>>>>>>>>> public class CountBasedWMAugmentationTrigger<T extends Serializable & CountBasedWMAugmentationTrigger.HasWaterMark,
>>>>>>>>>>         W extends Window> extends Trigger<T, W> {
>>>>>>>>>>
>>>>>>>>>>     @Override
>>>>>>>>>>     public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
>>>>>>>>>>         /** The element T is mutated to carry the watermark **/
>>>>>>>>>>         element.setWaterMark(ctx.getCurrentWatermark());
>>>>>>>>>>         .
>>>>>>>>>>
>>>>>>>>>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I want to augment a POJO in the Trigger's onElement method, specifically to supply the POJO with the watermark from the TriggerContext.
>>>>>>>>>>>
>>>>>>>>>>> The sequence of execution is this:
>>>>>>>>>>>
>>>>>>>>>>> 1. Call to add() in the accumulator for the window, saving the POJO reference in the Accumulator.
>>>>>>>>>>> 2. Call to onElement on the Trigger.
>>>>>>>>>>> 3. Set the watermark on the POJO.
>>>>>>>>>>>
>>>>>>>>>>> The next add() call should have the last reference and any mutation done in step 3.
>>>>>>>>>>>
>>>>>>>>>>> That works in a local test case using LocalFlinkMiniCluster, as in I have access to the mutation made by onElement() on the POJO in the subsequent add(), but not on a distributed cluster. The specific question I had is whether add() on a supplied accumulator on a window and the onElement() method of the trigger on that window are inline executions on the same thread, or whether there is any serialization/deserialization IPC that causes this divergence (local versus distributed).
>>>>>>>>>>>
>>>>>>>>>>> Regards.
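One plausible explanation for the local-versus-cluster difference described above is the one Fabian gives: the object "might have been serialized and stored in RocksDB". The plain-Java sketch below, with no Flink dependency, contrasts a store that keeps the live object (roughly what an on-heap state backend does) with one that keeps only serialized bytes (roughly what RocksDB does). Event, Acc, HeapStore and SerializingStore are hypothetical names, and the assumption that the local test used an on-heap backend while the cluster used RocksDB is not confirmed in the thread. Mutating the element after add() is visible only with the heap-style store; setting the field before the hand-off, as Fabian suggests doing in an upstream ProcessFunction, gives the same result with both.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Plain-Java simulation, not Flink code. Event, Acc, HeapStore and SerializingStore are
// hypothetical stand-ins for the POJO, the accumulator and two kinds of state backend.
class CopySemanticsSketch {
    static class Event implements Serializable {
        long waterMark = Long.MIN_VALUE;
    }

    static class Acc implements Serializable {
        Event last; // the accumulator keeps a reference to the last element it saw
    }

    interface AccStore {
        Acc get();
        void put(Acc acc);
    }

    // Keeps the live object, so later mutations of the element remain visible.
    static class HeapStore implements AccStore {
        private Acc acc = new Acc();
        public Acc get() { return acc; }
        public void put(Acc a) { acc = a; }
    }

    // Keeps only bytes (as a RocksDB-like backend would), so get() returns a fresh copy.
    static class SerializingStore implements AccStore {
        private byte[] bytes = serialize(new Acc());
        public Acc get() { return (Acc) deserialize(bytes); }
        public void put(Acc a) { bytes = serialize(a); }
    }

    static byte[] serialize(Object o) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
            out.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray(); // the stream is closed (and flushed) at this point
    }

    static Object deserialize(byte[] b) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(b))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // add() stores the element, then a trigger-like step mutates it afterwards.
    static long mutateAfterAdd(AccStore store) {
        Event e = new Event();
        Acc acc = store.get();
        acc.last = e;          // add(): remember the element
        store.put(acc);
        e.waterMark = 42L;     // onElement(): mutate after the hand-off
        return store.get().last.waterMark; // what the next add() would observe
    }

    // The element is enriched first (e.g. in an upstream function), then handed to add().
    static long enrichBeforeAdd(AccStore store) {
        Event e = new Event();
        e.waterMark = 42L;
        Acc acc = store.get();
        acc.last = e;
        store.put(acc);
        return store.get().last.waterMark;
    }

    public static void main(String[] args) {
        System.out.println("heap, mutate after add:        " + mutateAfterAdd(new HeapStore()));
        System.out.println("serializing, mutate after add: " + mutateAfterAdd(new SerializingStore()));
        System.out.println("serializing, enrich first:     " + enrichBeforeAdd(new SerializingStore()));
    }
}
```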