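Hi Congxian,
Sorry, your email got lost in my inbox...
Here is how I reproduce it: I directly restart the TM that is running this job, and the KeyedProcessFunction then fails to restore. The error only occurs with the RocksDB StateBackend; with HDFS as the FsBackend the job recovers normally. At first I thought the custom state inside the KeyedProcessFunction was failing to restore, but in the end even an empty KeyedProcessFunction could not be restored. A simple demo is attached below.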
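import java.io.Serializable;
import java.util.Random;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

public class App {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 2 minutes.
        env.enableCheckpointing(2 * 60 * 1000);

        DataStreamSource<Student> source = env.addSource(new SourceFunction<Student>() {

            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Student> ctx) throws Exception {
                // Emit 100 random records, then block until the job is cancelled.
                Random rand = new Random();
                for (int i = 0; i < 100; i++) {
                    int id = rand.nextInt();
                    ctx.collect(new Student(id, "Tom" + id));
                }
                synchronized (this) {
                    while (running) {
                        this.wait();
                    }
                }
            }

            @Override
            public void cancel() {
                synchronized (this) {
                    running = false;
                    this.notifyAll();
                }
            }
        });

        // An empty KeyedProcessFunction that registers no state and only forwards records.
        source.keyBy("id").process(new KeyedProcessFunction<Tuple, Student, Student>() {
            @Override
            public void processElement(Student value, Context ctx, Collector<Student> out)
                    throws Exception {
                out.collect(value);
            }
        }).addSink(new SinkFunction<Student>() {
            @Override
            public void invoke(Student value, Context context) throws Exception {
                System.out.println(value);
            }
        });

        env.execute("test keyed process operator state restore....");
    }

    @Getter
    @Setter
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student implements Serializable {
        private static final long serialVersionUID = 3909702675393996601L;
        private Integer id;
        private String name;
    }
}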

Below is the log with DEBUG enabled:

2020-04-14 11:42:44,679 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Establish 
JobManager connection for job 6fd13de6e9c84a51425f7cc34ce94940.

2020-04-14 11:42:44,684 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Offer reserved 
slots to the leader of job 6fd13de6e9c84a51425f7cc34ce94940.

2020-04-14 11:42:44,727 DEBUG 
org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  - 
Registered new allocation id ed04b5323aa885406201e85c9f8b7c78 for local state 
stores for job 6fd13de6e9c84a51425f7cc34ce94940.

2020-04-14 11:42:44,729 DEBUG 
org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  - 
Registered new local state store with configuration 
LocalRecoveryConfig{localRecoveryMode=false, 
localStateDirectories=LocalRecoveryDirectoryProvider{rootDirectories=[/data/flink1_10/tmp/localState/aid_ed04b5323aa885406201e85c9f8b7c78],
 jobID=6fd13de6e9c84a51425f7cc34ce94940, 
jobVertexID=bc764cd8ddf7a0cff126f51c16239658, subtaskIndex=0}} for 
6fd13de6e9c84a51425f7cc34ce94940 - bc764cd8ddf7a0cff126f51c16239658 - 0 under 
allocation id ed04b5323aa885406201e85c9f8b7c78.

2020-04-14 11:42:44,742 DEBUG 
org.apache.flink.runtime.io.network.partition.ResultPartitionFactory  - Source: 
Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc): Initialized 
org.apache.flink.runtime.io.network.partition.ResultPartitionFactory@41801faf

2020-04-14 11:42:44,747 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received task 
Source: Custom Source (1/1).

2020-04-14 11:42:44,748 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Source: Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc) 
switched from CREATED to DEPLOYING.

2020-04-14 11:42:44,748 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Creating FileSystem stream leak safety net for task Source: 
Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc) [DEPLOYING]

2020-04-14 11:42:44,751 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Loading JAR files for task Source: Custom Source (1/1) 
(ee17273414060c57d2d331a83d1a84fc) [DEPLOYING].

2020-04-14 11:42:44,752 INFO  
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl  - Activate slot 
ed04b5323aa885406201e85c9f8b7c78.

2020-04-14 11:42:44,772 DEBUG 
org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  - 
Registered new local state store with configuration 
LocalRecoveryConfig{localRecoveryMode=false, 
localStateDirectories=LocalRecoveryDirectoryProvider{rootDirectories=[/data/flink1_10/tmp/localState/aid_ed04b5323aa885406201e85c9f8b7c78],
 jobID=6fd13de6e9c84a51425f7cc34ce94940, 
jobVertexID=20ba6b65f97481d5570070de90e4e791, subtaskIndex=0}} for 
6fd13de6e9c84a51425f7cc34ce94940 - 20ba6b65f97481d5570070de90e4e791 - 0 under 
allocation id ed04b5323aa885406201e85c9f8b7c78.

2020-04-14 11:42:44,786 DEBUG 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory  
- KeyedProcess -> Sink: Unnamed (1/1) (406f2d0b26fb4b1040ae5ac00028202d): 
Created 1 input channels (local: 1, remote: 0, unknown: 0).

2020-04-14 11:42:44,788 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received task 
KeyedProcess -> Sink: Unnamed (1/1).

2020-04-14 11:42:44,795 INFO  org.apache.flink.runtime.taskmanager.Task         
            - KeyedProcess -> Sink: Unnamed (1/1) 
(406f2d0b26fb4b1040ae5ac00028202d) switched from CREATED to DEPLOYING.

2020-04-14 11:42:44,805 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Creating FileSystem stream leak safety net for task KeyedProcess 
-> Sink: Unnamed (1/1) (406f2d0b26fb4b1040ae5ac00028202d) [DEPLOYING]

2020-04-14 11:42:44,812 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Loading JAR files for task KeyedProcess -> Sink: Unnamed (1/1) 
(406f2d0b26fb4b1040ae5ac00028202d) [DEPLOYING].

2020-04-14 11:42:44,817 DEBUG org.apache.flink.runtime.blob.FileSystemBlobStore 
            - Copying from 
hdfs://nameservice1/data/flink1_10/ha/flink1_10_0/blob/job_6fd13de6e9c84a51425f7cc34ce94940/blob_p-6581a081d862993cf5a06573dbb6621fef1e46b2-f795a9ecd636e88bdf7ddd7746b9ca06
 to 
/data/flink1_10/tmp/blobStore-e924cf2e-5e6c-48c2-893e-c2e9c0a809b6/incoming/temp-00000000.

2020-04-14 11:42:45,060 DEBUG 
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received 
heartbeat request from 4ff74d2e5ff4f66a88688fdeafd2d3ec.

2020-04-14 11:42:45,919 DEBUG org.apache.flink.runtime.taskmanager.Task         
            - Getting user code class loader for task 
ee17273414060c57d2d331a83d1a84fc at library cache manager took 1167 milliseconds

2020-04-14 11:42:45,920 DEBUG org.apache.flink.runtime.taskmanager.Task         
            - Getting user code class loader for task 
406f2d0b26fb4b1040ae5ac00028202d at library cache manager took 1108 milliseconds

2020-04-14 11:42:45,931 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Registering task at network: Source: Custom Source (1/1) 
(ee17273414060c57d2d331a83d1a84fc) [DEPLOYING].

2020-04-14 11:42:45,931 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Registering task at network: KeyedProcess -> Sink: Unnamed (1/1) 
(406f2d0b26fb4b1040ae5ac00028202d) [DEPLOYING].

2020-04-14 11:42:45,934 DEBUG 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool    - Using a local 
buffer pool with 2-10 buffers

2020-04-14 11:42:45,934 DEBUG 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool    - Using a local 
buffer pool with 0-8 buffers

2020-04-14 11:42:45,935 DEBUG 
org.apache.flink.runtime.io.network.partition.ResultPartitionManager  - 
Registered ReleaseOnConsumptionResultPartition 
0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc 
[PIPELINED_BOUNDED, 1 subpartitions, 1 pending consumptions].

2020-04-14 11:42:45,935 DEBUG 
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel  - 
LocalInputChannel 
[0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc]: Requesting 
LOCAL subpartition 0 of partition 
0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc.

2020-04-14 11:42:45,935 DEBUG 
org.apache.flink.runtime.io.network.TaskEventDispatcher       - registering 
0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc

2020-04-14 11:42:45,935 DEBUG 
org.apache.flink.runtime.io.network.partition.ResultPartitionManager  - 
Requesting subpartition 0 of ReleaseOnConsumptionResultPartition 
0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc 
[PIPELINED_BOUNDED, 1 subpartitions, 1 pending consumptions].

2020-04-14 11:42:45,935 DEBUG 
org.apache.flink.runtime.io.network.partition.PipelinedSubpartition  - Source: 
Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc): Creating read view for 
subpartition 0 of partition 
0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc.

2020-04-14 11:42:45,937 DEBUG 
org.apache.flink.runtime.io.network.partition.ResultPartition  - Created 
PipelinedSubpartitionView(index: 0) of ResultPartition 
0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc

2020-04-14 11:42:45,961 INFO  org.apache.flink.runtime.taskmanager.Task         
            - KeyedProcess -> Sink: Unnamed (1/1) 
(406f2d0b26fb4b1040ae5ac00028202d) switched from DEPLOYING to RUNNING.

2020-04-14 11:42:45,963 DEBUG 
org.apache.flink.streaming.runtime.tasks.StreamTask           - Initializing 
KeyedProcess -> Sink: Unnamed (1/1).

2020-04-14 11:42:45,967 INFO  
org.apache.flink.streaming.runtime.tasks.StreamTask           - Loading state 
backend via factory 
org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory

2020-04-14 11:42:45,984 DEBUG 
org.apache.flink.streaming.runtime.tasks.StreamTask           - Using 
partitioner HASH for output 0 of task Source: Custom Source

2020-04-14 11:42:45,992 INFO  
org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Using 
predefined options: DEFAULT.

2020-04-14 11:42:45,992 INFO  
org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Using default 
options factory: DefaultConfigurableOptionsFactory{configuredOptions={}}.

2020-04-14 11:42:45,994 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Source: Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc) 
switched from DEPLOYING to RUNNING.

2020-04-14 11:42:45,994 DEBUG 
org.apache.flink.streaming.runtime.tasks.StreamTask           - Initializing 
Source: Custom Source (1/1).

2020-04-14 11:42:45,994 INFO  
org.apache.flink.streaming.runtime.tasks.StreamTask           - Loading state 
backend via factory 
org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory

2020-04-14 11:42:45,995 INFO  
org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Using 
predefined options: DEFAULT.

2020-04-14 11:42:45,995 INFO  
org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Using default 
options factory: DefaultConfigurableOptionsFactory{configuredOptions={}}.

2020-04-14 11:42:46,033 DEBUG 
org.apache.flink.streaming.runtime.tasks.StreamTask           - Invoking 
Source: Custom Source (1/1)

2020-04-14 11:42:46,042 DEBUG 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure  - Creating 
operator state backend for StreamSource_bc764cd8ddf7a0cff126f51c16239658_(1/1) 
with empty state.

2020-04-14 11:42:46,057 DEBUG 
org.apache.flink.streaming.runtime.tasks.StreamTask           - Invoking 
KeyedProcess -> Sink: Unnamed (1/1)

2020-04-14 11:42:46,060 DEBUG 
org.apache.flink.runtime.state.TaskStateManagerImpl           - Operator 
c09dc291fad93d575e015871097bfc60 has remote state 
SubtaskState{operatorStateFromBackend=StateObjectCollection{[]}, 
operatorStateFromStream=StateObjectCollection{[]}, 
keyedStateFromBackend=StateObjectCollection{[]}, 
keyedStateFromStream=StateObjectCollection{[]}, stateSize=0} from job manager 
and local state alternatives [] from local state store 
org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@3fd9cf09.

2020-04-14 11:42:46,060 DEBUG 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure  - Creating 
operator state backend for StreamSink_c09dc291fad93d575e015871097bfc60_(1/1) 
with empty state.

2020-04-14 11:42:46,069 DEBUG 
org.apache.flink.runtime.state.TaskStateManagerImpl           - Operator 
20ba6b65f97481d5570070de90e4e791 has remote state 
SubtaskState{operatorStateFromBackend=StateObjectCollection{[]}, 
operatorStateFromStream=StateObjectCollection{[]}, 
keyedStateFromBackend=StateObjectCollection{[IncrementalRemoteKeyedStateHandle{backendIdentifier=04ac09d6-1f1f-4a6c-a78d-74090c83b3c7,
 keyGroupRange=KeyGroupRange{startKeyGroup=0, endKeyGroup=127}, checkpointId=1, 
sharedState={}, 
privateState={MANIFEST-000006=ByteStreamStateHandle{handleName='hdfs://nameservice1/data/flink1_10/checkpoint/6fd13de6e9c84a51425f7cc34ce94940/shared/2ff261b8-f51c-42bf-9fab-93c6b119dcff',
 dataBytes=206}, OPTIONS-000010=File State: 
hdfs://nameservice1/data/flink1_10/checkpoint/6fd13de6e9c84a51425f7cc34ce94940/shared/426c66a6-d32e-43c8-9873-550237ee0963
 [10379 bytes], 
CURRENT=ByteStreamStateHandle{handleName='hdfs://nameservice1/data/flink1_10/checkpoint/6fd13de6e9c84a51425f7cc34ce94940/shared/bbbce7c9-ea02-4590-9b18-d7a322deb2f4',
 dataBytes=16}}, metaStateHandle=File State: 
hdfs://nameservice1/data/flink1_10/checkpoint/6fd13de6e9c84a51425f7cc34ce94940/chk-1/9215630d-632e-48f6-b668-7dc235a8ff7a
 [1163 bytes], registered=false}]}, 
keyedStateFromStream=StateObjectCollection{[]}, stateSize=11764} from job 
manager and local state alternatives [] from local state store 
org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@3fd9cf09.

2020-04-14 11:42:46,070 DEBUG 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure  - Creating 
keyed state backend for 
KeyedProcessOperator_20ba6b65f97481d5570070de90e4e791_(1/1) and restoring with 
state from alternative (1/1).

2020-04-14 11:42:46,071 INFO  
org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Attempting to 
load RocksDB native library and store it under '/data/flink1_10/tmp'

2020-04-14 11:42:46,071 DEBUG 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Attempting to 
create RocksDB native library folder 
/data/flink1_10/tmp/rocksdb-lib-a5f35d4dd06539876a20dbabc82a7f33

2020-04-14 11:42:46,078 DEBUG 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf  - 
-Dorg.apache.flink.shaded.netty4.io.netty.buffer.checkAccessible: true

2020-04-14 11:42:46,079 DEBUG 
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf  - 
-Dorg.apache.flink.shaded.netty4.io.netty.buffer.checkBounds: true

2020-04-14 11:42:46,080 DEBUG 
org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetectorFactory  - 
Loaded default ResourceLeakDetector: 
org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector@28a9bbee

2020-04-14 11:42:46,150 INFO  
org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Successfully 
loaded RocksDB native library

2020-04-14 11:42:46,154 INFO  
org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Getting managed 
memory shared cache for RocksDB.

2020-04-14 11:42:46,161 INFO  
org.apache.flink.contrib.streaming.state.RocksDBStateBackend  - Obtained shared 
RocksDB cache of size 53687092 bytes

2020-04-14 11:42:46,495 DEBUG 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation
  - Restoring keyed backend uid in operator 
KeyedProcessOperator_20ba6b65f97481d5570070de90e4e791_(1/1) from incremental 
snapshot to 04ac09d6-1f1f-4a6c-a78d-74090c83b3c7.

2020-04-14 11:42:46,571 ERROR 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder  - 
Caught unexpected exception.

java.nio.file.NoSuchFileException: 
/data/flink1_10/tmp/flink-io-cb7e58a9-8b06-4ada-b8eb-ad754cd2470e/job_6fd13de6e9c84a51425f7cc34ce94940_op_KeyedProcessOperator_20ba6b65f97481d5570070de90e4e791__1_1__uuid_0079fc9d-30c1-452a-86b9-eaa71eab7593/2508f380-d4ae-4316-9ec8-dd13ae5d3c2e/CURRENT

        at 
sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)

        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)

        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)

        at sun.nio.fs.UnixCopyFile.copy(UnixCopyFile.java:526)

        at 
sun.nio.fs.UnixFileSystemProvider.copy(UnixFileSystemProvider.java:253)

        at java.nio.file.Files.copy(Files.java:1274)

        at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:483)

        at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:218)

        at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:194)

        at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)

        at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)

        at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)

        at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)

        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)

        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)

        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)

        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)

        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)

        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)

        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)

        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)

        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)

        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)

        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)

        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)

        at java.lang.Thread.run(Thread.java:748)

2020-04-14 11:42:46,576 WARN  
org.apache.flink.streaming.api.operators.BackendRestorerProcedure  - Exception 
while restoring keyed state backend for 
KeyedProcessOperator_20ba6b65f97481d5570070de90e4e791_(1/1) from alternative 
(1/1), will retry while more alternatives are available.

org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected 
exception.

        at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)

        at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)

        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)

        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)

        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)

        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)

        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)

        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)

        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)

        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)

        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)

        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)

        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)

        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)

        at java.lang.Thread.run(Thread.java:748)

Caused by: java.nio.file.NoSuchFileException: 
/data/flink1_10/tmp/flink-io-cb7e58a9-8b06-4ada-b8eb-ad754cd2470e/job_6fd13de6e9c84a51425f7cc34ce94940_op_KeyedProcessOperator_20ba6b65f97481d5570070de90e4e791__1_1__uuid_0079fc9d-30c1-452a-86b9-eaa71eab7593/2508f380-d4ae-4316-9ec8-dd13ae5d3c2e/CURRENT

        at 
sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)

        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)

        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)

        at sun.nio.fs.UnixCopyFile.copy(UnixCopyFile.java:526)

        at 
sun.nio.fs.UnixFileSystemProvider.copy(UnixFileSystemProvider.java:253)

        at java.nio.file.Files.copy(Files.java:1274)

        at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:483)

        at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:218)

        at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:194)

        at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)

        at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)

        at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)

        ... 15 more

2020-04-14 11:42:46,579 INFO  org.apache.flink.runtime.taskmanager.Task         
            - KeyedProcess -> Sink: Unnamed (1/1) 
(406f2d0b26fb4b1040ae5ac00028202d) switched from RUNNING to FAILED.

java.lang.Exception: Exception while creating StreamOperatorStateContext.

        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)

        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)

        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)

        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)

        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)

        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)

        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)

        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)

        at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for KeyedProcessOperator_20ba6b65f97481d5570070de90e4e791_(1/1) from 
any of the 1 provided restore options.

        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)

        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)

        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)

        ... 9 more

Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught 
unexpected exception.

        at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)

        at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)

        at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)

        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)

        at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)

        ... 11 more

Caused by: java.nio.file.NoSuchFileException: 
/data/flink1_10/tmp/flink-io-cb7e58a9-8b06-4ada-b8eb-ad754cd2470e/job_6fd13de6e9c84a51425f7cc34ce94940_op_KeyedProcessOperator_20ba6b65f97481d5570070de90e4e791__1_1__uuid_0079fc9d-30c1-452a-86b9-eaa71eab7593/2508f380-d4ae-4316-9ec8-dd13ae5d3c2e/CURRENT

        at 
sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)

        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)

        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)

        at sun.nio.fs.UnixCopyFile.copy(UnixCopyFile.java:526)

        at 
sun.nio.fs.UnixFileSystemProvider.copy(UnixFileSystemProvider.java:253)

        at java.nio.file.Files.copy(Files.java:1274)

        at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:483)

        at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:218)

        at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:194)

        at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)

        at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)

        at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)

        ... 15 more

2020-04-14 11:42:46,588 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Freeing task resources for KeyedProcess -> Sink: Unnamed (1/1) 
(406f2d0b26fb4b1040ae5ac00028202d).

2020-04-14 11:42:46,589 DEBUG org.apache.flink.runtime.taskmanager.Task         
            - Release task KeyedProcess -> Sink: Unnamed (1/1) network 
resources (state: FAILED).

2020-04-14 11:42:46,589 DEBUG 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate  - 
KeyedProcess -> Sink: Unnamed (1/1) (406f2d0b26fb4b1040ae5ac00028202d): 
Releasing 
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate@23ae29cd.

2020-04-14 11:42:46,589 DEBUG 
org.apache.flink.runtime.io.network.partition.ResultPartition  - 
ReleaseOnConsumptionResultPartition 
0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc 
[PIPELINED_BOUNDED, 1 subpartitions, 0 pending consumptions]: Received consumed 
notification for subpartition 0.

2020-04-14 11:42:46,589 DEBUG 
org.apache.flink.runtime.io.network.partition.ResultPartitionManager  - 
Received consume notification from ReleaseOnConsumptionResultPartition 
0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc 
[PIPELINED_BOUNDED, 1 subpartitions, 0 pending consumptions].

2020-04-14 11:42:46,589 DEBUG 
org.apache.flink.runtime.io.network.partition.ResultPartition  - Source: Custom 
Source (1/1) (ee17273414060c57d2d331a83d1a84fc): Releasing 
ReleaseOnConsumptionResultPartition 
0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc 
[PIPELINED_BOUNDED, 1 subpartitions, 0 pending consumptions].

2020-04-14 11:42:46,590 DEBUG 
org.apache.flink.runtime.io.network.partition.PipelinedSubpartition  - Source: 
Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc): Released 
PipelinedSubpartition#0 [number of buffers: 1 (0 bytes), number of buffers in 
backlog: 1, finished? false, read view? false].

2020-04-14 11:42:46,590 DEBUG 
org.apache.flink.runtime.io.network.partition.ResultPartitionManager  - 
Released partition 0d224b8294583b8fcdf469150870d2a4 produced by 
ee17273414060c57d2d331a83d1a84fc.

2020-04-14 11:42:46,590 INFO  org.apache.flink.runtime.taskmanager.Task         
            - Ensuring all FileSystem streams are closed for task KeyedProcess 
-> Sink: Unnamed (1/1) (406f2d0b26fb4b1040ae5ac00028202d) [FAILED]

2020-04-14 11:42:46,603 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor            - Un-registering 
task and sending final execution state FAILED to JobManager for task 
KeyedProcess -> Sink: Unnamed (1/1) 406f2d0b26fb4b1040ae5ac00028202d.
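
The stack trace above fails in Files.copy on the CURRENT file. To double check whether a given local directory really contains the files that the restore step tries to copy, a small standalone check along the following lines might help. This is only a sketch under assumptions: the class RestoreDirCheck and the method missingFiles are hypothetical helpers and not part of Flink, and the file names used in main are simply the privateState entries that appear in the log above.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink: reports which expected files are absent from a directory.
class RestoreDirCheck {

    // Returns the names from `expected` that do not exist as regular files directly under `dir`.
    static List<String> missingFiles(Path dir, List<String> expected) {
        List<String> missing = new ArrayList<>();
        for (String name : expected) {
            if (!Files.isRegularFile(dir.resolve(name))) {
                missing.add(name);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        if (args.length < 1) {
            System.err.println("usage: java RestoreDirCheck <directory>");
            return;
        }
        // File names taken from the privateState entries in the DEBUG log (assumption:
        // these are the files expected in the restored instance directory).
        List<String> expected = Arrays.asList("CURRENT", "MANIFEST-000006", "OPTIONS-000010");
        List<String> missing = missingFiles(Paths.get(args[0]), expected);
        System.out.println("missing files: " + missing);
    }
}
```

Running it against the directory named in the NoSuchFileException, right after a failed restore, would show whether CURRENT was never written there or whether the whole directory is absent.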











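On 2020-04-03 18:09:19, "Congxian Qiu" <qcx978132...@gmail.com> wrote:
>The paths on HDFS are not the same as the local ones. If you want to look at the HDFS paths, you may need to inspect the Checkpoint Meta information, which is fairly troublesome; you can refer to the tests in
>CheckpointMetadataLoadingTest.
>I took another look at the TM log you provided. It looks like outputStream.close() at line 148 failed (one odd thing is that the
>outputStream here is a local file, yet the error stack shows HadoopFileSystem). Can you reproduce this problem reliably? If so, could you enable
>debug logging and post the JM/TM logs? A job that reproduces the problem would be even better.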
>
>Best,
>Congxian
>
>
>chenxyz <chen...@163.com> wrote on Wednesday, April 1, 2020 at 5:18 PM:
>
>> Hi Congxian,
>> I checked HDFS. The directory
>> /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160 is empty, and there is no db directory under it either.
>>
>>
>>
>>
>>
>>
>>
>>
>> On 2020-04-01 16:50:13, "Congxian Qiu" <qcx978132...@gmail.com> wrote:
>> >Hi
>> >Restore can roughly be split into two parts: 1) downloading the files; 2) restoring from the downloaded files.
>> >From the TM log it looks like the download failed. Could you double check whether the file
>> >/data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst
>> >exists? If the download did fail, you need to find out why it failed.
>> >
>> >Best,
>> >Congxian
>> >
>> >
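>> >chenxyz <chen...@163.com> wrote on Wednesday, April 1, 2020 at 3:02 PM:
>> >
>> >> The job uses rocksdb as its state backend. When the job restarts after a failure, it often fails with "Could not restore keyed state backend for
>> >> KeyedProcessOperator". How can this be solved?
>> >>
>> >> Version: 1.10 standalone
>> >>
>> >> Configuration: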
>> >>
>> >> state.backend: rocksdb
>> >>
>> >> state.checkpoints.dir: hdfs://nameservice1/data/flink1_10/checkpoint
>> >>
>> >> state.backend.incremental: true
>> >>
>> >> jobmanager.execution.failover-strategy: region
>> >>
>> >> io.tmp.dirs: /data/flink1_10/tmp
>> >>
>> >>
>> >>
>> >>
>> >> Checkpoint configuration of the job:
>> >>
>> >> env.enableCheckpointing(2 * 60 * 1000);
>> >>
>> >> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>> >>
>> >> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2 * 60 * 1000);
>> >>
>> >> env.getCheckpointConfig().setCheckpointTimeout(60000);
>> >>
>> >> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>> >>
>> >> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(12);
>> >>
>> >> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>> >>
>> >>
>> >>
>> >>
>> >> 日志信息:
>> >>
>> >>
>> >>
>> >>
>> >> 2020-04-01 11:13:03
>> >> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>> >>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
>> >>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
>> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
>> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>> >>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>> >>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>> >>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>> >>     at java.lang.Thread.run(Thread.java:748)
>> >> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926_(1/2) from any of the 1 provided restore options.
>> >>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>> >>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
>> >>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
>> >>     ... 9 more
>> >> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
>> >>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
>> >>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
>> >>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
>> >>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>> >>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>> >>     ... 11 more
>> >> Caused by: java.nio.file.NoSuchFileException: /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst -> /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/3e979cc5-82c1-42bf-a269-1ce6e43f3e10/001888.sst
>> >>     at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>> >>     at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>> >>     at sun.nio.fs.UnixFileSystemProvider.createLink(UnixFileSystemProvider.java:476)
>> >>     at java.nio.file.Files.createLink(Files.java:1086)
>> >>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:480)
>> >>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:218)
>> >>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:194)
>> >>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)
>> >>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)
>> >>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
>> >>     ... 15 more
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> TaskManager error log:
>> >>
>> >>
>> >>
>> >>
>> >> 2020-04-01 14:48:10,726 ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder  - Caught unexpected exception.
>> >> java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
>> >>     at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2147)
>> >>     at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2128)
>> >>     at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2229)
>> >>     at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>> >>     at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
>> >>     at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
>> >>     at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
>> >>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:148)
>> >>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109)
>> >>     at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)
>> >>     at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
>> >>     at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>> >>     at java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1640)
>> >>     at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1858)
>> >>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:83)
>> >>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:67)
>> >>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.transferRemoteStateToLocalDirectory(RocksDBIncrementalRestoreOperation.java:230)
>> >>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:195)
>> >>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)
>> >>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)
>> >>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
>> >>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
>> >>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
>> >>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>> >>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>> >>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
>> >>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
>> >>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
>> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
>> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>> >>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>> >>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>> >>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>> >>     at java.lang.Thread.run(Thread.java:748)
>> >>
>> >> 2020-04-01 14:48:10,726 WARN org.apache.flink.streaming.api.operators.BackendRestorerProcedure  - Exception while restoring keyed state backend for KeyedProcessOperator_0cead2d40df6e304d3168f6366c79e3f_(1/2) from alternative (1/1), will retry while more alternatives are available.
>> >> org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
>> >>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
>> >>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
>> >>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
>> >>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>> >>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>> >>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
>> >>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
>> >>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
>> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
>> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>> >>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>> >>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>> >>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>> >>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>> >>     at java.lang.Thread.run(Thread.java:748)
>> >> Caused by: java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
>> >>     at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2147)
>> >>     at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2128)
>> >>     at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2229)
>> >>     at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>> >>     at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
>> >>     at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
>> >>     at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
>> >>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:148)
>> >>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109)
>> >>     at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)
>> >>     at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
>> >>     at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>> >>     at java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1640)
>> >>     at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1858)
>> >>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:83)
>> >>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:67)
>> >>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.transferRemoteStateToLocalDirectory(RocksDBIncrementalRestoreOperation.java:230)
>> >>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:195)
>> >>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)
>> >>     at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)
>> >>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
>> >>     ... 15 more
>>
