That made perfect sense and solved my problem! Thank you so much!

On Tue, Dec 5, 2017 at 9:05 PM, Kenneth Knowles <[email protected]> wrote:
> I think the problem with your coder is that you specified that the
> accumulator type is a HashMap, which is more specific than just Map.
> Beam's coder inference won't select the MapCoder (which only guarantees
> you get a Map back, not a HashMap), so it falls back to SerializableCoder,
> which is "all or nothing" and doesn't look at coders registered for any
> type parameters. If you change it to a Map<String, Funding> then you
> should see MapCoder selected, and it will recursively choose AvroCoder
> for your types.
>
> On Tue, Dec 5, 2017 at 11:55 AM, Vilhelm von Ehrenheim <[email protected]> wrote:
>
>> The error got a bit strange there.
>>
>> Here it is with line breaks:
>>
>> (6e1443def795dcc9): java.lang.RuntimeException: Unable to persist state
>>   com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:218)
>>   com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
>>   com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
>>   com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1071)
>>   com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:133)
>>   com.google.cloud.dataflow.worker.StreamingDataflowWorker$8.run(StreamingDataflowWorker.java:841)
>>   java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>   java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>   java.lang.Thread.run(Thread.java:745)
>> Caused by: org.apache.beam.sdk.coders.CoderException: unable to serialize record {8655fe63-b7b8-2835-4559-ea2cb763ad62=Funding(super=Entity(id=8655fe63-b7b8-2835-4559-ea2cb763ad62, sources={crunchbase=[8655fe63-b7b8-2835-4559-ea2cb763ad62]}, updatedAt=1504856143000, version=1), org=othera, raisedAmount=null, raisedAmountUsd=null, currency=null, series=null, announcedOn=null, type=null, investors=[])}
>>   org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:127)
>>   org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:47)
>>   org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
>>   com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillBag.persistDirectly(WindmillStateInternals.java:575)
>>   com.google.cloud.dataflow.worker.WindmillStateInternals$SimpleWindmillState.persist(WindmillStateInternals.java:320)
>>   com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.persist(WindmillStateInternals.java:952)
>>   com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:216)
>>   com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
>>   com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
>>   com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1071)
>>   com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:133)
>>   com.google.cloud.dataflow.worker.StreamingDataflowWorker$8.run(StreamingDataflowWorker.java:841)
>>   java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>   java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>   java.lang.Thread.run(Thread.java:745)
>> Caused by: java.io.NotSerializableException: co.motherbrain.cyrano.model.Funding
>>   java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>   java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>   java.util.HashMap.internalWriteEntries(HashMap.java:1785)
>>   java.util.HashMap.writeObject(HashMap.java:1362)
>>   sun.reflect.GeneratedMethodAccessor284.invoke(Unknown Source)
>>   sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>   java.lang.reflect.Method.invoke(Method.java:498)
>>   java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>>   java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>>   java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>   java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>   java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>   org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:124)
>>   org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:47)
>>   org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
>>   com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillBag.persistDirectly(WindmillStateInternals.java:575)
>>   com.google.cloud.dataflow.worker.WindmillStateInternals$SimpleWindmillState.persist(WindmillStateInternals.java:320)
>>   com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.persist(WindmillStateInternals.java:952)
>>   com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:216)
>>   com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
>>   com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
>>   com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1071)
>>   com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:133)
>>   com.google.cloud.dataflow.worker.StreamingDataflowWorker$8.run(StreamingDataflowWorker.java:841)
>>   java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>   java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>   java.lang.Thread.run(Thread.java:745)
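To make Kenn's suggestion concrete: the fix is only a change to the declared accumulator type of the CombineFn quoted further down in this thread. The sketch below is illustrative rather than the exact code that was deployed; the class name is invented, it reuses the Result class from the original SumLatestFundingFn, and it assumes Guava's MoreObjects.firstNonNull, as in the original.

```java
import static com.google.common.base.MoreObjects.firstNonNull;

import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.transforms.Combine;
import co.motherbrain.cyrano.model.Funding;

// Hypothetical name. The only substantive change from the thread's version is that the
// accumulator is declared as Map<String, Funding> rather than HashMap<String, Funding>,
// so Beam's coder inference can pick MapCoder instead of falling back to SerializableCoder.
public class SumLatestFundingMapFn
    extends Combine.CombineFn<Funding, Map<String, Funding>, SumLatestFundingFn.Result> {

  @Override
  public Map<String, Funding> createAccumulator() {
    return new HashMap<>(); // the concrete type stays HashMap; only the declared type widens
  }

  @Override
  public Map<String, Funding> addInput(Map<String, Funding> accum, Funding input) {
    Funding current = accum.get(input.getId());
    if (current == null || input.getVersion() > current.getVersion()) {
      accum.put(input.getId(), input); // keep only the highest-version record per id
    }
    return accum;
  }

  @Override
  public Map<String, Funding> mergeAccumulators(Iterable<Map<String, Funding>> accums) {
    Map<String, Funding> merged = createAccumulator();
    for (Map<String, Funding> accum : accums) {
      for (Funding funding : accum.values()) {
        merged = addInput(merged, funding);
      }
    }
    return merged;
  }

  @Override
  public SumLatestFundingFn.Result extractOutput(Map<String, Funding> accum) {
    long total = accum.values().stream()
        .mapToLong(f -> firstNonNull(f.getRaisedAmountUsd(), 0L))
        .sum();
    Funding latest = accum.values().stream()
        .max(Comparator.comparingLong(f -> firstNonNull(f.getAnnouncedOn(), Long.MIN_VALUE)))
        .orElse(new Funding());
    return new SumLatestFundingFn.Result(total, latest);
  }
}
```

With the accumulator declared as Map<String, Funding>, coder inference should choose MapCoder with StringUtf8Coder for the keys and AvroCoder for the Funding values, which is what removes the NotSerializableException shown above.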
>>
>> On Tue, Dec 5, 2017 at 8:52 PM, Vilhelm von Ehrenheim <[email protected]> wrote:
>>
>>> No, the order is not so important as long as it is correct and doesn't
>>> emit sums for late values.
>>>
>>> {"id": "2", "parent_id": "a", "timestamp": 2, "amount": 3}
>>> {"id": "1", "parent_id": "a", "timestamp": 1, "amount": 1}
>>> {"id": "1", "parent_id": "a", "timestamp": 3, "amount": 2}
>>>
>>> would produce 3, 4 and then 5.
>>>
>>> {"id": "1", "parent_id": "a", "timestamp": 3, "amount": 2}
>>> {"id": "2", "parent_id": "a", "timestamp": 2, "amount": 3}
>>> {"id": "1", "parent_id": "a", "timestamp": 1, "amount": 1}
>>>
>>> would produce only 2 and then 5 (value 1 is excluded as it is too late
>>> compared to value 2).
>>>
>>> After your tips I wrote up a custom CombineFn that does this by saving
>>> the latest records and computing the result as it extracts the output.
>>> The data examples I sent were a bit simplified but the result is similar.
>>> The Funding class just has a few more fields. It is also used successfully
>>> in a lot of places.
>>>
>>> Example Funding object:
>>>
>>> Funding(id=2, updatedAt=1491868800000, version=2, org=the-empire,
>>> raisedAmountUsd=2, announcedOn=1292284800000, type="A")
>>>
>>> Here is the CombineFn:
>>>
>>> public class SumLatestFundingFn extends
>>>         Combine.CombineFn<Funding, HashMap<String, Funding>, SumLatestFundingFn.Result> {
>>>     @Data
>>>     @DefaultCoder(AvroCoder.class)
>>>     public static class Result {
>>>         Long totalFunding;
>>>         Funding latestFunding;
>>>
>>>         public Result() {}
>>>         public Result(Long totalFunding, Funding latestFunding) {
>>>             this.totalFunding = totalFunding;
>>>             this.latestFunding = latestFunding;
>>>         }
>>>     }
>>>
>>>     @Override
>>>     public HashMap<String, Funding> createAccumulator() { return new HashMap<>(); }
>>>
>>>     @Override
>>>     public HashMap<String, Funding> addInput(HashMap<String, Funding> accum, Funding input) {
>>>         if (!accum.containsKey(input.getId()) ||
>>>                 input.getVersion() > accum.get(input.getId()).getVersion()) {
>>>             accum.put(input.getId(), input);
>>>         }
>>>         return accum;
>>>     }
>>>
>>>     @Override
>>>     public HashMap<String, Funding> mergeAccumulators(Iterable<HashMap<String, Funding>> accums) {
>>>         HashMap<String, Funding> merged = createAccumulator();
>>>         for (HashMap<String, Funding> accum : accums) {
>>>             for (Funding funding : accum.values()) {
>>>                 merged = addInput(merged, funding);
>>>             }
>>>         }
>>>         return merged;
>>>     }
>>>
>>>     @Override
>>>     public Result extractOutput(HashMap<String, Funding> accum) {
>>>         Long totalFunding = accum.values().stream()
>>>                 .mapToLong(funding -> firstNonNull(funding.getRaisedAmountUsd(), 0L))
>>>                 .sum();
>>>
>>>         Funding latestFunding = accum.values().stream()
>>>                 .max((first, second) ->
>>>                         (int) (firstNonNull(first.getAnnouncedOn(), Long.MIN_VALUE) -
>>>                                firstNonNull(second.getAnnouncedOn(), Long.MIN_VALUE)))
>>>                 .orElse(new Funding());
>>>
>>>         return new Result(totalFunding, latestFunding);
>>>     }
>>> }
>>>
>>> I'm using Lombok annotations to generate getters, setters, equals and
>>> hashCode. This works in a lot of pipelines I have already.
>>>
>>> This works great when testing it with TestStream, but I get a nasty error
>>> in Dataflow when I use a Repeatedly.forever(AfterPane.elementCountAtLeast(1))
>>> trigger. I tried with a less eager trigger but got the same error. If I
>>> remove Repeatedly.forever() the pipeline works but gives me incorrect
>>> results, as the trigger only fires once.
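For reference, the global-window-plus-trigger setup described in that last paragraph could be wired up roughly as follows. This is a sketch under assumptions: the method name, the fact that the input is already keyed by parent_id, the accumulation mode, and the zero allowed lateness are not specified in the thread.

```java
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import co.motherbrain.cyrano.model.Funding;

public class SumLatestPipelineSketch {

  // Hypothetical wiring: assumes the input is already a PCollection keyed by parent_id.
  static PCollection<KV<String, SumLatestFundingFn.Result>> sumLatestPerParent(
      PCollection<KV<String, Funding>> keyedByParentId) {
    return keyedByParentId
        // Global window whose trigger re-fires after every new element,
        // so an updated per-parent result is emitted continuously.
        .apply(Window.<KV<String, Funding>>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .accumulatingFiredPanes()
            .withAllowedLateness(Duration.ZERO))
        // The CombineFn from the thread, applied per parent_id key.
        .apply(Combine.perKey(new SumLatestFundingFn()));
  }
}
```

Accumulating fired panes means each firing emits the combined result over everything seen so far for that key, which matches the running sums described earlier in the thread.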
>>>
>>> Here is the error:
>>>
>>> (6e1443def795dcc9): java.lang.RuntimeException: Unable to persist state
>>>   com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:218)
>>>   com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
>>>   com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
>>>   com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1071)
>>>   com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:133)
>>>   com.google.cloud.dataflow.worker.StreamingDataflowWorker$8.run(StreamingDataflowWorker.java:841)
>>>   java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>   java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>   java.lang.Thread.run(Thread.java:745)
>>> Caused by: org.apache.beam.sdk.coders.CoderException: unable to serialize record {8655fe63-b7b8-2835-4559-ea2cb763ad62=Funding(super=Entity(id=8655fe63-b7b8-2835-4559-ea2cb763ad62, sources={crunchbase=[8655fe63-b7b8-2835-4559-ea2cb763ad62]}, updatedAt=1504856143000, version=1), org=othera, raisedAmount=null, raisedAmountUsd=null, currency=null, series=null, announcedOn=null, type=null, investors=[])}
>>>   org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:127)
>>>   org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:47)
>>>   org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
>>>   com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillBag.persistDirectly(WindmillStateInternals.java:575)
>>>   com.google.cloud.dataflow.worker.WindmillStateInternals$SimpleWindmillState.persist(WindmillStateInternals.java:320)
>>>   com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.persist(WindmillStateInternals.java:952)
>>>   com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:216)
>>>   com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
>>>   com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
>>>   com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1071)
>>>   com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:133)
>>>   com.google.cloud.dataflow.worker.StreamingDataflowWorker$8.run(StreamingDataflowWorker.java:841)
>>>   java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>   java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>   java.lang.Thread.run(Thread.java:745)
>>> Caused by: java.io.NotSerializableException: co.motherbrain.cyrano.model.Funding
>>>   java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
>>>   java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>   java.util.HashMap.internalWriteEntries(HashMap.java:1785)
>>>   java.util.HashMap.writeObject(HashMap.java:1362)
>>>   sun.reflect.GeneratedMethodAccessor284.invoke(Unknown Source)
>>>   sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>   java.lang.reflect.Method.invoke(Method.java:498)
>>>   java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:1028)
>>>   java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1496)
>>>   java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
>>>   java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
>>>   java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
>>>   org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:124)
>>>   org.apache.beam.sdk.coders.SerializableCoder.encode(SerializableCoder.java:47)
>>>   org.apache.beam.sdk.coders.Coder.encode(Coder.java:143)
>>>   com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillBag.persistDirectly(WindmillStateInternals.java:575)
>>>   com.google.cloud.dataflow.worker.WindmillStateInternals$SimpleWindmillState.persist(WindmillStateInternals.java:320)
>>>   com.google.cloud.dataflow.worker.WindmillStateInternals$WindmillCombiningState.persist(WindmillStateInternals.java:952)
>>>   com.google.cloud.dataflow.worker.WindmillStateInternals.persist(WindmillStateInternals.java:216)
>>>   com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.flushState(StreamingModeExecutionContext.java:513)
>>>   com.google.cloud.dataflow.worker.StreamingModeExecutionContext.flushState(StreamingModeExecutionContext.java:363)
>>>   com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1071)
>>>   com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:133)
>>>   com.google.cloud.dataflow.worker.StreamingDataflowWorker$8.run(StreamingDataflowWorker.java:841)
>>>   java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>   java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>   java.lang.Thread.run(Thread.java:745)
>>>
>>> What I find very strange is that the error comes from the SerializableCoder.
>>> I have specified @DefaultCoder(AvroCoder.class) on all my classes
>>> (including Funding).
>>>
>>> Do you think this is a bug, or am I missing something? It is really strange
>>> that the tests work and that it is fine as long as I do not use
>>> Repeatedly.forever.
>>>
>>> Really thankful for your help!
>>>
>>> // Vilhelm
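An alternative (or complementary) way to address the question above is to pin the accumulator coder explicitly by overriding getAccumulatorCoder, so that inference never gets a chance to fall back to SerializableCoder. The sketch below assumes the hypothetical Map-accumulator variant sketched near the top of this thread; the class name is invented.

```java
import java.util.Map;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.CannotProvideCoderException;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import co.motherbrain.cyrano.model.Funding;

// Sketch: pins the accumulator coder explicitly instead of relying on coder inference.
// Extends the hypothetical Map-accumulator variant sketched earlier in this thread.
public class SumLatestFundingMapFnWithCoder extends SumLatestFundingMapFn {

  @Override
  public Coder<Map<String, Funding>> getAccumulatorCoder(
      CoderRegistry registry, Coder<Funding> inputCoder) throws CannotProvideCoderException {
    // String keys via StringUtf8Coder, Funding values via AvroCoder, matching
    // the @DefaultCoder(AvroCoder.class) annotation on the Funding class.
    return MapCoder.of(StringUtf8Coder.of(), AvroCoder.of(Funding.class));
  }
}
```

On the question itself: the @DefaultCoder(AvroCoder.class) annotation governs the coder chosen for Funding, but the accumulator HashMap is a separate type that the annotation does not cover, which is why SerializableCoder was still being picked for it and why Java serialization then required Funding to be Serializable.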
>>>
>>> On 5 Dec 2017 02:00, "Lukasz Cwik" <[email protected]> wrote:
>>>
>>>> I believe you can provide ordering if you decide to put any unconsumed
>>>> records into state. Every time, you read the state and check whether it is
>>>> the next corresponding id. If so, emit the new sum; otherwise push it back
>>>> into state until you get the missing ids, allowing you to backfill all the
>>>> prior values that should have been emitted.
>>>>
>>>> On Mon, Dec 4, 2017 at 4:26 PM, Kenneth Knowles <[email protected]> wrote:
>>>>
>>>>> On Mon, Dec 4, 2017 at 3:22 PM, Lukasz Cwik <[email protected]> wrote:
>>>>>
>>>>>> Since processing can happen out of order, for example if the input was:
>>>>>> ```
>>>>>> {"id": "2", "parent_id": "a", "timestamp": 2, "amount": 3}
>>>>>> {"id": "1", "parent_id": "a", "timestamp": 1, "amount": 1}
>>>>>> {"id": "1", "parent_id": "a", "timestamp": 3, "amount": 2}
>>>>>> ```
>>>>>> would the output be 3 and then 5, or would you still want 1, 4, and then 5?
>>>>>
>>>>> My own guess here would be 2, 3, then 5.
>>>>>
>>>>> You won't be able to do this with a sequence of summations, but you could
>>>>> use Combine.perKey() where the per-"parent_id" accumulator tracks the
>>>>> latest value and timestamp for each "id". The trouble is going to be in
>>>>> the global window: if you have an unbounded domain for either "id" or
>>>>> "parent_id", you won't be able to collect any expired state. You can
>>>>> accomplish the same with a stateful ParDo using a MapState, and gain tight
>>>>> control over when to output. But you have the same question to answer:
>>>>> how do you decide when a value is safe to forget about (or safe to merge
>>>>> into a global bucket because it won't be overwritten any more)?
>>>>>
>>>>> Kenn
>>>>>
>>>>>> On Mon, Dec 4, 2017 at 2:13 PM, Vilhelm von Ehrenheim <[email protected]> wrote:
>>>>>>
>>>>>>> Hi all!
>>>>>>> First of all, great work on the 2.2.0 release! Really excited to
>>>>>>> start using it.
>>>>>>>
>>>>>>> I have a problem with how I should construct a pipeline that should
>>>>>>> emit a sum of the latest values, which I hope someone might have some
>>>>>>> ideas on how to solve.
>>>>>>>
>>>>>>> Here is what I have:
>>>>>>>
>>>>>>> I have a stateful stream of events that contain updates to a long,
>>>>>>> amongst other things. These events look something like this:
>>>>>>>
>>>>>>> ```
>>>>>>> {"id": "1", "parent_id": "a", "timestamp": 1, "amount": 1}
>>>>>>> {"id": "2", "parent_id": "a", "timestamp": 2, "amount": 3}
>>>>>>> {"id": "1", "parent_id": "a", "timestamp": 3, "amount": 2}
>>>>>>> ```
>>>>>>>
>>>>>>> I want to emit sums of the `amount` per `parent_id` but only using
>>>>>>> the latest record per `id`. Here that would result in sums of 1, 4 and
>>>>>>> then 5.
>>>>>>>
>>>>>>> To make it harder, I need to do this in a global window with
>>>>>>> triggering based on element count. I could maybe combine that with a
>>>>>>> processing-time trigger though. At least I need a global sum over all
>>>>>>> events.
>>>>>>>
>>>>>>> I have tried to do this with Latest.perKey and Sum.perKey but, as you
>>>>>>> probably realize, that will give some strange results, as the downstream
>>>>>>> sum will not discard elements that are replaced by newer updates in the
>>>>>>> latest transform.
>>>>>>>
>>>>>>> I also thought I could write a custom CombineFn for this, but I need
>>>>>>> to do it for different keys, which leaves me really confused.
>>>>>>>
>>>>>>> Any help or pointers are greatly appreciated.
>>>>>>>
>>>>>>> Thanks!
>>>>>>> Vilhelm von Ehrenheim
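A rough sketch of the stateful-ParDo alternative Kenn describes above. Everything here beyond the use of MapState is an assumption: the class and state names, emitting an updated sum on every accepted element, and the unanswered garbage-collection question (when entries can be forgotten), which this sketch does not solve.

```java
import static com.google.common.base.MoreObjects.firstNonNull;

import java.util.Map;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import co.motherbrain.cyrano.model.Funding;

// Sketch of the stateful-ParDo alternative: the map state holds the latest Funding per
// "id" within each "parent_id" key, and an updated sum is emitted whenever a newer
// record for some id arrives. Names and the output policy are assumptions.
public class SumLatestStatefulFn extends DoFn<KV<String, Funding>, KV<String, Long>> {

  @StateId("latestById")
  private final StateSpec<MapState<String, Funding>> latestById =
      StateSpecs.map(StringUtf8Coder.of(), AvroCoder.of(Funding.class));

  @ProcessElement
  public void processElement(
      ProcessContext c, @StateId("latestById") MapState<String, Funding> latest) {
    Funding input = c.element().getValue();
    Funding current = latest.get(input.getId()).read();

    // Only accept the record if it is newer than what we have seen for this id.
    if (current == null || input.getVersion() > current.getVersion()) {
      latest.put(input.getId(), input);

      // Recompute the per-parent sum over the latest record of every id seen so far.
      long total = 0L;
      for (Map.Entry<String, Funding> entry : latest.entries().read()) {
        total += firstNonNull(entry.getValue().getRaisedAmountUsd(), 0L);
      }
      c.output(KV.of(c.element().getKey(), total));
    }
  }
}
```

A timer could be added to flush or expire entries, which is where the "when is it safe to forget" question Kenn raises would have to be answered.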
