Hello Fabian,

Thank you for your response.

I thought about it and maybe I am missing something obvious here. The code below is what I think you suggest. The issue is that the window state is now a list of Sessions (or, shall I say, subsets of the Session).
What is required is that on a new watermark

* We sort these Session objects
* Get the subset that falls before the new watermark and emit it without a purge.

I do not see how the Trigger approach helps us. It does tell us that the watermark has progressed, but to get the subset of the ListState that falls before the watermark we would need access to *the new value of the watermark*. That was what my initial query was.

public class SessionProcessWindow<IN extends HasTime & HasKey, OUT extends SessionState<IN, OUT>>
        extends ProcessWindowFunction<IN, OUT, String, TimeWindow> {

    OUT toCreateNew;
    Long gap;
    private final ListStateDescriptor<OUT> mergingSetsStateDescriptor;

    public SessionProcessWindow(TypeInformation<OUT> aggregationResultType, OUT toCreateNew) {
        this.toCreateNew = toCreateNew;
        mergingSetsStateDescriptor = new ListStateDescriptor<>("sessions", aggregationResultType);
    }

    @Override
    public void process(String s, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception {
        OUT session = toCreateNew.createNew();
        elements.forEach(f -> session.add(f));
        context.windowState().getListState(mergingSetsStateDescriptor).add(session);
    }
}

On Fri, Jan 5, 2018 at 7:35 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Vishal,
>
> thanks for sharing your solution!
>
> Looking at this issue again and your mail in which you shared your
> SessionProcessWindow ProcessWindowFunction, I'm wondering why you need the
> ValueState that prevents the ProcessWindowFunction from being used in a
> mergeable window.
> You could have created a new Session object in each invocation of the
> ProcessWindowFunction and simply kept the elements in the (mergeable) list
> state of the window.
> In that case you would simply need a custom trigger that calls the
> ProcessWindowFunction when a new watermark arrives. For intermediate calls,
> you just FIRE and for the final call you FIRE_AND_PURGE to remove the
> elements from the window's state.
> Did you try that?
>
> Best, Fabian
>
> 2018-01-03 15:57 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>
>> Dear Fabian,
>>
>> I was able to create a pretty functional ProcessFunction. Here is the
>> synopsis; please see if it makes sense.
>>
>> Sessionization is unique in that it entails windows of dynamic length. The
>> way Flink approaches it is pretty simple. It will create a TimeWindow of size
>> "gap" relative to the event time, find an overlapping window (intersection)
>> and create a covering window. Each such window has a "state" associated
>> with it, which too has to be merged when a covering window is created on
>> intersection of 2 or more incident windows. To be more precise, if Window1
>> spans (t1, t2) and a new record creates a window (t3, t4) with t1<=t3<=t2,
>> a new window (t1, t4) is created and the associated states are merged.
>>
>> In the current Window API the states are external and are
>> Accumulator based. This approach pretty much works for all cases where
>> the aggregation is accumulative/reduced and does not depend on order,
>> as in no ordered list of incoming records needs to be kept and reduction is
>> to a single aggregated element (think counts, min, max etc.). In path
>> analysis (and other use cases), however, this approach has drawbacks. Even
>> though in our accumulator we could keep an ordered list of events, it
>> becomes unreasonable if not within bounds. An approach that does
>> *attempt* to bound state is to preemptively analyze paths using the WM
>> as the marker that defines the *subset* of the state that is safe to
>> analyze. So if we have n events in the window state and m fall before the WM,
>> we can safely analyze the m subset, emitting paths seen and reducing the
>> cumulative state size. There are caveats though that I will go into later.
>>
>> Unfortunately the Accumulators in Flink Window runtime defaults do not
>> have access to the WM.
>>
>> This led to this generic approach (implemented and tested):
>>
>> * Use a low-level ProcessFunction that allows access to the WM and is
>> definitely nearer to the guts of Flink.
>>
>> * Still use the merge-windows-on-intersection approach but use the WM to
>> trigger (through Timers) reductions in state. This is not very
>> dissimilar to what Flink does but we have more control over what to do and
>> when to do it. Essentially we have exposed a lifecycle method that reacts
>> to WM progression.
>>
>> * There are essentially 2 Timers. The first timer is on the maxTimestamp()
>> of a Window, which, if there is no further mutation because of a merge etc.,
>> will fire to reflect a Session End. The second one is on currentWatermark+1,
>> which essentially calls a "reduceToWM" on each keyed Window and thus State.
>>
>> * There are 2 ways to short circuit a Session: 1. on Session time span, 2.
>> on Session size.
>>
>> * There is a safety valve to blacklist keys when it is obvious that it is
>> a bot ( again
>>
>> The solution will thus preemptively push out Patterns (and correct
>> patterns) while keeping the ordered state within reasonable bounds. The
>> incident data of course has to be analyzed: are the paths too large, etc.
>> But one has full control over how to fashion the solution.
>>
>> Regards and Thanks,
>>
>> Vishal
>>
>> On Wed, Dec 27, 2017 at 10:41 AM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> This makes sense. Thanks.
>>>
>>> On Sat, Dec 23, 2017 at 10:58 AM, Fabian Hueske <fhue...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> all calls to onElement() or onTimer() are synchronized for any keys.
>>>> Think of a single thread calling these methods.
>>>> Event-time timers are called when a watermark passes the timer.
>>>> Watermarks are received as special records, so the methods are called in
>>>> the same order as records (actual records or watermarks) arrive at the
>>>> function.
>>>> Only for processing-time timers, actual synchronization is
>>>> required.
>>>>
>>>> The NPE might be thrown because of two timers that fire one after the
>>>> other without a new record being processed in between the onTimer() calls.
>>>> In that case the state is cleared in the first call and null in the second.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2017-12-23 16:36 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>
>>>>> Thanks.
>>>>>
>>>>> I have a few follow-up questions regarding ProcessFunction. I
>>>>> think that the core should take care of any synchronization issues between
>>>>> calls to onElement and onTimer in case of a keyed stream, but tests do not
>>>>> seem to suggest that.
>>>>>
>>>>> I have specifically 2 questions.
>>>>>
>>>>> 1. Are calls to onElement(..) single threaded if scoped to a key?
>>>>> As in, on a keyed stream, is there a way that 2 or more threads can
>>>>> execute on more than one element of a single key at one time? Would I
>>>>> have to synchronize this construction
>>>>>
>>>>> *OUT accumulator = accumulatorState.value();
>>>>> if (accumulator == null) {
>>>>>     accumulator = acc.createNew();
>>>>> }*
>>>>>
>>>>> 2. Can concurrent calls to onTimer(..) and onElement(..) happen for the
>>>>> same key? I intend to clean up state but I see NullPointers thrown in
>>>>> onTimer(..), and I presume it is because onElement and onTimer are
>>>>> executed on 2 separate threads, with onTimer removing the state
>>>>> (clear()) but after another thread has registered a Timer (in onElement).
>>>>>
>>>>> if (timestamp == accumulator.getLastModified() + gap) { *// NullPointers on Race Conditions*
>>>>>     accumulatorState.clear();
>>>>> }
>>>>>
>>>>> PS. This is the full code.
>>>>>
>>>>> @Override
>>>>> public void processElement(IN event, Context context, Collector<OUT> out) throws Exception {
>>>>>     TimerService timerService = context.timerService();
>>>>>     if (context.timestamp() > timerService.currentWatermark()) {
>>>>>         OUT accumulator = accumulatorState.value();
>>>>>         if (accumulator == null) {
>>>>>             accumulator = acc.createNew();
>>>>>         }
>>>>>         accumulator.setLastModified(context.timestamp());
>>>>>         accumulatorState.update(accumulator);
>>>>>         timerService.registerEventTimeTimer(context.timestamp() + gap);
>>>>>     }
>>>>> }
>>>>>
>>>>> @Override
>>>>> public void onTimer(long timestamp, OnTimerContext context, Collector<OUT> out) throws Exception {
>>>>>     OUT accumulator = accumulatorState.value();
>>>>>     if (timestamp == accumulator.getLastModified() + gap) { *// NullPointers on Race Conditions*
>>>>>         accumulatorState.clear();
>>>>>     }
>>>>> }
>>>>>
>>>>> On Thu, Dec 21, 2017 at 3:49 AM, Fabian Hueske <fhue...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> That's correct. Removal of timers is not supported in
>>>>>> ProcessFunction. Not sure why this is supported for Triggers.
>>>>>> The common workaround for ProcessFunctions is to register multiple
>>>>>> timers and have a ValueState that stores the valid timestamp on which the
>>>>>> onTimer method should be executed.
>>>>>> When a timer fires and calls onTimer(), the method first checks
>>>>>> whether the timestamp is the correct one and leaves the method if that is
>>>>>> not the case.
>>>>>> If you want to fire on the next watermark, another trick is to
>>>>>> register multiple timers on (currentWatermark + 1). Since there is only one
>>>>>> timer per timestamp, there is only one timer which gets continuously
>>>>>> overwritten. The timer is called when the watermark is advanced.
>>>>>>
>>>>>> On the performance of the timer service. AFAIK, all methods that work
>>>>>> with some kind of timer use this service. So there is not much choice.
>>>>>>
>>>>>> 2017-12-20 22:36 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>>
>>>>>>> And that further begs the question: how performant is the Timer
>>>>>>> Service? I tried to peruse the architecture behind it but could not
>>>>>>> find a definite clue. Is it a Scheduled Service and if yes how many
>>>>>>> threads etc...
>>>>>>>
>>>>>>> On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi <
>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Makes sense. Did a first stab at using ProcessFunction. The
>>>>>>>> TimerService exposed by the Context does not have remove timer. Is it
>>>>>>>> primarily because a PriorityQueue is the storage and remove from a
>>>>>>>> PriorityQueue is expensive? Trigger Context does expose another version
>>>>>>>> that has removal abilities so was wondering why this dissonance...
>>>>>>>>
>>>>>>>> On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske <fhue...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Vishal,
>>>>>>>>>
>>>>>>>>> it is not guaranteed that add() and onElement() receive the same
>>>>>>>>> object, and even if they do it is not guaranteed that a mutation of the
>>>>>>>>> object in onElement() has an effect. The object might have been
>>>>>>>>> serialized and stored in RocksDB.
>>>>>>>>> Hence, elements should not be modified in onElement().
>>>>>>>>>
>>>>>>>>> Have you considered implementing the operation completely in a
>>>>>>>>> ProcessFunction instead of a session window?
>>>>>>>>> This might be more code but easier to design and reason about
>>>>>>>>> because there is no interaction of window assigner, trigger, and window
>>>>>>>>> function.
>>>>>>>>>
>>>>>>>>> 2017-12-18 20:49 GMT+01:00 Vishal Santoshi <
>>>>>>>>> vishal.santo...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> I guess
>>>>>>>>>> https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362
>>>>>>>>>>
>>>>>>>>>> is where we could fashion what is emitted. Again, for us it
>>>>>>>>>> seems natural to use the WM to materialize micro batches with
>>>>>>>>>> "approximate" order (and no, I am not a fan of Spark micro batches :)).
>>>>>>>>>> Any pointers as to how we could write an implementation that allows for
>>>>>>>>>> "up till WM emission" through a trigger on a Session Window would be very
>>>>>>>>>> helpful. In essence I believe that for any "funnel" analysis it is crucial.
>>>>>>>>>>
>>>>>>>>>> Something like
>>>>>>>>>> https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L346
>>>>>>>>>>
>>>>>>>>>> I know I am simplifying this and there has to be more to it...
>>>>>>>>>>
>>>>>>>>>> On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <
>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> The Trigger in this case would be some CountBased Trigger....
>>>>>>>>>>> Again the motive is to keep the state lean as we desire to search for
>>>>>>>>>>> patterns, sorted on event time, in the incoming sessionized (and thus of
>>>>>>>>>>> nondeterministic length) stream....
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <
>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> For example, this would have worked perfectly if it did not
>>>>>>>>>>>> complain about MergeableWindow and state.
>>>>>>>>>>>> The Session class in this
>>>>>>>>>>>> encapsulates the trim-up-to-watermark behavior (reduce call after
>>>>>>>>>>>> telling it the current WM) we desire
>>>>>>>>>>>>
>>>>>>>>>>>> public class SessionProcessWindow extends
>>>>>>>>>>>>         ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>>>>>>>>>>>
>>>>>>>>>>>>     private static final ValueStateDescriptor<Session> sessionState =
>>>>>>>>>>>>             new ValueStateDescriptor<>("session", Session.class);
>>>>>>>>>>>>
>>>>>>>>>>>>     @Override
>>>>>>>>>>>>     public void process(String key, Context context,
>>>>>>>>>>>>             Iterable<Event> elements, Collector<Session> out) throws Exception {
>>>>>>>>>>>>
>>>>>>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>>>>>>         Session s = session.value() != null ? session.value() : new Session();
>>>>>>>>>>>>         for (Event e : elements) {
>>>>>>>>>>>>             s.add(e);
>>>>>>>>>>>>         }
>>>>>>>>>>>>         s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>>>>>>>>>>>         s.reduce();
>>>>>>>>>>>>         out.collect(s);
>>>>>>>>>>>>         session.update(s);
>>>>>>>>>>>>     }
>>>>>>>>>>>>
>>>>>>>>>>>>     @Override
>>>>>>>>>>>>     public void clear(Context context) {
>>>>>>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>>>>>>         session.clear();
>>>>>>>>>>>>     }
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <
>>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello Fabian, Thank you for the response.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I think that does not work, as it is the WM of the Window
>>>>>>>>>>>>> Operator that is desired to make deterministic decisions, rather than
>>>>>>>>>>>>> that of an operator that precedes the Window?
>>>>>>>>>>>>> This is doable using
>>>>>>>>>>>>> ProcessWindowFunction using state but only in the case of non-mergeable
>>>>>>>>>>>>> windows.
>>>>>>>>>>>>>
>>>>>>>>>>>>> The best API option I think is a TimeBaseTrigger that
>>>>>>>>>>>>> fires on every configured time progression of the WM and a Window
>>>>>>>>>>>>> implementation that materializes *only data up till that WM* (it might
>>>>>>>>>>>>> have more data but that data has event time greater than the WM). I am
>>>>>>>>>>>>> not sure we have that built-in option and thus was asking for access to
>>>>>>>>>>>>> the current WM for the window operator to allow us to handle the "*only
>>>>>>>>>>>>> data up till that WM*" range retrieval using some custom
>>>>>>>>>>>>> data structure.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <
>>>>>>>>>>>>> fhue...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> the Trigger is not designed to augment records but just to
>>>>>>>>>>>>>> control when a window is evaluated.
>>>>>>>>>>>>>> I would recommend using a ProcessFunction to enrich records
>>>>>>>>>>>>>> with the current watermark before passing them into the window operator.
>>>>>>>>>>>>>> The context object of the processElement() method gives
>>>>>>>>>>>>>> access to the current watermark and timestamp of a record.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Please note that watermarks are not deterministic but may
>>>>>>>>>>>>>> depend on the order in which parallel inputs are consumed by an operator.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi <
>>>>>>>>>>>>>> vishal.santo...@gmail.com>:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> An addendum
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Is the element reference IN in onElement(IN element..) in
>>>>>>>>>>>>>>> Trigger<IN,..> the same as the IN provided to add(IN
>>>>>>>>>>>>>>> value) in Accumulator<IN,..>? It seems that any mutation
>>>>>>>>>>>>>>> of IN in onElement() is not visible to the Accumulator that is carrying
>>>>>>>>>>>>>>> it as a previous element reference, albeit in the next invocation of add().
>>>>>>>>>>>>>>> This seems to be only in distributed mode, which makes sense only if these
>>>>>>>>>>>>>>> references point to different objects.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The pipeline
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> .keyBy(keySelector)
>>>>>>>>>>>>>>> .window(EventTimeSessionWindows.<IN>withGap(gap))
>>>>>>>>>>>>>>> .trigger(new CountBasedWMAugmentationTrigger<IN,TimeWindow>(triggerCount))
>>>>>>>>>>>>>>> .aggregate(
>>>>>>>>>>>>>>>     new AggregateFunction<IN, ACC, OUT>() {
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>         @Override
>>>>>>>>>>>>>>>         public ACC createAccumulator() {
>>>>>>>>>>>>>>>             ACC newInstance = (ACC) accumulator.clone();
>>>>>>>>>>>>>>>             newInstance.resetLocal();
>>>>>>>>>>>>>>>             return newInstance;
>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>         @Override
>>>>>>>>>>>>>>>         public void add(IN value, ACC accumulator) {
>>>>>>>>>>>>>>>             /** This method is called before onElement of
>>>>>>>>>>>>>>>                 the Trigger and keeps the reference to the last IN **/
>>>>>>>>>>>>>>>             accumulator.add(value);
>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>         .....
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The Trigger
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> public class CountBasedWMAugmentationTrigger<T extends Serializable &
>>>>>>>>>>>>>>>         CountBasedWMAugmentationTrigger.HasWaterMark, W extends Window>
>>>>>>>>>>>>>>>         extends Trigger<T, W> {
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>     public TriggerResult onElement(T element, long timestamp, W window,
>>>>>>>>>>>>>>>             TriggerContext ctx) throws Exception {
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>         /** The element T is mutated to carry the watermark **/
>>>>>>>>>>>>>>>         *element.setWaterMark(ctx.getCurrentWatermark());*
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>         .
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <
>>>>>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I want to augment a POJO in Trigger's onElement method,
>>>>>>>>>>>>>>>> specifically supply the POJO with the watermark from the TriggerContext.
>>>>>>>>>>>>>>>> The sequence of execution is this
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 1. call to add() in the accumulator for the window and
>>>>>>>>>>>>>>>> save the POJO reference in the Accumulator.
>>>>>>>>>>>>>>>> 2. call to onElement on Trigger
>>>>>>>>>>>>>>>> 3. set watermark on the POJO
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The next add() method should have the last reference and
>>>>>>>>>>>>>>>> any mutation done in step 3.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> That works in a local test case, using
>>>>>>>>>>>>>>>> LocalFlinkMiniCluster, as in I have access to the mutation by the
>>>>>>>>>>>>>>>> onElement() in the POJO in the subsequent add(), but not on a distributed
>>>>>>>>>>>>>>>> cluster.
>>>>>>>>>>>>>>>> The specific question I had is whether add() on a supplied
>>>>>>>>>>>>>>>> accumulator on a window and the onElement() method of the trigger on that
>>>>>>>>>>>>>>>> window are inline executions on the same thread, or is there any
>>>>>>>>>>>>>>>> serialization/deserialization IPC that causes this divergence (local
>>>>>>>>>>>>>>>> versus distributed)?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Regards.
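
For reference, the operation asked for in the newest message of this thread (sort the buffered elements, emit the subset at or before the new watermark, keep the rest without purging) can be sketched in plain Java with no Flink dependency. This is only an illustration under stated assumptions: `WatermarkSplit`, `TimedEvent` and `drainUpTo` are hypothetical names that do not appear in the thread or in Flink, and the watermark is simply passed in as a `long`, which is exactly the value the thread is trying to get hold of inside the window function.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Plain-Java sketch (no Flink dependency); every name here is hypothetical. */
final class WatermarkSplit {

    /** Minimal stand-in for a buffered event carrying an event-time timestamp. */
    static final class TimedEvent {
        final String name;
        final long timestamp;

        TimedEvent(String name, long timestamp) {
            this.name = name;
            this.timestamp = timestamp;
        }
    }

    /**
     * Sorts the buffer by event time, removes every event whose timestamp is
     * at or before the watermark, and returns the removed events in order.
     * Events after the watermark stay in the buffer for a later call.
     */
    static List<TimedEvent> drainUpTo(List<TimedEvent> buffer, long watermark) {
        buffer.sort(Comparator.comparingLong((TimedEvent e) -> e.timestamp));
        List<TimedEvent> ready = new ArrayList<>();
        int i = 0;
        while (i < buffer.size() && buffer.get(i).timestamp <= watermark) {
            ready.add(buffer.get(i));
            i++;
        }
        // Drop the emitted prefix; the tail is the state that is not yet safe to analyze.
        buffer.subList(0, i).clear();
        return ready;
    }

    public static void main(String[] args) {
        List<TimedEvent> buffer = new ArrayList<>();
        buffer.add(new TimedEvent("c", 30L));
        buffer.add(new TimedEvent("a", 10L));
        buffer.add(new TimedEvent("b", 20L));
        List<TimedEvent> ready = drainUpTo(buffer, 20L);
        // prints "2 emitted, 1 retained"
        System.out.println(ready.size() + " emitted, " + buffer.size() + " retained");
    }
}
```

In a Flink job the buffer would presumably live in keyed or window state rather than in a local list, and the watermark would have to come from a context that exposes it (the thread mentions `context.currentWatermark()` and `timerService.currentWatermark()`); how to wire that into a mergeable window is the open question above and is not settled by this sketch.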