Hi Filip,

It looks like your state primitive is used in the context of windows.
Keyed state works like this:

  *   It uses a cascade of key types to store and retrieve values:
     *   The key (set by .keyBy)
     *   A namespace (usually VoidNamespace, unless the state is used in the context of a specific window)
     *   An optional key of the state primitive (if it is a MapState)
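
As a rough illustration of that cascade, here is a toy model in plain Scala. It is not Flink's actual implementation, and the names Address, Store, VoidNs and LongNs are made up for this sketch. The point is only that a value is found when the key, the namespace and the optional user key all match:

```scala
// Toy model only: plain Scala, no Flink classes involved.
object KeyedStateModel {
  sealed trait Namespace
  case object VoidNs extends Namespace                    // stand-in for VoidNamespace
  final case class LongNs(value: Long) extends Namespace  // e.g. a timestamp or window id

  // Address of one stored value: key -> namespace -> optional user key (MapState only).
  final case class Address(key: String, namespace: Namespace, userKey: Option[String] = None)

  final class Store {
    private val cells = scala.collection.mutable.Map.empty[Address, List[String]]

    // Append a value to the list stored under the given address.
    def append(a: Address, v: String): Unit =
      cells.update(a, cells.getOrElse(a, Nil) :+ v)

    // Read the list stored under the given address (empty if nothing matches).
    def read(a: Address): List[String] = cells.getOrElse(a, Nil)
  }

  def main(args: Array[String]): Unit = {
    val store = new Store
    // State written under a Long namespace ...
    store.append(Address("user-1", LongNs(1666685280000L)), "delayed message")
    // ... is not visible under the void namespace.
    println(store.read(Address("user-1", VoidNs)))                  // List()
    println(store.read(Address("user-1", LongNs(1666685280000L)))) // List(delayed message)
  }
}
```

In this model a lookup with the wrong namespace simply finds nothing. The real RocksDB backend instead rejects the mismatched namespace serializer up front, which is the StateMigrationException in the stack trace quoted below.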

In your case the state primitive is (probably) declared in the context of a
window, so when you load the state with the State Processor API you also need
to specify the correct namespace TypeInformation.
When I am in doubt about how a state primitive is set up, I set a breakpoint in
a process function and walk up the call stack to find the components that
implement it.

If you share a little more of your code, it is much easier to provide specific
guidance 😊
(e.g. ‘savepoint’ is never used again in your code snippet …)

Best regards

Thias



From: Filip Karnicki <filip.karni...@gmail.com>
Sent: Tuesday, October 25, 2022 10:08 AM
To: user <user@flink.apache.org>
Subject: State Processor API - VoidNamespaceSerializer must be compatible with the old namespace serializer LongSerializer

Hi, I'm trying to load a list state using the State Processor API (Flink 1.14.3)

Cluster settings:


state.backend: rocksdb
state.backend.incremental: true

(...)

Code:

val env = ExecutionEnvironment.getExecutionEnvironment
val savepoint = Savepoint.load(env, pathToSavepoint, new EmbeddedRocksDBStateBackend(true))

// using Flink Stateful Functions
val tpe = new MessageTypeInformation(
  MessageFactoryKey.forType(MessageFactoryType.WITH_PROTOBUF_PAYLOADS, null))
val envelopeSerializer: TypeSerializer[Message] = tpe.createSerializer(env.getConfig)
val listDescriptor = new ListStateDescriptor[Message]("delayed-message-buffer", envelopeSerializer.duplicate)

(...)
override def open(parameters: Configuration): Unit = {
    getRuntimeContext.getListState(listDescriptor) // fails with error [1]
}


Error [1]:

Caused by: java.io.IOException: Failed to restore timer state
            at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:177)
            at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:64)
            at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:183)
            at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
            at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
            at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
            at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.RuntimeException: Error while getting state
            at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
            at org.apache.flink.state.api.runtime.SavepointRuntimeContext.getListState(SavepointRuntimeContext.java:213)
            at x.x.x.x.x.myModule.StateReader$$anon$1.open(StateReader.scala:527)
            at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
            at org.apache.flink.state.api.input.operator.StateReaderOperator.open(StateReaderOperator.java:106)
            at org.apache.flink.state.api.input.operator.KeyedStateReaderOperator.open(KeyedStateReaderOperator.java:66)
            at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:174)
            ... 7 more
Caused by: org.apache.flink.util.StateMigrationException: The new namespace serializer (org.apache.flink.runtime.state.VoidNamespaceSerializer@2806d6da) must be compatible with the old namespace serializer (org.apache.flink.api.common.typeutils.base.LongSerializer@52b06bef).
            at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:685)
            at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:624)
            at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createInternalState(RocksDBKeyedStateBackend.java:837)
            at org.apache.flink.runtime.state.KeyedStateFactory.createInternalState(KeyedStateFactory.java:47)
            at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:73)
            at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:302)
            at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:353)
            at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
            at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
            ... 13 more



It seems that DefaultKeyedStateStore always wants to use 
VoidNamespaceSerializer.INSTANCE despite my state being created with a 
LongSerializer namespace serializer.

Is there anything anyone can immediately see me doing wrong?

Thank you
Fil