下面是附件的内容,请问是因为什么导致重启呢?

2阶段提交demo:


@Slf4j public class CommonOracleSink extends 
TwoPhaseCommitSinkFunction<LinkedList<Object&gt;, 
CommonOracleSink.ConnectionState, Void&gt; {  &nbsp; &nbsp; private transient 
String sinkSQL;  &nbsp; &nbsp; public CommonOracleSink() {  &nbsp; &nbsp; 
&nbsp; &nbsp; super(new KryoSerializer<&gt;(ConnectionState.class, new 
ExecutionConfig()), VoidSerializer.INSTANCE);  &nbsp; &nbsp; }  &nbsp; &nbsp; 
@Override &nbsp; &nbsp; public void open(Configuration parameters) throws 
Exception { &nbsp; &nbsp; &nbsp; &nbsp; super.open(parameters); &nbsp; &nbsp; 
&nbsp; &nbsp; ParameterTool params = (ParameterTool) 
getRuntimeContext().getExecutionConfig().getGlobalJobParameters(); &nbsp; 
&nbsp; &nbsp; &nbsp; sinkSQL = params.getRequired("sinkSQL"); &nbsp; &nbsp; }  
&nbsp; &nbsp; @Override &nbsp; &nbsp; protected void invoke(ConnectionState 
connectionState, LinkedList<Object&gt; colList, Context context){ &nbsp; &nbsp; 
&nbsp; &nbsp; try { &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
System.err.println("start invoke......."); &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; Connection connection = connectionState.connection; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; log.info("colList----------------------&gt;", 
JSON.toJSONString(colList)); &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
TKQueryRunner runner = new TKQueryRunner(); &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; Object[] params = colList.toArray(); &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; System.err.println("params size-----&gt;"+params.length); &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; runner.update(connection,sinkSQL,params); &nbsp; 
&nbsp; &nbsp; &nbsp; }catch (Exception e){ &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; log.error(e.getMessage(),e); &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
System.err.println(e.getMessage());  &nbsp; &nbsp; &nbsp; &nbsp; }  &nbsp; 
&nbsp; }  &nbsp; &nbsp; /** &nbsp; &nbsp; &nbsp;* 获取连接,开启手动提交事物 &nbsp; &nbsp; 
&nbsp;* &nbsp; &nbsp; &nbsp;* @return &nbsp; &nbsp; &nbsp;* @throws Exception 
&nbsp; &nbsp; &nbsp;*/ &nbsp; &nbsp; @Override &nbsp; &nbsp; protected 
ConnectionState beginTransaction() throws Exception {  &nbsp; &nbsp; &nbsp; 
&nbsp; Connection connection = HikariOUtils.getConnection();  &nbsp; &nbsp; 
&nbsp; &nbsp; log.info("start beginTransaction......." + connection);  &nbsp; 
&nbsp; &nbsp; &nbsp; return new ConnectionState(connection); &nbsp; &nbsp; }  
&nbsp; &nbsp; /** &nbsp; &nbsp; &nbsp;* 预提交,这里预提交的逻辑在invoke方法中 &nbsp; &nbsp; 
&nbsp;* &nbsp; &nbsp; &nbsp;* @param connectionState &nbsp; &nbsp; &nbsp;* 
@throws Exception &nbsp; &nbsp; &nbsp;*/ &nbsp; &nbsp; @Override &nbsp; &nbsp; 
protected void preCommit(ConnectionState connectionState) throws Exception { 
&nbsp; &nbsp; &nbsp; &nbsp; log.info("start preCommit......." + 
connectionState); &nbsp; &nbsp; }  &nbsp; &nbsp; /** &nbsp; &nbsp; &nbsp;* 
如果invoke方法执行正常,则提交事务 &nbsp; &nbsp; &nbsp;* &nbsp; &nbsp; &nbsp;* @param 
connectionState &nbsp; &nbsp; &nbsp;*/ &nbsp; &nbsp; @Override &nbsp; &nbsp; 
protected void commit(ConnectionState connectionState) { &nbsp; &nbsp; &nbsp; 
&nbsp; log.info("start commit......." + connectionState);  &nbsp; &nbsp; &nbsp; 
&nbsp; Connection connection = connectionState.connection;  &nbsp; &nbsp; 
&nbsp; &nbsp; try { &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
connection.commit(); &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
connection.close(); &nbsp; &nbsp; &nbsp; &nbsp; } catch (SQLException e) { 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; throw new RuntimeException("提交事物异常"); 
&nbsp; &nbsp; &nbsp; &nbsp; } &nbsp; &nbsp; }  &nbsp; &nbsp; /** &nbsp; &nbsp; 
&nbsp;* 如果invoke执行异常则回滚事物,下一次的checkpoint操作也不会执行 &nbsp; &nbsp; &nbsp;* &nbsp; 
&nbsp; &nbsp;* @param connectionState &nbsp; &nbsp; &nbsp;*/ &nbsp; &nbsp; 
@Override &nbsp; &nbsp; protected void abort(ConnectionState connectionState) { 
&nbsp; &nbsp; &nbsp; &nbsp; log.error("start abort rollback......." + 
connectionState); &nbsp; &nbsp; &nbsp; &nbsp; Connection connection = 
connectionState.connection; &nbsp; &nbsp; &nbsp; &nbsp; try { &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; connection.rollback(); &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; connection.close(); &nbsp; &nbsp; &nbsp; &nbsp; } catch 
(SQLException e) { &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; throw new 
RuntimeException("回滚事物异常"); &nbsp; &nbsp; &nbsp; &nbsp; } &nbsp; &nbsp; }  
&nbsp; &nbsp; static class ConnectionState {  &nbsp; &nbsp; &nbsp; &nbsp; 
private final transient Connection connection;  &nbsp; &nbsp; &nbsp; &nbsp; 
ConnectionState(Connection connection) {  &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; this.connection = connection; &nbsp; &nbsp; &nbsp; &nbsp; }  &nbsp; 
&nbsp; }   }

jobmanager日志

2020-08-10 16:37:31,892 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - --------------------------------------------------------------------------------
2020-08-10 16:37:31,897 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -  Starting YarnJobClusterEntrypoint (Version: 1.11.1, Scala: 2.11, Rev:7eb514a, Date:2020-07-15T07:02:09+02:00)
2020-08-10 16:37:31,898 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -  OS current user: root
2020-08-10 16:37:32,295 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -  Current Hadoop/Kerberos user: root
2020-08-10 16:37:32,295 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -  JVM: Java HotSpot(TM) 64-Bit Server VM - Oracle Corporation - 1.8/25.121-b13
2020-08-10 16:37:32,295 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -  Maximum heap size: 3166 MiBytes
2020-08-10 16:37:32,295 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -  JAVA_HOME: /home/xxx/app/jdk1.8.0_121
2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -  Hadoop version: 2.7.7
2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -  JVM Options:
2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -     -Xmx3462817376
2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -     -Xms3462817376
2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -     -XX:MaxMetaspaceSize=268435456
2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -     -Dlog.file=/home/xxx/app/hadoop-2.6.0-cdh5.15.1/logs/userlogs/application_1591335931326_0024/container_1591335931326_0024_01_000001/jobmanager.log
2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -     -Dlog4j.configuration=file:log4j.properties
2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -     -Dlog4j.configurationFile=file:log4j.properties
2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -  Program Arguments: (none)
2020-08-10 16:37:32,297 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] -  Classpath: 
:UnifyCompFlink-1.0.jar:lib/flink-csv-1.11.1.jar:lib/flink-json-1.11.1.jar:lib/flink-shaded-hadoop-2-uber-2.6.5-10.0.jar:lib/flink-shaded-zookeeper-3.4.14.jar:lib/flink-table-blink_2.11-1.11.1.jar:lib/flink-table_2.11-1.11.1.jar:lib/log4j-1.2-api-2.12.1.jar:lib/log4j-api-2.12.1.jar:lib/log4j-core-2.12.1.jar:lib/log4j-slf4j-impl-2.12.1.jar:flink-dist_2.11-1.11.1.jar:job.graph:flink-conf.yaml::/home/xxx/app/hadoop-2.6.0-cdh5.15.1/etc/hadoop:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/hadoop-nfs-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/hadoop-common-2.6.0-cdh5.15.1-tests.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/hadoop-common-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/api-util-1.0.0-M20.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/zookeeper-3.4.5-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/logredactor-1.0.3.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/activation-1.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-codec-1.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/junit-4.11.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jsr305-3.0.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-configuration-1.6.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/netty-3.10.5.Final.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-beanutils-core-1.8.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/slf4j-api-1.7.5.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/httpclient-4.2.5.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/api-asn1-api-1.0.0-M20.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/avro-1.7.6-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jetty-6.1.26.cloudera.4.jar:/home/xxx/a
pp/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jaxb-api-2.2.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-cli-1.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/mockito-all-1.8.5.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jackson-jaxrs-1.8.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/hadoop-auth-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-el-1.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-net-3.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jersey-json-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-logging-1.1.3.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jsch-0.1.42.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-lang-2.6.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jettison-1.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/hadoop-annotations-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/curator-recipes-2.7.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/snappy-java-1.0.4.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/gson-2.2.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-compress-1.4.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/guava-11.0.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/java-xmlbuilder-0.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/protobuf-java-2.5.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/paranamer-2.3.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jersey-server-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/hamcrest-core-1.3.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-io-2.4.jar:/home/xxx/app/
hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jackson-xc-1.8.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jsp-api-2.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jersey-core-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-math3-3.1.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/xmlenc-0.52.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-digester-1.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/servlet-api-2.5.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/log4j-1.2.17.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/xz-1.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jasper-compiler-5.5.23.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/stax-api-1.0-2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jets3t-0.9.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jetty-util-6.1.26.cloudera.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jasper-runtime-5.5.23.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/htrace-core4-4.0.1-incubating.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/httpcore-4.2.5.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-beanutils-1.9.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-collections-3.2.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jaxb-impl-2.2.3-1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/apacheds-kerberos-codec-2.0.0-M15.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/curator-client-2.7.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-httpclient-3.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/apacheds-i1
8n-2.0.0-M15.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/curator-framework-2.7.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/asm-3.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/hadoop-hdfs-2.6.0-cdh5.15.1-tests.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/hadoop-hdfs-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/hadoop-hdfs-nfs-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/commons-codec-1.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/leveldbjni-all-1.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jsr305-3.0.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/netty-3.10.5.Final.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jetty-6.1.26.cloudera.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/commons-cli-1.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/commons-el-1.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/commons-logging-1.1.3.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/commons-lang-2.6.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/guava-11.0.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/protobuf-java-2.5.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jersey-server-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/commons-io-2.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jsp-api-2.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jersey-core-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/commons-daemon-1.0.13.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/xmlenc-0.52.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/servlet-api-2.5.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/log4j-1.2.17.jar:/home/xxx/app/hadoop-2.6.0-cdh5.1
5.1/share/hadoop/hdfs/lib/xml-apis-1.3.04.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jackson-core-asl-1.8.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jetty-util-6.1.26.cloudera.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/xercesImpl-2.9.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jasper-runtime-5.5.23.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/htrace-core4-4.0.1-incubating.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jackson-mapper-asl-1.8.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/asm-3.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-applications-unmanaged-am-launcher-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-common-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-registry-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-server-nodemanager-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-server-web-proxy-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-server-common-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-server-tests-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-api-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-server-applicationhistoryservice-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-server-resourcemanager-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-client-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/zookeeper-3.4.5-cdh5.15.1.jar:/home/xxx/app/h
adoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/activation-1.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/commons-codec-1.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/aopalliance-1.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/leveldbjni-all-1.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/javax.inject-1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jsr305-3.0.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jersey-client-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jline-2.11.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jetty-6.1.26.cloudera.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jaxb-api-2.2.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/commons-cli-1.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jackson-jaxrs-1.8.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/guice-servlet-3.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jersey-json-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/commons-logging-1.1.3.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/commons-lang-2.6.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jettison-1.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/commons-compress-1.4.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/guava-11.0.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/protobuf-java-2.5.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jersey-guice-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jersey-server-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/commons-io-2.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jackson-xc-1.8.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jersey-core-1.9.jar:/home/xxx/app/h
adoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/guice-3.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/servlet-api-2.5.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/log4j-1.2.17.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/xz-1.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jackson-core-asl-1.8.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/stax-api-1.0-2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jetty-util-6.1.26.cloudera.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/commons-collections-3.2.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jackson-mapper-asl-1.8.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jaxb-impl-2.2.3-1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/asm-3.2.jar2020-08-10
 16:37:32,299 INFO &nbsp;org.apache.flink.runtime.entrypoint.ClusterEntrypoint 
&nbsp; &nbsp; &nbsp; &nbsp;[] - 
--------------------------------------------------------------------------------2020-08-10
 16:37:32,301 INFO &nbsp;org.apache.flink.runtime.entrypoint.ClusterEntrypoint 
&nbsp; &nbsp; &nbsp; &nbsp;[] - Registered UNIX signal handlers for [TERM, HUP, 
INT]2020-08-10 16:37:32,306 INFO 
&nbsp;org.apache.flink.runtime.entrypoint.ClusterEntrypoint &nbsp; &nbsp; 
&nbsp; &nbsp;[] - YARN daemon is running as: root Yarn client user obtainer: 
root2020-08-10 16:37:32,311 INFO 
&nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; [] - Loading configuration property: 
taskmanager.memory.process.size, 4 gb2020-08-10 16:37:32,311 INFO 
&nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; [] - Loading configuration property: internal.jobgraph-path, 
job.graph2020-08-10 16:37:32,311 INFO 
&nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; [] - Loading configuration property: 
jobmanager.execution.failover-strategy, region2020-08-10 16:37:32,312 INFO 
&nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; [] - Loading configuration property: 
high-availability.cluster-id, application_1591335931326_00242020-08-10 
16:37:32,312 INFO &nbsp;org.apache.flink.configuration.GlobalConfiguration 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Loading configuration property: 
jobmanager.rpc.address, localhost2020-08-10 16:37:32,312 INFO 
&nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; [] - Loading configuration property: execution.target, 
yarn-per-job2020-08-10 16:37:32,312 INFO 
&nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; [] - Loading configuration property: 
jobmanager.memory.process.size, 4 gb2020-08-10 16:37:32,312 INFO 
&nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; [] - Loading configuration property: jobmanager.rpc.port, 
61232020-08-10 16:37:32,312 INFO 
&nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; [] - Loading configuration property: 
execution.savepoint.ignore-unclaimed-state, false2020-08-10 16:37:32,313 INFO 
&nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; [] - Loading configuration property: execution.attached, 
true2020-08-10 16:37:32,313 INFO 
&nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; [] - Loading configuration property: 
internal.cluster.execution-mode, NORMAL2020-08-10 16:37:32,313 INFO 
&nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; [] - Loading configuration property: 
execution.shutdown-on-attached-exit, false2020-08-10 16:37:32,313 INFO 
&nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; [] - Loading configuration property: pipeline.jars, 
file:/home/xxx/app/flink-1.11.1/UnifyCompFlink-1.0.jar2020-08-10 16:37:32,313 
INFO &nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; [] - Loading configuration property: parallelism.default, 
82020-08-10 16:37:32,313 INFO 
&nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; [] - Loading configuration property: 
taskmanager.numberOfTaskSlots, 12020-08-10 16:37:32,313 WARN 
&nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; [] - Error while trying to split key and value in configuration 
file 
/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1591335931326_0024/container_1591335931326_0024_01_000001/flink-conf.yaml:16:
 "pipeline.classpaths: "2020-08-10 16:37:32,314 INFO 
&nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; [] - Loading configuration property: 
$internal.deployment.config-dir, /home/xxx/app/flink-1.11.1/conf2020-08-10 
16:37:32,314 INFO &nbsp;org.apache.flink.configuration.GlobalConfiguration 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Loading configuration property: 
$internal.yarn.log-config-file, 
/home/xxx/app/flink-1.11.1/conf/log4j.properties2020-08-10 16:37:32,347 WARN 
&nbsp;org.apache.flink.configuration.Configuration &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; [] - Config uses deprecated configuration key 
'web.port' instead of proper key 'rest.bind-port'2020-08-10 16:37:32,362 INFO 
&nbsp;org.apache.flink.runtime.clusterframework.BootstrapTools &nbsp; &nbsp; [] 
- Setting directories for temporary files to: 
/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1591335931326_00242020-08-10
 16:37:32,368 INFO &nbsp;org.apache.flink.runtime.entrypoint.ClusterEntrypoint 
&nbsp; &nbsp; &nbsp; &nbsp;[] - Starting YarnJobClusterEntrypoint.2020-08-10 
16:37:32,413 INFO &nbsp;org.apache.flink.runtime.entrypoint.ClusterEntrypoint 
&nbsp; &nbsp; &nbsp; &nbsp;[] - Install default filesystem.2020-08-10 
16:37:32,461 INFO &nbsp;org.apache.flink.runtime.entrypoint.ClusterEntrypoint 
&nbsp; &nbsp; &nbsp; &nbsp;[] - Install security context.2020-08-10 
16:37:32,520 INFO &nbsp;org.apache.flink.runtime.security.modules.HadoopModule 
&nbsp; &nbsp; &nbsp; [] - Hadoop user set to root (auth:SIMPLE)2020-08-10 
16:37:32,529 INFO &nbsp;org.apache.flink.runtime.security.modules.JaasModule 
&nbsp; &nbsp; &nbsp; &nbsp; [] - Jaas file will be created as 
/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1591335931326_0024/jaas-1114046375892877617.conf.2020-08-10
 16:37:32,539 INFO &nbsp;org.apache.flink.runtime.entrypoint.ClusterEntrypoint 
&nbsp; &nbsp; &nbsp; &nbsp;[] - Initializing cluster services.2020-08-10 
16:37:32,556 INFO &nbsp;org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils 
&nbsp; &nbsp; &nbsp; &nbsp;[] - Trying to start actor system, external address 
node3:0, bind address 0.0.0.0:0.2020-08-10 16:37:33,191 INFO 
&nbsp;akka.event.slf4j.Slf4jLogger &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - 
Slf4jLogger started2020-08-10 16:37:33,218 INFO &nbsp;akka.remote.Remoting 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Starting 
remoting2020-08-10 16:37:33,378 INFO &nbsp;akka.remote.Remoting &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Remoting started; 
listening on addresses :[akka.tcp://flink@node3:40657]2020-08-10 16:37:33,506 
INFO &nbsp;org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils &nbsp; &nbsp; 
&nbsp; &nbsp;[] - Actor system started at 
akka.tcp://flink@node3:406572020-08-10 16:37:33,539 WARN 
&nbsp;org.apache.flink.configuration.Configuration &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; [] - Config uses deprecated configuration key 
'web.port' instead of proper key 'rest.port'2020-08-10 16:37:33,551 INFO 
&nbsp;org.apache.flink.runtime.blob.BlobServer &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Created BLOB server storage 
directory 
/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1591335931326_0024/blobStore-15a573e2-a671-4eb9-975b-b5229cec6bde2020-08-10
 16:37:33,555 INFO &nbsp;org.apache.flink.runtime.blob.BlobServer &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Started BLOB 
server at 0.0.0.0:34380 - max concurrent requests: 50 - max backlog: 
10002020-08-10 16:37:33,570 INFO 
&nbsp;org.apache.flink.runtime.metrics.MetricRegistryImpl &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp;[] - No metrics reporter configured, no metrics will be 
exposed/reported.2020-08-10 16:37:33,574 INFO 
&nbsp;org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils &nbsp; &nbsp; 
&nbsp; &nbsp;[] - Trying to start actor system, external address node3:0, bind 
address 0.0.0.0:0.2020-08-10 16:37:33,591 INFO 
&nbsp;akka.event.slf4j.Slf4jLogger &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - 
Slf4jLogger started2020-08-10 16:37:33,597 INFO &nbsp;akka.remote.Remoting 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Starting 
remoting2020-08-10 16:37:33,606 INFO &nbsp;akka.remote.Remoting &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Remoting started; 
listening on addresses :[akka.tcp://flink-metrics@node3:43096]2020-08-10 
16:37:33,642 INFO &nbsp;org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils 
&nbsp; &nbsp; &nbsp; &nbsp;[] - Actor system started at 
akka.tcp://flink-metrics@node3:430962020-08-10 16:37:33,659 INFO 
&nbsp;org.apache.flink.runtime.rpc.akka.AkkaRpcService &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; [] - Starting RPC endpoint for 
org.apache.flink.runtime.metrics.dump.MetricQueryService at 
akka://flink-metrics/user/rpc/MetricQueryService .2020-08-10 16:37:33,721 WARN 
&nbsp;org.apache.flink.configuration.Configuration &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; [] - Config uses deprecated configuration key 
'web.port' instead of proper key 'rest.bind-port'2020-08-10 16:37:33,723 INFO 
&nbsp;org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Upload 
directory /tmp/flink-web-d25a365e-b6cb-45e6-bb6e-3068b7412f06/flink-web-upload 
does not exist.&nbsp;2020-08-10 16:37:33,724 INFO 
&nbsp;org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - 
Created directory 
/tmp/flink-web-d25a365e-b6cb-45e6-bb6e-3068b7412f06/flink-web-upload for file 
uploads.2020-08-10 16:37:33,748 INFO 
&nbsp;org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - 
Starting rest endpoint.2020-08-10 16:37:34,110 INFO 
&nbsp;org.apache.flink.runtime.webmonitor.WebMonitorUtils &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp;[] - Determined location of main cluster component log file: 
/home/xxx/app/hadoop-2.6.0-cdh5.15.1/logs/userlogs/application_1591335931326_0024/container_1591335931326_0024_01_000001/jobmanager.log2020-08-10
 16:37:34,111 INFO &nbsp;org.apache.flink.runtime.webmonitor.WebMonitorUtils 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Determined location of main cluster 
component stdout file: 
/home/xxx/app/hadoop-2.6.0-cdh5.15.1/logs/userlogs/application_1591335931326_0024/container_1591335931326_0024_01_000001/jobmanager.out2020-08-10
 16:37:34,309 INFO 
&nbsp;org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Rest 
endpoint listening at node3:394692020-08-10 16:37:34,311 INFO 
&nbsp;org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - 
http://node3:39469 was granted leadership with 
leaderSessionID=00000000-0000-0000-0000-0000000000002020-08-10 16:37:34,312 
INFO &nbsp;org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - 
Web frontend listening at http://node3:39469.2020-08-10 16:37:34,403 INFO 
&nbsp;org.apache.flink.runtime.rpc.akka.AkkaRpcService &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; [] - Starting RPC endpoint for 
org.apache.flink.yarn.YarnResourceManager at 
akka://flink/user/rpc/resourcemanager_0 .2020-08-10 16:37:34,417 INFO 
&nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; [] - Loading configuration property: 
taskmanager.memory.process.size, 4 gb2020-08-10 16:37:34,417 INFO 
&nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; [] - Loading configuration property: internal.jobgraph-path, 
job.graph2020-08-10 16:37:34,417 INFO 
&nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; [] - Loading configuration property: 
jobmanager.execution.failover-strategy, region2020-08-10 16:37:34,417 INFO 
&nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; [] - Loading configuration property: 
high-availability.cluster-id, application_1591335931326_00242020-08-10 
16:37:34,417 INFO &nbsp;org.apache.flink.configuration.GlobalConfiguration 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Loading configuration property: 
jobmanager.rpc.address, localhost2020-08-10 16:37:34,417 INFO 
&nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; [] - Loading configuration property: execution.target, 
yarn-per-job2020-08-10 16:37:34,417 INFO 
&nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; [] - Loading configuration property: 
jobmanager.memory.process.size, 4 gb2020-08-10 16:37:34,418 INFO 
&nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; [] - Loading configuration property: jobmanager.rpc.port, 
61232020-08-10 16:37:34,418 INFO 
&nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; [] - Loading configuration property: 
execution.savepoint.ignore-unclaimed-state, false2020-08-10 16:37:34,418 INFO 
&nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; [] - Loading configuration property: execution.attached, 
true2020-08-10 16:37:34,418 INFO 
&nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; [] - Loading configuration property: 
internal.cluster.execution-mode, NORMAL2020-08-10 16:37:34,418 INFO 
&nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; [] - Loading configuration property: 
execution.shutdown-on-attached-exit, false2020-08-10 16:37:34,418 INFO 
&nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; [] - Loading configuration property: pipeline.jars, 
file:/home/xxx/app/flink-1.11.1/UnifyCompFlink-1.0.jar
2020-08-10 16:37:34,418 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: parallelism.default, 8
2020-08-10 16:37:34,418 INFO  org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.numberOfTaskSlots, 1
2020-08-10 16:37:34,418 WARN  org.apache.flink.configuration.GlobalConfiguration [] - Error while trying to split key and value in configuration file /tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1591335931326_0024/container_1591335931326_0024_01_000001/flink-conf.yaml:16: "pipeline.classpaths: "
2020-08-10 16:37:34,419 INFO 
&nbsp;org.apache.flink.configuration.GlobalConfiguration &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; [] - Loading configuration property: 
$internal.deployment.config-dir, /home/xxx/app/flink-1.11.1/conf2020-08-10 
16:37:34,419 INFO &nbsp;org.apache.flink.configuration.GlobalConfiguration 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Loading configuration property: 
$internal.yarn.log-config-file, 
/home/xxx/app/flink-1.11.1/conf/log4j.properties2020-08-10 16:37:34,450 INFO 
&nbsp;org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - 
Enabled external resources: []2020-08-10 16:37:34,519 INFO 
&nbsp;org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess [] 
- Start JobDispatcherLeaderProcess.2020-08-10 16:37:34,527 INFO 
&nbsp;org.apache.flink.runtime.rpc.akka.AkkaRpcService &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; [] - Starting RPC endpoint for 
org.apache.flink.runtime.dispatcher.MiniDispatcher at 
akka://flink/user/rpc/dispatcher_1 .2020-08-10 16:37:34,572 INFO 
&nbsp;org.apache.flink.runtime.rpc.akka.AkkaRpcService &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; [] - Starting RPC endpoint for 
org.apache.flink.runtime.jobmaster.JobMaster at 
akka://flink/user/rpc/jobmanager_2 .2020-08-10 16:37:34,582 INFO 
&nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; [] - Initializing job empJOB 
(eb447d27efb8134da40c0c1dd19fffdf).2020-08-10 16:37:34,615 INFO 
&nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; [] - Using restart back off time strategy 
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, 
backoffTimeMS=1000) for empJOB (eb447d27efb8134da40c0c1dd19fffdf).2020-08-10 
16:37:34,667 INFO &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Running initialization on 
master for job empJOB (eb447d27efb8134da40c0c1dd19fffdf).2020-08-10 
16:37:34,806 INFO &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Successfully ran 
initialization on master in 139 ms.2020-08-10 16:37:34,876 INFO 
&nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Recovered 0 containers from 
previous attempts ([]).2020-08-10 16:37:34,877 INFO 
&nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Register application master 
response contains scheduler resource types: [MEMORY, CPU].2020-08-10 
16:37:34,877 INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Container matching 
strategy: MATCH_VCORE.2020-08-10 16:37:34,887 INFO 
&nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - ResourceManager 
akka.tcp://flink@node3:40657/user/rpc/resourcemanager_0 was granted leadership 
with fencing token 000000000000000000000000000000002020-08-10 16:37:34,891 INFO 
&nbsp;org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl [] - 
Starting the SlotManager.2020-08-10 16:37:35,466 INFO 
&nbsp;org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - 
Built 1 pipelined regions in 2 ms2020-08-10 16:37:35,483 INFO 
&nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; [] - No state backend has been configured, using 
default (Memory / JobManager) MemoryStateBackend (data in heap memory / 
checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', 
asynchronous: TRUE, maxStateSize: 5242880)2020-08-10 16:37:35,503 INFO 
&nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; [] - Using failover strategy 
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@3915bc20
 for empJOB (eb447d27efb8134da40c0c1dd19fffdf).2020-08-10 16:37:35,509 INFO 
&nbsp;org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl &nbsp; &nbsp; 
&nbsp;[] - JobManager runner for job empJOB (eb447d27efb8134da40c0c1dd19fffdf) 
was granted leadership with session id 00000000-0000-0000-0000-000000000000 at 
akka.tcp://flink@node3:40657/user/rpc/jobmanager_2.2020-08-10 16:37:35,514 INFO 
&nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; [] - Starting execution of job empJOB 
(eb447d27efb8134da40c0c1dd19fffdf) under job master id 
00000000000000000000000000000000.2020-08-10 16:37:35,517 INFO 
&nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; [] - Starting scheduling with scheduling strategy 
[org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]2020-08-10 
16:37:35,518 INFO &nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph 
&nbsp; &nbsp; &nbsp; [] - Job empJOB (eb447d27efb8134da40c0c1dd19fffdf) 
switched from state CREATED to RUNNING.2020-08-10 16:37:35,535 INFO 
&nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp; 
&nbsp; [] - Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal, 
comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums: 
null -&gt; SinkConversionToRow (1/6) (5a6410258857c02ebd1b5ec03a78be4b) 
switched from CREATED to SCHEDULED.2020-08-10 16:37:35,536 INFO 
&nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp; 
&nbsp; [] - Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal, 
comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums: 
null -&gt; SinkConversionToRow (2/6) (299de0d4a8affe02a999edeb84957c41) 
switched from CREATED to SCHEDULED.2020-08-10 16:37:35,536 INFO 
&nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp; 
&nbsp; [] - Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal, 
comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums: 
null -&gt; SinkConversionToRow (3/6) (1b98df27c9019f64835b55fa3de3f363) 
switched from CREATED to SCHEDULED.2020-08-10 16:37:35,536 INFO 
&nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp; 
&nbsp; [] - Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal, 
comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums: 
null -&gt; SinkConversionToRow (4/6) (a7612608772c018d819741ce4d9320bd) 
switched from CREATED to SCHEDULED.2020-08-10 16:37:35,536 INFO 
&nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp; 
&nbsp; [] - Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal, 
comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums: 
null -&gt; SinkConversionToRow (5/6) (b19828c85fc0e92e62f2a7241b610f5b) 
switched from CREATED to SCHEDULED.2020-08-10 16:37:35,536 INFO 
&nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp; 
&nbsp; [] - Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal, 
comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums: 
null -&gt; SinkConversionToRow (6/6) (c2178a51eda2db900d3212e4f488d00f) 
switched from CREATED to SCHEDULED.2020-08-10 16:37:35,536 INFO 
&nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp; 
&nbsp; [] - Process -&gt; Sink: Unnamed (1/8) 
(2a8db3a2b4cd65fd7cd3e6bac031a971) switched from CREATED to 
SCHEDULED.2020-08-10 16:37:35,536 INFO 
&nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp; 
&nbsp; [] - Process -&gt; Sink: Unnamed (2/8) 
(7aa8dd779d4ff75e4c985be75a52c427) switched from CREATED to 
SCHEDULED.2020-08-10 16:37:35,536 INFO 
&nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp; 
&nbsp; [] - Process -&gt; Sink: Unnamed (3/8) 
(867c814978ea302537065f51516ed766) switched from CREATED to 
SCHEDULED.2020-08-10 16:37:35,536 INFO 
&nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp; 
&nbsp; [] - Process -&gt; Sink: Unnamed (4/8) 
(4e186575ab42cc6c1d599ae027bf99b8) switched from CREATED to 
SCHEDULED.2020-08-10 16:37:35,537 INFO 
&nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp; 
&nbsp; [] - Process -&gt; Sink: Unnamed (5/8) 
(b107b8bfb0a08c5e7937400c43a0f9ff) switched from CREATED to 
SCHEDULED.2020-08-10 16:37:35,537 INFO 
&nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp; 
&nbsp; [] - Process -&gt; Sink: Unnamed (6/8) 
(28e1f0fa1b9ebed59e4c67b0598864b9) switched from CREATED to 
SCHEDULED.2020-08-10 16:37:35,537 INFO 
&nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp; 
&nbsp; [] - Process -&gt; Sink: Unnamed (7/8) 
(e27e60ff7dcd5245dfd21b23bbd49985) switched from CREATED to 
SCHEDULED.2020-08-10 16:37:35,537 INFO 
&nbsp;org.apache.flink.runtime.executiongraph.ExecutionGraph &nbsp; &nbsp; 
&nbsp; [] - Process -&gt; Sink: Unnamed (8/8) 
(c0f6b9e623c68fd7e9205a8ad686d4e5) switched from CREATED to 
SCHEDULED.2020-08-10 16:37:35,558 INFO 
&nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp; &nbsp; [] 
- Cannot serve slot request, no ResourceManager connected. Adding as pending 
request [SlotRequestId{7c8417d17f555e546cd6427733db6a3b}]2020-08-10 
16:37:35,565 INFO 
&nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp; &nbsp; [] 
- Cannot serve slot request, no ResourceManager connected. Adding as pending 
request [SlotRequestId{eb7c6ff479c977fdb6312f8e61dfdfba}]2020-08-10 
16:37:35,565 INFO 
&nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp; &nbsp; [] 
- Cannot serve slot request, no ResourceManager connected. Adding as pending 
request [SlotRequestId{a1d23d7e17b9369e80833de148f54bac}]2020-08-10 
16:37:35,566 INFO 
&nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp; &nbsp; [] 
- Cannot serve slot request, no ResourceManager connected. Adding as pending 
request [SlotRequestId{2447496efd24d542bce06de1b69ec70d}]2020-08-10 
16:37:35,566 INFO 
&nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp; &nbsp; [] 
- Cannot serve slot request, no ResourceManager connected. Adding as pending 
request [SlotRequestId{2ab761d21cd4368751f3187f122705fa}]2020-08-10 
16:37:35,566 INFO 
&nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp; &nbsp; [] 
- Cannot serve slot request, no ResourceManager connected. Adding as pending 
request [SlotRequestId{08a52f113cce64d1309509da5d25a1bb}]2020-08-10 
16:37:35,574 INFO &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Connecting to 
ResourceManager 
akka.tcp://flink@node3:40657/user/rpc/resourcemanager_*(00000000000000000000000000000000)2020-08-10
 16:37:35,579 INFO &nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; [] - Resolved ResourceManager 
address, beginning registration2020-08-10 16:37:35,584 INFO 
&nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Registering job manager 
00000000000000000000000000000...@akka.tcp://flink@node3:40657/user/rpc/jobmanager_2
 for job eb447d27efb8134da40c0c1dd19fffdf.2020-08-10 16:37:35,589 INFO 
&nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Registered job manager 
00000000000000000000000000000...@akka.tcp://flink@node3:40657/user/rpc/jobmanager_2
 for job eb447d27efb8134da40c0c1dd19fffdf.2020-08-10 16:37:35,593 INFO 
&nbsp;org.apache.flink.runtime.jobmaster.JobMaster &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; [] - JobManager successfully registered at 
ResourceManager, leader id: 00000000000000000000000000000000.2020-08-10 
16:37:35,594 INFO 
&nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp; &nbsp; [] 
- Requesting new slot [SlotRequestId{7c8417d17f555e546cd6427733db6a3b}] and 
profile ResourceProfile{UNKNOWN} from resource manager.2020-08-10 16:37:35,595 
INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Request slot with profile 
ResourceProfile{UNKNOWN} for job eb447d27efb8134da40c0c1dd19fffdf with 
allocation id e490d3208119fe28d97f4f0fe94cab28.2020-08-10 16:37:35,595 INFO 
&nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp; &nbsp; [] 
- Requesting new slot [SlotRequestId{eb7c6ff479c977fdb6312f8e61dfdfba}] and 
profile ResourceProfile{UNKNOWN} from resource manager.2020-08-10 16:37:35,595 
INFO &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp; 
&nbsp; [] - Requesting new slot 
[SlotRequestId{a1d23d7e17b9369e80833de148f54bac}] and profile 
ResourceProfile{UNKNOWN} from resource manager.2020-08-10 16:37:35,595 INFO 
&nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp; &nbsp; [] 
- Requesting new slot [SlotRequestId{2447496efd24d542bce06de1b69ec70d}] and 
profile ResourceProfile{UNKNOWN} from resource manager.2020-08-10 16:37:35,596 
INFO &nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp; 
&nbsp; [] - Requesting new slot 
[SlotRequestId{2ab761d21cd4368751f3187f122705fa}] and profile 
ResourceProfile{UNKNOWN} from resource manager.2020-08-10 16:37:35,596 INFO 
&nbsp;org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl &nbsp; &nbsp; [] 
- Requesting new slot [SlotRequestId{08a52f113cce64d1309509da5d25a1bb}] and 
profile ResourceProfile{UNKNOWN} from resource manager.2020-08-10 16:37:35,612 
INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Requesting new 
TaskExecutor container with resource WorkerResourceSpec {cpuCores=1.0, 
taskHeapSize=1.425gb (1530082070 bytes), taskOffHeapSize=0 bytes, 
networkMemSize=343.040mb (359703515 bytes), managedMemSize=1.340gb (1438814063 
bytes)}. Number pending workers of this resource is 1.2020-08-10 16:37:35,614 
INFO &nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Request slot with profile 
ResourceProfile{UNKNOWN} for job eb447d27efb8134da40c0c1dd19fffdf with 
allocation id b74299e8619de93adec5869d1fa79d73.2020-08-10 16:37:35,615 INFO 
&nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Requesting new TaskExecutor 
container with resource WorkerResourceSpec {cpuCores=1.0, taskHeapSize=1.425gb 
(1530082070 bytes), taskOffHeapSize=0 bytes, networkMemSize=343.040mb 
(359703515 bytes), managedMemSize=1.340gb (1438814063 bytes)}. Number pending 
workers of this resource is 2.2020-08-10 16:37:35,615 INFO 
&nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Request slot with profile 
ResourceProfile{UNKNOWN} for job eb447d27efb8134da40c0c1dd19fffdf with 
allocation id 046a3dcf1af40e0539f15fcddfbddf77.2020-08-10 16:37:35,615 INFO 
&nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Requesting new TaskExecutor 
container with resource WorkerResourceSpec {cpuCores=1.0, taskHeapSize=1.425gb 
(1530082070 bytes), taskOffHeapSize=0 bytes, networkMemSize=343.040mb 
(359703515 bytes), managedMemSize=1.340gb (1438814063 bytes)}. Number pending 
workers of this resource is 3.2020-08-10 16:37:35,615 INFO 
&nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Request slot with profile 
ResourceProfile{UNKNOWN} for job eb447d27efb8134da40c0c1dd19fffdf with 
allocation id 90870250ae0f3bef44cbdd675dede57b.2020-08-10 16:37:35,616 INFO 
&nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Requesting new TaskExecutor 
container with resource WorkerResourceSpec {cpuCores=1.0, taskHeapSize=1.425gb 
(1530082070 bytes), taskOffHeapSize=0 bytes, networkMemSize=343.040mb 
(359703515 bytes), managedMemSize=1.340gb (1438814063 bytes)}. Number pending 
workers of this resource is 4.2020-08-10 16:37:35,616 INFO 
&nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Request slot with profile 
ResourceProfile{UNKNOWN} for job eb447d27efb8134da40c0c1dd19fffdf with 
allocation id f8063a6fc86162712215a92533532b65.2020-08-10 16:37:35,616 INFO 
&nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Requesting new TaskExecutor 
container with resource WorkerResourceSpec {cpuCores=1.0, taskHeapSize=1.425gb 
(1530082070 bytes), taskOffHeapSize=0 bytes, networkMemSize=343.040mb 
(359703515 bytes), managedMemSize=1.340gb (1438814063 bytes)}. Number pending 
workers of this resource is 5.2020-08-10 16:37:35,616 INFO 
&nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Request slot with profile 
ResourceProfile{UNKNOWN} for job eb447d27efb8134da40c0c1dd19fffdf with 
allocation id cfe671ec5448d440838f02145cb6267f.2020-08-10 16:37:35,617 INFO 
&nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Requesting new TaskExecutor 
container with resource WorkerResourceSpec {cpuCores=1.0, taskHeapSize=1.425gb 
(1530082070 bytes), taskOffHeapSize=0 bytes, networkMemSize=343.040mb 
(359703515 bytes), managedMemSize=1.340gb (1438814063 bytes)}. Number pending 
workers of this resource is 6.
2020-08-10 16:37:38,391 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums: null -> SinkConversionToRow (1/6) of job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. Aborting checkpoint.
2020-08-10 16:37:40,933 INFO  org.apache.flink.yarn.YarnResourceManager [] - Received 1 containers.
2020-08-10 16:37:40,940 INFO  org.apache.flink.yarn.YarnResourceManager [] - Received 1 containers with resource <memory:4096, vCores:1>, 6 pending container requests.
2020-08-10 16:37:40,953 INFO 
&nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - TaskExecutor 
container_1591335931326_0024_01_000003 will be started on node1 with 
TaskExecutorProcessSpec {cpuCores=1.0, frameworkHeapSize=128.000mb (134217728 
bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), taskHeapSize=1.425gb 
(1530082070 bytes), taskOffHeapSize=0 bytes, networkMemSize=343.040mb 
(359703515 bytes), managedMemorySize=1.340gb (1438814063 bytes), 
jvmMetaspaceSize=256.000mb (268435456 bytes), jvmOverheadSize=409.600mb 
(429496736 bytes)}.2020-08-10 16:37:40,976 INFO 
&nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Creating container launch context 
for TaskManagers2020-08-10 16:37:40,978 INFO 
&nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; 
[] - Starting TaskManagers
2020-08-10 16:37:40,995 INFO  org.apache.flink.yarn.YarnResourceManager [] - Removing container request Capability[<memory:4096, vCores:1>]Priority[1].
2020-08-10 16:37:40,995 INFO  org.apache.flink.yarn.YarnResourceManager [] - Accepted 1 requested containers, returned 0 excess containers, 5 pending container requests of resource <memory:4096, vCores:1>.
2020-08-10 16:37:46,389 INFO 
&nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] 
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, 
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, 
PartitionNums: null -&gt; SinkConversionToRow (1/6) of job 
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. 
Aborting checkpoint.2020-08-10 16:37:47,712 INFO 
&nbsp;org.apache.flink.yarn.YarnResourceManager &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;[] - Registering TaskManager with 
ResourceID container_1591335931326_0024_01_000003 
(akka.tcp://flink@node1:40857/user/rpc/taskmanager_0) at 
ResourceManager2020-08-10 16:37:54,389 INFO 
&nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] 
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, 
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, 
PartitionNums: null -&gt; SinkConversionToRow (1/6) of job 
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. 
Aborting checkpoint.2020-08-10 16:38:02,389 INFO 
&nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] 
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, 
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, 
PartitionNums: null -&gt; SinkConversionToRow (1/6) of job 
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. 
Aborting checkpoint.2020-08-10 16:38:10,389 INFO 
&nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] 
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, 
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, 
PartitionNums: null -&gt; SinkConversionToRow (1/6) of job 
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. 
Aborting checkpoint.2020-08-10 16:38:18,389 INFO 
&nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] 
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, 
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, 
PartitionNums: null -&gt; SinkConversionToRow (1/6) of job 
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. 
Aborting checkpoint.2020-08-10 16:38:26,389 INFO 
&nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] 
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, 
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, 
PartitionNums: null -&gt; SinkConversionToRow (1/6) of job 
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. 
Aborting checkpoint.2020-08-10 16:38:34,389 INFO 
&nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] 
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, 
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, 
PartitionNums: null -&gt; SinkConversionToRow (1/6) of job 
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. 
Aborting checkpoint.2020-08-10 16:38:42,389 INFO 
&nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] 
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, 
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, 
PartitionNums: null -&gt; SinkConversionToRow (1/6) of job 
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. 
Aborting checkpoint.2020-08-10 16:38:50,389 INFO 
&nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] 
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, 
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, 
PartitionNums: null -&gt; SinkConversionToRow (1/6) of job 
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. 
Aborting checkpoint.2020-08-10 16:38:58,389 INFO 
&nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] 
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, 
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, 
PartitionNums: null -&gt; SinkConversionToRow (1/6) of job 
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. 
Aborting checkpoint.2020-08-10 16:39:06,389 INFO 
&nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] 
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, 
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, 
PartitionNums: null -&gt; SinkConversionToRow (1/6) of job 
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. 
Aborting checkpoint.2020-08-10 16:39:14,389 INFO 
&nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] 
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, 
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, 
PartitionNums: null -&gt; SinkConversionToRow (1/6) of job 
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. 
Aborting checkpoint.2020-08-10 16:39:22,389 INFO 
&nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] 
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, 
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, 
PartitionNums: null -&gt; SinkConversionToRow (1/6) of job 
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. 
Aborting checkpoint.2020-08-10 16:39:30,389 INFO 
&nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] 
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, 
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, 
PartitionNums: null -&gt; SinkConversionToRow (1/6) of job 
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. 
Aborting checkpoint.2020-08-10 16:39:38,389 INFO 
&nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] 
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, 
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, 
PartitionNums: null -&gt; SinkConversionToRow (1/6) of job 
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. 
Aborting checkpoint.2020-08-10 16:39:46,389 INFO 
&nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] 
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, 
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, 
PartitionNums: null -&gt; SinkConversionToRow (1/6) of job 
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. 
Aborting checkpoint.2020-08-10 16:39:54,389 INFO 
&nbsp;org.apache.flink.runtime.checkpoint.CheckpointCoordinator &nbsp; &nbsp;[] 
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr, 
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false, 
PartitionNums: null -&gt; SinkConversionToRow (1/6) of job 
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead. 
Aborting checkpoint.



job代码

public class PlatJobExecution {
&nbsp; &nbsp; private volatile ParameterTool parameters;
&nbsp; &nbsp; public PlatJobExecution(ParameterTool parameters) {&nbsp; &nbsp; 
&nbsp; &nbsp; this.parameters = parameters;&nbsp; &nbsp; }
&nbsp; &nbsp; public void execute() throws Exception {

&nbsp; &nbsp; &nbsp; &nbsp; //目标数据源:&nbsp; &nbsp; &nbsp; &nbsp; //目标数据表:
&nbsp; &nbsp; &nbsp; &nbsp; //1.读取数据 kafka /oracle &nbsp;把流注册成一张表【这个过程可以手动完成】 
&nbsp; &nbsp; &nbsp;--hive
&nbsp; &nbsp; &nbsp; &nbsp; //2.执行sql,返回结果
&nbsp; &nbsp; &nbsp; &nbsp; //3.把结果写入目标数据表 / 写入redis / 写入kafka
&nbsp; &nbsp; &nbsp; &nbsp; InputStream is = 
ReadKafkaPrint.class.getClassLoader().getResourceAsStream("config.properties");
&nbsp; &nbsp; &nbsp; &nbsp; ParameterTool parameters2 = 
ParameterTool.fromPropertiesFile(is);



&nbsp; &nbsp; &nbsp; &nbsp; String targetDatabase = 
parameters.get("sourceDatabase");&nbsp; &nbsp; &nbsp; &nbsp; String 
executiveSql = parameters.get("executiveSql");&nbsp; &nbsp; &nbsp; &nbsp; 
String sinkSQL = parameters.get("sinkSQL");&nbsp; &nbsp; &nbsp; &nbsp; String 
jobName = parameters.get("jobName");
&nbsp; &nbsp; &nbsp; &nbsp; Map<String, String&gt; pMap = 
Maps.newHashMap();&nbsp; &nbsp; &nbsp; &nbsp; 
pMap.putAll(parameters2.toMap());&nbsp; &nbsp; &nbsp; &nbsp; 
pMap.put("sinkSQL",sinkSQL);
&nbsp; &nbsp; &nbsp; &nbsp; parameters2 = ParameterTool.fromMap(pMap);

&nbsp; &nbsp; &nbsp; &nbsp; //1.创建执行环境&nbsp; &nbsp; &nbsp; &nbsp; 
StreamExecutionEnvironment streamEnv = 
StreamExecutionEnvironment.getExecutionEnvironment();
&nbsp; &nbsp; &nbsp; &nbsp; //全局参数设置&nbsp; &nbsp; &nbsp; &nbsp; 
streamEnv.getConfig().setGlobalJobParameters(parameters2);
&nbsp; &nbsp; &nbsp; &nbsp; streamEnv.enableCheckpointing(8000, 
CheckpointingMode.EXACTLY_ONCE);//每隔5s进行一次checkpoint
&nbsp; &nbsp; &nbsp; &nbsp; EnvironmentSettings tableEnvSettings = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
&nbsp; &nbsp; &nbsp; &nbsp; //2.流式的TableEnv&nbsp; &nbsp; &nbsp; &nbsp; 
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, 
tableEnvSettings);&nbsp; &nbsp; &nbsp; &nbsp; 
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
 CheckpointingMode.EXACTLY_ONCE);&nbsp; &nbsp; &nbsp; &nbsp; 
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
 Duration.ofSeconds(8));
&nbsp; &nbsp; &nbsp; &nbsp; //3.注册HiveCatalog&nbsp; &nbsp; &nbsp; &nbsp; String 
name &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;= targetDatabase;&nbsp; &nbsp; 
&nbsp; &nbsp; String defaultDatabase = targetDatabase;&nbsp; &nbsp; &nbsp; 
&nbsp; String hiveConfDir &nbsp; &nbsp; = 
"/home/xxx/app/flink-1.11.1/jobcfg";&nbsp; &nbsp; &nbsp; &nbsp; String version 
&nbsp; &nbsp; &nbsp; &nbsp; = "1.1.0";
&nbsp; &nbsp; &nbsp; &nbsp; HiveCatalog catalog = new HiveCatalog(name, 
defaultDatabase, hiveConfDir, version);&nbsp; &nbsp; &nbsp; &nbsp; 
tableEnv.registerCatalog(name, catalog);&nbsp; &nbsp; &nbsp; &nbsp; 
tableEnv.useCatalog(name);
&nbsp; &nbsp; &nbsp; &nbsp; //4.流式读取Hive&nbsp; &nbsp; &nbsp; &nbsp; 
tableEnv.getConfig().getConfiguration().setBoolean(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED,
 true);
&nbsp; &nbsp; &nbsp; &nbsp; //query&nbsp; &nbsp; &nbsp; &nbsp; Table table = 
tableEnv.sqlQuery(executiveSql);
&nbsp; &nbsp; &nbsp; &nbsp; // CREATE/INSERT&nbsp; &nbsp; &nbsp; &nbsp; // 
tableEnv.executeSql()
// &nbsp; &nbsp; &nbsp; &nbsp;tableEnv.toRetractStream(table, 
Row.class).print().setParallelism(1);
&nbsp; &nbsp; &nbsp; &nbsp; 
SingleOutputStreamOperator<LinkedList<Object&gt;&gt; colList = 
tableEnv.toAppendStream(table, Row.class).process(new ProcessFunction<Row, 
LinkedList<Object&gt;&gt;() {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
@Override&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public void 
processElement(Row row, Context context, Collector<LinkedList<Object&gt;&gt; 
collector) throws Exception {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; LinkedList<Object&gt; linkedList = Lists.newLinkedList();&nbsp; &nbsp; 
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; for (int i = 0; i < row.getArity(); 
i++) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
linkedList.add(row.getField(i));&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
collector.collect(linkedList);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; 
&nbsp; &nbsp; &nbsp; });
&nbsp; &nbsp; &nbsp; &nbsp; colList.addSink(new CommonOracleSink());

&nbsp; &nbsp; &nbsp; &nbsp; //sink to Oracle&nbsp; &nbsp; &nbsp; &nbsp; 
streamEnv.execute(jobName);

&nbsp; &nbsp; }}


发自我的iPhone


------------------ 原始邮件 ------------------
发件人: shizk233 <wangwangdaxian...@gmail.com>
发送时间: 2020年8月10日 18:04
收件人: user-zh@flink.apache.org <user-zh@flink.apache.org>
主题: 回复:读取hive表的数据转化为流;按照jdbc两阶段提交方式写入oracle数据库,没有异常但是checkpoint失败



hi,附件挂了,没看到呃。可以考虑挂gist放链接出来。

不过看这个信息,checkpoint的失败是由于任务状态变更导致的(应该是任务重启了,所以running变scheduled了)。
建议往任务重启的方向排查一下。

Bruce <bruceleeof1...@qq.com> 于2020年8月10日周一 下午5:01写道:

> 您好,这里有个问题反馈下!
>
> 读取hive表的数据转化为流;按照jdbc两阶段提交方式写入oracle数据库,
> 没有抛任何异常但是checkpoint失败:
> job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED
> instead. Aborting checkpoint.
> 附件
> 1.flink.log是yarn jobmanager打印的伪日志
> 2.Job.txt是job的伪代码
> 3.jdbc两阶段提交的伪代码附件
> ------------------------------
> 发自我的iPhone
>

回复