I noticed there’s also a similar bug open for the Spark runner https://github.com/apache/beam/issues/21378
Problem seems to be in SimpleDoFnRunner.TimerInternalsTimer#clear(), which doesn’t work with InMemoryTimerInternals (anymore). https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L1143-L1145 On 26.07.22, 20:18, "Reuven Lax via user" <[email protected]> wrote: This might be a bug in the Flink runner, because it is implemented here. On Tue, Jul 26, 2022 at 9:14 AM Cristian Constantinescu <zeidoo@gmail.com> wrote: Hi everyone, Quick question about GroupIntoBatches. When running on Flink, eventually it This might be a bug in the Flink runner, because it is implemented here<https://urldefense.com/v3/__https:/github.com/apache/beam/blob/b87f1f01197a78d60fb08a0ee7a4ef96bc4e08eb/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java*L164__;Iw!!CiXD_PY!Vd9Fc8Lng0wPDFM2RTAAcmIR7ydONQ8AmY-tKRl1ONrg2EX14uX-L4fSFAyKjRz_XvXTDFIIlVZVWw$>. On Tue, Jul 26, 2022 at 9:14 AM Cristian Constantinescu <[email protected]<mailto:[email protected]>> wrote: Hi everyone, Quick question about GroupIntoBatches. When running on Flink, eventually it hits an unsupported exception "Canceling a timer by ID is not yet supported." on this line [1]. The source inputs are AVRO files for testing (batch) but will use kafka topics (streaming) when deployed. This happens when the batch is filled (10 items) and the max buffering time timer needs to be cancelled. Anyone else observed this issue? On a related note. Looks like the FlinkStatefulDoFnFunction [2] uses InMemoryTimerInternals. Curious why is FlinkTimerInternals not used? I would guess there's a difference between batch and streaming requirements? Thank you, Cristian [1] https://github.com/apache/beam/blob/b87f1f01197a78d60fb08a0ee7a4ef96bc4e08eb/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java#L157<https://urldefense.com/v3/__https:/github.com/apache/beam/blob/b87f1f01197a78d60fb08a0ee7a4ef96bc4e08eb/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java*L157__;Iw!!CiXD_PY!Vd9Fc8Lng0wPDFM2RTAAcmIR7ydONQ8AmY-tKRl1ONrg2EX14uX-L4fSFAyKjRz_XvXTDFJU0ITY5Q$> [2] https://github.com/apache/beam/blob/b87f1f01197a78d60fb08a0ee7a4ef96bc4e08eb/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java#L136<https://urldefense.com/v3/__https:/github.com/apache/beam/blob/b87f1f01197a78d60fb08a0ee7a4ef96bc4e08eb/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java*L136__;Iw!!CiXD_PY!Vd9Fc8Lng0wPDFM2RTAAcmIR7ydONQ8AmY-tKRl1ONrg2EX14uX-L4fSFAyKjRz_XvXTDFKV6Ys0_Q$> [3] https://github.com/apache/beam/blob/b87f1f01197a78d60fb08a0ee7a4ef96bc4e08eb/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java#L1314<https://urldefense.com/v3/__https:/github.com/apache/beam/blob/b87f1f01197a78d60fb08a0ee7a4ef96bc4e08eb/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java*L1314__;Iw!!CiXD_PY!Vd9Fc8Lng0wPDFM2RTAAcmIR7ydONQ8AmY-tKRl1ONrg2EX14uX-L4fSFAyKjRz_XvXTDFJPiYE4Ww$> As a recipient of an email from Talend, your contact personal data will be on our systems. Please see our privacy notice. <https://www.talend.com/privacy/>
