Re: Runtime Windows/Aggregation Computations in Beam
For 2): depending on the runner, the Iterable inside the KV<K, Iterable<V>> may not be materialized in memory. For example, in Dataflow the Iterable is implemented as a pointer that you walk; it loads and caches blocks of data as it goes, which prevents the out-of-memory issue that is common for keys with lots of values.

On Thu, Sep 22, 2016 at 12:44 PM, Chawla,Sumit wrote:
> 2. This is a general question about the GroupByKey operator. GroupByKey
> will provide KV<K, Iterable<V>> to the downstream operators. Since this
> iterable is a Java Iterable, it will consume a lot of memory and can cause
> OOM exceptions. Windows in my case could be a few hours or even days long.
> (Maybe having a CustomEvictor will help so that a window does not grow
> beyond a certain size?)
Re: Runtime Windows/Aggregation Computations in Beam
Hi Robert

Thanks a lot for the answer. I looked at NonMergingWindowFn and understand how to send data to multiple windows. I have two follow-up questions:

1. My input contains a Tuple [A, B, C, D, E]. Since different computations have to happen over different elements of the Tuple, I have to partition the input so that each partition can then be output to the desired window. Either I can send the entire tuple to each window, or I can partition the tuple by emitting a separate output for each element (or group of elements), e.g.

    c.output(KV.of("A", Tuple[A]))               // for the A-specific aggregation
    c.output(KV.of("B", Tuple[B]))               // for the B-specific aggregation
    c.output(KV.of("AB", [Tuple[A], Tuple[B]]))  // A and B are both needed for this aggregation

Do you suggest any Beam primitive to achieve this?

2. This is a general question about the GroupByKey operator. GroupByKey will provide KV<K, Iterable<V>> to the downstream operators. Since this iterable is a Java Iterable, it will consume a lot of memory and can cause OOM exceptions. Windows in my case could be a few hours or even days long. (Maybe having a CustomEvictor will help so that a window does not grow beyond a certain size?)

Regards
Sumit Chawla

On Thu, Sep 22, 2016 at 1:00 AM, Robert Bradshaw wrote:
> So what you can do is write a WindowFn (say, subclass NonMergingWindowFn)
> that, for each element, assigns it to the set of windows you're interested
> in (of various sizes) based on its timestamp. The group-by-key will put the
> element in a grouping for *each* window it's assigned to. You can then
> inspect this window in a subsequent DoFn to react differently to the
> different window sizes. E.g. suppose my WindowFn's assignWindows makes the
> following assignments:
>
>   (k, v1) -> [0, 3), [0, 10)
>   (k, v2) -> [4, 6), [0, 10)
>   (k, v3) -> [0, 3)
>
> Then after a GroupByKey the PCollection will have elements
>
>   (k, [v1, v3]) in window [0, 3)
>   (k, [v1, v2]) in window [0, 10)
>   (k, [v2]) in window [4, 6)
>
> (and similarly for CombineValues or any other grouping operation).
Re: Runtime Windows/Aggregation Computations in Beam
Hi Robert

The configuration can be stored in a registry, which can be polled periodically, or it can be another stream input to the pipeline. Users will define new aggregations and add them to the registry.

Can you please elaborate a bit on what that custom WindowFn would be?

Regards
Sumit Chawla

On Wed, Sep 21, 2016 at 11:31 PM, Robert Bradshaw <rober...@google.com.invalid> wrote:
> This may be possible with a custom WindowFn. Where is the configuration of
> what aggregations to do coming from?
>
> On Wed, Sep 21, 2016 at 11:27 PM, Chawla,Sumit wrote:
>> Attaching the Image.
Runtime Windows/Aggregation Computations in Beam
Hi All

I am trying to code a solution for the following scenario:

1. I have a stream of Tuples with multiple numeric fields (e.g. A, B, C, D, E, ...).
2. I want the ability to apply different windowing and aggregation to each field or group of fields in the Tuple, e.g. sum A over a period of 2 minutes, average B over a period of 3 minutes, sum C grouped by D over a period of 15 minutes.
3. *These window requirements can be added by users at runtime.* My pipeline should be able to compute a new aggregation at runtime.
4. I plan to support only simple aggregations like SUM, AVG, MAX, MIN, COUNT, etc.

As I understand it, in Beam pipelines (with the Flink runner) the DAG of computations cannot be altered once the pipeline is deployed. I am trying to see how I can support the above use case. I would love to hear your feedback on this, including suggestions for doing it in a completely different way.

*My Design:*

1. Create 1-minute buckets per field or group of fields and compute basic aggregations for each bucket. The buckets are highlighted in yellow here. For each field I calculate [SUM, COUNT, MAX, MIN] in the bucket. (The bucket size of 1 minute is chosen for simplicity, and it limits the minimum window size to 1 minute.)
2. Downstream, process these buckets and compute the user-defined aggregations. The following diagram depicts tumbling window computations. The aggregation functions in GREEN are just native functions that consume different buckets and aggregate on top of them.

*P.S.*

*Some of the design choices that I have decided not to go for are:*

1. Multiple pipelines doing the computation. One master pipeline does the grouping and sends to a different topic based on the user-configured window size (e.g. topic_window_by_5_min, topic_window_by_15_min), and a different pipeline consumes each topic.
2. A single pipeline doing all the work, with predefined windows for downstream processing, e.g. 5, 15, 30 and 60 minute windows, each consuming from a different side input. Users are only allowed to select these window sizes. An upstream group-by operator routes the data to a different window function based on the user configuration.

Regards
Sumit Chawla