This makes sense.  Thanks.

On Sat, Dec 23, 2017 at 10:58 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi,
>
> all calls to onElement() or onTimer() are synchronized for any key. Think
> of a single thread calling these methods.
> Event-time timers are called when a watermark passes the timer. Watermarks
> are received as special records, so the methods are called in the same
> order as records (actual records or watermarks) arrive at the function.
> Only for processing-time timers, actual synchronization is required.
>
> The NPE might be thrown because of two timers that fire one after the
> other without a new record being processed in between the onTimer() calls.
> In that case the state is cleared in the first call and null in the second.
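
A minimal plain-Java sketch of the case described above, in case it helps. GuardedSessionCleaner is a hypothetical stand-in for the ProcessFunction, a nullable field plays the role of the ValueState, and a sorted set plays the role of the timer service; no Flink classes are used. Two out-of-order elements register two timers, the first timer clears the state, and the second returns early instead of dereferencing null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical stand-in for a keyed ProcessFunction; plain Java, no Flink classes.
class GuardedSessionCleaner {
    private final long gap;
    private Long lastModified;                             // plays the role of the ValueState; null means cleared
    private final TreeSet<Long> timers = new TreeSet<>();  // at most one timer per timestamp, fired in order
    final List<Long> cleanups = new ArrayList<>();         // timer timestamps at which the state was cleared

    GuardedSessionCleaner(long gap) {
        this.gap = gap;
    }

    void processElement(long timestamp) {
        lastModified = timestamp;
        timers.add(timestamp + gap);
    }

    // Fires every timer up to and including the watermark, one after the other.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    void onTimer(long timestamp) {
        if (lastModified == null) {
            return; // an earlier timer already cleared the state
        }
        if (timestamp == lastModified + gap) {
            lastModified = null;
            cleanups.add(timestamp);
        }
    }

    boolean hasState() {
        return lastModified != null;
    }
}

class GuardedSessionCleanerDemo {
    public static void main(String[] args) {
        GuardedSessionCleaner fn = new GuardedSessionCleaner(5);
        fn.processElement(10);   // registers timer 15
        fn.processElement(8);    // out of order: lastModified becomes 8, registers timer 13
        fn.advanceWatermark(20); // timer 13 clears the state, timer 15 finds it null
        System.out.println("cleanups=" + fn.cleanups + " hasState=" + fn.hasState());
    }
}
```

Without the null check, the second onTimer() call in this model would throw the same NullPointerException, with only one thread involved.
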
>
> Best, Fabian
>
> 2017-12-23 16:36 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>
>> Thanks.
>>
>>
>>
>>
>>     I have a few follow up questions regarding ProcessFunction. I think
>> that the core should take care of any synchronization issues between calls
>> to onElement and onTimer in case of a keyed stream but tests do not seem to
>> suggest that.
>>
>>
>>
>> I have  specifically 2 questions.
>>
>>
>> 1.  Are calls to onElement(..) single threaded if scoped to a key? As in,
>> on a keyed stream, is there a way that 2 or more threads can execute on
>> more than one element of a single key at one time? Would I have to
>> synchronize this construction?
>>
>>
>>
>>
>> OUT accumulator = accumulatorState.value();
>> if (accumulator == null) {
>>     accumulator = acc.createNew();
>> }
>>
>>
>>
>> 2. Can concurrent calls to onTimer(..) and onElement(..) happen for the
>> same key? I intend to clean up state, but I see NullPointers thrown in
>> onTimer(..), and I presume it is b'coz onElement and onTimer are
>> executed on 2 separate threads, with onTimer removing the state (
>> clear() ) but after another thread has registered a timer ( in onElement ).
>>
>>
>> if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on Race Conditions
>>     accumulatorState.clear();
>> }
>>
>>
>>
>>
>>
>>
>> PS. This is the full code.
>>
>>
>>
>> @Override
>> public void processElement(IN event, Context context, Collector<OUT> out) throws Exception {
>>     TimerService timerService = context.timerService();
>>     if (context.timestamp() > timerService.currentWatermark()) {
>>         OUT accumulator = accumulatorState.value();
>>         if (accumulator == null) {
>>             accumulator = acc.createNew();
>>         }
>>         accumulator.setLastModified(context.timestamp());
>>         accumulatorState.update(accumulator);
>>         timerService.registerEventTimeTimer(context.timestamp() + gap);
>>     }
>> }
>>
>> @Override
>> public void onTimer(long timestamp, OnTimerContext context, Collector<OUT> out) throws Exception {
>>     OUT accumulator = accumulatorState.value();
>>     if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on Race Conditions
>>         accumulatorState.clear();
>>     }
>> }
>>
>>
>>
>> On Thu, Dec 21, 2017 at 3:49 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>
>>> That's correct. Removal of timers is not supported in ProcessFunction.
>>> Not sure why this is supported for Triggers.
>>> The common workaround for ProcessFunctions is to register multiple
>>> timers and have a ValueState that stores the valid timestamp on which the
>>> onTimer method should be executed.
>>> When a timer fires and calls onTimer(), the method first checks whether
>>> the timestamp is the correct one and leaves the method if that is not the
>>> case.
>>> If you want to fire on the next watermark, another trick is to register
>>> multiple timers on (currentWatermark + 1). Since there is only one timer
>>> per timestamp, there is only one timer which gets continuously overwritten.
>>> The timer is called when the watermark is advanced.
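
A minimal plain-Java sketch of the workaround described above, under stated assumptions. ValidTimestampTimers is a hypothetical model, not Flink API: a sorted set stands in for the timer service (so registering the same timestamp twice keeps a single timer), and a nullable field stands in for the ValueState that stores the valid timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical model of the "valid timestamp" workaround; plain Java, no Flink classes.
class ValidTimestampTimers {
    private final TreeSet<Long> timers = new TreeSet<>(); // a set: one timer per timestamp
    private Long validTimestamp;                          // stand-in for a ValueState<Long>
    final List<Long> fired = new ArrayList<>();           // timers that were actually acted upon

    // Timers cannot be removed, so re-scheduling registers another timer
    // and remembers which timestamp is the one that counts.
    void schedule(long timestamp) {
        timers.add(timestamp);
        validTimestamp = timestamp;
    }

    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    private void onTimer(long timestamp) {
        if (validTimestamp == null || timestamp != validTimestamp) {
            return; // stale timer, superseded by a later schedule() call
        }
        fired.add(timestamp);
        validTimestamp = null;
    }

    int pendingTimers() {
        return timers.size();
    }
}

class ValidTimestampTimersDemo {
    public static void main(String[] args) {
        ValidTimestampTimers fn = new ValidTimestampTimers();
        fn.schedule(100);
        fn.schedule(200);
        fn.schedule(200);                       // same timestamp: still a single timer
        System.out.println(fn.pendingTimers()); // two distinct timestamps are pending
        fn.advanceWatermark(150);               // timer 100 fires but is stale and ignored
        fn.advanceWatermark(250);               // timer 200 is the valid one
        System.out.println(fn.fired);
    }
}
```

The stale timer still fires and costs one call; the check only makes that call a no-op.
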
>>>
>>> On the performance of the timer service. AFAIK, all methods that work
>>> with some kind of timer use this service. So there is not much choice.
>>>
>>> 2017-12-20 22:36 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>
>>>> And that further begs the question: how performant is the Timer Service? I
>>>> tried to peruse the architecture behind it but could not find a
>>>> definite clue. Is it a Scheduled Service, and if yes, how many threads etc...?
>>>>
>>>> On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> Makes sense. Did a first stab at using ProcessFunction. The
>>>>> TimerService exposed by the Context does not have remove timer. Is it
>>>>> primarily b'coz a PriorityQueue is the storage and remove from a
>>>>> PriorityQueue is expensive?  TriggerContext does expose another version
>>>>> that has removal abilities, so I was wondering why this dissonance...
>>>>>
>>>>> On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske <fhue...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Vishal,
>>>>>>
>>>>>> it is not guaranteed that add() and onElement() receive the same
>>>>>> object, and even if they do it is not guaranteed that a mutation of the
>>>>>> object in onElement() has an effect. The object might have been serialized
>>>>>> and stored in RocksDB.
>>>>>> Hence, elements should not be modified in onElement().
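
A minimal sketch of why the mutation can get lost, assuming the stored element is a serialized copy. DemoEvent is a hypothetical type (the real POJO is not shown in this thread), and Java serialization is used purely as a stand-in; Flink uses its own serializers.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical event type; the real POJO in the thread is not shown.
class DemoEvent implements Serializable {
    private static final long serialVersionUID = 1L;
    long watermark = Long.MIN_VALUE;
}

class SerializedCopyDemo {
    // Round-trips an object through Java serialization, standing in for a
    // state backend that keeps bytes rather than object references.
    static DemoEvent roundTrip(DemoEvent event) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(event);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (DemoEvent) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        DemoEvent element = new DemoEvent();
        DemoEvent heldByReference = element;       // what a heap-only, same-object setup would keep
        DemoEvent heldAsCopy = roundTrip(element); // what a serializing setup would keep
        element.watermark = 42L;                   // mutation after the element was handed over
        System.out.println(heldByReference.watermark == 42L); // the shared reference sees the mutation
        System.out.println(heldAsCopy.watermark == 42L);      // the serialized copy does not
    }
}
```
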
>>>>>>
>>>>>> Have you considered to implement the operation completely in a
>>>>>> ProcessFunction instead of a session window?
>>>>>> This might be more code but easier to design and reason about because
>>>>>> there is no interaction of window assigner, trigger, and window function.
>>>>>>
>>>>>>
>>>>>> 2017-12-18 20:49 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com
>>>>>> >:
>>>>>>
>>>>>>> I guess https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362
>>>>>>>
>>>>>>> is where we could fashion what is emitted. Again, for us it
>>>>>>> seems natural to use the WM to materialize micro batches with "approximate"
>>>>>>> order ( and no, I am not a fan of spark micro batches :)). Any pointers as
>>>>>>> to how we could write an implementation that allows for "up till WM
>>>>>>> emission" through a trigger on a Session Window would be very helpful. In
>>>>>>> essence I believe that for any "funnel" analysis it is crucial.
>>>>>>>
>>>>>>> Something like https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L346
>>>>>>>
>>>>>>> I know I am simplifying this and there has to be more to it...
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <
>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> The Trigger in this case would be some CountBased Trigger.... Again,
>>>>>>>> the motive is to keep the state lean, as we desire to search for patterns,
>>>>>>>> sorted on event time, in the incoming sessionized ( and thus of
>>>>>>>> non-deterministic length ) stream....
>>>>>>>>
>>>>>>>> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <
>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> For example, this would have worked perfectly if it did not complain
>>>>>>>>> about MergeableWindow and state. The Session class in this encapsulates
>>>>>>>>> the trim-up-to-watermark behavior ( reduce call after telling it the
>>>>>>>>> current WM ) we desire:
>>>>>>>>>
>>>>>>>>> public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>>>>>>>>
>>>>>>>>>     private static final ValueStateDescriptor<Session> sessionState = new ValueStateDescriptor<>("session", Session.class);
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public void process(String key, Context context, Iterable<Event> elements, Collector<Session> out) throws Exception {
>>>>>>>>>
>>>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>>>         Session s = session.value() != null ? session.value() : new Session();
>>>>>>>>>         for (Event e : elements) {
>>>>>>>>>             s.add(e);
>>>>>>>>>         }
>>>>>>>>>         s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>>>>>>>>         s.reduce();
>>>>>>>>>         out.collect(s);
>>>>>>>>>         session.update(s);
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>>     @Override
>>>>>>>>>     public void clear(Context context){
>>>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>>>         session.clear();
>>>>>>>>>     }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <
>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hello Fabian, Thank you for the response.
>>>>>>>>>>
>>>>>>>>>>  I think that does not work, as it is the WM of the Window
>>>>>>>>>> Operator that is desired to make deterministic decisions, rather than that of
>>>>>>>>>> an operator that precedes the Window? This is doable using
>>>>>>>>>> ProcessWindowFunction with state, but only in the case of non-mergeable
>>>>>>>>>> windows.
>>>>>>>>>>
>>>>>>>>>>    The best API option, I think, is a TimeBaseTrigger that fires
>>>>>>>>>> on every configured time progression of the WM, and a Window implementation that
>>>>>>>>>> materializes *only data up till that WM* ( it might have more
>>>>>>>>>> data, but that data has event time greater than the WM ). I am not sure we
>>>>>>>>>> have that built-in option and thus was asking for access to the current WM
>>>>>>>>>> for the window operator to allow us to handle the "*only data up
>>>>>>>>>> till that WM*" range retrieval using some custom data
>>>>>>>>>> structure.
>>>>>>>>>>
>>>>>>>>>> Regards.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <fhue...@gmail.com
>>>>>>>>>> > wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>
>>>>>>>>>>> the Trigger is not designed to augment records but just to
>>>>>>>>>>> control when a window is evaluated.
>>>>>>>>>>> I would recommend to use a ProcessFunction to enrich records
>>>>>>>>>>> with the current watermark before passing them into the window operator.
>>>>>>>>>>> The context object of the processElement() method gives access
>>>>>>>>>>> to the current watermark and timestamp of a record.
>>>>>>>>>>>
>>>>>>>>>>> Please note that watermarks are not deterministic but may depend
>>>>>>>>>>> on the order in which parallel inputs are consumed by an operator.
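
A minimal plain-Java sketch of the non-determinism noted above, assuming an operator's watermark is the minimum of the watermarks received on its input channels. MinWatermarkTracker and WatermarkOrderDemo are hypothetical names; no Flink classes are used. The same record observes a different watermark depending on how many of the parallel inputs' watermarks were consumed before it.

```java
import java.util.Arrays;

// Hypothetical model of an operator with several input channels; plain Java, no Flink classes.
class MinWatermarkTracker {
    private final long[] channelWatermarks;
    private long current = Long.MIN_VALUE;

    MinWatermarkTracker(int channels) {
        channelWatermarks = new long[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark from one channel and returns the operator's watermark,
    // modelled as the minimum over all channels (it never moves backwards).
    long onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        current = Math.max(current, min);
        return current;
    }

    long current() {
        return current;
    }
}

class WatermarkOrderDemo {
    // Watermark a record would observe if it is processed after the first
    // `watermarksBeforeRecord` watermark events of a fixed two-channel input.
    static long observed(int watermarksBeforeRecord) {
        MinWatermarkTracker tracker = new MinWatermarkTracker(2);
        int[] channels = {0, 1};
        long[] marks = {10L, 10L};
        for (int i = 0; i < watermarksBeforeRecord; i++) {
            tracker.onWatermark(channels[i], marks[i]);
        }
        return tracker.current();
    }

    public static void main(String[] args) {
        // Record consumed after only channel 0's watermark: channel 1 still holds the minimum back.
        System.out.println(observed(1) == Long.MIN_VALUE);
        // Record consumed after both watermarks: the operator's watermark has advanced to 10.
        System.out.println(observed(2) == 10L);
    }
}
```
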
>>>>>>>>>>>
>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>
>>>>>>>>>>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi <
>>>>>>>>>>> vishal.santo...@gmail.com>:
>>>>>>>>>>>
>>>>>>>>>>>> An addendum
>>>>>>>>>>>>
>>>>>>>>>>>> Is the element reference IN in onElement(IN element.. ) in
>>>>>>>>>>>> Trigger<IN,..> the same as the IN provided to add(IN
>>>>>>>>>>>> value) in Accumulator<IN,..>? It seems that any mutation to
>>>>>>>>>>>> IN in onElement() is not visible to the Accumulator that is carrying it
>>>>>>>>>>>> as a previous element reference, albeit in the next invocation of add().
>>>>>>>>>>>> This seems to happen only in distributed mode, which makes sense only if these
>>>>>>>>>>>> references point to different objects.
>>>>>>>>>>>>
>>>>>>>>>>>> The pipeline
>>>>>>>>>>>>
>>>>>>>>>>>> .keyBy(keySelector)
>>>>>>>>>>>> .window(EventTimeSessionWindows.<IN>withGap(gap))
>>>>>>>>>>>> .trigger(new CountBasedWMAugmentationTrigger<IN,TimeWindow>(triggerCount))
>>>>>>>>>>>> .aggregate(
>>>>>>>>>>>>         new AggregateFunction<IN, ACC, OUT>() {
>>>>>>>>>>>>
>>>>>>>>>>>>             @Override
>>>>>>>>>>>>             public ACC createAccumulator() {
>>>>>>>>>>>>                 ACC newInstance = (ACC) accumulator.clone();
>>>>>>>>>>>>                 newInstance.resetLocal();
>>>>>>>>>>>>                 return newInstance;
>>>>>>>>>>>>             }
>>>>>>>>>>>>
>>>>>>>>>>>>             @Override
>>>>>>>>>>>>             public void add(IN value, ACC accumulator) {
>>>>>>>>>>>>
>>>>>>>>>>>>                 /** This method is called before onElement of the Trigger and keeps the reference to the last IN **/
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>                 accumulator.add(value);
>>>>>>>>>>>>
>>>>>>>>>>>>             }
>>>>>>>>>>>>             .....
>>>>>>>>>>>>
>>>>>>>>>>>>    The Trigger
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> public class CountBasedWMAugmentationTrigger<T extends
>>>>>>>>>>>>         Serializable & CountBasedWMAugmentationTrigger.HasWaterMark, W extends Window>
>>>>>>>>>>>>         extends Trigger<T, W> {
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>     @Override
>>>>>>>>>>>>
>>>>>>>>>>>>     public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
>>>>>>>>>>>>
>>>>>>>>>>>>         /** The element T is mutated to carry the watermark **/
>>>>>>>>>>>>         element.setWaterMark(ctx.getCurrentWatermark());
>>>>>>>>>>>>
>>>>>>>>>>>>         .
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <
>>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I want to augment a POJO in the Trigger's onElement method,
>>>>>>>>>>>>> specifically supply the POJO with the watermark from the TriggerContext.
>>>>>>>>>>>>> The sequence of execution is this:
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. call to add() in the accumulator for the window and save
>>>>>>>>>>>>> the POJO reference in the Accumulator.
>>>>>>>>>>>>> 2. call to onElement on the Trigger
>>>>>>>>>>>>> 3. set watermark on the POJO
>>>>>>>>>>>>>
>>>>>>>>>>>>> The next add() method should have the last reference and any
>>>>>>>>>>>>> mutation done in step 3.
>>>>>>>>>>>>>
>>>>>>>>>>>>> That works in a local test case, using LocalFlinkMiniCluster,
>>>>>>>>>>>>> as in I have access to the mutation by onElement() in the POJO in the
>>>>>>>>>>>>> subsequent add(), but not on a distributed cluster. The specific question
>>>>>>>>>>>>> I had is whether add() on a supplied accumulator on a window and the
>>>>>>>>>>>>> onElement() method of the trigger on that window are inline executions on
>>>>>>>>>>>>> the same thread, or is there any serialization/deserialization IPC that
>>>>>>>>>>>>> causes this divergence ( local versus distributed )?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards.
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
