Thanks very much Rez, I'll give that a try and let you know how it goes.
Just one question on the cache variable: it has to be final, but it is set
in the constructor, which causes a compilation error. Is there a nice
way to set it to the correct Coder?

On Sunday, 29 March 2020, 04:32:49 BST, Reza Rokni <[email protected]> wrote:

Andrew, with regard to the delay use case, and assuming I have understood it
correctly, you can explore using State and Timers.
Something like the pattern below might work; it comes from a code sample that I hope to add to
https://beam.apache.org/documentation/patterns/overview/ once I get time. If
it works for your use case, it would be great to get feedback so we can generalize the
pattern for Beam folks :-)
Please also note that I am using BagState below, as not all runners support MapState
yet. Also, for the key, you should balance the key space, maybe across a range of
1000-10000 keys. The output won't be exactly 5 minutes, but hopefully close enough to
the SLO that you are looking for.
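    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.state.BagState;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.TimeDomain;
    import org.apache.beam.sdk.state.Timer;
    import org.apache.beam.sdk.state.TimerSpec;
    import org.apache.beam.sdk.state.TimerSpecs;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.TimestampedValue;
    import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
    import org.joda.time.Duration;

    public static class FutureCache<K, V> extends DoFn<KV<K, TimestampedValue<V>>, V> {

        // Coder for the buffered values; used to build the bag's coder.
        private final Coder<V> objectCoder;

        // Processing-time timer that periodically flushes the bag.
        @TimerId("Process")
        private final TimerSpec processTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

        // Per-key buffer of pending elements (BagState, as not all runners support MapState).
        @StateId("cache")
        private final StateSpec<BagState<TimestampedValue<V>>> cache;

        public FutureCache(Coder<V> objectCoder) {
            this.objectCoder = objectCoder;
            this.cache = StateSpecs.bag(TimestampedValueCoder.of(objectCoder));
        }

        @ProcessElement
        public void process(
                @TimerId("Process") Timer timer,
                @StateId("cache") BagState<TimestampedValue<V>> cache,
                @Element KV<K, TimestampedValue<V>> input) {
            cache.add(input.getValue());
            // Will just keep updating for each element.
            timer.offset(Duration.standardSeconds(1)).setRelative();
        }

        @OnTimer("Process")
        public void onTimer(
                @TimerId("Process") Timer timer,
                OnTimerContext otc,
                @StateId("cache") BagState<TimestampedValue<V>> cache) {
            // ... Output based on Timestamp being less than (now() - yourDelta..)
        }
    }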
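The timer callback above stops at the "Output based on Timestamp being less than (now() - yourDelta..)" note. A minimal sketch of that step follows, assuming the intent is to emit every buffered entry whose timestamp is strictly before `now() - yourDelta` and keep the rest for a later firing. To keep it runnable on a plain JDK (16+, for records), the bag logic is isolated from Beam: `Timestamped`, `FlushResult` and `DelayFlush` are hypothetical names standing in for `TimestampedValue<V>` and the `BagState` contents, not Beam APIs.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for one bag entry (plays the role of TimestampedValue<V>).
record Timestamped<V>(V value, Instant timestamp) {}

// Result of one timer firing: values to output now, and entries to write back into the bag.
record FlushResult<V>(List<V> output, List<Timestamped<V>> retained) {}

final class DelayFlush {
    private DelayFlush() {}

    // Emits every entry whose timestamp is strictly before (now - delta); keeps the rest.
    static <V> FlushResult<V> flush(Iterable<Timestamped<V>> buffered, Instant now, Duration delta) {
        Instant cutoff = now.minus(delta);
        List<V> output = new ArrayList<>();
        List<Timestamped<V>> retained = new ArrayList<>();
        for (Timestamped<V> entry : buffered) {
            if (entry.timestamp().isBefore(cutoff)) {
                output.add(entry.value());
            } else {
                retained.add(entry);
            }
        }
        return new FlushResult<>(output, retained);
    }
}
```

Wired into the `@OnTimer` method, one would feed it the entries from `cache.read()`, call `otc.output(...)` for each value in `output`, then `cache.clear()` and `cache.add(...)` each retained entry, re-arming the timer if anything is left. Beam timestamps are Joda `Instant`s, so the comparison there would use Joda's `isBefore` and `minus` rather than `java.time`.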


On Sat, Mar 28, 2020 at 5:47 AM Luke Cwik <[email protected]> wrote:

Yes, since you don't care about the grouping, you could use any random key that 
you want.
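Since the key only exists to give GroupByKey (or per-key state) something to group on, it can be drawn at random from a bounded range, such as the 1000-10000 keys suggested for balancing the key space. A minimal sketch, assuming an integer key; `ShardKeys.randomKey` is a hypothetical helper. In a pipeline, the result would typically be paired with each message as `KV.of(key, message)` inside a `ParDo` or `MapElements`.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical helper: picks a key uniformly from [0, numKeys) so that
// grouping and state are spread over a bounded, balanced key space.
final class ShardKeys {
    private ShardKeys() {}

    static int randomKey(int numKeys) {
        if (numKeys <= 0) {
            throw new IllegalArgumentException("numKeys must be positive");
        }
        return ThreadLocalRandom.current().nextInt(numKeys);
    }
}
```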
On Fri, Mar 27, 2020 at 2:45 PM Andrew Wylie <[email protected]> wrote:

To be able to invoke GroupByKey, does that mean I need to create a KV object 
for my Pub Sub messages that are received from the topic?

On 27 Mar 2020, at 18:30, Luke Cwik <[email protected]> wrote:



The trigger only controls when the output of a GroupByKey is produced, so it
won't add any delay without one. As long as inputEvents is followed by a
GroupByKey, you'll see the delay.
Why do you want the delay? There might be different solutions for the problem
you're trying to solve.
On Fri, Mar 27, 2020 at 11:16 AM Andrew Wylie <[email protected]> wrote:

Thanks Luke. I wasn’t sure if that was advisable or even possible, but it seems
to be working well.
I would like to introduce a 5-minute delay between reading each message and
publishing it to the output topic. Is using triggers the correct way to do this
in Apache Beam?
I am trying the approach below in my pipeline, but the messages are output
immediately, without the delay.
// Assumes inputEvents is a PCollection<PubsubMessage>.
PCollection<PubsubMessage> windowedInputEvents = inputEvents.apply(
        Window.<PubsubMessage>into(FixedWindows.of(Duration.standardMinutes(1)))
                .triggering(AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(Duration.standardMinutes(5)))
                .withAllowedLateness(Duration.standardMinutes(1))
                .discardingFiredPanes());
Thanks
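Even once a GroupByKey follows the windowing, `AfterProcessingTime.pastFirstElementInPane().plusDelayOf(...)` delays the pane, not each message: the pane fires once, the given delay after its first element arrives, and everything buffered in it is emitted together. Messages that arrive later in the pane therefore wait less than 5 minutes. A minimal model of that arithmetic in plain Java, assuming every element of the pane arrives before it fires; `PaneDelay` is a hypothetical helper, not part of Beam.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Models a single pane under AfterProcessingTime.pastFirstElementInPane().plusDelayOf(delay).
final class PaneDelay {
    private PaneDelay() {}

    // Returns how long each element waits between its arrival and the pane's single firing,
    // assuming every arrival happens before that firing.
    static List<Duration> perElementWait(List<Instant> arrivals, Duration delay) {
        if (arrivals.isEmpty()) {
            return List.of();
        }
        // The trigger starts counting from the first element in the pane.
        Instant fireTime = Collections.min(arrivals).plus(delay);
        List<Duration> waits = new ArrayList<>();
        for (Instant arrival : arrivals) {
            waits.add(Duration.between(arrival, fireTime));
        }
        return waits;
    }
}
```

For a pane whose messages arrive at 0 s, 40 s and 59 s, a 5-minute delay gives waits of 300 s, 260 s and 241 s respectively.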

On 25 Mar 2020, at 23:11, Luke Cwik <[email protected]> wrote:
Is there a reason you don't keep them separate and apply the same transforms to
each input, with the only difference being that the final write transform is
configured for the correct output topic?
On Wed, Mar 25, 2020 at 1:54 PM Andrew Wylie <[email protected]> wrote:


Hi,

So I have 5 input Pub Sub topics, and each one of those has an output Pub Sub
topic. When I Flatten the 5 inputs, is there a way to route the messages to
their respective output topics? Is there any kind of built-in attribute in the
messages that I can read to determine their input topic, and therefore which
output topic to write to?

For example, messages from Pub Sub input topic 1 need to be output from the
pipeline to Pub Sub output topic 1.




Thanks
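If no message attribute identifies the source topic, one option is to attach the destination yourself before merging: key each input with its output topic, Flatten, apply the shared transforms, and split by key before writing. In Beam that would roughly correspond to `WithKeys.of(outputTopic)` on each input, `Flatten.pCollections()`, and a `Partition` step at the end. The sketch below models the idea with plain collections (JDK 16+ for records); `Routed`, `TopicRouting` and the topic names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// A payload tagged with its destination (plays the role of KV<String, PubsubMessage>).
record Routed(String outputTopic, String payload) {}

final class TopicRouting {
    private TopicRouting() {}

    // Before the merge: tag every message with the output topic of the input it came from.
    static List<Routed> tagAndFlatten(Map<String, List<String>> messagesByInput,
                                      Map<String, String> outputFor) {
        List<Routed> merged = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : messagesByInput.entrySet()) {
            String outputTopic = outputFor.get(entry.getKey());
            if (outputTopic == null) {
                throw new IllegalArgumentException("No output topic for " + entry.getKey());
            }
            for (String payload : entry.getValue()) {
                merged.add(new Routed(outputTopic, payload));
            }
        }
        return merged;
    }

    // After the shared transforms: split the merged stream back out by destination.
    static Map<String, List<String>> route(List<Routed> merged) {
        Map<String, List<String>> byOutput = new LinkedHashMap<>();
        for (Routed r : merged) {
            byOutput.computeIfAbsent(r.outputTopic(), k -> new ArrayList<>()).add(r.payload());
        }
        return byOutput;
    }
}
```

Keeping the five branches separate instead avoids the tagging entirely, at the cost of one copy of the transform graph per input.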







  
