Hi,

Could you please share your state initialization code (where you get the state from its state descriptor)? It looks like you are creating the state in #processElement?

Best,
Zakelly

On Thu, Jan 18, 2024 at 2:25 PM Zakelly Lan <zakelly....@gmail.com> wrote:

> Hi,
>
> Could you please share the code of state initialization (getting state
> from a state descriptor)? It seems you are creating a state in
> #processElement?
>
> Best,
> Zakelly
>
> On Thu, Jan 18, 2024 at 3:47 AM Konstantinos Karavitis <kkaravi...@gmail.com> wrote:
>
>> Have you ever met the following error when a flink application restarts
>> and tries to restore the state from RocksDB?
>>
>> *Caused by: java.lang.UnsupportedOperationException: A serializer has
>> already been registered for the state; re-registration is not allowed.
>>     at org.apache.flink.runtime.state.StateSerializerProvider$LazilyRegisteredStateSerializerProvider.registerNewSerializerForRestoredState(StateSerializerProvider.java:302)*
>>
>> May that be a potential bug of a race condition where the namespace
>> serializer is being registered by more than one place concurrently?
>>
>> Here's also the full stack trace
>>
>>     at org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.processElement1(CoBroadcastWithKeyedOperator.java:125)
>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:217)
>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:183)
>>     at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:266)
>>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
>>     at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
>>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>>     at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
>>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
>>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
>>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
>>     at java.base/java.lang.Thread.run(Unknown Source)
>>
>> *Caused by: java.lang.UnsupportedOperationException: A serializer has
>> already been registered for the state; re-registration is not allowed.
>>     at org.apache.flink.runtime.state.StateSerializerProvider$LazilyRegisteredStateSerializerProvider.registerNewSerializerForRestoredState(StateSerializerProvider.java:302)*
>> Caused by: java.lang.UnsupportedOperationException: A serializer has
>> already been registered for the state; re-registration is not allowed.
>>     *at org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo.updateNamespaceSerializer(RegisteredKeyValueStateBackendMetaInfo.java:132)*
>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.updateRestoredStateMetaInfo(RocksDBKeyedStateBackend.java:734)
>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.tryRegisterKvStateInformation(RocksDBKeyedStateBackend.java:667)
>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:883)
>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.createOrUpdateInternalState(RocksDBKeyedStateBackend.java:870)
>>     at org.apache.flink.runtime.state.KeyedStateFactory.createOrUpdateInternalState(KeyedStateFactory.java:47)
>>     at org.apache.flink.runtime.state.ttl.TtlStateFactory.createStateAndWrapWithTtlIfEnabled(TtlStateFactory.java:73)
>>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getOrCreateKeyedState(AbstractKeyedStateBackend.java:362)
>>     at org.apache.flink.runtime.state.AbstractKeyedStateBackend.getPartitionedState(AbstractKeyedStateBackend.java:413)
>>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getPartitionedState(DefaultKeyedStateStore.java:115)
>>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getState(DefaultKeyedStateStore.java:60)
>>     ... 36 common frames omitted
>>
>> Many thanks in advance!