Hi Stephan,

If you could provide a concrete example with a small input, and the output
you except, I think I would understand requirements better. I am still not
sure if simple SlidingWindows aren't enough for your use case.

On Fri, May 4, 2018 at 9:44 AM Lukasz Cwik <[email protected]> wrote:

> Thanks for updating the description and I feel as though I better
> understand your problem and your idea about publishing to a queue is clever
> and if you feel like it works for you you should go with it.
>
> Here are some more details about if you were going to use the overlapping
> sliding windows or a StatefulDoFn.
>
> With overlapping sliding windows, you'll have a fixed set of the 1 minute
> aggregations which means that you can only compute an aggregation
> approximation for T. This can be fine if your aggregation function uses
> something like an exponential decay as the further you go back, the less
> relevant the information is.
>
> If you use a StatefulDoFn, it is partitioned per key and window, which
> means that the only way you can see all the data for a single key is to use
> the global window. Every time a new record comes in, you look at all the 5m
> aggregations that you have stored in state and recompute and update state
> and also output each 5m aggregation that would have changed. Note that you
> should evict aggregations from state for records that are old (knowing when
> to evict is dependent on what is the trigger defined on the 1min
> aggregation function).
>
> Depending on which one is appropriate for your aggregation function, I
> believe using the overlapping sliding windows will be best as all garbage
> collection will be handled automatically by the system so you don't have to
> worry about leaks but if your aggregation function doesn't work well as an
> approximation, then use the StatefulDoFn or your idea with the queue.
>
>
> On Fri, May 4, 2018 at 9:39 AM Stephan Kotze <[email protected]>
> wrote:
>
>> More experimenting:.
>>
>> I can get the concept working using a CoGroupBy IF I break the circular
>> dependency by introducing a Message Queue for example: (Same should apply
>> for using the Queue as a side input)
>>
>> ...
>>
>> PCollection<KV<String, OrderAggregateTS>> per1minAggs = ...;
>>
>> PCollection<KV<String, OrderAggregateTS>> previous5minAggs = p
>> .apply(*READ_FROM_PREVIOUS_5MIN_AGGS_QUEUE()*)
>> .apply(*SHIFT_INTO_NEW_WINDOW_VIA_CUSTOM_WINDOWFN()*);
>>
>> PCollection<KV<String, CoGbkResult>> coGroup = KeyedPCollectionTuple
>> .of(per1minAggsTag, *PER_1MIN_TAG*)
>> .and(previousPer5minAggsTag, *PREVIOUS_PER_5MIN_TAG*)
>> .apply(CoGroupByKey.create());
>> ...
>>
>> ....apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, KV<String,
>> OrderAggregateTS>>() {
>> @ProcessElement
>> public void processElement(ProcessContext c, BoundedWindow w) {
>> ....
>> Iterable<OrderAggregateTS> current1MinResults =
>> getByTupleTag(*PER_1MIN_TAG*)
>> OrderAggregateTS previous5MinResult =
>> getByTupleTag(*PREVIOUS_PER_5MIN_TAG*)
>> result = wooooohoooo(previous5MinResult,current1MinResults)!
>> c.output(result);
>> .....
>> }
>> }))
>> .apply(WRITE_TO_PREVIOUS_5MIN_AGGS_QUEUE())
>>
>> Because I can create the DoFn without a direct "confluent api" reference,
>> I can declare the DoFn with a Stream of data that will be there when it
>> runs, but doesn't need to be available at "compile time"  so to speak.
>>
>> It feels like a hoop too many to jump through, which makes me think it's
>> not the correct way to approach the issue of trying to carry values between
>> invocations of the same DoFn.
>>
>> Am I missing something obvious?
>>
>> Stephan
>>
>>
>>
>> On Fri, May 4, 2018 at 12:40 PM, Stephan Kotze <[email protected]
>> > wrote:
>>
>>> Thanks Lukasz.
>>>
>>> So I've been playing with the problem a bit more (and updated the
>>> question on Stack Overflow to reflect).
>>>
>>> The custom windowFn, love it! Will learn to embrace it.
>>>
>>> The problem however, is now the recursive nature of the declaration of
>>> the DoFn/Stream/SideInput (which probably shows I don't understand the
>>> fundamental interplay properly).
>>>
>>> I think a bit of pseudo code might illustrate best:
>>>
>>> /*My 5 minute aggregator  (over 1 minute intervals) that also requires
>>> some if it's previous running results*/
>>>
>>>
>>> PCollection<KV<String, Aggregate>> per1MinAggs =
>>> PCollection<KV<String, Aggregate>> per5MinAggs =
>>>             per1MinAggs
>>>                 .apply(Window.into(5m))
>>>                 .apply(GroupByKey.create())
>>>                 .apply(
>>>                     ParDo.of(
>>>                         new CalculateAggregateFromAggregateTS()
>>>                     )
>>>                     .withSideInputs(previous5MinAggsResultView)   //*1
>>>                   );
>>>
>>>
>>> //My view projecting backwards
>>> PCollectionView<KV<String, Aggregate>> previous5MinAggsResultView =
>>>             per5MinAggs     //*2
>>>                 .apply(Window.into(shiftIntoNext5mwindow()))
>>>                 .apply(Combine.fn()).asSingletonView());
>>>
>>>
>>> private static class CalculateAggregateFromAggregates extends DoFn<> {
>>>
>>>         public void processElement(...) {
>>>
>>>             Iterable<Aggregate> newAggregates = c.element().getValue();
>>>             Aggregate previousResult =
>>> ctx.sideinput(previous5MinAggsResultView).get();
>>>
>>>                        //I aggregate over the one minute Aggs, but I
>>> also need to compare some of their values to the running total.
>>>             result = fn(newAggregates,previousResult)
>>>
>>>             c.output(KV.of(c.element().getKey(), result));
>>>         }
>>> }
>>>
>>> So to the notes:
>>> *1: In order to build the SingleOutput here, I need to be able to
>>> provide a reference to the View . (But I don't have it yet, as I can only
>>> declare after I've declared the PCollection *2)
>>> *2 I can only declare the View AFTER I've created the PCollection that
>>> it depends on.
>>>
>>> The API appears to leave me with a bit of a chicken and egg situation.
>>> (The View depends on the PCollection and the PCollection depends on the
>>> view).
>>> Soooo:
>>> * I'm not seeing a way with to break this circular dependency with the
>>> builder API.
>>> * Maybe this is intentional as the execution actually requires this to
>>> be declared, in order to create a workable topology.
>>> * Maybe is just an API thing.
>>> * Most Likely, I'm approaching this all wrong, or I'm misunderstanding
>>> some of the API's intricacies. (I do have a feeling though that the concept
>>> of applying such a "reductive" operation may not be that wrong, and could
>>> be a nice way of Accumulating and Aggregating data along ad infinitum :) )
>>>
>>> Something I'd love to add though (on the Views)
>>> * When I use the Singleton, how can I ensure that I'm only getting the
>>> value for the Key I want in the DoFn.  I see there is an asMap(), but it
>>> feels suboptimal to pass all the KVs in the window and then just grab the
>>> one I need from a Map, rather then reducing the Collection to the correct
>>> record before hand.
>>> * The combine/reduce function is something I need to play with more, but
>>> it seems to be per-window and not per window per key (then joining only
>>> with the DoFns acting on the same key).
>>>
>>> A bit long for the SO question, But I'll update it if/when I have this
>>> figured out.
>>>
>>> Stephan
>>>
>>>
>>>
>>>
>>>
>>> On Thu, May 3, 2018 at 10:30 PM, Lukasz Cwik <[email protected]> wrote:
>>>
>>>> Based upon your description on SO, I believe you are going down the
>>>> correct path by looking at side inputs.
>>>>
>>>> Instead of modifying the timestamp of records that are materialized so
>>>> that they appear in the current window, you should supply a window mapping
>>>> fn which just maps the current window on to the past one.
>>>> You'll want to create a custom window fn which implements the window
>>>> mapping behavior that you want paying special attention to overriding the
>>>> getDefaultWindowMappingFn function.
>>>>
>>>> Your pipeline would be like:
>>>>
>>>> PCollection<T> mySource = /* data */
>>>> PCollectionView<SumT> view = mySource
>>>>     .apply(Window.into(myCustomWindowFnWithNewWindowMappingFn))
>>>>     .apply(Combine.globally(myCombiner).asSingletonView());
>>>>
>>>> mySource.apply(ParDo.of(/* DoFn that consumes side input 
>>>> */).withSideInputs(view));
>>>>
>>>> Pay special attention to the default value the combiner will produce
>>>> since this will be the default value when the view has had no data emitted
>>>> to it.
>>>> Also, the easiest way to write your own custom window function is to
>>>> copy an existing one.
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, May 3, 2018 at 11:27 AM Stephan Kotze <
>>>> [email protected]> wrote:
>>>>
>>>>> Hi everyone
>>>>>
>>>>> I've already posted this question on stackoverflow @
>>>>> https://stackoverflow.com/questions/50161186/apache-beam-delta-between-windows
>>>>>
>>>>> But I thought it made sense to post it here as well.
>>>>>
>>>>> So here we go:
>>>>>
>>>>> I am trying to determine the delta between values calculated in
>>>>> different fixed windows and emit them.
>>>>>
>>>>>                                T-2                         T-1
>>>>>                    T
>>>>>
>>>>> |---------------------------------|--------------------------------
>>>>> |---------------------------------|
>>>>>  userID     |       clickEventCount = 3       |       clickEventCount
>>>>> = 1       |       clickEventCount = 6       |
>>>>>
>>>>>                                 |                                 |
>>>>>                            |
>>>>>                                 ˅                                 ˅
>>>>>                            ˅
>>>>> userID           clickEventCountDelta = 3
>>>>>  clickEventCountDelta = -2         clickEventCountDelta = 5
>>>>>
>>>>>
>>>>> Something like this would be grand:
>>>>>         1. KV<userID,clickEvent> --> groupByKey() -->
>>>>> fixedWindow(1hour) --> count() --> KV<userID,clickEventCount>
>>>>>         2. KV<userID,clickEventCount> --> ?(deltaFn) -->
>>>>> KV<userId,clickEventCountDelta>
>>>>>
>>>>>
>>>>> I'm struggling a bit to find a solution that is both elegant and
>>>>> scalable, so any help would be greatly appreciated.
>>>>>
>>>>> Some things I've considered/tried:
>>>>>     Window of Windows:
>>>>>           1: KV<userID,clickEvent> --> groupByKey() -->
>>>>> fixedWindow(1hour) --> count() --> KV<userID,clickEventCount>
>>>>>           2: KV<userID,clickEventCount> --> groupByKey()
>>>>> fixedWindow(X*hour) --> ParDo(Iterable<KV<userID,clickEventCount>>)
>>>>>
>>>>>     This allows me get my hands on the collection of
>>>>> KV<userID,clickEventCount> that I can iterate over and emit the delta
>>>>> between them.
>>>>>     However, the larger fixed window seems arbitrary and unnecessary
>>>>> as I want to carry the delta indefinitely, not start from 0 every few 
>>>>> hours.
>>>>>
>>>>>     Shifting the previous count's timestamp
>>>>>         1: Essentially attempting to take the
>>>>> KV<userID,clickEventCount> PTransform and emitting an additional
>>>>> KV<userID,clickEventCount> with an adjusted timestamp.
>>>>>         2: Then grabbing this time shifted timestamp as a side input
>>>>> to a PTransform/doFn for calculating the delta with the previous period.
>>>>>
>>>>>     This seemed like a cute idea, but it very quickly became a mess
>>>>> and doesn't "feel" to be the right approach.
>>>>>
>>>>>     Using an external cache:
>>>>>         1: writing the KV<userID,clickEventCount> + timestamp+window
>>>>> to a distributed cache.
>>>>>         2: Grabbing the previous value from the cache and computing
>>>>> the delta.
>>>>>
>>>>>     Doesn’t seem totally unreasonable to be able to reach back in time
>>>>> to the count of the previous window via a cache. However it "feels" wrong
>>>>> and highly inefficient given that I have the data in a PCollection
>>>>> somewhere nearby.
>>>>>
>>>>>     Stateful DoFns
>>>>>     Seems reasonable, but is it overkill (having to initialise them
>>>>> with cache lookbacks anyways as they are reset on window closes)
>>>>>
>>>>>     Sliding Window over 2 X Windows:
>>>>>     If I create a sliding window 2x the duration of the underlying
>>>>> count windows, I could potentially indefinitely emit the delta between
>>>>> every two events.
>>>>>     This also feels odd, but could be the most elegant solution here?
>>>>>
>>>>>
>>>>> None of the above really feel like the right way to approach deltas,
>>>>> and it does appear that the Compute model's state Doesn't cater for the
>>>>> "feed forward" of data in time.
>>>>>
>>>>> So :)
>>>>> I've done quite a bit of searching and I can't seem to find anything
>>>>> leading me clearly into a particular direction.
>>>>> I'm probably missing something big here, so any help would be much
>>>>> appreciated.
>>>>>
>>>>>
>>>
>>

Reply via email to