I wonder whether the issue here is that the createTuple2TypeInformation
implicit creates an anonymous class
<https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-scala/src/main/scala/org/apache/flink/api/scala/package.scala#L97-L110>,
which results in a non-stable class name if the code is refactored, so the
class can no longer be found by that name in a new version of the job.
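
To illustrate what I mean by a non-stable name, here is a minimal sketch in
plain Java. It is not Flink code: AnonNameDemo, Codec, stringCodec and
tupleCodec are hypothetical names made up for the example. It only shows the
general JVM behaviour I suspect is involved: an anonymous class gets a
compiler-assigned, numbered name, and Java serialization writes that name
into the stream. Judging by names like TestJob$$anon$13$$anon$3, scalac
numbers its anonymous classes in a similar way, but that part is my
assumption.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class AnonNameDemo {
    // Hypothetical stand-in for a serializer type; not a Flink class.
    interface Codec extends Serializable {
        String describe();
    }

    // Anonymous classes have no source-level name. The compiler gives each
    // one a numbered binary name under the enclosing class, so adding or
    // removing another anonymous class can shift the numbers.
    static Codec stringCodec() {
        return new Codec() {
            public String describe() { return "string"; }
        };
    }

    static Codec tupleCodec() {
        return new Codec() {
            public String describe() { return "tuple"; }
        };
    }

    // Java serialization records the binary class name in the stream, so a
    // reader needs a class with exactly that name on its classpath.
    static byte[] serialize(Object o) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Crude check: does the serialized form contain the given class name?
    static boolean streamMentions(byte[] stream, String className) {
        return new String(stream, StandardCharsets.ISO_8859_1).contains(className);
    }

    public static void main(String[] args) {
        Codec codec = tupleCodec();
        String name = codec.getClass().getName();
        // Prints the compiler-assigned name, then whether the serialized
        // bytes contain it.
        System.out.println(name);
        System.out.println(streamMentions(serialize(codec), name));
    }
}
```

If that is what happens here, a savepoint written by the old jar would refer
to a numbered name that the refactored jar no longer contains, which would
match the ClassNotFoundException in the trace.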

On Tue, Oct 2, 2018 at 4:55 PM Elias Levy <fearsome.lucid...@gmail.com>
wrote:

> To add to the mystery, I extracted the class file mentioned in the
> exceptions (TestJob$$anon$13$$anon$3) from the job jar that created the
> savepoint and disassembled it to determine what serializer it is.  The
> serializer actually has nothing to do with the case class that was
> initially modified and then completely removed.  Rather, it was the
> serializer
> org/apache/flink/api/scala/typeutils/CaseClassSerializer<Lscala/Tuple2<Ljava/lang/String;Lme/doubledutch/lazyjson/LazyObject;>;>;
>
> That's the serializer generated by Flink for the source 1 data stream type
> (DataStream[ (String, LazyObject) ]), which is consumed by the async
> function.  AFAIK there is no reason for there to be any error with that
> serializer.
>
> Thoughts?
>
>
>
> On Tue, Oct 2, 2018 at 12:41 AM Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Elias,
>>
>> I am not familiar with the recovery code, but Flink might read (some of)
>> the savepoint data even though it is neither needed nor loaded into
>> operators.
>> That would explain why you see an exception when the case class is
>> modified or completely removed.
>>
>> Maybe Stefan or Gordon can help here.
>>
>> Best, Fabian
>>
>> On Tue, Oct 2, 2018 at 1:10 AM Elias Levy <fearsome.lucid...@gmail.com>
>> wrote:
>>
>>> Any of the Flink folks seen this before?
>>>
>>> On Fri, Sep 28, 2018 at 5:23 PM Elias Levy <fearsome.lucid...@gmail.com>
>>> wrote:
>>>
>>>> I am experiencing a rather odd error.  We have a job running on a Flink
>>>> 1.4.2 cluster with two Kafka input streams.  One of the streams is
>>>> processed by an async function, and the output of the async function and
>>>> the other original stream are consumed by a CoProcessOperator, which in
>>>> turn emits Scala case class instances that go into a stateful
>>>> ProcessFunction filter and then into a sink.  I.e.
>>>>
>>>> source 1 -> async function --\
>>>>                               |---> co process --> process --> sink
>>>> source 2 --------------------/
>>>>
>>>> A field was added to the output case class and the job would no longer
>>>> start up from a savepoint.  I assumed this was a result of a serializer
>>>> incompatibility.  I verified this by reverting the addition of the field,
>>>> after which the job could restore from the previous savepoint.  So far it
>>>> makes sense.
>>>>
>>>> Then I decided to leave the new field in the case class, but eliminated
>>>> most of the DAG, leaving only the source 1 --> async function portion of
>>>> it.  The case class is emitted by the co process.  So this removed the
>>>> modified case class from the processing graph.  When I try to restore from
>>>> the savepoint, even if Allow Non Restored State is selected, the job fails
>>>> to restore with the error "Deserialization of serializer errored".
>>>>
>>>> So then I decided to completely eliminate the modified case class.  I
>>>> removed all traces of it from the job, again leaving only the source 1 ->
>>>> async function portion.  I tried to restore this job, with no traces of
>>>> the case class, and the job still failed with the "Deserialization of
>>>> serializer errored" error, even when Allow Non Restored State is selected.
>>>>
>>>> Anyone seen anything like this?
>>>>
>>>> This is the error being generated:
>>>>
>>>> WARN
>>>>  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  -
>>>> Deserialization of serializer errored; replacing with null.
>>>> java.io.IOException: Unloadable class for type serializer.
>>>> at
>>>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
>>>> at
>>>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
>>>> at
>>>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
>>>> at
>>>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
>>>> at
>>>> org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
>>>> at
>>>> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
>>>> at
>>>> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>>> at java.lang.Thread.run(Unknown Source)
>>>> Caused by: java.io.InvalidClassException: failed to read class
>>>> descriptor
>>>> at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
>>>> at java.io.ObjectInputStream.readClassDesc(Unknown Source)
>>>> at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>>>> at java.io.ObjectInputStream.readObject0(Unknown Source)
>>>> at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
>>>> at java.io.ObjectInputStream.readSerialData(Unknown Source)
>>>> at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>>>> at java.io.ObjectInputStream.readObject0(Unknown Source)
>>>> at java.io.ObjectInputStream.readObject(Unknown Source)
>>>> at
>>>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
>>>> ... 14 more
>>>> Caused by: java.lang.ClassNotFoundException:
>>>> com.somewhere.TestJob$$anon$13$$anon$3
>>>> at java.net.URLClassLoader.findClass(Unknown Source)
>>>> at java.lang.ClassLoader.loadClass(Unknown Source)
>>>> at
>>>> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
>>>> at java.lang.ClassLoader.loadClass(Unknown Source)
>>>> at java.lang.Class.forName0(Native Method)
>>>> at java.lang.Class.forName(Unknown Source)
>>>> at
>>>> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
>>>> at
>>>> org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:204)
>>>> ... 24 more
>>>> WARN
>>>>  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  -
>>>> Deserialization of serializer errored; replacing with null.
>>>> java.io.IOException: Unloadable class for type serializer.
>>>> at
>>>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
>>>> at
>>>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
>>>> at
>>>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
>>>> at
>>>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
>>>> at
>>>> org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71)
>>>> at
>>>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:445)
>>>> at
>>>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:250)
>>>> at
>>>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:206)
>>>> at
>>>> org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
>>>> at
>>>> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
>>>> at
>>>> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>>> at java.lang.Thread.run(Unknown Source)
>>>> Caused by: java.io.InvalidClassException: failed to read class
>>>> descriptor
>>>> at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
>>>> at java.io.ObjectInputStream.readClassDesc(Unknown Source)
>>>> at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>>>> at java.io.ObjectInputStream.readObject0(Unknown Source)
>>>> at java.io.ObjectInputStream.readObject(Unknown Source)
>>>> at
>>>> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
>>>> ... 18 more
>>>> Caused by:
>>>> java.lang.ClassNotFoundException: com.somewhere.TestJob$$anon$13$$anon$3
>>>> at java.net.URLClassLoader.findClass(Unknown Source)
>>>> at java.lang.ClassLoader.loadClass(Unknown Source)
>>>> at
>>>> org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
>>>> at java.lang.ClassLoader.loadClass(Unknown Source)
>>>> at java.lang.Class.forName0(Native Method)
>>>> at java.lang.Class.forName(Unknown Source)
>>>> at
>>>> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
>>>> at
>>>> org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:204)
>>>> ... 24 more
>>>> ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           -
>>>> Error during disposal of stream operator.
>>>> java.lang.NullPointerException
>>>> at
>>>> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.stopResources(AsyncWaitOperator.java:353)
>>>> at
>>>> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.dispose(AsyncWaitOperator.java:330)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>>> at java.lang.Thread.run(Unknown Source)
>>>> INFO  org.apache.flink.runtime.taskmanager.Task                     -
>>>> Source: Kafka Topic -> Async Function (1/1)
>>>> (1de078fb77acdd16b7e021fb3e70339f) switched from RUNNING to FAILED.
>>>> java.lang.IllegalStateException: Could not initialize operator state
>>>> backend.
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301)
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>>>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>>>> at java.lang.Thread.run(Unknown Source)
>>>> Caused by: java.io.IOException: Unable to restore operator state
>>>> [_async_wait_operator_state_]. The previous serializer of the operator
>>>> state must be present; the serializer could have been removed from the
>>>> classpath, or its implementation have changed and could not be loaded. This
>>>> is a temporary restriction that will be fixed in future versions.
>>>> at
>>>> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:367)
>>>> at
>>>> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>>>> at
>>>> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>>>> ... 6 more
>>>>
>>>>
>>>>
