Hi Stephan,

If you could provide a concrete example with a small input and the output you expect, I think I would understand the requirements better. I am still not sure that simple SlidingWindows aren't enough for your use case.
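For concreteness, the example further down the thread has per-window counts of 3, 1, 6 and expected deltas of 3, -2, 5 (the first window diffed against 0). A minimal plain-Java sketch of that delta function — no Beam types, all names are mine — just to pin down the input/output contract:

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: the delta for the first window is taken against an
// initial count of 0, matching the thread's example (3, 1, 6 -> 3, -2, 5).
class WindowDeltas {
    static List<Long> deltas(List<Long> countsPerWindow) {
        List<Long> result = new ArrayList<>();
        long previous = 0; // no prior window: treat the previous count as 0
        for (long count : countsPerWindow) {
            result.add(count - previous);
            previous = count;
        }
        return result;
    }
}
```

Whatever windowing strategy ends up being used, this is the per-key computation it has to reproduce.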
On Fri, May 4, 2018 at 9:44 AM Lukasz Cwik <[email protected]> wrote:

> Thanks for updating the description. I feel as though I better understand
> your problem now. Your idea about publishing to a queue is clever, and if
> you feel like it works for you, you should go with it.
>
> Here are some more details in case you go with the overlapping sliding
> windows or a StatefulDoFn.
>
> With overlapping sliding windows, you'll have a fixed set of the 1 minute
> aggregations, which means that you can only compute an aggregation
> approximation for T. This can be fine if your aggregation function uses
> something like an exponential decay, as the further you go back, the less
> relevant the information is.
>
> If you use a StatefulDoFn, it is partitioned per key and window, which
> means that the only way you can see all the data for a single key is to
> use the global window. Every time a new record comes in, you look at all
> the 5m aggregations that you have stored in state, recompute, update
> state, and also output each 5m aggregation that would have changed. Note
> that you should evict aggregations from state for records that are old
> (knowing when to evict depends on the trigger defined on the 1min
> aggregation function).
>
> Depending on which one is appropriate for your aggregation function, I
> believe using the overlapping sliding windows will be best, as all
> garbage collection is handled automatically by the system so you don't
> have to worry about leaks. But if your aggregation function doesn't work
> well as an approximation, then use the StatefulDoFn or your idea with the
> queue.
>
> On Fri, May 4, 2018 at 9:39 AM Stephan Kotze <[email protected]>
> wrote:
>
>> More experimenting:
>>
>> I can get the concept working using a CoGroupByKey IF I break the
>> circular dependency by introducing a Message Queue, for example (the
>> same should apply for using the Queue as a side input):
>>
>> ...
>> PCollection<KV<String, OrderAggregateTS>> per1minAggs = ...;
>>
>> PCollection<KV<String, OrderAggregateTS>> previous5minAggs = p
>>     .apply(READ_FROM_PREVIOUS_5MIN_AGGS_QUEUE())
>>     .apply(SHIFT_INTO_NEW_WINDOW_VIA_CUSTOM_WINDOWFN());
>>
>> PCollection<KV<String, CoGbkResult>> coGroup = KeyedPCollectionTuple
>>     .of(per1minAggsTag, PER_1MIN_TAG)
>>     .and(previousPer5minAggsTag, PREVIOUS_PER_5MIN_TAG)
>>     .apply(CoGroupByKey.create());
>> ...
>>
>> ....apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, KV<String, OrderAggregateTS>>() {
>>     @ProcessElement
>>     public void processElement(ProcessContext c, BoundedWindow w) {
>>         ...
>>         Iterable<OrderAggregateTS> current1MinResults = getByTupleTag(PER_1MIN_TAG);
>>         OrderAggregateTS previous5MinResult = getByTupleTag(PREVIOUS_PER_5MIN_TAG);
>>         result = wooooohoooo(previous5MinResult, current1MinResults);
>>         c.output(result);
>>         ...
>>     }
>> }))
>> .apply(WRITE_TO_PREVIOUS_5MIN_AGGS_QUEUE())
>>
>> Because I can create the DoFn without a direct "confluent api" reference,
>> I can declare the DoFn with a Stream of data that will be there when it
>> runs, but doesn't need to be available at "compile time", so to speak.
>>
>> It feels like one hoop too many to jump through, which makes me think
>> it's not the correct way to approach the issue of trying to carry values
>> between invocations of the same DoFn.
>>
>> Am I missing something obvious?
>>
>> Stephan
>>
>> On Fri, May 4, 2018 at 12:40 PM, Stephan Kotze <[email protected]> wrote:
>>
>>> Thanks Lukasz.
>>>
>>> So I've been playing with the problem a bit more (and updated the
>>> question on Stack Overflow to reflect this).
>>>
>>> The custom WindowFn, love it! Will learn to embrace it.
>>>
>>> The problem, however, is now the recursive nature of the declaration of
>>> the DoFn/Stream/SideInput (which probably shows I don't understand the
>>> fundamental interplay properly).
>>> I think a bit of pseudo code might illustrate this best:
>>>
>>> /* My 5 minute aggregator (over 1 minute intervals) that also requires
>>>    some of its previous running results */
>>>
>>> PCollection<KV<String, Aggregate>> per1MinAggs = ...;
>>>
>>> PCollection<KV<String, Aggregate>> per5MinAggs = per1MinAggs
>>>     .apply(Window.into(5m))
>>>     .apply(GroupByKey.create())
>>>     .apply(ParDo.of(new CalculateAggregateFromAggregates())
>>>         .withSideInputs(previous5MinAggsResultView)); // *1
>>>
>>> // My view projecting backwards
>>> PCollectionView<KV<String, Aggregate>> previous5MinAggsResultView =
>>>     per5MinAggs // *2
>>>         .apply(Window.into(shiftIntoNext5mWindow()))
>>>         .apply(Combine.fn().asSingletonView());
>>>
>>> private static class CalculateAggregateFromAggregates extends DoFn<> {
>>>
>>>     public void processElement(...) {
>>>         Iterable<Aggregate> newAggregates = c.element().getValue();
>>>         Aggregate previousResult =
>>>             ctx.sideInput(previous5MinAggsResultView).get();
>>>
>>>         // I aggregate over the one minute Aggs, but I also need to
>>>         // compare some of their values to the running total.
>>>         result = fn(newAggregates, previousResult);
>>>
>>>         c.output(KV.of(c.element().getKey(), result));
>>>     }
>>> }
>>>
>>> So to the notes:
>>> *1: In order to build the SingleOutput here, I need to be able to
>>> provide a reference to the View. (But I don't have it yet, as I can
>>> only declare it after I've declared the PCollection at *2.)
>>> *2: I can only declare the View AFTER I've created the PCollection that
>>> it depends on.
>>>
>>> The API appears to leave me with a bit of a chicken and egg situation.
>>> (The View depends on the PCollection, and the PCollection depends on
>>> the View.)
>>> Soooo:
>>> * I'm not seeing a way to break this circular dependency with the
>>> builder API.
>>> * Maybe this is intentional, as the execution actually requires this to
>>> be declared in order to create a workable topology.
>>> * Maybe it's just an API thing.
>>> * Most likely, I'm approaching this all wrong, or I'm misunderstanding
>>> some of the API's intricacies. (I do have a feeling though that the
>>> concept of applying such a "reductive" operation may not be that wrong,
>>> and could be a nice way of accumulating and aggregating data ad
>>> infinitum :) )
>>>
>>> Something I'd love to add though (on the Views):
>>> * When I use the singleton, how can I ensure that I'm only getting the
>>> value for the key I want in the DoFn? I see there is an asMap(), but it
>>> feels suboptimal to pass all the KVs in the window and then just grab
>>> the one I need from a Map, rather than reducing the collection to the
>>> correct record beforehand.
>>> * The combine/reduce function is something I need to play with more,
>>> but it seems to be per window and not per window per key (then joining
>>> only with the DoFns acting on the same key).
>>>
>>> A bit long for the SO question, but I'll update it if/when I have this
>>> figured out.
>>>
>>> Stephan
>>>
>>> On Thu, May 3, 2018 at 10:30 PM, Lukasz Cwik <[email protected]> wrote:
>>>
>>>> Based upon your description on SO, I believe you are going down the
>>>> correct path by looking at side inputs.
>>>>
>>>> Instead of modifying the timestamp of records that are materialized so
>>>> that they appear in the current window, you should supply a window
>>>> mapping fn which just maps the current window onto the past one.
>>>> You'll want to create a custom WindowFn which implements the window
>>>> mapping behavior that you want, paying special attention to overriding
>>>> the getDefaultWindowMappingFn function.
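To make "maps the current window onto the past one" concrete: for non-overlapping fixed windows the mapping is just a fixed shift of the window's start. A plain-Java sketch of that arithmetic (no Beam types; class and method names are mine — a Beam WindowMappingFn's getSideInputWindow would do the equivalent of previousWindowStart when resolving the side input):

```java
// Sketch, assuming non-overlapping fixed windows of equal size and
// timestamps in epoch millis.
class WindowShift {
    static final long SIZE_MS = 5 * 60 * 1000; // 5 minute windows

    // Start of the fixed window containing the timestamp.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % SIZE_MS);
    }

    // Start of the window immediately before the one containing the
    // timestamp, i.e. the window whose aggregate the side input should see.
    static long previousWindowStart(long timestampMs) {
        return windowStart(timestampMs) - SIZE_MS;
    }
}
```

The custom WindowFn itself then only needs to hand Beam a mapping that applies this shift.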
>>>> Your pipeline would be like:
>>>>
>>>> PCollection<T> mySource = /* data */;
>>>> PCollectionView<SumT> view = mySource
>>>>     .apply(Window.into(myCustomWindowFnWithNewWindowMappingFn))
>>>>     .apply(Combine.globally(myCombiner).asSingletonView());
>>>>
>>>> mySource.apply(ParDo.of(/* DoFn that consumes side input */)
>>>>     .withSideInputs(view));
>>>>
>>>> Pay special attention to the default value the combiner will produce,
>>>> since this will be the default value when the view has had no data
>>>> emitted to it.
>>>> Also, the easiest way to write your own custom window function is to
>>>> copy an existing one.
>>>>
>>>> On Thu, May 3, 2018 at 11:27 AM Stephan Kotze <[email protected]> wrote:
>>>>
>>>>> Hi everyone,
>>>>>
>>>>> I've already posted this question on Stack Overflow at
>>>>> https://stackoverflow.com/questions/50161186/apache-beam-delta-between-windows
>>>>>
>>>>> But I thought it made sense to post it here as well.
>>>>>
>>>>> So here we go:
>>>>>
>>>>> I am trying to determine the delta between values calculated in
>>>>> different fixed windows and emit them.
>>>>>
>>>>>              T-2                      T-1                       T
>>>>> |------------------------|------------------------|------------------------|
>>>>> userID | clickEventCount = 3  | clickEventCount = 1  | clickEventCount = 6  |
>>>>>              |                        |                        |
>>>>>              v                        v                        v
>>>>> userID  clickEventCountDelta = 3  clickEventCountDelta = -2  clickEventCountDelta = 5
>>>>>
>>>>> Something like this would be grand:
>>>>> 1. KV<userID,clickEvent> --> groupByKey() --> fixedWindow(1hour) -->
>>>>>    count() --> KV<userID,clickEventCount>
>>>>> 2. KV<userID,clickEventCount> --> ?(deltaFn) -->
>>>>>    KV<userId,clickEventCountDelta>
>>>>>
>>>>> I'm struggling a bit to find a solution that is both elegant and
>>>>> scalable, so any help would be greatly appreciated.
>>>>> Some things I've considered/tried:
>>>>>
>>>>> Window of Windows:
>>>>> 1: KV<userID,clickEvent> --> groupByKey() --> fixedWindow(1hour) -->
>>>>>    count() --> KV<userID,clickEventCount>
>>>>> 2: KV<userID,clickEventCount> --> groupByKey() -->
>>>>>    fixedWindow(X*hour) --> ParDo(Iterable<KV<userID,clickEventCount>>)
>>>>>
>>>>> This lets me get my hands on the collection of
>>>>> KV<userID,clickEventCount> that I can iterate over and emit the delta
>>>>> between them.
>>>>> However, the larger fixed window seems arbitrary and unnecessary, as
>>>>> I want to carry the delta forward indefinitely, not start from 0
>>>>> every few hours.
>>>>>
>>>>> Shifting the previous count's timestamp:
>>>>> 1: Essentially attempting to take the KV<userID,clickEventCount>
>>>>>    PTransform and emitting an additional KV<userID,clickEventCount>
>>>>>    with an adjusted timestamp.
>>>>> 2: Then grabbing this time-shifted count as a side input to a
>>>>>    PTransform/DoFn for calculating the delta with the previous
>>>>>    period.
>>>>>
>>>>> This seemed like a cute idea, but it very quickly became a mess and
>>>>> doesn't "feel" like the right approach.
>>>>>
>>>>> Using an external cache:
>>>>> 1: Writing the KV<userID,clickEventCount> + timestamp + window to a
>>>>>    distributed cache.
>>>>> 2: Grabbing the previous value from the cache and computing the
>>>>>    delta.
>>>>>
>>>>> It doesn't seem totally unreasonable to reach back in time to the
>>>>> count of the previous window via a cache. However, it "feels" wrong
>>>>> and highly inefficient given that I have the data in a PCollection
>>>>> somewhere nearby.
>>>>>
>>>>> Stateful DoFns:
>>>>> Seems reasonable, but is it overkill (having to initialise them with
>>>>> cache lookbacks anyway, as they are reset on window closes)?
>>>>>
>>>>> Sliding Window over 2 X Windows:
>>>>> If I create a sliding window 2x the duration of the underlying count
>>>>> windows, I could potentially emit the delta between every two events
>>>>> indefinitely.
>>>>> This also feels odd, but could be the most elegant solution here?
>>>>>
>>>>> None of the above really feels like the right way to approach deltas,
>>>>> and it does appear that the compute model's state doesn't cater for
>>>>> the "feed forward" of data in time.
>>>>>
>>>>> So :)
>>>>> I've done quite a bit of searching and I can't seem to find anything
>>>>> leading me clearly in a particular direction.
>>>>> I'm probably missing something big here, so any help would be much
>>>>> appreciated.
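Whichever option from the list above is used, the "Stateful DoFns" variant boils down to remembering one count per key and diffing each new count against it. A plain-Java sketch of that bookkeeping (no Beam APIs; class and method names are mine), using the thread's example numbers:

```java
import java.util.HashMap;
import java.util.Map;

// Rough per-key simulation of what a stateful "deltaFn" would keep:
// the last count seen for each userID, so each new count can be
// emitted as a delta against it. A missing key diffs against 0,
// matching the example (counts 3, 1, 6 -> deltas 3, -2, 5).
class DeltaState {
    private final Map<String, Long> lastCount = new HashMap<>();

    // Returns the delta for this key and updates the stored count.
    long onCount(String userId, long count) {
        long previous = lastCount.getOrDefault(userId, 0L);
        lastCount.put(userId, count);
        return count - previous;
    }
}
```

In Beam this map would live in per-key state (e.g. a ValueState in the global window, as Lukasz describes above), with some eviction policy for keys that go quiet.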
