Re: Joining PCollections to aggregates of themselves

2019-10-11 Thread Kenneth Knowles
This seems a great example of the use of a stateful DoFn. It has essentially the same structure as the example on the Beam blog but is more meaningful.

Kenn

On Fri, Oct 11, 2019 at 12:38 PM Robert Bradshaw wrote:
> OK, the only way to do this would be via a non-deterministic stateful
> DoFn that

Re: Joining PCollections to aggregates of themselves

2019-10-11 Thread Robert Bradshaw
OK, the only way to do this would be via a non-deterministic stateful DoFn that buffers elements as they come in and computes averages by looking at the buffer each time. This could also be represented with an extension to window merging and a join, where the trigger would be explicitly used to
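The buffering approach described above can be sketched in plain Python, without any Beam API. This is only an illustration of the logic, not the implementation discussed in the thread: the function name `join_with_sliding_mean`, the `(key, value, timestamp)` tuple layout, and the assumption that elements arrive in timestamp order per key are all hypothetical. In a real pipeline, arrival order is not guaranteed, which is where the non-determinism comes from.

```python
from collections import deque
from typing import Deque, Dict, Iterable, Iterator, Tuple

WINDOW_SECONDS = 600  # 10 minutes, matching the window size in the thread


def join_with_sliding_mean(
    elements: Iterable[Tuple[str, float, float]],
    window_seconds: float = WINDOW_SECONDS,
) -> Iterator[Tuple[str, float, float, float]]:
    """Emit each element together with the mean of its key's recent values.

    Assumes (hypothetically) that elements are (key, value, timestamp)
    tuples arriving in timestamp order for each key.
    """
    # One buffer per key, standing in for per-key state in a stateful DoFn.
    buffers: Dict[str, Deque[Tuple[float, float]]] = {}
    for key, value, ts in elements:
        buf = buffers.setdefault(key, deque())
        buf.append((ts, value))
        # Drop buffered elements that fell out of the trailing window.
        while buf and buf[0][0] <= ts - window_seconds:
            buf.popleft()
        # Recompute the average by looking at the whole buffer each time.
        mean = sum(v for _, v in buf) / len(buf)
        yield key, value, ts, mean
```

For example, feeding in `("k", 10.0, 0)`, `("k", 20.0, 300)` and `("k", 30.0, 700)` yields means of 10.0, 15.0 and 25.0: by the third element, the first has aged out of the 10-minute window.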

Re: Joining PCollections to aggregates of themselves

2019-10-11 Thread Sam Stephens
On 2019/10/10 18:23:46, Eugene Kirpichov wrote:
> "input elements can pass through the Joiner DoFn before the sideInput
> corresponding to that element is present"
>
> I don't think this is correct. Runners will evaluate a DoFn with side
> inputs on elements in a given window only after all

Re: Joining PCollections to aggregates of themselves

2019-10-10 Thread Robert Bradshaw
Looking at the naive solution

PCollectionView agg = input
    .apply(Windows.sliding(10mins, 1sec hops).trigger(Repeatedly.forever(AfterPane.elementCountAtLeast(1))))
    .apply(Mean.globally())
    .apply(View.asSingleton());

PCollection output = input
    .apply(ParDo.of(new

Re: Joining PCollections to aggregates of themselves

2019-10-10 Thread Eugene Kirpichov
"input elements can pass through the Joiner DoFn before the sideInput corresponding to that element is present"

I don't think this is correct. Runners will evaluate a DoFn with side inputs on elements in a given window only after all side inputs are ready (have triggered at least once) in this

Re: Joining PCollections to aggregates of themselves

2019-10-10 Thread rahul patwari
With a stateful DoFn, each instance of the DoFn will have elements which belong to the same window and have the same key. So the parallelism is limited by [no. of keys * no. of windows]. In batch mode, as all the elements belong to the same window, i.e. the Global Window, the parallelism will be limited by

Re: Joining PCollections to aggregates of themselves

2019-10-10 Thread Sam Stephens
Hi Rahul,

Thanks for the response. I did consider State, but I was hesitant because of a different requirement that I didn't specify: the same pipeline should work in both batch and streaming modes. I'm not sure how stateful DoFns behave in the batch world: can you get Beam to pass the

Re: Joining PCollections to aggregates of themselves

2019-10-10 Thread rahul patwari
Hi Sam,

(Assuming all the tuples have the same key.) One solution could be to use a ParDo with state to calculate the mean: for each element as it arrives, update the mean (storing the sum and count as the state) and emit the tuple with the new average value. But this will limit the parallelism.
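As a rough illustration of the sum-and-count state described above, here is a plain-Python sketch that does not use the Beam API. The names `MeanState` and `running_means` and the `(key, value)` tuple layout are hypothetical, and the sketch keeps a running mean over everything seen so far per key, not the 10-minute sliding window from the original question.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Iterator, Tuple


@dataclass
class MeanState:
    """Per-key state: just enough to recompute the mean incrementally."""
    total: float = 0.0
    count: int = 0


def running_means(
    elements: Iterable[Tuple[str, float]],
) -> Iterator[Tuple[str, float, float]]:
    """Emit (key, value, mean so far for that key) for each input element."""
    # One state cell per key, standing in for state in a stateful ParDo.
    states: Dict[str, MeanState] = {}
    for key, value in elements:
        state = states.setdefault(key, MeanState())
        state.total += value
        state.count += 1
        yield key, value, state.total / state.count
```

Because each key's state is updated one element at a time, the work for a single key cannot be spread across workers, which is the parallelism limit mentioned above.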

Joining PCollections to aggregates of themselves

2019-10-10 Thread Sam Stephens
My team and I have been puzzling for a while over how to solve a specific problem. Say you have an input stream of tuples: And you want to output a stream containing: Where the average is an aggregation over a 10 minute sliding window of the "value" field. There are a couple of extra