Hi Jose,
You could generate a sequence of "ticks" and use that as input to continuously update your side input. This is what is suggested in this Stack Overflow post: https://stackoverflow.com/a/41271159/1805725. However, CountingInput apparently no longer exists (at least, I can't find it).

I've been working on this problem myself for the last couple of days. I am trying to read a file from storage, convert it into a Map, and pass that as a side input. Here is what I've come up with so far. Note that I am still resolving issues with conflicting windows, so this code *does not work*:

    PCollection<Long> ticks = p
        // Produce 1 "tick" per 10 seconds
        .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(10)))
        // Window the ticks into 1-minute windows
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))));

    PCollectionView<Map<String, Long>> mapping = ticks
        // Reads the file and returns a String
        .apply(ParDo.of(new GetFileWithSideInputData()))
        // Turns the String (JSON data) into a Map (KV<String, Long>)
        .apply(ParDo.of(new MapFn()))
        .apply(Window.<KV<String, Long>>into(FixedWindows.of(Duration.standardDays(1)))
            .triggering(Repeatedly.forever(AfterProcessingTime
                .pastFirstElementInPane().plusDelayOf(Duration.standardSeconds(10))))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes())
        .apply(View.<String, Long>asMap());

If you manage to get a complete working example, would you mind sharing it too, please?

Thanks,

Bart

On Tue, Sep 4, 2018 at 08:05, Jose Bermeo <[email protected]> wrote:

> Hi.
>
> Currently, I'm building a real-time pipeline to process user interactions.
> I have to filter interactions based on a black-list. I used a side input
> to store the list. The problem is that I'm required to fetch new elements
> for the black-list every day.
>
> I don't want to restart the pipeline to re-create the side input. My second
> option was to move the black-list to Redis and fetch the table using the
> StartBundle annotation, but I think that, as batches are going to be small,
> I'm going to be doing thousands of calls to Redis, and it is going to get
> harder as the table size increases.
>
> What other options do I have?
>
> Thanks.
>

--
Kind regards,

Bart Aelterman
Freelance data scientist
http://www.bart-aelterman.com
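
P.S. On the concern about thousands of Redis calls: one option that is not the side-input approach above, and that I have not tested in a pipeline, is to keep the black-list in memory and reload it only when it is older than some time-to-live, instead of on every bundle. Below is a minimal plain-Java sketch of that idea. The class name RefreshingBlacklist, the loader, and the injectable clock are all hypothetical names made up for illustration; the loader stands in for whatever actually reads the list (Redis, a file, ...).

```java
import java.util.Set;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

/** Hypothetical helper: caches a black-list and reloads it at most once per TTL. */
class RefreshingBlacklist {
    private final Supplier<Set<String>> loader; // assumption: wraps the Redis/file read
    private final long ttlMillis;               // how long a loaded copy stays valid
    private final LongSupplier clockMillis;     // injectable clock, handy for testing
    private Set<String> cached;                 // null until the first load
    private long loadedAtMillis;

    RefreshingBlacklist(Supplier<Set<String>> loader, long ttlMillis, LongSupplier clockMillis) {
        this.loader = loader;
        this.ttlMillis = ttlMillis;
        this.clockMillis = clockMillis;
    }

    /** Returns true if the key is black-listed, reloading the list first if it is stale. */
    synchronized boolean contains(String key) {
        long now = clockMillis.getAsLong();
        if (cached == null || now - loadedAtMillis >= ttlMillis) {
            cached = loader.get();
            loadedAtMillis = now;
        }
        return cached.contains(key);
    }

    public static void main(String[] args) {
        long[] now = {0L};  // fake clock so the demo does not have to sleep
        int[] loads = {0};  // counts how often the loader is invoked
        RefreshingBlacklist blacklist = new RefreshingBlacklist(
                () -> { loads[0]++; return Set.of("spammer"); },
                60_000L,
                () -> now[0]);

        System.out.println(blacklist.contains("spammer")); // first call triggers a load
        System.out.println(blacklist.contains("alice"));   // served from the cached copy
        now[0] = 60_000L;                                   // the TTL has now elapsed
        blacklist.contains("alice");                        // triggers a reload
        System.out.println("loads = " + loads[0]);
    }
}
```

With a daily refresh requirement, the TTL could be set to anything up to 24 hours, so the number of loads depends on elapsed time rather than on the number of bundles. In a Beam DoFn such a helper would presumably be created once per instance (for example in a @Setup method) rather than serialized with the function, but that part is an assumption on my side.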
