Hi,

This is the pipeline, a very simple one. The onTimer method is never fired.
We are not using any --experiments flags.

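import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaBeamPipeline {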

    static class ProcessMessageFn extends DoFn<KV<String, String>, String> {
        @StateId("count")
        private final StateSpec<ValueState<Integer>> stateSpec = StateSpecs.value();

        @TimerId("eventTimer")
        private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

        @ProcessElement
        public void processElement(
                ProcessContext context,
                @StateId("count") ValueState<Integer> state,
                @TimerId("eventTimer") Timer timer) {
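            // state.read() returns null until the first write, so default to 0.
            Integer count = state.read();
            if (count == null) {
                count = 0;
            }
            // some logic
            state.write(count);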

            // Set a timer for one minute later
            timer.set(context.timestamp().plus(60000));
            context.output("Current count: " + count);
        }

        @OnTimer("eventTimer")
        public void onTimer(
                OnTimerContext context,
                @StateId("count") ValueState<Integer> state) {
            state.write(0);
            System.out.println("Timer fired at " + context.timestamp());
        }
    }

    public static void main(String[] args) {
        Pipeline pipeline =
                Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

        pipeline
            .apply(KafkaIO.<String, String>read()
                    .withBootstrapServers("localhost:9092")
                    .withTopic("topic")
                    .withKeyDeserializer(StringDeserializer.class)
                    .withValueDeserializer(StringDeserializer.class)
                    .withoutMetadata())
            .apply(ParDo.of(new ProcessMessageFn()));

        pipeline.run().waitUntilFinish();
    }
}
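
As a point of reference, an event-time timer fires only once the watermark
reaches the timer's timestamp. The toy model below is plain Java, not Beam
code; the class ToyEventTimers and its methods are hypothetical names used
only for illustration, and the firing rule is simplified. It sketches why a
timer set in processElement stays pending for as long as the watermark does
not advance, which may be one thing worth ruling out here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of event-time timers. Hypothetical illustration, not Beam code. */
public class ToyEventTimers {
    // Pending timers: key -> firing timestamp in event-time milliseconds.
    private final Map<String, Long> pending = new TreeMap<>();
    private long watermark = Long.MIN_VALUE;

    /** Sets the timer for a key, overwriting any earlier timer for that key. */
    public void setTimer(String key, long fireAtMillis) {
        pending.put(key, fireAtMillis);
    }

    /** Advances the watermark (never backwards) and returns the keys whose timers fired. */
    public List<String> advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<String> fired = new ArrayList<>();
        pending.entrySet().removeIf(entry -> {
            if (entry.getValue() <= watermark) {
                fired.add(entry.getKey());
                return true; // a fired timer is no longer pending
            }
            return false;
        });
        return fired;
    }

    public static void main(String[] args) {
        ToyEventTimers timers = new ToyEventTimers();
        // An element with timestamp 0 sets a timer one minute later.
        timers.setTimer("user-1", 60_000L);
        // Watermark is still behind the timer: nothing fires.
        System.out.println(timers.advanceWatermark(30_000L)); // prints []
        // Watermark reaches the timer timestamp: the timer fires.
        System.out.println(timers.advanceWatermark(60_000L)); // prints [user-1]
    }
}
```

In this model, if advanceWatermark is never called with a value at or past
60000, the timer for "user-1" never fires, no matter how much wall-clock time
passes.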

Thanks,
Sigalit

On Wed, Mar 27, 2024 at 9:54 AM Jan Lukavský <[email protected]> wrote:

> Hi,
>
> What is your runner? Is it Flink, as in the linked issue? What is the
> source of your pipeline? Do you use any additional flags, e.g.
> --experiments? Do you see this with the classical or the portable runner?
>
>   Jan
>
> On 3/26/24 19:18, Sigalit Eliazov wrote:
> > Hi all
> > We encountered an issue with timers starting from version 2.52.
> >
> > We saw that the timers are not triggered.
> >
> > https://github.com/apache/beam/issues/29816
> >
> > Did someone encounter such problems as well?
> >
> > Thanks
> > Sigalit
> >
> >
> >
>
