Makes sense. I took a first stab at using ProcessFunction. The TimerService
exposed by the Context does not have a way to remove a timer. Is that primarily
because a PriorityQueue is the storage, and removing from a PriorityQueue is
expensive? The TriggerContext does expose a version that can delete timers,
so I was wondering why there is this dissonance...
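
On the timer question: I have not confirmed that queue removal cost is the
reason, but java.util.PriorityQueue.remove(Object) is a linear-time operation,
so it is a plausible one. A common way to live without removal is lazy
invalidation: keep registering timers, remember the latest valid timestamp per
key, and ignore any timer that comes due with a stale timestamp. Below is a
minimal plain-Java sketch of that idea, not Flink code; LazyTimers, register()
and advanceTo() are hypothetical names, and the HashMap merely stands in for
per-key state.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

/** Hypothetical stand-in for a timer service that can register but not delete. */
class LazyTimers {

    /** One registered timer: due for `key` once the watermark reaches `timestamp`. */
    static final class Timer {
        final String key;
        final long timestamp;

        Timer(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }
    }

    // Timers ordered by timestamp; entries are never removed from the middle.
    private final PriorityQueue<Timer> queue =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));

    // Latest valid timer per key (plays the role of keyed state).
    private final Map<String, Long> latest = new HashMap<>();

    /** Registers a timer; any earlier timer for the same key becomes stale. */
    void register(String key, long timestamp) {
        queue.add(new Timer(key, timestamp));
        latest.put(key, timestamp);
    }

    /** Pops all timers due at `watermark` and returns the ones that are still valid. */
    List<String> advanceTo(long watermark) {
        List<String> fired = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek().timestamp <= watermark) {
            Timer t = queue.poll();
            Long valid = latest.get(t.key);
            if (valid != null && valid == t.timestamp) {
                latest.remove(t.key);
                fired.add(t.key + "@" + t.timestamp);
            }
            // Otherwise the timer was superseded and is silently dropped.
        }
        return fired;
    }
}
```

With timers registered for key "a" at 10 and then at 20, advancing the
watermark to 15 fires nothing for "a": the stale entry at 10 is dropped when it
is popped, at the cost of keeping it in the queue until then.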

On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Vishal,
>
> it is not guaranteed that add() and onElement() receive the same object,
> and even if they do it is not guaranteed that a mutation of the object in
> onElement() has an effect. The object might have been serialized and stored
> in RocksDB.
> Hence, elements should not be modified in onElement().
>
> Have you considered implementing the operation completely in a
> ProcessFunction instead of a session window?
> This might be more code but easier to design and reason about because
> there is no interaction of window assigner, trigger, and window function.
>
>
> 2017-12-18 20:49 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>
>> I guess
>> https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362
>>
>> is where we could shape what is emitted. Again, for us it seems
>> natural to use the WM to materialize micro batches with "approximate" order
>> (and no, I am not a fan of Spark micro batches :)). Any pointers on how we
>> could write an implementation that allows "up till WM emission" through
>> a trigger on a session window would be very helpful. In essence, I believe
>> this is crucial for any "funnel" analysis.
>>
>> Something like
>> https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L346
>>
>> I know I am simplifying this and there has to be more to it...
>>
>>
>>
>>
>> On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> The Trigger in this case would be some count-based Trigger... Again, the
>>> motive is to keep the state lean, as we want to search for patterns,
>>> sorted on event time, in the incoming sessionized (and thus of
>>> non-deterministic length) stream...
>>>
>>> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> For example, this would have worked perfectly if it did not complain
>>>> about MergeableWindow and state. The Session class here encapsulates
>>>> the trim-up-to-watermark behavior we desire (a reduce() call after
>>>> telling it the current WM).
>>>>
>>>> public class SessionProcessWindow
>>>>         extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>>>
>>>>     private static final ValueStateDescriptor<Session> sessionState =
>>>>             new ValueStateDescriptor<>("session", Session.class);
>>>>
>>>>     @Override
>>>>     public void process(String key, Context context, Iterable<Event> elements,
>>>>             Collector<Session> out) throws Exception {
>>>>
>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>         Session s = session.value() != null ? session.value() : new Session();
>>>>         for (Event e : elements) {
>>>>             s.add(e);
>>>>         }
>>>>         s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>>>         s.reduce();
>>>>         out.collect(s);
>>>>         session.update(s);
>>>>     }
>>>>
>>>>     @Override
>>>>     public void clear(Context context){
>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>         session.clear();
>>>>     }
>>>> }
>>>>
>>>>
>>>>
>>>>
>>>> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> Hello Fabian, thank you for the response.
>>>>>
>>>>> I think that does not work, as it is the WM of the window operator that
>>>>> is needed to make deterministic decisions, rather than the WM of an
>>>>> operator that precedes the window. This is doable using a
>>>>> ProcessWindowFunction with state, but only for non-mergeable windows.
>>>>>
>>>>> The best API option, I think, is a time-based trigger that fires on every
>>>>> configured time progression of the WM, and a window implementation that
>>>>> materializes *only data up till that WM* (it might hold more data,
>>>>> but that data has event time greater than the WM). I am not sure we have
>>>>> that as a built-in option, and thus was asking for access to the current
>>>>> WM of the window operator, to allow us to handle the "*only data up till
>>>>> that WM*" range retrieval using some custom data structure.
>>>>>
>>>>> Regards.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <fhue...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Vishal,
>>>>>>
>>>>>> the Trigger is not designed to augment records but just to control
>>>>>> when a window is evaluated.
>>>>>> I would recommend using a ProcessFunction to enrich records with the
>>>>>> current watermark before passing them into the window operator.
>>>>>> The context object of the processElement() method gives access to the
>>>>>> current watermark and timestamp of a record.
>>>>>>
>>>>>> Please note that watermarks are not deterministic but may depend on
>>>>>> the order in which parallel inputs are consumed by an operator.
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>>
>>>>>>> An addendum:
>>>>>>>
>>>>>>> Is the element reference IN in onElement(IN element, ..) in
>>>>>>> Trigger<IN,..> the same as the one provided to add(IN value) in
>>>>>>> Accumulator<IN,..>? It seems that any mutation of IN in onElement()
>>>>>>> is not visible to the Accumulator that carries it as a previous
>>>>>>> element reference, even in the next invocation of add(). This seems
>>>>>>> to happen only in distributed mode, which makes sense only if these
>>>>>>> references point to different objects.
>>>>>>>
>>>>>>> The pipeline
>>>>>>>
>>>>>>> .keyBy(keySelector)
>>>>>>> .window(EventTimeSessionWindows.<IN>withGap(gap))
>>>>>>> .trigger(new CountBasedWMAugmentationTrigger<IN,TimeWindow>(triggerCount))
>>>>>>> .aggregate(
>>>>>>>         new AggregateFunction<IN, ACC, OUT>() {
>>>>>>>
>>>>>>>             @Override
>>>>>>>             public ACC createAccumulator() {
>>>>>>>                 ACC newInstance = (ACC) accumulator.clone();
>>>>>>>                 newInstance.resetLocal();
>>>>>>>                 return newInstance;
>>>>>>>             }
>>>>>>>
>>>>>>>             @Override
>>>>>>>             public void add(IN value, ACC accumulator) {
>>>>>>>
>>>>>>>                 /** This method is called before onElement of the
>>>>>>>                     Trigger and keeps the reference to the last IN **/
>>>>>>>
>>>>>>>
>>>>>>>                 accumulator.add(value);
>>>>>>>
>>>>>>>             }
>>>>>>>             .....
>>>>>>>
>>>>>>>    The Trigger
>>>>>>>
>>>>>>>
>>>>>>> public class CountBasedWMAugmentationTrigger<
>>>>>>>         T extends Serializable & CountBasedWMAugmentationTrigger.HasWaterMark,
>>>>>>>         W extends Window> extends Trigger<T, W> {
>>>>>>>
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public TriggerResult onElement(T element, long timestamp, W window,
>>>>>>>             TriggerContext ctx) throws Exception {
>>>>>>>
>>>>>>>         /** The element T is mutated to carry the watermark **/
>>>>>>>         element.setWaterMark(ctx.getCurrentWatermark());
>>>>>>>
>>>>>>>         .
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <
>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I want to augment a POJO in the Trigger's onElement() method,
>>>>>>>> specifically to supply the POJO with the watermark from the
>>>>>>>> TriggerContext.
>>>>>>>> The sequence of execution is:
>>>>>>>>
>>>>>>>> 1. call to add() in the accumulator for the window, which saves the
>>>>>>>> POJO reference in the accumulator
>>>>>>>> 2. call to onElement() on the Trigger
>>>>>>>> 3. set the watermark on the POJO
>>>>>>>>
>>>>>>>> The next add() call should then see the last reference, including any
>>>>>>>> mutation done in step 3.
>>>>>>>>
>>>>>>>> That works in a local test case using LocalFlinkMiniCluster, in that
>>>>>>>> I have access to the mutation made by onElement() on the POJO in the
>>>>>>>> subsequent add(), but not on a distributed cluster. The specific
>>>>>>>> question I had is whether add() on a supplied accumulator on a window
>>>>>>>> and the onElement() method of the trigger on that window are inline
>>>>>>>> executions on the same thread, or whether there is any
>>>>>>>> serialization/deserialization or IPC that causes this divergence
>>>>>>>> (local versus distributed).
>>>>>>>>
>>>>>>>> Regards.
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
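
Fabian's point above, that add() and onElement() may not see the same object
once the element has been serialized, can be reproduced outside Flink. The
sketch below is plain Java and uses Java serialization purely as a stand-in
for whatever serializer the state backend applies (an assumption made for
illustration); CopyOnStore, Event and roundTrip() are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical demo: a stored copy does not see later mutations of the original. */
class CopyOnStore {

    /** Hypothetical event; `watermark` mimics the field set in onElement(). */
    static final class Event implements Serializable {
        private static final long serialVersionUID = 1L;
        long watermark = Long.MIN_VALUE;
    }

    /** Serializes and deserializes the event, as a byte-storing backend would. */
    static Event roundTrip(Event e) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(e);
            }
            try (ObjectInputStream in = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Event) in.readObject();
            }
        } catch (IOException | ClassNotFoundException ex) {
            throw new IllegalStateException(ex);
        }
    }

    public static void main(String[] args) {
        Event original = new Event();
        Event stored = roundTrip(original); // what add() effectively kept
        original.watermark = 42L;           // the mutation made in onElement()
        System.out.println(stored == original);                // different objects
        System.out.println(stored.watermark == Long.MIN_VALUE); // mutation not visible
    }
}
```

When the stored object and the mutated object are the same instance, as can
happen when nothing is serialized in between, the mutation is visible, which
would be consistent with the local test behaving differently from the cluster.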
