Sorry,

* If this is OK for you, then this is likely the easiest solution. But if
you need to guarantee that the computations happen with the updated side
input data, you'll need to modify your triggers and pipeline to be based
upon watermarks (suggestion #2).


On Tue, Sep 4, 2018 at 1:49 PM Lukasz Cwik <[email protected]> wrote:

> Bart, the error you are hitting is because the other part of the pipeline
> is operating on a global window.
>
> Every time a side input is looked up in the DoFn, the main window (global
> window in your case) is mapped onto the side input window (a fixed window).
> There is no sensible mapping from the global window to a fixed window (you
> could write your own window mapping function, but it must be
> deterministic, which isn't very useful for what you're trying to do).
> You'll want to either:
> 1) update your side input to produce results in the global window, or
> 2) modify your pipeline so the other part uses windowing that is
> compatible with fixed windows.
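To make the window-mapping point concrete, here is a simplified model in plain Java. It does not use Beam: the `Win` class and the `assignFixed` / `sideInputWindowFor` helpers are hypothetical. It assumes the default mapping for a fixed-window side input picks the side-input window that contains the main window's last timestamp (my understanding of what Beam's fixed windows do by default), so a main input in matching fixed windows (suggestion #2) maps cleanly, while a global main window is rejected with the same message Bart reported.

```java
/**
 * Hypothetical, simplified model (not Beam's API) of how a main-input window
 * is mapped onto a fixed-window side input.
 */
class SideInputWindowMapping {

    /** Hypothetical window covering [start, end) in millis; GLOBAL stands in for GlobalWindow. */
    static final class Win {
        static final Win GLOBAL = new Win(Long.MIN_VALUE, Long.MAX_VALUE, true);
        final long start;
        final long end;
        final boolean global;

        Win(long start, long end, boolean global) {
            this.start = start;
            this.end = end;
            this.global = global;
        }

        /** Last timestamp that still falls inside the window. */
        long maxTimestamp() {
            return end - 1;
        }

        @Override
        public String toString() {
            return global ? "GlobalWindow" : "[" + start + ", " + end + ")";
        }
    }

    /** Assigns a timestamp to the fixed window of the given size that contains it. */
    static Win assignFixed(long timestamp, long size) {
        long start = timestamp - Math.floorMod(timestamp, size);
        return new Win(start, start + size, false);
    }

    /**
     * Deterministically maps a main-input window onto a fixed side-input window
     * by taking the side-input window that contains the main window's max
     * timestamp. A global main window has no sensible fixed counterpart, so it
     * is rejected (assumed behavior, mirroring the error in this thread).
     */
    static Win sideInputWindowFor(Win mainWindow, long sideInputSize) {
        if (mainWindow.global) {
            throw new IllegalArgumentException(
                "Attempted to get side input window for GlobalWindow from non-global WindowFn");
        }
        return assignFixed(mainWindow.maxTimestamp(), sideInputSize);
    }

    public static void main(String[] args) {
        long oneMinute = 60_000L;
        // Main input also in 1-minute fixed windows (suggestion #2): maps cleanly.
        Win main = assignFixed(125_000L, oneMinute);
        System.out.println(main + " -> " + sideInputWindowFor(main, oneMinute));
        // Main input in the global window: the lookup fails.
        try {
            sideInputWindowFor(Win.GLOBAL, oneMinute);
        } catch (IllegalArgumentException e) {
            System.out.println("error: " + e.getMessage());
        }
    }
}
```

Running `main` prints `[120000, 180000) -> [120000, 180000)` followed by the error for the global window. Option 1 avoids the problem from the other side: a side input in the global window maps every main window to the single global window.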
>
> For 1, consider
>
> PCollection<Long> ticks = p
>     // Produce 1 "tick" per 10 seconds
>     .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(10)))
>     // Window the ticks into 1-minute windows
>     .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));
>
> PCollectionView<Map<String, Long>> mapping = ticks
>     // Reads the file and returns a String
>     .apply(ParDo.of(new GetFileWithSideInputData()))
>     // Turns the String (JSON data) into a Map (KV<String, Long>)
>     .apply(ParDo.of(new MapFn()))
>     .apply(Window.<KV<String, Long>>into(new GlobalWindows())
>         .triggering(Repeatedly.forever(AfterProcessingTime
>             .pastFirstElementInPane()
>             .plusDelayOf(Duration.standardSeconds(10))))
>         .withAllowedLateness(Duration.ZERO)
>         .discardingFiredPanes())
>     .apply(View.<String, Long>asMap());
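The tick stream is what drives the refresh: each tick flows through `GetFileWithSideInputData`, so the file is presumably re-read once per tick. The small plain-Java model below is not Beam code (`TickWindows` and its methods are hypothetical); it assumes the ticks are timestamped exactly 10 seconds apart starting at 0 and shows how they fall into 1-minute fixed windows by timestamp, i.e. six ticks per window.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical model (not Beam code) of the "ticks" part of the pipeline:
 * one tick every 10 seconds, grouped into 1-minute fixed windows by timestamp.
 */
class TickWindows {

    /** Start of the fixed window of the given size that contains the timestamp. */
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    /** Counts how many ticks fall into each fixed window, keyed by window start. */
    static Map<Long, Integer> ticksPerWindow(int numTicks, long periodMillis, long windowMillis) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (int i = 0; i < numTicks; i++) {
            long tickTimestamp = i * periodMillis; // assumed: tick i is stamped at i * period
            counts.merge(windowStart(tickTimestamp, windowMillis), 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // 18 ticks, 10 s apart, grouped into 1-minute windows.
        System.out.println(ticksPerWindow(18, 10_000L, 60_000L));
    }
}
```

With 18 ticks this prints `{0=6, 60000=6, 120000=6}`. In the snippet above, those per-minute windows are then replaced by the global window before the data becomes a side input.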
>
>
> Note that runners will attempt to update the side input every 10 seconds
> but are not required to do so eagerly. This means that results may still be
> computed using stale data. If this is OK for you, then this is likely the
> easiest solution; if not, you need to modify your triggers and pipeline
> to be based upon watermarks (suggestion #2).
>
>
> On Tue, Sep 4, 2018 at 1:00 PM Bart Aelterman <[email protected]>
> wrote:
>
>> Hi Lukasz,
>>
>> My solution yields a java.lang.IllegalArgumentException: "Attempted to get
>> side input window for GlobalWindow from non-global WindowFn".
>> I am now setting a fixed window on the ticks (as was shown in the Stack
>> Overflow example) and, a couple of lines later, on the Map. I've tried
>> removing one or both of them, but I keep getting the same issue. The other
>> part of my pipeline is operating on a global window at that point, so it
>> seems there is a mismatch, but I'm not sure how to resolve it.
>>
>>
>>
>> On Tue, Sep 4, 2018 at 7:11 PM Lukasz Cwik <[email protected]> wrote:
>>
>>> Jose, what Bart is recommending is a path that should work.
>>>
>>> Bart, what do you mean by conflicting windows?
>>>
>>>
>>> On Mon, Sep 3, 2018 at 11:29 PM Bart Aelterman <[email protected]>
>>> wrote:
>>>
>>>> Hi Jose,
>>>>
>>>>
>>>> You could generate a sequence of "ticks" and use that as input to
>>>> continuously update your side input. This is what is suggested in this
>>>> Stack Overflow post: https://stackoverflow.com/a/41271159/1805725.
>>>> However, CountingInput apparently no longer exists (at least, I
>>>> can't find it).
>>>>
>>>> I've been working on this problem myself for the last couple of days. I'm
>>>> trying to read a file from storage, convert it into a Map, and pass that
>>>> as a side input. Here is what I've come up with so far; however, I am
>>>> still resolving issues with conflicting windows, so this code *does not
>>>> work*:
>>>>
>>>> PCollection<Long> ticks = p
>>>>     // Produce 1 "tick" per 10 seconds
>>>>     .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(10)))
>>>>     // Window the ticks into 1-minute windows
>>>>     .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));
>>>>
>>>> PCollectionView<Map<String, Long>> mapping = ticks
>>>>     // Reads the file and returns a String
>>>>     .apply(ParDo.of(new GetFileWithSideInputData()))
>>>>     // Turns the String (JSON data) into a Map (KV<String, Long>)
>>>>     .apply(ParDo.of(new MapFn()))
>>>>     .apply(Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardDays(1)))
>>>>         .triggering(Repeatedly.forever(AfterProcessingTime
>>>>             .pastFirstElementInPane()
>>>>             .plusDelayOf(Duration.standardSeconds(10))))
>>>>         .withAllowedLateness(Duration.ZERO)
>>>>         .discardingFiredPanes())
>>>>     .apply(View.<String, Long>asMap());
>>>>
>>>>
>>>> If you manage to get a complete working example, would you mind
>>>> sharing it too, please?
>>>>
>>>> Thanks,
>>>>
>>>> Bart
>>>>
>>>> On Tue, Sep 4, 2018 at 8:05 AM Jose Bermeo <[email protected]> wrote:
>>>>
>>>>> Hi.
>>>>>
>>>>> Currently, I'm building a real-time pipeline to process user
>>>>> interactions, and I have to filter interactions based on a blacklist. I
>>>>> used a side input to store the list. The problem is that I'm required to
>>>>> fetch new elements for the blacklist every day.
>>>>>
>>>>> I don't want to restart the pipeline to re-create the side input. My
>>>>> second option was to move the blacklist to Redis and fetch the table in a
>>>>> @StartBundle method, but since batches are going to be small, I think I'd
>>>>> be making thousands of calls to Redis, and it will get harder as the
>>>>> table size increases.
>>>>>
>>>>> What other options do I have?
>>>>>
>>>>> Thanks.
>>>>>
>>>>
>>>>
>>>> --
>>>> Kind regards,
>>>>
>>>> Bart Aelterman
>>>> Freelance data scientist
>>>> http://www.bart-aelterman.com
>>>>
>>>>
>>
>> --
>> Kind regards,
>>
>> Bart Aelterman
>> Freelance data scientist
>> http://www.bart-aelterman.com
>>
>>
