Hi Lukasz,

Thanks for your help; Bart and I got the side-input windowing issues figured out.

On top of that, I noticed that streaming an update into a side-input map for a key that is already present results in an exception being thrown saying there are duplicate values [1].
From this, I assume the new KV<String, Long> elements are simply added to the map without any check whether the map already contains them; the exception is only thrown at the point where we do a get on the key.
Shouldn’t there be a check when a new entry is added?
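
To illustrate, here is a minimal sketch of what I think is happening (the key and values are made up; the point is that building the view succeeds and only the later lookup fails):

// Two side-input updates carrying the same key:
PCollection<KV<String, Long>> updates = p.apply(Create.of(
    KV.of("feature_option", 1L),
    KV.of("feature_option", 2L)));   // same key streamed in again

PCollectionView<Map<String, Long>> view =
    updates.apply(View.<String, Long>asMap());

// No error here; the IllegalArgumentException ("Duplicate values for
// feature_option...") only surfaces when a consuming DoFn fetches the map,
// e.g. via c.sideInput(view).get("feature_option").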

I’ve created a workaround for the issue by adding an extra ParDo that uses a map as a cache:

public static class MapFilter extends DoFn<KV<String, Long>, KV<String, Long>> {

    // Per-instance cache of the keys this DoFn has already emitted.
    private final Map<String, Long> cache = new HashMap<>();

    @ProcessElement
    public void processElement(ProcessContext c) {
        // Only forward a key the first time we see it, so that
        // View.asMap() downstream never receives a duplicate key.
        if (!cache.containsKey(c.element().getKey())) {
            cache.put(c.element().getKey(), c.element().getValue());
            c.output(c.element());
        }
    }
}
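
(One caveat with this workaround that I'm aware of: the cache lives in the DoFn instance, so the filtering is per worker instance rather than global. In our case that seems acceptable because every tick re-reads the same file, so duplicates carry identical values, but it is not a general-purpose dedup.)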

PCollectionView<Map<String, Long>> mapping = ticks
  .apply(ParDo.of(new GetFileWithSideInputData())) // reads the file and returns a String
  .apply(ParDo.of(new MapFn()))                    // turns the String (json data) into a Map (KV<String, Long>)
  .apply(ParDo.of(new MapFilter()))
  .apply(Window.<KV<String, Long>>into(new GlobalWindows())
...

Any thoughts on this? I’ve noticed that the code that throws the exception was written by you (PCollectionViews.class:326).

Thanks!
Wout

[1] java.lang.RuntimeException: Exception while fetching side input:
        com.google.cloud.dataflow.worker.StateFetcher.fetchSideInput(StateFetcher.java:195)
        com.google.cloud.dataflow.worker.StreamingModeExecutionContext.fetchSideInput(StreamingModeExecutionContext.java:285)
        com.google.cloud.dataflow.worker.StreamingModeExecutionContext.access$500(StreamingModeExecutionContext.java:69)
        com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.issueSideInputFetch(StreamingModeExecutionContext.java:632)
        com.google.cloud.dataflow.worker.StreamingModeExecutionContext$UserStepContext.issueSideInputFetch(StreamingModeExecutionContext.java:683)
        com.google.cloud.dataflow.worker.StreamingSideInputFetcher.getReadyWindows(StreamingSideInputFetcher.java:130)
        com.google.cloud.dataflow.worker.StreamingSideInputDoFnRunner.startBundle(StreamingSideInputDoFnRunner.java:52)
        com.google.cloud.dataflow.worker.SimpleParDoFn.reallyStartBundle(SimpleParDoFn.java:300)
        com.google.cloud.dataflow.worker.SimpleParDoFn.startBundle(SimpleParDoFn.java:226)
        com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.start(ParDoOperation.java:35)
        com.google.cloud.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:75)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1227)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:135)
        com.google.cloud.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:966)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)
Caused by: com.google.cloud.dataflow.worker.repackaged.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalArgumentException: Duplicate values for feature_option#4ff36b600007890000000225

From: Lukasz Cwik <[email protected]>
Reply-To: "[email protected]" <[email protected]>
Date: Tuesday, 4 September 2018 at 22:57
To: "[email protected]" <[email protected]>
Subject: Re: How to update side inputs.

Sorry, that last point should have read:

* If this is ok for you, then this is likely the easiest solution, but if you need to guarantee that the computations are happening with the updated side input data, you'll need to modify your triggers and pipeline to be based upon watermarks (suggestion #2).


On Tue, Sep 4, 2018 at 1:49 PM Lukasz Cwik <[email protected]> wrote:
Bart, the error you are hitting is because the other part of the pipeline is operating on a global window.

Every time a side input is looked up in the DoFn, the main window (the global window in your case) is mapped onto the side input window (a fixed window). There is no logical mapping from the global window to a fixed window that makes sense (you could write your own window mapping function, but it must be deterministic, which isn't very useful for what you're trying to do). You'll want to either:
1) update your side input to produce results in the global window, or
2) modify your pipeline so the other part uses windowing that is compatible with fixed windows.

For 1, consider

PCollection<Long> ticks = p
  // Produce 1 "tick" per 10 seconds
  .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(10)))
  // Window the ticks into 1-minute windows
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));

PCollectionView<Map<String, Long>> mapping = ticks
  .apply(ParDo.of(new GetFileWithSideInputData())) // reads the file and returns a String
  .apply(ParDo.of(new MapFn()))                    // turns the String (json data) into a Map (KV<String, Long>)
  .apply(Window.<KV<String, Long>>into(new GlobalWindows())
     .triggering(Repeatedly.forever(AfterProcessingTime
       .pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10))))
     .withAllowedLateness(Duration.ZERO)
     .discardingFiredPanes()
   )
   .apply(View.<String, Long>asMap());

Note that runners will attempt to update the side input every 10 seconds but are not required to do so eagerly. This means that results may still be computed using stale data. If this is ok for you, then this is likely the easiest solution, but if not, you'll need to modify your triggers and pipeline to be based upon watermarks (suggestion #2).
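
A rough sketch of how this would be wired together (mainInput, the element type, and the DoFn body below are placeholders, not something from your pipeline):

// Consuming the view: register it with .withSideInputs(...) and read it
// with c.sideInput(...) inside the DoFn.
PCollection<String> filtered = mainInput
  .apply(ParDo.of(new DoFn<String, String>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
      Map<String, Long> map = c.sideInput(mapping);
      if (!map.containsKey(c.element())) {
        c.output(c.element());
      }
    }
  }).withSideInputs(mapping));

// For suggestion #2, the idea would instead be to window the main input into
// the same fixed windows as the side input, so the window mapping lines up:
// mainInput.apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));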


On Tue, Sep 4, 2018 at 1:00 PM Bart Aelterman <[email protected]> wrote:
Hi Lukasz,

My solution yields a java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn.
I am now setting a fixed window on the ticks (as was shown in the stack overflow example) and, a couple of lines later, on the Map. I've tried removing one or both of them, but I keep getting the same issue. The other part of my pipeline is operating on a global window at that point, so it seems there is a mismatch, but I'm not sure how to resolve it.



On Tue, 4 Sep 2018 at 19:11, Lukasz Cwik <[email protected]> wrote:
Jose, what Bart is recommending is a path that should work.

Bart, what do you mean by conflicting windows?


On Mon, Sep 3, 2018 at 11:29 PM Bart Aelterman <[email protected]> wrote:
Hi Jose,


You could generate a sequence of "ticks" and use that as input to continuously update your side input. This is what is suggested in this stack overflow post: https://stackoverflow.com/a/41271159/1805725.
However, CountingInput apparently no longer exists (at least, I can't find it); GenerateSequence, which I use below, appears to be its replacement.

I've been working on this problem myself for the last couple of days. I'm trying to read a file from storage, convert it into a Map and pass that as a side input. Here is what I've come up with so far; however, I am currently still resolving issues with conflicting windows, so this code does not work:


PCollection<Long> ticks = p
  // Produce 1 "tick" per 10 seconds
  .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(10)))
  // Window the ticks into 1-minute windows
  .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));

PCollectionView<Map<String, Long>> mapping = ticks
  .apply(ParDo.of(new GetFileWithSideInputData())) // reads the file and returns a String
  .apply(ParDo.of(new MapFn()))                    // turns the String (json data) into a Map (KV<String, Long>)
  .apply(Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardDays(1)))
     .triggering(Repeatedly.forever(AfterProcessingTime
       .pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10))))
     .withAllowedLateness(Duration.ZERO)
     .discardingFiredPanes()
   )
   .apply(View.<String, Long>asMap());


If you manage to get a complete working example, would you mind sharing it too, please?

Thanks,

Bart

On Tue, 4 Sep 2018 at 08:05, Jose Bermeo <[email protected]> wrote:
Hi.

Currently, I'm building a real-time pipeline to process user interactions, and I have to filter interactions based on a black-list. I used a side input to store the list. The problem is that I'm required to fetch new elements for the black-list every day.

I don't want to restart the pipeline to re-create the side input. My second option was to move the black-list to Redis and fetch the table using the @StartBundle annotation, but since the bundles are going to be small, I think I'm going to be making thousands of calls to Redis, and it will only get harder as the table size increases.
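
(For reference, that second option would look roughly like the sketch below; Jedis and the "blacklist" key are just placeholders for whatever client and key I'd end up using:)

// Rough sketch of option 2: re-fetch the black-list at the start of every
// bundle. Correct, but issues one Redis round-trip per bundle per worker.
public static class FilterByBlacklist extends DoFn<String, String> {

    private transient Set<String> blacklist;

    @StartBundle
    public void startBundle() {
        // Placeholder Redis client and key.
        try (Jedis jedis = new Jedis("redis-host")) {
            blacklist = jedis.smembers("blacklist");
        }
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        if (!blacklist.contains(c.element())) {
            c.output(c.element());
        }
    }
}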

What other options do I have?

Thanks.


--
Met vriendelijke groeten,

Bart Aelterman
Freelance data scientist
http://www.bart-aelterman.com



