Re: Aggregate-and-join in Beam

2020-02-14 Thread Luke Cwik
So you have your input PCollection containing KV<id, value> that has been
windowed with, let's say, sliding windows of 5 mins, and you have your PCollection
after the CombinePerKey step containing KV<id, meanLastHr> that has also
been windowed using sliding windows of 5 mins.

You can use CoGroupByKey[1] to join these two PCollections and get a
PCollection containing KV<id, <iterable<value>, iterable<meanLastHr>>> where
the meanLastHr iterable should only contain a single element. You can then
apply a ParDo that uses the single meanLastHr value to produce your
Reading(id, value, meanLastHr) output.
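
The suggested CoGroupByKey-then-ParDo step can be sketched in plain Python, with stdlib stand-ins for the Beam primitives (the function names, the tuple layouts, and the assumption that both inputs come from the same window are illustrative, not the actual Beam types):

```python
from collections import defaultdict

def co_group_by_key(readings, means):
    """Stand-in for CoGroupByKey: group two keyed collections by key.

    readings: list of (id, value) pairs from one window
    means:    list of (id, meanLastHr) pairs from CombinePerKey, same window
    Returns {id: (list_of_values, list_of_means)}.
    """
    grouped = defaultdict(lambda: ([], []))
    for key, value in readings:
        grouped[key][0].append(value)
    for key, mean in means:
        grouped[key][1].append(mean)
    return dict(grouped)

def join_readings(readings, means):
    """Stand-in for the follow-up ParDo: emit (id, value, meanLastHr).

    Per key and window the mean iterable should hold exactly one element.
    """
    out = []
    for key, (values, mean_iter) in co_group_by_key(readings, means).items():
        (mean,) = mean_iter  # exactly one meanLastHr per key per window
        for value in values:
            out.append((key, value, mean))
    return out
```

For example, `join_readings([("a", 3), ("a", 5)], [("a", 4.0)])` yields `[("a", 3, 4.0), ("a", 5, 4.0)]`, i.e. every reading for a key is paired with that key's single windowed mean.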

Since both PCollections share the same keyspace, I wouldn't suggest using a
map side input[2] even though it would work.

1: https://beam.apache.org/documentation/programming-guide/#cogroupbykey
2: https://beam.apache.org/documentation/programming-guide/#side-inputs
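
For comparison, the map side input mentioned above amounts to each reading doing a direct per-window lookup of its key's aggregate. A minimal stand-in, with no Beam dependency and illustrative names:

```python
def join_via_side_input(readings, mean_by_id):
    """Stand-in for a ParDo with a map side input.

    readings:   list of (id, value) pairs from one window
    mean_by_id: {id: meanLastHr}, materialized the way a map side input
                would be for the matching window
    Readings whose id is absent from the side input are dropped here;
    a real pipeline would have to choose how to handle them.
    """
    return [(rid, value, mean_by_id[rid])
            for rid, value in readings if rid in mean_by_id]
```

For example, `join_via_side_input([("a", 3), ("b", 7)], {"a": 4.0})` returns `[("a", 3, 4.0)]`: the reading for "b" is silently dropped, one reason the CoGroupByKey route is the safer default when both collections share the same keyspace.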

On Fri, Feb 14, 2020 at 6:16 AM Paweł Kordek wrote:

> I've just read this message again and realized that the requirement "in an
> hour preceding the timestamp" is neither feasible nor necessary; for this part
> I will use sliding windows, let's say every 5 minutes. The rest of the
> question still stands; I still haven't wrapped my head around it. The
> important part is that the emitted value's aggregate takes into account the
> current element's value itself.
>
> --
> *From:* Paweł Kordek
> *Sent:* Friday, February 14, 2020 10:53
> *To:* user@beam.apache.org 
> *Subject:* Aggregate-and-join in Beam
>
> Hello
>
> I am working on a relatively simple streaming data pipeline. Imagine the
> elements of the PCollection represent something like sensor readings (with
> event timestamps):
>
> Reading(id: String, value: Int)
>
> For each incoming element I want to have at the end of the pipeline:
>
> Reading(id: String, value: Int, meanLastHr: Double)
>
> where meanLastHr is the mean of all the values for a given id in an hour
> preceding the timestamp of this particular reading.
>
> If I am to implement this in Beam, to get the means, I have to transform
> the input into a PCollection of KVs (id, value), apply a 1-hour window,
> CombinePerKey, and trigger on every element. My question is what is the
> simplest/most idiomatic way to join these values back with the original
> reading? This is one-to-one join on (event timestamp, id), I can assume
> that there are no readings where this pair of values would be the same. One
> way I can think of is to use the PCollection with aggregated values as a
> side input to a ParDo, but I'm not sure how the windows would be mapped to
> a main global-windowed collection (what if I applied a global windowing to
> the side input?). If, for example, I used Flink directly I could go with
> KeyedCoProcessFunction, but I don't see any concept in Beam that would map
> to it directly. Any help and suggestions would be appreciated.
>
> Best regards
> Pawel
>
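
The aggregation step described in the quoted question (per-id means over 1-hour sliding windows every 5 minutes) can be sketched in plain Python. This is a simplified stand-in for CombinePerKey with a mean combiner, assuming integer-second timestamps and only non-negative window starts; all names are illustrative:

```python
def sliding_window_means(readings, size=3600, period=300):
    """Per-key means for sliding windows of `size` seconds every `period`.

    readings: list of (id, value, timestamp) tuples
    Returns {(id, window_start): mean} for every non-empty window,
    mirroring what CombinePerKey with a mean combiner emits per key
    per window. Window starts are multiples of `period`.
    """
    sums = {}
    for rid, value, ts in readings:
        # Assign the element to every window [start, start + size)
        # that contains ts; only non-negative starts are considered.
        for start in range(0, ts + 1, period):
            if ts < start + size:
                total, count = sums.get((rid, start), (0, 0))
                sums[(rid, start)] = (total + value, count + 1)
    return {k: total / count for k, (total, count) in sums.items()}
```

For example, `sliding_window_means([("a", 2, 0), ("a", 4, 600)])` returns `{("a", 0): 3.0, ("a", 300): 4.0, ("a", 600): 4.0}`: both readings fall into the window starting at 0, while only the later one falls into the windows starting at 300 and 600. Each of these per-window means is what the join step would then attach back to the readings in that window.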

