Hi Robert

Thanks a lot for the answer.  I looked at the NonMergingWindowFn, and
understand the way to send data to multiple windows. I have two followup
questions:

1.  My input contains a Tuple [A, B, C, D, E] .  Since different
computations have to happen over different elements in Tuple, I have to
partition the input so that each partition can be then be output to desired
window.  Either i can send the entire tuple to each window, or i can
partition the tuple by doing a output for each element specifically. e.g

       c.output('A', Tuple[A]) // For A specific aggregation
      c.output('B', Tuple[B]) // For B specific aggregation
      c.output('AB', [Tuple[A], Tuple[B])) // Here A, B are both need to do
aggregation

     Do you suggest any BEAM primitive to achieve this?

2.  This is a general question about GroupByKey operator.  The GroupByKey
will provide KV<K, Iterable<V>> to the downstream operators .  Since this
iterable is Java iterable, this will consume lots of lots of memory , and
can cause OOM exceptions.  Windows in my case could be few hours or even
days long.  ( May be having a CustomEvictor will help so that window does
not grow beyond a certain size?)



Regards
Sumit Chawla


On Thu, Sep 22, 2016 at 1:00 AM, Robert Bradshaw <rober...@google.com>
wrote:

> So what you can do is write a WindowFn (say, subclass NonMergingWindowFn) 
> that,
> for each element, assigns it to the set of windows your interested in (of
> various sizes) based on its timestamp. The group-by-key will put the
> element in a grouping for *each* window it's assigned to. You can then
> inspect this window in a subsequent DoFn to react differently to the
> different window sizes. E.g. suppose my WindowFn's assignWindows makes the
> following assignments:
>
> (k, v1) -> [0, 3), [0, 10)
> (k, v2) -> [4, 6), [0, 10)
> (k, v3) -> [0, 3).
>
> Then after a GroupByKey the PCollection will have elements
>
> (k, [v1, v3]) in window [0, 3)
> (k, [v1, v2]) in window [0, 10)
> (k, [v2]) in window [4, 6)
>
> (and similarly for CombineValues or any other grouping operation).
>
>
> On Wed, Sep 21, 2016 at 11:34 PM, Chawla,Sumit <sumitkcha...@gmail.com>
> wrote:
>
>> Hi Robert
>>
>> The configuration can be stored in a registry, which can be polled
>> periodically.  Or it can be another stream input to the pipeline.  User
>> will define new aggregations and add to registry.
>>
>> Can you please elaborate a bit on what would that Custom WindowFn be?
>>
>> Regards
>> Sumit Chawla
>>
>>
>> On Wed, Sep 21, 2016 at 11:31 PM, Robert Bradshaw <
>> rober...@google.com.invalid> wrote:
>>
>>> This may be possible with a custom WindowFn. Where is the configuration
>>> of what aggregations to do coming from?
>>>
>>> On Wed, Sep 21, 2016 at 11:27 PM, Chawla,Sumit <sumitkcha...@gmail.com>
>>> wrote:
>>>
>>>> Attaching the Image.
>>>>
>>>>
>>>> ​
>>>>
>>>> Regards
>>>> Sumit Chawla
>>>>
>>>>
>>>> On Wed, Sep 21, 2016 at 11:24 PM, Chawla,Sumit <sumitkcha...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi All
>>>>>
>>>>> I am trying to code a solution for following scenarios.
>>>>>
>>>>> 1.  I have a stream of Tuples with multiple numeric fields (e.g. A, B,
>>>>> C, D, E ... etc )
>>>>> 2.  I want the ability to do different Windowing and Aggregation on
>>>>> each field or a group of fields in the Tuple.  e.g. Sum A over a Period of
>>>>> 2 minutes, Avg B over a period of 3 minutes,  Sum of C grouped by D over a
>>>>> period of 15 minutes
>>>>> 3.  *These window requirements can be added by user at runtime*.  My
>>>>> pipeline should be able to compute a new aggregation at runtime.
>>>>> 4.  Plan to support only simple aggregation windows like SUM, AVG,
>>>>> MAX, MIN, COUNT etc.
>>>>>
>>>>>
>>>>> As i understand in BEAM pipelines ( with Flink Runner), the DAG of
>>>>> computations cannot be altered once the pipeline is deployed.  I am trying
>>>>> to see how can i support above use case.  I would love to hear your
>>>>> feedback on this, and suggestions on doing it in a completely different
>>>>> way.
>>>>>
>>>>> *My Design:*
>>>>>
>>>>> 1.   Create 1 minute buckets per Field or Group of Fields and compute
>>>>> basic aggregations for bucket.  e.g.  Buckets are highlighted in Yellow
>>>>> here.  For each field i calculate [SUM, COUNT, MAX, MIN] in the bucket.  (
>>>>> Bucket size of 1 minute is defined for simplicity, and would limit the
>>>>> minimum window size to 1 minute)
>>>>>
>>>>> 2.  Downstream process these buckets, and compute the user defined
>>>>> aggregations.  Following diagram depicts Tumbling Window computations.  
>>>>> The
>>>>> Aggregation functions in GREEN are just NATIVE functions consuming
>>>>> different buckets, and doing aggregations on top of these buckets.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> ​
>>>>> ​
>>>>>
>>>>> *P.S.*
>>>>>
>>>>> * Some of the design choices that i have decided not to go for are:*
>>>>>
>>>>> 1.  Multiple Pipelines for doing computation.  One master pipeline
>>>>> does grouping, and sends to a different topic based on user configured
>>>>> window size. (e.g. topic_window_by_5_min, topic_window_by_15_min), and 
>>>>> have
>>>>> a different pipeline consume each topic.
>>>>>
>>>>> 2.  Single pipeline doing all the business with predefined Windows
>>>>> defined for Downstream processing. e.g. 5, 15, 30, 60 minute windows will
>>>>> be defined which will consume from different Side Inputs.  User is only
>>>>> allowed only to select these Window sizes.  Upstream Group By operator 
>>>>> will
>>>>> route to the data to different Window Function based on user 
>>>>> configuration.
>>>>>
>>>>>
>>>>>
>>>>> Regards
>>>>> Sumit Chawla
>>>>>
>>>>>
>>>>
>>>
>>
>

Reply via email to