Hi,
This is the pipeline, a very simple one.
The onTimer callback is never fired.
We are not using any --experiments flags.
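import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;

public class KafkaBeamPipeline {

  static class ProcessMessageFn extends DoFn<KV<String, String>, String> {

    // Per-key counter state.
    @StateId("count")
    private final StateSpec<ValueState<Integer>> stateSpec = StateSpecs.value();

    // Per-key event-time timer.
    @TimerId("eventTimer")
    private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

    @ProcessElement
    public void processElement(
        ProcessContext context,
        @StateId("count") ValueState<Integer> state,
        @TimerId("eventTimer") Timer timer) {
      // read() returns null until the first write for this key.
      Integer stored = state.read();
      int count = (stored == null) ? 0 : stored;
      // some logic
      state.write(count);
      // Set a timer for one minute after the element's event timestamp.
      timer.set(context.timestamp().plus(Duration.standardMinutes(1)));
      context.output("Current count: " + count);
    }

    @OnTimer("eventTimer")
    public void onTimer(
        OnTimerContext context,
        @StateId("count") ValueState<Integer> state) {
      state.write(0);
      System.out.println("Timer fired at " + context.timestamp());
    }
  }

  public static void main(String[] args) {
    Pipeline pipeline =
        Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());

    pipeline
        .apply(
            KafkaIO.<String, String>read()
                .withBootstrapServers("localhost:9092")
                .withTopic("topic")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata())
        .apply(ParDo.of(new ProcessMessageFn()));

    pipeline.run().waitUntilFinish();
  }
}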
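
For reference while debugging, here is a toy model of the behaviour we expect from the event-time timer above. It is only a sketch and not Beam's implementation: the class EventTimerModel and its methods are hypothetical names, and the rule that a timer becomes due once the watermark reaches its timestamp is an assumption of the model. It shows two things that may be worth ruling out: a timer only fires after the watermark advances far enough, and setting the same timer again for a key replaces the earlier firing time.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Toy model only (hypothetical names); it does not use or reproduce Beam internals. */
final class EventTimerModel {
    // One pending timer per key: key -> event-time timestamp (millis) at which it is due.
    private final Map<String, Long> timerByKey = new HashMap<>();
    private long watermarkMillis = Long.MIN_VALUE;

    /** Sets the timer for a key; a later call for the same key replaces the earlier one. */
    void setTimer(String key, long fireAtMillis) {
        timerByKey.put(key, fireAtMillis);
    }

    /** Advances the watermark and returns the keys whose timers became due, sorted. */
    List<String> advanceWatermark(long newWatermarkMillis) {
        // The watermark never moves backwards.
        watermarkMillis = Math.max(watermarkMillis, newWatermarkMillis);
        List<String> fired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = timerByKey.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            // Assumption of this model: due once the watermark reaches the timestamp.
            if (entry.getValue() <= watermarkMillis) {
                fired.add(entry.getKey());
                it.remove();
            }
        }
        Collections.sort(fired);
        return fired;
    }

    public static void main(String[] args) {
        EventTimerModel model = new EventTimerModel();
        model.setTimer("key-1", 60_000L); // element at t=0s, timer at t=60s
        model.setTimer("key-1", 90_000L); // element at t=30s replaces the timer
        System.out.println(model.advanceWatermark(60_000L)); // prints []
        System.out.println(model.advanceWatermark(90_000L)); // prints [key-1]
    }
}
```

In this model, if the watermark never advances, or if new elements for the same key keep pushing the timer forward, advanceWatermark never returns that key. Whether either of these applies to our pipeline is something we have not confirmed.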
Thanks,
Sigalit
On Wed, Mar 27, 2024 at 9:54 AM Jan Lukavský wrote:
> Hi,
>
> what is your runner, is it Flink as well in the issue? What is the
> source of your Pipeline? Do you use some additional flags, e.g.
> --experiments? Do you see that using classical or portable runner?
>
> Jan
>
> On 3/26/24 19:18, Sigalit Eliazov wrote:
> > Hi all
> > We encountered issue with timers starting from version 2.52.
> >
> > We saw that the timers are not triggered.
> >
> > https://github.com/apache/beam/issues/29816
> >
> > Did someone encounter such problems as well?
> >
> > Thanks
> > Sigalit