??????:
??????flink, ??????????state checkpoint??????:
1) ????????socket????????????keyed state backend; ????job??????
2) ????????????????socket??????job??????failed????
3) ????????????????socket????????
4) ????flink??????socket, ??job??????????????????????,
??????MapState????????????
??????
flink: 1.8.0
flink??hadoop????flink-shaded-hadoop2-uber-2.6.5-1.8.0.jar
hdfs????????: hadoop2.6.0-cdh5.16.1
??????standalone?????? state backend??fssystem??rocksdb????????
??????log:
Caused by: java.io.IOException: Stream closed
at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:892)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:963)
at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:757)
at java.io.FilterInputStream.read(FilterInputStream.java:83)
at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:84)
at org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:51)
at org.apache.flink.runtime.util.ForwardingInputStream.read(ForwardingInputStream.java:41)
at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
at org.apache.flink.types.StringValue.readString(StringValue.java:769)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)
at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:33)
at org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:148)
at org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:43)
at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:74)
at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:290)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:251)
at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:153)
at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:127)