So what you can do is write a WindowFn (say, a subclass of NonMergingWindowFn) that, for each element, assigns it to the set of windows you're interested in (of various sizes) based on its timestamp. The group-by-key will put the element in a grouping for *each* window it's assigned to. You can then inspect the window in a subsequent DoFn to react differently to the different window sizes. E.g. suppose my WindowFn's assignWindows makes the following assignments:

  (k, v1) -> [0, 3), [0, 10)
  (k, v2) -> [4, 6), [0, 10)
  (k, v3) -> [0, 3)

Then after a GroupByKey the PCollection will have elements

  (k, [v1, v3]) in window [0, 3)
  (k, [v1, v2]) in window [0, 10)
  (k, [v2])     in window [4, 6)

(and similarly for CombineValues or any other grouping operation).
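A rough, untested sketch of such a WindowFn against the Beam Java SDK (the exact set of methods to override varies a bit between SDK versions, and the class and field names here are just placeholders):

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
import org.joda.time.Duration;
import org.joda.time.Instant;

/**
 * Assigns each element to one epoch-aligned IntervalWindow per configured
 * size, so a single GroupByKey yields one grouping per window size.
 */
public class MultiSizeWindows<T> extends NonMergingWindowFn<T, IntervalWindow> {

  private final List<Duration> sizes;  // e.g. [2 min, 3 min, 15 min]

  public MultiSizeWindows(List<Duration> sizes) {
    this.sizes = sizes;
  }

  @Override
  public Collection<IntervalWindow> assignWindows(AssignContext c) {
    long ts = c.timestamp().getMillis();
    List<IntervalWindow> windows = new ArrayList<>();
    for (Duration size : sizes) {
      // Align the window start to a multiple of the window size.
      Instant start = new Instant(ts - Math.floorMod(ts, size.getMillis()));
      windows.add(new IntervalWindow(start, start.plus(size)));
    }
    return windows;
  }

  @Override
  public boolean isCompatible(WindowFn<?, ?> other) {
    return other instanceof MultiSizeWindows
        && sizes.equals(((MultiSizeWindows<?>) other).sizes);
  }

  @Override
  public Coder<IntervalWindow> windowCoder() {
    return IntervalWindow.getCoder();
  }

  @Override
  public WindowMappingFn<IntervalWindow> getDefaultWindowMappingFn() {
    // Not intended for use as a side-input window.
    throw new UnsupportedOperationException("No side-input window mapping");
  }
}

You'd apply it with Window.into(new MultiSizeWindows<>(...)) before the GroupByKey; a downstream DoFn can then look at the element's window (e.g. via a BoundedWindow parameter on @ProcessElement in recent SDKs) and branch on its size.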
On Wed, Sep 21, 2016 at 11:34 PM, Chawla,Sumit <[email protected]> wrote:

> Hi Robert
>
> The configuration can be stored in a registry, which can be polled
> periodically. Or it can be another stream input to the pipeline. User
> will define new aggregations and add them to the registry.
>
> Can you please elaborate a bit on what that custom WindowFn would be?
>
> Regards
> Sumit Chawla
>
> On Wed, Sep 21, 2016 at 11:31 PM, Robert Bradshaw <[email protected]> wrote:
>
>> This may be possible with a custom WindowFn. Where is the configuration
>> of what aggregations to do coming from?
>>
>> On Wed, Sep 21, 2016 at 11:27 PM, Chawla,Sumit <[email protected]> wrote:
>>
>>> Attaching the Image.
>>>
>>> Regards
>>> Sumit Chawla
>>>
>>> On Wed, Sep 21, 2016 at 11:24 PM, Chawla,Sumit <[email protected]> wrote:
>>>
>>>> Hi All
>>>>
>>>> I am trying to code a solution for the following scenarios.
>>>>
>>>> 1. I have a stream of Tuples with multiple numeric fields (e.g. A, B,
>>>> C, D, E ... etc.)
>>>> 2. I want the ability to do different Windowing and Aggregation on
>>>> each field or a group of fields in the Tuple, e.g. Sum of A over a
>>>> period of 2 minutes, Avg of B over a period of 3 minutes, Sum of C
>>>> grouped by D over a period of 15 minutes.
>>>> 3. *These window requirements can be added by the user at runtime.*
>>>> My pipeline should be able to compute a new aggregation at runtime.
>>>> 4. Plan to support only simple aggregations like SUM, AVG, MAX, MIN,
>>>> COUNT etc.
>>>>
>>>> As I understand it, in Beam pipelines (with the Flink Runner) the DAG
>>>> of computations cannot be altered once the pipeline is deployed. I am
>>>> trying to see how I can support the above use case. I would love to
>>>> hear your feedback on this, and suggestions on doing it in a
>>>> completely different way.
>>>>
>>>> *My Design:*
>>>>
>>>> 1. Create 1-minute buckets per field or group of fields and compute
>>>> basic aggregations for each bucket (buckets are highlighted in yellow
>>>> in the attached image). For each field I calculate
>>>> [SUM, COUNT, MAX, MIN] in the bucket. (A bucket size of 1 minute is
>>>> chosen for simplicity, and would limit the minimum window size to
>>>> 1 minute.)
>>>>
>>>> 2. Downstream, process these buckets and compute the user-defined
>>>> aggregations. The following diagram depicts tumbling-window
>>>> computations. The aggregation functions in green are just native
>>>> functions consuming different buckets and doing aggregations on top
>>>> of them.
>>>>
>>>> *P.S.*
>>>>
>>>> *Some of the design choices that I have decided not to go for are:*
>>>>
>>>> 1. Multiple pipelines for doing the computation. One master pipeline
>>>> does the grouping and sends data to a different topic based on the
>>>> user-configured window size (e.g. topic_window_by_5_min,
>>>> topic_window_by_15_min), and a different pipeline consumes each topic.
>>>>
>>>> 2. A single pipeline doing all the work, with predefined windows for
>>>> downstream processing, e.g. 5, 15, 30, 60 minute windows that consume
>>>> from different Side Inputs. The user is only allowed to select these
>>>> window sizes. An upstream Group By operator will route the data to a
>>>> different Window Function based on user configuration.
>>>>
>>>> Regards
>>>> Sumit Chawla
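For what it's worth, the per-bucket [SUM, COUNT, MAX, MIN] in the design above could be captured by a single CombineFn whose accumulators can be merged again when serving the larger, user-defined windows. A minimal, untested sketch (the class and field names are made up, and it assumes double-valued fields):

import java.io.Serializable;
import org.apache.beam.sdk.transforms.Combine;

/**
 * Per-bucket partial aggregate for one field. Larger windows can be answered
 * by merging the accumulators of the 1-minute buckets they cover.
 */
public class BucketStatsFn
    extends Combine.CombineFn<Double, BucketStatsFn.Stats, BucketStatsFn.Stats> {

  public static class Stats implements Serializable {
    double sum = 0;
    long count = 0;
    double max = Double.NEGATIVE_INFINITY;
    double min = Double.POSITIVE_INFINITY;
  }

  @Override
  public Stats createAccumulator() {
    return new Stats();
  }

  @Override
  public Stats addInput(Stats acc, Double value) {
    acc.sum += value;
    acc.count++;
    acc.max = Math.max(acc.max, value);
    acc.min = Math.min(acc.min, value);
    return acc;
  }

  @Override
  public Stats mergeAccumulators(Iterable<Stats> accumulators) {
    Stats merged = new Stats();
    for (Stats a : accumulators) {
      merged.sum += a.sum;
      merged.count += a.count;
      merged.max = Math.max(merged.max, a.max);
      merged.min = Math.min(merged.min, a.min);
    }
    return merged;
  }

  @Override
  public Stats extractOutput(Stats acc) {
    // SUM, COUNT, MAX, MIN come straight from the accumulator; AVG = sum / count.
    return acc;
  }
}

Applied per key under 1-minute fixed windows (Window.into(FixedWindows.of(Duration.standardMinutes(1))) followed by Combine.perKey(new BucketStatsFn())), the bucket outputs could later be re-windowed into the user-selected sizes and merged with the same mergeAccumulators logic.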
