Jose, what Bart is recommending is a path that should work.

Bart, what do you mean by conflicting windows?
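Regarding the Redis option in the original message: reading the table in @StartBundle costs one Redis call per bundle, so small bundles mean many calls. One alternative is to keep a cached copy of the list in each DoFn instance and reload it only after a refresh interval has passed. That way the number of calls grows with elapsed time rather than with bundle count. The sketch below is plain Java and rests on assumptions. RefreshingBlacklist, its loader and the one-day interval are hypothetical names for illustration, not Beam API. The loader would wrap whatever Redis client you use.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

/**
 * Hypothetical helper (not part of Beam): caches a blacklist fetched by
 * {@code loader} and reloads it at most once per {@code refreshInterval}.
 */
final class RefreshingBlacklist {
  private final Supplier<Set<String>> loader;  // e.g. wraps a Redis fetch (assumption)
  private final Duration refreshInterval;      // e.g. Duration.ofDays(1)
  private final Supplier<Instant> now;         // time source; Instant::now in production

  private Set<String> cached;                  // last loaded snapshot, null until first use
  private Instant loadedAt;                    // when the snapshot was loaded

  RefreshingBlacklist(Supplier<Set<String>> loader, Duration refreshInterval,
      Supplier<Instant> now) {
    this.loader = loader;
    this.refreshInterval = refreshInterval;
    this.now = now;
  }

  /** Returns true if {@code id} is blacklisted, reloading the snapshot first if it is stale. */
  synchronized boolean isBlacklisted(String id) {
    Instant t = now.get();
    if (cached == null || !t.isBefore(loadedAt.plus(refreshInterval))) {
      // Copy, so later changes on the loader's side only show up after the next reload.
      cached = Collections.unmodifiableSet(new HashSet<>(loader.get()));
      loadedAt = t;
    }
    return cached.contains(id);
  }
}
```

In a DoFn, you could create the instance in a @Setup method with Instant::now as the time source and call isBlacklisted from @ProcessElement. Like a side input, this still holds the whole list in memory on every worker, so it does not address the table-size concern by itself.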


On Mon, Sep 3, 2018 at 11:29 PM Bart Aelterman <[email protected]> wrote:

> Hi Jose,
>
>
> You could generate a sequence of "ticks" and use it as input to
> continuously update your side input. This is what is suggested in this
> Stack Overflow post: https://stackoverflow.com/a/41271159/1805725.
> However, CountingInput apparently no longer exists (at least, I can't
> find it).
>
> I've been working on this problem myself for the last couple of days. I am
> trying to read a file from storage, convert it into a Map and pass that as a
> side input. Here is what I've come up with so far; however, I am still
> resolving issues with conflicting windows, so this code *does not work*:
>
> PCollection<Long> ticks = p
>     // Produce 1 "tick" per 10 seconds
>     .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(10)))
>     // Window the ticks into 1-minute windows
>     .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));
>
>
> PCollectionView<Map<String, Long>> mapping = ticks
>     // Reads the file and returns its contents as a String
>     .apply(ParDo.of(new GetFileWithSideInputData()))
>     // Turns the String (JSON data) into KV<String, Long> entries for the map
>     .apply(ParDo.of(new MapFn()))
>     .apply(Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardDays(1)))
>         .triggering(Repeatedly.forever(AfterProcessingTime
>             .pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10))))
>         .withAllowedLateness(Duration.ZERO)
>         .discardingFiredPanes())
>     .apply(View.<String, Long>asMap());
>
>
> If you manage to get a complete working example, would you mind sharing it
> too, please?
>
> Thanks,
>
> Bart
>
> On Tue, Sep 4, 2018 at 08:05, Jose Bermeo <[email protected]> wrote:
>
>> Hi.
>>
>> I'm currently building a real-time pipeline to process user
>> interactions, and I have to filter interactions based on a blacklist. I used
>> a side input to store the list. The problem is that I'm required to fetch
>> new entries for the blacklist every day.
>>
>> I don't want to restart the pipeline to re-create the side input. My second
>> option was to move the blacklist to Redis and fetch the table in a method
>> annotated with @StartBundle, but since bundles are going to be small, I'd be
>> making thousands of calls to Redis, and it will get harder as the table
>> grows.
>>
>> What other options do I have?
>>
>> Thanks.
>>
>
>
> --
> Met vriendelijke groeten,
>
> Bart Aelterman
> Freelance data scientist
> http://www.bart-aelterman.com
>
>
