Thanks very much Rez, I'll give that a try and let you know how it goes.
Just one question on the cache variable: it has to be final, but it is assigned
in the constructor, which causes a compilation error. Is there a nice way to
set this to the correct Coder?
On Sunday, 29 March 2020, 04:32:49 BST, Reza Rokni <[email protected]> wrote:
Andrew, with regard to the delay use case, assuming I have understood it
correctly, you can explore using State and Timers.
Something like this pattern below, from a code sample that I hope to add to
https://beam.apache.org/documentation/patterns/overview/ once I get time... If
it works for your use case it would be great to get feedback so we can make the
pattern generalized for Beam folks :-)
Please also note I am using BagState below, as not all runners support MapState
yet. Also, for the key you should balance the key space, maybe into a range of
1000-10000. The output won't be delayed by exactly 5 mins, but it will
hopefully be close enough to the SLO that you are looking for.
public static class FutureCache<K, V> extends DoFn<KV<K, TimestampedValue<V>>, V> {

  Coder<V> objectCoder;

  @TimerId("Process")
  private final TimerSpec processTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @StateId("cache")
  private final StateSpec<BagState<TimestampedValue<V>>> cache;

  public FutureCache(Coder<V> objectCoder) {
    this.objectCoder = objectCoder;
    cache = StateSpecs.bag(TimestampedValueCoder.of(objectCoder));
  }

  @ProcessElement
  public void process(
      @TimerId("Process") Timer timer,
      @StateId("cache") BagState<TimestampedValue<V>> cache,
      @Element KV<K, TimestampedValue<V>> input) {
    cache.add(input.getValue());
    // Will just keep updating for each element.
    timer.offset(Duration.standardSeconds(1)).setRelative();
  }

  @OnTimer("Process")
  public void OnTimer(
      @TimerId("Process") Timer timer,
      OnTimerContext otc,
      @StateId("cache") BagState<TimestampedValue<V>> cache) {
    // ... Output based on Timestamp being less than (now() - yourDelta..)
  }
}
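The elided timer step above could look roughly like the following. This is only a sketch in plain Java, not Beam code: `DelayedRelease`, `Entry` and `Split` are hypothetical names, and `java.time` stands in for the Joda-Time types that Beam uses. It shows the comparison only: an entry is ready once its timestamp is less than `now - delay`.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Beam): splits cached entries by age. */
final class DelayedRelease {

  /** A cached value plus its timestamp; a stand-in for TimestampedValue<V>. */
  static final class Entry<V> {
    final V value;
    final Instant timestamp;

    Entry(V value, Instant timestamp) {
      this.value = value;
      this.timestamp = timestamp;
    }
  }

  /** Entries old enough to emit, and entries that should stay cached. */
  static final class Split<V> {
    final List<Entry<V>> ready = new ArrayList<>();
    final List<Entry<V>> pending = new ArrayList<>();
  }

  /** An entry is ready when its timestamp is strictly before (now - delay). */
  static <V> Split<V> split(Iterable<Entry<V>> cached, Instant now, Duration delay) {
    Instant cutoff = now.minus(delay);
    Split<V> result = new Split<>();
    for (Entry<V> entry : cached) {
      if (entry.timestamp.isBefore(cutoff)) {
        result.ready.add(entry);
      } else {
        result.pending.add(entry);
      }
    }
    return result;
  }
}
```

Inside the timer callback, one possible use (again an assumption, not something confirmed in this thread) is to read the bag, output the `ready` values, clear the bag, add the `pending` entries back, and set the timer again while anything is still pending.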
On Sat, Mar 28, 2020 at 5:47 AM Luke Cwik <[email protected]> wrote:
Yes, since you don't care about the grouping, you could use any random key that
you want.
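As a rough illustration of picking such a key from a bounded range (for example the 1000-10000 keys suggested elsewhere in this thread), here is a minimal plain-Java sketch. `ShardKeys`, `randomKey` and `hashedKey` are hypothetical names, not Beam APIs, and the key count is an assumption to tune for your pipeline.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical helper (not part of Beam): picks a key from a bounded key space. */
final class ShardKeys {

  /** Returns a uniformly random key in [0, numKeys). */
  static int randomKey(int numKeys) {
    if (numKeys <= 0) {
      throw new IllegalArgumentException("numKeys must be positive");
    }
    return ThreadLocalRandom.current().nextInt(numKeys);
  }

  /** Returns a stable key in [0, numKeys) derived from an id, e.g. a message id. */
  static int hashedKey(String id, int numKeys) {
    if (numKeys <= 0) {
      throw new IllegalArgumentException("numKeys must be positive");
    }
    // floorMod keeps the result non-negative even when hashCode() is negative.
    return Math.floorMod(id.hashCode(), numKeys);
  }
}
```

A random key spreads load evenly; the hashed variant may be preferable if the same message should always land on the same key.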
On Fri, Mar 27, 2020 at 2:45 PM Andrew Wylie <[email protected]> wrote:
To be able to invoke GroupByKey, does that mean I need to create a KV object
for my Pub Sub messages that are received from the topic?
On 27 Mar 2020, at 18:30, Luke Cwik <[email protected]> wrote:
The trigger only applies when the output of a GroupByKey is produced, and it
won't introduce the delay without one. So as long as inputEvents is followed by
a GroupByKey, you'll see the delay.
Why do you want the delay? There might be different solutions for the problem
you're trying to solve.
On Fri, Mar 27, 2020 at 11:16 AM Andrew Wylie <[email protected]> wrote:
Thanks Luke. I wasn’t sure if that was advised or even possible, but it seems
to be working well.
I would like to introduce a 5 minute delay between reading each message and
publishing it to the output topic. Is using triggers the correct way to do this
in Apache Beam?
I am trying the approach below in my pipeline, but the messages just output
without the delay.
PCollection windowed_inputEvents = inputEvents.apply(
    Window.into(FixedWindows.of(Duration.standardMinutes(1)))
        .triggering(
            AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(5)))
        .withAllowedLateness(Duration.standardMinutes(1))
        .discardingFiredPanes());
Thanks
On 25 Mar 2020, at 23:11, Luke Cwik <[email protected]> wrote:
Is there a reason you don't keep them separate and apply the same transforms to
each, with the only difference being that the final write transform is
configured for the correct topic?
On Wed, Mar 25, 2020 at 1:54 PM Andrew Wylie <[email protected]> wrote:
Hi,
So I have 5 input Pub Sub topics and each one of those has an output Pub Sub
topic. When I Flatten the 5 inputs, is there a way I can ensure that I can
route the messages to their respective output topics? Is there any kind of
built-in attribute in the messages I can read to determine their input topic
and therefore which output topic to write to?
Pub Sub Input topic 1 messages need to be output from the pipeline to Pub Sub
Output topic 1 for example.
Thanks