???? cli ?????? ??jobmanager???? ???? bin/flink run -d -p 1 -s {savepointuri} 
/data/test.jar  ---->????????????????????????????????????
????webui??????http://jobmanager:8081  submit new  
job??????jar??????????????savepoint path???????????????? ----> 
????????????????




------------------ ???????? ------------------
??????:                                                                         
                                               "user-zh"                        
                                                            
<[email protected]>;
????????: 2020??7??20??(??????) ????2:30
??????: "user-zh"<[email protected]>;

????: Re: Flink Cli ????????



Hi

?????????????? IDEA ????????

?????????????? web ui ?????????????????????????????? savepoint ???? flink run 
???????????????? web ui
??????????????????????????????????????????????????????????

Best,
Congxian


Z-Z <[email protected]> ??2020??7??20?????? ????11:33??????

&gt; 
????taskmanager????????????????????????????????cli????????????webui??????????????
> 2020-07-20 03:29:25,959 WARN  org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'value.serializer' was supplied but isn't a known config.
> 2020-07-20 03:29:25,959 INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.11.0.2
> 2020-07-20 03:29:25,959 INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 73be1e1168f91ee2
> 2020-07-20 03:29:25,974 ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder - Caught unexpected exception.
> java.lang.ArrayIndexOutOfBoundsException: 0
>         at org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.hasMetaDataFollowsFlag(RocksSnapshotUtil.java:45)
>         at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateData(RocksDBFullRestoreOperation.java:223)
>         at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168)
>         at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:151)
>         at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
>         at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
>         at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
>         at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
>         at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
>         at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
>         at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
>         at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
>         at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
>         at java.lang.Thread.run(Thread.java:748)
> 2020-07-20 03:29:25,974 WARN  org.apache.flink.streaming.api.operators.BackendRestorerProcedure - Exception while restoring keyed state backend for StreamMap_caf773fe289bfdb867e0b4bd0c431c5f_(1/1) from alternative (1/1), will retry while more alternatives are available.
> org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
java.lang.Thread.run(Thread.java:748)
&gt; Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.hasMetaDataFollowsFlag(RocksSnapshotUtil.java:45)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateData(RocksDBFullRestoreOperation.java:223)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:151)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ... 15 more
> 2020-07-20 03:29:25,975 INFO  org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> 2020-07-20 03:29:25,979 INFO  org.apache.flink.runtime.taskmanager.Task - Map -> Filter -> Sink: Unnamed (1/1) (ed554502aa995fe53f1cf0cb8adf633c) switched from RUNNING to FAILED.
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at 
java.lang.Thread.run(Thread.java:748)
&gt; Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
&gt; state backend for StreamMap_caf773fe289bfdb867e0b4bd0c431c5f_(1/1) from any
&gt; of the 1 provided restore options.
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ... 9 more
&gt; Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught
&gt; unexpected exception.
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; ... 11 more
&gt; Caused by: java.lang.ArrayIndexOutOfBoundsException: 0
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.hasMetaDataFollowsFlag(RocksSnapshotUtil.java:45)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateData(RocksDBFullRestoreOperation.java:223)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:151)
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
&gt; 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt;
&gt; ??????????
&gt; ??????savepoint????????RocksDBStateBackend????????
&gt; ????savepoint??????webui ??????????????????????????IDE??????savepoint??
&gt;
&gt;
&gt;
&gt;
&gt; ------------------ ???????? ------------------
&gt; ??????:
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 "user-zh"
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 <
&gt; [email protected]&amp;gt;;
&gt; ????????:&amp;nbsp;2020??7??19??(??????) ????8:22
&gt; ??????:&amp;nbsp;"user-zh"<[email protected]&amp;gt;;
&gt;
&gt; ????:&amp;nbsp;Re: Flink Cli ????????
&gt;
&gt;
&gt;
&gt; Hi
&gt;
&gt; ?????????????????????????????????????? EOF ????????????????
&gt; 1 ???????? savepoint ?????? RocksDBStateBackend ????????
&gt; 2 ???????????? DFS ???? savepoint ???????????????????? DFS 
??????????????????????????????????
&gt;
&gt; Best,
&gt; Congxian
&gt;
&gt;
> Z-Z <[email protected]> ??2020??7??17?????? ????11:10??????
&gt;
&gt; &amp;gt; Flink 1.10.0 ,taskmanager??????????????
&gt; &amp;gt;
&gt; &amp;gt;
> > 2020-07-17 15:06:43,913 ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder - Caught unexpected exception.
> > java.io.EOFException
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; java.io.DataInputStream.readFully(DataInputStream.java:197)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; java.io.DataInputStream.readFully(DataInputStream.java:169)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:85)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateData(RocksDBFullRestoreOperation.java:221)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:151)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; java.lang.Thread.run(Thread.java:748)
&gt; &amp;gt; 2020-07-17 15:06:43,914 WARN&amp;amp;nbsp;
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure&amp;amp;nbsp;
&gt; -
&gt; &amp;gt; Exception while restoring keyed state backend for
&gt; &amp;gt; KeyedCoProcessOperator_00360b8021b192d84949201d4fea80f2_(1/1) from
&gt; &amp;gt; alternative (1/1), will retry while more alternatives are 
available.
&gt; &amp;gt; org.apache.flink.runtime.state.BackendBuildingException: Caught
&gt; unexpected
&gt; &amp;gt; exception.
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; java.lang.Thread.run(Thread.java:748)
&gt; &amp;gt; Caused by: java.io.EOFException
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; java.io.DataInputStream.readFully(DataInputStream.java:197)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; java.io.DataInputStream.readFully(DataInputStream.java:169)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:85)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateData(RocksDBFullRestoreOperation.java:221)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:151)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; ... 15 more
&gt; &amp;gt; 2020-07-17 15:06:43,915 INFO&amp;amp;nbsp;
&gt; &amp;gt; org.apache.kafka.clients.producer.KafkaProducer&amp;amp;nbsp; 
&amp;amp;nbsp;
&gt; &amp;amp;nbsp; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;- 
Closing the Kafka
&gt; producer with timeoutMillis
&gt; &amp;gt; = 9223372036854775807 ms.
&gt; &amp;gt; 2020-07-17 15:06:43,918 INFO&amp;amp;nbsp;
&gt; &amp;gt; org.apache.flink.runtime.taskmanager.Task&amp;amp;nbsp; 
&amp;amp;nbsp;
&gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; 
&amp;amp;nbsp; &amp;amp;nbsp;-
&gt; Co-Keyed-Process -&amp;amp;gt; Flat Map
&gt; &amp;gt; -&amp;amp;gt; Sink: Unnamed (1/1) 
(bb8f0a84e07ef90b1e11ca2825e0efab)
&gt; switched from
&gt; &amp;gt; RUNNING to FAILED.
&gt; &amp;gt; java.lang.Exception: Exception while creating
&gt; StreamOperatorStateContext.
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; java.lang.Thread.run(Thread.java:748)
&gt; &amp;gt; Caused by: org.apache.flink.util.FlinkException: Could not restore
&gt; keyed
&gt; &amp;gt; state backend for
&gt; &amp;gt; KeyedCoProcessOperator_00360b8021b192d84949201d4fea80f2_(1/1) from
&gt; any of
&gt; &amp;gt; the 1 provided restore options.
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; ... 9 more
&gt; &amp;gt; Caused by: 
org.apache.flink.runtime.state.BackendBuildingException:
&gt; Caught
&gt; &amp;gt; unexpected exception.
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; ... 11 more
> > Caused by: java.io.EOFException
> >     at java.io.DataInputStream.readFully(DataInputStream.java:197)
> >     at java.io.DataInputStream.readFully(DataInputStream.java:169)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:85)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateData(RocksDBFullRestoreOperation.java:221)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:151)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; ... 15 more
> > 2020-07-17 15:06:43,919 INFO  org.apache.flink.runtime.taskmanager.Task - Freeing task resources for Co-Keyed-Process -> Flat Map -> Sink: Unnamed (1/1) (bb8f0a84e07ef90b1e11ca2825e0efab).
&gt; &amp;gt; 2020-07-17 15:06:43,919 INFO&amp;amp;nbsp;
&gt; &amp;gt; org.apache.flink.runtime.taskmanager.Task&amp;amp;nbsp; 
&amp;amp;nbsp;
&gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; 
&amp;amp;nbsp; &amp;amp;nbsp;-
&gt; Ensuring all FileSystem streams
&gt; &amp;gt; are closed for task Co-Keyed-Process -&amp;amp;gt; Flat Map 
-&amp;amp;gt;
&gt; Sink: Unnamed
&gt; &amp;gt; (1/1) (bb8f0a84e07ef90b1e11ca2825e0efab) [FAILED]
&gt; &amp;gt; 2020-07-17 15:06:43,931 INFO&amp;amp;nbsp;
&gt; &amp;gt; org.apache.flink.runtime.taskexecutor.TaskExecutor&amp;amp;nbsp;
&gt; &amp;amp;nbsp; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; - Un-registering 
task and sending
&gt; final execution
&gt; &amp;gt; state FAILED to JobManager for task Co-Keyed-Process 
-&amp;amp;gt; Flat
&gt; Map -&amp;amp;gt;
&gt; &amp;gt; Sink: Unnamed (1/1) bb8f0a84e07ef90b1e11ca2825e0efab.
&gt; &amp;gt; 2020-07-17 15:06:43,947 INFO&amp;amp;nbsp;
&gt; &amp;gt; org.apache.flink.runtime.taskmanager.Task&amp;amp;nbsp; 
&amp;amp;nbsp;
&gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; 
&amp;amp;nbsp; &amp;amp;nbsp;-
&gt; Attempting to cancel task
&gt; &amp;gt; Source: Custom Source -&amp;amp;gt; Flat Map (1/1)
&gt; &amp;gt; (9cb8dcd4982223adcb6f007f1ffccdce).
&gt; &amp;gt; 2020-07-17 15:06:43,947 INFO&amp;amp;nbsp;
&gt; &amp;gt; org.apache.flink.runtime.taskmanager.Task&amp;amp;nbsp; 
&amp;amp;nbsp;
&gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; 
&amp;amp;nbsp; &amp;amp;nbsp;-
&gt; Source: Custom Source -&amp;amp;gt; Flat
&gt; &amp;gt; Map (1/1) (9cb8dcd4982223adcb6f007f1ffccdce) switched from 
RUNNING to
&gt; &amp;gt; CANCELING.
&gt; &amp;gt; 2020-07-17 15:06:43,947 INFO&amp;amp;nbsp;
&gt; &amp;gt; org.apache.flink.runtime.taskmanager.Task&amp;amp;nbsp; 
&amp;amp;nbsp;
&gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; 
&amp;amp;nbsp; &amp;amp;nbsp;-
&gt; Triggering cancellation of task
&gt; &amp;gt; code Source: Custom Source -&amp;amp;gt; Flat Map (1/1)
&gt; &amp;gt; (9cb8dcd4982223adcb6f007f1ffccdce).
&gt; &amp;gt; 2020-07-17 15:06:43,949 INFO&amp;amp;nbsp;
&gt; &amp;gt; org.apache.flink.runtime.taskmanager.Task&amp;amp;nbsp; 
&amp;amp;nbsp;
&gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; 
&amp;amp;nbsp; &amp;amp;nbsp;-
&gt; Attempting to cancel task
&gt; &amp;gt; Source: Custom Source (1/1) (00621ff5d788d00c73ccaaea04717600).
&gt; &amp;gt; 2020-07-17 15:06:43,949 INFO&amp;amp;nbsp;
&gt; &amp;gt; org.apache.flink.runtime.taskmanager.Task&amp;amp;nbsp; 
&amp;amp;nbsp;
&gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; 
&amp;amp;nbsp; &amp;amp;nbsp;-
&gt; Source: Custom Source (1/1)
&gt; &amp;gt; (00621ff5d788d00c73ccaaea04717600) switched from RUNNING to 
CANCELING.
&gt; &amp;gt; 2020-07-17 15:06:43,949 INFO&amp;amp;nbsp;
&gt; &amp;gt; org.apache.flink.runtime.taskmanager.Task&amp;amp;nbsp; 
&amp;amp;nbsp;
&gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; 
&amp;amp;nbsp; &amp;amp;nbsp;-
&gt; Triggering cancellation of task
&gt; &amp;gt; code Source: Custom Source (1/1) 
(00621ff5d788d00c73ccaaea04717600).
&gt; &amp;gt; 2020-07-17 15:06:43,954 INFO&amp;amp;nbsp;
&gt; &amp;gt; org.apache.flink.runtime.taskmanager.Task&amp;amp;nbsp; 
&amp;amp;nbsp;
&gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; 
&amp;amp;nbsp; &amp;amp;nbsp;-
&gt; Source: Custom Source -&amp;amp;gt; Flat
&gt; &amp;gt; Map (1/1) (9cb8dcd4982223adcb6f007f1ffccdce) switched from 
CANCELING
&gt; to
&gt; &amp;gt; CANCELED.
&gt; &amp;gt; 2020-07-17 15:06:43,954 INFO&amp;amp;nbsp;
&gt; &amp;gt; org.apache.flink.runtime.taskmanager.Task&amp;amp;nbsp; 
&amp;amp;nbsp;
&gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; 
&amp;amp;nbsp; &amp;amp;nbsp;-
&gt; Freeing task resources for
&gt; &amp;gt; Source: Custom Source -&amp;amp;gt; Flat Map (1/1)
&gt; &amp;gt; (9cb8dcd4982223adcb6f007f1ffccdce).
&gt; &amp;gt; 2020-07-17 15:06:43,954 INFO&amp;amp;nbsp;
&gt; &amp;gt; org.apache.flink.runtime.taskmanager.Task&amp;amp;nbsp; 
&amp;amp;nbsp;
&gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; 
&amp;amp;nbsp; &amp;amp;nbsp;-
&gt; Ensuring all FileSystem streams
&gt; &amp;gt; are closed for task Source: Custom Source -&amp;amp;gt; Flat Map 
(1/1)
&gt; &amp;gt; (9cb8dcd4982223adcb6f007f1ffccdce) [CANCELED]
&gt; &amp;gt; 2020-07-17 15:06:43,954 INFO&amp;amp;nbsp;
&gt; &amp;gt; org.apache.flink.runtime.taskmanager.Task&amp;amp;nbsp; 
&amp;amp;nbsp;
&gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; 
&amp;amp;nbsp; &amp;amp;nbsp;-
&gt; Source: Custom Source (1/1)
&gt; &amp;gt; (00621ff5d788d00c73ccaaea04717600) switched from CANCELING to
&gt; CANCELED.
&gt; &amp;gt; 2020-07-17 15:06:43,955 INFO&amp;amp;nbsp;
&gt; &amp;gt; org.apache.flink.runtime.taskmanager.Task&amp;amp;nbsp; 
&amp;amp;nbsp;
&gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; 
&amp;amp;nbsp; &amp;amp;nbsp;-
&gt; Freeing task resources for
&gt; &amp;gt; Source: Custom Source (1/1) (00621ff5d788d00c73ccaaea04717600).
&gt; &amp;gt; 2020-07-17 15:06:43,954 INFO&amp;amp;nbsp;
&gt; &amp;gt; org.apache.flink.runtime.taskexecutor.TaskExecutor&amp;amp;nbsp;
&gt; &amp;amp;nbsp; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; - Un-registering 
task and sending
&gt; final execution
&gt; &amp;gt; state CANCELED to JobManager for task Source: Custom Source 
-&amp;amp;gt;
&gt; Flat Map
&gt; &amp;gt; (1/1) 9cb8dcd4982223adcb6f007f1ffccdce.
&gt; &amp;gt; 2020-07-17 15:06:43,962 INFO&amp;amp;nbsp;
&gt; &amp;gt; org.apache.flink.runtime.taskmanager.Task&amp;amp;nbsp; 
&amp;amp;nbsp;
&gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; 
&amp;amp;nbsp; &amp;amp;nbsp;-
&gt; Ensuring all FileSystem streams
&gt; &amp;gt; are closed for task Source: Custom Source (1/1)
&gt; &amp;gt; (00621ff5d788d00c73ccaaea04717600) [CANCELED]
&gt; &amp;gt; 2020-07-17 15:06:43,962 INFO&amp;amp;nbsp;
&gt; &amp;gt; org.apache.flink.runtime.taskexecutor.TaskExecutor&amp;amp;nbsp;
&gt; &amp;amp;nbsp; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; - Un-registering 
task and sending
&gt; final execution
&gt; &amp;gt; state CANCELED to JobManager for task Source: Custom Source (1/1)
&gt; &amp;gt; 00621ff5d788d00c73ccaaea04717600.
> > 2020-07-17 15:06:44,077 WARN  org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'transaction.timeout.ms' was supplied but isn't a known config.
&gt; &amp;gt; 2020-07-17 15:06:44,077 WARN&amp;amp;nbsp;
&gt; &amp;gt; org.apache.kafka.clients.consumer.ConsumerConfig&amp;amp;nbsp; 
&amp;amp;nbsp;
&gt; &amp;amp;nbsp; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; - The configuration 
'key.serializer'
&gt; was supplied but
&gt; &amp;gt; isn't a known config.
&gt; &amp;gt; 2020-07-17 15:06:44,077 WARN&amp;amp;nbsp;
&gt; &amp;gt; org.apache.kafka.clients.consumer.ConsumerConfig&amp;amp;nbsp; 
&amp;amp;nbsp;
&gt; &amp;amp;nbsp; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; - The configuration
&gt; 'value.serializer' was supplied
&gt; &amp;gt; but isn't a known config.
&gt; &amp;gt; 2020-07-17 15:06:44,077 INFO&amp;amp;nbsp;
&gt; &amp;gt; org.apache.kafka.common.utils.AppInfoParser&amp;amp;nbsp; 
&amp;amp;nbsp;
&gt; &amp;amp;nbsp; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; 
&amp;amp;nbsp; &amp;amp;nbsp;-
&gt; Kafka version : 0.11.0.2
&gt; &amp;gt; 2020-07-17 15:06:44,077 INFO&amp;amp;nbsp;
&gt; &amp;gt; org.apache.kafka.common.utils.AppInfoParser&amp;amp;nbsp; 
&amp;amp;nbsp;
&gt; &amp;amp;nbsp; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; 
&amp;amp;nbsp; &amp;amp;nbsp;-
&gt; Kafka commitId : 73be1e1168f91ee2
&gt; &amp;gt; 2020-07-17 15:06:44,077 WARN&amp;amp;nbsp;
&gt; &amp;gt; org.apache.kafka.common.utils.AppInfoParser&amp;amp;nbsp; 
&amp;amp;nbsp;
&gt; &amp;amp;nbsp; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; 
&amp;amp;nbsp; &amp;amp;nbsp;-
&gt; Error registering AppInfo mbean
&gt; &amp;gt; javax.management.InstanceAlreadyExistsException:
&gt; &amp;gt; kafka.consumer:type=app-info,id=consumer-3
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:757)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:633)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:615)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.getConsumer(KafkaConsumerThread.java:502)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:181)
&gt; &amp;gt; 2020-07-17 15:06:44,079 INFO&amp;amp;nbsp;
&gt; &amp;gt; org.apache.kafka.common.utils.AppInfoParser&amp;amp;nbsp; 
&amp;amp;nbsp;
&gt; &amp;amp;nbsp; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; 
&amp;amp;nbsp; &amp;amp;nbsp;-
&gt; Kafka version : 0.11.0.2
&gt; &amp;gt; 2020-07-17 15:06:44,079 INFO&amp;amp;nbsp;
&gt; &amp;gt; org.apache.kafka.common.utils.AppInfoParser&amp;amp;nbsp; 
&amp;amp;nbsp;
&gt; &amp;amp;nbsp; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; 
&amp;amp;nbsp; &amp;amp;nbsp;-
&gt; Kafka commitId : 73be1e1168f91ee2
&gt; &amp;gt; 2020-07-17 15:06:44,079 WARN&amp;amp;nbsp;
&gt; &amp;gt; org.apache.kafka.common.utils.AppInfoParser&amp;amp;nbsp; 
&amp;amp;nbsp;
&gt; &amp;amp;nbsp; &amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; &amp;amp;nbsp; 
&amp;amp;nbsp; &amp;amp;nbsp;-
&gt; Error registering AppInfo mbean
&gt; &amp;gt; javax.management.InstanceAlreadyExistsException:
&gt; &amp;gt; kafka.consumer:type=app-info,id=consumer-4
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:58)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:757)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:633)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:615)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.getConsumer(KafkaConsumerThread.java:502)
&gt; &amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp; 
&amp;nbsp; at
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:181)
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt; 
------------------&amp;amp;nbsp;????????&amp;amp;nbsp;------------------
&gt; &amp;gt; ??????:
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; &amp;nbsp; "user-zh"
&gt; 
&amp;gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;
&gt; &amp;nbsp; <
&gt; &amp;gt; [email protected]&amp;amp;gt;;
&gt; &amp;gt; ????????:&amp;amp;nbsp;2020??7??17??(??????) ????10:52
&gt; &amp;gt; 
??????:&amp;amp;nbsp;"user-zh"<[email protected]&amp;amp;gt;;
&gt; &amp;gt;
&gt; &amp;gt; ????:&amp;amp;nbsp;Re: Flink Cli ????????
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt; Hi
&gt; &amp;gt;
&gt; &amp;gt; ???????????????????? Flink ????????????????&amp;amp;nbsp; 
Co-Process (1/1)
&gt; &amp;gt; (d0309f26a545e74643382ed3f758269b) ???? tm ?? log 
??????????????????????????????
&gt; 083f69d029de
&gt; &amp;gt; ????????????
&gt; &amp;gt;
&gt; &amp;gt; Best,
&gt; &amp;gt; Congxian
&gt; &amp;gt;
&gt; &amp;gt;
&gt; &amp;gt; Z-Z <[email protected]&amp;amp;gt; ??2020??7??17?????? 
????6:22??????
&gt; &amp;gt;
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt;
&gt; 
????????????????????????????????????????????restAPI??????????????????????????????savepoint(??????/jobs/overview
&gt; &amp;gt; &amp;amp;gt; ---&amp;amp;amp;gt; /jobs/{jobid}/savepoints 
---&amp;amp;amp;gt;
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt;
&gt; 
/jobs/{jobid}/savepoints/{triggerid})??????????flink????????savepoint????????????????????????webui????jar??????savepoint??????????????????????????
&gt; &amp;gt; &amp;amp;gt; 2020-07-17 09:51:48,925 INFO&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt;
&gt; 
org.apache.flink.runtime.resourcemanager.StandaloneResourceManager&amp;amp;amp;nbsp;
&gt; &amp;gt; -
&gt; &amp;gt; &amp;amp;gt; Request slot with profile ResourceProfile{UNKNOWN} 
for job
&gt; &amp;gt; &amp;amp;gt; 7639673873b707aa86c4387aa7b4aac3 with allocation id
&gt; &amp;gt; &amp;amp;gt; e8865cdbfe4c3c33099c7112bc2e3231.
&gt; &amp;gt; &amp;amp;gt; 2020-07-17 09:51:48,952 INFO&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt;
&gt; org.apache.flink.runtime.executiongraph.ExecutionGraph&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; - Source: Custom Source 
-&amp;amp;amp;gt; Filter
&gt; (1/1)
&gt; &amp;gt; &amp;amp;gt; (1177659bff014e8dbc3f0508055d4307) switched from 
SCHEDULED to
&gt; &amp;gt; DEPLOYING.
&gt; &amp;gt; &amp;amp;gt; 2020-07-17 09:51:48,952 INFO&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt;
&gt; org.apache.flink.runtime.executiongraph.ExecutionGraph&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; - Deploying Source: Custom Source
&gt; -&amp;amp;amp;gt; Filter (1/1)
&gt; &amp;gt; (attempt #0) to
&gt; &amp;gt; &amp;amp;gt; e63d829deafc144cd82efd73979dd056 @ 083f69d029de
&gt; (dataPort=35758)
&gt; &amp;gt; &amp;amp;gt; 2020-07-17 09:51:48,953 INFO&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt;
&gt; org.apache.flink.runtime.executiongraph.ExecutionGraph&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; - Source: Custom Source (1/1)
&gt; &amp;gt; (141f0dc22b624b39e21127f637ba63c2)
&gt; &amp;gt; &amp;amp;gt; switched from SCHEDULED to DEPLOYING.
&gt; &amp;gt; &amp;amp;gt; 2020-07-17 09:51:48,953 INFO&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt;
&gt; org.apache.flink.runtime.executiongraph.ExecutionGraph&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; - Deploying Source: Custom Source 
(1/1)
&gt; (attempt #0) to
&gt; &amp;gt; &amp;amp;gt; e63d829deafc144cd82efd73979dd056 @ 083f69d029de
&gt; (dataPort=35758)
&gt; &amp;gt; &amp;amp;gt; 2020-07-17 09:51:48,954 INFO&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt;
&gt; org.apache.flink.runtime.executiongraph.ExecutionGraph&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; - Source: Custom Source (1/1)
&gt; &amp;gt; (274b3df03e1fab627059c1a78e4a26da)
&gt; &amp;gt; &amp;amp;gt; switched from SCHEDULED to DEPLOYING.
&gt; &amp;gt; &amp;amp;gt; 2020-07-17 09:51:48,954 INFO&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt;
&gt; org.apache.flink.runtime.executiongraph.ExecutionGraph&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; - Deploying Source: Custom Source 
(1/1)
&gt; (attempt #0) to
&gt; &amp;gt; &amp;amp;gt; e63d829deafc144cd82efd73979dd056 @ 083f69d029de
&gt; (dataPort=35758)
&gt; &amp;gt; &amp;amp;gt; 2020-07-17 09:51:48,954 INFO&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt;
&gt; org.apache.flink.runtime.executiongraph.ExecutionGraph&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; - Co-Process (1/1)
&gt; (d0309f26a545e74643382ed3f758269b)
&gt; &amp;gt; switched from
&gt; &amp;gt; &amp;amp;gt; SCHEDULED to DEPLOYING.
&gt; &amp;gt; &amp;amp;gt; 2020-07-17 09:51:48,954 INFO&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt;
&gt; org.apache.flink.runtime.executiongraph.ExecutionGraph&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; - Deploying Co-Process (1/1) 
(attempt #0) to
&gt; &amp;gt; &amp;amp;gt; e63d829deafc144cd82efd73979dd056 @ 083f69d029de
&gt; (dataPort=35758)
&gt; &amp;gt; &amp;amp;gt; 2020-07-17 09:51:48,955 INFO&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt;
&gt; org.apache.flink.runtime.executiongraph.ExecutionGraph&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; - Co-Process -&amp;amp;amp;gt; 
(Sink: Unnamed,
&gt; Sink: Unnamed) (1/1)
&gt; &amp;gt; &amp;amp;gt; (618b75fcf5ea05fb5c6487bec6426e31) switched from 
SCHEDULED to
&gt; &amp;gt; DEPLOYING.
&gt; &amp;gt; &amp;amp;gt; 2020-07-17 09:51:48,955 INFO&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt;
&gt; org.apache.flink.runtime.executiongraph.ExecutionGraph&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; - Deploying Co-Process 
-&amp;amp;amp;gt; (Sink:
&gt; Unnamed, Sink:
&gt; &amp;gt; Unnamed) (1/1)
&gt; &amp;gt; &amp;amp;gt; (attempt #0) to e63d829deafc144cd82efd73979dd056 @
&gt; 083f69d029de
&gt; &amp;gt; &amp;amp;gt; (dataPort=35758)
&gt; &amp;gt; &amp;amp;gt; 2020-07-17 09:51:49,346 INFO&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt;
&gt; org.apache.flink.runtime.executiongraph.ExecutionGraph&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; - Co-Process -&amp;amp;amp;gt; 
(Sink: Unnamed,
&gt; Sink: Unnamed) (1/1)
&gt; &amp;gt; &amp;amp;gt; (618b75fcf5ea05fb5c6487bec6426e31) switched from 
DEPLOYING
&gt; to RUNNING.
&gt; &amp;gt; &amp;amp;gt; 2020-07-17 09:51:49,370 INFO&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt;
&gt; org.apache.flink.runtime.executiongraph.ExecutionGraph&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; - Source: Custom Source (1/1)
&gt; &amp;gt; (274b3df03e1fab627059c1a78e4a26da)
&gt; &amp;gt; &amp;amp;gt; switched from DEPLOYING to RUNNING.
&gt; &amp;gt; &amp;amp;gt; 2020-07-17 09:51:49,370 INFO&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt;
&gt; org.apache.flink.runtime.executiongraph.ExecutionGraph&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; - Source: Custom Source (1/1)
&gt; &amp;gt; (141f0dc22b624b39e21127f637ba63c2)
&gt; &amp;gt; &amp;amp;gt; switched from DEPLOYING to RUNNING.
&gt; &amp;gt; &amp;amp;gt; 2020-07-17 09:51:49,377 INFO&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt;
&gt; org.apache.flink.runtime.executiongraph.ExecutionGraph&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; - Co-Process (1/1)
&gt; (d0309f26a545e74643382ed3f758269b)
&gt; &amp;gt; switched from
&gt; &amp;gt; &amp;amp;gt; DEPLOYING to RUNNING.
&gt; &amp;gt; &amp;amp;gt; 2020-07-17 09:51:49,377 INFO&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt;
&gt; org.apache.flink.runtime.executiongraph.ExecutionGraph&amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;amp;nbsp; &amp;amp;amp;nbsp;
&gt; &amp;gt; &amp;amp;gt; &amp;amp;amp;nbsp; - Source: Custom Source 
-&amp;amp;amp;gt; Filter
&gt; (1/1)
&gt; &amp;gt; &amp;amp;gt; (1177659bff014e8dbc3f0508055d4307) switched from 
DEPLOYING
&gt; to RUNNING.
> > > 2020-07-17 09:51:49,493 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - Co-Process (1/1) (d0309f26a545e74643382ed3f758269b) switched from RUNNING to FAILED.
> > > java.lang.Exception: Exception while creating StreamOperatorStateContext.
&gt; &amp;gt;
&gt; 
&amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
&gt; at
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
&gt; &amp;gt;
&gt; 
&amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
&gt; at
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
&gt; &amp;gt;
&gt; 
&amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
&gt; at
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
&gt; &amp;gt;
&gt; 
&amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
&gt; at
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
&gt; &amp;gt;
&gt; 
&amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
&gt; at
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
&gt; &amp;gt;
&gt; 
&amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
&gt; at
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
&gt; &amp;gt;
&gt; 
&amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
&gt; at
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
&gt; &amp;gt;
&gt; 
&amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
&gt; at
&gt; &amp;gt; org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
&gt; &amp;gt;
&gt; 
&amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
&gt; at
&gt; &amp;gt; org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
&gt; &amp;gt;
&gt; 
&amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
&gt; at
&gt; &amp;gt; java.lang.Thread.run(Thread.java:748)
> > > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for LegacyKeyedCoProcessOperator_65e7116c7aa972ad18a796ae22bd6327_(1/1) from any of the 1 provided restore options.
&gt; &amp;gt;
&gt; 
&amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
&gt; at
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
&gt; &amp;gt;
&gt; 
&amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
&gt; at
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:304)
&gt; &amp;gt;
&gt; 
&amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
&gt; at
&gt; &amp;gt; &amp;amp;gt;
&gt; &amp;gt;
&gt; 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:131)
&gt; &amp;gt;
&gt; 
&amp;amp;gt;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;&amp;amp;nbsp;
&gt; ... 9 more
> > > Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
> > >         at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:336)
> > >         at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:548)
> > >         at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:288)
> > >         at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> > >         at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> > >         ... 11 more
> > > Caused by: java.io.EOFException
> > >         at java.io.DataInputStream.readFully(DataInputStream.java:197)
> > >         at java.io.DataInputStream.readFully(DataInputStream.java:169)
> > >         at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:85)
> > >         at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateData(RocksDBFullRestoreOperation.java:221)
> > >         at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168)
> > >         at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:151)
> > >         at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:279)
> > >         ... 15 more

回复