Andrew, with regard to the delay use case: assuming I have understood it correctly, you can explore using State and Timers.
Something like the pattern below, taken from a code sample that I hope to add to https://beam.apache.org/documentation/patterns/overview/ once I get time... If it works for your use case it would be great to get feedback, so we can generalize the pattern for Beam folks :-)

A few notes: I am using BagState below because not all runners support MapState yet. For the key, you should balance the key space by spreading elements over a bounded range of keys, maybe 1,000-10,000 of them. The output won't land at exactly 5 minutes, but hopefully close enough to the SLO you are looking for. There is a rough sketch of how this could be wired into a pipeline at the bottom of this mail.

public static class FutureCache<K, V> extends DoFn<KV<K, TimestampedValue<V>>, V> {

  Coder<V> objectCoder;

  @TimerId("Process")
  private final TimerSpec processTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @StateId("cache")
  private final StateSpec<BagState<TimestampedValue<V>>> cache;

  public FutureCache(Coder<V> objectCoder) {
    this.objectCoder = objectCoder;
    cache = StateSpecs.bag(TimestampedValueCoder.of(objectCoder));
  }

  @ProcessElement
  public void process(
      @TimerId("Process") Timer timer,
      @StateId("cache") BagState<TimestampedValue<V>> cache,
      @Element KV<K, TimestampedValue<V>> input) {
    // Stash the element; the timer decides when it has waited long enough.
    cache.add(input.getValue());
    // Will just keep updating for each element.
    timer.offset(Duration.standardSeconds(1)).setRelative();
  }

  @OnTimer("Process")
  public void onTimer(
      @TimerId("Process") Timer timer,
      OnTimerContext otc,
      @StateId("cache") BagState<TimestampedValue<V>> cache) {
    // Output based on the timestamp being less than (now() - yourDelta),
    // e.g. a 5 minute delta for your use case. One way to do that:
    Instant cutoff = Instant.now().minus(Duration.standardMinutes(5));
    List<TimestampedValue<V>> stillWaiting = new ArrayList<>();
    for (TimestampedValue<V> value : cache.read()) {
      if (value.getTimestamp().isBefore(cutoff)) {
        otc.output(value.getValue());
      } else {
        stillWaiting.add(value);
      }
    }
    // Keep anything that has not waited long enough and check again shortly.
    cache.clear();
    for (TimestampedValue<V> value : stillWaiting) {
      cache.add(value);
    }
    if (!stillWaiting.isEmpty()) {
      timer.offset(Duration.standardSeconds(1)).setRelative();
    }
  }
}

On Sat, Mar 28, 2020 at 5:47 AM Luke Cwik <[email protected]> wrote:

> Yes, since you don't care about the grouping, you could use any random key
> that you want.
>
> On Fri, Mar 27, 2020 at 2:45 PM Andrew Wylie <[email protected]> wrote:
>
>> To be able to invoke GroupByKey, does that mean I need to create a KV
>> object for my Pub Sub messages that are received from the topic?
>>
>> On 27 Mar 2020, at 18:30, Luke Cwik <[email protected]> wrote:
>>
>> The trigger only applies to when the output of a GroupByKey is produced
>> and won't put in the delay without one, so as long as inputEvents is
>> followed by a GroupByKey you'll see the delay.
>>
>> Why do you want the delay, as there might be different solutions for the
>> problem you're trying to solve?
>>
>> On Fri, Mar 27, 2020 at 11:16 AM Andrew Wylie <[email protected]> wrote:
>>
>>> Thanks Luke. I wasn’t sure if that was advised or even possible, but it
>>> seems to be working well.
>>>
>>> I would like to introduce a 5 minute delay between reading each message
>>> and publishing it to the output topic. Is the correct way to do this in
>>> Apache Beam using triggers?
>>>
>>> I am trying the approach below in my pipeline, but the messages just
>>> output without the delay.
>>>
>>> PCollection windowed_inputEvents = inputEvents.apply(
>>>     Window.into(FixedWindows.of(Duration.standardMinutes(1)))
>>>         .triggering(AfterProcessingTime.pastFirstElementInPane()
>>>             .plusDelayOf(Duration.standardMinutes(5)))
>>>         .withAllowedLateness(Duration.standardMinutes(1))
>>>         .discardingFiredPanes());
>>>
>>> Thanks
>>>
>>> On 25 Mar 2020, at 23:11, Luke Cwik <[email protected]> wrote:
>>>
>>> Is there a reason you don't keep them separate and apply the same
>>> transforms to each output, with the only difference being the final
>>> writing transform being configured for the correct topic?
>>>
>>> On Wed, Mar 25, 2020 at 1:54 PM Andrew Wylie <[email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> So I have 5 input Pub Sub topics and each one of those has an output
>>>> Pub Sub topic. When I Flatten the 5 inputs, is there a way I can ensure
>>>> that I can route the messages to their respective output topics? Is there
>>>> any kind of built in attribute in the messages I can read to determine
>>>> their input topic and therefore which output topic to write to?
>>>>
>>>> Pub Sub Input topic 1 messages need to be output from the pipeline to
>>>> Pub Sub Output topic 1 for example.
>>>>
>>>> Thanks
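
P.S. As mentioned above, here is a rough, untested sketch of how the FutureCache DoFn could be wired into the Pub/Sub pipeline. The topic paths, the AddKeyAndArrivalTime DoFn, the choice of String payloads and the 1,000-key range are all just assumptions for illustration, so please adapt them to whatever your messages actually look like.

import java.util.concurrent.ThreadLocalRandom;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;

public class DelayedPubsubPipeline {

  // Assigns a random key in [0, 1000) purely to spread state across keys;
  // the grouping itself carries no meaning. Also records the processing-time
  // arrival so FutureCache can compare it against now() minus the delta.
  static class AddKeyAndArrivalTime
      extends DoFn<String, KV<Integer, TimestampedValue<String>>> {
    @ProcessElement
    public void process(
        @Element String msg,
        OutputReceiver<KV<Integer, TimestampedValue<String>>> out) {
      int key = ThreadLocalRandom.current().nextInt(1000);
      out.output(KV.of(key, TimestampedValue.of(msg, Instant.now())));
    }
  }

  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    pipeline
        .apply("ReadFromPubSub",
            PubsubIO.readStrings().fromTopic("projects/my-project/topics/input-1"))
        .apply("KeyAndTimestamp", ParDo.of(new AddKeyAndArrivalTime()))
        // Set the KV coder explicitly; TimestampedValue coders are not always inferred.
        .setCoder(KvCoder.of(
            VarIntCoder.of(),
            TimestampedValue.TimestampedValueCoder.of(StringUtf8Coder.of())))
        .apply("DelayRoughlyFiveMinutes",
            ParDo.of(new FutureCache<Integer, String>(StringUtf8Coder.of())))
        // Output type is a generic parameter of FutureCache, so set its coder too.
        .setCoder(StringUtf8Coder.of())
        .apply("WriteToPubSub",
            PubsubIO.writeStrings().to("projects/my-project/topics/output-1"));

    pipeline.run();
  }
}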
