StateMigrationException while using stateTTL
Hello,

I'm using Flink 1.17.1 and I have stateTTL enabled in one of my Flink jobs, with RocksDB as the state backend. I have a value state of a POJO class (generated from an Avro schema). I added a new field to my schema along with a default value to make sure it is backwards compatible; however, when I redeployed the job, I got a StateMigrationException. I have a similar setup in other Flink jobs where adding a column doesn't cause any trouble.

This is my stateTTL config:

    StateTtlConfig
      .newBuilder(Time.days(7))
      .cleanupInRocksdbCompactFilter(1000)
      .build()

This is how I enable it:

    val myStateDescriptor: ValueStateDescriptor[MyPojoClass] =
      new ValueStateDescriptor[MyPojoClass](
        "test-name",
        classOf[MyPojoClass])

    myStateDescriptor.enableTimeToLive(initStateTTLConfig())

This is the exception I end up with:

    Caused by: org.apache.flink.util.StateMigrationException: The new state serializer (org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializer@a13bf51) must not be incompatible with the old state serializer (org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializer@a13bf51).
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:755)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:667)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:883)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:870)
        at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:222)
        at org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:145)
        at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:129)
        at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:69)
        at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:362)
        at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:413)
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
        ... 25 more

Does anyone know what is causing the issue?

Cheers,
Irakli
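A likely reading of this failure (an assumption, since the thread does not resolve it): Flink's TTL state stores each value together with a last-access timestamp, so the serializer actually registered is a composite TtlSerializer wrapping the user's POJO serializer, and a schema change that would require migrating the wrapped value can surface as incompatibility of the whole TTL serializer. The toy Java sketch below only models that wrapping behaviour; the names (TtlCompatDemo, inner, ttl) and the migration rule are illustrative, not Flink's implementation.

```java
// Toy model (NOT Flink's actual code): TTL state stores each value as a pair
// (lastAccessTimestamp, value), so the registered serializer is a composite
// wrapping the user's POJO serializer. In this sketch, an inner evolution
// that would need state migration is surfaced by the TTL wrapper as plain
// incompatibility, mirroring the exception reported in the thread.
import java.util.Set;

class TtlCompatDemo {
    enum Compatibility { COMPATIBLE, REQUIRES_MIGRATION, INCOMPATIBLE }

    // Inner (POJO/Avro) serializer: adding a field with a default value is an
    // evolution that needs migration rather than being bit-compatible.
    static Compatibility inner(Set<String> oldFields, Set<String> newFields) {
        if (oldFields.equals(newFields)) return Compatibility.COMPATIBLE;
        if (newFields.containsAll(oldFields)) return Compatibility.REQUIRES_MIGRATION;
        return Compatibility.INCOMPATIBLE;
    }

    // TTL wrapper: a wrapped value that needs migration is reported as
    // incompatible, because the composite cannot migrate the inner value.
    static Compatibility ttl(Set<String> oldFields, Set<String> newFields) {
        Compatibility c = inner(oldFields, newFields);
        return c == Compatibility.COMPATIBLE ? c : Compatibility.INCOMPATIBLE;
    }

    public static void main(String[] args) {
        Set<String> v1 = Set.of("id", "name");
        Set<String> v2 = Set.of("id", "name", "newField");
        System.out.println(inner(v1, v2)); // REQUIRES_MIGRATION
        System.out.println(ttl(v1, v2));   // INCOMPATIBLE
    }
}
```

If this model is right, it would also explain why the "similar setup" jobs tolerate added columns: without the TTL wrapper, the inner serializer is free to migrate the state.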
Re: Flink Batch Execution Mode
Hi Feng,

I'm using flink-connector-kafka 3.0.1-1.17. I see that 1.17 is affected, but the ticket is marked as fixed, so I'm not sure if that is actually the issue.

Best,
Irakli

From: Feng Jin
Sent: 12 March 2024 18:28
To: Keshelava, Irakli
Cc: user@flink.apache.org
Subject: Re: Flink Batch Execution Mode

Hi Irakli,

What version of flink-connector-kafka are you using? You may have encountered a bug [1] in the old version that prevents the source task from entering the finished state.

[1] https://issues.apache.org/jira/browse/FLINK-31319

Best,
Feng

On Tue, Mar 12, 2024 at 7:21 PM irakli.keshel...@sony.com wrote:

Hello,

I have a Flink job that is running in Batch mode. The source for the job is a Kafka topic which has a limited number of events. I can see that the job starts running fine and consumes the events, but it never makes it past the first task and becomes idle. The Kafka source is defined to be bounded with "KafkaSource.builder().setBounded(OffsetsInitializer.latest())". I expect the job to consume all the events in the Kafka topic and then move on to the next task, but I'm not sure if "OffsetsInitializer.latest()" is the right OffsetsInitializer. Can anyone help me out here?

Thanks!

Cheers,
Irakli
Flink Batch Execution Mode
Hello,

I have a Flink job that is running in Batch mode. The source for the job is a Kafka topic which has a limited number of events. I can see that the job starts running fine and consumes the events, but it never makes it past the first task and becomes idle. The Kafka source is defined to be bounded with "KafkaSource.builder().setBounded(OffsetsInitializer.latest())". I expect the job to consume all the events in the Kafka topic and then move on to the next task, but I'm not sure if "OffsetsInitializer.latest()" is the right OffsetsInitializer. Can anyone help me out here?

Thanks!

Cheers,
Irakli
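For reference, the intended semantics of OffsetsInitializer.latest() as a stopping offset is (to my understanding) resolved once at startup: the source reads each partition up to the end offset observed when the job starts and then finishes. The toy Java sketch below illustrates only that behaviour; the names (BoundedReadDemo, consumeBounded) are illustrative and not part of the connector API.

```java
// Toy simulation (not the Kafka connector): with
// setBounded(OffsetsInitializer.latest()), the stopping offset is snapshotted
// at startup as the partition's current end offset; the source reads up to
// (exclusive) that offset and the source task then finishes.
import java.util.List;

class BoundedReadDemo {
    // Returns the records consumed before the source reaches FINISHED.
    static List<String> consumeBounded(List<String> partition, int startingOffset) {
        int stoppingOffset = partition.size(); // "latest", snapshotted at start
        return partition.subList(startingOffset, stoppingOffset);
    }

    public static void main(String[] args) {
        List<String> topic = List.of("e1", "e2", "e3");
        System.out.println(consumeBounded(topic, 0)); // [e1, e2, e3]
    }
}
```

If the job consumes everything and still never finishes, the stopping semantics themselves are probably not the problem, which would be consistent with the FLINK-31319 connector bug mentioned in the reply.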
Re: Batch mode execution
Thank you both! I'll try to switch the scheduler to "AdaptiveBatchScheduler".

Best,
Irakli

From: Junrui Lee
Sent: 05 March 2024 03:50
To: user
Subject: Re: Batch mode execution

Hello Irakli,

The error is due to the fact that the Adaptive Scheduler doesn't support batch jobs, as detailed in the Flink documentation [1]. When operating in reactive mode, Flink automatically decides which type of scheduler to use. For batch execution, the default scheduler is the AdaptiveBatchScheduler, not the AdaptiveScheduler as in the streaming case.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/elastic_scaling/#limitations

Best regards,
Junrui

lorenzo.affetti.ververica.com via user wrote on Mon, 4 Mar 2024 at 23:24:

Hello Irakli, and thank you for your question.

I guess that somehow Flink enters the "reactive" mode while the adaptive scheduler is not configured. I would go with two options to isolate your issue:

* Try forcing the scheduling mode (https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/deployment/elastic_scaling/#adaptive-scheduler) in your configuration: `jobmanager.scheduler: adaptive`
* Let Flink decide the runtime mode: if the source is bounded, you don't need `env.setRuntimeMode(RuntimeExecutionMode.BATCH)`, as Flink will detect that correctly.

Can you try one of the two and see if that helps?

For the rest: "running it in the "BATCH" mode was better as I don't have to deal with the Watermark Strategy".
Still, you could opt for a simple watermark strategy and stay in streaming mode (for example, "BoundedOutOfOrderness": https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#watermark-strategies-and-the-kafka-connector).

On 4 Mar 2024 at 15:54 +0100, irakli.keshel...@sony.com wrote:

Hello,

I have a Flink job which is processing a bounded number of events. Initially I was running the job in "STREAMING" mode, but I realized that running it in "BATCH" mode was better, as I don't have to deal with the watermark strategy. The job reads data from a Kafka topic and was running fine in "STREAMING" mode. I switched the job to "BATCH" mode by setting "env.setRuntimeMode(RuntimeExecutionMode.BATCH)". I changed the Kafka source to be bounded by adding ".setBounded(OffsetsInitializer.latest())" to the source builder, and I updated the watermark strategy to "WatermarkStrategy.noWatermarks()". After making these changes and deploying the job, I end up with the following error: "java.lang.IllegalStateException","error.message":"Adaptive Scheduler is required for reactive mode". I couldn't find any documentation connecting the "Adaptive Scheduler" to "BATCH" processing. Does anyone know where this error is coming from and how I can deal with it?

Cheers,
Irakli
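The scheduler selection described in the replies can be summarised as a small decision table (an assumption modeled on those replies, not on Flink's source): reactive mode demands the AdaptiveScheduler, which only handles streaming jobs, while batch execution defaults to the AdaptiveBatchScheduler. The names in this toy Java sketch are illustrative.

```java
// Toy decision table (assumption, modeled on the replies in this thread):
// reactive mode requires the AdaptiveScheduler, which only supports streaming
// jobs; plain batch execution defaults to the AdaptiveBatchScheduler.
class SchedulerPickDemo {
    enum Mode { STREAMING, BATCH }
    enum Scheduler { DEFAULT, ADAPTIVE, ADAPTIVE_BATCH }

    // Throws for the batch + reactive combination, mirroring the reported
    // "Adaptive Scheduler is required for reactive mode" failure.
    static Scheduler pick(Mode mode, boolean reactive) {
        if (reactive && mode == Mode.BATCH)
            throw new IllegalStateException("Adaptive Scheduler is required for reactive mode");
        if (reactive) return Scheduler.ADAPTIVE;
        return mode == Mode.BATCH ? Scheduler.ADAPTIVE_BATCH : Scheduler.DEFAULT;
    }

    public static void main(String[] args) {
        System.out.println(pick(Mode.BATCH, false)); // ADAPTIVE_BATCH
    }
}
```

Under this model, turning reactive mode off (or staying in streaming mode) are exactly the two escape routes Lorenzo suggests above.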
Batch mode execution
Hello,

I have a Flink job which is processing a bounded number of events. Initially I was running the job in "STREAMING" mode, but I realized that running it in "BATCH" mode was better, as I don't have to deal with the watermark strategy. The job reads data from a Kafka topic and was running fine in "STREAMING" mode.

I switched the job to "BATCH" mode by setting "env.setRuntimeMode(RuntimeExecutionMode.BATCH)". I changed the Kafka source to be bounded by adding ".setBounded(OffsetsInitializer.latest())" to the source builder, and I updated the watermark strategy to "WatermarkStrategy.noWatermarks()". After making these changes and deploying the job, I end up with the following error: "java.lang.IllegalStateException","error.message":"Adaptive Scheduler is required for reactive mode".

I couldn't find any documentation connecting the "Adaptive Scheduler" to "BATCH" processing. Does anyone know where this error is coming from and how I can deal with it?

Cheers,
Irakli
Dealing with stale Watermark
Hello,

I have a Flink application that consumes events from a Kafka topic and builds sessions from them, using a keyed stream. The application runs fine initially, but after some time it gets "stuck". I can see that the "processElement" function is processing the incoming events, but the "onTimer" function is never called. I can also see that the watermark is getting stale (I guess that's why onTimer is not called).

My window duration is 1 minute, and the logic lives in the onTimer function. Every minute I delete the old timer and create a new one (again with a one-minute duration). If certain logic is fulfilled, then I don't create a new timer anymore and clear the state.

The same application is running fine in another environment (one that just gets more data), so I believe the windowing logic has no flaws. I can't find any anomalies in CPU/memory consumption, and the checkpoints complete successfully as well.

Has anyone seen similar issues?

Best,
Irakli
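One common cause of a stale watermark in a setup like this (an assumption; the thread does not confirm it) is an idle Kafka partition: an operator's watermark is the minimum over its inputs' watermarks, so a single partition that stops receiving events pins the watermark, and event-time timers never fire even though elements keep arriving. The toy Java model below sketches that interaction; it is not Flink code. If this is the cause, Flink's WatermarkStrategy#withIdleness can mark idle sources so they stop holding the watermark back.

```java
// Toy model: a keyed operator's watermark is the minimum over its input
// partitions' watermarks, and event-time timers fire only once the watermark
// passes their timestamp. One idle partition therefore pins the watermark,
// so onTimer never fires even while processElement keeps seeing events.
import java.util.List;
import java.util.stream.Collectors;

class StaleWatermarkDemo {
    static long operatorWatermark(List<Long> partitionWatermarks) {
        return partitionWatermarks.stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    static List<Long> firedTimers(List<Long> registered, long watermark) {
        return registered.stream().filter(t -> t <= watermark).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Two partitions: one advancing to t=1000, one idle since t=100.
        long wm = operatorWatermark(List.of(1000L, 100L));
        System.out.println(wm);                                   // 100
        System.out.println(firedTimers(List.of(160L, 220L), wm)); // []
    }
}
```

This would also fit the observation that the busier environment is fine: with more traffic, every partition keeps receiving events, so no input holds the watermark back.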