Andrew, with regard to the delay use case: assuming I have understood it
correctly, you can explore using State and Timers.

Something like the pattern below, taken from a code sample that I hope to
add to https://beam.apache.org/documentation/patterns/overview/ once I get
time. If it works for your use case, it would be great to get feedback so
we can generalize the pattern for Beam folks :-)

Please also note that I am using BagState below, as not all runners support
MapState yet. Also, for the key, you should balance the key space, maybe
into a range of 1000-10000. The output won't be delayed by exactly 5
minutes, but it should be close enough to the SLO that you are looking for.
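
As a rough illustration of the key balancing mentioned above (separate from
the State and Timers pattern itself), here is a minimal plain-Java sketch. It
assumes each message has some string identifier to hash, and it reads the
"range of 1000-10000" as the number of distinct keys. The names KeySharding,
shardFor and numShards are hypothetical and not part of Beam.

```java
// Plain-JDK sketch: map an arbitrary message id onto a bounded key space.
class KeySharding {

    /**
     * Returns a key in [0, numShards) for the given id.
     * Math.floorMod keeps the result non-negative even when hashCode() is negative.
     */
    static int shardFor(String id, int numShards) {
        if (numShards <= 0) {
            throw new IllegalArgumentException("numShards must be positive");
        }
        return Math.floorMod(id.hashCode(), numShards);
    }

    public static void main(String[] args) {
        // Assumption: 1000 shards, the low end of the range suggested above.
        int numShards = 1000;
        for (String id : new String[] {"msg-1", "msg-2", "msg-3"}) {
            System.out.println(id + " -> key " + shardFor(id, numShards));
        }
    }
}
```

In a pipeline, the resulting integer would become the K of the KV that is fed
into the stateful DoFn, so that state and timers are spread over a bounded
number of keys rather than one hot key or one key per message.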

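    // Imports needed by the enclosing file.
    import org.apache.beam.sdk.coders.Coder;
    import org.apache.beam.sdk.state.BagState;
    import org.apache.beam.sdk.state.StateSpec;
    import org.apache.beam.sdk.state.StateSpecs;
    import org.apache.beam.sdk.state.TimeDomain;
    import org.apache.beam.sdk.state.Timer;
    import org.apache.beam.sdk.state.TimerSpec;
    import org.apache.beam.sdk.state.TimerSpecs;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.TimestampedValue;
    import org.apache.beam.sdk.values.TimestampedValue.TimestampedValueCoder;
    import org.joda.time.Duration;

    public static class FutureCache<K, V>
            extends DoFn<KV<K, TimestampedValue<V>>, V> {

        // Coder for the buffered values.
        private final Coder<V> objectCoder;

        @TimerId("Process")
        private final TimerSpec processTimer =
                TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

        @StateId("cache")
        private final StateSpec<BagState<TimestampedValue<V>>> cache;

        public FutureCache(Coder<V> objectCoder) {
            this.objectCoder = objectCoder;
            cache = StateSpecs.bag(TimestampedValueCoder.of(objectCoder));
        }

        @ProcessElement
        public void process(
                @TimerId("Process") Timer timer,
                @StateId("cache") BagState<TimestampedValue<V>> cache,
                @Element KV<K, TimestampedValue<V>> input) {
            // Buffer the element until it is old enough to be released.
            cache.add(input.getValue());
            // Will just keep updating for each element.
            timer.offset(Duration.standardSeconds(1)).setRelative();
        }

        @OnTimer("Process")
        public void onTimer(
                @TimerId("Process") Timer timer,
                OnTimerContext otc,
                @StateId("cache") BagState<TimestampedValue<V>> cache) {
            // ... Output based on Timestamp being less than (now() - yourDelta..)
        }
    }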
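
The timer callback above is left elided. As a hedged sketch of what "output
based on Timestamp being less than (now() - yourDelta)" could look like, the
plain-Java example below separates buffered values into those old enough to
emit and those that must stay buffered. It deliberately uses only JDK types so
it can be run on its own: Stamped is a hypothetical stand-in for Beam's
TimestampedValue, and DelayedRelease, Split and split are illustrative names,
not Beam APIs.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Plain-JDK stand-in for the release step of the timer callback (no Beam types).
class DelayedRelease {

    /** Hypothetical stand-in for Beam's TimestampedValue<V>. */
    static final class Stamped<V> {
        final V value;
        final Instant timestamp;

        Stamped(V value, Instant timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Outcome of one timer firing: what to emit and what to keep buffered. */
    static final class Split<V> {
        final List<V> ready = new ArrayList<>();
        final List<Stamped<V>> pending = new ArrayList<>();
    }

    /** Values with a timestamp strictly before (now - delay) are ready to emit. */
    static <V> Split<V> split(Iterable<Stamped<V>> buffered, Instant now, Duration delay) {
        Instant cutoff = now.minus(delay);
        Split<V> result = new Split<>();
        for (Stamped<V> s : buffered) {
            if (s.timestamp.isBefore(cutoff)) {
                result.ready.add(s.value);
            } else {
                result.pending.add(s);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Instant now = Instant.parse("2020-03-28T12:00:00Z");
        List<Stamped<String>> buffered = new ArrayList<>();
        buffered.add(new Stamped<String>("old", now.minus(Duration.ofMinutes(6))));
        buffered.add(new Stamped<String>("new", now.minus(Duration.ofMinutes(1))));

        Split<String> result = split(buffered, now, Duration.ofMinutes(5));
        System.out.println("emit: " + result.ready);
        System.out.println("still buffered: " + result.pending.size());
    }
}
```

Inside the Beam callback, the same idea would presumably read the BagState,
output the ready values through the OnTimerContext, clear the bag, add the
pending values back, and set the timer again while anything is still
buffered. Otherwise elements that arrive just before a quiet period would
never be released.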



On Sat, Mar 28, 2020 at 5:47 AM Luke Cwik <[email protected]> wrote:

> Yes, since you don't care about the grouping, you could use any random key
> that you want.
>
> On Fri, Mar 27, 2020 at 2:45 PM Andrew Wylie <[email protected]> wrote:
>
>> To be able to invoke GroupByKey, does that mean I need to create a KV
>> object for my Pub Sub messages that are received from the topic?
>>
>> On 27 Mar 2020, at 18:30, Luke Cwik <[email protected]> wrote:
>>
>> 
>> The trigger only controls when the output of a GroupByKey is produced, so
>> it won't add a delay without one. As long as inputEvents is followed by a
>> GroupByKey, you'll see the delay.
>>
>> Why do you want the delay as there might be different solutions for the
>> problem you're trying to solve?
>>
>> On Fri, Mar 27, 2020 at 11:16 AM Andrew Wylie <[email protected]> wrote:
>>
>>> Thanks Luke. I wasn’t sure if that was advised or even possible, but it
>>> seems to be working well.
>>>
>>> I would like to introduce a 5 minute delay between reading each message
>>> and publishing it to the output topic. Is using triggers the correct way
>>> to do this in Apache Beam?
>>>
>>> I am trying the approach below in my pipeline, but the messages just
>>> output without the delay.
>>>
>>> PCollection windowed_inputEvents = inputEvents.apply(
>>>     Window.into(FixedWindows.of(Duration.standardMinutes(1)))
>>>         .triggering(AfterProcessingTime.pastFirstElementInPane()
>>>             .plusDelayOf(Duration.standardMinutes(5)))
>>>         .withAllowedLateness(Duration.standardMinutes(1))
>>>         .discardingFiredPanes());
>>>
>>> Thanks
>>>
>>> On 25 Mar 2020, at 23:11, Luke Cwik <[email protected]> wrote:
>>>
>>> Is there a reason you don't keep them separate and apply the same
>>> transforms to each output with the only difference being the final writing
>>> transform being configured for the correct topic?
>>>
>>> On Wed, Mar 25, 2020 at 1:54 PM Andrew Wylie <[email protected]> wrote:
>>>
>>>> Hi,
>>>>
>>>> So I have 5 input Pub Sub topics and each one of those has an output
>>>> Pub Sub topic. When I Flatten the 5 inputs, is there a way I can ensure
>>>> that I can route the messages to their respective output topics? Is there
>>>> any kind of built in attribute in the messages I can read to determine
>>>> their input topic and therefore which output topic to write to?
>>>>
>>>> Pub Sub Input topic 1 messages need to be output from the pipeline to
>>>> Pub Sub Output topic 1 for example.
>>>>
>>>>
>>>> Thanks
>>>>
>>>
>>>
