Hi Avi Levi,

I don't think there is any way to solve this problem right now. The Flink 
documentation clearly states that this is not supported:


"Trying to restore state, which was previously configured without TTL, using 
TTL enabled descriptor or vice versa will lead to compatibility failure and 
StateMigrationException."


Flink documentation:
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/state.html#state-time-to-live-ttl
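
For context, here is a rough sketch of why the two descriptors are 
incompatible. With TTL enabled, the state backend stores a last-modification 
timestamp along with each user value, so the bytes written under a TTL 
descriptor do not have the layout that the plain descriptor's serializer 
expects. This is a toy model in plain Scala, not Flink's actual serializer 
code; the DomainState field, the helper names, and the exact byte layout are 
assumptions made up for illustration.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}

// Hypothetical stand-in for the DomainState case class from the question.
final case class DomainState(count: Long)

object TtlLayoutSketch {

  // Toy "plain" encoding: only the user value is written.
  def encodePlain(state: DomainState): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeLong(state.count)
    out.flush()
    bytes.toByteArray
  }

  // Toy "TTL" encoding: a last-update timestamp is stored with the user value.
  // (Assumed layout: timestamp first, then the value.)
  def encodeWithTtl(state: DomainState, lastUpdateMillis: Long): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new DataOutputStream(bytes)
    out.writeLong(lastUpdateMillis)
    out.writeLong(state.count)
    out.flush()
    bytes.toByteArray
  }

  def main(args: Array[String]): Unit = {
    val state = DomainState(42L)
    val plain = encodePlain(state)
    val withTtl = encodeWithTtl(state, System.currentTimeMillis())
    // The two encodings differ in length, so a reader that expects one layout
    // cannot safely interpret bytes written in the other.
    println(s"plain bytes: ${plain.length}, TTL bytes: ${withTtl.length}")
  }
}
```

In this toy model, a savepoint written with the plain encoding has no 
timestamp to read, which is the kind of mismatch the restore check rejects 
up front instead of misreading the data.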


Best,
Haibo

At 2019-07-14 16:50:19, "Avi Levi" <avi.l...@bluevoyant.com> wrote:

Hi,

I added a TTL to my state.

Old version:

private lazy val stateDescriptor =
  new ValueStateDescriptor("foo", Types.CASE_CLASS[DomainState])


New version:

@transient
private lazy val storeTtl = StateTtlConfig.newBuilder(90)
  .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
  .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
  .cleanupInRocksdbCompactFilter()
  .build()

private lazy val stateDescriptor = {
  val des = new ValueStateDescriptor("foo", Types.CASE_CLASS[DomainState])
  des.enableTimeToLive(storeTtl)
  des
}


But when I try to restore from a savepoint, I get this error:


java.lang.RuntimeException: Error while getting state
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:62)
        at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:144)
        ...

Caused by: org.apache.flink.util.StateMigrationException: The new state serializer cannot be incompatible.
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:527)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:475)
        at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:613)
        at org.apache.flink.runtime.state.ttl.TtlStateFactory.createTtlStateContext(TtlStateFactory.java:197)
        at org.apache.flink.runtime.state.ttl.TtlStateFactory.createValueState(TtlStateFactory.java:137)
        at org.apache.flink.runtime.state.ttl.TtlStateFactory.createState(TtlStateFactory.java:126)
        at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:71)
        at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:286)
        at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:335)
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:124)
        at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
        ... 11 more


Do you have any idea how I can resolve it?


Best wishes 
