Any of the Flink folks seen this before? On Fri, Sep 28, 2018 at 5:23 PM Elias Levy <fearsome.lucid...@gmail.com> wrote:
> I am experiencing a rather odd error. We have a job running on a Flink > 1.4.2 cluster with two Kafka input streams, one of the streams is processed > by an async function, and the output of the async function and the other > original stream are consumed by a CoProcessOperator, that intern emits > Scala case class instances, that go into a stateful ProcessFunction filter, > and then into a sink. I.e. > > source 1 -> async function --\ > |---> co process --> > process --> sink > source 2 --------------------------/ > > A field was added to output case class and the job would no longer start > up from a save point. I assumed this was a result of a serializer > incompatibility. I verified this by reversing the addition of the field > and the job could then restore from the previous savepoint. So far it > makes sense. > > Then I decided to leave the new field in the case class, but eliminated > most of the DAG, leaving only the source 1 --> async function portion of > it. The case class is emitted by the co process. So this removed the > modified case class from the processing graph. When I try to restore from > the savepoint, even if Allow Non Restored State is selected, the job fails > to restore with the error "Deserialization of serializer erroed". > > So then I decided to completely eliminate the modified case class. I > removed all trace of it from the job, again only leaving the source 1 -> > async function. I tried to restore this job, with no traces of the case > class, and still the job failed with the "Deserialization of serializer > erroed" even when Allow Non Restored State is selected. > > Anyone seen anything like this? > > This is the error being generated: > > WARN > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil - > Deserialization of serializer errored; replacing with null. > java.io.IOException: Unloadable class for type serializer. > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203) > at > org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207) > at > org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85) > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Unknown Source) > Caused by: java.io.InvalidClassException: failed to read class descriptor > at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source) > at java.io.ObjectInputStream.readClassDesc(Unknown Source) > at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) > at java.io.ObjectInputStream.readObject0(Unknown Source) > at java.io.ObjectInputStream.defaultReadFields(Unknown Source) > at java.io.ObjectInputStream.readSerialData(Unknown Source) > at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) > at java.io.ObjectInputStream.readObject0(Unknown Source) > at java.io.ObjectInputStream.readObject(Unknown Source) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375) > ... 14 more > Caused by: java.lang.ClassNotFoundException: > com.somewhere.TestJob$$anon$13$$anon$3 > at java.net.URLClassLoader.findClass(Unknown Source) > at java.lang.ClassLoader.loadClass(Unknown Source) > at > org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128) > at java.lang.ClassLoader.loadClass(Unknown Source) > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Unknown Source) > at > org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73) > at > org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:204) > ... 24 more > WARN > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil - > Deserialization of serializer errored; replacing with null. > java.io.IOException: Unloadable class for type serializer. > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203) > at > org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:445) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:250) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:206) > at > org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207) > at > org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85) > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Unknown Source) > Caused by: java.io.InvalidClassException: failed to read class descriptor > at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source) > at java.io.ObjectInputStream.readClassDesc(Unknown Source) > at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source) > at java.io.ObjectInputStream.readObject0(Unknown Source) > at java.io.ObjectInputStream.readObject(Unknown Source) > at > org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375) > ... 18 more > Caused by: > java.lang.ClassNotFoundException: com.somewhere.TestJob$$anon$13$$anon$3 > at java.net.URLClassLoader.findClass(Unknown Source) > at java.lang.ClassLoader.loadClass(Unknown Source) > at > org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128) > at java.lang.ClassLoader.loadClass(Unknown Source) > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Unknown Source) > at > org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73) > at > org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:204) > ... 24 more > ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - > Error during disposal of stream operator. > java.lang.NullPointerException > at > org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.stopResources(AsyncWaitOperator.java:353) > at > org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.dispose(AsyncWaitOperator.java:330) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Unknown Source) > INFO org.apache.flink.runtime.taskmanager.Task - > Source: Kafka Topic -> Async Function (1/1) > (1de078fb77acdd16b7e021fb3e70339f) switched from RUNNING to FAILED. > java.lang.IllegalStateException: Could not initialize operator state > backend. > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Unknown Source) > Caused by: java.io.IOException: Unable to restore operator state > [_async_wait_operator_state_]. The previous serializer of the operator > state must be present; the serializer could have been removed from the > classpath, or its implementation have changed and could not be loaded. This > is a temporary restriction that will be fixed in future versions. > at > org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:367) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733) > at > org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299) > ... 6 more > > >