We are running with the Flink runner. I also tested with the direct runner
and got the same results.
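
For context on what "fired" means for the timer in the pipeline quoted
below: an event-time timer only becomes eligible once the watermark reaches
the timer's timestamp, so a watermark that never advances that far means
onTimer is never called. The following is a toy, stdlib-only Java model of
that rule, not Beam's implementation and not a diagnosis of this issue. The
names EventTimeTimerSketch, TimerQueue, setTimer and advanceWatermark are
hypothetical and exist only for illustration, and treating "watermark equal
to the timestamp" as firing is a simplifying assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of event-time timers. This is NOT how Beam implements them. */
public class EventTimeTimerSketch {

    /** Holds at most one pending timer per key (hypothetical helper). */
    static class TimerQueue {
        private final Map<String, Long> pending = new TreeMap<>();
        private long watermark = Long.MIN_VALUE;

        /** Sets the timer for a key, overwriting any earlier one. */
        void setTimer(String key, long fireAtMillis) {
            pending.put(key, fireAtMillis);
        }

        /**
         * Advances the watermark (it never moves backwards) and returns the
         * keys whose timers became eligible, removing them from the queue.
         */
        List<String> advanceWatermark(long newWatermark) {
            watermark = Math.max(watermark, newWatermark);
            List<String> fired = new ArrayList<>();
            pending.entrySet().removeIf(e -> {
                // Assumption: a timer fires once the watermark reaches it.
                if (e.getValue() <= watermark) {
                    fired.add(e.getKey());
                    return true;
                }
                return false;
            });
            return fired;
        }
    }

    public static void main(String[] args) {
        TimerQueue timers = new TimerQueue();
        // Element with timestamp 0 sets a timer one minute later.
        timers.setTimer("key-1", 60_000L);

        // Watermark is still behind the timer: nothing is returned.
        System.out.println("fired: " + timers.advanceWatermark(30_000L));
        // Watermark reaches the timer timestamp: the key is returned once.
        System.out.println("fired: " + timers.advanceWatermark(60_000L));
    }
}
```

In this model a stalled watermark and a missing timer callback look the same
from the outside, so logging the watermark next to the timer timestamp is
one way to tell the two apart.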

On Wed, Mar 27, 2024 at 2:51 PM Sigalit Eliazov <[email protected]> wrote:

> Hi,
> This is the pipeline, a very simple one.
> The onTimer callback is not fired.
> We are not using any --experiments flags.
>
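> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.kafka.KafkaIO;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.state.StateSpec;
> import org.apache.beam.sdk.state.StateSpecs;
> import org.apache.beam.sdk.state.TimeDomain;
> import org.apache.beam.sdk.state.Timer;
> import org.apache.beam.sdk.state.TimerSpec;
> import org.apache.beam.sdk.state.TimerSpecs;
> import org.apache.beam.sdk.state.ValueState;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.transforms.ParDo;
> import org.apache.beam.sdk.values.KV;
> import org.apache.kafka.common.serialization.StringDeserializer;
> import org.joda.time.Duration;
>
> public class KafkaBeamPipeline {
>
>     static class ProcessMessageFn extends DoFn<KV<String, String>, String> {
>         // Per-key counter state
>         @StateId("count")
>         private final StateSpec<ValueState<Integer>> stateSpec = StateSpecs.value();
>
>         // Per-key event-time timer
>         @TimerId("eventTimer")
>         private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>
>         @ProcessElement
>         public void processElement(
>                 ProcessContext context,
>                 @StateId("count") ValueState<Integer> state,
>                 @TimerId("eventTimer") Timer timer) {
>             // state.read() returns null until the first write for this key
>             Integer stored = state.read();
>             int count = stored == null ? 0 : stored;
>             // some logic
>             state.write(count);
>
>             // Set an event-time timer one minute after the element timestamp
>             timer.set(context.timestamp().plus(Duration.standardMinutes(1)));
>             context.output("Current count: " + count);
>         }
>
>         @OnTimer("eventTimer")
>         public void onTimer(OnTimerContext context, @StateId("count") ValueState<Integer> state) {
>             // Reset the counter when the timer fires
>             state.write(0);
>             System.out.println("Timer fired at " + context.timestamp());
>         }
>     }
>
>     public static void main(String[] args) {
>         Pipeline pipeline =
>             Pipeline.create(PipelineOptionsFactory.fromArgs(args).withValidation().create());
>
>         pipeline
>             .apply(KafkaIO.<String, String>read()
>                     .withBootstrapServers("localhost:9092")
>                     .withTopic("topic")
>                     .withKeyDeserializer(StringDeserializer.class)
>                     .withValueDeserializer(StringDeserializer.class)
>                     .withoutMetadata())
>             .apply(ParDo.of(new ProcessMessageFn()));
>
>         pipeline.run().waitUntilFinish();
>     }
> }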
>
> thanks
> Sigalit
>
> On Wed, Mar 27, 2024 at 9:54 AM Jan Lukavský <[email protected]> wrote:
>
>> Hi,
>>
>> Which runner are you using? Is it Flink, as in the issue? What is the
>> source of your Pipeline? Do you use any additional flags, e.g.
>> --experiments? Do you see this with the classic or the portable runner?
>>
>>   Jan
>>
>> On 3/26/24 19:18, Sigalit Eliazov wrote:
>> > Hi all
>> > We have encountered an issue with timers starting from version 2.52.
>> >
>> > We saw that the timers are not triggered.
>> >
>> > https://github.com/apache/beam/issues/29816
>> >
>> > Has anyone else encountered this problem?
>> >
>> > Thanks
>> > Sigalit
>> >
>> >
>> >
>>
>
