下面是附件的内容,请问是因为什么导致重启呢?
2阶段提交demo:
@Slf4j public class CommonOracleSink extends
TwoPhaseCommitSinkFunction<LinkedList<Object>,
CommonOracleSink.ConnectionState, Void> { private transient
String sinkSQL; public CommonOracleSink() {
super(new KryoSerializer<>(ConnectionState.class, new
ExecutionConfig()), VoidSerializer.INSTANCE); }
@Override public void open(Configuration parameters) throws
Exception { super.open(parameters);
ParameterTool params = (ParameterTool)
getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
sinkSQL = params.getRequired("sinkSQL"); }
@Override protected void invoke(ConnectionState
connectionState, LinkedList<Object> colList, Context context){
try {
System.err.println("start invoke.......");
Connection connection = connectionState.connection;
log.info("colList---------------------->",
JSON.toJSONString(colList));
TKQueryRunner runner = new TKQueryRunner();
Object[] params = colList.toArray();
System.err.println("params size----->"+params.length);
runner.update(connection,sinkSQL,params);
}catch (Exception e){
log.error(e.getMessage(),e);
System.err.println(e.getMessage()); }
} /** * 获取连接,开启手动提交事物
* * @return * @throws Exception
*/ @Override protected
ConnectionState beginTransaction() throws Exception {
Connection connection = HikariOUtils.getConnection();
log.info("start beginTransaction......." + connection);
return new ConnectionState(connection); }
/** * 预提交,这里预提交的逻辑在invoke方法中
* * @param connectionState *
@throws Exception */ @Override
protected void preCommit(ConnectionState connectionState) throws Exception {
log.info("start preCommit......." +
connectionState); } /** *
如果invoke方法执行正常,则提交事务 * * @param
connectionState */ @Override
protected void commit(ConnectionState connectionState) {
log.info("start commit......." + connectionState);
Connection connection = connectionState.connection;
try {
connection.commit();
connection.close(); } catch (SQLException e) {
throw new RuntimeException("提交事物异常");
} } /**
* 如果invoke执行异常则回滚事物,下一次的checkpoint操作也不会执行 *
* @param connectionState */
@Override protected void abort(ConnectionState connectionState) {
log.error("start abort rollback......." +
connectionState); Connection connection =
connectionState.connection; try {
connection.rollback();
connection.close(); } catch
(SQLException e) { throw new
RuntimeException("回滚事物异常"); } }
static class ConnectionState {
private final transient Connection connection;
ConnectionState(Connection connection) {
this.connection = connection; }
} }
jobmanager日志
2020-08-10 16:37:31,892 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint
[] -
--------------------------------------------------------------------------------2020-08-10
16:37:31,897 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint
[] - Starting YarnJobClusterEntrypoint
(Version: 1.11.1, Scala: 2.11, Rev:7eb514a,
Date:2020-07-15T07:02:09+02:00)2020-08-10 16:37:31,898 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint
[] - OS current user: root2020-08-10 16:37:32,295 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint
[] - Current Hadoop/Kerberos user: root2020-08-10
16:37:32,295 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint
[] - JVM: Java HotSpot(TM) 64-Bit Server VM -
Oracle Corporation - 1.8/25.121-b132020-08-10 16:37:32,295 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint
[] - Maximum heap size: 3166 MiBytes2020-08-10 16:37:32,295
INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint
[] - JAVA_HOME: /home/xxx/app/jdk1.8.0_1212020-08-10
16:37:32,297 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint
[] - Hadoop version: 2.7.72020-08-10
16:37:32,297 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint
[] - JVM Options:2020-08-10 16:37:32,297 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint
[] - -Xmx34628173762020-08-10 16:37:32,297 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint
[] - -Xms34628173762020-08-10 16:37:32,297 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint
[] - -XX:MaxMetaspaceSize=2684354562020-08-10
16:37:32,297 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint
[] -
-Dlog.file=/home/xxx/app/hadoop-2.6.0-cdh5.15.1/logs/userlogs/application_1591335931326_0024/container_1591335931326_0024_01_000001/jobmanager.log2020-08-10
16:37:32,297 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint
[] -
-Dlog4j.configuration=file:log4j.properties2020-08-10 16:37:32,297 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint
[] -
-Dlog4j.configurationFile=file:log4j.properties2020-08-10 16:37:32,297 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint
[] - Program Arguments: (none)2020-08-10 16:37:32,297 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint
[] - Classpath:
:UnifyCompFlink-1.0.jar:lib/flink-csv-1.11.1.jar:lib/flink-json-1.11.1.jar:lib/flink-shaded-hadoop-2-uber-2.6.5-10.0.jar:lib/flink-shaded-zookeeper-3.4.14.jar:lib/flink-table-blink_2.11-1.11.1.jar:lib/flink-table_2.11-1.11.1.jar:lib/log4j-1.2-api-2.12.1.jar:lib/log4j-api-2.12.1.jar:lib/log4j-core-2.12.1.jar:lib/log4j-slf4j-impl-2.12.1.jar:flink-dist_2.11-1.11.1.jar:job.graph:flink-conf.yaml::/home/xxx/app/hadoop-2.6.0-cdh5.15.1/etc/hadoop:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/hadoop-nfs-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/hadoop-common-2.6.0-cdh5.15.1-tests.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/hadoop-common-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/api-util-1.0.0-M20.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/zookeeper-3.4.5-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/logredactor-1.0.3.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/activation-1.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-codec-1.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/junit-4.11.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jsr305-3.0.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-configuration-1.6.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/netty-3.10.5.Final.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-beanutils-core-1.8.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/slf4j-api-1.7.5.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/httpclient-4.2.5.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/api-asn1-api-1.0.0-M20.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/avro-1.7.6-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jetty-6.1.26.cloudera.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jaxb-api-2.2.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-cli-1.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/mockito-all-1.8.5.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jackson-jaxrs-1.8.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/hadoop-auth-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-el-1.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-net-3.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jersey-json-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-logging-1.1.3.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jsch-0.1.42.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-lang-2.6.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jettison-1.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/hadoop-annotations-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/curator-recipes-2.7.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/snappy-java-1.0.4.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/gson-2.2.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-compress-1.4.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/guava-11.0.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/java-xmlbuilder-0.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/protobuf-java-2.5.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/paranamer-2.3.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jersey-server-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/hamcrest-core-1.3.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-io-2.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jackson-xc-1.8.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jsp-api-2.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jersey-core-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-math3-3.1.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/xmlenc-0.52.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-digester-1.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/servlet-api-2.5.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/log4j-1.2.17.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/xz-1.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jasper-compiler-5.5.23.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/stax-api-1.0-2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jets3t-0.9.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jetty-util-6.1.26.cloudera.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jasper-runtime-5.5.23.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/htrace-core4-4.0.1-incubating.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/httpcore-4.2.5.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-beanutils-1.9.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-collections-3.2.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/jaxb-impl-2.2.3-1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/apacheds-kerberos-codec-2.0.0-M15.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/curator-client-2.7.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/commons-httpclient-3.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/apacheds-i18n-2.0.0-M15.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/curator-framework-2.7.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/common/lib/asm-3.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/hadoop-hdfs-2.6.0-cdh5.15.1-tests.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/hadoop-hdfs-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/hadoop-hdfs-nfs-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/commons-codec-1.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/leveldbjni-all-1.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jsr305-3.0.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/netty-3.10.5.Final.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jetty-6.1.26.cloudera.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/commons-cli-1.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/commons-el-1.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/commons-logging-1.1.3.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/commons-lang-2.6.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/guava-11.0.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/protobuf-java-2.5.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jersey-server-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/commons-io-2.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jsp-api-2.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jersey-core-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/commons-daemon-1.0.13.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/xmlenc-0.52.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/servlet-api-2.5.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/log4j-1.2.17.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/xml-apis-1.3.04.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jackson-core-asl-1.8.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jetty-util-6.1.26.cloudera.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/xercesImpl-2.9.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jasper-runtime-5.5.23.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/htrace-core4-4.0.1-incubating.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/jackson-mapper-asl-1.8.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/hdfs/lib/asm-3.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-applications-unmanaged-am-launcher-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-common-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-registry-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-server-nodemanager-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-server-web-proxy-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-applications-distributedshell-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-server-common-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-server-tests-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-api-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-server-applicationhistoryservice-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-server-resourcemanager-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/hadoop-yarn-client-2.6.0-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/zookeeper-3.4.5-cdh5.15.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/activation-1.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/commons-codec-1.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/aopalliance-1.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/leveldbjni-all-1.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/javax.inject-1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jsr305-3.0.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jersey-client-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jline-2.11.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jetty-6.1.26.cloudera.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jaxb-api-2.2.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/commons-cli-1.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jackson-jaxrs-1.8.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/guice-servlet-3.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jersey-json-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/commons-logging-1.1.3.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/commons-lang-2.6.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jettison-1.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/commons-compress-1.4.1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/guava-11.0.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/protobuf-java-2.5.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jersey-guice-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jersey-server-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/commons-io-2.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jackson-xc-1.8.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jersey-core-1.9.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/guice-3.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/servlet-api-2.5.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/log4j-1.2.17.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/xz-1.0.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jackson-core-asl-1.8.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/stax-api-1.0-2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jetty-util-6.1.26.cloudera.4.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/commons-collections-3.2.2.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jackson-mapper-asl-1.8.8.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/jaxb-impl-2.2.3-1.jar:/home/xxx/app/hadoop-2.6.0-cdh5.15.1/share/hadoop/yarn/lib/asm-3.2.jar2020-08-10
16:37:32,299 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint
[] -
--------------------------------------------------------------------------------2020-08-10
16:37:32,301 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint
[] - Registered UNIX signal handlers for [TERM, HUP,
INT]2020-08-10 16:37:32,306 INFO
org.apache.flink.runtime.entrypoint.ClusterEntrypoint
[] - YARN daemon is running as: root Yarn client user obtainer:
root2020-08-10 16:37:32,311 INFO
org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property:
taskmanager.memory.process.size, 4 gb2020-08-10 16:37:32,311 INFO
org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property: internal.jobgraph-path,
job.graph2020-08-10 16:37:32,311 INFO
org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property:
jobmanager.execution.failover-strategy, region2020-08-10 16:37:32,312 INFO
org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property:
high-availability.cluster-id, application_1591335931326_00242020-08-10
16:37:32,312 INFO org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property:
jobmanager.rpc.address, localhost2020-08-10 16:37:32,312 INFO
org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property: execution.target,
yarn-per-job2020-08-10 16:37:32,312 INFO
org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property:
jobmanager.memory.process.size, 4 gb2020-08-10 16:37:32,312 INFO
org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property: jobmanager.rpc.port,
61232020-08-10 16:37:32,312 INFO
org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property:
execution.savepoint.ignore-unclaimed-state, false2020-08-10 16:37:32,313 INFO
org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property: execution.attached,
true2020-08-10 16:37:32,313 INFO
org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property:
internal.cluster.execution-mode, NORMAL2020-08-10 16:37:32,313 INFO
org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property:
execution.shutdown-on-attached-exit, false2020-08-10 16:37:32,313 INFO
org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property: pipeline.jars,
file:/home/xxx/app/flink-1.11.1/UnifyCompFlink-1.0.jar2020-08-10 16:37:32,313
INFO org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property: parallelism.default,
82020-08-10 16:37:32,313 INFO
org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property:
taskmanager.numberOfTaskSlots, 12020-08-10 16:37:32,313 WARN
org.apache.flink.configuration.GlobalConfiguration
[] - Error while trying to split key and value in configuration
file
/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1591335931326_0024/container_1591335931326_0024_01_000001/flink-conf.yaml:16:
"pipeline.classpaths: "2020-08-10 16:37:32,314 INFO
org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property:
$internal.deployment.config-dir, /home/xxx/app/flink-1.11.1/conf2020-08-10
16:37:32,314 INFO org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property:
$internal.yarn.log-config-file,
/home/xxx/app/flink-1.11.1/conf/log4j.properties2020-08-10 16:37:32,347 WARN
org.apache.flink.configuration.Configuration
[] - Config uses deprecated configuration key
'web.port' instead of proper key 'rest.bind-port'2020-08-10 16:37:32,362 INFO
org.apache.flink.runtime.clusterframework.BootstrapTools []
- Setting directories for temporary files to:
/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1591335931326_00242020-08-10
16:37:32,368 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint
[] - Starting YarnJobClusterEntrypoint.2020-08-10
16:37:32,413 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint
[] - Install default filesystem.2020-08-10
16:37:32,461 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint
[] - Install security context.2020-08-10
16:37:32,520 INFO org.apache.flink.runtime.security.modules.HadoopModule
[] - Hadoop user set to root (auth:SIMPLE)2020-08-10
16:37:32,529 INFO org.apache.flink.runtime.security.modules.JaasModule
[] - Jaas file will be created as
/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1591335931326_0024/jaas-1114046375892877617.conf.2020-08-10
16:37:32,539 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint
[] - Initializing cluster services.2020-08-10
16:37:32,556 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils
[] - Trying to start actor system, external address
node3:0, bind address 0.0.0.0:0.2020-08-10 16:37:33,191 INFO
akka.event.slf4j.Slf4jLogger
[] -
Slf4jLogger started2020-08-10 16:37:33,218 INFO akka.remote.Remoting
[] - Starting
remoting2020-08-10 16:37:33,378 INFO akka.remote.Remoting
[] - Remoting started;
listening on addresses :[akka.tcp://flink@node3:40657]2020-08-10 16:37:33,506
INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils
[] - Actor system started at
akka.tcp://flink@node3:406572020-08-10 16:37:33,539 WARN
org.apache.flink.configuration.Configuration
[] - Config uses deprecated configuration key
'web.port' instead of proper key 'rest.port'2020-08-10 16:37:33,551 INFO
org.apache.flink.runtime.blob.BlobServer
[] - Created BLOB server storage
directory
/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1591335931326_0024/blobStore-15a573e2-a671-4eb9-975b-b5229cec6bde2020-08-10
16:37:33,555 INFO org.apache.flink.runtime.blob.BlobServer
[] - Started BLOB
server at 0.0.0.0:34380 - max concurrent requests: 50 - max backlog:
10002020-08-10 16:37:33,570 INFO
org.apache.flink.runtime.metrics.MetricRegistryImpl
[] - No metrics reporter configured, no metrics will be
exposed/reported.2020-08-10 16:37:33,574 INFO
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils
[] - Trying to start actor system, external address node3:0, bind
address 0.0.0.0:0.2020-08-10 16:37:33,591 INFO
akka.event.slf4j.Slf4jLogger
[] -
Slf4jLogger started2020-08-10 16:37:33,597 INFO akka.remote.Remoting
[] - Starting
remoting2020-08-10 16:37:33,606 INFO akka.remote.Remoting
[] - Remoting started;
listening on addresses :[akka.tcp://flink-metrics@node3:43096]2020-08-10
16:37:33,642 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils
[] - Actor system started at
akka.tcp://flink-metrics@node3:430962020-08-10 16:37:33,659 INFO
org.apache.flink.runtime.rpc.akka.AkkaRpcService
[] - Starting RPC endpoint for
org.apache.flink.runtime.metrics.dump.MetricQueryService at
akka://flink-metrics/user/rpc/MetricQueryService .2020-08-10 16:37:33,721 WARN
org.apache.flink.configuration.Configuration
[] - Config uses deprecated configuration key
'web.port' instead of proper key 'rest.bind-port'2020-08-10 16:37:33,723 INFO
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Upload
directory /tmp/flink-web-d25a365e-b6cb-45e6-bb6e-3068b7412f06/flink-web-upload
does not exist. 2020-08-10 16:37:33,724 INFO
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] -
Created directory
/tmp/flink-web-d25a365e-b6cb-45e6-bb6e-3068b7412f06/flink-web-upload for file
uploads.2020-08-10 16:37:33,748 INFO
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] -
Starting rest endpoint.2020-08-10 16:37:34,110 INFO
org.apache.flink.runtime.webmonitor.WebMonitorUtils
[] - Determined location of main cluster component log file:
/home/xxx/app/hadoop-2.6.0-cdh5.15.1/logs/userlogs/application_1591335931326_0024/container_1591335931326_0024_01_000001/jobmanager.log2020-08-10
16:37:34,111 INFO org.apache.flink.runtime.webmonitor.WebMonitorUtils
[] - Determined location of main cluster
component stdout file:
/home/xxx/app/hadoop-2.6.0-cdh5.15.1/logs/userlogs/application_1591335931326_0024/container_1591335931326_0024_01_000001/jobmanager.out2020-08-10
16:37:34,309 INFO
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Rest
endpoint listening at node3:394692020-08-10 16:37:34,311 INFO
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] -
http://node3:39469 was granted leadership with
leaderSessionID=00000000-0000-0000-0000-0000000000002020-08-10 16:37:34,312
INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] -
Web frontend listening at http://node3:39469.2020-08-10 16:37:34,403 INFO
org.apache.flink.runtime.rpc.akka.AkkaRpcService
[] - Starting RPC endpoint for
org.apache.flink.yarn.YarnResourceManager at
akka://flink/user/rpc/resourcemanager_0 .2020-08-10 16:37:34,417 INFO
org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property:
taskmanager.memory.process.size, 4 gb2020-08-10 16:37:34,417 INFO
org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property: internal.jobgraph-path,
job.graph2020-08-10 16:37:34,417 INFO
org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property:
jobmanager.execution.failover-strategy, region2020-08-10 16:37:34,417 INFO
org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property:
high-availability.cluster-id, application_1591335931326_00242020-08-10
16:37:34,417 INFO org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property:
jobmanager.rpc.address, localhost2020-08-10 16:37:34,417 INFO
org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property: execution.target,
yarn-per-job2020-08-10 16:37:34,417 INFO
org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property:
jobmanager.memory.process.size, 4 gb2020-08-10 16:37:34,418 INFO
org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property: jobmanager.rpc.port,
61232020-08-10 16:37:34,418 INFO
org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property:
execution.savepoint.ignore-unclaimed-state, false2020-08-10 16:37:34,418 INFO
org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property: execution.attached,
true2020-08-10 16:37:34,418 INFO
org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property:
internal.cluster.execution-mode, NORMAL2020-08-10 16:37:34,418 INFO
org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property:
execution.shutdown-on-attached-exit, false2020-08-10 16:37:34,418 INFO
org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property: pipeline.jars,
file:/home/xxx/app/flink-1.11.1/UnifyCompFlink-1.0.jar2020-08-10 16:37:34,418
INFO org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property: parallelism.default,
82020-08-10 16:37:34,418 INFO
org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property:
taskmanager.numberOfTaskSlots, 12020-08-10 16:37:34,418 WARN
org.apache.flink.configuration.GlobalConfiguration
[] - Error while trying to split key and value in configuration
file
/tmp/hadoop-root/nm-local-dir/usercache/root/appcache/application_1591335931326_0024/container_1591335931326_0024_01_000001/flink-conf.yaml:16:
"pipeline.classpaths: "2020-08-10 16:37:34,419 INFO
org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property:
$internal.deployment.config-dir, /home/xxx/app/flink-1.11.1/conf2020-08-10
16:37:34,419 INFO org.apache.flink.configuration.GlobalConfiguration
[] - Loading configuration property:
$internal.yarn.log-config-file,
/home/xxx/app/flink-1.11.1/conf/log4j.properties2020-08-10 16:37:34,450 INFO
org.apache.flink.runtime.externalresource.ExternalResourceUtils [] -
Enabled external resources: []2020-08-10 16:37:34,519 INFO
org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess []
- Start JobDispatcherLeaderProcess.2020-08-10 16:37:34,527 INFO
org.apache.flink.runtime.rpc.akka.AkkaRpcService
[] - Starting RPC endpoint for
org.apache.flink.runtime.dispatcher.MiniDispatcher at
akka://flink/user/rpc/dispatcher_1 .2020-08-10 16:37:34,572 INFO
org.apache.flink.runtime.rpc.akka.AkkaRpcService
[] - Starting RPC endpoint for
org.apache.flink.runtime.jobmaster.JobMaster at
akka://flink/user/rpc/jobmanager_2 .2020-08-10 16:37:34,582 INFO
org.apache.flink.runtime.jobmaster.JobMaster
[] - Initializing job empJOB
(eb447d27efb8134da40c0c1dd19fffdf).2020-08-10 16:37:34,615 INFO
org.apache.flink.runtime.jobmaster.JobMaster
[] - Using restart back off time strategy
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647,
backoffTimeMS=1000) for empJOB (eb447d27efb8134da40c0c1dd19fffdf).2020-08-10
16:37:34,667 INFO org.apache.flink.runtime.jobmaster.JobMaster
[] - Running initialization on
master for job empJOB (eb447d27efb8134da40c0c1dd19fffdf).2020-08-10
16:37:34,806 INFO org.apache.flink.runtime.jobmaster.JobMaster
[] - Successfully ran
initialization on master in 139 ms.2020-08-10 16:37:34,876 INFO
org.apache.flink.yarn.YarnResourceManager
[] - Recovered 0 containers from
previous attempts ([]).2020-08-10 16:37:34,877 INFO
org.apache.flink.yarn.YarnResourceManager
[] - Register application master
response contains scheduler resource types: [MEMORY, CPU].2020-08-10
16:37:34,877 INFO org.apache.flink.yarn.YarnResourceManager
[] - Container matching
strategy: MATCH_VCORE.2020-08-10 16:37:34,887 INFO
org.apache.flink.yarn.YarnResourceManager
[] - ResourceManager
akka.tcp://flink@node3:40657/user/rpc/resourcemanager_0 was granted leadership
with fencing token 000000000000000000000000000000002020-08-10 16:37:34,891 INFO
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl [] -
Starting the SlotManager.2020-08-10 16:37:35,466 INFO
org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] -
Built 1 pipelined regions in 2 ms2020-08-10 16:37:35,483 INFO
org.apache.flink.runtime.jobmaster.JobMaster
[] - No state backend has been configured, using
default (Memory / JobManager) MemoryStateBackend (data in heap memory /
checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null',
asynchronous: TRUE, maxStateSize: 5242880)2020-08-10 16:37:35,503 INFO
org.apache.flink.runtime.jobmaster.JobMaster
[] - Using failover strategy
org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@3915bc20
for empJOB (eb447d27efb8134da40c0c1dd19fffdf).2020-08-10 16:37:35,509 INFO
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl
[] - JobManager runner for job empJOB (eb447d27efb8134da40c0c1dd19fffdf)
was granted leadership with session id 00000000-0000-0000-0000-000000000000 at
akka.tcp://flink@node3:40657/user/rpc/jobmanager_2.2020-08-10 16:37:35,514 INFO
org.apache.flink.runtime.jobmaster.JobMaster
[] - Starting execution of job empJOB
(eb447d27efb8134da40c0c1dd19fffdf) under job master id
00000000000000000000000000000000.2020-08-10 16:37:35,517 INFO
org.apache.flink.runtime.jobmaster.JobMaster
[] - Starting scheduling with scheduling strategy
[org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]2020-08-10
16:37:35,518 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
[] - Job empJOB (eb447d27efb8134da40c0c1dd19fffdf)
switched from state CREATED to RUNNING.2020-08-10 16:37:35,535 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph
[] - Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal,
comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums:
null -> SinkConversionToRow (1/6) (5a6410258857c02ebd1b5ec03a78be4b)
switched from CREATED to SCHEDULED.2020-08-10 16:37:35,536 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph
[] - Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal,
comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums:
null -> SinkConversionToRow (2/6) (299de0d4a8affe02a999edeb84957c41)
switched from CREATED to SCHEDULED.2020-08-10 16:37:35,536 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph
[] - Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal,
comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums:
null -> SinkConversionToRow (3/6) (1b98df27c9019f64835b55fa3de3f363)
switched from CREATED to SCHEDULED.2020-08-10 16:37:35,536 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph
[] - Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal,
comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums:
null -> SinkConversionToRow (4/6) (a7612608772c018d819741ce4d9320bd)
switched from CREATED to SCHEDULED.2020-08-10 16:37:35,536 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph
[] - Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal,
comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums:
null -> SinkConversionToRow (5/6) (b19828c85fc0e92e62f2a7241b610f5b)
switched from CREATED to SCHEDULED.2020-08-10 16:37:35,536 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph
[] - Source: HiveTableSource(empno, ename, job, mgr, hiredate, sal,
comm, deptno) TablePath: flink000.emp, PartitionPruned: false, PartitionNums:
null -> SinkConversionToRow (6/6) (c2178a51eda2db900d3212e4f488d00f)
switched from CREATED to SCHEDULED.2020-08-10 16:37:35,536 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph
[] - Process -> Sink: Unnamed (1/8)
(2a8db3a2b4cd65fd7cd3e6bac031a971) switched from CREATED to
SCHEDULED.2020-08-10 16:37:35,536 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph
[] - Process -> Sink: Unnamed (2/8)
(7aa8dd779d4ff75e4c985be75a52c427) switched from CREATED to
SCHEDULED.2020-08-10 16:37:35,536 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph
[] - Process -> Sink: Unnamed (3/8)
(867c814978ea302537065f51516ed766) switched from CREATED to
SCHEDULED.2020-08-10 16:37:35,536 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph
[] - Process -> Sink: Unnamed (4/8)
(4e186575ab42cc6c1d599ae027bf99b8) switched from CREATED to
SCHEDULED.2020-08-10 16:37:35,537 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph
[] - Process -> Sink: Unnamed (5/8)
(b107b8bfb0a08c5e7937400c43a0f9ff) switched from CREATED to
SCHEDULED.2020-08-10 16:37:35,537 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph
[] - Process -> Sink: Unnamed (6/8)
(28e1f0fa1b9ebed59e4c67b0598864b9) switched from CREATED to
SCHEDULED.2020-08-10 16:37:35,537 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph
[] - Process -> Sink: Unnamed (7/8)
(e27e60ff7dcd5245dfd21b23bbd49985) switched from CREATED to
SCHEDULED.2020-08-10 16:37:35,537 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph
[] - Process -> Sink: Unnamed (8/8)
(c0f6b9e623c68fd7e9205a8ad686d4e5) switched from CREATED to
SCHEDULED.2020-08-10 16:37:35,558 INFO
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl []
- Cannot serve slot request, no ResourceManager connected. Adding as pending
request [SlotRequestId{7c8417d17f555e546cd6427733db6a3b}]2020-08-10
16:37:35,565 INFO
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl []
- Cannot serve slot request, no ResourceManager connected. Adding as pending
request [SlotRequestId{eb7c6ff479c977fdb6312f8e61dfdfba}]2020-08-10
16:37:35,565 INFO
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl []
- Cannot serve slot request, no ResourceManager connected. Adding as pending
request [SlotRequestId{a1d23d7e17b9369e80833de148f54bac}]2020-08-10
16:37:35,566 INFO
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl []
- Cannot serve slot request, no ResourceManager connected. Adding as pending
request [SlotRequestId{2447496efd24d542bce06de1b69ec70d}]2020-08-10
16:37:35,566 INFO
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl []
- Cannot serve slot request, no ResourceManager connected. Adding as pending
request [SlotRequestId{2ab761d21cd4368751f3187f122705fa}]2020-08-10
16:37:35,566 INFO
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl []
- Cannot serve slot request, no ResourceManager connected. Adding as pending
request [SlotRequestId{08a52f113cce64d1309509da5d25a1bb}]2020-08-10
16:37:35,574 INFO org.apache.flink.runtime.jobmaster.JobMaster
[] - Connecting to
ResourceManager
akka.tcp://flink@node3:40657/user/rpc/resourcemanager_*(00000000000000000000000000000000)2020-08-10
16:37:35,579 INFO org.apache.flink.runtime.jobmaster.JobMaster
[] - Resolved ResourceManager
address, beginning registration2020-08-10 16:37:35,584 INFO
org.apache.flink.yarn.YarnResourceManager
[] - Registering job manager
[email protected]://flink@node3:40657/user/rpc/jobmanager_2
for job eb447d27efb8134da40c0c1dd19fffdf.2020-08-10 16:37:35,589 INFO
org.apache.flink.yarn.YarnResourceManager
[] - Registered job manager
[email protected]://flink@node3:40657/user/rpc/jobmanager_2
for job eb447d27efb8134da40c0c1dd19fffdf.2020-08-10 16:37:35,593 INFO
org.apache.flink.runtime.jobmaster.JobMaster
[] - JobManager successfully registered at
ResourceManager, leader id: 00000000000000000000000000000000.2020-08-10
16:37:35,594 INFO
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl []
- Requesting new slot [SlotRequestId{7c8417d17f555e546cd6427733db6a3b}] and
profile ResourceProfile{UNKNOWN} from resource manager.2020-08-10 16:37:35,595
INFO org.apache.flink.yarn.YarnResourceManager
[] - Request slot with profile
ResourceProfile{UNKNOWN} for job eb447d27efb8134da40c0c1dd19fffdf with
allocation id e490d3208119fe28d97f4f0fe94cab28.2020-08-10 16:37:35,595 INFO
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl []
- Requesting new slot [SlotRequestId{eb7c6ff479c977fdb6312f8e61dfdfba}] and
profile ResourceProfile{UNKNOWN} from resource manager.2020-08-10 16:37:35,595
INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
[] - Requesting new slot
[SlotRequestId{a1d23d7e17b9369e80833de148f54bac}] and profile
ResourceProfile{UNKNOWN} from resource manager.2020-08-10 16:37:35,595 INFO
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl []
- Requesting new slot [SlotRequestId{2447496efd24d542bce06de1b69ec70d}] and
profile ResourceProfile{UNKNOWN} from resource manager.2020-08-10 16:37:35,596
INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl
[] - Requesting new slot
[SlotRequestId{2ab761d21cd4368751f3187f122705fa}] and profile
ResourceProfile{UNKNOWN} from resource manager.2020-08-10 16:37:35,596 INFO
org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl []
- Requesting new slot [SlotRequestId{08a52f113cce64d1309509da5d25a1bb}] and
profile ResourceProfile{UNKNOWN} from resource manager.2020-08-10 16:37:35,612
INFO org.apache.flink.yarn.YarnResourceManager
[] - Requesting new
TaskExecutor container with resource WorkerResourceSpec {cpuCores=1.0,
taskHeapSize=1.425gb (1530082070 bytes), taskOffHeapSize=0 bytes,
networkMemSize=343.040mb (359703515 bytes), managedMemSize=1.340gb (1438814063
bytes)}. Number pending workers of this resource is 1.2020-08-10 16:37:35,614
INFO org.apache.flink.yarn.YarnResourceManager
[] - Request slot with profile
ResourceProfile{UNKNOWN} for job eb447d27efb8134da40c0c1dd19fffdf with
allocation id b74299e8619de93adec5869d1fa79d73.2020-08-10 16:37:35,615 INFO
org.apache.flink.yarn.YarnResourceManager
[] - Requesting new TaskExecutor
container with resource WorkerResourceSpec {cpuCores=1.0, taskHeapSize=1.425gb
(1530082070 bytes), taskOffHeapSize=0 bytes, networkMemSize=343.040mb
(359703515 bytes), managedMemSize=1.340gb (1438814063 bytes)}. Number pending
workers of this resource is 2.2020-08-10 16:37:35,615 INFO
org.apache.flink.yarn.YarnResourceManager
[] - Request slot with profile
ResourceProfile{UNKNOWN} for job eb447d27efb8134da40c0c1dd19fffdf with
allocation id 046a3dcf1af40e0539f15fcddfbddf77.2020-08-10 16:37:35,615 INFO
org.apache.flink.yarn.YarnResourceManager
[] - Requesting new TaskExecutor
container with resource WorkerResourceSpec {cpuCores=1.0, taskHeapSize=1.425gb
(1530082070 bytes), taskOffHeapSize=0 bytes, networkMemSize=343.040mb
(359703515 bytes), managedMemSize=1.340gb (1438814063 bytes)}. Number pending
workers of this resource is 3.2020-08-10 16:37:35,615 INFO
org.apache.flink.yarn.YarnResourceManager
[] - Request slot with profile
ResourceProfile{UNKNOWN} for job eb447d27efb8134da40c0c1dd19fffdf with
allocation id 90870250ae0f3bef44cbdd675dede57b.2020-08-10 16:37:35,616 INFO
org.apache.flink.yarn.YarnResourceManager
[] - Requesting new TaskExecutor
container with resource WorkerResourceSpec {cpuCores=1.0, taskHeapSize=1.425gb
(1530082070 bytes), taskOffHeapSize=0 bytes, networkMemSize=343.040mb
(359703515 bytes), managedMemSize=1.340gb (1438814063 bytes)}. Number pending
workers of this resource is 4.2020-08-10 16:37:35,616 INFO
org.apache.flink.yarn.YarnResourceManager
[] - Request slot with profile
ResourceProfile{UNKNOWN} for job eb447d27efb8134da40c0c1dd19fffdf with
allocation id f8063a6fc86162712215a92533532b65.2020-08-10 16:37:35,616 INFO
org.apache.flink.yarn.YarnResourceManager
[] - Requesting new TaskExecutor
container with resource WorkerResourceSpec {cpuCores=1.0, taskHeapSize=1.425gb
(1530082070 bytes), taskOffHeapSize=0 bytes, networkMemSize=343.040mb
(359703515 bytes), managedMemSize=1.340gb (1438814063 bytes)}. Number pending
workers of this resource is 5.2020-08-10 16:37:35,616 INFO
org.apache.flink.yarn.YarnResourceManager
[] - Request slot with profile
ResourceProfile{UNKNOWN} for job eb447d27efb8134da40c0c1dd19fffdf with
allocation id cfe671ec5448d440838f02145cb6267f.2020-08-10 16:37:35,617 INFO
org.apache.flink.yarn.YarnResourceManager
[] - Requesting new TaskExecutor
container with resource WorkerResourceSpec {cpuCores=1.0, taskHeapSize=1.425gb
(1530082070 bytes), taskOffHeapSize=0 bytes, networkMemSize=343.040mb
(359703515 bytes), managedMemSize=1.340gb (1438814063 bytes)}. Number pending
workers of this resource is 6.2020-08-10 16:37:38,391 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator []
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr,
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false,
PartitionNums: null -> SinkConversionToRow (1/6) of job
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead.
Aborting checkpoint.2020-08-10 16:37:40,933 INFO
org.apache.flink.yarn.YarnResourceManager
[] - Received 1 containers.2020-08-10
16:37:40,940 INFO org.apache.flink.yarn.YarnResourceManager
[] - Received 1
containers with resource <memory:4096, vCores:1>, 6 pending container
requests.2020-08-10 16:37:40,953 INFO
org.apache.flink.yarn.YarnResourceManager
[] - TaskExecutor
container_1591335931326_0024_01_000003 will be started on node1 with
TaskExecutorProcessSpec {cpuCores=1.0, frameworkHeapSize=128.000mb (134217728
bytes), frameworkOffHeapSize=128.000mb (134217728 bytes), taskHeapSize=1.425gb
(1530082070 bytes), taskOffHeapSize=0 bytes, networkMemSize=343.040mb
(359703515 bytes), managedMemorySize=1.340gb (1438814063 bytes),
jvmMetaspaceSize=256.000mb (268435456 bytes), jvmOverheadSize=409.600mb
(429496736 bytes)}.2020-08-10 16:37:40,976 INFO
org.apache.flink.yarn.YarnResourceManager
[] - Creating container launch context
for TaskManagers2020-08-10 16:37:40,978 INFO
org.apache.flink.yarn.YarnResourceManager
[] - Starting TaskManagers2020-08-10
16:37:40,995 INFO org.apache.flink.yarn.YarnResourceManager
[] - Removing container
request Capability[<memory:4096, vCores:1>]Priority[1].2020-08-10
16:37:40,995 INFO org.apache.flink.yarn.YarnResourceManager
[] - Accepted 1
requested containers, returned 0 excess containers, 5 pending container
requests of resource <memory:4096, vCores:1>.2020-08-10 16:37:46,389 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator []
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr,
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false,
PartitionNums: null -> SinkConversionToRow (1/6) of job
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead.
Aborting checkpoint.2020-08-10 16:37:47,712 INFO
org.apache.flink.yarn.YarnResourceManager
[] - Registering TaskManager with
ResourceID container_1591335931326_0024_01_000003
(akka.tcp://flink@node1:40857/user/rpc/taskmanager_0) at
ResourceManager2020-08-10 16:37:54,389 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator []
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr,
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false,
PartitionNums: null -> SinkConversionToRow (1/6) of job
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead.
Aborting checkpoint.2020-08-10 16:38:02,389 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator []
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr,
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false,
PartitionNums: null -> SinkConversionToRow (1/6) of job
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead.
Aborting checkpoint.2020-08-10 16:38:10,389 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator []
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr,
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false,
PartitionNums: null -> SinkConversionToRow (1/6) of job
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead.
Aborting checkpoint.2020-08-10 16:38:18,389 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator []
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr,
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false,
PartitionNums: null -> SinkConversionToRow (1/6) of job
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead.
Aborting checkpoint.2020-08-10 16:38:26,389 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator []
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr,
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false,
PartitionNums: null -> SinkConversionToRow (1/6) of job
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead.
Aborting checkpoint.2020-08-10 16:38:34,389 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator []
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr,
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false,
PartitionNums: null -> SinkConversionToRow (1/6) of job
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead.
Aborting checkpoint.2020-08-10 16:38:42,389 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator []
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr,
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false,
PartitionNums: null -> SinkConversionToRow (1/6) of job
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead.
Aborting checkpoint.2020-08-10 16:38:50,389 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator []
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr,
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false,
PartitionNums: null -> SinkConversionToRow (1/6) of job
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead.
Aborting checkpoint.2020-08-10 16:38:58,389 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator []
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr,
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false,
PartitionNums: null -> SinkConversionToRow (1/6) of job
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead.
Aborting checkpoint.2020-08-10 16:39:06,389 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator []
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr,
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false,
PartitionNums: null -> SinkConversionToRow (1/6) of job
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead.
Aborting checkpoint.2020-08-10 16:39:14,389 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator []
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr,
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false,
PartitionNums: null -> SinkConversionToRow (1/6) of job
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead.
Aborting checkpoint.2020-08-10 16:39:22,389 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator []
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr,
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false,
PartitionNums: null -> SinkConversionToRow (1/6) of job
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead.
Aborting checkpoint.2020-08-10 16:39:30,389 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator []
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr,
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false,
PartitionNums: null -> SinkConversionToRow (1/6) of job
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead.
Aborting checkpoint.2020-08-10 16:39:38,389 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator []
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr,
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false,
PartitionNums: null -> SinkConversionToRow (1/6) of job
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead.
Aborting checkpoint.2020-08-10 16:39:46,389 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator []
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr,
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false,
PartitionNums: null -> SinkConversionToRow (1/6) of job
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead.
Aborting checkpoint.2020-08-10 16:39:54,389 INFO
org.apache.flink.runtime.checkpoint.CheckpointCoordinator []
- Checkpoint triggering task Source: HiveTableSource(empno, ename, job, mgr,
hiredate, sal, comm, deptno) TablePath: flink000.emp, PartitionPruned: false,
PartitionNums: null -> SinkConversionToRow (1/6) of job
eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED instead.
Aborting checkpoint.
job代码
public class PlatJobExecution {
private volatile ParameterTool parameters;
public PlatJobExecution(ParameterTool parameters) {
this.parameters = parameters; }
public void execute() throws Exception {
//目标数据源: //目标数据表:
//1.读取数据 kafka /oracle 把流注册成一张表【这个过程可以手动完成】
--hive
//2.执行sql,返回结果
//3.把结果写入目标数据表 / 写入redis / 写入kafka
InputStream is =
ReadKafkaPrint.class.getClassLoader().getResourceAsStream("config.properties");
ParameterTool parameters2 =
ParameterTool.fromPropertiesFile(is);
String targetDatabase =
parameters.get("sourceDatabase"); String
executiveSql = parameters.get("executiveSql");
String sinkSQL = parameters.get("sinkSQL"); String
jobName = parameters.get("jobName");
Map<String, String> pMap =
Maps.newHashMap();
pMap.putAll(parameters2.toMap());
pMap.put("sinkSQL",sinkSQL);
parameters2 = ParameterTool.fromMap(pMap);
//1.创建执行环境
StreamExecutionEnvironment streamEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
//全局参数设置
streamEnv.getConfig().setGlobalJobParameters(parameters2);
streamEnv.enableCheckpointing(8000,
CheckpointingMode.EXACTLY_ONCE);//每隔5s进行一次checkpoint
EnvironmentSettings tableEnvSettings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
//2.流式的TableEnv
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv,
tableEnvSettings);
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_MODE,
CheckpointingMode.EXACTLY_ONCE);
tableEnv.getConfig().getConfiguration().set(ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL,
Duration.ofSeconds(8));
//3.注册HiveCatalog String
name = targetDatabase;
String defaultDatabase = targetDatabase;
String hiveConfDir =
"/home/xxx/app/flink-1.11.1/jobcfg"; String version
= "1.1.0";
HiveCatalog catalog = new HiveCatalog(name,
defaultDatabase, hiveConfDir, version);
tableEnv.registerCatalog(name, catalog);
tableEnv.useCatalog(name);
//4.流式读取Hive
tableEnv.getConfig().getConfiguration().setBoolean(TableConfigOptions.TABLE_DYNAMIC_TABLE_OPTIONS_ENABLED,
true);
//query Table table =
tableEnv.sqlQuery(executiveSql);
// CREATE/INSERT //
tableEnv.executeSql()
// tableEnv.toRetractStream(table,
Row.class).print().setParallelism(1);
SingleOutputStreamOperator<LinkedList<Object>> colList =
tableEnv.toAppendStream(table, Row.class).process(new ProcessFunction<Row,
LinkedList<Object>>() {
@Override public void
processElement(Row row, Context context, Collector<LinkedList<Object>>
collector) throws Exception {
LinkedList<Object> linkedList = Lists.newLinkedList();
for (int i = 0; i < row.getArity();
i++) {
linkedList.add(row.getField(i));
}
collector.collect(linkedList); }
});
colList.addSink(new CommonOracleSink());
//sink to Oracle
streamEnv.execute(jobName);
}}
发自我的iPhone
------------------ 原始邮件 ------------------
发件人: shizk233 <[email protected]>
发送时间: 2020年8月10日 18:04
收件人: [email protected] <[email protected]>
主题: 回复:读取hive表的数据转化为流;按照jdbc两阶段提交方式写入oracle数据库,没有异常但是checkpoint失败
hi,附件挂了,没看到饿。可以考虑挂gist放链接出来。
不过看这个信息,checkpoint的失败是由于任务状态变更导致的(应该是任务重启了,所以running变scheduled了)。
建议往任务重启的方向排查一下。
Bruce <[email protected]> 于2020年8月10日周一 下午5:01写道:
> 您好,这里有个问题反馈下!
>
> 读取hive表的数据转化为流;按照jdbc两阶段提交方式写入oracle数据库,
> 没有抛任何异常但是checkpoint失败:
> job eb447d27efb8134da40c0c1dd19fffdf is not in state RUNNING but SCHEDULED
> instead. Aborting checkpoint.
> 附件
> 1.flink.log是yarn jobmanager打印的伪日志
> 2.Job.txt是job的伪代码
> 3.jdbc两阶段提交的伪代码附件
> ------------------------------
> 发自我的iPhone
>