??????:

??????flink, ??????????state checkpoint??????:
1) ????????socket????????????keyed state backend; ????job??????
2) ????????????????socket??????job??????failed????
3) ????????????????socket????????
4) ????flink??????socket, ??job??????????????????????, 
??????MapState????????????


??????
    flink: 1.8.0
    flink??hadoop????flink-shaded-hadoop2-uber-2.6.5-1.8.0.jar
    hdfs????????: hadoop2.6.0-cdh5.16.1
    ??????standalone?????? state backend??filesystem??rocksdb????????



??????log:


Caused by: java.io.IOException: Stream closed
        at org.apache.hadoop.hdfs.DFSInputStream.readWithStrategy(DFSInputStream.java:892)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:963)
        at org.apache.hadoop.hdfs.DFSInputStream.read(DFSInputStream.java:757)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:84)
        at org.apache.flink.core.fs.FSDataInputStreamWrapper.read(FSDataInputStreamWrapper.java:51)
        at org.apache.flink.runtime.util.ForwardingInputStream.read(ForwardingInputStream.java:41)
        at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
        at org.apache.flink.types.StringValue.readString(StringValue.java:769)
        at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:75)
        at org.apache.flink.api.common.typeutils.base.StringSerializer.deserialize(StringSerializer.java:33)
        at org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:148)
        at org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:43)
        at org.apache.flink.runtime.state.heap.StateTableByKeyGroupReaders.lambda$createV2PlusReader$0(StateTableByKeyGroupReaders.java:74)
        at org.apache.flink.runtime.state.KeyGroupPartitioner$PartitioningResultKeyGroupReader.readMappingsInKeyGroup(KeyGroupPartitioner.java:297)
        at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readKeyGroupStateData(HeapRestoreOperation.java:290)
        at org.apache.flink.runtime.state.heap.HeapRestoreOperation.readStateHandleStateData(HeapRestoreOperation.java:251)
        at org.apache.flink.runtime.state.heap.HeapRestoreOperation.restore(HeapRestoreOperation.java:153)
        at org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder.build(HeapKeyedStateBackendBuilder.java:127)

回复