No the order is not so important as long as it is correct and doesnt emit
sums for late values.

{"id": "2", "parent_id": "a", "timestamp": 2, "amount": 3}
{"id": "1", "parent_id": "a", "timestamp": 1. "amount": 1}
{"id": "1", "parent_id": "a", "timestamp": 3, "amount": 2}

Would produce 3, 4 then 5

{"id": "1", "parent_id": "a", "timestamp": 3, "amount": 2}
{"id": "2", "parent_id": "a", "timestamp": 2, "amount": 3}
{"id": "1", "parent_id": "a", "timestamp": 1. "amount": 1}

would produce only 2 and 5 (value 1 is excluded as it is too late compared
to value 2).

After your tips I wrote up a custom CombineFn that does this by saving the
latest records and computing the result as it extracts the output. The data
examples I sent were a bit simplified but the result is the similar. The
Funding class just has a few more fields. It is also used successfully in a
lot of places.

Example Funding object:

Funding(id=2, updatedAt=1491868800000, version=2, org=the-empire,
raisedAmountUsd=2, announcedOn=1292284800000, type="A")

Here is the CombineFn:

public class SumLatestFundingFn extends Combine.CombineFn<Funding,
HashMap<String,Funding>, SumLatestFundingFn.Result>{
    @Data
    @DefaultCoder(AvroCoder.class)
    public static class Result {
        Long totalFunding;
        Funding latestFunding;

        public Result() {}
        public Result(Long totalFunding, Funding latestFunding) {
            this.totalFunding = totalFunding;
            this.latestFunding = latestFunding;
        }
    }

    @Override
    public HashMap<String, Funding> createAccumulator() { return new
HashMap<>(); }

    @Override
    public HashMap<String,Funding> addInput(HashMap<String,Funding>
accum, Funding input) {
        if (!accum.containsKey(input.getId()) ||
                input.getVersion() > accum.get(input.getId()).getVersion()) {
            accum.put(input.getId(), input);
        }
        return accum;
    }

    @Override
    public HashMap<String,Funding>
mergeAccumulators(Iterable<HashMap<String,Funding>> accums) {
        HashMap<String,Funding> merged = createAccumulator();
        for (HashMap<String,Funding> accum : accums) {
            for (Funding funding : accum.values()) {
                merged = addInput(merged, funding);
            }
        }
        return merged;
    }

    @Override
    public Result extractOutput(HashMap<String,Funding> accum) {
        Long totalFunding = accum.values().stream()
                .mapToLong(funding ->
firstNonNull(funding.getRaisedAmountUsd(), 0L)).sum();

        Funding latestFunding = accum.values().stream()
                .max((first, second) ->
                        (int) (firstNonNull(first.getAnnouncedOn(),
Long.MIN_VALUE) -
                                firstNonNull(second.getAnnouncedOn(),
Long.MIN_VALUE)))
                .orElse(new Funding());

        return new Result(totalFunding, latestFunding);
    }
}

I’m using Lombok annotations to generate getters, setters, equals and
hashcode. This works in a lot of pipelines I have already.

This works great when testing it with teststream but I get a nasy error in
dataflow when I use a Repeatedly.forever(AfterPane.elementCountAtLeast(1))
trigger. I tried w a less eager trigger but with the same error. If I
remove Repeatedly.forever() the pipeline works but gives me incorrect
results as the trigger only fire once.

Here is the error:

(6e1443def795dcc9): java.lang.RuntimeException: Unable to persist
state 
com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:218)
com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1071)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:133)
com.google.cloud.dataflow.worker.StreamingDataflowWorker$8.run(StreamingDataflowWorker.java:841)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745) Caused by:
org.apache.beam.sdk.coders.CoderException: unable to serialize record
{8655fe63-b7b8-2835-4559-ea2cb763ad62=Funding(super=Entity(id=8655fe63-b7b8-2835-4559-ea2cb763ad62,
sources={crunchbase=[8655fe63-b7b8-2835-4559-ea2cb763ad62]},
updatedAt=1504856143000, version=1), org=othera, raisedAmount=null,
raisedAmountUsd=null, currency=null, series=null, announcedOn=null,
type=null, investors=[])}
org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:127)
org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:47)
org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillBag.persistDirectly(WindmillStateInternals.java:575)
com.google.cloud.dataflow.worker.WindmillStateInternals$SimpleWindmillState.persist(WindmillStateInternals.java:320)
com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.persist(WindmillStateInternals.java:952)
com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:216)
com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1071)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:133)
com.google.cloud.dataflow.worker.StreamingDataflowWorker$8.run(StreamingDataflowWorker.java:841)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745) Caused by:
java.io.NotSerializableException: co.motherbrain.cyrano.model.Funding
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
java.util.HashMap.internalWriteEntries(HashMap.java:1785)
java.util.HashMap.writeObject(HashMap.java:1362)
sun.reflect.GeneratedMethodAccessor284.invoke(Unknown Source)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:124)
org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:47)
org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillBag.persistDirectly(WindmillStateInternals.java:575)
com.google.cloud.dataflow.worker.WindmillStateInternals$SimpleWindmillState.persist(WindmillStateInternals.java:320)
com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.persist(WindmillStateInternals.java:952)
com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:216)
com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1071)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:133)
com.google.cloud.dataflow.worker.StreamingDataflowWorker$8.run(StreamingDataflowWorker.java:841)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

What I find very strange is that the error is from the SerializableCoder. I
have specified DefaultCoder(AvroCoder.class) on all my classes (including
Funding).

Do you think this is a bug or am I missing something? Really strange that
the tests work and that it is fine as long as I do not use
Repeatedly.forever.

Really thankful for your help!

// Vilhelm

On 5 Dec 2017 02:00, “Lukasz Cwik” <[email protected]> wrote:

I believe you can provide ordering if you decide to put any unconsumed
> records into state. Every time you read state and check to see if its the
> next corresponding id. If so then emit the new sum otherwise push it back
> onto state until you get the missing ids allowing you to backfill all the
> prior values that should have been emitted.
>
> On Mon, Dec 4, 2017 at 4:26 PM, Kenneth Knowles <[email protected]> wrote:
>
>>
>>
>> On Mon, Dec 4, 2017 at 3:22 PM, Lukasz Cwik <[email protected]> wrote:
>>
>>> Since processing can happen out of order, for example if the input was:
>>> ```
>>> {"id": "2", parent_id: "a", "timestamp": 2, "amount": 3}
>>> {"id": "1", parent_id: "a", "timestamp": 1. "amount": 1}
>>> {"id": "1", parent_id: "a", "timestamp": 3, "amount": 2}
>>> ```
>>> would the output be 3 and then 5 or would you still want 1, 4, and then
>>> 5?
>>>
>>
>> My own guess here would be 2, 3, then 5.
>>
>> You won't be able to do this with a sequence of summations, but you could
>> Combine.perKey() where the per-"parent_id" accumulator tracks the latest
>> value and timestamp for each "id". The trouble is going to be in the global
>> window if you have either an unbounded domain for "id" or "parent_id" you
>> won't be able to collect any expired state. You can accomplish the same
>> with a stateful ParDo using a MapState, and gain tight control over when to
>> output. But you have the same question to answer - how do you decide when a
>> value is safe to forget about? (or safe to merge into a global bucket
>> because it won't be overwritten any more)
>>
>> Kenn
>>
>>
>>
>>> On Mon, Dec 4, 2017 at 2:13 PM, Vilhelm von Ehrenheim <
>>> [email protected]> wrote:
>>>
>>>> Hi all!
>>>> First of all great work on the 2.2.0 release! really excited to start
>>>> using it.
>>>>
>>>> I have a problem with how I should construct a pipeline that should
>>>> emit a sum of latest values which I hope someone might have some ideas on
>>>> how to solve.
>>>>
>>>> Here is what I have:
>>>>
>>>> I have a stateful stream of events that contain updates to a long
>>>> amonst other things. These events looks something like this
>>>>
>>>> ```
>>>> {"id": "1", parent_id: "a", "timestamp": 1. "amount": 1}
>>>> {"id": "2", parent_id: "a", "timestamp": 2, "amount": 3}
>>>> {"id": "1", parent_id: "a", "timestamp": 3, "amount": 2}
>>>> ```
>>>>
>>>> I want to emit sums of the `amount` per `parent_id` but only using the
>>>> latest record per `id`. Here that would result in sums of 1, 4 and then 5.
>>>>
>>>> To make it harder I need to do this in a global window with triggering
>>>> based on element count. I could maybe combine that w a processing time
>>>> trigger though. At least I need a global sum over all events.
>>>>
>>>> I have tried to do this with Latest.perKey and Sum.perKey but as you
>>>> probably realize that will give some strange results as the downstream sum
>>>> will not discard elements that are replaced by newer updates in the latest
>>>> transform.
>>>>
>>>> I also though I could write a custom CombineFn for this but I need to
>>>> do it for different keys which leaves me really confused.
>>>>
>>>> Any help or pointers are greatly appreciated.
>>>>
>>>> Thanks!
>>>> Vilhelm von Ehrenheim
>>>>
>>>
>>>
>>
> ​

Reply via email to