Thanks for updating the description. I feel I better understand your
problem now. Your idea of publishing to a queue is clever, and if it works
for you, you should go with it.

Here are some more details in case you want to use overlapping sliding
windows or a StatefulDoFn instead.

With overlapping sliding windows, each window holds a fixed set of the
1 minute aggregations, which means that you can only compute an
approximation of the aggregation for T. This can be fine if your
aggregation function uses something like an exponential decay, since the
further back you go, the less relevant the information is.
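
To make the exponential decay point concrete, here is a minimal sketch in
plain Java (no Beam types). The class DecayedWindowSum, the method
decayedSum and the decay factor are names I made up for illustration, and
it assumes the window hands you its 1 minute aggregations ordered oldest
to newest.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper, not part of Beam: decayed sum over one window's 1 minute aggregations. */
final class DecayedWindowSum {

    /**
     * @param oneMinuteAggs aggregations ordered oldest to newest (assumed ordering)
     * @param decay weight multiplier applied per minute of age, in (0, 1]
     */
    static double decayedSum(List<Double> oneMinuteAggs, double decay) {
        double result = 0.0;
        for (double agg : oneMinuteAggs) {
            // Each step multiplies everything seen so far by decay once more,
            // so an aggregation that is n minutes old ends up weighted by decay^n.
            result = result * decay + agg;
        }
        return result;
    }

    public static void main(String[] args) {
        // 4.0 * 0.25 + 2.0 * 0.5 + 1.0 = 3.0
        System.out.println(decayedSum(Arrays.asList(4.0, 2.0, 1.0), 0.5)); // prints 3.0
    }
}
```

Because old values are weighted down so heavily, dropping everything that
falls outside the sliding window changes the result only slightly, which
is why the approximation can be acceptable.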

A StatefulDoFn is partitioned per key and window, which means that the
only way to see all the data for a single key is to use the global window.
Every time a new record comes in, you look at all the 5m aggregations that
you have stored in state, recompute them, update state, and output each 5m
aggregation that would have changed. Note that you should evict
aggregations from state for records that are old (knowing when to evict
depends on the trigger defined on the 1min aggregation).
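
As a rough sketch of that bookkeeping, here is the per-key logic in plain
Java. This is not the Beam state API: FiveMinuteState, onRecord and the
evictBeforeMinute cutoff are hypothetical names, the TreeMap stands in for
whatever you keep in state, and it assumes each record carries the complete
1 minute aggregation for its minute and that the 5m aggregation is a sum.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical stand-in for the per-key state of a StatefulDoFn; not Beam API. */
final class FiveMinuteState {
    private static final int WINDOW_MINUTES = 5;

    // 1 minute aggregations kept in "state", keyed by the minute they cover.
    private final TreeMap<Long, Double> perMinute = new TreeMap<>();

    /**
     * Stores one 1 minute aggregation and returns every 5 minute sum it changed,
     * keyed by the first minute of that 5 minute window.
     */
    Map<Long, Double> onRecord(long minute, double agg, long evictBeforeMinute) {
        // Assumption: a repeated firing for the same minute replaces the old value.
        perMinute.put(minute, agg);

        // Evict old aggregations. The cutoff is passed in because choosing it
        // depends on the trigger of the upstream 1 minute aggregation.
        perMinute.headMap(evictBeforeMinute, false).clear();

        // Minute m belongs to the five windows that start at m-4 .. m.
        Map<Long, Double> changed = new TreeMap<>();
        for (long start = minute - WINDOW_MINUTES + 1; start <= minute; start++) {
            double sum = 0.0;
            for (double v : perMinute.subMap(start, true, start + WINDOW_MINUTES - 1, true).values()) {
                sum += v;
            }
            changed.put(start, sum);
        }
        return changed;
    }

    /** Number of 1 minute aggregations currently held. */
    int storedMinutes() {
        return perMinute.size();
    }
}
```

In a real StatefulDoFn the map would live in a state cell and the returned
entries would be emitted as output; if you never evict, the state for a key
grows without bound in the global window.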

If it suits your aggregation function, I believe the overlapping sliding
windows will be best, as all garbage collection is handled automatically
by the system, so you don't have to worry about leaks. If your aggregation
function doesn't work well as an approximation, then use the StatefulDoFn
or your idea with the queue.


On Fri, May 4, 2018 at 9:39 AM Stephan Kotze wrote:

> More experimenting:
>
> I can get the concept working using a CoGroupBy IF I break the circular
> dependency by introducing a Message Queue for example: (Same should apply
> for using the Queue as a side input)
>
> ...
>
> PCollection<KV<String, OrderAggregateTS>> per1minAggs = ...;
>
> PCollection<KV<String, OrderAggregateTS>> previous5minAggs = p
>     .apply(READ_FROM_PREVIOUS_5MIN_AGGS_QUEUE())
>     .apply(SHIFT_INTO_NEW_WINDOW_VIA_CUSTOM_WINDOWFN());
>
> // KeyedPCollectionTuple takes the tag first, then the collection.
> PCollection<KV<String, CoGbkResult>> coGroup = KeyedPCollectionTuple
>     .of(PER_1MIN_TAG, per1minAggs)
>     .and(PREVIOUS_PER_5MIN_TAG, previous5minAggs)
>     .apply(CoGroupByKey.create());
> ...
>
> ....apply(ParDo.of(new DoFn<KV<String, CoGbkResult>, KV<String, OrderAggregateTS>>() {
>     @ProcessElement
>     public void processElement(ProcessContext c, BoundedWindow w) {
>         ....
>         Iterable<OrderAggregateTS> current1MinResults = getByTupleTag(PER_1MIN_TAG);
>         OrderAggregateTS previous5MinResult = getByTupleTag(PREVIOUS_PER_5MIN_TAG);
>         result = wooooohoooo(previous5MinResult, current1MinResults);
>         c.output(result);
>         .....
>     }
> }))
> .apply(WRITE_TO_PREVIOUS_5MIN_AGGS_QUEUE());
>
> Because I can create the DoFn without a direct "confluent api" reference,
> I can declare the DoFn with a Stream of data that will be there when it
> runs, but doesn't need to be available at "compile time"  so to speak.
>
> It feels like a hoop too many to jump through, which makes me think it's
> not the correct way to approach the issue of trying to carry values between
> invocations of the same DoFn.
>
> Am I missing something obvious?
>
> Stephan
>
>
>
> On Fri, May 4, 2018 at 12:40 PM, Stephan Kotze wrote:
>
>> Thanks Lukasz.
>>
>> So I've been playing with the problem a bit more (and updated the
>> question on Stack Overflow to reflect).
>>
>> The custom windowFn, love it! Will learn to embrace it.
>>
>> The problem however, is now the recursive nature of the declaration of
>> the DoFn/Stream/SideInput (which probably shows I don't understand the
>> fundamental interplay properly).
>>
>> I think a bit of pseudo code might illustrate best:
>>
>> /* My 5 minute aggregator (over 1 minute intervals) that also requires
>> some of its previous running results */
>>
>>
>> PCollection<KV<String, Aggregate>> per1MinAggs = ...;
>> PCollection<KV<String, Aggregate>> per5MinAggs =
>>             per1MinAggs
>>                 .apply(Window.into(5m))
>>                 .apply(GroupByKey.create())
>>                 .apply(
>>                     ParDo.of(
>>                         new CalculateAggregateFromAggregateTS()
>>                     )
>>                     .withSideInputs(previous5MinAggsResultView)   //*1
>>                   );
>>
>>
>> //My view projecting backwards
>> PCollectionView<KV<String, Aggregate>> previous5MinAggsResultView =
>>             per5MinAggs     //*2
>>                 .apply(Window.into(shiftIntoNext5mwindow()))
>>                 .apply(Combine.fn()).asSingletonView());
>>
>>
>> private static class CalculateAggregateFromAggregateTS extends DoFn<> {
>>
>>         @ProcessElement
>>         public void processElement(...) {
>>
>>             Iterable<Aggregate> newAggregates = c.element().getValue();
>>             Aggregate previousResult =
>>                 c.sideInput(previous5MinAggsResultView).getValue();
>>
>>             // I aggregate over the one minute Aggs, but I also need to
>>             // compare some of their values to the running total.
>>             result = fn(newAggregates, previousResult);
>>
>>             c.output(KV.of(c.element().getKey(), result));
>>         }
>> }
>>
>> So to the notes:
>> *1: In order to build the SingleOutput here, I need to be able to provide
>> a reference to the View. (But I don't have it yet, as I can only declare
>> it after I've declared the PCollection at *2.)
>> *2: I can only declare the View AFTER I've created the PCollection that it
>> depends on.
>>
>> The API appears to leave me with a bit of a chicken and egg situation.
>> (The View depends on the PCollection and the PCollection depends on the
>> view).
>> Soooo:
>> * I'm not seeing a way to break this circular dependency with the
>> builder API.
>> * Maybe this is intentional as the execution actually requires this to be
>> declared, in order to create a workable topology.
>> * Maybe it is just an API thing.
>> * Most Likely, I'm approaching this all wrong, or I'm misunderstanding
>> some of the API's intricacies. (I do have a feeling though that the concept
>> of applying such a "reductive" operation may not be that wrong, and could
>> be a nice way of Accumulating and Aggregating data along ad infinitum :) )
>>
>> Something I'd love to add though (on the Views)
>> * When I use the Singleton, how can I ensure that I'm only getting the
>> value for the Key I want in the DoFn?  I see there is an asMap(), but it
>> feels suboptimal to pass all the KVs in the window and then just grab the
>> one I need from a Map, rather than reducing the Collection to the correct
>> record beforehand.
>> * The combine/reduce function is something I need to play with more, but
>> it seems to be per-window and not per window per key (then joining only
>> with the DoFns acting on the same key).
>>
>> A bit long for the SO question, But I'll update it if/when I have this
>> figured out.
>>
>> Stephan
>>
>>
>>
>>
>>
>> On Thu, May 3, 2018 at 10:30 PM, Lukasz Cwik wrote:
>>
>>> Based upon your description on SO, I believe you are going down the
>>> correct path by looking at side inputs.
>>>
>>> Instead of modifying the timestamp of records that are materialized so
>>> that they appear in the current window, you should supply a window mapping
>>> fn which just maps the current window on to the past one.
>>> You'll want to create a custom window fn which implements the window
>>> mapping behavior that you want paying special attention to overriding the
>>> getDefaultWindowMappingFn function.
>>>
>>> Your pipeline would be like:
>>>
>>> PCollection<T> mySource = /* data */;
>>> PCollectionView<SumT> view = mySource
>>>     .apply(Window.into(myCustomWindowFnWithNewWindowMappingFn))
>>>     .apply(Combine.globally(myCombiner).asSingletonView());
>>>
>>> mySource.apply(
>>>     ParDo.of(/* DoFn that consumes side input */).withSideInputs(view));
>>>
>>> Pay special attention to the default value the combiner will produce
>>> since this will be the default value when the view has had no data emitted
>>> to it.
>>> Also, the easiest way to write your own custom window function is to
>>> copy an existing one.
>>>
>>>
>>>
>>>
>>> On Thu, May 3, 2018 at 11:27 AM Stephan Kotze wrote:
>>>
>>>> Hi everyone
>>>>
>>>> I've already posted this question on stackoverflow @
>>>> https://stackoverflow.com/questions/50161186/apache-beam-delta-between-windows
>>>>
>>>> But I thought it made sense to post it here as well.
>>>>
>>>> So here we go:
>>>>
>>>> I am trying to determine the delta between values calculated in
>>>> different fixed windows and emit them.
>>>>
>>>>                    T-2                       T-1                        T
>>>>        |-------------------------|-------------------------|-------------------------|
>>>> userID |   clickEventCount = 3   |   clickEventCount = 1   |   clickEventCount = 6   |
>>>>                     |                         |                         |
>>>>                     ˅                         ˅                         ˅
>>>> userID  clickEventCountDelta = 3  clickEventCountDelta = -2 clickEventCountDelta = 5
>>>>
>>>> Something like this would be grand:
>>>>         1. KV<userID,clickEvent> --> groupByKey() -->
>>>> fixedWindow(1hour) --> count() --> KV<userID,clickEventCount>
>>>>         2. KV<userID,clickEventCount> --> ?(deltaFn) -->
>>>> KV<userId,clickEventCountDelta>
>>>>
>>>>
>>>> I'm struggling a bit to find a solution that is both elegant and
>>>> scalable, so any help would be greatly appreciated.
>>>>
>>>> Some things I've considered/tried:
>>>>     Window of Windows:
>>>>           1: KV<userID,clickEvent> --> groupByKey() -->
>>>> fixedWindow(1hour) --> count() --> KV<userID,clickEventCount>
>>>>           2: KV<userID,clickEventCount> --> groupByKey() -->
>>>> fixedWindow(X*hour) --> ParDo(Iterable<KV<userID,clickEventCount>>)
>>>>
>>>>     This allows me to get my hands on the collection of
>>>> KV<userID,clickEventCount> that I can iterate over and emit the delta
>>>> between them.
>>>>     However, the larger fixed window seems arbitrary and unnecessary as
>>>> I want to carry the delta indefinitely, not start from 0 every few hours.
>>>>
>>>>     Shifting the previous count's timestamp
>>>>         1: Essentially attempting to take the
>>>> KV<userID,clickEventCount> PTransform and emitting an additional
>>>> KV<userID,clickEventCount> with an adjusted timestamp.
>>>>         2: Then grabbing this time shifted timestamp as a side input to
>>>> a PTransform/doFn for calculating the delta with the previous period.
>>>>
>>>>     This seemed like a cute idea, but it very quickly became a mess and
>>>> doesn't "feel" to be the right approach.
>>>>
>>>>     Using an external cache:
>>>>         1: writing the KV<userID,clickEventCount> + timestamp+window to
>>>> a distributed cache.
>>>>         2: Grabbing the previous value from the cache and computing the
>>>> delta.
>>>>
>>>>     Doesn’t seem totally unreasonable to be able to reach back in time
>>>> to the count of the previous window via a cache. However it "feels" wrong
>>>> and highly inefficient given that I have the data in a PCollection
>>>> somewhere nearby.
>>>>
>>>>     Stateful DoFns
>>>>     Seems reasonable, but is it overkill (having to initialise them
>>>> with cache lookbacks anyways as they are reset on window closes)
>>>>
>>>>     Sliding Window over 2 X Windows:
>>>>     If I create a sliding window 2x the duration of the underlying
>>>> count windows, I could potentially indefinitely emit the delta between
>>>> every two events.
>>>>     This also feels odd, but could be the most elegant solution here?
>>>>
>>>>
>>>> None of the above really feels like the right way to approach deltas,
>>>> and it does appear that the compute model's state doesn't cater for the
>>>> "feed forward" of data in time.
>>>>
>>>> So :)
>>>> I've done quite a bit of searching and I can't seem to find anything
>>>> leading me clearly into a particular direction.
>>>> I'm probably missing something big here, so any help would be much
>>>> appreciated.
>>>>
>>>>
>>
>
