We are running with the Flink runner. I also tested with the direct runner and got the same results.
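
To make explicit what we expect from the timer: an event-time timer should fire once the watermark reaches the timer's timestamp, which in our case is one minute after the element's timestamp. Below is a minimal plain-Java model of that expectation. It does not use Beam at all. The class `EventTimeTimerModel` and its methods are hypothetical names for illustration only, and treating "watermark equal to the timer timestamp" as firing is a simplifying assumption of the model.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Beam-free model of event-time timers driven by a watermark.
public class EventTimeTimerModel {
    // Pending timers, sorted by the event-time timestamp (millis) at which they should fire.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    // Current watermark; it only ever moves forward.
    private long watermark = Long.MIN_VALUE;

    // Register a timer for the given key at the given event-time timestamp.
    public void setTimer(String key, long fireAtMillis) {
        pending.computeIfAbsent(fireAtMillis, t -> new ArrayList<>()).add(key);
    }

    // Advance the watermark and return the keys whose timers are now due.
    public List<String> advanceWatermark(long newWatermark) {
        watermark = Math.max(watermark, newWatermark);
        List<String> fired = new ArrayList<>();
        // View of all timers with timestamp <= watermark (model assumption: inclusive).
        Map<Long, List<String>> due = pending.headMap(watermark, true);
        for (List<String> keys : due.values()) {
            fired.addAll(keys);
        }
        // Clearing the view removes the fired timers from the pending map.
        due.clear();
        return fired;
    }

    public static void main(String[] args) {
        EventTimeTimerModel model = new EventTimeTimerModel();
        long elementTimestamp = 1_000L;
        // Same offset as in the pipeline: one minute after the element timestamp.
        model.setTimer("key-a", elementTimestamp + 60_000L);

        System.out.println(model.advanceWatermark(30_000L)); // prints []
        System.out.println(model.advanceWatermark(61_000L)); // prints [key-a]
    }
}
```

In the model the timer stays pending while the watermark is behind it and fires as soon as the watermark catches up. In the pipeline quoted below, onTimer is never invoked.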

On Wed, Mar 27, 2024 at 2:51 PM Sigalit Eliazov <[email protected]> wrote:

> hi,
> this is the pipeline, very simple one
> the onTimer is not fired.
> We are not using any experimental variables.
>
> public class KafkaBeamPipeline {
>
>     static class ProcessMessageFn extends DoFn<KV<String, String>, String> {
>         @StateId("count")
>         private final StateSpec<ValueState<Integer>> stateSpec = StateSpecs.value();
>
>         @TimerId("eventTimer")
>         private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>
>         @ProcessElement
>         public void processElement(ProcessContext context,
>                 @StateId("count") ValueState<Integer> state,
>                 @TimerId("eventTimer") Timer timer) {
>             Integer count = state.read();
>             //some logic
>             state.write(count);
>
>             // Set a timer for one minute later
>             timer.set(context.timestamp().plus(60000));
>             context.output("Current count: " + count);
>         }
>
>         @OnTimer("eventTimer")
>         public void onTimer(OnTimerContext context,
>                 @StateId("count") ValueState<Integer> state) {
>             state.write(0);
>             System.out.println("Timer fired at " + context.timestamp());
>         }
>     }
>
>     public static void main(String[] args) {
>         Pipeline pipeline =
>             Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
>
>         pipeline
>             .apply(KafkaIO.<String, String>read()
>                 .withBootstrapServers("localhost:9092")
>                 .withTopic("topic")
>                 .withKeyDeserializer(StringDeserializer.class)
>                 .withValueDeserializer(StringDeserializer.class)
>                 .withoutMetadata())
>             .apply(ParDo.of(new ProcessMessageFn()));
>
>         pipeline.run().waitUntilFinish();
>     }
> }
>
> thanks
> Sigalit
>
> On Wed, Mar 27, 2024 at 9:54 AM Jan Lukavský <[email protected]> wrote:
>
>> Hi,
>>
>> what is your runner, is it Flink as well in the issue? What is the
>> source of your Pipeline? Do you use some additional flags, e.g.
>> --experiments? Do you see that using classical or portable runner?
>>
>> Jan
>>
>> On 3/26/24 19:18, Sigalit Eliazov wrote:
>> > Hi all
>> > We encountered issue with timers starting from version 2.52.
>> >
>> > We saw that the timers are not triggered.
>> >
>> > https://github.com/apache/beam/issues/29816
>> >
>> > Did someone encounter such problems as well?
>> >
>> > Thanks
>> > Sigalit
