Hi Lukasz,
Thanks for your help; Bart and I got the side-input windowing issues figured
out.
Besides this, I noticed that streaming updates into a side-input map for keys
that are already present results in an exception complaining about duplicate
values [1].
From this, I assume the new KV<String, Long> entries are simply added to the map
without any check whether the map already contains them; the exception is only
thrown later, at the point where we do a get on the key.
Shouldn’t there be a check when a new entry is added?
I’ve created a workaround for the issue by adding an extra ParDo that uses a
map as a cache:

public static class MapFilter extends DoFn<KV<String, Long>, KV<String, Long>> {
  // Per-instance cache of the keys we have already emitted.
  private final HashMap<String, Long> cache;

  public MapFilter() {
    this.cache = new HashMap<>();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    // Only output an element the first time we see its key.
    if (!cache.containsKey(c.element().getKey())) {
      cache.put(c.element().getKey(), c.element().getValue());
      c.output(c.element());
    }
  }
}
PCollectionView<Map<String, Long>> mapping = ticks
    .apply(ParDo.of(new GetFileWithSideInputData())) // reads the file and returns a String
    .apply(ParDo.of(new MapFn()))                    // turns the String (json data) into a Map (KV<String, Long>)
    .apply(ParDo.of(new MapFilter()))                // drops duplicate keys (workaround above)
    .apply(Window.<KV<String, Long>>into(new GlobalWindows())
    ...
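For completeness, a sketch of an alternative I considered but haven't tested:
View.asMultimap instead of View.asMap. A multimap view tolerates repeated keys,
so the consuming DoFn can pick a value itself instead of the view throwing on
duplicates (windowing copied from Lukasz's example below):

PCollectionView<Map<String, Iterable<Long>>> multiMapping = ticks
    .apply(ParDo.of(new GetFileWithSideInputData())) // reads the file and returns a String
    .apply(ParDo.of(new MapFn()))                    // turns the String (json data) into a Map (KV<String, Long>)
    .apply(Window.<KV<String, Long>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterProcessingTime
            .pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10))))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    .apply(View.<String, Long>asMultimap());
// In the consuming DoFn, c.sideInput(multiMapping).get(key) returns an
// Iterable<Long>; taking its first element gives one value per key.

I also realise the MapFilter cache is per DoFn instance, so it may not catch
duplicates across workers; the multimap view would sidestep that.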
Any thoughts on this? I’ve noticed that the code that throws the exception was
written by you (PCollectionViews.class:326).
Thanks!
Wout
[1] java.lang.RuntimeException: Exception while fetching side input:
com.google.cloud.dataflow.worker.StateFetcher.fetchSideInput(StateFetcher.java:195)
com.google.cloud.dataflow.worker.StreamingModeExecutionContext.fetchSideInput(StreamingModeExecutionContext.java:285)
com.google.cloud.dataflow.worker.StreamingModeExecutionContext.access$500(StreamingModeExecutionContext.java:69)
com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.issueSideInputFetch(StreamingModeExecutionContext.java:632)
com.google.cloud.dataflow.worker.StreamingModeExecutionContext$UserStepContext.issueSideInputFetch(StreamingModeExecutionContext.java:683)
com.google.cloud.dataflow.worker.StreamingSideInputFetcher.getReadyWindows(StreamingSideInputFetcher.java:130)
com.google.cloud.dataflow.worker.StreamingSideInputDoFnRunner.startBundle(StreamingSideInputDoFnRunner.java:52)
com.google.cloud.dataflow.worker.SimpleParDoFn.reallyStartBundle(SimpleParDoFn.java:300)
com.google.cloud.dataflow.worker.SimpleParDoFn.startBundle(SimpleParDoFn.java:226)
com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.start(ParDoOperation.java:35)
com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:135)
com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
Caused by:
com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException:
java.lang.IllegalArgumentException: Duplicate values for
feature_option#4ff36b600007890000000225
From: Lukasz Cwik <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Tuesday, 4 September 2018 at 22:57
To: "[email protected]" <[email protected]>
Subject: Re: How to update side inputs.
Sorry, a correction to the last paragraph:
* If this is ok for you, then this is likely the easiest solution, but if you
need to guarantee that the computations are happening with the updated side
input data, you'll need to modify your triggers and pipeline to be based upon
watermarks (suggestion #2).
On Tue, Sep 4, 2018 at 1:49 PM Lukasz Cwik <[email protected]> wrote:
Bart, the error you are hitting is because the other part of the pipeline is
operating on a global window.
Every time a side input is looked up in the DoFn, the main window (the global
window in your case) is mapped onto the side input window (a fixed window).
There is no logical mapping from global window to fixed window that makes sense
(you could write your own window mapping function, but it must be
deterministic, which isn't very useful for what you're trying to do). You'll
want to either:
1) update your side input to produce results in the global window, or
2) modify your pipeline so the other part is in something that is compatible
with fixed windows.
For 1, consider
PCollection<Long> ticks = p
    // Produce 1 "tick" per 10 seconds
    .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(10)))
    // Window the ticks into 1-minute windows
    .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));

PCollectionView<Map<String, Long>> mapping = ticks
    .apply(ParDo.of(new GetFileWithSideInputData())) // reads the file and returns a String
    .apply(ParDo.of(new MapFn()))                    // turns the String (json data) into a Map (KV<String, Long>)
    .apply(Window.<KV<String, Long>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterProcessingTime
            .pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10))))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    .apply(View.<String, Long>asMap());
Note that runners will attempt to update the side input every 10 seconds but
are not required to do so eagerly. This means that results may still be
computed using stale data. If this is ok for you, then this is likely the
easiest solution but if not you need to modify your triggers and pipeline to be
based upon watermarks (suggestion #2).
On Tue, Sep 4, 2018 at 1:00 PM Bart Aelterman <[email protected]> wrote:
Hi Lukasz,
My solution yields a java.lang.IllegalArgumentException: Attempted to get side
input window for GlobalWindow from non-global WindowFn.
I am now setting a fixed window on the ticks (as was shown in the Stack
Overflow example) and, a couple of lines later, on the Map. I've tried removing
one or both of them, but I keep getting the same issue. The other part of my
pipeline is operating on a global window at that point, so it seems there is a
mismatch, but I'm not sure how to resolve it.
On Tue, 4 Sep 2018 at 19:11, Lukasz Cwik <[email protected]> wrote:
Jose, what Bart is recommending is a path that should work.
Bart, what do you mean by conflicting windows?
On Mon, Sep 3, 2018 at 11:29 PM Bart Aelterman <[email protected]> wrote:
Hi Jose,
You could generate a sequence of "ticks" and use that as input to continuously
update your side input. This is what is suggested in this Stack Overflow post:
https://stackoverflow.com/a/41271159/1805725.
However, CountingInput apparently no longer exists (at least, I can't find
it).
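(GenerateSequence seems to be its replacement; that's what I'm using below.)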
I've been working on this problem myself for the last couple of days. I'm
trying to read a file from storage, convert it into a Map, and pass that as a
side input. Here is what I've come up with so far; however, I am currently
still resolving issues with conflicting windows, so this code does not work:
PCollection<Long> ticks = p
    // Produce 1 "tick" per 10 seconds
    .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(10)))
    // Window the ticks into 1-minute windows
    .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));

PCollectionView<Map<String, Long>> mapping = ticks
    .apply(ParDo.of(new GetFileWithSideInputData())) // reads the file and returns a String
    .apply(ParDo.of(new MapFn()))                    // turns the String (json data) into a Map (KV<String, Long>)
    .apply(Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardDays(1)))
        .triggering(Repeatedly.forever(AfterProcessingTime
            .pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10))))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())
    .apply(View.<String, Long>asMap());
If you manage to get a complete working example, would you mind sharing it too,
please?
Thanks,
Bart
On Tue, 4 Sep 2018 at 08:05, Jose Bermeo <[email protected]> wrote:
Hi.
Currently, I'm building a real-time pipeline to process user interactions, and
I have to filter interactions based on a black-list. I used a side input to
store the list. The problem is that I'm required to fetch new elements for the
black-list every day.
I don't want to restart the pipeline to re-create the side input. My second
option was to move the black-list to Redis and fetch the table using the
@StartBundle annotation, but since the bundles are going to be small, I think
I'm going to be making thousands of calls to Redis, and it will only get harder
as the table size increases.
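To be concrete, here is a minimal sketch of the @StartBundle approach I mean
(the Redis fetch is a stub you'd replace with a real client call):

public static class FilterByBlackList extends DoFn<String, String> {
  private transient Set<String> blackList;

  @StartBundle
  public void startBundle() {
    // One fetch per bundle; with small streaming bundles this still
    // means many calls, which is my concern.
    blackList = fetchBlackList();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    // Drop interactions that appear on the black-list.
    if (!blackList.contains(c.element())) {
      c.output(c.element());
    }
  }

  private static Set<String> fetchBlackList() {
    // Stub: replace with a real Redis lookup.
    return new HashSet<>();
  }
}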
What other options do I have?
Thanks.
--
Met vriendelijke groeten,
Bart Aelterman
Freelance data scientist
http://www.bart-aelterman.com