I wonder whether the issue here is that the createTuple2TypeInformation implicit creates an anonymous class <https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala#L97-L110>. Anonymous classes get compiler-generated names that are not stable when the code is refactored. A new version of the job may therefore no longer contain a class with the name recorded in the savepoint.
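To make the hypothesis concrete, here is a plain-Scala sketch with no Flink dependency. MockTypeInfo, MockSerializer, NamedPairSerializer and TestJobSketch are hypothetical names. The shape (an anonymous type-info class whose createSerializer returns another anonymous class) is my assumption, inferred from the TestJob$$anon$13$$anon$3 name in the traces. The sketch shows that scalac gives such classes synthetic, numbered names, while a named class keeps the name it was declared with.

```scala
// Hypothetical stand-ins loosely shaped like Flink's TypeInformation and
// TypeSerializer; these are NOT the real Flink classes.
trait MockSerializer extends Serializable
trait MockTypeInfo {
  def createSerializer(): MockSerializer
}

// A named serializer: its JVM binary name is fixed by its declaration.
final class NamedPairSerializer extends MockSerializer

object TestJobSketch {
  // Anonymous type info that returns a named serializer.
  val otherInfo: MockTypeInfo = new MockTypeInfo {
    def createSerializer(): MockSerializer = new NamedPairSerializer
  }

  // Assumed shape of generated code: an anonymous type-info class whose
  // createSerializer() returns an anonymous serializer class. scalac names
  // both synthetically (TestJobSketch$$anon$N...), and in practice N depends
  // on how many anonymous classes the compiler numbered before this one.
  val pairInfo: MockTypeInfo = new MockTypeInfo {
    def createSerializer(): MockSerializer = new MockSerializer {}
  }
}

object AnonNamesDemo {
  def main(args: Array[String]): Unit = {
    // A synthetic name with two "$anon$" segments, like TestJob$$anon$13$$anon$3.
    println(TestJobSketch.pairInfo.createSerializer().getClass.getName)
    // A stable name ending in "NamedPairSerializer".
    println(TestJobSketch.otherInfo.createSerializer().getClass.getName)
  }
}
```

Inserting another anonymous class earlier in TestJobSketch can renumber pairInfo's classes. That is the kind of silent rename that would leave a savepoint referring to a class name the new jar no longer contains.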
On Tue, Oct 2, 2018 at 4:55 PM Elias Levy <fearsome.lucid...@gmail.com> wrote:

> To add to the mystery, I extracted the class file mentioned in the
> exceptions (TestJob$$anon$13$$anon$3) from the job jar that created the
> savepoint and disassembled it to determine which serializer it is. The
> serializer actually has nothing to do with the case class that was
> initially modified and then completely removed. Rather, it was the
> serializer
> org/apache/flink/api/scala/typeutils/CaseClassSerializer<Lscala/Tuple2<Ljava/lang/String;Lme/doubledutch/lazyjson/LazyObject;>;>;
>
> That's the serializer generated by Flink for the source 1 data stream type
> (DataStream[(String, LazyObject)]), which is consumed by the async
> function. AFAIK there is no reason for that serializer to produce any
> error.
>
> Thoughts?
>
> On Tue, Oct 2, 2018 at 12:41 AM Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Elias,
>>
>> I am not familiar with the recovery code, but Flink might read (some of)
>> the savepoint data even though it is not needed and not loaded into
>> operators. That would explain why you see an exception when the case class
>> is modified or completely removed.
>>
>> Maybe Stefan or Gordon can help here.
>>
>> Best, Fabian
>>
>> On Tue, Oct 2, 2018 at 1:10 AM Elias Levy <fearsome.lucid...@gmail.com>
>> wrote:
>>
>>> Have any of the Flink folks seen this before?
>>>
>>> On Fri, Sep 28, 2018 at 5:23 PM Elias Levy <fearsome.lucid...@gmail.com>
>>> wrote:
>>>
>>>> I am experiencing a rather odd error. We have a job running on a Flink
>>>> 1.4.2 cluster with two Kafka input streams. One of the streams is
>>>> processed by an async function. The output of the async function and
>>>> the other original stream are consumed by a CoProcessOperator, which in
>>>> turn emits Scala case class instances. These go into a stateful
>>>> ProcessFunction filter and then into a sink. I.e.
>>>>
>>>> source 1 -> async function --\
>>>>                               |---> co process --> process --> sink
>>>> source 2 --------------------/
>>>>
>>>> A field was added to the output case class, and the job would no longer
>>>> start up from a savepoint. I assumed this was the result of a serializer
>>>> incompatibility. I verified this by reverting the addition of the field,
>>>> and the job could then restore from the previous savepoint. So far it
>>>> makes sense.
>>>>
>>>> Then I decided to leave the new field in the case class, but eliminated
>>>> most of the DAG, leaving only the source 1 -> async function portion of
>>>> it. The case class is emitted by the co process, so this removed the
>>>> modified case class from the processing graph. When I try to restore from
>>>> the savepoint, even if Allow Non Restored State is selected, the job fails
>>>> to restore with the error "Deserialization of serializer errored".
>>>>
>>>> So then I decided to eliminate the modified case class completely. I
>>>> removed all trace of it from the job, again leaving only source 1 ->
>>>> async function. I tried to restore this job, with no traces of the case
>>>> class, and the job still failed with "Deserialization of serializer
>>>> errored", even when Allow Non Restored State is selected.
>>>>
>>>> Has anyone seen anything like this?
>>>>
>>>> This is the error being generated:
>>>>
>>>> WARN org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil - Deserialization of serializer errored; replacing with null.
>>>> java.io.IOException: Unloadable class for type serializer.
>>>>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
>>>>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
>>>>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
>>>>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
>>>>     at org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
>>>>     at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
>>>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>>>     at java.lang.Thread.run(Unknown Source)
>>>> Caused by: java.io.InvalidClassException: failed to read class descriptor
>>>>     at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
>>>>     at java.io.ObjectInputStream.readClassDesc(Unknown Source)
>>>>     at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>>>>     at java.io.ObjectInputStream.readObject0(Unknown Source)
>>>>     at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
>>>>     at java.io.ObjectInputStream.readSerialData(Unknown Source)
>>>>     at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>>>>     at java.io.ObjectInputStream.readObject0(Unknown Source)
>>>>     at java.io.ObjectInputStream.readObject(Unknown Source)
>>>>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
>>>>     ... 14 more
>>>> Caused by: java.lang.ClassNotFoundException: com.somewhere.TestJob$$anon$13$$anon$3
>>>>     at java.net.URLClassLoader.findClass(Unknown Source)
>>>>     at java.lang.ClassLoader.loadClass(Unknown Source)
>>>>     at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
>>>>     at java.lang.ClassLoader.loadClass(Unknown Source)
>>>>     at java.lang.Class.forName0(Native Method)
>>>>     at java.lang.Class.forName(Unknown Source)
>>>>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
>>>>     at org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:204)
>>>>     ... 24 more
>>>> WARN org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil - Deserialization of serializer errored; replacing with null.
>>>> java.io.IOException: Unloadable class for type serializer.
>>>>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
>>>>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
>>>>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
>>>>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
>>>>     at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71)
>>>>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:445)
>>>>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:250)
>>>>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:206)
>>>>     at org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
>>>>     at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
>>>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>>>     at java.lang.Thread.run(Unknown Source)
>>>> Caused by: java.io.InvalidClassException: failed to read class descriptor
>>>>     at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
>>>>     at java.io.ObjectInputStream.readClassDesc(Unknown Source)
>>>>     at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>>>>     at java.io.ObjectInputStream.readObject0(Unknown Source)
>>>>     at java.io.ObjectInputStream.readObject(Unknown Source)
>>>>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
>>>>     ... 18 more
>>>> Caused by: java.lang.ClassNotFoundException: com.somewhere.TestJob$$anon$13$$anon$3
>>>>     at java.net.URLClassLoader.findClass(Unknown Source)
>>>>     at java.lang.ClassLoader.loadClass(Unknown Source)
>>>>     at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
>>>>     at java.lang.ClassLoader.loadClass(Unknown Source)
>>>>     at java.lang.Class.forName0(Native Method)
>>>>     at java.lang.Class.forName(Unknown Source)
>>>>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
>>>>     at org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:204)
>>>>     ... 24 more
>>>> ERROR org.apache.flink.streaming.runtime.tasks.StreamTask - Error during disposal of stream operator.
>>>> java.lang.NullPointerException
>>>>     at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.stopResources(AsyncWaitOperator.java:353)
>>>>     at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.dispose(AsyncWaitOperator.java:330)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>>>     at java.lang.Thread.run(Unknown Source)
>>>> INFO org.apache.flink.runtime.taskmanager.Task - Source: Kafka Topic -> Async Function (1/1) (1de078fb77acdd16b7e021fb3e70339f) switched from RUNNING to FAILED.
>>>> java.lang.IllegalStateException: Could not initialize operator state backend.
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>>>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>>>     at java.lang.Thread.run(Unknown Source)
>>>> Caused by: java.io.IOException: Unable to restore operator state [_async_wait_operator_state_]. The previous serializer of the operator state must be present; the serializer could have been removed from the classpath, or its implementation have changed and could not be loaded. This is a temporary restriction that will be fixed in future versions.
>>>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:367)
>>>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>>>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>>>>     ... 6 more
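The root cause at the bottom of each trace is Java serialization failing to resolve a class by name. The plain-Scala sketch below assumes the savepoint holds the serializer via Java serialization, as the ObjectInputStream frames suggest. The names SavedSerializer, OldJobVersion, MissingClassObjectInputStream and RestoreSketch are hypothetical. The sketch writes an anonymous-class instance, then reads it back through an ObjectInputStream whose resolveClass pretends one class name is absent. That is roughly what a new job jar without TestJob$$anon$13$$anon$3 would look like during restore.

```scala
import java.io._

// Hypothetical payload standing in for a serializer written into a savepoint.
trait SavedSerializer extends Serializable

object OldJobVersion {
  // Anonymous class: the stream records its synthetic JVM name.
  val serializer: SavedSerializer = new SavedSerializer {}
}

// Hypothetical helper: resolves classes through an explicit class loader (as a
// user-code class loader would), but treats one class name as missing. This
// simulates a new job jar that no longer contains that class.
class MissingClassObjectInputStream(in: InputStream, missing: String, loader: ClassLoader)
    extends ObjectInputStream(in) {
  override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
    if (desc.getName == missing) throw new ClassNotFoundException(desc.getName)
    else Class.forName(desc.getName, false, loader)
}

object RestoreSketch {
  def serialize(obj: AnyRef): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(obj) finally out.close()
    bytes.toByteArray
  }

  // Right(restored object), or Left(name of the class that could not be found).
  def restore(bytes: Array[Byte], missing: String): Either[String, AnyRef] = {
    val loader = classOf[SavedSerializer].getClassLoader
    val in = new MissingClassObjectInputStream(new ByteArrayInputStream(bytes), missing, loader)
    try Right(in.readObject())
    catch { case e: ClassNotFoundException => Left(e.getMessage) }
    finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val bytes = serialize(OldJobVersion.serializer)
    val name = OldJobVersion.serializer.getClass.getName
    // Succeeds: no class is treated as missing.
    println(restore(bytes, missing = "no.such.Class"))
    // Fails with the synthetic anonymous class name, like the traces above.
    println(restore(bytes, missing = name))
  }
}
```

In the traces above, the same ClassNotFoundException surfaces through Flink's FailureTolerantObjectInputStream.readClassDescriptor. It is then wrapped in InvalidClassException and reported as "Unloadable class for type serializer". The sketch reproduces only the innermost failure.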