So you have your input PCollection containing KV<id, value>, windowed into sliding windows of, let's say, 5 minutes, and you have the PCollection produced by CombinePerKey containing KV<id, meanLastHr>, which has also been windowed using sliding windows of 5 minutes.
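To make the windowing concrete, here is a minimal plain-Python sketch (not Beam API) of how a reading falls into sliding windows and how a per-id, per-window mean comes out. It assumes 1-hour windows sliding every 5 minutes, as discussed in this thread, integer timestamps in seconds, and windows aligned to multiples of the period. The names `sliding_windows` and `mean_per_key_and_window` are made up for illustration.

```python
from collections import defaultdict

SIZE = 3600    # assumed window length: 1 hour, in seconds
PERIOD = 300   # assumed slide: 5 minutes, in seconds


def sliding_windows(ts, size=SIZE, period=PERIOD):
    """Return every [start, end) window that contains timestamp ts.

    Assumes window starts are aligned to multiples of `period`.
    """
    last_start = ts - ts % period
    return [(s, s + size) for s in range(last_start, last_start - size, -period)]


def mean_per_key_and_window(readings):
    """readings: iterable of (id, value, ts). Returns {(id, window): mean}."""
    sums = defaultdict(lambda: [0, 0])  # (id, window) -> [total, count]
    for rid, value, ts in readings:
        for window in sliding_windows(ts):
            acc = sums[(rid, window)]
            acc[0] += value
            acc[1] += 1
    return {key: total / count for key, (total, count) in sums.items()}


# Hypothetical sample data: (id, value, event timestamp in seconds).
readings = [("a", 10, 3700), ("a", 20, 3900), ("b", 5, 3700)]
means = mean_per_key_and_window(readings)
```

With these assumed sizes each reading lands in 12 overlapping windows, so the same reading contributes to 12 different means, one per window.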
You can use CoGroupByKey[1] to join these two PCollections and get a PCollection containing KV<id, (iterable<value>, iterable<meanLastHr>)>, where the iterable<meanLastHr> should contain only a single element. You can then apply a ParDo that uses the single meanLastHr value to produce your Reading(id, value, meanLastHr) output.

Since both PCollections share the same keyspace, I wouldn't suggest using a map side input[2], even though it would work.

1: https://beam.apache.org/documentation/programming-guide/#cogroupbykey
2: https://beam.apache.org/documentation/programming-guide/#side-inputs

On Fri, Feb 14, 2020 at 6:16 AM Paweł Kordek <pawel.kor...@outlook.com> wrote:

> I've just read this message again and realized that the requirement "in an
> hour preceding the timestamp" is not feasible nor necessary, for this part
> I will use sliding windows, let's say by 5 minutes. The rest of the
> question still stands, I still haven't wrapped my head around it. The
> important part is that the emitted value's aggregate takes into account
> current element's value itself.
>
> ------------------------------
> *From:* Paweł Kordek
> *Sent:* Friday, February 14, 2020 10:53
> *To:* email@example.com <firstname.lastname@example.org>
> *Subject:* Aggregate-and-join in Beam
>
> Hello
>
> I am working on a relatively simple streaming data pipeline. Imagine the
> elements of PCollection represent something like sensor readings (with
> event timestamps):
>
> Reading(id: String, value: Int)
>
> For each incoming element I want to have at the end of the pipeline:
>
> Reading(id: String, value: Int, meanLastHr: Double)
>
> where meanLastHr is the mean of all the values for a given id in an hour
> preceding the timestamp of this particular reading.
>
> If I am to implement this in Beam, to get the means, I have to transform
> the input into PCollection of KVs (id, value), apply 1hr window,
> CombinePerKey and trigger on every element. My question is what is the
> simplest/most idiomatic way to join these values back with the original
> reading? This is one-to-one join on (event timestamp, id), I can assume
> that there are no readings where this pair of values would be the same. One
> way I can think of is to use the PCollection with aggregated values as a
> side input to a ParDo, but I'm not sure how the windows would be mapped to
> a main global-windowed collection (what if I applied a global windowing to
> the side input?). If, for example, I used Flink directly I could go with
> KeyedCoProcessFunction, but I don't see any concept in Beam that would map
> to it directly. Any help and suggestions would be appreciated.
>
> Best regards
> Pawel
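As a rough illustration of the join discussed in this thread, the plain-Python sketch below mimics what a co-group by key followed by a per-element step would produce. It is not Beam code: in Beam the window is implicit, whereas here it is made part of the key explicitly, and the helper names `co_group` and `to_readings` as well as the sample data are invented for the example.

```python
from collections import defaultdict


def co_group(values, means):
    """Group two keyed inputs by (id, window).

    values, means: iterables of ((id, window), payload).
    Returns {(id, window): (list_of_values, list_of_means)}.
    """
    grouped = defaultdict(lambda: ([], []))
    for key, value in values:
        grouped[key][0].append(value)
    for key, mean in means:
        grouped[key][1].append(mean)
    return dict(grouped)


def to_readings(grouped):
    """Emit (id, value, meanLastHr) for each value, using the single mean."""
    out = []
    for (rid, _window), (vals, mean_list) in grouped.items():
        if len(mean_list) != 1:
            # The combine step should yield exactly one mean per id and window.
            raise ValueError(f"expected one mean for {rid}, got {len(mean_list)}")
        out.extend((rid, v, mean_list[0]) for v in vals)
    return out


# Hypothetical sample data for a single window [3600, 7200).
w = (3600, 7200)
values = [(("a", w), 10), (("a", w), 20), (("b", w), 5)]
means = [(("a", w), 15.0), (("b", w), 5.0)]
joined = to_readings(co_group(values, means))
```

Each output tuple pairs one original value with the mean of its id for that window, which is the shape of Reading(id, value, meanLastHr) asked for in the original question.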