[jira] [Commented] (FLINK-31356) Serialize garbled characters at checkpoint
[ https://issues.apache.org/jira/browse/FLINK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17702651#comment-17702651 ]

John commented on FLINK-31356:
--

I found the cause by debugging. My old POJO had no parameterless constructor, so Flink serialized it with *KryoSerializer*. After I added a parameterless constructor, Flink switched to *PojoSerializer*, but the field *_subclassSerializerCache_* still maps my POJO class to *KryoSerializer*, so the state is deserialized with the wrong serializer and the restore fails.

The stale entry is returned by this lookup in {{PojoSerializer}}:
{code:java}
TypeSerializer<?> getSubclassSerializer(Class<?> subclass) {
    TypeSerializer<?> result = subclassSerializerCache.get(subclass);
    if (result == null) {
        result = createSubclassSerializer(subclass);
        subclassSerializerCache.put(subclass, result);
    }
    return result;
}
{code}

The job restores normally if I remove this cache entry during restore, as in the patched constructor below. I hope this can be fixed properly in Flink later.
{code:java}
PojoSerializer(
        Class<T> clazz,
        Field[] fields,
        TypeSerializer<Object>[] fieldSerializers,
        LinkedHashMap<Class<?>, Integer> registeredClasses,
        TypeSerializer<?>[] registeredSerializers,
        Map<Class<?>, TypeSerializer<?>> subclassSerializerCache,
        ExecutionConfig executionConfig) {
    this.clazz = checkNotNull(clazz);
    this.fields = checkNotNull(fields);
    this.numFields = fields.length;
    this.fieldSerializers = checkNotNull(fieldSerializers);
    this.registeredClasses = checkNotNull(registeredClasses);
    this.registeredSerializers = checkNotNull(registeredSerializers);
    this.subclassSerializerCache = checkNotNull(subclassSerializerCache);
    // Workaround: drop the stale cached serializer for the POJO class ("xxx" is a placeholder class name).
    this.subclassSerializerCache.entrySet().removeIf(next -> "xxx".equals(next.getKey().getName()));
    this.executionConfig = checkNotNull(executionConfig);
    this.cl = Thread.currentThread().getContextClassLoader();
}
{code}

> Serialize garbled characters at checkpoint
> --
>
> Key: FLINK-31356
> URL: https://issues.apache.org/jira/browse/FLINK-31356
> Project: Flink
> Issue Type: Bug
> Components: API / Type Serialization System
> Affects Versions: 1.13.6
> Reporter: John
> Priority: Major
>
>
> {panel:title=The last checkpoint of the program was successful}
> 2023-03-07 08:33:16,085 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 39126 (type=CHECKPOINT) @ 1678149196059 for job 8b5720a4a40f50b995c97c6fe5b93079.
> 2023-03-07 08:33:16,918 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 39126 for job 8b5720a4a40f50b995c97c6fe5b93079 (71251394 bytes in 849 ms).
> 2023-03-07 08:33:16,918 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 39126 as completed for source Source: kafkaDataStream.
> 2023-03-07 08:36:10,444 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: mysqlSink (1/2) (898af6700ac9cd087c763cef0b5585d4) switched from RUNNING to FAILED on container_e38_1676011848026_0012_01_02 @ (dataPort=44633).
> java.lang.RuntimeException: Writing records to JDBC failed.
> at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.checkFlushException(JdbcBatchingOutputFormat.java:153) ~[flink-connector-jdbc_2.11-1.13.6.jar:1.13.6]
> {panel}
> {panel:title=But from this checkpoint restore, it can't be decoded}
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_a1b6a20a1eb2801464c79c8d018a24d1_(1/2) from any of the 1 provided restore options.
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> ... 10 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:177) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:111) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:131) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at
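The stale-cache behaviour John describes can be reproduced outside Flink with a plain class-keyed map. The following is a minimal, stdlib-only Java sketch, not Flink code: `SerializerKind`, `MyPojo`, `createSerializer`, and `evict` are hypothetical stand-ins, and the restored cache is simulated by pre-filling the map with a `KRYO` entry. It shows why a get-or-create lookup like `getSubclassSerializer` keeps returning the old decision, and how evicting the entry by class name (the idea behind the `removeIf` workaround) forces it to be recomputed.

```java
import java.util.HashMap;
import java.util.Map;

class StaleSerializerCacheDemo {

    // Hypothetical stand-ins for the two serializers discussed; not Flink's classes.
    enum SerializerKind { POJO, KRYO }

    // Hypothetical POJO whose current version has a parameterless constructor.
    static class MyPojo {
        String name;

        MyPojo() {}
    }

    // Class-keyed cache, filled lazily like subclassSerializerCache.
    static final Map<Class<?>, SerializerKind> cache = new HashMap<>();

    // Assumption: the current version of every class qualifies as a POJO.
    static SerializerKind createSerializer(Class<?> clazz) {
        return SerializerKind.POJO;
    }

    // Same get-or-create pattern as getSubclassSerializer: an existing entry is never revalidated.
    static SerializerKind getSerializer(Class<?> clazz) {
        SerializerKind result = cache.get(clazz);
        if (result == null) {
            result = createSerializer(clazz);
            cache.put(clazz, result);
        }
        return result;
    }

    // Workaround in the spirit of the removeIf patch: drop entries by class name.
    static void evict(String className) {
        cache.entrySet().removeIf(e -> className.equals(e.getKey().getName()));
    }

    public static void main(String[] args) {
        // Simulate a restored cache that still carries the decision made for the old class version.
        cache.put(MyPojo.class, SerializerKind.KRYO);
        System.out.println(getSerializer(MyPojo.class)); // KRYO (stale entry wins)

        evict(MyPojo.class.getName());
        System.out.println(getSerializer(MyPojo.class)); // POJO (recomputed after eviction)
    }
}
```

The point of the sketch is that a get-or-create cache never rechecks an entry it already holds, so anything carried over from an earlier class version keeps winning until something removes it.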
[ https://issues.apache.org/jira/browse/FLINK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697773#comment-17697773 ]

John commented on FLINK-31356:
--

[~Yanfei Lei] I use a MapState. The POJO originally had only a constructor with parameters. I first took a savepoint manually and stopped the job, then added a parameterless constructor to the POJO and restored the job from that savepoint. The job ran normally for a while; then some sinks reported errors, the job restarted automatically from the most recent checkpoint, and that restore failed with the error shown in the issue description.
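John's reproduction hinges on one change to the POJO's shape. Flink's documented POJO rules require, among other things, a public no-argument constructor; a class that fails the rules is treated as a generic type and serialized with Kryo. The sketch below checks only that one rule with plain reflection, assuming hypothetical classes `EventV1` (before the change) and `EventV2` (after); it is not Flink's type-extraction logic.

```java
class PojoShapeCheck {

    // Hypothetical "before" version: only a constructor that takes parameters.
    public static class EventV1 {
        public String id;

        public EventV1(String id) {
            this.id = id;
        }
    }

    // Hypothetical "after" version: same field, plus a parameterless constructor.
    public static class EventV2 {
        public String id;

        public EventV2() {}

        public EventV2(String id) {
            this.id = id;
        }
    }

    // Checks a single POJO requirement (public no-arg constructor); the other rules are ignored.
    static boolean hasPublicNoArgConstructor(Class<?> clazz) {
        try {
            clazz.getConstructor();
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("EventV1: " + hasPublicNoArgConstructor(EventV1.class)); // false
        System.out.println("EventV2: " + hasPublicNoArgConstructor(EventV2.class)); // true
    }
}
```

Because the same state was written under one classification and later read under the other, the savepoint restore and the later checkpoint restore in this scenario can take different serializer paths.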
[ https://issues.apache.org/jira/browse/FLINK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697746#comment-17697746 ]

Yanfei Lei commented on FLINK-31356:

Were any POJO fields changed before restoring? Is this related to https://issues.apache.org/jira/browse/FLINK-21752? A common cause of "java.io.UTFDataFormatException: malformed input around byte" is that the serializer's read and write paths are not symmetric.

> Serialize garbled characters at checkpoint
> --
>
> Key: FLINK-31356
> URL: https://issues.apache.org/jira/browse/FLINK-31356
> Project: Flink
> Issue Type: Bug
> Components: API / Type Serialization System
> Affects Versions: 1.13.6
> Reporter: John
> Priority: Major
>
>
> {panel:title=The last checkpoint of the program was successful}
> 2023-03-07 08:33:16,085 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 39126 (type=CHECKPOINT) @ 1678149196059 for job 8b5720a4a40f50b995c97c6fe5b93079.
> 2023-03-07 08:33:16,918 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Completed checkpoint 39126 for job 8b5720a4a40f50b995c97c6fe5b93079 (71251394 bytes in 849 ms).
> 2023-03-07 08:33:16,918 INFO org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking checkpoint 39126 as completed for source Source: kafkaDataStream.
> 2023-03-07 08:36:10,444 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Sink: mysqlSink (1/2) (898af6700ac9cd087c763cef0b5585d4) switched from RUNNING to FAILED on container_e38_1676011848026_0012_01_02 @ (dataPort=44633).
> java.lang.RuntimeException: Writing records to JDBC failed.
> at org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.checkFlushException(JdbcBatchingOutputFormat.java:153) ~[flink-connector-jdbc_2.11-1.13.6.jar:1.13.6]
> {panel}
> {panel:title=But from this checkpoint restore, it can't be decoded}
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for WindowOperator_a1b6a20a1eb2801464c79c8d018a24d1_(1/2) from any of the 1 provided restore options.
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> ... 10 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed when trying to restore heap backend
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:177) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:111) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:131) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:73) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:136) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ~[flink-dist_2.11-1.13.6.jar:1.13.6]
> ... 10 more
> Caused by: java.io.UTFDataFormatException: malformed input around byte 32
> at java.io.DataInputStream.readUTF(DataInputStream.java:656) ~[?:1.8.0_201]
> at java.io.DataInputStream.readUTF(DataInputStream.java:564)
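Yanfei's point that this exception usually means the serializer's read and write paths disagree can be shown with `java.io` alone. In this sketch the method names `write`, `read`, and `readSymmetric` are hypothetical and the byte layout is made up for illustration: the writer emits a 2-byte count and a 4-byte int, while the mismatched reader calls `readUTF`, which interprets the first two bytes as a string length and then rejects the `0xFF` bytes as malformed modified UTF-8.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UTFDataFormatException;

class AsymmetricSerializerDemo {

    // Hypothetical writer layout: a 2-byte field count followed by a 4-byte int.
    static byte[] write(short fieldCount, int value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeShort(fieldCount);
        out.writeInt(value);
        out.flush();
        return bytes.toByteArray();
    }

    // Mismatched reader: expects the record to start with a length-prefixed string.
    static String read(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        return in.readUTF();
    }

    // Matching reader: consumes exactly what write() produced, in the same order.
    static int readSymmetric(byte[] data) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
        in.readShort(); // field count, unused here
        return in.readInt();
    }

    public static void main(String[] args) throws IOException {
        byte[] data = write((short) 4, -1); // bytes: 00 04 FF FF FF FF
        try {
            System.out.println(read(data));
        } catch (UTFDataFormatException e) {
            // readUTF took 00 04 as a length of 4, then hit 0xFF, which is never a valid byte here.
            System.out.println("mismatched reader: " + e.getMessage());
        }
        System.out.println("matching reader: " + readSymmetric(data)); // -1
    }
}
```

Depending on which bytes the mismatched reader happens to hit, the same kind of disagreement can instead surface as an `EOFException` or as silently wrong values, so a decoding error like this points at the serializer pairing rather than at the data itself.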