You're getting a view over a PCollection as it changes over time, where every firing adds all of those values to the PCollection. The issue is that when you have multiple firings for the same key and window, you're effectively producing a multimap for that window. Unfortunately there are no simple convenience methods in the SDK to do things like sum the values or replace them with the latest; it is currently up to the user to use the multimap view and provide their own logic for dealing with multiple values for a key.
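For example, here is a minimal, untested sketch of the kind of user-provided logic this implies (the names kvs, mapping, and mainInput are illustrative, not from this thread): build the view with View.asMultimap() so repeated firings are tolerated, then fold the per-key values yourself when reading the side input, in this case by summing them.

    // kvs is the PCollection<KV<String, Long>> that fires repeatedly; a
    // multimap view keeps every value that each firing contributed for a key.
    final PCollectionView<Map<String, Iterable<Long>>> mapping =
        kvs.apply(View.<String, Long>asMultimap());

    PCollection<String> joined = mainInput.apply(
        ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            Iterable<Long> values = c.sideInput(mapping).get(c.element());
            if (values != null) {
              // User-provided policy for multiple values: sum them.
              // "Replace with latest" would instead need a timestamp or
              // sequence number carried alongside each value.
              long sum = 0;
              for (long v : values) {
                sum += v;
              }
              c.output(c.element() + ":" + sum);
            }
          }
        }).withSideInputs(mapping));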
On Thu, Sep 6, 2018 at 1:25 AM Wout Scheepers <[email protected]> wrote:

> Hi Lukasz,
>
> Thanks for your help, Bart and I got the side-input windowing issues figured out.
>
> Next to this, I noticed that when streaming updates into a side-input map that are already present, an exception is thrown saying there are duplicate values [1].
> From this, I assume the new KV<String, Long> entries are just added to the map without any check whether the map already contains them. Only at the point where we do a get on the key is the exception thrown.
> Shouldn't there be a check when a new entry is added?
>
> I've created a workaround for the issue by adding an extra ParDo with a map as cache:
>
> public static class MapFilter extends DoFn<KV<String, Long>, KV<String, Long>> {
>
>     private HashMap<String, Long> cache;
>
>     public MapFilter() {
>         this.cache = new HashMap<>();
>     }
>
>     @ProcessElement
>     public void processElement(ProcessContext c) {
>         if (!cache.containsKey(c.element().getKey())) {
>             cache.put(c.element().getKey(), c.element().getValue());
>             c.output(c.element());
>         }
>     }
> }
>
> PCollectionView<Map<String, Long>> mapping = ticks
>     .apply(ParDo.of(new GetFileWithSideInputData())) // reads the file and returns a String
>     .apply(ParDo.of(new MapFn())) // turns the String (json data) into a Map (KV<String, Long>)
>     .apply(ParDo.of(new MapFilter()))
>     .apply(Window.<KV<String, Long>>into(new GlobalWindows())
>     ...
>
> Any thoughts on this? I've noticed that the code that throws the exception was written by you (PCollectionViews.class:326).
>
> Thanks!
> Wout
>
> [1] java.lang.RuntimeException: Exception while fetching side input:
>
> com.google.cloud.dataflow.worker.StateFetcher.fetchSideInput(StateFetcher.java:195)
> com.google.cloud.dataflow.worker.StreamingModeExecutionContext.fetchSideInput(StreamingModeExecutionContext.java:285)
> com.google.cloud.dataflow.worker.StreamingModeExecutionContext.access$500(StreamingModeExecutionContext.java:69)
> com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.issueSideInputFetch(StreamingModeExecutionContext.java:632)
> com.google.cloud.dataflow.worker.StreamingModeExecutionContext$UserStepContext.issueSideInputFetch(StreamingModeExecutionContext.java:683)
> com.google.cloud.dataflow.worker.StreamingSideInputFetcher.getReadyWindows(StreamingSideInputFetcher.java:130)
> com.google.cloud.dataflow.worker.StreamingSideInputDoFnRunner.startBundle(StreamingSideInputDoFnRunner.java:52)
> com.google.cloud.dataflow.worker.SimpleParDoFn.reallyStartBundle(SimpleParDoFn.java:300)
> com.google.cloud.dataflow.worker.SimpleParDoFn.startBundle(SimpleParDoFn.java:226)
> com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.start(ParDoOperation.java:35)
> com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
> com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
> com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:135)
> com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> java.lang.Thread.run(Thread.java:745)
> Caused by: com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: Duplicate values for feature_option#4ff36b600007890000000225
>
> From: Lukasz Cwik <[email protected]>
> Reply-To: "[email protected]" <[email protected]>
> Date: Tuesday, 4 September 2018 at 22:57
> To: "[email protected]" <[email protected]>
> Subject: Re: How to update side inputs.
>
> > Sorry,
> >
> > * If this is ok for you, then this is likely the easiest solution, but if not, and you need to guarantee that the computations are happening with the updated side input data, you'll need to modify your triggers and pipeline to be based upon watermarks (suggestion #2).
> >
> > On Tue, Sep 4, 2018 at 1:49 PM Lukasz Cwik <[email protected]> wrote:
> >
> > > Bart, the error you are hitting is because the other part of the pipeline is operating on a global window.
> > >
> > > Every time a side input is looked up in the DoFn, the main window (the global window in your case) is mapped onto the side input window (a fixed window). There is no logical mapping from the global window to a fixed window that makes sense (you could write your own window mapping function, but it must be deterministic, which isn't very useful for what you're trying to do). You'll want to either:
> > > 1) update your side input to produce results in the global window, or
> > > 2) modify your pipeline so the other part uses windowing that is compatible with fixed windows.
> > >
> > > For 1, consider:
> > >
> > > PCollection<Long> ticks = p
> > >     // Produce 1 "tick" per 10 seconds
> > >     .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(10)))
> > >     // Window the ticks into 1-minute windows
> > >     .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));
> > >
> > > PCollectionView<Map<String, Long>> mapping = ticks
> > >     .apply(ParDo.of(new GetFileWithSideInputData())) // reads the file and returns a String
> > >     .apply(ParDo.of(new MapFn())) // turns the String (json data) into a Map (KV<String, Long>)
> > >     .apply(Window.<KV<String, Long>>into(new GlobalWindows())
> > >         .triggering(Repeatedly.forever(AfterProcessingTime
> > >             .pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10))))
> > >         .withAllowedLateness(Duration.ZERO)
> > >         .discardingFiredPanes())
> > >     .apply(View.<String, Long>asMap());
> > >
> > > Note that runners will attempt to update the side input every 10 seconds but are not required to do so eagerly. This means that results may still be computed using stale data. If this is ok for you, then this is likely the easiest solution; if not, you need to modify your triggers and pipeline to be based upon watermarks (suggestion #2).
> > >
> > > On Tue, Sep 4, 2018 at 1:00 PM Bart Aelterman <[email protected]> wrote:
> > >
> > > > Hi Lukasz,
> > > >
> > > > My solution yields a java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn.
> > > > I am now setting a fixed window on the ticks (as was shown in the Stack Overflow example) and, a couple of lines later, on the Map. I've tried removing one or both of them, but I keep getting the same issue. The other part of my pipeline is operating on a global window at that point, so it seems there is a mismatch, but I'm not sure how to resolve it.
> > > >
> > > > On Tue, 4 Sep 2018 at 19:11, Lukasz Cwik <[email protected]> wrote:
> > > >
> > > > > Jose, what Bart is recommending is a path that should work.
> > > > >
> > > > > Bart, what do you mean by conflicting windows?
> > > > >
> > > > > On Mon, Sep 3, 2018 at 11:29 PM Bart Aelterman <[email protected]> wrote:
> > > > >
> > > > > > Hi Jose,
> > > > > >
> > > > > > You could generate a sequence of "ticks" and use that as input to continuously update your side input. This is what is suggested in this Stack Overflow post: https://stackoverflow.com/a/41271159/1805725. However, CountingInput apparently no longer exists (at least, I can't find it).
> > > > > >
> > > > > > I've been working on this problem myself for the last couple of days. I try to read a file from storage, convert it into a Map, and pass that as a side input. Here is what I've come up with so far; however, I am still resolving issues with conflicting windows, so this code does not work:
> > > > > >
> > > > > > PCollection<Long> ticks = p
> > > > > >     // Produce 1 "tick" per 10 seconds
> > > > > >     .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(10)))
> > > > > >     // Window the ticks into 1-minute windows
> > > > > >     .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));
> > > > > >
> > > > > > PCollectionView<Map<String, Long>> mapping = ticks
> > > > > >     .apply(ParDo.of(new GetFileWithSideInputData())) // reads the file and returns a String
> > > > > >     .apply(ParDo.of(new MapFn())) // turns the String (json data) into a Map (KV<String, Long>)
> > > > > >     .apply(Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardDays(1)))
> > > > > >         .triggering(Repeatedly.forever(AfterProcessingTime
> > > > > >             .pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10))))
> > > > > >         .withAllowedLateness(Duration.ZERO)
> > > > > >         .discardingFiredPanes())
> > > > > >     .apply(View.<String, Long>asMap());
> > > > > >
> > > > > > If you manage to get a complete working example, would you mind sharing it too, please?
> > > > > > Thanks,
> > > > > > Bart
> > > > > >
> > > > > > On Tue, 4 Sep 2018 at 08:05, Jose Bermeo <[email protected]> wrote:
> > > > > >
> > > > > > > Hi.
> > > > > > >
> > > > > > > Currently I'm building a real-time pipeline to process user interactions, and I have to filter interactions based on a black-list. I used a side input to store the list. The problem is that I'm required to fetch new elements for the black-list every day.
> > > > > > >
> > > > > > > I don't want to restart the pipeline to re-create the side input. My second option was to move the black-list to Redis and fetch the table using the @StartBundle annotation, but since batches are going to be small, I'd be making thousands of calls to Redis, and it will only get harder as the table size increases.
> > > > > > >
> > > > > > > What other options do I have?
> > > > > > >
> > > > > > > Thanks.
> > > > > >
> > > > > > --
> > > > > > Kind regards,
> > > > > > Bart Aelterman
> > > > > > Freelance data scientist
> > > > > > http://www.bart-aelterman.com
> > > >
> > > > --
> > > > Kind regards,
> > > > Bart Aelterman
> > > > Freelance data scientist
> > > > http://www.bart-aelterman.com
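To tie this back to Jose's original black-list question: combining the tick-based refresh above with a multimap view sidesteps the duplicate-values exception, since a membership check doesn't care how many values a key has accumulated. A rough, untested sketch, where ReadBlacklistFn, interactions, and the daily refresh rate are placeholders for your own DoFn, input, and schedule:

    // One tick per day triggers a re-read of the black-list.
    PCollection<Long> ticks = p
        .apply(GenerateSequence.from(0).withRate(1, Duration.standardDays(1)));

    PCollectionView<Map<String, Iterable<Boolean>>> blacklist = ticks
        .apply(ParDo.of(new ReadBlacklistFn())) // placeholder: emits KV<userId, true> per banned user
        .apply(Window.<KV<String, Boolean>>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterProcessingTime
                .pastFirstElementInPane().plusDelayOf(Duration.standardMinutes(1))))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes())
        .apply(View.<String, Boolean>asMultimap());

    PCollection<String> allowed = interactions.apply(
        ParDo.of(new DoFn<String, String>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // Keep the interaction only if its user id is not black-listed.
            if (!c.sideInput(blacklist).containsKey(c.element())) {
              c.output(c.element());
            }
          }
        }).withSideInputs(blacklist));

As noted above, the runner is not required to refresh the view eagerly, so a just-banned user may slip through until the next firing is picked up.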
