Do you happen to have more than one constructor in the class you're
creating?
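
To show why I'm asking, here is a minimal plain-Java sketch (no Beam on the
classpath). A final field has to be assigned exactly once on every
constructor path, so a second constructor that skips the assignment makes
javac reject the class. The names below (FinalFieldDemo, Spec, DelayCache,
"default-coder") are hypothetical stand-ins, with Spec playing the role of
the StateSpec; none of this is Beam API.

```java
// Hypothetical stand-in classes; Spec plays the role of a StateSpec.
public class FinalFieldDemo {

    public static final class Spec {
        public final String coderName;

        public Spec(String coderName) {
            this.coderName = coderName;
        }
    }

    public static class DelayCache {
        // Blank final field: must be definitely assigned by every constructor.
        private final Spec cache;

        // Delegates to the other constructor, so the field is still
        // assigned exactly once on this path.
        public DelayCache() {
            this("default-coder");
        }

        // The single place where the final field is assigned.
        public DelayCache(String coderName) {
            this.cache = new Spec(coderName);
        }

        public String coderName() {
            return cache.coderName;
        }
    }

    public static void main(String[] args) {
        System.out.println(new DelayCache().coderName());
        System.out.println(new DelayCache("my-coder").coderName());
    }
}
```

If that is what is going on, delegating with this(...) keeps the assignment
in one place and is usually the simplest fix.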

BTW, if you only have one concrete class that you want to delay then you
can probably do away with the generics and just set the coder for the cache
directly.

On Mon, Mar 30, 2020 at 8:52 PM Andrew Wylie <[email protected]> wrote:

> Thanks very much Rez, I'll give that a try and let you know how it goes.
>
> Just one question on the cache variable: it has to be final, but it is
> assigned in the constructor, which causes a compilation error. Is
> there a nice way to set this to the correct Coder?
>
>
> On Sunday, 29 March 2020, 04:32:49 BST, Reza Rokni <[email protected]>
> wrote:
>
>
> Andrew, with regards to the delay use case, assuming I have understood the
> use case, you can explore using State and Timers.
>
> Something like this pattern below, from a code sample that I hope to add
> to https://beam.apache.org/documentation/patterns/overview/ once I get
> time... If it works for your use case it would be great to get feedback so
> we can make the pattern generalized for Beam folks :-)
>
> Please also note I am using BagState below as not all runners support
> MapState yet. Also for the Key you should balance the key space, maybe into
> a range of 1000-10000. The output won't be exactly 5 mins, but hopefully
> close enough to the SLO that you are looking for.
>
>     public static class FutureCache<K, V>
>             extends DoFn<KV<K, TimestampedValue<V>>, V> {
>
>         Coder<V> objectCoder;
>
>         @TimerId("Process")
>         private final TimerSpec processTimer =
>                 TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
>
>         @StateId("cache")
>         private final StateSpec<BagState<TimestampedValue<V>>> cache;
>
>         public FutureCache(Coder<V> objectCoder) {
>             this.objectCoder = objectCoder;
>             cache = StateSpecs.bag(TimestampedValueCoder.of(objectCoder));
>         }
>
>         @ProcessElement
>         public void process(
>                 @TimerId("Process") Timer timer,
>                 @StateId("cache") BagState<TimestampedValue<V>> cache,
>                 @Element KV<K, TimestampedValue<V>> input) {
>             cache.add(input.getValue());
>             // Will just keep updating for each element.
>             timer.offset(Duration.standardSeconds(1)).setRelative();
>         }
>
>         @OnTimer("Process")
>         public void OnTimer(
>                 @TimerId("Process") Timer timer,
>                 OnTimerContext otc,
>                 @StateId("cache") BagState<TimestampedValue<V>> cache) {
>
> ... Output based on Timestamp being less than (now() - yourDelta..)
>
>
>
> On Sat, Mar 28, 2020 at 5:47 AM Luke Cwik <[email protected]> wrote:
>
> Yes, since you don't care about the grouping, you could use any random key
> that you want.
>
> On Fri, Mar 27, 2020 at 2:45 PM Andrew Wylie <[email protected]> wrote:
>
> To be able to invoke GroupByKey, does that mean I need to create a KV
> object for my Pub Sub messages that are received from the topic?
>
> On 27 Mar 2020, at 18:30, Luke Cwik <[email protected]> wrote:
>
> The trigger only controls when the output of a GroupByKey is produced, so
> it won't introduce a delay without one. As long as inputEvents is followed
> by a GroupByKey, you'll see the delay.
>
> Why do you want the delay? There might be different solutions for the
> problem you're trying to solve.
>
> On Fri, Mar 27, 2020 at 11:16 AM Andrew Wylie <[email protected]> wrote:
>
> Thanks Luke. I wasn’t sure if that was advised or even possible, but it
> seems to be working well.
>
> I would like to introduce a 5 minute delay between reading each message
> and publishing it to the output topic. Is using triggers the correct way
> to do this in Apache Beam?
>
> I am trying the approach below in my pipeline, but the messages just
> output without the delay.
>
> PCollection windowed_inputEvents = inputEvents.apply(
>     Window.into(FixedWindows.of(Duration.standardMinutes(1)))
>         .triggering(AfterProcessingTime.pastFirstElementInPane()
>             .plusDelayOf(Duration.standardMinutes(5)))
>         .withAllowedLateness(Duration.standardMinutes(1))
>         .discardingFiredPanes());
>
> Thanks
>
> On 25 Mar 2020, at 23:11, Luke Cwik <[email protected]> wrote:
>
> Is there a reason you don't keep them separate and apply the same
> transforms to each output with the only difference being the final writing
> transform being configured for the correct topic?
>
> On Wed, Mar 25, 2020 at 1:54 PM Andrew Wylie <[email protected]> wrote:
>
> Hi,
>
> So I have 5 input Pub Sub topics and each one of those has an output Pub
> Sub topic. When I Flatten the 5 inputs, is there a way I can ensure that I
> can route the messages to their respective output topics? Is there any kind
> of built in attribute in the messages I can read to determine their input
> topic and therefore which output topic to write to?
>
> Pub Sub Input topic 1 messages need to be output from the pipeline to Pub
> Sub Output topic 1 for example.
>
>
> Thanks
>
>
>
