Hello,

I'm using Flink 1.17.1 and I have stateTTL enabled in one of my Flink jobs 
where I'm using the RocksDB for checkpointing. I have a value state of Pojo 
class (which is generated from Avro schema). I added a new field to my schema 
along with the default value to make sure it is backwards compatible, however 
when I redeployed the job, I got StateMigrationException. I have similar setup 
with other Flink jobs where adding a column doesn't cause any trouble.

This is my stateTTL config:

StateTtlConfig
 .newBuilder(Time.days(7))
 .cleanupInRocksdbCompactFilter(1000)
 .build

This is how I enable it:

val myStateDescriptor: ValueStateDescriptor[MyPojoClass] =
 new ValueStateDescriptor[MyPojoClass](
   "test-name",
   classOf[MyPojoClass])

myStateDescriptor.enableTimeToLive(initStateTTLConfig())

This is the exception I end up with:

Caused by: org.apache.flink.util.StateMigrationException: The new state 
serializer 
(org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializer@a13bf51) must 
not be incompatible with the old state serializer 
(org.apache.flink.runtime.state.ttl.TtlStateFactory$TtlSerializer@a13bf51).
    at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:755)
    at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:667)
    at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:883)
    at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:870)
    at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:222)
    at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:145)
    at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:129)
    at 
org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:69)
    at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:362)
    at 
org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:413)
    at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
    at 
org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
    ... 25 more

Does anyone know what is causing the issue?

Cheers,
Irakli


Reply via email to