Thanks.



I have a few follow-up questions regarding ProcessFunction. I think that
the core should take care of any synchronization issues between calls to
processElement and onTimer in the case of a keyed stream, but my tests do not
seem to suggest that.



I have two specific questions.


1. Are calls to processElement(..) single-threaded when scoped to a key? That
is, on a keyed stream, can two or more threads process more than one element
of the same key at the same time? Would I have to synchronize this
construction?

    OUT accumulator = accumulatorState.value();
    if (accumulator == null) {
        accumulator = acc.createNew();
    }



2. Can concurrent calls to onTimer(..) and processElement(..) happen for the
same key? I intend to clean up state, but I see NullPointerExceptions thrown
in onTimer(..), and I presume it is because processElement and onTimer are
executed on two separate threads, with onTimer removing the state ( clear() )
after another thread has registered a timer (in processElement).

    if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on race conditions
        accumulatorState.clear();
    }






PS. This is the full code.



@Override
public void processElement(IN event, Context context, Collector<OUT> out) throws Exception {
    TimerService timerService = context.timerService();
    if (context.timestamp() > timerService.currentWatermark()) {
        OUT accumulator = accumulatorState.value();
        if (accumulator == null) {
            accumulator = acc.createNew();
        }
        accumulator.setLastModified(context.timestamp());
        accumulatorState.update(accumulator);
        timerService.registerEventTimeTimer(context.timestamp() + gap);
    }
}

@Override
public void onTimer(long timestamp, OnTimerContext context, Collector<OUT> out) throws Exception {
    OUT accumulator = accumulatorState.value();
    if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on race conditions
        accumulatorState.clear();
    }
}
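
For reference, here is a minimal plain-Java sketch of how the state could
already be null in onTimer on a single thread, without any race. It is not
Flink API: SessionTimerSketch, staleTimers, clearedAt and advanceWatermark are
hypothetical names, and the TreeSet only approximates "one event-time timer
per timestamp" for a single key. An out-of-order element moves lastModified
backwards, its earlier timer clears the state, and the timer registered by the
earlier element then fires against cleared state. The sketch keeps the logic
above and only adds a null guard in onTimer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Single-key, single-threaded stand-in for keyed state plus event-time timers (not Flink API). */
final class SessionTimerSketch {
    private final long gap;
    // Stands in for accumulatorState / getLastModified(); null means "state cleared".
    private Long lastModified;
    // Pending timers for this key; a set, so there is at most one timer per timestamp.
    private final TreeSet<Long> timers = new TreeSet<>();
    private long watermark = Long.MIN_VALUE;

    final List<Long> clearedAt = new ArrayList<>(); // timestamps at which the state was cleared
    int staleTimers = 0;                            // timers that fired after the state was cleared

    SessionTimerSketch(long gap) {
        this.gap = gap;
    }

    /** Same logic as processElement above: ignore late elements, overwrite lastModified, add a timer. */
    void processElement(long timestamp) {
        if (timestamp > watermark) {
            lastModified = timestamp;
            timers.add(timestamp + gap);
        }
    }

    /** Same logic as onTimer above, plus a guard for state that an earlier timer already cleared. */
    void onTimer(long timestamp) {
        if (lastModified == null) {
            staleTimers++; // without this guard, the comparison below would throw a NullPointerException
            return;
        }
        if (timestamp == lastModified + gap) {
            lastModified = null;
            clearedAt.add(timestamp);
        }
    }

    /** Advances the watermark and fires all due timers in timestamp order, one at a time. */
    void advanceWatermark(long newWatermark) {
        watermark = newWatermark;
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    public static void main(String[] args) {
        SessionTimerSketch sketch = new SessionTimerSketch(10);
        sketch.processElement(10); // registers a timer at 20
        sketch.processElement(5);  // out of order: lastModified goes back to 5, timer at 15
        sketch.advanceWatermark(30);
        System.out.println("cleared at " + sketch.clearedAt + ", stale timers skipped: " + sketch.staleTimers);
    }
}
```

With gap = 10 and elements at 10 and then 5, the timer at 15 clears the state
and the timer at 20 finds none, so the guard skips it. Whether this is the
actual cause here is an assumption. Keeping the larger of the stored and the
incoming timestamp in lastModified would be another option to consider.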



On Thu, Dec 21, 2017 at 3:49 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> That's correct. Removal of timers is not supported in ProcessFunction. Not
> sure why this is supported for Triggers.
> The common workaround for ProcessFunctions is to register multiple timers
> and have a ValueState that stores the valid timestamp on which the onTimer
> method should be executed.
> When a timer fires and calls onTimer(), the method first checks whether
> the timestamp is the correct one and leaves the method if that is not the
> case.
> If you want to fire on the next watermark, another trick is to register
> multiple timers on (currentWatermark + 1). Since there is only one timer
> per timestamp, there is only one timer which gets continuously overwritten.
> The timer is called when the watermark is advanced.
>
> On the performance of the timer service. AFAIK, all methods that work with
> some kind of timer use this service. So there is not much choice.
>
> 2017-12-20 22:36 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>
>> And that further begs the question: how performant is the TimerService? I
>> tried to peruse the architecture behind it but could not find a
>> definite clue. Is it a scheduled service, and if so, how many threads, etc.?
>>
>> On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Makes sense. Did a first stab at using ProcessFunction. The TimerService
>>> exposed by the Context does not have a way to remove a timer. Is it
>>> primarily because a PriorityQueue is the storage and removal from a
>>> PriorityQueue is expensive? The TriggerContext does expose another version
>>> that has removal abilities, so I was wondering why this dissonance...
>>>
>>> On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>>> Hi Vishal,
>>>>
>>>> it is not guaranteed that add() and onElement() receive the same
>>>> object, and even if they do it is not guaranteed that a mutation of the
>>>> object in onElement() has an effect. The object might have been serialized
>>>> and stored in RocksDB.
>>>> Hence, elements should not be modified in onElement().
>>>>
>>>> Have you considered to implement the operation completely in a
>>>> ProcessFunction instead of a session window?
>>>> This might be more code but easier to design and reason about because
>>>> there is no interaction of window assigner, trigger, and window function.
>>>>
>>>>
>>>> 2017-12-18 20:49 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>
>>>>> I guess
>>>>> https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362
>>>>>
>>>>> is where we could fashion what is emitted. Again, for us it seems
>>>>> natural to use the WM to materialize micro batches with "approximate"
>>>>> order (and no, I am not a fan of Spark micro batches :)). Any pointers as
>>>>> to how we could write an implementation that allows for "up till WM
>>>>> emission" through a trigger on a Session Window would be very helpful. In
>>>>> essence I believe that for any "funnel" analysis it is crucial.
>>>>>
>>>>> Something like
>>>>> https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L346
>>>>>
>>>>> I know I am simplifying this and there has to be more to it...
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <
>>>>> vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> The Trigger in this case would be some count-based Trigger.... Again,
>>>>>> the motive is to keep the state lean, as we desire to search for
>>>>>> patterns, sorted on event time, in the incoming sessionized (and thus of
>>>>>> nondeterministic length) stream....
>>>>>>
>>>>>> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <
>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>
>>>>>>> For example, this would have worked perfectly if it did not complain
>>>>>>> about MergeableWindow and state. The Session class in this encapsulates
>>>>>>> the trim-up-to-watermark behavior (reduce call after telling it the
>>>>>>> current WM) we desire.
>>>>>>>
>>>>>>> public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>>>>>>
>>>>>>>     private static final ValueStateDescriptor<Session> sessionState = new ValueStateDescriptor<>("session", Session.class);
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public void process(String key, Context context, Iterable<Event> elements, Collector<Session> out) throws Exception {
>>>>>>>
>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>         Session s = session.value() != null ? session.value() : new Session();
>>>>>>>         for (Event e : elements) {
>>>>>>>             s.add(e);
>>>>>>>         }
>>>>>>>         s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>>>>>>         s.reduce();
>>>>>>>         out.collect(s);
>>>>>>>         session.update(s);
>>>>>>>     }
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public void clear(Context context){
>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>         session.clear();
>>>>>>>     }
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <
>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hello Fabian, Thank you for the response.
>>>>>>>>
>>>>>>>> I think that does not work, as it is the WM of the Window Operator
>>>>>>>> that is desired to make deterministic decisions, rather than that of
>>>>>>>> an operator that precedes the Window? This is doable using
>>>>>>>> ProcessWindowFunction with state, but only in the case of non-mergeable
>>>>>>>> windows.
>>>>>>>>
>>>>>>>> The best API option, I think, is a TimeBaseTrigger that fires on
>>>>>>>> every configured time progression of the WM and a Window implementation
>>>>>>>> that materializes *only data up till that WM* (it might have more data,
>>>>>>>> but that data has event time greater than the WM). I am not sure we
>>>>>>>> have that built-in option, and thus was asking for access to the
>>>>>>>> current WM of the window operator to allow us to handle the "*only data
>>>>>>>> up till that WM*" range retrieval using some custom data structure.
>>>>>>>>
>>>>>>>> Regards.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <fhue...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Vishal,
>>>>>>>>>
>>>>>>>>> the Trigger is not designed to augment records but just to control
>>>>>>>>> when a window is evaluated.
>>>>>>>>> I would recommend to use a ProcessFunction to enrich records with
>>>>>>>>> the current watermark before passing them into the window operator.
>>>>>>>>> The context object of the processElement() method gives access to
>>>>>>>>> the current watermark and timestamp of a record.
>>>>>>>>>
>>>>>>>>> Please note that watermarks are not deterministic but may depend
>>>>>>>>> on the order in which parallel inputs are consumed by an operator.
>>>>>>>>>
>>>>>>>>> Best, Fabian
>>>>>>>>>
>>>>>>>>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi <
>>>>>>>>> vishal.santo...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> An addendum
>>>>>>>>>>
>>>>>>>>>> Is the element reference IN in onElement(IN element, ..) in
>>>>>>>>>> Trigger<IN,..> the same as the IN provided to add(IN value) in
>>>>>>>>>> Accumulator<IN,..>? It seems that any mutation to IN in onElement()
>>>>>>>>>> is not visible to the Accumulator that is carrying it as a
>>>>>>>>>> previous-element reference, even in the next invocation of add().
>>>>>>>>>> This seems to happen only in distributed mode, which makes sense
>>>>>>>>>> only if these references point to different objects.
>>>>>>>>>>
>>>>>>>>>> The pipeline
>>>>>>>>>>
>>>>>>>>>> .keyBy(keySelector)
>>>>>>>>>> .window(EventTimeSessionWindows.<IN>withGap(gap))
>>>>>>>>>> .trigger(new CountBasedWMAugmentationTrigger<IN,TimeWindow>(triggerCount))
>>>>>>>>>> .aggregate(
>>>>>>>>>>         new AggregateFunction<IN, ACC, OUT>() {
>>>>>>>>>>
>>>>>>>>>>             @Override
>>>>>>>>>>             public ACC createAccumulator() {
>>>>>>>>>>                 ACC newInstance = (ACC) accumulator.clone();
>>>>>>>>>>                 newInstance.resetLocal();
>>>>>>>>>>                 return newInstance;
>>>>>>>>>>             }
>>>>>>>>>>
>>>>>>>>>>             @Override
>>>>>>>>>>             public void add(IN value, ACC accumulator) {
>>>>>>>>>>
>>>>>>>>>>                 /** This method is called before onElement of the Trigger and keeps the reference to the last IN **/
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>                 accumulator.add(value);
>>>>>>>>>>
>>>>>>>>>>             }
>>>>>>>>>>             .....
>>>>>>>>>>
>>>>>>>>>>    The Trigger
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> public class CountBasedWMAugmentationTrigger<T extends Serializable & CountBasedWMAugmentationTrigger.HasWaterMark, W extends Window> extends Trigger<T, W> {
>>>>>>>>>>
>>>>>>>>>>     @Override
>>>>>>>>>>     public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
>>>>>>>>>>
>>>>>>>>>>         /** The element T is mutated to carry the watermark **/
>>>>>>>>>>         element.setWaterMark(ctx.getCurrentWatermark());
>>>>>>>>>>
>>>>>>>>>>         .
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <
>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I want to augment a POJO in the Trigger's onElement method,
>>>>>>>>>>> specifically supply the POJO with the watermark from the
>>>>>>>>>>> TriggerContext. The sequence of execution is this:
>>>>>>>>>>>
>>>>>>>>>>> 1. call to add() in the accumulator for the window, saving the
>>>>>>>>>>> POJO reference in the Accumulator
>>>>>>>>>>> 2. call to onElement on the Trigger
>>>>>>>>>>> 3. set the watermark on the POJO
>>>>>>>>>>>
>>>>>>>>>>> The next add() call should have the last reference and any
>>>>>>>>>>> mutation done in step 3.
>>>>>>>>>>>
>>>>>>>>>>> That works in a local test case, using LocalFlinkMiniCluster, as
>>>>>>>>>>> in I have access to the mutation by onElement() in the POJO in the
>>>>>>>>>>> subsequent add(), but not on a distributed cluster. The specific
>>>>>>>>>>> question I had is whether add() on a supplied accumulator on a
>>>>>>>>>>> window and the onElement() method of the trigger on that window are
>>>>>>>>>>> inline executions on the same thread, or whether there is any
>>>>>>>>>>> serialization/deserialization IPC that causes this divergence
>>>>>>>>>>> (local versus distributed).
>>>>>>>>>>>
>>>>>>>>>>> Regards.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
