[jira] [Commented] (FLINK-31356) Serialize garbled characters at checkpoint

2023-03-20 Thread John (Jira)


[ https://issues.apache.org/jira/browse/FLINK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17702651#comment-17702651 ]

John commented on FLINK-31356:
--

I found the cause by debugging.

My old POJO did not have a parameterless constructor, so Flink serialized it with 
*KryoSerializer*.
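To make the constructor rule concrete, here is a hypothetical, simplified model of how the serializer choice flips once a parameterless constructor is added. It only checks for a public no-argument constructor. Flink's real POJO analysis also requires the class to be public and every field to be public or reachable through a getter/setter, and it falls back to a generic (Kryo-based) serializer when a rule fails. The names OldPojo, NewPojo and chooseSerializer are made up for illustration.

{code:java}
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;

// Hypothetical, simplified model of ONE of Flink's POJO rules:
// only the "public no-argument constructor" check is modelled here.
class SerializerChoice {

    // Old version of the POJO: only a constructor with parameters.
    static class OldPojo {
        public String name;

        public OldPojo(String name) {
            this.name = name;
        }
    }

    // New version: a public parameterless constructor was added.
    public static class NewPojo {
        public String name;

        public NewPojo() {
        }

        public NewPojo(String name) {
            this.name = name;
        }
    }

    // True if the class declares a public constructor without parameters.
    static boolean hasPublicNoArgConstructor(Class<?> clazz) {
        try {
            Constructor<?> ctor = clazz.getDeclaredConstructor();
            return Modifier.isPublic(ctor.getModifiers());
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // No usable constructor -> generic (Kryo) fallback, otherwise POJO serializer.
    static String chooseSerializer(Class<?> clazz) {
        return hasPublicNoArgConstructor(clazz) ? "PojoSerializer" : "KryoSerializer";
    }

    public static void main(String[] args) {
        System.out.println(chooseSerializer(OldPojo.class)); // KryoSerializer
        System.out.println(chooseSerializer(NewPojo.class)); // PojoSerializer
    }
}
{code}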

After I added the parameterless constructor, Flink switched to *PojoSerializer*, 
but the field *_subclassSerializerCache_* still records that my POJO should be 
deserialized with {*}KryoSerializer{*}, which causes the restore to fail.
{code:java}
// PojoSerializer#getSubclassSerializer: a subclass serializer is created once, then reused from the cache
TypeSerializer<?> getSubclassSerializer(Class<?> subclass) {
    TypeSerializer<?> result = subclassSerializerCache.get(subclass);
    if (result == null) {
        result = createSubclassSerializer(subclass);
        subclassSerializerCache.put(subclass, result);
    }
    return result;
}
{code}
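The effect described above can be reproduced with a plain-Java model of the same memoizing pattern. In this hypothetical sketch, serializers are represented by strings, the pre-filled map stands in for a subclassSerializerCache that already holds an entry, and the factory stands in for createSubclassSerializer after the constructor was added. Because the lookup only calls the factory on a cache miss, the existing "KryoSerializer" entry keeps winning.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Hypothetical model of the subclassSerializerCache lookup; serializers are plain strings.
class StaleCacheDemo {

    static class MyPojo {
    }

    private final Map<Class<?>, String> subclassSerializerCache;
    private final Function<Class<?>, String> createSubclassSerializer;

    StaleCacheDemo(Map<Class<?>, String> cache, Function<Class<?>, String> factory) {
        this.subclassSerializerCache = cache;
        this.createSubclassSerializer = factory;
    }

    // Same shape as getSubclassSerializer: compute on a miss, otherwise return the cached value.
    String getSubclassSerializer(Class<?> subclass) {
        String result = subclassSerializerCache.get(subclass);
        if (result == null) {
            result = createSubclassSerializer.apply(subclass);
            subclassSerializerCache.put(subclass, result);
        }
        return result;
    }

    public static void main(String[] args) {
        // A cache that already holds the old decision for MyPojo.
        Map<Class<?>, String> restored = new HashMap<>();
        restored.put(MyPojo.class, "KryoSerializer");

        // The factory reflects the new class, which now qualifies as a POJO.
        Function<Class<?>, String> factory = clazz -> "PojoSerializer";

        System.out.println(new StaleCacheDemo(restored, factory)
                .getSubclassSerializer(MyPojo.class)); // KryoSerializer (stale entry wins)
        System.out.println(new StaleCacheDemo(new HashMap<>(), factory)
                .getSubclassSerializer(MyPojo.class)); // PojoSerializer (empty cache)
    }
}
{code}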
The job runs normally again after I remove this cache entry when restoring (a local 
patch to the PojoSerializer constructor, below). I hope this can be fixed upstream later.
{code:java}
PojoSerializer(
        Class<T> clazz,
        Field[] fields,
        TypeSerializer<Object>[] fieldSerializers,
        LinkedHashMap<Class<?>, Integer> registeredClasses,
        TypeSerializer<?>[] registeredSerializers,
        Map<Class<?>, TypeSerializer<?>> subclassSerializerCache,
        ExecutionConfig executionConfig) {

    this.clazz = checkNotNull(clazz);
    this.fields = checkNotNull(fields);
    this.numFields = fields.length;
    this.fieldSerializers = checkNotNull(fieldSerializers);
    this.registeredClasses = checkNotNull(registeredClasses);
    this.registeredSerializers = checkNotNull(registeredSerializers);
    this.subclassSerializerCache = checkNotNull(subclassSerializerCache);
    // Workaround: drop the stale cached serializer for the POJO class
    // ("xxx" is a placeholder for the POJO's class name as returned by getName())
    this.subclassSerializerCache.entrySet().removeIf(next ->
            "xxx".equals(next.getKey().getName()));
    this.executionConfig = checkNotNull(executionConfig);
    this.cl = Thread.currentThread().getContextClassLoader();
}
{code}
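The hard-coded "xxx" check can be generalised a little. The sketch below is a hypothetical helper (evictStaleEntries is not a Flink method) that removes cached entries for a set of class names with the same entrySet().removeIf idiom as the patch, so the next lookup falls through to the factory again. It works on a plain Map and only illustrates the eviction step; it is a workaround sketch, not a fix for the underlying compatibility check.

{code:java}
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

class CacheEviction {

    static class MyPojo {
    }

    // Hypothetical helper: drop cached serializers whose key class name is in classNames.
    // Returns the number of entries removed.
    static int evictStaleEntries(Map<Class<?>, ?> cache, Set<String> classNames) {
        int before = cache.size();
        cache.entrySet().removeIf(entry -> classNames.contains(entry.getKey().getName()));
        return before - cache.size();
    }

    public static void main(String[] args) {
        Map<Class<?>, String> cache = new HashMap<>();
        cache.put(MyPojo.class, "KryoSerializer");
        cache.put(Integer.class, "IntSerializer");

        int removed = evictStaleEntries(cache, Collections.singleton(MyPojo.class.getName()));

        System.out.println(removed);                          // 1
        System.out.println(cache.containsKey(MyPojo.class));  // false
        System.out.println(cache.containsKey(Integer.class)); // true
    }
}
{code}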

> Serialize garbled characters at checkpoint
> --
>
> Key: FLINK-31356
> URL: https://issues.apache.org/jira/browse/FLINK-31356
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.13.6
>Reporter: John
>Priority: Major
>
>  
> {panel:title=The last checkpoint of the program was successful}
> 2023-03-07 08:33:16,085 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering 
> checkpoint 39126 (type=CHECKPOINT) @ 1678149196059 for job 
> 8b5720a4a40f50b995c97c6fe5b93079.
> 2023-03-07 08:33:16,918 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed 
> checkpoint 39126 for job 8b5720a4a40f50b995c97c6fe5b93079 (71251394 bytes in 
> 849 ms).
> 2023-03-07 08:33:16,918 INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking 
> checkpoint 39126 as completed for source Source: kafkaDataStream.
> 2023-03-07 08:36:10,444 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: 
> mysqlSink (1/2) (898af6700ac9cd087c763cef0b5585d4) switched from RUNNING to 
> FAILED on container_e38_1676011848026_0012_01_02 @  (dataPort=44633).
> java.lang.RuntimeException: Writing records to JDBC failed.
>     at 
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.checkFlushException(JdbcBatchingOutputFormat.java:153)
>  ~[flink-connector-jdbc_2.11-1.13.6.jar:1.13.6]
> {panel}
> {panel:title=But when restoring from this checkpoint, the state can't be decoded}
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for WindowOperator_a1b6a20a1eb2801464c79c8d018a24d1_(1/2) from 
> any of the 1 provided restore options.
>     at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     ... 10 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed 
> when trying to restore heap backend
>     at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:177)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:111)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:131)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at 
> 

[jira] [Commented] (FLINK-31356) Serialize garbled characters at checkpoint

2023-03-08 Thread John (Jira)


[ https://issues.apache.org/jira/browse/FLINK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697773#comment-17697773 ]

John commented on FLINK-31356:
--

[~Yanfei Lei] 
I have a MapState. 
Originally the POJO had only a constructor with parameters. I first took a 
savepoint manually and stopped the job, then added a parameterless constructor 
to the POJO and restored the job from that savepoint.
The job ran normally for a while. Then, because some sinks reported errors, the 
job tried to restart from the most recent checkpoint and failed with the error 
above.


[jira] [Commented] (FLINK-31356) Serialize garbled characters at checkpoint

2023-03-07 Thread Yanfei Lei (Jira)


[ https://issues.apache.org/jira/browse/FLINK-31356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17697746#comment-17697746 ]

Yanfei Lei commented on FLINK-31356:


Were any POJO fields changed when restoring? Is this related to 
https://issues.apache.org/jira/browse/FLINK-21752?

A common cause of "java.io.UTFDataFormatException: malformed input around byte" 
is that the serializer's read and write paths are not symmetrical, i.e. the bytes 
are read back in a different layout from the one they were written in.
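A minimal, self-contained illustration of that failure mode (not the actual Flink code path): the hypothetical writer below stores a string as a 2-byte length prefix plus standard UTF-8 bytes, while the reader uses DataInputStream.readUTF, which expects Java's modified UTF-8. ASCII text happens to decode identically in both, but a supplementary character is encoded with a 0xF0 lead byte that modified UTF-8 never uses, so readUTF throws UTFDataFormatException with the same "malformed input around byte" message as in the stack trace.

{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class AsymmetricReadWrite {

    // Writer: 2-byte length prefix followed by *standard* UTF-8 bytes (assumes < 64 KiB).
    static byte[] writeStandardUtf8(String s) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
            out.writeShort(utf8.length);
            out.write(utf8);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Reader: DataInputStream#readUTF, which decodes *modified* UTF-8.
    // Returns the decoded string, or "failed: <ExceptionType>: <message>".
    static String readModifiedUtf8(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return in.readUTF();
        } catch (IOException e) {
            return "failed: " + e.getClass().getSimpleName() + ": " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // ASCII-only text has the same bytes in both encodings, so it round-trips: prints "hello".
        System.out.println(readModifiedUtf8(writeStandardUtf8("hello")));
        // U+1F600 is 4 bytes in standard UTF-8 (lead byte 0xF0), which readUTF rejects.
        System.out.println(readModifiedUtf8(writeStandardUtf8("ok\uD83D\uDE00")));
    }
}
{code}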

> Serialize garbled characters at checkpoint
> --
>
> Key: FLINK-31356
> URL: https://issues.apache.org/jira/browse/FLINK-31356
> Project: Flink
>  Issue Type: Bug
>  Components: API / Type Serialization System
>Affects Versions: 1.13.6
>Reporter: John
>Priority: Major
>
>  
> {panel:title=The last checkpoint of the program was successful}
> 2023-03-07 08:33:16,085 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Triggering 
> checkpoint 39126 (type=CHECKPOINT) @ 1678149196059 for job 
> 8b5720a4a40f50b995c97c6fe5b93079.
> 2023-03-07 08:33:16,918 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Completed 
> checkpoint 39126 for job 8b5720a4a40f50b995c97c6fe5b93079 (71251394 bytes in 
> 849 ms).
> 2023-03-07 08:33:16,918 INFO  
> org.apache.flink.runtime.source.coordinator.SourceCoordinator [] - Marking 
> checkpoint 39126 as completed for source Source: kafkaDataStream.
> 2023-03-07 08:36:10,444 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Sink: 
> mysqlSink (1/2) (898af6700ac9cd087c763cef0b5585d4) switched from RUNNING to 
> FAILED on container_e38_1676011848026_0012_01_02 @  (dataPort=44633).
> java.lang.RuntimeException: Writing records to JDBC failed.
>     at 
> org.apache.flink.connector.jdbc.internal.JdbcBatchingOutputFormat.checkFlushException(JdbcBatchingOutputFormat.java:153)
>  ~[flink-connector-jdbc_2.11-1.13.6.jar:1.13.6]
> {panel}
> {panel:title=But when restoring from this checkpoint, the state can't be decoded}
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for WindowOperator_a1b6a20a1eb2801464c79c8d018a24d1_(1/2) from 
> any of the 1 provided restore options.
>     at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:160)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     ... 10 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed 
> when trying to restore heap backend
>     at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.restoreState(HeapKeyedStateBackendBuilder.java:177)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:111)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:131)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.runtime.state.hashmap.HashMapStateBackend.createKeyedStateBackend(HashMapStateBackend.java:73)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.runtime.state.StateBackend.createKeyedStateBackend(StateBackend.java:136)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163)
>  ~[flink-dist_2.11-1.13.6.jar:1.13.6]
>     ... 10 more
> Caused by: java.io.UTFDataFormatException: malformed input around byte 32
>     at java.io.DataInputStream.readUTF(DataInputStream.java:656) 
> ~[?:1.8.0_201]
>     at java.io.DataInputStream.readUTF(DataInputStream.java:564) 
>