Jose, what Bart is recommending is a path that should work. Bart, what do you mean by conflicting windows?
On Mon, Sep 3, 2018 at 11:29 PM Bart Aelterman wrote:
> Hi Jose,
>
> You could generate a sequence of "ticks" and use that as input to
> continuously update your side input. This is what is suggested in this
> Stack Overflow post: https://stackoverflow.com/a/41271159/1805725.
> However, CountingInput apparently no longer exists (at least, I can't
> find it).
>
> I've been working on this problem myself for the last couple of days. I'm
> trying to read a file from storage, convert it into a Map and pass that as
> a side input. Here is what I've come up with so far; however, I am still
> resolving issues with conflicting windows, so this code *does not work*:
>
> PCollection<Long> ticks = p
>     // Produce 1 "tick" per 10 seconds
>     .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(10)))
>     // Window the ticks into 1-minute windows
>     .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));
>
> PCollectionView<Map<String, Long>> mapping = ticks
>     .apply(ParDo.of(new GetFileWithSideInputData())) // reads the file and returns a String
>     .apply(ParDo.of(new MapFn())) // turns the String (JSON data) into a Map (KV<String, Long>)
>     .apply(Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardDays(1)))
>         .triggering(Repeatedly.forever(AfterProcessingTime
>             .pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10))))
>         .withAllowedLateness(Duration.ZERO)
>         .discardingFiredPanes())
>     .apply(View.<String, Long>asMap());
>
> If you are able to get a complete working example, would you mind
> sharing it too, please?
>
> Thanks,
>
> Bart
>
> On Tue, Sep 4, 2018 at 08:05 Jose Bermeo wrote:
>
>> Hi.
>>
>> Currently, I'm building a real-time pipeline to process user
>> interactions, and I have to filter interactions based on a blacklist. I
>> used a side input to store the list. The problem is that I'm required to
>> fetch new elements for the blacklist every day.
>>
>> I don't want to restart the pipeline to re-create the side input. My
>> second option was to move the blacklist to Redis and fetch the table using
>> the StartBundle annotation, but I think that, as batches are going to be
>> small, I'm going to be making thousands of calls to Redis, and it is going
>> to get harder as the table grows.
>>
>> What other options do I have?
>>
>> Thanks.
>
> --
> Kind regards,
>
> Bart Aelterman
> Freelance data scientist
> http://www.bart-aelterman.com
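Jose's concern about the @StartBundle approach is the number of Redis round trips when bundles are small. One alternative, different from the ticking side input Bart describes, is to keep the blacklist in a per-worker cache that is reloaded only when it is older than a fixed TTL (for example, one day). The sketch below is plain Java with no Beam or Redis dependency; `RefreshingBlacklist` and its `loader` are hypothetical names, and the assumption is that the loader would wrap whatever Redis (or file) read you already have. The clock is injected so the refresh behaviour can be checked without waiting.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/**
 * Hypothetical per-worker blacklist cache (an assumption, not a Beam API):
 * the loader is called only when the cached copy is older than ttlMillis,
 * instead of once per bundle.
 */
final class RefreshingBlacklist {
  private final Supplier<Set<String>> loader; // fetches the full blacklist, e.g. from Redis
  private final LongSupplier clock;           // current time in milliseconds
  private final long ttlMillis;               // maximum age of the cached copy

  private Set<String> cached;                 // last loaded snapshot (null until first use)
  private long loadedAtMillis;                // clock value when 'cached' was loaded

  RefreshingBlacklist(Supplier<Set<String>> loader, LongSupplier clock, long ttlMillis) {
    this.loader = loader;
    this.clock = clock;
    this.ttlMillis = ttlMillis;
  }

  /** Returns true if id is blacklisted, reloading the list first if it is missing or stale. */
  public synchronized boolean isBlacklisted(String id) {
    long now = clock.getAsLong();
    if (cached == null || now - loadedAtMillis >= ttlMillis) {
      // Copy so later changes at the source are only seen after the next reload.
      cached = Collections.unmodifiableSet(new HashSet<>(loader.get()));
      loadedAtMillis = now;
    }
    return cached.contains(id);
  }
}
```

For a daily refresh you would pass `TimeUnit.DAYS.toMillis(1)` as the TTL and `System::currentTimeMillis` as the clock. In a Beam DoFn, one option is to hold the cache in a static field or create it in a @Setup method, so the lambdas are not serialized with the DoFn. The trade-off is that each worker reads the whole list about once per TTL rather than once per bundle, may see an update up to one TTL late, and must be able to hold the full list in memory.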
