On Mon, Dec 4, 2017 at 3:22 PM, Lukasz Cwik <[email protected]> wrote:

> Since processing can happen out of order, for example if the input was:
> ```
> {"id": "2", parent_id: "a", "timestamp": 2, "amount": 3}
> {"id": "1", parent_id: "a", "timestamp": 1. "amount": 1}
> {"id": "1", parent_id: "a", "timestamp": 3, "amount": 2}
> ```
> would the output be 3 and then 5 or would you still want 1, 4, and then 5?
>

My own guess here would be 2, 3, then 5.

You won't be able to do this with a sequence of summations, but you could
use Combine.perKey() where the per-"parent_id" accumulator tracks the latest
value and timestamp for each "id". The trouble is going to be the global
window: if the domain of either "id" or "parent_id" is unbounded, you
won't be able to collect any expired state. You can accomplish the same
with a stateful ParDo using a MapState, and gain tight control over when to
output. But you have the same question to answer: how do you decide when a
value is safe to forget about (or safe to merge into a global bucket
because it won't be overwritten any more)?
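The accumulator logic described above can be sketched in plain Java, outside of Beam. The class and method names below are illustrative (not Beam API): `add` plays the role of `CombineFn.addInput` and `sum` the role of `CombineFn.extractOutput`; a real `CombineFn` would also need a `mergeAccumulators` that applies the same newest-timestamp-wins rule when combining maps.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a per-"parent_id" accumulator: for each "id" it keeps the
// amount carried by the record with the greatest timestamp seen so far,
// and the output is the sum of those latest amounts.
public class LatestPerIdAccumulator {
    // id -> {timestamp, amount} of the newest record seen for that id
    private final Map<String, long[]> latest = new HashMap<>();

    // Analogous to CombineFn.addInput: keep the incoming record only if
    // it is newer than what we already hold for this id, so out-of-order
    // (stale) updates cannot overwrite a newer value.
    public void add(String id, long timestamp, long amount) {
        long[] current = latest.get(id);
        if (current == null || timestamp > current[0]) {
            latest.put(id, new long[] {timestamp, amount});
        }
    }

    // Analogous to CombineFn.extractOutput: the sum of the latest amount
    // per id, which is the value to emit per "parent_id" on each trigger.
    public long sum() {
        long total = 0;
        for (long[] entry : latest.values()) {
            total += entry[1];
        }
        return total;
    }
}
```

Note that the map only ever grows, which is exactly the expiry problem mentioned above: nothing here ever decides that an "id" will receive no further updates.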

Kenn



> On Mon, Dec 4, 2017 at 2:13 PM, Vilhelm von Ehrenheim <
> [email protected]> wrote:
>
>> Hi all!
>> First of all, great work on the 2.2.0 release! Really excited to start
>> using it.
>>
>> I have a problem constructing a pipeline that emits a sum of the latest
>> values, and I hope someone might have some ideas on how to solve it.
>>
>> Here is what I have:
>>
>> I have a stateful stream of events that contain, amongst other things,
>> updates to a long. These events look something like this:
>>
>> ```
>> {"id": "1", parent_id: "a", "timestamp": 1. "amount": 1}
>> {"id": "2", parent_id: "a", "timestamp": 2, "amount": 3}
>> {"id": "1", parent_id: "a", "timestamp": 3, "amount": 2}
>> ```
>>
>> I want to emit sums of the `amount` per `parent_id` but only using the
>> latest record per `id`. Here that would result in sums of 1, 4 and then 5.
>>
>> To make it harder, I need to do this in a global window with triggering
>> based on element count. I could maybe combine that with a processing time
>> trigger, though. At the very least I need a global sum over all events.
>>
>> I have tried to do this with Latest.perKey and Sum.perKey, but as you
>> probably realize that gives some strange results, since the downstream sum
>> does not discard elements that are replaced by newer updates in the latest
>> transform.
>>
>> I also thought I could write a custom CombineFn for this, but I need to do
>> it for different keys, which leaves me really confused.
>>
>> Any help or pointers are greatly appreciated.
>>
>> Thanks!
>> Vilhelm von Ehrenheim
>>
>
>
