This makes sense. Thanks.

On Sat, Dec 23, 2017 at 10:58 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> Hi,
>
> all calls to onElement() or onTimer() are synchronized for any keys. Think
> of a single thread calling these methods.
> Event-time timers are called when a watermark passes the timer. Watermarks
> are received as special records, so the methods are called in the same
> order as records (actual records or watermarks) arrive at the function.
> Only for processing-time timers, actual synchronization is required.
>
> The NPE might be thrown because of two timers that fire one after the
> other without a new record being processed in between the onTimer() calls.
> In that case the state is cleared in the first call and null in the second.
>
> Best, Fabian
>
> 2017-12-23 16:36 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>
>> Thanks.
>>
>> I have a few follow up questions regarding ProcessFunction. I think
>> that the core should take care of any synchronization issues between calls
>> to onElement and onTimer in case of a keyed stream but tests do not seem to
>> suggest that.
>>
>> I have specifically 2 questions.
>>
>> 1. Are calls to onElement(..) single threaded if scoped to a key ? As
>> in on a keyed stream, is there a way that 2 or more threads can execute
>> on more than one element of a single key at one time ? Would I have to
>> synchronize this construction
>>
>> *OUT accumulator = accumulatorState.value(); if (accumulator == null)
>> { accumulator = acc.createNew(); }*
>>
>> 2. Can concurrent calls happen to onTimer(..) and onElement(..) for the
>> same key ? I intend to clean up state but I see NullPointers thrown in
>> onTimer(..) and I presume it is b'coz onElement and onTimer are
>> executed on 2 separate threads, with onTimer removing the state (
>> clear() ) but after another thread has registered a Timer ( in onElement ).
>>
>> if (timestamp == accumulator.getLastModified() + gap) { *// NullPointers on Race Conditions*
>>     accumulatorState.clear();
>> }
>>
>> PS.
>> This is the full code.
>>
>> @Override
>> public void processElement(IN event, Context context, Collector<OUT> out)
>>         throws Exception {
>>     TimerService timerService = context.timerService();
>>     if (context.timestamp() > timerService.currentWatermark()) {
>>         OUT accumulator = accumulatorState.value();
>>         if (accumulator == null) {
>>             accumulator = acc.createNew();
>>         }
>>         accumulator.setLastModified(context.timestamp());
>>         accumulatorState.update(accumulator);
>>         timerService.registerEventTimeTimer(context.timestamp() + gap);
>>     }
>> }
>>
>> @Override
>> public void onTimer(long timestamp, OnTimerContext context, Collector<OUT> out)
>>         throws Exception {
>>     OUT accumulator = accumulatorState.value();
>>     if (timestamp == accumulator.getLastModified() + gap) { *// NullPointers on Race Conditions*
>>         accumulatorState.clear();
>>     }
>> }
>>
>> On Thu, Dec 21, 2017 at 3:49 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> That's correct. Removal of timers is not supported in ProcessFunction.
>>> Not sure why this is supported for Triggers.
>>> The common workaround for ProcessFunctions is to register multiple
>>> timers and have a ValueState that stores the valid timestamp on which the
>>> onTimer method should be executed.
>>> When a timer fires and calls onTimer(), the method first checks whether
>>> the timestamp is the correct one and leaves the method if that is not the
>>> case.
>>> If you want to fire on the next watermark, another trick is to register
>>> multiple timers on (currentWatermark + 1). Since there is only one timer
>>> per timestamp, there is only one timer which gets continuously overwritten.
>>> The timer is called when the watermark is advanced.
>>>
>>> On the performance of the timer service. AFAIK, all methods that work
>>> with some kind of timer use this service. So there is not much choice.
>>>
>>> 2017-12-20 22:36 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>
>>>> And that further begs the question..
>>>> how performant is Timer Service. I
>>>> tried to peruse through the architecture behind it but could not find a
>>>> definite clue. Is it a Scheduled Service and if yes how many threads etc...
>>>>
>>>> On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>
>>>>> Makes sense. Did a first stab at using ProcessFunction. The
>>>>> TimerService exposed by the Context does not have remove timer. Is it
>>>>> primarily b'coz a PriorityQueue is the storage and remove from a
>>>>> PriorityQueue is expensive ? Trigger Context does expose another version
>>>>> that has removal abilities so was wondering why this dissonance...
>>>>>
>>>>> On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>>> Hi Vishal,
>>>>>>
>>>>>> it is not guaranteed that add() and onElement() receive the same
>>>>>> object, and even if they do it is not guaranteed that a mutation of the
>>>>>> object in onElement() has an effect. The object might have been
>>>>>> serialized and stored in RocksDB.
>>>>>> Hence, elements should not be modified in onElement().
>>>>>>
>>>>>> Have you considered implementing the operation completely in a
>>>>>> ProcessFunction instead of a session window?
>>>>>> This might be more code but easier to design and reason about because
>>>>>> there is no interaction of window assigner, trigger, and window function.
>>>>>>
>>>>>> 2017-12-18 20:49 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>>
>>>>>>> I guess
>>>>>>> https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362
>>>>>>>
>>>>>>> is where we could fashion what is emitted. Again for us it
>>>>>>> seems natural to use WM to materialize micro batches with "approximate"
>>>>>>> order ( and no I am not a fan of spark micro batches :)).
>>>>>>> Any pointers as
>>>>>>> to how we could write an implementation that allows for "up till WM
>>>>>>> emission" through a trigger on a Session Window would be very helpful.
>>>>>>> In essence I believe that for any "funnel" analysis it is crucial.
>>>>>>>
>>>>>>> Something like
>>>>>>> https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L346
>>>>>>>
>>>>>>> I know I am simplifying this and there has to be more to it...
>>>>>>>
>>>>>>> On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> The Trigger in this case would be some CountBased Trigger.... Again
>>>>>>>> the motive is to keep the state lean as we desire to search for
>>>>>>>> patterns, sorted on event time, in the incoming sessionized ( and thus
>>>>>>>> of non-deterministic length ) stream....
>>>>>>>>
>>>>>>>> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> For example, this would have worked perfectly if it did not complain
>>>>>>>>> about MergeableWindow and state.
>>>>>>>>> The Session class in this encapsulates
>>>>>>>>> the trim up to watermark behavior ( reduce call after telling it the
>>>>>>>>> current WM ) we desire
>>>>>>>>>
>>>>>>>>> public class SessionProcessWindow extends
>>>>>>>>>         ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>>>>>>>>
>>>>>>>>>     private static final ValueStateDescriptor<Session> sessionState =
>>>>>>>>>             new ValueStateDescriptor<>("session", Session.class);
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public void process(String key, Context context, Iterable<Event> elements,
>>>>>>>>>             Collector<Session> out) throws Exception {
>>>>>>>>>
>>>>>>>>>         ValueState<Session> session =
>>>>>>>>>                 context.windowState().getState(sessionState);
>>>>>>>>>         Session s = session.value() != null ? session.value() : new Session();
>>>>>>>>>         for (Event e : elements) {
>>>>>>>>>             s.add(e);
>>>>>>>>>         }
>>>>>>>>>         s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>>>>>>>>         s.reduce();
>>>>>>>>>         out.collect(s);
>>>>>>>>>         session.update(s);
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public void clear(Context context) {
>>>>>>>>>         ValueState<Session> session =
>>>>>>>>>                 context.windowState().getState(sessionState);
>>>>>>>>>         session.clear();
>>>>>>>>>     }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hello Fabian, Thank you for the response.
>>>>>>>>>>
>>>>>>>>>> I think that does not work, as the WM of the Window
>>>>>>>>>> Operator is what is desired to make deterministic decisions rather
>>>>>>>>>> than that of an operator that precedes the Window ? This is doable using
>>>>>>>>>> ProcessWindowFunction using state but only in the case of non
>>>>>>>>>> mergeable windows.
>>>>>>>>>>
>>>>>>>>>> The best API option I think is a TimeBaseTrigger that fires
>>>>>>>>>> every configured time progression of WM and a Window implementation
>>>>>>>>>> that materializes *only data up till that WM* ( it might have more
>>>>>>>>>> data but that data has event time greater than the WM ). I am not
>>>>>>>>>> sure we have that built in option and thus was asking for access to the
>>>>>>>>>> current WM for the window operator to allow us to handle the "*only data up
>>>>>>>>>> till that WM*" range retrieval using some custom data structure.
>>>>>>>>>>
>>>>>>>>>> Regards.
>>>>>>>>>>
>>>>>>>>>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>
>>>>>>>>>>> the Trigger is not designed to augment records but just to
>>>>>>>>>>> control when a window is evaluated.
>>>>>>>>>>> I would recommend using a ProcessFunction to enrich records
>>>>>>>>>>> with the current watermark before passing them into the window
>>>>>>>>>>> operator.
>>>>>>>>>>> The context object of the processElement() method gives access
>>>>>>>>>>> to the current watermark and timestamp of a record.
>>>>>>>>>>>
>>>>>>>>>>> Please note that watermarks are not deterministic but may depend
>>>>>>>>>>> on the order in which parallel inputs are consumed by an operator.
>>>>>>>>>>>
>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>
>>>>>>>>>>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>>>>>>>
>>>>>>>>>>>> An addendum
>>>>>>>>>>>>
>>>>>>>>>>>> Is the element reference IN in onElement(IN element.. ) in
>>>>>>>>>>>> Trigger<IN,..> the same as the IN provided to add(IN
>>>>>>>>>>>> value) in Accumulator<IN,..> ?
>>>>>>>>>>>> It seems that any mutations to
>>>>>>>>>>>> IN in onElement() are not visible to the Accumulator that is
>>>>>>>>>>>> carrying it as a previous element reference, albeit in the next
>>>>>>>>>>>> invocation of add().
>>>>>>>>>>>> This seems to be only in distributed mode, which makes sense only
>>>>>>>>>>>> if these references point to different objects.
>>>>>>>>>>>>
>>>>>>>>>>>> The pipeline
>>>>>>>>>>>>
>>>>>>>>>>>> .keyBy(keySelector)
>>>>>>>>>>>> .window(EventTimeSessionWindows.<IN>withGap(gap))
>>>>>>>>>>>> .trigger(new CountBasedWMAugmentationTrigger<IN,TimeWindow>(triggerCount))
>>>>>>>>>>>> .aggregate(
>>>>>>>>>>>>     new AggregateFunction<IN, ACC, OUT>() {
>>>>>>>>>>>>
>>>>>>>>>>>>         @Override
>>>>>>>>>>>>         public ACC createAccumulator() {
>>>>>>>>>>>>             ACC newInstance = (ACC) accumulator.clone();
>>>>>>>>>>>>             newInstance.resetLocal();
>>>>>>>>>>>>             return newInstance;
>>>>>>>>>>>>         }
>>>>>>>>>>>>
>>>>>>>>>>>>         @Override
>>>>>>>>>>>>         public void add(IN value, ACC accumulator) {
>>>>>>>>>>>>
>>>>>>>>>>>>             /** This method is called before onElement of the
>>>>>>>>>>>>                 Trigger and keeps the reference to the last IN **/
>>>>>>>>>>>>
>>>>>>>>>>>>             accumulator.add(value);
>>>>>>>>>>>>
>>>>>>>>>>>>         }
>>>>>>>>>>>>         .....
>>>>>>>>>>>>
>>>>>>>>>>>> The Trigger
>>>>>>>>>>>>
>>>>>>>>>>>> public class CountBasedWMAugmentationTrigger<T extends
>>>>>>>>>>>>         Serializable & CountBasedWMAugmentationTrigger.HasWaterMark,
>>>>>>>>>>>>         W extends Window> extends Trigger<T, W> {
>>>>>>>>>>>>
>>>>>>>>>>>>     @Override
>>>>>>>>>>>>     public TriggerResult onElement(T element, long timestamp, W window,
>>>>>>>>>>>>             TriggerContext ctx) throws Exception {
>>>>>>>>>>>>
>>>>>>>>>>>>         /** The element T is mutated to carry the watermark **/
>>>>>>>>>>>>         *element.setWaterMark(ctx.getCurrentWatermark());*
>>>>>>>>>>>>
>>>>>>>>>>>>         .
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I want to augment a POJO in Trigger's onElement method,
>>>>>>>>>>>>> specifically supply the POJO with the watermark from the
>>>>>>>>>>>>> TriggerContext.
>>>>>>>>>>>>> The sequence of execution is this
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. call to add() in the accumulator for the window and save
>>>>>>>>>>>>> the POJO reference in the Accumulator.
>>>>>>>>>>>>> 2. call to onElement on Trigger
>>>>>>>>>>>>> 3. set watermark to the POJO
>>>>>>>>>>>>>
>>>>>>>>>>>>> The next add() method should have the last reference and any
>>>>>>>>>>>>> mutation done in step 3.
>>>>>>>>>>>>>
>>>>>>>>>>>>> That works in a local test case, using LocalFlinkMiniCluster,
>>>>>>>>>>>>> as in I have access to the mutation by the onElement() in the
>>>>>>>>>>>>> POJO in the subsequent add(), but not on a distributed cluster.
>>>>>>>>>>>>> The specific question I had is whether add() on a supplied
>>>>>>>>>>>>> accumulator on a window and the onElement() method of the trigger
>>>>>>>>>>>>> on that window are inline executions, on the same thread, or is
>>>>>>>>>>>>> there any serialization/deserialization IPC that causes this
>>>>>>>>>>>>> divergence ( local versus distributed )
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards.
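
For anyone reading this thread later: below is a rough sketch of the situation Fabian describes at the top (two timers firing back to back, with the state already cleared by the first) together with a null guard in onTimer(). It is plain Java with no Flink dependency, so it can be run on its own. The class SessionTimerSketch and its fields are hypothetical stand-ins for one key's ValueState and the event-time timer queue, not Flink API, and the out-of-order element is only an assumed way of getting two timers to fire in that order.

```java
import java.util.TreeSet;

// Hypothetical single-key simulation; NOT Flink API. It only mirrors the
// shape of processElement()/onTimer() from the code quoted in this thread.
final class SessionTimerSketch {
    private final long gap;
    // Stand-in for the keyed ValueState: null means "cleared / never set".
    private Long lastModified;
    // Stand-in for the event-time timers: at most one timer per timestamp.
    private final TreeSet<Long> timers = new TreeSet<>();
    private long watermark = Long.MIN_VALUE;
    private int cleared = 0;

    SessionTimerSketch(long gap) {
        this.gap = gap;
    }

    // Mirrors processElement(): remember the timestamp, register a timer.
    public void processElement(long timestamp) {
        if (timestamp > watermark) {
            lastModified = timestamp;
            timers.add(timestamp + gap);
        }
    }

    // Fires all timers at or below the new watermark, in timestamp order,
    // on the calling thread (single threaded, as described above).
    public void advanceWatermark(long newWatermark) {
        watermark = newWatermark;
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    // Mirrors onTimer(), with the null guard added.
    void onTimer(long timestamp) {
        if (lastModified == null) {
            return; // an earlier timer already cleared the state
        }
        if (timestamp == lastModified + gap) {
            lastModified = null; // stand-in for accumulatorState.clear()
            cleared++;
        }
    }

    public boolean hasState() {
        return lastModified != null;
    }

    public int clearedCount() {
        return cleared;
    }

    public static void main(String[] args) {
        SessionTimerSketch s = new SessionTimerSketch(10L);
        s.processElement(10L); // registers a timer at 20
        s.processElement(5L);  // out of order: registers a timer at 15
        s.advanceWatermark(30L); // timer 15 clears, timer 20 sees null
        System.out.println("state present: " + s.hasState()
                + ", clears: " + s.clearedCount());
    }
}
```

Without the null check, the second timer in this scenario would dereference the cleared state, which matches the NullPointerException reported above. In a real ProcessFunction the same guard would presumably sit on the result of accumulatorState.value() before getLastModified() is called.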