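Hi Congxian,
Sorry, your email got buried in my inbox...
Here is how I reproduce it: I simply restart the TM that is running this job, and the KeyedProcessFunction then fails to restore. The error only occurs with the RocksDB
StateBackend; with FsStateBackend on HDFS the job recovers normally. At first I thought the custom state inside my KeyedProcessFunction was failing to restore, but in the end even an empty KeyedProcessFunction could not be restored. A simple demo is attached below.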
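import java.io.Serializable;
import java.util.Random;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

public class App {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint every 2 minutes; the state backend itself (RocksDB, incremental) comes from flink-conf.yaml.
        env.enableCheckpointing(2 * 60 * 1000);

        // Emits 100 students with random ids, then idles until cancelled so the job keeps running.
        DataStreamSource<Student> source = env.addSource(new SourceFunction<Student>() {
            private volatile boolean running = true;

            @Override
            public void run(SourceContext<Student> ctx) throws Exception {
                Random rand = new Random();
                for (int i = 0; i < 100; i++) {
                    int id = rand.nextInt();
                    ctx.collect(new Student(id, "Tom" + id));
                }
                synchronized (this) {
                    while (running) {
                        this.wait();
                    }
                }
            }

            @Override
            public void cancel() {
                synchronized (this) {
                    running = false;
                    this.notifyAll();
                }
            }
        });

        // A pass-through KeyedProcessFunction with no user-defined state at all;
        // restoring its keyed state backend still fails with RocksDB.
        source.keyBy("id").process(new KeyedProcessFunction<Tuple, Student, Student>() {
            @Override
            public void processElement(Student value, Context ctx, Collector<Student> out)
                    throws Exception {
                out.collect(value);
            }
        }).addSink(new SinkFunction<Student>() {
            @Override
            public void invoke(Student value, Context context) throws Exception {
                System.out.println(value);
            }
        });

        env.execute("test keyed process operator state restore....");
    }

    // POJO key type: Lombok generates the getters/setters and constructors Flink needs.
    @Getter
    @Setter
    @NoArgsConstructor
    @AllArgsConstructor
    public static class Student implements Serializable {
        private static final long serialVersionUID = 3909702675393996601L;
        private Integer id;
        private String name;
    }
}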
Below is the TM log with DEBUG logging enabled:
2020-04-14 11:42:44,679 INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor - Establish
JobManager connection for job 6fd13de6e9c84a51425f7cc34ce94940.
2020-04-14 11:42:44,684 INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor - Offer reserved
slots to the leader of job 6fd13de6e9c84a51425f7cc34ce94940.
2020-04-14 11:42:44,727 DEBUG
org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager -
Registered new allocation id ed04b5323aa885406201e85c9f8b7c78 for local state
stores for job 6fd13de6e9c84a51425f7cc34ce94940.
2020-04-14 11:42:44,729 DEBUG
org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager -
Registered new local state store with configuration
LocalRecoveryConfig{localRecoveryMode=false,
localStateDirectories=LocalRecoveryDirectoryProvider{rootDirectories=[/data/flink1_10/tmp/localState/aid_ed04b5323aa885406201e85c9f8b7c78],
jobID=6fd13de6e9c84a51425f7cc34ce94940,
jobVertexID=bc764cd8ddf7a0cff126f51c16239658, subtaskIndex=0}} for
6fd13de6e9c84a51425f7cc34ce94940 - bc764cd8ddf7a0cff126f51c16239658 - 0 under
allocation id ed04b5323aa885406201e85c9f8b7c78.
2020-04-14 11:42:44,742 DEBUG
org.apache.flink.runtime.io.network.partition.ResultPartitionFactory - Source:
Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc): Initialized
org.apache.flink.runtime.io.network.partition.ResultPartitionFactory@41801faf
2020-04-14 11:42:44,747 INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task
Source: Custom Source (1/1).
2020-04-14 11:42:44,748 INFO org.apache.flink.runtime.taskmanager.Task
- Source: Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc)
switched from CREATED to DEPLOYING.
2020-04-14 11:42:44,748 INFO org.apache.flink.runtime.taskmanager.Task
- Creating FileSystem stream leak safety net for task Source:
Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc) [DEPLOYING]
2020-04-14 11:42:44,751 INFO org.apache.flink.runtime.taskmanager.Task
- Loading JAR files for task Source: Custom Source (1/1)
(ee17273414060c57d2d331a83d1a84fc) [DEPLOYING].
2020-04-14 11:42:44,752 INFO
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl - Activate slot
ed04b5323aa885406201e85c9f8b7c78.
2020-04-14 11:42:44,772 DEBUG
org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager -
Registered new local state store with configuration
LocalRecoveryConfig{localRecoveryMode=false,
localStateDirectories=LocalRecoveryDirectoryProvider{rootDirectories=[/data/flink1_10/tmp/localState/aid_ed04b5323aa885406201e85c9f8b7c78],
jobID=6fd13de6e9c84a51425f7cc34ce94940,
jobVertexID=20ba6b65f97481d5570070de90e4e791, subtaskIndex=0}} for
6fd13de6e9c84a51425f7cc34ce94940 - 20ba6b65f97481d5570070de90e4e791 - 0 under
allocation id ed04b5323aa885406201e85c9f8b7c78.
2020-04-14 11:42:44,786 DEBUG
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFactory
- KeyedProcess -> Sink: Unnamed (1/1) (406f2d0b26fb4b1040ae5ac00028202d):
Created 1 input channels (local: 1, remote: 0, unknown: 0).
2020-04-14 11:42:44,788 INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor - Received task
KeyedProcess -> Sink: Unnamed (1/1).
2020-04-14 11:42:44,795 INFO org.apache.flink.runtime.taskmanager.Task
- KeyedProcess -> Sink: Unnamed (1/1)
(406f2d0b26fb4b1040ae5ac00028202d) switched from CREATED to DEPLOYING.
2020-04-14 11:42:44,805 INFO org.apache.flink.runtime.taskmanager.Task
- Creating FileSystem stream leak safety net for task KeyedProcess
-> Sink: Unnamed (1/1) (406f2d0b26fb4b1040ae5ac00028202d) [DEPLOYING]
2020-04-14 11:42:44,812 INFO org.apache.flink.runtime.taskmanager.Task
- Loading JAR files for task KeyedProcess -> Sink: Unnamed (1/1)
(406f2d0b26fb4b1040ae5ac00028202d) [DEPLOYING].
2020-04-14 11:42:44,817 DEBUG org.apache.flink.runtime.blob.FileSystemBlobStore
- Copying from
hdfs://nameservice1/data/flink1_10/ha/flink1_10_0/blob/job_6fd13de6e9c84a51425f7cc34ce94940/blob_p-6581a081d862993cf5a06573dbb6621fef1e46b2-f795a9ecd636e88bdf7ddd7746b9ca06
to
/data/flink1_10/tmp/blobStore-e924cf2e-5e6c-48c2-893e-c2e9c0a809b6/incoming/temp-00000000.
2020-04-14 11:42:45,060 DEBUG
org.apache.flink.runtime.taskexecutor.TaskExecutor - Received
heartbeat request from 4ff74d2e5ff4f66a88688fdeafd2d3ec.
2020-04-14 11:42:45,919 DEBUG org.apache.flink.runtime.taskmanager.Task
- Getting user code class loader for task
ee17273414060c57d2d331a83d1a84fc at library cache manager took 1167 milliseconds
2020-04-14 11:42:45,920 DEBUG org.apache.flink.runtime.taskmanager.Task
- Getting user code class loader for task
406f2d0b26fb4b1040ae5ac00028202d at library cache manager took 1108 milliseconds
2020-04-14 11:42:45,931 INFO org.apache.flink.runtime.taskmanager.Task
- Registering task at network: Source: Custom Source (1/1)
(ee17273414060c57d2d331a83d1a84fc) [DEPLOYING].
2020-04-14 11:42:45,931 INFO org.apache.flink.runtime.taskmanager.Task
- Registering task at network: KeyedProcess -> Sink: Unnamed (1/1)
(406f2d0b26fb4b1040ae5ac00028202d) [DEPLOYING].
2020-04-14 11:42:45,934 DEBUG
org.apache.flink.runtime.io.network.buffer.LocalBufferPool - Using a local
buffer pool with 2-10 buffers
2020-04-14 11:42:45,934 DEBUG
org.apache.flink.runtime.io.network.buffer.LocalBufferPool - Using a local
buffer pool with 0-8 buffers
2020-04-14 11:42:45,935 DEBUG
org.apache.flink.runtime.io.network.partition.ResultPartitionManager -
Registered ReleaseOnConsumptionResultPartition
0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc
[PIPELINED_BOUNDED, 1 subpartitions, 1 pending consumptions].
2020-04-14 11:42:45,935 DEBUG
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel -
LocalInputChannel
[0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc]: Requesting
LOCAL subpartition 0 of partition
0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc.
2020-04-14 11:42:45,935 DEBUG
org.apache.flink.runtime.io.network.TaskEventDispatcher - registering
0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc
2020-04-14 11:42:45,935 DEBUG
org.apache.flink.runtime.io.network.partition.ResultPartitionManager -
Requesting subpartition 0 of ReleaseOnConsumptionResultPartition
0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc
[PIPELINED_BOUNDED, 1 subpartitions, 1 pending consumptions].
2020-04-14 11:42:45,935 DEBUG
org.apache.flink.runtime.io.network.partition.PipelinedSubpartition - Source:
Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc): Creating read view for
subpartition 0 of partition
0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc.
2020-04-14 11:42:45,937 DEBUG
org.apache.flink.runtime.io.network.partition.ResultPartition - Created
PipelinedSubpartitionView(index: 0) of ResultPartition
0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc
2020-04-14 11:42:45,961 INFO org.apache.flink.runtime.taskmanager.Task
- KeyedProcess -> Sink: Unnamed (1/1)
(406f2d0b26fb4b1040ae5ac00028202d) switched from DEPLOYING to RUNNING.
2020-04-14 11:42:45,963 DEBUG
org.apache.flink.streaming.runtime.tasks.StreamTask - Initializing
KeyedProcess -> Sink: Unnamed (1/1).
2020-04-14 11:42:45,967 INFO
org.apache.flink.streaming.runtime.tasks.StreamTask - Loading state
backend via factory
org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
2020-04-14 11:42:45,984 DEBUG
org.apache.flink.streaming.runtime.tasks.StreamTask - Using
partitioner HASH for output 0 of task Source: Custom Source
2020-04-14 11:42:45,992 INFO
org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using
predefined options: DEFAULT.
2020-04-14 11:42:45,992 INFO
org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using default
options factory: DefaultConfigurableOptionsFactory{configuredOptions={}}.
2020-04-14 11:42:45,994 INFO org.apache.flink.runtime.taskmanager.Task
- Source: Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc)
switched from DEPLOYING to RUNNING.
2020-04-14 11:42:45,994 DEBUG
org.apache.flink.streaming.runtime.tasks.StreamTask - Initializing
Source: Custom Source (1/1).
2020-04-14 11:42:45,994 INFO
org.apache.flink.streaming.runtime.tasks.StreamTask - Loading state
backend via factory
org.apache.flink.contrib.streaming.state.RocksDBStateBackendFactory
2020-04-14 11:42:45,995 INFO
org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using
predefined options: DEFAULT.
2020-04-14 11:42:45,995 INFO
org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using default
options factory: DefaultConfigurableOptionsFactory{configuredOptions={}}.
2020-04-14 11:42:46,033 DEBUG
org.apache.flink.streaming.runtime.tasks.StreamTask - Invoking
Source: Custom Source (1/1)
2020-04-14 11:42:46,042 DEBUG
org.apache.flink.streaming.api.operators.BackendRestorerProcedure - Creating
operator state backend for StreamSource_bc764cd8ddf7a0cff126f51c16239658_(1/1)
with empty state.
2020-04-14 11:42:46,057 DEBUG
org.apache.flink.streaming.runtime.tasks.StreamTask - Invoking
KeyedProcess -> Sink: Unnamed (1/1)
2020-04-14 11:42:46,060 DEBUG
org.apache.flink.runtime.state.TaskStateManagerImpl - Operator
c09dc291fad93d575e015871097bfc60 has remote state
SubtaskState{operatorStateFromBackend=StateObjectCollection{[]},
operatorStateFromStream=StateObjectCollection{[]},
keyedStateFromBackend=StateObjectCollection{[]},
keyedStateFromStream=StateObjectCollection{[]}, stateSize=0} from job manager
and local state alternatives [] from local state store
org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@3fd9cf09.
2020-04-14 11:42:46,060 DEBUG
org.apache.flink.streaming.api.operators.BackendRestorerProcedure - Creating
operator state backend for StreamSink_c09dc291fad93d575e015871097bfc60_(1/1)
with empty state.
2020-04-14 11:42:46,069 DEBUG
org.apache.flink.runtime.state.TaskStateManagerImpl - Operator
20ba6b65f97481d5570070de90e4e791 has remote state
SubtaskState{operatorStateFromBackend=StateObjectCollection{[]},
operatorStateFromStream=StateObjectCollection{[]},
keyedStateFromBackend=StateObjectCollection{[IncrementalRemoteKeyedStateHandle{backendIdentifier=04ac09d6-1f1f-4a6c-a78d-74090c83b3c7,
keyGroupRange=KeyGroupRange{startKeyGroup=0, endKeyGroup=127}, checkpointId=1,
sharedState={},
privateState={MANIFEST-000006=ByteStreamStateHandle{handleName='hdfs://nameservice1/data/flink1_10/checkpoint/6fd13de6e9c84a51425f7cc34ce94940/shared/2ff261b8-f51c-42bf-9fab-93c6b119dcff',
dataBytes=206}, OPTIONS-000010=File State:
hdfs://nameservice1/data/flink1_10/checkpoint/6fd13de6e9c84a51425f7cc34ce94940/shared/426c66a6-d32e-43c8-9873-550237ee0963
[10379 bytes],
CURRENT=ByteStreamStateHandle{handleName='hdfs://nameservice1/data/flink1_10/checkpoint/6fd13de6e9c84a51425f7cc34ce94940/shared/bbbce7c9-ea02-4590-9b18-d7a322deb2f4',
dataBytes=16}}, metaStateHandle=File State:
hdfs://nameservice1/data/flink1_10/checkpoint/6fd13de6e9c84a51425f7cc34ce94940/chk-1/9215630d-632e-48f6-b668-7dc235a8ff7a
[1163 bytes], registered=false}]},
keyedStateFromStream=StateObjectCollection{[]}, stateSize=11764} from job
manager and local state alternatives [] from local state store
org.apache.flink.runtime.state.NoOpTaskLocalStateStoreImpl@3fd9cf09.
2020-04-14 11:42:46,070 DEBUG
org.apache.flink.streaming.api.operators.BackendRestorerProcedure - Creating
keyed state backend for
KeyedProcessOperator_20ba6b65f97481d5570070de90e4e791_(1/1) and restoring with
state from alternative (1/1).
2020-04-14 11:42:46,071 INFO
org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Attempting to
load RocksDB native library and store it under '/data/flink1_10/tmp'
2020-04-14 11:42:46,071 DEBUG
org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Attempting to
create RocksDB native library folder
/data/flink1_10/tmp/rocksdb-lib-a5f35d4dd06539876a20dbabc82a7f33
2020-04-14 11:42:46,078 DEBUG
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf -
-Dorg.apache.flink.shaded.netty4.io.netty.buffer.checkAccessible: true
2020-04-14 11:42:46,079 DEBUG
org.apache.flink.shaded.netty4.io.netty.buffer.AbstractByteBuf -
-Dorg.apache.flink.shaded.netty4.io.netty.buffer.checkBounds: true
2020-04-14 11:42:46,080 DEBUG
org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetectorFactory -
Loaded default ResourceLeakDetector:
org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector@28a9bbee
2020-04-14 11:42:46,150 INFO
org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Successfully
loaded RocksDB native library
2020-04-14 11:42:46,154 INFO
org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Getting managed
memory shared cache for RocksDB.
2020-04-14 11:42:46,161 INFO
org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Obtained shared
RocksDB cache of size 53687092 bytes
2020-04-14 11:42:46,495 DEBUG
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation
- Restoring keyed backend uid in operator
KeyedProcessOperator_20ba6b65f97481d5570070de90e4e791_(1/1) from incremental
snapshot to 04ac09d6-1f1f-4a6c-a78d-74090c83b3c7.
2020-04-14 11:42:46,571 ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder - Caught unexpected exception.
java.nio.file.NoSuchFileException: /data/flink1_10/tmp/flink-io-cb7e58a9-8b06-4ada-b8eb-ad754cd2470e/job_6fd13de6e9c84a51425f7cc34ce94940_op_KeyedProcessOperator_20ba6b65f97481d5570070de90e4e791__1_1__uuid_0079fc9d-30c1-452a-86b9-eaa71eab7593/2508f380-d4ae-4316-9ec8-dd13ae5d3c2e/CURRENT
    at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
    at sun.nio.fs.UnixCopyFile.copy(UnixCopyFile.java:526)
    at sun.nio.fs.UnixFileSystemProvider.copy(UnixFileSystemProvider.java:253)
    at java.nio.file.Files.copy(Files.java:1274)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:483)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:218)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:194)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
2020-04-14 11:42:46,576 WARN org.apache.flink.streaming.api.operators.BackendRestorerProcedure - Exception while restoring keyed state backend for KeyedProcessOperator_20ba6b65f97481d5570070de90e4e791_(1/1) from alternative (1/1), will retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.nio.file.NoSuchFileException: /data/flink1_10/tmp/flink-io-cb7e58a9-8b06-4ada-b8eb-ad754cd2470e/job_6fd13de6e9c84a51425f7cc34ce94940_op_KeyedProcessOperator_20ba6b65f97481d5570070de90e4e791__1_1__uuid_0079fc9d-30c1-452a-86b9-eaa71eab7593/2508f380-d4ae-4316-9ec8-dd13ae5d3c2e/CURRENT
    at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
    at sun.nio.fs.UnixCopyFile.copy(UnixCopyFile.java:526)
    at sun.nio.fs.UnixFileSystemProvider.copy(UnixFileSystemProvider.java:253)
    at java.nio.file.Files.copy(Files.java:1274)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:483)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:218)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:194)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
    ... 15 more
2020-04-14 11:42:46,579 INFO org.apache.flink.runtime.taskmanager.Task - KeyedProcess -> Sink: Unnamed (1/1) (406f2d0b26fb4b1040ae5ac00028202d) switched from RUNNING to FAILED.
java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_20ba6b65f97481d5570070de90e4e791_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
    ... 9 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
    ... 11 more
Caused by: java.nio.file.NoSuchFileException: /data/flink1_10/tmp/flink-io-cb7e58a9-8b06-4ada-b8eb-ad754cd2470e/job_6fd13de6e9c84a51425f7cc34ce94940_op_KeyedProcessOperator_20ba6b65f97481d5570070de90e4e791__1_1__uuid_0079fc9d-30c1-452a-86b9-eaa71eab7593/2508f380-d4ae-4316-9ec8-dd13ae5d3c2e/CURRENT
    at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
    at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
    at sun.nio.fs.UnixCopyFile.copy(UnixCopyFile.java:526)
    at sun.nio.fs.UnixFileSystemProvider.copy(UnixFileSystemProvider.java:253)
    at java.nio.file.Files.copy(Files.java:1274)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:483)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:218)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:194)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)
    at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
    ... 15 more
2020-04-14 11:42:46,588 INFO org.apache.flink.runtime.taskmanager.Task
- Freeing task resources for KeyedProcess -> Sink: Unnamed (1/1)
(406f2d0b26fb4b1040ae5ac00028202d).
2020-04-14 11:42:46,589 DEBUG org.apache.flink.runtime.taskmanager.Task
- Release task KeyedProcess -> Sink: Unnamed (1/1) network
resources (state: FAILED).
2020-04-14 11:42:46,589 DEBUG
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate -
KeyedProcess -> Sink: Unnamed (1/1) (406f2d0b26fb4b1040ae5ac00028202d):
Releasing
org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate@23ae29cd.
2020-04-14 11:42:46,589 DEBUG
org.apache.flink.runtime.io.network.partition.ResultPartition -
ReleaseOnConsumptionResultPartition
0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc
[PIPELINED_BOUNDED, 1 subpartitions, 0 pending consumptions]: Received consumed
notification for subpartition 0.
2020-04-14 11:42:46,589 DEBUG
org.apache.flink.runtime.io.network.partition.ResultPartitionManager -
Received consume notification from ReleaseOnConsumptionResultPartition
0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc
[PIPELINED_BOUNDED, 1 subpartitions, 0 pending consumptions].
2020-04-14 11:42:46,589 DEBUG
org.apache.flink.runtime.io.network.partition.ResultPartition - Source: Custom
Source (1/1) (ee17273414060c57d2d331a83d1a84fc): Releasing
ReleaseOnConsumptionResultPartition
0d224b8294583b8fcdf469150870d2a4@ee17273414060c57d2d331a83d1a84fc
[PIPELINED_BOUNDED, 1 subpartitions, 0 pending consumptions].
2020-04-14 11:42:46,590 DEBUG
org.apache.flink.runtime.io.network.partition.PipelinedSubpartition - Source:
Custom Source (1/1) (ee17273414060c57d2d331a83d1a84fc): Released
PipelinedSubpartition#0 [number of buffers: 1 (0 bytes), number of buffers in
backlog: 1, finished? false, read view? false].
2020-04-14 11:42:46,590 DEBUG
org.apache.flink.runtime.io.network.partition.ResultPartitionManager -
Released partition 0d224b8294583b8fcdf469150870d2a4 produced by
ee17273414060c57d2d331a83d1a84fc.
2020-04-14 11:42:46,590 INFO org.apache.flink.runtime.taskmanager.Task
- Ensuring all FileSystem streams are closed for task KeyedProcess
-> Sink: Unnamed (1/1) (406f2d0b26fb4b1040ae5ac00028202d) [FAILED]
2020-04-14 11:42:46,603 INFO
org.apache.flink.runtime.taskexecutor.TaskExecutor - Un-registering
task and sending final execution state FAILED to JobManager for task
KeyedProcess -> Sink: Unnamed (1/1) 406f2d0b26fb4b1040ae5ac00028202d.
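The innermost frames of the trace point at a plain java.nio.file.Files.copy call inside RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath. The CURRENT file was not found in the per-restore download directory named in the exception at the moment the restore tried to copy it. The sketch below makes that failure mode concrete. It uses only the JDK and is loosely modelled on what the stack trace shows, not on Flink's actual source. It assumes the restore step lists the downloaded directory, hard-links the immutable .sst files, and copies everything else (CURRENT, MANIFEST-*, OPTIONS-*) into the RocksDB instance directory. RestoreStepSketch and its methods are hypothetical names.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the "restore from downloaded files" phase; not Flink's code. */
public class RestoreStepSketch {

    /** Snapshot of the regular files that the (assumed) download phase left in downloadDir. */
    public static List<Path> listDownloadedFiles(Path downloadDir) throws IOException {
        List<Path> files = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(downloadDir)) {
            for (Path p : stream) {
                if (Files.isRegularFile(p)) {
                    files.add(p);
                }
            }
        }
        return files;
    }

    /**
     * Hard-links .sst files and copies all other files into instanceDir.
     * If a listed source file has disappeared in the meantime, Files.copy
     * (or Files.createLink) throws java.nio.file.NoSuchFileException.
     */
    public static void linkOrCopyInto(List<Path> sources, Path instanceDir) throws IOException {
        Files.createDirectories(instanceDir);
        for (Path source : sources) {
            Path target = instanceDir.resolve(source.getFileName());
            if (source.getFileName().toString().endsWith(".sst")) {
                Files.createLink(target, source);   // new link "target" pointing at existing "source"
            } else {
                Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING);
            }
        }
    }
}
```

The logs alone do not show why the file was missing. The sketch only illustrates which condition produces this exact exception: a file that was expected in the download directory is absent when it is copied.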
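On 2020-04-03 18:09:19, "Congxian Qiu" <[email protected]> wrote:
>The paths on HDFS are different from the local ones. If you want to look at the HDFS paths, you may need to inspect the checkpoint metadata, which is rather tedious; you can refer to
>the tests in CheckpointMetadataLoadingTest.
>I looked at the TM log you sent again, and it seems the outputStream.close() call at line 148 failed (oddly, this
>outputStream is a local file, yet the stack trace shows HadoopFileSystem). Can you reproduce this problem reliably? If so, could you enable
>DEBUG logging and post the JM/TM logs? A job that reproduces the problem would be even better.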
>
>Best,
>Congxian
>
>
>chenxyz <[email protected]> wrote on Wed, Apr 1, 2020 at 5:18 PM:
>
>> Hi Congxian (从贤),
>> I checked HDFS. The directory
>> /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160 is empty, and there is no db subdirectory either.
>>
>> On 2020-04-01 16:50:13, "Congxian Qiu" <[email protected]> wrote:
>> >Hi
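>> >Restoring can be roughly divided into two parts: 1) downloading the files; 2) restoring from the downloaded files.
>> >From the TM log, it looks like the download went wrong. Could you double-check whether this file exists:
>> >/data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst
>> >If the download did fail, you need to find out why it failed.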
>> >
>> >Best,
>> >Congxian
>> >
>> >
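Congxian's suggestion to double-check whether the downloaded file exists can be scripted. The following minimal sketch uses only the JDK. It recursively lists a restore directory, so an empty directory or a missing db level is visible, and it reports which expected file names are absent. RestoreDirCheck and its methods are hypothetical helpers, not part of Flink. The expected names would be taken from the checkpoint handle in the DEBUG log, for example CURRENT, MANIFEST-000006 and OPTIONS-000010. Flink may clean up its flink-io-* working directories once the task fails, so the check is most informative when run on the TM host while the restore is in progress.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical diagnostic helper for a RocksDB restore directory; not part of Flink. */
public class RestoreDirCheck {

    /** Returns the names in expected that are not regular files directly under dir. */
    public static List<String> missingFiles(Path dir, List<String> expected) {
        List<String> missing = new ArrayList<>();
        for (String name : expected) {
            if (!Files.isRegularFile(dir.resolve(name))) {
                missing.add(name);
            }
        }
        return missing;
    }

    /** Lists every file and directory below root, relative to root, in sorted order. */
    public static List<String> listTree(Path root) throws IOException {
        try (Stream<Path> paths = Files.walk(root)) {
            return paths.filter(p -> !p.equals(root))
                    .map(p -> root.relativize(p).toString())
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    // Usage: java RestoreDirCheck <dir> [expected-file ...]
    public static void main(String[] args) throws IOException {
        if (args.length == 0) {
            System.err.println("usage: RestoreDirCheck <dir> [expected-file ...]");
            return;
        }
        Path dir = Paths.get(args[0]);
        if (!Files.isDirectory(dir)) {
            System.out.println("not a directory (or already deleted): " + dir);
            return;
        }
        List<String> tree = listTree(dir);
        System.out.println(tree.isEmpty() ? "(empty)" : String.join("\n", tree));
        List<String> expected = Arrays.asList(args).subList(1, args.length);
        System.out.println("missing: " + missingFiles(dir, expected));
    }
}
```

Pointing it at the db directory from the stack trace, with 001888.sst as the expected name, would answer the question above directly.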
>> >chenxyz <[email protected]> wrote on Wed, Apr 1, 2020 at 3:02 PM:
>> >
>> >> The job uses RocksDB as its state backend, and when it restarts after an exception the restart often fails with "Could not restore keyed state backend for
>> >> KeyedProcessOperator". How can this be fixed?
>> >>
>> >> Version: 1.10, standalone
>> >>
>> >> Configuration:
>> >>
>> >> state.backend: rocksdb
>> >> state.checkpoints.dir: hdfs://nameservice1/data/flink1_10/checkpoint
>> >> state.backend.incremental: true
>> >> jobmanager.execution.failover-strategy: region
>> >> io.tmp.dirs: /data/flink1_10/tmp
>> >>
>> >> Checkpoint configuration of the job:
>> >>
>> >> env.enableCheckpointing(2 * 60 * 1000);
>> >> env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
>> >> env.getCheckpointConfig().setMinPauseBetweenCheckpoints(2 * 60 * 1000);
>> >> env.getCheckpointConfig().setCheckpointTimeout(60000);
>> >> env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
>> >> env.getCheckpointConfig().setTolerableCheckpointFailureNumber(12);
>> >> env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
>> >>
>> >> Log output:
>> >>
>> >> 2020-04-01 11:13:03
>> >> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>> >> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
>> >> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
>> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
>> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>> >> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>> >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>> >> at java.lang.Thread.run(Thread.java:748)
>> >> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926_(1/2) from any of the 1 provided restore options.
>> >> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
>> >> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
>> >> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
>> >> ... 9 more
>> >> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
>> >> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
>> >> at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
>> >> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
>> >> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>> >> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>> >> ... 11 more
>> >> Caused by: java.nio.file.NoSuchFileException: /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/db/001888.sst -> /data/flink1_10/tmp/flink-io-01229972-48d4-4229-ac8c-33f0edfe5b7c/job_5ec178dc885a8f1a64c1925e182562e3_op_KeyedProcessOperator_da2b90ef97e5c844980791c8fe08b926__1_2__uuid_772b4663-f633-4ed5-a67a-d1904760a160/3e979cc5-82c1-42bf-a269-1ce6e43f3e10/001888.sst
>> >> at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
>> >> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
>> >> at sun.nio.fs.UnixFileSystemProvider.createLink(UnixFileSystemProvider.java:476)
>> >> at java.nio.file.Files.createLink(Files.java:1086)
>> >> at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:480)
>> >> at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:218)
>> >> at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:194)
>> >> at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)
>> >> at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)
>> >> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
>> >> ... 15 more
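The innermost cause in the trace above is thrown by Files.createLink, called from RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath while restoring from the freshly downloaded copy of the state. The following is a minimal, stdlib-only sketch of such a step, assuming (not verified against the Flink source here) that .sst files are hard-linked into the db directory and all other files are copied; the class, method and directory names are hypothetical. It also illustrates that, with the JDK on Unix, the NoSuchFileException message has the form "<link> -> <existing>": the first path in the trace (under db/) is the link being created, the second (under the 3e979cc5-... directory) is the file it should point to, and the exception is raised when either that file or the link's parent directory is missing.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.stream.Stream;

// Hypothetical sketch of a "restore instance directory" step, not Flink's code:
// hard-link immutable *.sst files into dbDir, copy every other file.
public class HardLinkRestoreSketch {

    static void restoreInstanceDirectory(Path source, Path dbDir) throws IOException {
        Path[] files;
        try (Stream<Path> listing = Files.list(source)) {
            files = listing.toArray(Path[]::new);
        }
        for (Path file : files) {
            String name = file.getFileName().toString();
            Path target = dbDir.resolve(name);
            if (name.endsWith(".sst")) {
                // createLink(link, existing) throws NoSuchFileException if
                // 'existing' or the parent directory of 'link' does not exist.
                Files.createLink(target, file);
            } else {
                // Assumption: non-.sst files (CURRENT, MANIFEST, ...) are copied.
                Files.copy(file, target, StandardCopyOption.REPLACE_EXISTING);
            }
        }
    }

    // Reproduces the failure mode: link an .sst whose source file was never created.
    static String linkMissingSource(Path workDir) throws IOException {
        Path dbDir = Files.createDirectories(workDir.resolve("db"));
        Path existing = workDir.resolve("downloaded").resolve("001888.sst");
        try {
            Files.createLink(dbDir.resolve("001888.sst"), existing);
            return "linked";
        } catch (NoSuchFileException e) {
            return e.getMessage(); // "<link> -> <existing>" on Unix
        }
    }

    public static void main(String[] args) throws IOException {
        Path work = Files.createTempDirectory("restore-sketch");
        Path downloaded = Files.createDirectories(work.resolve("downloaded"));
        Files.write(downloaded.resolve("000001.sst"), new byte[] {1, 2, 3});
        Files.write(downloaded.resolve("MANIFEST-000002"), new byte[] {4});
        Path db = Files.createDirectories(work.resolve("db"));

        restoreInstanceDirectory(downloaded, db);
        // The hard link shares the inode with its source; the copy does not.
        System.out.println(Files.isSameFile(db.resolve("000001.sst"), downloaded.resolve("000001.sst")));
        System.out.println(Files.isSameFile(db.resolve("MANIFEST-000002"), downloaded.resolve("MANIFEST-000002")));

        System.out.println(linkMissingSource(Files.createTempDirectory("restore-missing")));
    }
}
```

Hard links avoid copying the immutable SST data, but they only work within one file system and only if the source file still exists at the moment of linking.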
>> >>
>> >> TaskManager error log:
>> >>
>> >> 2020-04-01 14:48:10,726 ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder - Caught unexpected exception.
>> >> java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
>> >> at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2147)
>> >> at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2128)
>> >> at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2229)
>> >> at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>> >> at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
>> >> at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
>> >> at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
>> >> at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:148)
>> >> at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109)
>> >> at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)
>> >> at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
>> >> at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>> >> at java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1640)
>> >> at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1858)
>> >> at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:83)
>> >> at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:67)
>> >> at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.transferRemoteStateToLocalDirectory(RocksDBIncrementalRestoreOperation.java:230)
>> >> at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:195)
>> >> at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)
>> >> at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)
>> >> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
>> >> at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
>> >> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
>> >> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>> >> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>> >> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
>> >> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
>> >> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
>> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
>> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>> >> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>> >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>> >> at java.lang.Thread.run(Thread.java:748)
>> >>
>> >> 2020-04-01 14:48:10,726 WARN org.apache.flink.streaming.api.operators.BackendRestorerProcedure - Exception while restoring keyed state backend for KeyedProcessOperator_0cead2d40df6e304d3168f6366c79e3f_(1/2) from alternative (1/1), will retry while more alternatives are available.
>> >> org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
>> >> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
>> >> at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
>> >> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
>> >> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>> >> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>> >> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
>> >> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
>> >> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
>> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
>> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>> >> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>> >> at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>> >> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>> >> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>> >> at java.lang.Thread.run(Thread.java:748)
>> >> Caused by: java.io.InterruptedIOException: Interrupted while waiting for data to be acknowledged by pipeline
>> >> at org.apache.hadoop.hdfs.DFSOutputStream.waitForAckedSeqno(DFSOutputStream.java:2147)
>> >> at org.apache.hadoop.hdfs.DFSOutputStream.flushInternal(DFSOutputStream.java:2128)
>> >> at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2229)
>> >> at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
>> >> at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
>> >> at org.apache.flink.runtime.fs.hdfs.HadoopDataOutputStream.close(HadoopDataOutputStream.java:52)
>> >> at org.apache.flink.core.fs.ClosingFSDataOutputStream.close(ClosingFSDataOutputStream.java:64)
>> >> at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:148)
>> >> at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:109)
>> >> at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:50)
>> >> at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1626)
>> >> at org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>> >> at java.util.concurrent.CompletableFuture.asyncRunStage(CompletableFuture.java:1640)
>> >> at java.util.concurrent.CompletableFuture.runAsync(CompletableFuture.java:1858)
>> >> at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:83)
>> >> at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.transferAllStateDataToDirectory(RocksDBStateDownloader.java:67)
>> >> at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.transferRemoteStateToLocalDirectory(RocksDBIncrementalRestoreOperation.java:230)
>> >> at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:195)
>> >> at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:168)
>> >> at org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:154)
>> >> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
>> >> ... 15 more
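In both TaskManager traces, the HDFS stream is closed inside a download runnable that CompletableFuture.runAsync hands to Flink's DirectExecutorService. The frames show DirectExecutorService.execute calling CompletableFuture$AsyncRun.run directly, with StreamTask and Task.run further down the same stack, so the download appears to run on the task thread itself. One plausible reading, offered as an interpretation rather than a confirmed diagnosis, is that interrupting that thread (for example when the task is cancelled) reaches the blocking HDFS close, which is where the InterruptedIOException is raised. The stdlib-only sketch below uses a hypothetical class name and a lambda executor in place of Flink's DirectExecutorService; it shows that with such an executor the task runs on, and sees the interrupt flag of, the calling thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

// Hypothetical sketch: an Executor that runs each task on the calling thread,
// similar in spirit to the DirectExecutorService in the trace (not Flink's code).
public class DirectExecutorSketch {

    static final Executor DIRECT = Runnable::run;

    // Returns the name of the thread that actually executed the task.
    static String threadThatRanTask() {
        final String[] ranOn = new String[1];
        CompletableFuture.runAsync(() -> ranOn[0] = Thread.currentThread().getName(), DIRECT).join();
        return ranOn[0];
    }

    // Interrupts the caller, then checks whether the task observes the interrupt.
    static boolean interruptVisibleInsideTask() {
        final boolean[] seen = new boolean[1];
        Thread.currentThread().interrupt(); // stands in for the task thread being cancelled
        try {
            CompletableFuture.runAsync(() -> seen[0] = Thread.currentThread().isInterrupted(), DIRECT).join();
        } finally {
            Thread.interrupted(); // clear the flag so the caller is left unaffected
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(threadThatRanTask().equals(Thread.currentThread().getName()));
        System.out.println(interruptVisibleInsideTask());
    }
}
```

A pool-backed executor would run the runnable on a different thread, where an interrupt of the task thread would not be seen by the blocking write.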
>>