Any of the Flink folks seen this before?

On Fri, Sep 28, 2018 at 5:23 PM Elias Levy <fearsome.lucid...@gmail.com>
wrote:

> I am experiencing a rather odd error.  We have a job running on a Flink
> 1.4.2 cluster with two Kafka input streams.  One of the streams is processed
> by an async function, and the output of the async function and the other
> original stream are consumed by a CoProcessOperator, which in turn emits
> Scala case class instances that go into a stateful ProcessFunction filter
> and then into a sink.  I.e.
>
> source 1 -> async function --\
>                               |---> co process --> process --> sink
> source 2 --------------------/
>
> A field was added to the output case class and the job would no longer
> start up from a savepoint.  I assumed this was the result of a serializer
> incompatibility.  I verified this by reverting the addition of the field,
> after which the job could restore from the previous savepoint.  So far it
> makes sense.
>
> Then I decided to leave the new field in the case class, but eliminated
> most of the DAG, leaving only the source 1 --> async function portion of
> it.  The case class is emitted by the co process, so this removed the
> modified case class from the processing graph.  When I try to restore from
> the savepoint, even if Allow Non Restored State is selected, the job fails
> to restore with the error "Deserialization of serializer errored".
>
> So then I decided to completely eliminate the modified case class.  I
> removed all traces of it from the job, again leaving only the source 1 ->
> async function.  I tried to restore this job, with no traces of the case
> class, and the job still failed with the "Deserialization of serializer
> errored" error, even when Allow Non Restored State is selected.
>
> Anyone seen anything like this?
>
> This is the error being generated:
>
> WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  - Deserialization of serializer errored; replacing with null.
> java.io.IOException: Unloadable class for type serializer.
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
>     at org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
>     at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Unknown Source)
> Caused by: java.io.InvalidClassException: failed to read class descriptor
>     at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
>     at java.io.ObjectInputStream.readClassDesc(Unknown Source)
>     at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>     at java.io.ObjectInputStream.readObject0(Unknown Source)
>     at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
>     at java.io.ObjectInputStream.readSerialData(Unknown Source)
>     at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>     at java.io.ObjectInputStream.readObject0(Unknown Source)
>     at java.io.ObjectInputStream.readObject(Unknown Source)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
>     ... 14 more
> Caused by: java.lang.ClassNotFoundException: com.somewhere.TestJob$$anon$13$$anon$3
>     at java.net.URLClassLoader.findClass(Unknown Source)
>     at java.lang.ClassLoader.loadClass(Unknown Source)
>     at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
>     at java.lang.ClassLoader.loadClass(Unknown Source)
>     at java.lang.Class.forName0(Native Method)
>     at java.lang.Class.forName(Unknown Source)
>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
>     at org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:204)
>     ... 24 more
> WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  - Deserialization of serializer errored; replacing with null.
> java.io.IOException: Unloadable class for type serializer.
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
>     at org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot.read(CompositeTypeSerializerConfigSnapshot.java:71)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerConfigSnapshotSerializationProxy.read(TypeSerializerSerializationUtil.java:445)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializerConfigSnapshot(TypeSerializerSerializationUtil.java:250)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:206)
>     at org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
>     at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Unknown Source)
> Caused by: java.io.InvalidClassException: failed to read class descriptor
>     at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
>     at java.io.ObjectInputStream.readClassDesc(Unknown Source)
>     at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>     at java.io.ObjectInputStream.readObject0(Unknown Source)
>     at java.io.ObjectInputStream.readObject(Unknown Source)
>     at org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:375)
>     ... 18 more
> Caused by: java.lang.ClassNotFoundException: com.somewhere.TestJob$$anon$13$$anon$3
>     at java.net.URLClassLoader.findClass(Unknown Source)
>     at java.lang.ClassLoader.loadClass(Unknown Source)
>     at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$ChildFirstClassLoader.loadClass(FlinkUserCodeClassLoaders.java:128)
>     at java.lang.ClassLoader.loadClass(Unknown Source)
>     at java.lang.Class.forName0(Native Method)
>     at java.lang.Class.forName(Unknown Source)
>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:73)
>     at org.apache.flink.util.InstantiationUtil$FailureTolerantObjectInputStream.readClassDescriptor(InstantiationUtil.java:204)
>     ... 24 more
> ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Error during disposal of stream operator.
> java.lang.NullPointerException
>     at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.stopResources(AsyncWaitOperator.java:353)
>     at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.dispose(AsyncWaitOperator.java:330)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:446)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:351)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Unknown Source)
> INFO  org.apache.flink.runtime.taskmanager.Task                     - Source: Kafka Topic -> Async Function (1/1) (1de078fb77acdd16b7e021fb3e70339f) switched from RUNNING to FAILED.
> java.lang.IllegalStateException: Could not initialize operator state backend.
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:301)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Unknown Source)
> Caused by: java.io.IOException: Unable to restore operator state [_async_wait_operator_state_]. The previous serializer of the operator state must be present; the serializer could have been removed from the classpath, or its implementation have changed and could not be loaded. This is a temporary restriction that will be fixed in future versions.
>     at org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:367)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>     ... 6 more
>
>
>
