I think the problem with your coder is that you specified the accumulator type
as HashMap, which is more specific than just Map. Beam's coder inference won't
select MapCoder for it (MapCoder only guarantees you get a Map back, not a
HashMap), so it falls back to SerializableCoder, which is "all or nothing" and
doesn't look at coders registered for the type parameters. If you change the
accumulator to a Map<String, Funding> then you should see MapCoder selected,
and it will recursively choose AvroCoder for your types.
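
Something like this, as a rough sketch (only the parts that change are shown):

```
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Combine;

// Sketch: widen the accumulator from HashMap to Map so MapCoder can be inferred.
public class SumLatestFundingFn
    extends Combine.CombineFn<Funding, Map<String, Funding>, SumLatestFundingFn.Result> {

  @Override
  public Map<String, Funding> createAccumulator() {
    return new HashMap<>();
  }

  // Optional: pin the accumulator coder explicitly instead of relying on inference.
  @Override
  public Coder<Map<String, Funding>> getAccumulatorCoder(
      CoderRegistry registry, Coder<Funding> inputCoder) {
    return MapCoder.of(StringUtf8Coder.of(), AvroCoder.of(Funding.class));
  }

  // addInput, mergeAccumulators and extractOutput stay as in your version, with
  // HashMap<String, Funding> replaced by Map<String, Funding> in the signatures.
}
```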

On Tue, Dec 5, 2017 at 11:55 AM, Vilhelm von Ehrenheim <
[email protected]> wrote:

> The error got a bit strange there.
>
> Here it is with line breaks:
>
> (6e1443def795dcc9): java.lang.RuntimeException: Unable to persist state
> com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:218)
> com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
> com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
> com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1071)
> com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:133)
> com.google.cloud.dataflow.worker.StreamingDataflowWorker$8.run(StreamingDataflowWorker.java:841)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.beam.sdk.coders.CoderException: unable to serialize record {8655fe63-b7b8-2835-4559-ea2cb763ad62=Funding(super=Entity(id=8655fe63-b7b8-2835-4559-ea2cb763ad62, sources={crunchbase=[8655fe63-b7b8-2835-4559-ea2cb763ad62]}, updatedAt=1504856143000, version=1), org=othera, raisedAmount=null, raisedAmountUsd=null, currency=null, series=null, announcedOn=null, type=null, investors=[])}
> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:127)
> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:47)
> org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
> com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillBag.persistDirectly(WindmillStateInternals.java:575)
> com.google.cloud.dataflow.worker.WindmillStateInternals$SimpleWindmillState.persist(WindmillStateInternals.java:320)
> com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.persist(WindmillStateInternals.java:952)
> com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:216)
> com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
> com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
> com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1071)
> com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:133)
> com.google.cloud.dataflow.worker.StreamingDataflowWorker$8.run(StreamingDataflowWorker.java:841)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.NotSerializableException: co.motherbrain.cyrano.model.Funding
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> java.util.HashMap.internalWriteEntries(HashMap.java:1785)
> java.util.HashMap.writeObject(HashMap.java:1362)
> sun.reflect.GeneratedMethodAccessor284.invoke(Unknown Source)
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> java.lang.reflect.Method.invoke(Method.java:498)
> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:124)
> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:47)
> org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
> com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillBag.persistDirectly(WindmillStateInternals.java:575)
> com.google.cloud.dataflow.worker.WindmillStateInternals$SimpleWindmillState.persist(WindmillStateInternals.java:320)
> com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.persist(WindmillStateInternals.java:952)
> com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:216)
> com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
> com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
> com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1071)
> com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:133)
> com.google.cloud.dataflow.worker.StreamingDataflowWorker$8.run(StreamingDataflowWorker.java:841)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
>
>
>
> On Tue, Dec 5, 2017 at 8:52 PM, Vilhelm von Ehrenheim <
> [email protected]> wrote:
>
>> No, the order is not that important, as long as the result is correct and
>> doesn't emit sums for late values.
>>
>> {"id": "2", "parent_id": "a", "timestamp": 2, "amount": 3}
>> {"id": "1", "parent_id": "a", "timestamp": 1. "amount": 1}
>> {"id": "1", "parent_id": "a", "timestamp": 3, "amount": 2}
>>
>> would produce 3, 4 and then 5, whereas
>>
>> {"id": "1", "parent_id": "a", "timestamp": 3, "amount": 2}
>> {"id": "2", "parent_id": "a", "timestamp": 2, "amount": 3}
>> {"id": "1", "parent_id": "a", "timestamp": 1. "amount": 1}
>>
>> would produce only 2 and 5 (the record with amount 1 is excluded because it
>> arrives after the newer record with amount 2 for the same id).
>>
>> After your tips I wrote up a custom CombineFn that does this by saving
>> the latest records and computing the result as it extracts the output. The
>> data examples I sent were a bit simplified, but the result is similar.
>> The Funding class just has a few more fields. It is also used successfully
>> in a lot of places.
>>
>> Example Funding object:
>>
>> Funding(id=2, updatedAt=1491868800000, version=2, org=the-empire, 
>> raisedAmountUsd=2, announcedOn=1292284800000, type="A")
>>
>> Here is the CombineFn:
>>
>> public class SumLatestFundingFn
>>         extends Combine.CombineFn<Funding, HashMap<String, Funding>, SumLatestFundingFn.Result> {
>>     @Data
>>     @DefaultCoder(AvroCoder.class)
>>     public static class Result {
>>         Long totalFunding;
>>         Funding latestFunding;
>>
>>         public Result() {}
>>         public Result(Long totalFunding, Funding latestFunding) {
>>             this.totalFunding = totalFunding;
>>             this.latestFunding = latestFunding;
>>         }
>>     }
>>
>>     @Override
>>     public HashMap<String, Funding> createAccumulator() { return new HashMap<>(); }
>>
>>     @Override
>>     public HashMap<String, Funding> addInput(HashMap<String, Funding> accum, Funding input) {
>>         // Keep only the highest-version record seen for each id.
>>         if (!accum.containsKey(input.getId()) ||
>>                 input.getVersion() > accum.get(input.getId()).getVersion()) {
>>             accum.put(input.getId(), input);
>>         }
>>         return accum;
>>     }
>>
>>     @Override
>>     public HashMap<String, Funding> mergeAccumulators(Iterable<HashMap<String, Funding>> accums) {
>>         HashMap<String, Funding> merged = createAccumulator();
>>         for (HashMap<String, Funding> accum : accums) {
>>             for (Funding funding : accum.values()) {
>>                 merged = addInput(merged, funding);
>>             }
>>         }
>>         return merged;
>>     }
>>
>>     @Override
>>     public Result extractOutput(HashMap<String, Funding> accum) {
>>         // firstNonNull is statically imported (e.g. Guava's MoreObjects.firstNonNull).
>>         Long totalFunding = accum.values().stream()
>>                 .mapToLong(funding -> firstNonNull(funding.getRaisedAmountUsd(), 0L))
>>                 .sum();
>>
>>         Funding latestFunding = accum.values().stream()
>>                 .max((first, second) -> Long.compare(
>>                         firstNonNull(first.getAnnouncedOn(), Long.MIN_VALUE),
>>                         firstNonNull(second.getAnnouncedOn(), Long.MIN_VALUE)))
>>                 .orElse(new Funding());
>>
>>         return new Result(totalFunding, latestFunding);
>>     }
>> }
>>
>> I'm using Lombok annotations to generate getters, setters, equals and
>> hashCode. This works in a lot of pipelines I already have.
>>
>> This works great when testing with TestStream, but I get a nasty error in
>> Dataflow when I use a Repeatedly.forever(AfterPane.elementCountAtLeast(1))
>> trigger. I tried with a less eager trigger but got the same error. If I
>> remove Repeatedly.forever() the pipeline works but gives me incorrect
>> results, as the trigger only fires once.
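>>
>> For reference, the combine step is applied roughly like this (a sketch with
>> made-up variable names, not the exact pipeline):
>>
>> ```
>> PCollection<KV<String, Funding>> keyed = ...;  // keyed by the parent/org id
>>
>> keyed
>>     .apply(Window.<KV<String, Funding>>into(new GlobalWindows())
>>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>         .accumulatingFiredPanes()
>>         .withAllowedLateness(Duration.ZERO))
>>     .apply(Combine.perKey(new SumLatestFundingFn()));
>> ```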
>>
>> Here is the error:
>>
>> (6e1443def795dcc9): java.lang.RuntimeException: Unable to persist state
>> com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:218)
>> com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
>> com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
>> com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1071)
>> com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:133)
>> com.google.cloud.dataflow.worker.StreamingDataflowWorker$8.run(StreamingDataflowWorker.java:841)
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> java.lang.Thread.run(Thread.java:745)
>> Caused by: org.apache.beam.sdk.coders.CoderException: unable to serialize record {8655fe63-b7b8-2835-4559-ea2cb763ad62=Funding(super=Entity(id=8655fe63-b7b8-2835-4559-ea2cb763ad62, sources={crunchbase=[8655fe63-b7b8-2835-4559-ea2cb763ad62]}, updatedAt=1504856143000, version=1), org=othera, raisedAmount=null, raisedAmountUsd=null, currency=null, series=null, announcedOn=null, type=null, investors=[])}
>> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:127)
>> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:47)
>> org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
>> com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillBag.persistDirectly(WindmillStateInternals.java:575)
>> com.google.cloud.dataflow.worker.WindmillStateInternals$SimpleWindmillState.persist(WindmillStateInternals.java:320)
>> com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.persist(WindmillStateInternals.java:952)
>> com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:216)
>> com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
>> com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
>> com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1071)
>> com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:133)
>> com.google.cloud.dataflow.worker.StreamingDataflowWorker$8.run(StreamingDataflowWorker.java:841)
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.NotSerializableException: co.motherbrain.cyrano.model.Funding
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>> java.util.HashMap.internalWriteEntries(HashMap.java:1785)
>> java.util.HashMap.writeObject(HashMap.java:1362)
>> sun.reflect.GeneratedMethodAccessor284.invoke(Unknown Source)
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> java.lang.reflect.Method.invoke(Method.java:498)
>> java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:124)
>> org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:47)
>> org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
>> com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillBag.persistDirectly(WindmillStateInternals.java:575)
>> com.google.cloud.dataflow.worker.WindmillStateInternals$SimpleWindmillState.persist(WindmillStateInternals.java:320)
>> com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.persist(WindmillStateInternals.java:952)
>> com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:216)
>> com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
>> com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
>> com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1071)
>> com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:133)
>> com.google.cloud.dataflow.worker.StreamingDataflowWorker$8.run(StreamingDataflowWorker.java:841)
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> java.lang.Thread.run(Thread.java:745)
>>
>> What I find very strange is that the error comes from SerializableCoder. I
>> have specified @DefaultCoder(AvroCoder.class) on all my classes (including
>> Funding).
>>
>> Do you think this is a bug, or am I missing something? It is really strange
>> that the tests work and that everything is fine as long as I do not use
>> Repeatedly.forever().
>>
>> Really thankful for your help!
>>
>> // Vilhelm
>>
>> On 5 Dec 2017 02:00, “Lukasz Cwik” <[email protected]> wrote:
>>
>>> I believe you can provide ordering if you decide to put any unconsumed
>>> records into state. Every time, read the state and check whether the new
>>> record has the next expected id. If so, emit the new sum; otherwise push it
>>> back into state until you get the missing ids, allowing you to backfill all
>>> the prior values that should have been emitted.
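>>>
>>> A skeleton of that idea with a stateful DoFn might look like this (an
>>> untested sketch; "Record" and its fields are stand-ins for the JSON events
>>> below):
>>>
>>> ```
>>> // Imports from org.apache.beam.sdk.state and org.apache.beam.sdk.transforms.DoFn elided.
>>> // Buffer out-of-order records in state and drain them in id order.
>>> public class EmitSumsInOrderFn extends DoFn<KV<String, Record>, Long> {
>>>
>>>   @StateId("pending")
>>>   private final StateSpec<BagState<Record>> pendingSpec =
>>>       StateSpecs.bag(AvroCoder.of(Record.class));
>>>
>>>   @StateId("lastEmitted")
>>>   private final StateSpec<ValueState<Long>> lastEmittedSpec =
>>>       StateSpecs.value(VarLongCoder.of());
>>>
>>>   @ProcessElement
>>>   public void processElement(
>>>       ProcessContext c,
>>>       @StateId("pending") BagState<Record> pending,
>>>       @StateId("lastEmitted") ValueState<Long> lastEmitted) {
>>>     pending.add(c.element().getValue());
>>>     // Walk the buffered records: emit running sums for every record whose id
>>>     // directly follows lastEmitted, and leave the rest in state until the
>>>     // missing ids arrive (the backfill described above).
>>>   }
>>> }
>>> ```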
>>>
>>> On Mon, Dec 4, 2017 at 4:26 PM, Kenneth Knowles <[email protected]> wrote:
>>>
>>>>
>>>>
>>>> On Mon, Dec 4, 2017 at 3:22 PM, Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> Since processing can happen out of order, for example if the input was:
>>>>> ```
>>>>> {"id": "2", parent_id: "a", "timestamp": 2, "amount": 3}
>>>>> {"id": "1", parent_id: "a", "timestamp": 1. "amount": 1}
>>>>> {"id": "1", parent_id: "a", "timestamp": 3, "amount": 2}
>>>>> ```
>>>>> would the output be 3 and then 5 or would you still want 1, 4, and
>>>>> then 5?
>>>>>
>>>>
>>>> My own guess here would be 2, 3, then 5.
>>>>
>>>> You won't be able to do this with a sequence of summations, but you could
>>>> use Combine.perKey() where the per-"parent_id" accumulator tracks the latest
>>>> value and timestamp for each "id". The trouble is going to be in the global
>>>> window: if you have an unbounded domain for either "id" or "parent_id", you
>>>> won't be able to collect any expired state. You can accomplish the same with
>>>> a stateful ParDo using a MapState, and gain tight control over when to
>>>> output. But you have the same question to answer - how do you decide when a
>>>> value is safe to forget about? (Or safe to merge into a global bucket
>>>> because it won't be overwritten any more.)
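>>>>
>>>> Roughly, the MapState version would be a stateful ParDo like this (a sketch;
>>>> "Record" and its accessors are invented stand-ins for the events above, and
>>>> MapState support depends on the runner):
>>>>
>>>> ```
>>>> // Per-"parent_id" stateful ParDo that tracks the latest record per "id".
>>>> public class LatestPerIdFn extends DoFn<KV<String, Record>, Long> {
>>>>
>>>>   @StateId("latestPerId")
>>>>   private final StateSpec<MapState<String, Record>> latestSpec =
>>>>       StateSpecs.map(StringUtf8Coder.of(), AvroCoder.of(Record.class));
>>>>
>>>>   @ProcessElement
>>>>   public void processElement(
>>>>       ProcessContext c,
>>>>       @StateId("latestPerId") MapState<String, Record> latest) {
>>>>     Record in = c.element().getValue();
>>>>     Record prev = latest.get(in.getId()).read();
>>>>     if (prev == null || in.getTimestamp() > prev.getTimestamp()) {
>>>>       latest.put(in.getId(), in);
>>>>     }
>>>>     // You control here when to sum over latest.entries() and emit, and when
>>>>     // it is safe to remove or compact entries (the expiry question above).
>>>>   }
>>>> }
>>>> ```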
>>>>
>>>> Kenn
>>>>
>>>>
>>>>
>>>>> On Mon, Dec 4, 2017 at 2:13 PM, Vilhelm von Ehrenheim <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> Hi all!
>>>>>> First of all, great work on the 2.2.0 release! Really excited to start
>>>>>> using it.
>>>>>>
>>>>>> I have a problem with how to construct a pipeline that emits a sum of the
>>>>>> latest values, and I hope someone has some ideas on how to solve it.
>>>>>>
>>>>>> Here is what I have:
>>>>>>
>>>>>> I have a stateful stream of events that contain, amongst other things,
>>>>>> updates to a long value. The events look something like this:
>>>>>>
>>>>>> ```
>>>>>> {"id": "1", parent_id: "a", "timestamp": 1. "amount": 1}
>>>>>> {"id": "2", parent_id: "a", "timestamp": 2, "amount": 3}
>>>>>> {"id": "1", parent_id: "a", "timestamp": 3, "amount": 2}
>>>>>> ```
>>>>>>
>>>>>> I want to emit sums of the `amount` per `parent_id` but only using the
>>>>>> latest record per `id`. Here that would result in sums of 1, 4 and then 5.
>>>>>>
>>>>>> To make it harder I need to do this in a global window with
>>>>>> triggering based on element count. I could maybe combine that with a
>>>>>> processing-time trigger though. At least I need a global sum over all
>>>>>> events.
>>>>>>
>>>>>> I have tried to do this with Latest.perKey and Sum.perKey, but as you
>>>>>> probably realize that gives some strange results, since the downstream sum
>>>>>> does not discard elements that are replaced by newer updates in the Latest
>>>>>> transform.
>>>>>>
>>>>>> I also thought I could write a custom CombineFn for this, but I need to do
>>>>>> it for different keys, which leaves me really confused.
>>>>>>
>>>>>> Any help or pointers are greatly appreciated.
>>>>>>
>>>>>> Thanks!
>>>>>> Vilhelm von Ehrenheim
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
>
