Hello Fabian, Thank you for your response.

I thought about it, and maybe I am missing something obvious here. The code
below is what I think you are suggesting. The issue is that the window state
is now a list of Sessions (or rather, subsets of the Session).

What is required is that, on a new watermark,

* we sort these Session objects, and
* we take the subset that falls before the new watermark and emit it
without purging.

I do not see how the Trigger approach helps us. It does tell us that the
watermark has progressed, but to get the subset of the ListState that falls
before the watermark, we would need access to *the new value of the
watermark*. That was my initial question.
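To make the requirement concrete, here is a minimal plain-Java sketch (no Flink dependency) of the two steps listed above: given the buffered items of one key and the new watermark value, sort by event time and split off the part that is complete. "Complete" is taken to mean timestamp <= watermark, since a watermark W asserts that no more elements with timestamp <= W are expected. `Timed`, `WatermarkSplit` and `splitAtWatermark` are hypothetical names used only for illustration; note that `splitAtWatermark` needs the watermark as an argument, which is exactly the value the window function would have to obtain.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical element type: anything that carries an event-time timestamp.
final class Timed {
    final String id;
    final long timestamp;

    Timed(String id, long timestamp) {
        this.id = id;
        this.timestamp = timestamp;
    }
}

final class WatermarkSplit {
    /** Items with timestamp <= watermark: safe to emit. */
    final List<Timed> ready = new ArrayList<>();
    /** Items after the watermark: must stay in state. */
    final List<Timed> pending = new ArrayList<>();

    // Sorts a copy of the buffered items by event time and partitions it at the watermark.
    static WatermarkSplit splitAtWatermark(List<Timed> buffered, long watermark) {
        List<Timed> sorted = new ArrayList<>(buffered);
        sorted.sort(Comparator.comparingLong(t -> t.timestamp));
        WatermarkSplit split = new WatermarkSplit();
        for (Timed t : sorted) {
            if (t.timestamp <= watermark) {
                split.ready.add(t);
            } else {
                split.pending.add(t);
            }
        }
        return split;
    }
}
```

The input list is copied rather than sorted in place, so the caller's buffer is left untouched until it decides to keep only `pending`.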



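import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// HasTime, HasKey and SessionState are our own application types (not shown).
public class SessionProcessWindow<IN extends HasTime & HasKey, OUT extends SessionState<IN, OUT>>
        extends ProcessWindowFunction<IN, OUT, String, TimeWindow> {

    OUT toCreateNew;
    Long gap;
    private final ListStateDescriptor<OUT> mergingSetsStateDescriptor;

    public SessionProcessWindow(TypeInformation<OUT> aggregationResultType,
                                OUT toCreateNew) {
        this.toCreateNew = toCreateNew;
        mergingSetsStateDescriptor =
                new ListStateDescriptor<>("sessions", aggregationResultType);
    }

    @Override
    public void process(String s, Context context, Iterable<IN> elements,
                        Collector<OUT> out) throws Exception {
        // Build one Session from the elements of this firing ...
        OUT session = toCreateNew.createNew();
        elements.forEach(f -> session.add(f));
        // ... and append it to the window's list of Sessions. Nothing is emitted
        // here, because emitting only the part before the watermark needs its value.
        context.windowState().getListState(mergingSetsStateDescriptor).add(session);
    }
}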


On Fri, Jan 5, 2018 at 7:35 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Vishal,
>
> thanks for sharing your solution!
>
> Looking at this issue again, and at your mail in which you shared your
> SessionProcessWindow ProcessWindowFunction, I'm wondering why you need the
> ValueState that prevents the ProcessWindowFunction from being used in a
> mergeable window.
> You could have created a new Session object in each invocation of the
> ProcessWindowFunction and simply kept the elements in the (mergeable) list
> state of the window.
> In that case you would only need a custom trigger that calls the
> ProcessWindowFunction when a new watermark arrives. For intermediate calls
> you just FIRE, and for the final call you FIRE_AND_PURGE to remove the
> elements from the window's state.
> Did you try that?
>
> Best, Fabian
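The firing policy Fabian describes can be modeled without Flink as a small state machine: on every watermark advance, an intermediate call fires and keeps the elements, and the call that reaches the end of the window fires and purges. This is a plain-Java sketch, not a Flink `Trigger`; the `Decision` enum only borrows the constant names of Flink's `TriggerResult`, and treating "watermark >= window max timestamp" as the final call is an assumption about how such a trigger would be written. In this model the window function would still receive all buffered elements on every FIRE, which is the crux of Vishal's objection at the top of this thread: filtering them down to the part before the watermark needs the watermark value.

```java
import java.util.ArrayList;
import java.util.List;

// Borrows the constant names of Flink's TriggerResult; this is a plain-Java model, not Flink code.
enum Decision { CONTINUE, FIRE, FIRE_AND_PURGE }

final class WatermarkFiringModel {
    private final long windowMaxTimestamp;                       // e.g. what TimeWindow.maxTimestamp() would return
    private final List<Long> windowElements = new ArrayList<>(); // stands in for the window's list state
    private long lastWatermark = Long.MIN_VALUE;

    WatermarkFiringModel(long windowMaxTimestamp) {
        this.windowMaxTimestamp = windowMaxTimestamp;
    }

    void addElement(long timestamp) {
        windowElements.add(timestamp);
    }

    int bufferedElements() {
        return windowElements.size();
    }

    // Called whenever a watermark arrives.
    Decision onWatermark(long watermark) {
        if (watermark <= lastWatermark) {
            return Decision.CONTINUE;   // no progress: nothing to evaluate
        }
        lastWatermark = watermark;
        if (watermark >= windowMaxTimestamp) {
            windowElements.clear();     // final call: the window function runs, then the state is purged
            return Decision.FIRE_AND_PURGE;
        }
        return Decision.FIRE;           // intermediate call: evaluate but keep all elements
    }
}
```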
>
>
>
> 2018-01-03 15:57 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>
>> Dear Fabian,
>>
>> I was able to create a fairly functional ProcessFunction. Here is the
>> synopsis; please see if it makes sense.
>>
>> Sessionization is unique in that it entails windows of dynamic length. The
>> way Flink approaches it is pretty simple. It creates a TimeWindow of size
>> "gap" relative to the event time, finds an overlapping window (intersection)
>> and creates a covering window. Each such window has a "state" associated
>> with it, which also has to be merged when a covering window is created on
>> the intersection of 2 or more incident windows. To be more precise, if
>> Window1 spans (t1, t2) and a new record creates a window (t3, t4) with
>> t1 <= t3 <= t2, a new window (t1, t4) is created and the associated states
>> are merged.
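The merge rule can be sketched in plain Java. In this sketch a fresh session window for an event at time t is [t, t + gap), two windows intersect when each starts no later than the other ends (so touching windows merge too), and the covering window takes the earlier start and the later end. That general form also covers a late event whose window ends before the existing window does, so the merged window is (t1, max(t2, t4)). `Span`, `forEvent` and `addAndMerge` are illustrative names; Flink's `TimeWindow` has `intersects` and `cover` methods in the same spirit, but this is not its implementation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Plain-Java model of session-window merging (illustrative, not Flink's TimeWindow).
final class Span {
    final long start; // inclusive
    final long end;   // exclusive: event time + gap for a fresh session

    Span(long start, long end) {
        this.start = start;
        this.end = end;
    }

    static Span forEvent(long eventTime, long gap) {
        return new Span(eventTime, eventTime + gap);
    }

    boolean intersects(Span other) {
        return this.start <= other.end && this.end >= other.start;
    }

    // Covering window: earliest start, latest end.
    Span cover(Span other) {
        return new Span(Math.min(start, other.start), Math.max(end, other.end));
    }

    // Adds a window to a set of non-intersecting windows, merging every window it touches.
    static List<Span> addAndMerge(List<Span> existing, Span incoming) {
        List<Span> result = new ArrayList<>();
        Span merged = incoming;
        for (Span s : existing) {
            if (s.intersects(merged)) {
                merged = merged.cover(s);
            } else {
                result.add(s);
            }
        }
        result.add(merged);
        result.sort(Comparator.comparingLong(w -> w.start));
        return result;
    }
}
```

A single pass is enough here because the existing windows never intersect each other, so a window that misses both the incoming window and every window merged into it also misses their union.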
>>
>>
>> In the current Window API, the states are external and
>> accumulator-based. This approach works for pretty much all cases where
>> the aggregation is accumulative/reducible and does not depend on order,
>> i.e. no ordered list of incoming records needs to be kept and the
>> reduction is to a single aggregated element (think counts, min, max etc.).
>> In path analysis (and other use cases), however, this approach has
>> drawbacks. Even though our accumulator could keep an ordered list of
>> events, that becomes unreasonable if it is not bounded. An approach that
>> does *attempt* to bound state is to preemptively analyze paths using the WM
>> as the marker that defines the *subset* of the state that is safe to
>> analyze. So if we have n events in the window state and m fall before the
>> WM, we can safely analyze the subset of m, emitting the paths seen and
>> reducing the cumulative state size. There are caveats, though, that I will
>> go into later.
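A minimal sketch of the "analyze the m events before the WM" idea, assuming events are simple (timestamp, page) pairs and that "analyzing" means appending pages to a path: keep a key's events in a priority queue ordered by event time and, on each watermark, drain the prefix at or before it. Only the n - m later events remain buffered. `PageView`, `OrderedSessionBuffer` and `reduceToWatermark` are hypothetical names; a real job would hold this data in Flink keyed state rather than an in-memory queue.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical event type for path analysis.
final class PageView {
    final long timestamp;
    final String page;

    PageView(long timestamp, String page) {
        this.timestamp = timestamp;
        this.page = page;
    }
}

// Hypothetical per-key buffer ordered by event time.
final class OrderedSessionBuffer {
    private final PriorityQueue<PageView> pending =
            new PriorityQueue<>(Comparator.comparingLong((PageView v) -> v.timestamp));

    void add(PageView view) {
        pending.add(view);
    }

    // Removes every view at or before the watermark, in event-time order, and
    // returns the resulting path segment; later views stay buffered.
    List<String> reduceToWatermark(long watermark) {
        List<String> path = new ArrayList<>();
        while (!pending.isEmpty() && pending.peek().timestamp <= watermark) {
            path.add(pending.poll().page);
        }
        return path;
    }

    int size() {
        return pending.size();
    }
}
```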
>>
>>
>>
>> Unfortunately, the accumulators in Flink's default window runtime do not
>> have access to the WM.
>>
>>
>> This led to the following generic approach (implemented and tested):
>>
>>
>> * Use a low-level ProcessFunction, which gives access to the WM and is
>> definitely nearer to the guts of Flink.
>>
>>
>> * Still use the merge-windows-on-intersection approach, but use the WM to
>> trigger (through timers) reductions in state. This is not very
>> different from what Flink does, but we have more control over what to do
>> and when to do it. Essentially, I have exposed a lifecycle method that
>> reacts to WM progression.
>>
>>
>> * There are essentially 2 timers. The first timer is on the maxTimestamp()
>> of a window; if there is no further mutation (because of a merge etc.), it
>> fires to signal a session end. The second one is on currentWatermark + 1
>> and essentially calls a "reduceToWM" on each keyed window and thus its
>> state.
>>
>>
>> * There are 2 ways to short-circuit a session: 1. on session time span,
>> 2. on session size.
>>
>>
>> * There is a safety valve to blacklist keys when it is obvious that it is
>> a bot ( again
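The two timers and the two short circuits above reduce to a little arithmetic, sketched here in plain Java under stated assumptions: a session window whose latest event is at lastEventTs ends at lastEventTs + gap (exclusive), its max timestamp is end - 1 (as with Flink's `TimeWindow.maxTimestamp()`), an event-time timer fires once the watermark reaches it, and the limits are simple thresholds. `SessionLimits` and its methods are hypothetical names, not part of any Flink API.

```java
// Hypothetical helper: timer timestamps and short-circuit checks for one session.
final class SessionLimits {
    final long gap;        // inactivity gap that closes a session
    final long maxSpan;    // short circuit 1: longest allowed session duration
    final int maxEvents;   // short circuit 2: largest allowed session size

    SessionLimits(long gap, long maxSpan, int maxEvents) {
        this.gap = gap;
        this.maxSpan = maxSpan;
        this.maxEvents = maxEvents;
    }

    // Timer 1: the window's max timestamp, i.e. (lastEventTs + gap) - 1.
    long sessionEndTimer(long lastEventTs) {
        return lastEventTs + gap - 1;
    }

    // Timer 2: fires on the very next watermark advance.
    static long nextWatermarkTimer(long currentWatermark) {
        return currentWatermark + 1;
    }

    // An event-time timer fires once the watermark has reached its timestamp.
    static boolean fires(long timerTs, long watermark) {
        return watermark >= timerTs;
    }

    // Short circuits: end the session early when it grows too long or too large.
    boolean shouldShortCircuit(long firstTs, long lastTs, int eventCount) {
        return (lastTs - firstTs) > maxSpan || eventCount > maxEvents;
    }
}
```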
>>
>>
>> The solution thus preemptively pushes out patterns (and correct
>> patterns) while keeping the ordered state within reasonable bounds. The
>> incident data of course still has to be analyzed: are the paths too large,
>> etc. But one has full control over how to fashion the solution.
>>
>>
>>
>>
>> Regards and Thanks,
>>
>>
>> Vishal
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> On Wed, Dec 27, 2017 at 10:41 AM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> This makes sense.  Thanks.
>>>
>>> On Sat, Dec 23, 2017 at 10:58 AM, Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> all calls to onElement() or onTimer() are synchronized, for all keys.
>>>> Think of a single thread calling these methods.
>>>> Event-time timers are called when a watermark passes the timer.
>>>> Watermarks are received as special records, so the methods are called in
>>>> the same order as records (actual records or watermarks) arrive at the
>>>> function. Actual synchronization is only required for processing-time
>>>> timers.
>>>>
>>>> The NPE might be thrown because of two timers that fire one after the
>>>> other without a new record being processed in between the onTimer() calls.
>>>> In that case the state is cleared in the first call and null in the second.
>>>>
>>>> Best, Fabian
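The scenario Fabian describes can be reproduced in a plain-Java model of a single key, in which elements and timers are handled on one thread and event-time timers fire in timestamp order once the watermark passes them. If an out-of-order element moves `lastModified` backwards, the earlier timer matches and clears the state, and the later timer, firing in the same watermark advance with no element in between, finds no state. The model guards `onTimer` with a null check, which is one common fix. `SingleKeyModel` is a hypothetical class; its fields only stand in for Flink's `ValueState` and timer service.

```java
import java.util.TreeSet;

// Plain-Java model of one key: single-threaded, timers fire in timestamp order.
final class SingleKeyModel {
    static final class Accumulator {
        long lastModified;
    }

    private final long gap;
    private Accumulator state;                             // stands in for ValueState<OUT>
    private final TreeSet<Long> timers = new TreeSet<>();  // one timer per timestamp
    int cleared = 0;

    SingleKeyModel(long gap) {
        this.gap = gap;
    }

    void processElement(long ts) {
        if (state == null) {
            state = new Accumulator();
        }
        state.lastModified = ts;
        timers.add(ts + gap);
    }

    // All timers at or below the watermark fire back to back, with no element in between.
    void advanceWatermark(long watermark) {
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    private void onTimer(long timestamp) {
        Accumulator acc = state;
        if (acc == null) {
            return; // an earlier timer already cleared the state; without this check: NPE
        }
        if (timestamp == acc.lastModified + gap) {
            state = null; // like accumulatorState.clear()
            cleared++;
        }
    }

    boolean hasState() {
        return state != null;
    }
}
```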
>>>>
>>>> 2017-12-23 16:36 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>
>>>>> Thanks.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> I have a few follow-up questions regarding ProcessFunction. I
>>>>> think that the core should take care of any synchronization issues between
>>>>> calls to onElement and onTimer in the case of a keyed stream, but tests do
>>>>> not seem to suggest that.
>>>>>
>>>>>
>>>>>
>>>>> I have 2 specific questions.
>>>>>
>>>>>
>>>>> 1. Are calls to onElement(..) single-threaded when scoped to a key?
>>>>> That is, on a keyed stream, is there a way that 2 or more threads can
>>>>> process more than one element of a single key at the same time? Would I
>>>>> have to synchronize this construction?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> OUT accumulator = accumulatorState.value();
>>>>> if (accumulator == null) {
>>>>>     accumulator = acc.createNew();
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>> 2. Can concurrent calls to onTimer(..) and onElement(..) happen for the
>>>>> same key? I intend to clean up state, but I see NullPointerExceptions
>>>>> thrown in onTimer(..), and I presume it is because onElement and onTimer
>>>>> are executed on 2 separate threads, with onTimer removing the state
>>>>> (clear()) after another thread has registered a timer (in onElement).
>>>>>
>>>>>
>>>>> if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on race conditions
>>>>>     accumulatorState.clear();
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> PS. This is the full code.
>>>>>
>>>>>
>>>>>
>>>>> @Override
>>>>> public void processElement(IN event, Context context, Collector<OUT> out) throws Exception {
>>>>>     TimerService timerService = context.timerService();
>>>>>     if (context.timestamp() > timerService.currentWatermark()) {
>>>>>         OUT accumulator = accumulatorState.value();
>>>>>         if (accumulator == null) {
>>>>>             accumulator = acc.createNew();
>>>>>         }
>>>>>         accumulator.setLastModified(context.timestamp());
>>>>>         accumulatorState.update(accumulator);
>>>>>         timerService.registerEventTimeTimer(context.timestamp() + gap);
>>>>>     }
>>>>> }
>>>>>
>>>>> @Override
>>>>> public void onTimer(long timestamp, OnTimerContext context, Collector<OUT> out) throws Exception {
>>>>>     OUT accumulator = accumulatorState.value();
>>>>>     if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on race conditions
>>>>>         accumulatorState.clear();
>>>>>     }
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Dec 21, 2017 at 3:49 AM, Fabian Hueske <fhue...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> That's correct. Removal of timers is not supported in
>>>>>> ProcessFunction. I'm not sure why it is supported for Triggers.
>>>>>> The common workaround for ProcessFunctions is to register multiple
>>>>>> timers and keep a ValueState that stores the valid timestamp on which the
>>>>>> onTimer method should be executed.
>>>>>> When a timer fires and calls onTimer(), the method first checks
>>>>>> whether the timestamp is the correct one and leaves the method if that is
>>>>>> not the case.
>>>>>> If you want to fire on the next watermark, another trick is to
>>>>>> register multiple timers on (currentWatermark + 1). Since there is only
>>>>>> one timer per timestamp, there is only one timer, which gets continuously
>>>>>> overwritten. The timer is called when the watermark is advanced.
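Both workarounds can be modeled in plain Java, assuming (as described above) that the timer service keeps at most one timer per timestamp and fires a timer once the watermark reaches it. `TimerWorkaroundModel` is a hypothetical class: a `TreeSet` stands in for the timer queue, and a nullable `Long` stands in for the `ValueState` that holds the valid timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

final class TimerWorkaroundModel {
    private final TreeSet<Long> timers = new TreeSet<>(); // at most one timer per timestamp
    private Long validTimestamp;                          // stands in for a ValueState<Long>
    private long currentWatermark = Long.MIN_VALUE;
    final List<Long> executed = new ArrayList<>();

    // No deleteTimer available: register the new timer and remember which one is valid.
    void registerValidTimer(long timestamp) {
        timers.add(timestamp);
        validTimestamp = timestamp;
    }

    long currentWatermark() {
        return currentWatermark;
    }

    int pendingTimers() {
        return timers.size();
    }

    void advanceWatermark(long watermark) {
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.first() <= watermark) {
            long ts = timers.pollFirst();
            // onTimer(): leave early if this timer is stale.
            if (validTimestamp == null || ts != validTimestamp.longValue()) {
                continue;
            }
            executed.add(ts);
            validTimestamp = null;
        }
    }
}
```

Registering `currentWatermark() + 1` several times before the watermark moves leaves a single pending timer, which is the "fire on the next watermark" trick.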
>>>>>>
>>>>>> Regarding the performance of the timer service: AFAIK, all methods that
>>>>>> work with some kind of timer use this service, so there is not much choice.
>>>>>>
>>>>>> 2017-12-20 22:36 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>>
>>>>>>> And that raises a further question: how performant is the timer
>>>>>>> service? I tried to peruse the architecture behind it but could not
>>>>>>> find a definite clue. Is it a scheduled service, and if so, how many
>>>>>>> threads etc.?
>>>>>>>
>>>>>>> On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi <
>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Makes sense. I took a first stab at using ProcessFunction. The
>>>>>>>> TimerService exposed by the Context does not have a way to remove a
>>>>>>>> timer. Is that primarily because a priority queue is the storage, and
>>>>>>>> removal from a PriorityQueue is expensive? The Trigger context does
>>>>>>>> expose a version that has removal abilities, so I was wondering about
>>>>>>>> this dissonance...
>>>>>>>>
>>>>>>>> On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske <fhue...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Vishal,
>>>>>>>>>
>>>>>>>>> it is not guaranteed that add() and onElement() receive the same
>>>>>>>>> object, and even if they do, it is not guaranteed that a mutation of
>>>>>>>>> the object in onElement() has an effect. The object might have been
>>>>>>>>> serialized and stored in RocksDB.
>>>>>>>>> Hence, elements should not be modified in onElement().
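Why a mutation can be lost is easy to demonstrate with a serialization round trip: once a state backend has stored an object as bytes, later changes to the original instance do not reach the stored copy. The sketch uses plain Java serialization as a stand-in for Flink's own serializers and RocksDB; `Pojo` and `SerializedCopyDemo.roundTrip` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical element type that a trigger might try to mutate.
final class Pojo implements Serializable {
    private static final long serialVersionUID = 1L;
    long watermark = -1;
}

final class SerializedCopyDemo {
    // Serializes and deserializes a value, roughly what happens when state is kept as bytes.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Whether a given deployment copies objects depends on the state backend and object-reuse settings, which is consistent with the local versus distributed difference reported further down the thread.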
>>>>>>>>>
>>>>>>>>> Have you considered implementing the operation completely in a
>>>>>>>>> ProcessFunction instead of a session window?
>>>>>>>>> This might be more code, but it is easier to design and reason about,
>>>>>>>>> because there is no interaction of window assigner, trigger, and
>>>>>>>>> window function.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 2017-12-18 20:49 GMT+01:00 Vishal Santoshi <
>>>>>>>>> vishal.santo...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> I guess https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362
>>>>>>>>>>
>>>>>>>>>> is where we could shape what is emitted. Again, for us it seems
>>>>>>>>>> natural to use the WM to materialize micro-batches with "approximate"
>>>>>>>>>> order (and no, I am not a fan of Spark micro-batches :)). Any pointers
>>>>>>>>>> as to how we could write an implementation that allows for "up till WM
>>>>>>>>>> emission" through a trigger on a session window would be very helpful.
>>>>>>>>>> In essence, I believe that this is crucial for any "funnel" analysis.
>>>>>>>>>>
>>>>>>>>>> Something like https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L346
>>>>>>>>>>
>>>>>>>>>> I know I am simplifying this and there has to be more to it...
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <
>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> The Trigger in this case would be some count-based trigger.
>>>>>>>>>>> Again, the motive is to keep the state lean, as we want to search for
>>>>>>>>>>> patterns, sorted on event time, in the incoming sessionized (and thus
>>>>>>>>>>> nondeterministic-length) stream.
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <
>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> For example, this would have worked perfectly if it did not
>>>>>>>>>>>> complain about MergeableWindow and state. The Session class in this
>>>>>>>>>>>> example encapsulates the trim-up-to-watermark behavior we desire (a
>>>>>>>>>>>> reduce call after telling it the current WM).
>>>>>>>>>>>>
>>>>>>>>>>>> public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>>>>>>>>>>>
>>>>>>>>>>>>     private static final ValueStateDescriptor<Session> sessionState =
>>>>>>>>>>>>             new ValueStateDescriptor<>("session", Session.class);
>>>>>>>>>>>>
>>>>>>>>>>>>     @Override
>>>>>>>>>>>>     public void process(String key, Context context, Iterable<Event> elements, Collector<Session> out) throws Exception {
>>>>>>>>>>>>
>>>>>>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>>>>>>         Session s = session.value() != null ? session.value() : new Session();
>>>>>>>>>>>>         for (Event e : elements) {
>>>>>>>>>>>>             s.add(e);
>>>>>>>>>>>>         }
>>>>>>>>>>>>         s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>>>>>>>>>>>         s.reduce();
>>>>>>>>>>>>         out.collect(s);
>>>>>>>>>>>>         session.update(s);
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>>     @Override
>>>>>>>>>>>>     public void clear(Context context) {
>>>>>>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>>>>>>         session.clear();
>>>>>>>>>>>>     }
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <
>>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello Fabian, Thank you for the response.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think that does not work, as it is the WM of the Window
>>>>>>>>>>>>> Operator that is needed to make deterministic decisions, rather than
>>>>>>>>>>>>> the WM of an operator that precedes the Window. This is doable with a
>>>>>>>>>>>>> ProcessWindowFunction using state, but only in the case of
>>>>>>>>>>>>> non-mergeable windows.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The best API option, I think, is a time-based trigger that
>>>>>>>>>>>>> fires on every configured progression of the WM, and a Window
>>>>>>>>>>>>> implementation that materializes *only data up till that WM* (it might
>>>>>>>>>>>>> have more data, but that data has event time greater than the WM). I am
>>>>>>>>>>>>> not sure we have that option built in, and thus was asking for access to
>>>>>>>>>>>>> the current WM of the window operator, to allow us to handle the "*only
>>>>>>>>>>>>> data up till that WM*" range retrieval using some custom data structure.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <
>>>>>>>>>>>>> fhue...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> the Trigger is not designed to augment records but only to
>>>>>>>>>>>>>> control when a window is evaluated.
>>>>>>>>>>>>>> I would recommend using a ProcessFunction to enrich records
>>>>>>>>>>>>>> with the current watermark before passing them into the window
>>>>>>>>>>>>>> operator. The context object of the processElement() method gives
>>>>>>>>>>>>>> access to the current watermark and the timestamp of a record.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Please note that watermarks are not deterministic but may
>>>>>>>>>>>>>> depend on the order in which parallel inputs are consumed by an
>>>>>>>>>>>>>> operator.
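The enrichment step suggested here can be modeled in plain Java, without Flink: an operator sees records and watermarks interleaved on one input, remembers the latest watermark, and attaches it to each record it forwards. In a real job this logic would live in a `ProcessFunction`, whose `processElement` can read `ctx.timerService().currentWatermark()`; `StreamItem`, `EnrichedRecord` and `WatermarkEnricher` are hypothetical classes. The attached value depends on arrival order, which is why it is not deterministic.

```java
import java.util.ArrayList;
import java.util.List;

// One input channel: records and watermarks arrive interleaved.
final class StreamItem {
    final boolean isWatermark;
    final long time;       // record timestamp or watermark value
    final String payload;  // null for watermarks

    private StreamItem(boolean isWatermark, long time, String payload) {
        this.isWatermark = isWatermark;
        this.time = time;
        this.payload = payload;
    }

    static StreamItem record(long timestamp, String payload) {
        return new StreamItem(false, timestamp, payload);
    }

    static StreamItem watermark(long watermark) {
        return new StreamItem(true, watermark, null);
    }
}

final class EnrichedRecord {
    final String payload;
    final long timestamp;
    final long watermarkAtArrival;

    EnrichedRecord(String payload, long timestamp, long watermarkAtArrival) {
        this.payload = payload;
        this.timestamp = timestamp;
        this.watermarkAtArrival = watermarkAtArrival;
    }
}

final class WatermarkEnricher {
    // Tags every record with the latest watermark seen before it.
    static List<EnrichedRecord> enrich(List<StreamItem> input) {
        long currentWatermark = Long.MIN_VALUE; // no watermark seen yet
        List<EnrichedRecord> out = new ArrayList<>();
        for (StreamItem item : input) {
            if (item.isWatermark) {
                currentWatermark = Math.max(currentWatermark, item.time); // keep it monotonic
            } else {
                out.add(new EnrichedRecord(item.payload, item.time, currentWatermark));
            }
        }
        return out;
    }
}
```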
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi <
>>>>>>>>>>>>>> vishal.santo...@gmail.com>:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> An addendum:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Is the element reference IN in onElement(IN element, ..) in
>>>>>>>>>>>>>>> Trigger<IN, ..> the same as the IN provided to add(IN
>>>>>>>>>>>>>>> value) in Accumulator<IN, ..>? It seems that mutations
>>>>>>>>>>>>>>> to IN in onElement() are not visible to the Accumulator that is
>>>>>>>>>>>>>>> carrying it as a previous element reference, albeit in the next
>>>>>>>>>>>>>>> invocation of add(). This seems to happen only in distributed mode,
>>>>>>>>>>>>>>> which makes sense only if these references point to different objects.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The pipeline
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> .keyBy(keySelector)
>>>>>>>>>>>>>>> .window(EventTimeSessionWindows.<IN>withGap(gap))
>>>>>>>>>>>>>>> .trigger(new CountBasedWMAugmentationTrigger<IN, TimeWindow>(triggerCount))
>>>>>>>>>>>>>>> .aggregate(
>>>>>>>>>>>>>>>         new AggregateFunction<IN, ACC, OUT>() {
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>             @Override
>>>>>>>>>>>>>>>             public ACC createAccumulator() {
>>>>>>>>>>>>>>>                 ACC newInstance = (ACC) accumulator.clone();
>>>>>>>>>>>>>>>                 newInstance.resetLocal();
>>>>>>>>>>>>>>>                 return newInstance;
>>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>             @Override
>>>>>>>>>>>>>>>             public void add(IN value, ACC accumulator) {
>>>>>>>>>>>>>>>                 /** This method is called before onElement of the Trigger and keeps the reference to the last IN **/
>>>>>>>>>>>>>>>                 accumulator.add(value);
>>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>>             .....
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The Trigger
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> public class CountBasedWMAugmentationTrigger<T extends
>>>>>>>>>>>>>>>         Serializable & CountBasedWMAugmentationTrigger.HasWaterMark, W extends Window> extends Trigger<T, W> {
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>     public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>         /** The element T is mutated to carry the watermark **/
>>>>>>>>>>>>>>>         element.setWaterMark(ctx.getCurrentWatermark());
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>         .
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <
>>>>>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I want to augment a POJO in the Trigger's onElement method,
>>>>>>>>>>>>>>>> specifically to supply the POJO with the watermark from the
>>>>>>>>>>>>>>>> TriggerContext. The sequence of execution is:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 1. call to add() in the accumulator for the window, which
>>>>>>>>>>>>>>>> saves the POJO reference in the Accumulator;
>>>>>>>>>>>>>>>> 2. call to onElement on the Trigger;
>>>>>>>>>>>>>>>> 3. set the watermark on the POJO.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The next add() call should see the last reference and
>>>>>>>>>>>>>>>> any mutation done in step 3.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> That works in a local test case using LocalFlinkMiniCluster, i.e.
>>>>>>>>>>>>>>>> I have access to the mutation made by onElement() on the POJO in the
>>>>>>>>>>>>>>>> subsequent add(), but not on a distributed cluster. The specific
>>>>>>>>>>>>>>>> question I had is whether add() on a supplied accumulator of a window
>>>>>>>>>>>>>>>> and the onElement() method of the trigger on that window are inline
>>>>>>>>>>>>>>>> executions on the same thread, or whether there is any
>>>>>>>>>>>>>>>> serialization/deserialization IPC that causes this divergence (local
>>>>>>>>>>>>>>>> versus distributed).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Regards.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
