[jira] [Commented] (FLINK-10493) Macro generated CaseClassSerializer considered harmful

2019-02-14 Thread Tzu-Li (Gordon) Tai (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16768028#comment-16768028
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-10493:
-

Fixed for 1.8.0 via a656f9981ad84ea6b9a85d8bd4e4f29f75009791.

> Macro generated CaseClassSerializer considered harmful
> --
>
> Key: FLINK-10493
> URL: https://issues.apache.org/jira/browse/FLINK-10493
> Project: Flink
>  Issue Type: Bug
>  Components: Scala API, State Backends, Checkpointing, Type 
> Serialization System
>Affects Versions: 1.4.0, 1.4.1, 1.4.2, 1.5.1, 1.5.2, 1.5.3, 1.5.4, 1.6.0, 
> 1.6.1
>Reporter: Elias Levy
>Assignee: Igal Shilman
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.8.0
>
>  Time Spent: 20m
>  Remaining Estimate: 0h
>
> The Flink Scala API uses implicits and macros to generate {{TypeInformation}} 
> and {{TypeSerializer}} objects for types.  For Scala tuples and case 
> classes, the macro generates an [anonymous {{CaseClassSerializer}} 
> class|https://github.com/apache/flink/blob/8674b69964eae50cad024f2c5caf92a71bf21a09/flink-scala/src/main/scala/org/apache/flink/api/scala/codegen/TypeInformationGen.scala#L148-L161].
>   
> The Scala compiler generates a name for the anonymous class that depends on 
> the position of the macro invocation in the code relative to other anonymous 
> classes.  If the code changes such that this relative position changes, the 
> name of the serializer class changes too, even if neither the overall logic 
> of the code nor the type in question has changed.
> If the job is then restored from a savepoint, this results in errors such as 
> the one below: the serializer needed to read the data in the savepoint can no 
> longer be found, because its class name has changed.
> At the very least, there should be a prominent warning in the documentation 
> about this issue.  Minor code changes can result in jobs that can't restore 
> previous state.  Ideally, the use of anonymous classes should be deprecated.
> {noformat}
> WARN  org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil  
> - Deserialization of serializer errored; replacing with null.
> java.io.IOException: Unloadable class for type serializer.
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil$TypeSerializerSerializationProxy.read(TypeSerializerSerializationUtil.java:384)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:110)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.tryReadSerializer(TypeSerializerSerializationUtil.java:83)
>   at 
> org.apache.flink.api.common.typeutils.TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(TypeSerializerSerializationUtil.java:203)
>   at 
> org.apache.flink.runtime.state.OperatorBackendStateMetaInfoSnapshotReaderWriters$OperatorBackendStateMetaInfoReaderV2.readStateMetaInfo(OperatorBackendStateMetaInfoSnapshotReaderWriters.java:207)
>   at 
> org.apache.flink.runtime.state.OperatorBackendSerializationProxy.read(OperatorBackendSerializationProxy.java:85)
>   at 
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.restore(DefaultOperatorStateBackend.java:351)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.createOperatorStateBackend(StreamTask.java:733)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initOperatorState(AbstractStreamOperator.java:299)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:692)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:679)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Unknown Source)
> Caused by: java.io.InvalidClassException: failed to read class descriptor
>   at java.io.ObjectInputStream.readNonProxyDesc(Unknown Source)
>   at java.io.ObjectInputStream.readClassDesc(Unknown Source)
>   at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>   at java.io.ObjectInputStream.readObject0(Unknown Source)
>   at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
>   at java.io.ObjectInputStream.readSerialData(Unknown Source)
>   at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
>   at java.io.ObjectInputStream.readObject0(Unknown Sou

[jira] [Commented] (FLINK-10493) Macro generated CaseClassSerializer considered harmful

2018-11-05 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16675634#comment-16675634
 ] 

Elias Levy commented on FLINK-10493:


[~tzulitai] Correct.  It is not evident that the Scala serializers generated by 
the macros are anonymous classes.  One only finds out when a job upgrade fails 
and one starts digging through the code to find the source of the error.  
Specifically, the section of the documentation that discusses [Type Information in the Scala API|https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/types_serialization.html#type-information-in-the-scala-api]
fails to mention this issue.
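A minimal sketch of why the generated names are invisible in the source, written in plain Java rather than Scala (an assumption: javac anonymous classes stand in for the anonymous {{CaseClassSerializer}} subclasses the macro generates; {{AnonNames}} and {{Codec}} are hypothetical names, not Flink types). Like scalac, javac derives an anonymous class's binary name from its position among the anonymous classes of the enclosing class:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch (assumption): plain Java anonymous classes standing in for the
// anonymous serializer classes generated by the Scala macro.
public class AnonNames {

    // Hypothetical stand-in for a serializer interface; not a Flink type.
    interface Codec {
        String describe();
    }

    // First anonymous class in AnonNames: javac names it AnonNames$1.
    static final Codec FIRST = new Codec() {
        public String describe() { return "first"; }
    };

    // Second anonymous class: javac names it AnonNames$2.
    // Adding another anonymous class above FIRST would silently rename
    // these two to AnonNames$2 and AnonNames$3.
    static final Codec SECOND = new Codec() {
        public String describe() { return "second"; }
    };

    // Returns the compiler-assigned binary names of both anonymous classes.
    static List<String> names() {
        List<String> result = new ArrayList<>();
        result.add(FIRST.getClass().getName());
        result.add(SECOND.getClass().getName());
        return result;
    }

    public static void main(String[] args) {
        names().forEach(System.out::println);
    }
}
```

Nothing about the logic of {{FIRST}} or {{SECOND}} determines their names; only their order among the anonymous classes does.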


[jira] [Commented] (FLINK-10493) Macro generated CaseClassSerializer considered harmful

2018-11-01 Thread Tzu-Li (Gordon) Tai (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671648#comment-16671648
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-10493:
-

We currently have this: 
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/state/custom_serialization.html
It discourages anonymous classes, but it doesn't explicitly warn against using 
Scala macro-generated serializers.
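The underlying problem with anonymous classes here is that Java serialization records the binary class name in the stream, and reading the stream back requires that exact name to resolve. The sketch below is plain Java; {{RenameDemo}} and its {{renumber}} helper are hypothetical, and rewriting the name inside the bytes is only a stand-in for recompiling a job in which the anonymous class was renumbered:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Sketch (assumption): no Flink types. Java serialization writes the binary
// class name, so a blob written by one build can only be read back if that
// exact name still resolves in the restoring build.
public class RenameDemo {

    // Anonymous Serializable class in a static context: javac names it RenameDemo$1.
    static final Serializable SERIALIZER = new Serializable() {
        final int arity = 2; // stand-in for serializer configuration
    };

    static byte[] write(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    // Simulates a renumbered anonymous class: replaces the recorded class name
    // with a same-length name that does not exist on the classpath.
    static byte[] renumber(byte[] data, String from, String to) {
        byte[] f = from.getBytes(StandardCharsets.UTF_8);
        byte[] t = to.getBytes(StandardCharsets.UTF_8);
        byte[] out = data.clone();
        for (int i = 0; i + f.length <= out.length; i++) {
            if (Arrays.equals(Arrays.copyOfRange(out, i, i + f.length), f)) {
                System.arraycopy(t, 0, out, i, t.length);
            }
        }
        return out;
    }

    // True if the object can be read back, false if its class cannot be loaded.
    static boolean restores(byte[] data) throws IOException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            in.readObject();
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        String name = SERIALIZER.getClass().getName();
        String renamed = name.substring(0, name.length() - 1) + "9";
        byte[] savepoint = write(SERIALIZER);
        System.out.println("unchanged build restores: " + restores(savepoint));
        System.out.println("renumbered build restores: " + restores(renumber(savepoint, name, renamed)));
    }
}
```

It should report {{true}} for the unchanged stream and {{false}} for the renumbered one. The Flink trace in this ticket surfaces the failure differently ({{InvalidClassException: failed to read class descriptor}}), so treat this only as an illustration of the mechanism.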


[jira] [Commented] (FLINK-10493) Macro generated CaseClassSerializer considered harmful

2018-11-01 Thread Tzu-Li (Gordon) Tai (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671644#comment-16671644
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-10493:
-

Meanwhile, until we actually resolve this by upgrading the Scala serializers, 
I agree with Elias that we should have a much more prominent warning about 
Scala macro-generated serializers in the documentation.


[jira] [Commented] (FLINK-10493) Macro generated CaseClassSerializer considered harmful

2018-11-01 Thread Tzu-Li (Gordon) Tai (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671643#comment-16671643
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-10493:
-

[~elevy] [~aljoscha]

This issue is exactly what the rework of serializer snapshotting and restoring 
in 1.7 targets.

Put briefly: before 1.7, serializers were written via Java serialization 
alongside the serializer snapshots, and were therefore prone to Java 
serialization problems such as the one described in this ticket.
From 1.7 on, once a serializer's snapshot has been upgraded to the new 
interface, the serializer itself is no longer written. Instead, on restore, the 
snapshot of the prior serializer is used as a factory to reinstantiate the 
prior serializer.

Unfortunately, in the upcoming 1.7 we haven't yet upgraded the Scala 
serializers' snapshots to the new interfaces. This means that in 1.7 we will 
still rely on Java serialization to obtain prior Scala serializers, and the 
issue described here remains.

I think upgrading the Scala serializers (alongside others such as the Pojo 
serializer) will be a high priority for Flink 1.8, so we should expect this 
ticket to be fixed by then.
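The "snapshot as a factory" idea described above can be sketched as follows. All names here ({{TupleSerializer}}, {{TupleSerializerSnapshot}}, {{takeSnapshot}}, {{restore}}) are hypothetical and only illustrate the idea; this is not Flink's actual snapshot interface. Only a version number and the serializer's configuration are written, so the compiled name of the (possibly anonymous) serializer class never reaches the persisted bytes:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Sketch (assumption): hypothetical types, not Flink's TypeSerializerSnapshot API.
public class SnapshotFactoryDemo {

    // Hypothetical serializer for a fixed-arity tuple.
    interface TupleSerializer {
        int arity();
    }

    // Hypothetical snapshot: a named class with a stable binary name.
    static final class TupleSerializerSnapshot {
        private static final int VERSION = 1;
        private int arity;

        TupleSerializerSnapshot() {}                          // used on restore
        TupleSerializerSnapshot(int arity) { this.arity = arity; }

        void write(DataOutputStream out) throws IOException {
            out.writeInt(VERSION); // lets a later build evolve the format
            out.writeInt(arity);   // configuration only, not the serializer object
        }

        void read(DataInputStream in) throws IOException {
            int version = in.readInt();
            if (version != VERSION) {
                throw new IOException("unknown snapshot version " + version);
            }
            arity = in.readInt();
        }

        // Factory: re-create an equivalent serializer from the stored configuration.
        // It may well be an anonymous class; its name is never persisted.
        TupleSerializer restoreSerializer() {
            final int a = arity;
            return new TupleSerializer() {
                public int arity() { return a; }
            };
        }
    }

    static byte[] takeSnapshot(TupleSerializer serializer) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            new TupleSerializerSnapshot(serializer.arity()).write(out);
        }
        return bytes.toByteArray();
    }

    static TupleSerializer restore(byte[] data) throws IOException {
        TupleSerializerSnapshot snapshot = new TupleSerializerSnapshot();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            snapshot.read(in);
        }
        return snapshot.restoreSerializer();
    }

    public static void main(String[] args) throws IOException {
        TupleSerializer original = new TupleSerializer() {
            public int arity() { return 3; }
        };
        TupleSerializer restored = restore(takeSnapshot(original));
        // Different anonymous classes, same configuration.
        System.out.println(original.getClass().getName() + " -> " + restored.getClass().getName());
        System.out.println("arity " + restored.arity());
    }
}
```

A real implementation would also have to record which snapshot class to instantiate on restore, so under this design it is the snapshot class, not the serializer, whose name must stay stable.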


[jira] [Commented] (FLINK-10493) Macro generated CaseClassSerializer considered harmful

2018-11-01 Thread JC (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16671403#comment-16671403
 ] 

JC commented on FLINK-10493:


I came across this ticket while researching the following exception I got today 
when redeploying a pipeline from a savepoint on Flink 1.5.4. Is it related?
{code:java}
Caused by: org.apache.flink.util.FlinkException: Could not restore operator 
state backend for DoFnOperator_165cb4197f62583ea9e72c0199dcdb4f_(1/2) from any 
of the 1 provided restore options.
    at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
    at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:240)
    at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:139)
    ... 5 more
Caused by: java.io.IOException: Unable to restore operator state 
[pushed-back-elements]. The previous serializer of the operator state must be 
present; the serializer could have been removed from the classpath, or its 
implementation have changed and could not be loaded. This is a temporary 
restriction that will be fixed in future versions.{code}


[jira] [Commented] (FLINK-10493) Macro generated CaseClassSerializer considered harmful

2018-10-12 Thread Aljoscha Krettek (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16647566#comment-16647566
 ] 

Aljoscha Krettek commented on FLINK-10493:
--

[~tzulitai] Will this still be an issue with the new state migration code that 
has the rework of how serializer snapshots work?

 


[jira] [Commented] (FLINK-10493) Macro generated CaseClassSerializer considered harmful

2018-10-04 Thread Elias Levy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10493?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16638834#comment-16638834
 ] 

Elias Levy commented on FLINK-10493:


This issue was noted by [~tzulitai] back in June 2017
[here|https://github.com/apache/flink/pull/4090#issuecomment-307109692].
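A minimal sketch of the positional naming effect described in this issue, written in plain Java as an analogy rather than in Scala: javac also names anonymous classes by position ({{Outer$1}}, {{Outer$2}}, ...), typically numbering them per enclosing class in source order. Scala's generated names take a different form, and the macro itself is not reproduced here. {{Serializer}}, {{JobV1}}, {{JobV2}} and {{MyRecord}} are hypothetical stand-ins, not Flink APIs.

```java
import java.io.Serializable;

public class AnonymousNamingDemo {

    // Hypothetical stand-in for a generated serializer type (not a Flink API).
    interface Serializer extends Serializable {
        String describe();
    }

    // Hypothetical "version 1" of a job: the serializer is the only
    // anonymous class inside JobV1, so javac numbers it 1.
    static class JobV1 {
        static final Serializer SERIALIZER = new Serializer() {
            @Override
            public String describe() {
                return "serializer for MyRecord";
            }
        };
    }

    // Hypothetical "version 2": identical serializer logic, but an unrelated
    // anonymous class was added earlier in the same enclosing class.
    static class JobV2 {
        static final Runnable UNRELATED = new Runnable() {
            @Override
            public void run() {
                // unrelated helper introduced by a refactoring
            }
        };

        static final Serializer SERIALIZER = new Serializer() {
            @Override
            public String describe() {
                return "serializer for MyRecord";
            }
        };
    }

    public static void main(String[] args) throws ClassNotFoundException {
        String v1Name = JobV1.SERIALIZER.getClass().getName();
        String v2Name = JobV2.SERIALIZER.getClass().getName();
        System.out.println("V1 serializer class: " + v1Name);
        System.out.println("V2 serializer class: " + v2Name);

        // Look up the "$1" slot in the V2 layout, i.e. the position the
        // serializer occupied in V1. It now holds the unrelated Runnable.
        Class<?> slotOne = Class.forName(JobV2.class.getName() + "$1");
        System.out.println("V2 class at $1 is a Serializer: "
                + Serializer.class.isAssignableFrom(slotOne));
    }
}
```

With javac, the serializer's name is expected to end in {{$JobV1$1}} in the first version and {{$JobV2$2}} in the second, although its logic is unchanged. The name the serializer had before now either does not exist or refers to a different class, which is the kind of mismatch that surfaces as "Unloadable class for type serializer" on restore.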

> Macro generated CaseClassSerializer considered harmful
> --
>
> Key: FLINK-10493
> URL: https://issues.apache.org/jira/browse/FLINK-10493
> Project: Flink
>  Issue Type: Bug
>  Components: Scala API, State Backends, Checkpointing, Type 
> Serialization System
>Affects Versions: 1.4.0, 1.4.1, 1.4.2, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.6.1, 
> 1.5.4
>Reporter: Elias Levy
>Priority: Major
>