Makes sense. I took a first stab at using a ProcessFunction. The TimerService exposed by the Context does not have a method to remove a timer. Is that primarily because a priority queue is the storage, and removing from a PriorityQueue is expensive? The TriggerContext, on the other hand, does expose timer removal (deleteEventTimeTimer() and deleteProcessingTimeTimer()), so I was wondering why there is this dissonance...
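On the PriorityQueue question: the `java.util.PriorityQueue` Javadoc documents `remove(Object)` as linear time, versus O(log n) for `add()` and `poll()`, so frequent cancellations against a plain heap are costly, although that alone does not establish why Flink omits the method. When a timer API offers no delete, a common workaround is lazy invalidation: remember which timers are still wanted and skip the others when they fire. In a ProcessFunction this usually means keeping the expected timestamp in keyed state and checking it in `onTimer()`. The plain-Java sketch below simulates the idea; `LazyTimerQueue`, `register()`, `cancel()` and `advanceTo()` are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;

/** Hypothetical timer queue that cancels timers lazily instead of removing them from the heap. */
final class LazyTimerQueue {
    // Min-heap of timer timestamps; add/poll are O(log n). May contain stale (cancelled) entries.
    private final PriorityQueue<Long> heap = new PriorityQueue<>();
    // Timestamps that are currently registered and not cancelled; also deduplicates registrations.
    private final Set<Long> live = new HashSet<>();

    void register(long timestamp) {
        // Only push onto the heap if the timer is not already live (one timer per timestamp).
        if (live.add(timestamp)) {
            heap.add(timestamp);
        }
    }

    /** O(1) cancel: forget the timer instead of calling the O(n) PriorityQueue.remove(Object). */
    void cancel(long timestamp) {
        live.remove(timestamp);
    }

    /** Pops every entry <= watermark and returns, in order, only the timers that are still live. */
    List<Long> advanceTo(long watermark) {
        List<Long> fired = new ArrayList<>();
        while (!heap.isEmpty() && heap.peek() <= watermark) {
            long t = heap.poll();
            // A stale entry (cancelled, or a leftover duplicate) is simply dropped here.
            if (live.remove(t)) {
                fired.add(t);
            }
        }
        return fired;
    }
}
```

For example, registering 300, 100 and 200, cancelling 200 and advancing to 250 fires only 100; the stale 200 entry is discarded when it reaches the top of the heap.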
On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> Hi Vishal,
>
> it is not guaranteed that add() and onElement() receive the same object,
> and even if they do, it is not guaranteed that a mutation of the object in
> onElement() has an effect. The object might have been serialized and stored
> in RocksDB.
> Hence, elements should not be modified in onElement().
>
> Have you considered implementing the operation completely in a
> ProcessFunction instead of a session window?
> This might be more code, but it is easier to design and reason about because
> there is no interaction of window assigner, trigger, and window function.
>
>
> 2017-12-18 20:49 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>
>> I guess https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362
>>
>> is where we could shape what is emitted. Again, for us it seems
>> natural to use the WM to materialize micro-batches with "approximate" order
>> (and no, I am not a fan of Spark micro-batches :)). Any pointers on how we
>> could write an implementation that allows for "up till WM" emission through
>> a trigger on a session window would be very helpful. In essence, I believe
>> this is crucial for any "funnel" analysis.
>>
>> Something like https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L346
>>
>> I know I am simplifying this and there has to be more to it...
>>
>>
>> On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>
>>> The Trigger in this case would be some CountBased Trigger....
>>> Again, the motive is to keep the state lean, as we want to search for
>>> patterns, sorted on event time, in the incoming sessionized (and thus of
>>> non-deterministic length) stream....
>>>
>>> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>
>>>> For example, this would have worked perfectly if it did not complain
>>>> about MergeableWindow and state. The Session class here encapsulates
>>>> the "trim up to watermark" behavior we want (a reduce() call after telling
>>>> it the current WM):
>>>>
>>>> public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>>>
>>>>     private static final ValueStateDescriptor<Session> sessionState =
>>>>             new ValueStateDescriptor<>("session", Session.class);
>>>>
>>>>     @Override
>>>>     public void process(String key, Context context, Iterable<Event> elements,
>>>>                         Collector<Session> out) throws Exception {
>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>         Session s = session.value() != null ? session.value() : new Session();
>>>>         for (Event e : elements) {
>>>>             s.add(e);
>>>>         }
>>>>         s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>>>         s.reduce();
>>>>         out.collect(s);
>>>>         session.update(s);
>>>>     }
>>>>
>>>>     @Override
>>>>     public void clear(Context context) {
>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>         session.clear();
>>>>     }
>>>> }
>>>>
>>>>
>>>> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>
>>>>> Hello Fabian, thank you for the response.
>>>>>
>>>>> I think that does not work, as it is the WM of the window operator that
>>>>> is needed to make deterministic decisions, rather than the WM of an
>>>>> operator that precedes the window. This is doable with a
>>>>> ProcessWindowFunction using state, but only in the case of non-mergeable
>>>>> windows.
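The "trim up to watermark" behavior attributed to the `Session` class can be sketched independently of Flink as a buffer ordered by event time: `reduce(watermark)` removes and returns, in timestamp order, every event at or below the watermark and keeps later events buffered. The `Session` class itself is not shown in the thread, so `TimedEvent`, `WatermarkTrimmedSession` and this `reduce(long)` signature are hypothetical; inside Flink such a buffer would have to live in keyed state (for example in a ProcessFunction) rather than in a plain field.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical event carrying an event-time timestamp in milliseconds. */
final class TimedEvent {
    final String name;
    final long timestamp;

    TimedEvent(String name, long timestamp) {
        this.name = name;
        this.timestamp = timestamp;
    }
}

/** Hypothetical session buffer that only materializes events up to the current watermark. */
final class WatermarkTrimmedSession {
    // Min-heap on event time: events at or below the watermark come out first, already sorted.
    private final PriorityQueue<TimedEvent> pending =
            new PriorityQueue<>(Comparator.comparingLong((TimedEvent e) -> e.timestamp));

    void add(TimedEvent e) {
        pending.add(e);
    }

    /** Removes and returns, in event-time order, all events with timestamp <= watermark. */
    List<TimedEvent> reduce(long watermark) {
        List<TimedEvent> ready = new ArrayList<>();
        while (!pending.isEmpty() && pending.peek().timestamp <= watermark) {
            ready.add(pending.poll());
        }
        return ready;
    }

    /** Number of events still waiting for the watermark to pass them. */
    int pendingCount() {
        return pending.size();
    }
}
```

With events at 30, 10, 40 and 20, `reduce(25)` returns the events at 10 and 20 in that order and leaves two events pending.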
>>>>>
>>>>> The best API option, I think, is a time-based Trigger that fires every
>>>>> configured progression of the WM, and a window implementation that
>>>>> materializes *only data up till that WM* (it might hold more data,
>>>>> but that data has event time greater than the WM). I am not sure we have
>>>>> that as a built-in option, and thus was asking for access to the current WM
>>>>> of the window operator, to allow us to handle the "*only data up till that
>>>>> WM*" range retrieval using some custom data structure.
>>>>>
>>>>> Regards.
>>>>>
>>>>>
>>>>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>
>>>>>> Hi Vishal,
>>>>>>
>>>>>> the Trigger is not designed to augment records but just to control
>>>>>> when a window is evaluated.
>>>>>> I would recommend using a ProcessFunction to enrich records with the
>>>>>> current watermark before passing them into the window operator.
>>>>>> The context object of the processElement() method gives access to the
>>>>>> current watermark and timestamp of a record.
>>>>>>
>>>>>> Please note that watermarks are not deterministic but may depend on
>>>>>> the order in which parallel inputs are consumed by an operator.
>>>>>>
>>>>>> Best, Fabian
>>>>>>
>>>>>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>>
>>>>>>> An addendum
>>>>>>>
>>>>>>> Is the element reference IN in onElement(IN element, ...) in
>>>>>>> Trigger<IN, ...> the same as the IN provided to add(IN value) in
>>>>>>> Accumulator<IN, ...>? It seems that any mutation to IN in onElement()
>>>>>>> is not visible to the Accumulator that carries it as a previous element
>>>>>>> reference, even in the next invocation of add(). This seems to happen
>>>>>>> only in distributed mode, which makes sense only if these references
>>>>>>> point to different objects.
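Fabian's note that watermarks are not deterministic follows from how an operator with several input channels tracks event time: its current watermark is the minimum of the latest watermark received on each channel, so the value observed while processing a given record depends on how the channels' records and watermarks happen to interleave. The plain-Java simulation below replays the same per-channel watermarks in two arrival orders; `WatermarkTracker` is a hypothetical name, not Flink code.

```java
import java.util.Arrays;

/** Hypothetical tracker mimicking how a multi-input operator derives its current watermark. */
final class WatermarkTracker {
    private final long[] channelWatermarks;

    WatermarkTracker(int channels) {
        if (channels <= 0) throw new IllegalArgumentException("need at least one channel");
        channelWatermarks = new long[channels];
        // No watermark has been received on any channel yet.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** Records a watermark from one channel; each channel's watermark only moves forward. */
    void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    /** The operator's current watermark is the minimum across all input channels. */
    long currentWatermark() {
        return Arrays.stream(channelWatermarks).min().getAsLong();
    }
}
```

If both channels first report 50 and then 100, a record processed after both updates observes 100; if the same record is processed after channel 0 reports 100 but before channel 1 does, it observes 50.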
>>>>>>>
>>>>>>> The pipeline:
>>>>>>>
>>>>>>> .keyBy(keySelector)
>>>>>>> .window(EventTimeSessionWindows.<IN>withGap(gap))
>>>>>>> .trigger(new CountBasedWMAugmentationTrigger<IN, TimeWindow>(triggerCount))
>>>>>>> .aggregate(
>>>>>>>     new AggregateFunction<IN, ACC, OUT>() {
>>>>>>>
>>>>>>>         @Override
>>>>>>>         public ACC createAccumulator() {
>>>>>>>             ACC newInstance = (ACC) accumulator.clone();
>>>>>>>             newInstance.resetLocal();
>>>>>>>             return newInstance;
>>>>>>>         }
>>>>>>>
>>>>>>>         @Override
>>>>>>>         public void add(IN value, ACC accumulator) {
>>>>>>>             /** This method is called before onElement of the
>>>>>>>                 Trigger and keeps the reference to the last IN **/
>>>>>>>             accumulator.add(value);
>>>>>>>         }
>>>>>>>         .....
>>>>>>>
>>>>>>> The Trigger:
>>>>>>>
>>>>>>> public class CountBasedWMAugmentationTrigger<T extends
>>>>>>>         Serializable & CountBasedWMAugmentationTrigger.HasWaterMark,
>>>>>>>         W extends Window> extends Trigger<T, W> {
>>>>>>>
>>>>>>>     @Override
>>>>>>>     public TriggerResult onElement(T element, long timestamp, W window,
>>>>>>>             TriggerContext ctx) throws Exception {
>>>>>>>
>>>>>>>         /** The element T is mutated to carry the watermark **/
>>>>>>>         element.setWaterMark(ctx.getCurrentWatermark());
>>>>>>>
>>>>>>>         .
>>>>>>>
>>>>>>>
>>>>>>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I want to augment a POJO in the Trigger's onElement() method,
>>>>>>>> specifically to supply the POJO with the watermark from the
>>>>>>>> TriggerContext. The order of execution is:
>>>>>>>>
>>>>>>>> 1. call to add() in the accumulator for the window, which saves the
>>>>>>>>    POJO reference in the Accumulator;
>>>>>>>> 2. call to onElement() on the Trigger;
>>>>>>>> 3. set the watermark on the POJO.
>>>>>>>>
>>>>>>>> The next add() call should then see the last reference, including any
>>>>>>>> mutation done in step 3.
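Per Fabian's reply, a trigger should only decide when a window is evaluated, not modify the element. The firing decision of a count-based trigger can be sketched in plain Java as a per-window counter that fires every `triggerCount` elements and then resets. `CountFiringLogic` and the `FiringResult` enum are hypothetical stand-ins for a Flink `Trigger` and its `TriggerResult`; in a real trigger the counter would be kept in partitioned state obtained through `TriggerContext.getPartitionedState()` rather than in a `HashMap` field.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for Flink's TriggerResult, reduced to the two values this sketch needs. */
enum FiringResult { CONTINUE, FIRE }

/** Hypothetical count-based firing logic: fire every triggerCount elements per window. */
final class CountFiringLogic<W> {
    private final long triggerCount;
    // Per-window element counts; a real Flink trigger would keep this in partitioned state.
    private final Map<W, Long> counts = new HashMap<>();

    CountFiringLogic(long triggerCount) {
        if (triggerCount <= 0) throw new IllegalArgumentException("triggerCount must be positive");
        this.triggerCount = triggerCount;
    }

    /** Counts the element without reading or mutating it, and fires when the count is reached. */
    FiringResult onElement(Object element, W window) {
        long count = counts.merge(window, 1L, Long::sum);
        if (count >= triggerCount) {
            counts.put(window, 0L); // reset so the next triggerCount elements fire again
            return FiringResult.FIRE;
        }
        return FiringResult.CONTINUE;
    }

    /** Mirrors the role of Trigger#clear: drop the window's counter when the window is purged. */
    void clear(W window) {
        counts.remove(window);
    }
}
```

With `triggerCount = 3`, six elements for the same window yield CONTINUE, CONTINUE, FIRE, CONTINUE, CONTINUE, FIRE, and a different window keeps its own count.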
>>>>>>>>
>>>>>>>> That works in a local test case using LocalFlinkMiniCluster: I have
>>>>>>>> access to the mutation made by onElement() on the POJO in the
>>>>>>>> subsequent add(), but not on a distributed cluster. The specific question
>>>>>>>> I had is whether add() on a supplied accumulator of a window and the
>>>>>>>> onElement() method of the trigger on that window are inline executions
>>>>>>>> on the same thread, or whether there is any serialization/deserialization
>>>>>>>> or IPC that causes this divergence (local versus distributed).
>>>>>>>>
>>>>>>>> Regards.
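The local-versus-distributed difference is consistent with Fabian's explanation that `add()` and `onElement()` are not guaranteed to see the same object: once a record has been serialized and deserialized (for example when it is stored in RocksDB), a mutation on one copy is invisible to the other. The plain-Java sketch below reproduces the effect, with `java.io` serialization standing in for the state backend's serializer; `WatermarkedPojo` and `StateRoundTrip.roundTrip()` are hypothetical, and Flink normally uses its own `TypeSerializer`s rather than Java serialization.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical POJO that a trigger might try to stamp with a watermark. */
final class WatermarkedPojo implements Serializable {
    private static final long serialVersionUID = 1L;
    long watermark = Long.MIN_VALUE;
}

final class StateRoundTrip {
    private StateRoundTrip() {}

    /** Serializes and deserializes a value, roughly what happens when state is kept as bytes. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                         new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                // The result is a new, independent object with the same field values.
                return (T) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Sharing one reference makes a mutation visible to both holders; after `roundTrip()`, the stored copy keeps the value it had when serialized, no matter how the original is mutated afterwards.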