Hi,

I have an artifact which works perfectly fine with Per-Job Cluster Mode
with the following bash script:

#!/bin/env bash

export FLINK_CONF_DIR=./conf

export HADOOP_CLASSPATH=`hadoop classpath`


$FLINK_HOME/bin/flink run -t yarn-per-job myjar.jar myconf.conf

I tried Application Mode [1] using the exact same artifact with the
following script:

#!/bin/env bash


export FLINK_CONF_DIR=./conf

export HADOOP_CLASSPATH=`hadoop classpath`


$FLINK_HOME/bin/flink run-application -t yarn-application \

    
-Dyarn.provided.lib.dirs='hdfs:///flink-dists/flink-1.12.0/lib;hdfs:///flink-dists/flink-1.12.0/plugins'
\

    -Dyarn.ship-files=myconf.conf \

    hdfs:///jars/myjar.jar myconf.conf

but the job fails with the following exception

2020-12-16 15:52:25,364 WARN  org.apache.flink.runtime.taskmanager.Task
                [] - session-window -> (Sink: kafka-sink, Sink:
session-window-late-data) (1/1)#0 (ee9fc1aa21833c749e3c271fd52cbfd4)
switched from RUNNING to FAILED.

org.apache.kafka.common.KafkaException: Failed to construct kafka producer

        at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
~[stream-calculator-0.1-SNAPSHOT.jar:?]

        at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
~[stream-calculator-0.1-SNAPSHOT.jar:?]

        at
org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:78)
~[stream-calculator-0.1-SNAPSHOT.jar:?]

        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1158)
~[stream-calculator-0.1-SNAPSHOT.jar:?]

        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1259)
~[stream-calculator-0.1-SNAPSHOT.jar:?]

        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1255)
~[stream-calculator-0.1-SNAPSHOT.jar:?]

        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:950)
~[stream-calculator-0.1-SNAPSHOT.jar:?]

        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:100)
~[stream-calculator-0.1-SNAPSHOT.jar:?]

        at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:398)
~[flink-dist_2.11-1.12.0.jar:1.12.0]

        at
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:389)
~[flink-dist_2.11-1.12.0.jar:1.12.0]

        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1128)
~[stream-calculator-0.1-SNAPSHOT.jar:?]

        at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
~[flink-dist_2.11-1.12.0.jar:1.12.0]

        at
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
~[flink-dist_2.11-1.12.0.jar:1.12.0]

        at
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
~[flink-dist_2.11-1.12.0.jar:1.12.0]

        at
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:107)
~[flink-dist_2.11-1.12.0.jar:1.12.0]

        at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:264)
~[flink-dist_2.11-1.12.0.jar:1.12.0]

        at
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400)
~[flink-dist_2.11-1.12.0.jar:1.12.0]

        at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
~[flink-dist_2.11-1.12.0.jar:1.12.0]

        at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
~[flink-dist_2.11-1.12.0.jar:1.12.0]

        at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
~[flink-dist_2.11-1.12.0.jar:1.12.0]

        at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531)
~[flink-dist_2.11-1.12.0.jar:1.12.0]

        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722)
[flink-dist_2.11-1.12.0.jar:1.12.0]

        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547)
[flink-dist_2.11-1.12.0.jar:1.12.0]

        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]

Caused by: org.apache.kafka.common.KafkaException: class
org.apache.kafka.common.serialization.ByteArraySerializer is not an
instance of org.apache.kafka.common.serialization.Serializer

        at
org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374)
~[stream-calculator-0.1-SNAPSHOT.jar:?]

        at
org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392)
~[stream-calculator-0.1-SNAPSHOT.jar:?]

        at
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:359)
~[stream-calculator-0.1-SNAPSHOT.jar:?]

        ... 23 more

I have flink-connector-kafka_2.11 in my artifact and don't have it under
flink lib directory at all.

Thanks in advance,

p.s. the attached is the detailed log message from a TM

Dongwon

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/yarn.html#application-mode
2020-12-16 15:52:21,540 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] - 
--------------------------------------------------------------------------------
2020-12-16 15:52:21,546 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -  Starting YARN TaskExecutor runner (Version: 1.12.0, Scala: 
2.11, Rev:fc00492, Date:2020-12-02T08:49:16+01:00)
2020-12-16 15:52:21,547 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -  OS current user: yarn
2020-12-16 15:52:21,713 WARN  org.apache.hadoop.util.NativeCodeLoader           
           [] - Unable to load native-hadoop library for your platform... using 
builtin-java classes where applicable
2020-12-16 15:52:21,744 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -  Current Hadoop/Kerberos user: deploy
2020-12-16 15:52:21,745 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -  JVM: OpenJDK 64-Bit Server VM - AdoptOpenJDK - 1.8/25.222-b10
2020-12-16 15:52:21,745 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -  Maximum heap size: 4171 MiBytes
2020-12-16 15:52:21,745 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -  JAVA_HOME: /usr/local/jdk8u222-b10
2020-12-16 15:52:21,747 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -  Hadoop version: 3.1.1.3.1.4.0-315
2020-12-16 15:52:21,747 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -  JVM Options:
2020-12-16 15:52:21,747 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     -Xmx4563402682
2020-12-16 15:52:21,747 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     -Xms4563402682
2020-12-16 15:52:21,747 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     -XX:MaxDirectMemorySize=1073741838
2020-12-16 15:52:21,747 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     -XX:MaxMetaspaceSize=268435456
2020-12-16 15:52:21,747 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     
-Dlog.file=/data1/yarn/log/application_1600163418174_0105/container_1600163418174_0105_01_000003/taskmanager.log
2020-12-16 15:52:21,747 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     -Dlog4j.configuration=file:./log4j.properties
2020-12-16 15:52:21,747 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     -Dlog4j.configurationFile=file:./log4j.properties
2020-12-16 15:52:21,748 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -  Program Arguments:
2020-12-16 15:52:21,749 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     -D
2020-12-16 15:52:21,749 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     taskmanager.memory.framework.off-heap.size=134217728b
2020-12-16 15:52:21,749 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     -D
2020-12-16 15:52:21,749 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     taskmanager.memory.network.max=939524110b
2020-12-16 15:52:21,750 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     -D
2020-12-16 15:52:21,750 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     taskmanager.memory.network.min=939524110b
2020-12-16 15:52:21,750 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     -D
2020-12-16 15:52:21,750 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     taskmanager.memory.framework.heap.size=134217728b
2020-12-16 15:52:21,750 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     -D
2020-12-16 15:52:21,750 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     taskmanager.memory.managed.size=3758096440b
2020-12-16 15:52:21,750 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     -D
2020-12-16 15:52:21,750 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     taskmanager.cpu.cores=4.0
2020-12-16 15:52:21,750 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     -D
2020-12-16 15:52:21,750 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     taskmanager.memory.task.heap.size=4429184954b
2020-12-16 15:52:21,750 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     -D
2020-12-16 15:52:21,751 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     taskmanager.memory.task.off-heap.size=0b
2020-12-16 15:52:21,751 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     --configDir
2020-12-16 15:52:21,751 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     .
2020-12-16 15:52:21,751 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     -Djobmanager.rpc.address=mobdata-devflink-dn04.dakao.io
2020-12-16 15:52:21,751 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     -Djobmanager.memory.jvm-overhead.min=201326592b
2020-12-16 15:52:21,751 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     -Dpipeline.classpaths=
2020-12-16 15:52:21,751 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     
-Dtaskmanager.resource-id=container_1600163418174_0105_01_000003
2020-12-16 15:52:21,751 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     -Dweb.port=0
2020-12-16 15:52:21,751 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     -Djobmanager.memory.off-heap.size=134217728b
2020-12-16 15:52:21,751 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     -Dexecution.target=embedded
2020-12-16 15:52:21,751 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     
-Dweb.tmpdir=/tmp/flink-web-24dcd83a-5779-4bf1-bd6d-174382abf3e4
2020-12-16 15:52:21,751 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     
-Dinternal.taskmanager.resource-id.metadata=mobdata-devflink-dn03.dakao.io:45454
2020-12-16 15:52:21,751 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     -Djobmanager.rpc.port=46401
2020-12-16 15:52:21,751 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     
-Dpipeline.jars=file:/data1/yarn/local/usercache/deploy/appcache/application_1600163418174_0105/container_1600163418174_0105_01_000001/stream-calculator-0.1-SNAPSHOT.jar
2020-12-16 15:52:21,751 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     -Drest.address=mobdata-devflink-dn04.dakao.io
2020-12-16 15:52:21,751 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     -Djobmanager.memory.jvm-metaspace.size=268435456b
2020-12-16 15:52:21,752 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     -Djobmanager.memory.heap.size=1073741824b
2020-12-16 15:52:21,752 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -     -Djobmanager.memory.jvm-overhead.max=201326592b
2020-12-16 15:52:21,752 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] -  Classpath: 
:stream-calculator-0.1-SNAPSHOT.jar:lib/flink-csv-1.12.0.jar:lib/flink-json-1.12.0.jar:lib/flink-shaded-zookeeper-3.4.14.jar:lib/flink-table-blink_2.11-1.12.0.jar:lib/flink-table_2.11-1.12.0.jar:lib/log4j-1.2-api-2.12.1.jar:lib/log4j-api-2.12.1.jar:lib/log4j-core-2.12.1.jar:lib/log4j-slf4j-impl-2.12.1.jar:lib/flink-dist_2.11-1.12.0.jar:flink-conf.yaml::/usr/hdp/3.1.4.0-315/hadoop/conf:/usr/hdp/3.1.4.0-315/hadoop/azure-data-lake-store-sdk-2.3.3.jar:/usr/hdp/3.1.4.0-315/hadoop/azure-keyvault-core-1.0.0.jar:/usr/hdp/3.1.4.0-315/hadoop/azure-storage-7.0.0.jar:/usr/hdp/3.1.4.0-315/hadoop/hadoop-annotations-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/hadoop/hadoop-annotations.jar:/usr/hdp/3.1.4.0-315/hadoop/hadoop-auth-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/hadoop/hadoop-auth.jar:/usr/hdp/3.1.4.0-315/hadoop/hadoop-azure-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/hadoop/hadoop-azure-datalake-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/hadoop/hadoop-azure-datalake.jar:/usr/hdp/3.1.4.0-315/hadoop/hadoop-azure.jar:/usr/hdp/3.1.4.0-315/hadoop/hadoop-common-3.1.1.3.1.4.0-315-tests.jar:/usr/hdp/3.1.4.0-315/hadoop/hadoop-common-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/hadoop/hadoop-common-tests.jar:/usr/hdp/3.1.4.0-315/hadoop/hadoop-common.jar:/usr/hdp/3.1.4.0-315/hadoop/hadoop-kms-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/hadoop/hadoop-kms.jar:/usr/hdp/3.1.4.0-315/hadoop/hadoop-nfs-3.1.1.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/hadoop/hadoop-nfs.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/javax.servlet-api-3.1.0.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/stax2-api-3.1.4.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/ranger-hdfs-plugin-shim-1.2.0.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/jaxb-api-2.2.11.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/metrics-core-3.2.4.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/ranger-plugin-classloader-1.2.0.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/jaxb-impl-2.2.3-1.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/ranger-yarn-plugin-shim-1.2.0.3.1.4.0-315.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/jetty-security-9.3.24.v20180605.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/accessors-smart-1.2.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/jcip-annotations-1.0-1.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/animal-sniffer-annotations-1.17.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/asm-5.0.4.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/netty-3.10.5.Final.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/avro-1.7.7.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/jsr305-3.0.0.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/checker-qual-2.8.1.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/jsr311-api-1.1.1.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/commons-beanutils-1.9.3.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/nimbus-jose-jwt-4.41.1.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/commons-cli-1.2.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/jul-to-slf4j-1.7.25.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/commons-codec-1.11.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/jersey-core-1.19.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/xz-1.0.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/commons-collections-3.2.2.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/kerb-admin-1.0.1.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/commons-compress-1.4.1.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/jersey-server-1.19.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/commons-configuration2-2.1.1.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/slf4j-log4j12-1.7.25.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/commons-io-2.5.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/kerb-client-1.0.1.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/commons-lang-2.6.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/kerb-common-1.0.1.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/commons-lang3-3.4.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/kerb-core-1.0.1.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/commons-logging-1.1.3.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/kerb-crypto-1.0.1.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/commons-math3-3.1.1.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/paranamer-2.3.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/commons-net-3.6.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/kerb-identity-1.0.1.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/curator-client-2.12.0.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/jersey-json-1.19.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/curator-framework-2.12.0.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/kerb-server-1.0.1.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/curator-recipes-2.12.0.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/jersey-servlet-1.19.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/error_prone_annotations-2.3.2.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/kerb-simplekdc-1.0.1.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/failureaccess-1.0.1.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/protobuf-java-2.5.0.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/gson-2.2.4.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/re2j-1.1.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/guava-28.0-jre.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/jettison-1.1.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/htrace-core4-4.1.0-incubating.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/kerb-util-1.0.1.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/httpclient-4.5.2.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/slf4j-api-1.7.25.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/httpcore-4.4.4.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/kerby-asn1-1.0.1.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/j2objc-annotations-1.3.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/jetty-io-9.3.24.v20180605.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/jackson-annotations-2.9.9.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/kerby-config-1.0.1.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/jackson-core-2.9.9.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/kerby-pkix-1.0.1.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/jackson-core-asl-1.9.13.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/jetty-http-9.3.24.v20180605.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/jackson-databind-2.9.9.1.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/kerby-util-1.0.1.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/jackson-jaxrs-1.9.13.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/jetty-xml-9.3.24.v20180605.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/jackson-mapper-asl-1.9.13.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/kerby-xdr-1.0.1.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/jackson-xc-1.9.13.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/jetty-server-9.3.24.v20180605.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/jsch-0.1.54.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/jetty-servlet-9.3.24.v20180605.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/json-smart-2.3.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/jetty-util-9.3.24.v20180605.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/jsp-api-2.1.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/jetty-webapp-9.3.24.v20180605.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/snappy-java-1.0.5.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/log4j-1.2.17.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/token-provider-1.0.1.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/woodstox-core-5.0.3.jar:/usr/hdp/3.1.4.0-315/hadoop/lib/zookeeper-3.4.6.3.1.4.0-315.jar:/usr/hdp/current/hadoop-hdfs-client/hadoop-hdfs-3.1.1.3.1.4.0-315-tests.jar:/usr/hdp/current/hadoop-hdfs-client/hadoop-hdfs-3.1.1.3.1.4.0-315.jar:/usr/hdp/current/hadoop-hdfs-client/hadoop-hdfs-client-3.1.1.3.1.4.0-315-tests.jar:/usr/hdp/current/hadoop-hdfs-client/hadoop-hdfs-client-3.1.1.3.1.4.0-315.jar:/usr/hdp/current/hadoop-hdfs-client/hadoop-hdfs-client-tests.jar:/usr/hdp/current/hadoop-hdfs-client/hadoop-hdfs-client.jar:/usr/hdp/current/hadoop-hdfs-client/hadoop-hdfs-httpfs-3.1.1.3.1.4.0-315.jar:/usr/hdp/current/hadoop-hdfs-client/hadoop-hdfs-httpfs.jar:/usr/hdp/current/hadoop-hdfs-client/hadoop-hdfs-native-client-3.1.1.3.1.4.0-315-tests.jar:/usr/hdp/current/hadoop-hdfs-client/hadoop-hdfs-native-client-3.1.1.3.1.4.0-315.jar:/usr/hdp/current/hadoop-hdfs-client/hadoop-hdfs-native-client-tests.jar:/usr/hdp/current/hadoop-hdfs-client/hadoop-hdfs-native-client.jar:/usr/hdp/current/hadoop-hdfs-client/hadoop-hdfs-nfs-3.1.1.3.1.4.0-315.jar:/usr/hdp/current/hadoop-hdfs-client/hadoop-hdfs-nfs.jar:/usr/hdp/current/hadoop-hdfs-client/hadoop-hdfs-rbf-3.1.1.3.1.4.0-315-tests.jar:/usr/hdp/current/hadoop-hdfs-client/hadoop-hdfs-rbf-3.1.1.3.1.4.0-315.jar:/usr/hdp/current/hadoop-hdfs-client/hadoop-hdfs-rbf-tests.jar:/usr/hdp/current/hadoop-hdfs-client/hadoop-hdfs-rbf.jar:/usr/hdp/current/hadoop-hdfs-client/hadoop-hdfs-tests.jar:/usr/hdp/current/hadoop-hdfs-client/hadoop-hdfs.jar:/usr/hdp/current/hadoop-hdfs-client/lib/jetty-servlet-9.3.24.v20180605.jar:/usr/hdp/current/hadoop-hdfs-client/lib/accessors-smart-1.2.jar:/usr/hdp/current/hadoop-hdfs-client/lib/jersey-json-1.19.jar:/usr/hdp/current/hadoop-hdfs-client/lib/animal-sniffer-annotations-1.17.jar:/usr/hdp/current/hadoop-hdfs-client/lib/paranamer-2.3.jar:/usr/hdp/current/hadoop-hdfs-client/lib/asm-5.0.4.jar:/usr/hdp/current/hadoop-hdfs-client/lib/protobuf-java-2.5.0.jar:/usr/hdp/current/hadoop-hdfs-client/lib/avro-1.7.7.jar:/usr/hdp/current/hadoop-hdfs-client/lib/jsr311-api-1.1.1.jar:/usr/hdp/current/hadoop-hdfs-client/lib/checker-qual-2.8.1.jar:/usr/hdp/current/hadoop-hdfs-client/lib/kerb-admin-1.0.1.jar:/usr/hdp/current/hadoop-hdfs-client/lib/commons-beanutils-1.9.3.jar:/usr/hdp/current/hadoop-hdfs-client/lib/re2j-1.1.jar:/usr/hdp/current/hadoop-hdfs-client/lib/commons-cli-1.2.jar:/usr/hdp/current/hadoop-hdfs-client/lib/kerb-client-1.0.1.jar:/usr/hdp/current/hadoop-hdfs-client/lib/commons-codec-1.11.jar:/usr/hdp/current/hadoop-hdfs-client/lib/jersey-server-1.19.jar:/usr/hdp/current/hadoop-hdfs-client/lib/commons-collections-3.2.2.jar:/usr/hdp/current/hadoop-hdfs-client/lib/kerb-common-1.0.1.jar:/usr/hdp/current/hadoop-hdfs-client/lib/commons-compress-1.4.1.jar:/usr/hdp/current/hadoop-hdfs-client/lib/jettison-1.1.jar:/usr/hdp/current/hadoop-hdfs-client/lib/commons-configuration2-2.1.1.jar:/usr/hdp/current/hadoop-hdfs-client/lib/kerb-core-1.0.1.jar:/usr/hdp/current/hadoop-hdfs-client/lib/commons-daemon-1.0.13.jar:/usr/hdp/current/hadoop-hdfs-client/lib/snappy-java-1.0.5.jar:/usr/hdp/current/hadoop-hdfs-client/lib/commons-io-2.5.jar:/usr/hdp/current/hadoop-hdfs-client/lib/kerb-crypto-1.0.1.jar:/usr/hdp/current/hadoop-hdfs-client/lib/commons-lang-2.6.jar:/usr/hdp/current/hadoop-hdfs-client/lib/kerb-identity-1.0.1.jar:/usr/hdp/current/hadoop-hdfs-client/lib/commons-lang3-3.4.jar:/usr/hdp/current/hadoop-hdfs-client/lib/kerb-server-1.0.1.jar:/usr/hdp/current/hadoop-hdfs-client/lib/commons-logging-1.1.3.jar:/usr/hdp/current/hadoop-hdfs-client/lib/kerb-simplekdc-1.0.1.jar:/usr/hdp/current/hadoop-hdfs-client/lib/commons-math3-3.1.1.jar:/usr/hdp/current/hadoop-hdfs-client/lib/stax2-api-3.1.4.jar:/usr/hdp/current/hadoop-hdfs-client/lib/commons-net-3.6.jar:/usr/hdp/current/hadoop-hdfs-client/lib/kerb-util-1.0.1.jar:/usr/hdp/current/hadoop-hdfs-client/lib/curator-client-2.12.0.jar:/usr/hdp/current/hadoop-hdfs-client/lib/jersey-servlet-1.19.jar:/usr/hdp/current/hadoop-hdfs-client/lib/curator-framework-2.12.0.jar:/usr/hdp/current/hadoop-hdfs-client/lib/kerby-asn1-1.0.1.jar:/usr/hdp/current/hadoop-hdfs-client/lib/curator-recipes-2.12.0.jar:/usr/hdp/current/hadoop-hdfs-client/lib/jetty-http-9.3.24.v20180605.jar:/usr/hdp/current/hadoop-hdfs-client/lib/error_prone_annotations-2.3.2.jar:/usr/hdp/current/hadoop-hdfs-client/lib/kerby-config-1.0.1.jar:/usr/hdp/current/hadoop-hdfs-client/lib/failureaccess-1.0.1.jar:/usr/hdp/current/hadoop-hdfs-client/lib/token-provider-1.0.1.jar:/usr/hdp/current/hadoop-hdfs-client/lib/gson-2.2.4.jar:/usr/hdp/current/hadoop-hdfs-client/lib/woodstox-core-5.0.3.jar:/usr/hdp/current/hadoop-hdfs-client/lib/guava-28.0-jre.jar:/usr/hdp/current/hadoop-hdfs-client/lib/jetty-io-9.3.24.v20180605.jar:/usr/hdp/current/hadoop-hdfs-client/lib/htrace-core4-4.1.0-incubating.jar:/usr/hdp/current/hadoop-hdfs-client/lib/kerby-pkix-1.0.1.jar:/usr/hdp/current/hadoop-hdfs-client/lib/httpclient-4.5.2.jar:/usr/hdp/current/hadoop-hdfs-client/lib/xz-1.0.jar:/usr/hdp/current/hadoop-hdfs-client/lib/httpcore-4.4.4.jar:/usr/hdp/current/hadoop-hdfs-client/lib/kerby-util-1.0.1.jar:/usr/hdp/current/hadoop-hdfs-client/lib/j2objc-annotations-1.3.jar:/usr/hdp/current/hadoop-hdfs-client/lib/jetty-security-9.3.24.v20180605.jar:/usr/hdp/current/hadoop-hdfs-client/lib/jackson-annotations-2.9.9.jar:/usr/hdp/current/hadoop-hdfs-client/lib/kerby-xdr-1.0.1.jar:/usr/hdp/current/hadoop-hdfs-client/lib/jackson-core-2.9.9.jar:/usr/hdp/current/hadoop-hdfs-client/lib/leveldbjni-all-1.8.jar:/usr/hdp/current/hadoop-hdfs-client/lib/jackson-core-asl-1.9.13.jar:/usr/hdp/current/hadoop-hdfs-client/lib/jetty-server-9.3.24.v20180605.jar:/usr/hdp/current/hadoop-hdfs-client/lib/jackson-databind-2.9.9.1.jar:/usr/hdp/current/hadoop-hdfs-client/lib/log4j-1.2.17.jar:/usr/hdp/current/hadoop-hdfs-client/lib/jackson-jaxrs-1.9.13.jar:/usr/hdp/current/hadoop-hdfs-client/lib/json-simple-1.1.1.jar:/usr/hdp/current/hadoop-hdfs-client/lib/jackson-mapper-asl-1.9.13.jar:/usr/hdp/current/hadoop-hdfs-client/lib/netty-3.10.5.Final.jar:/usr/hdp/current/hadoop-hdfs-client/lib/jackson-xc-1.9.13.jar:/usr/hdp/current/hadoop-hdfs-client/lib/netty-all-4.0.52.Final.jar:/usr/hdp/current/hadoop-hdfs-client/lib/javax.servlet-api-3.1.0.jar:/usr/hdp/current/hadoop-hdfs-client/lib/jaxb-api-2.2.11.jar:/usr/hdp/current/hadoop-hdfs-client/lib/nimbus-jose-jwt-4.41.1.jar:/usr/hdp/current/hadoop-hdfs-client/lib/jaxb-impl-2.2.3-1.jar:/usr/hdp/current/hadoop-hdfs-client/lib/okhttp-2.7.5.jar:/usr/hdp/current/hadoop-hdfs-client/lib/jcip-annotations-1.0-1.jar:/usr/hdp/current/hadoop-hdfs-client/lib/okio-1.6.0.jar:/usr/hdp/current/hadoop-hdfs-client/lib/jersey-core-1.19.jar:/usr/hdp/current/hadoop-hdfs-client/lib/jetty-util-9.3.24.v20180605.jar:/usr/hdp/current/hadoop-hdfs-client/lib/jetty-util-ajax-9.3.24.v20180605.jar:/usr/hdp/current/hadoop-hdfs-client/lib/json-smart-2.3.jar:/usr/hdp/current/hadoop-hdfs-client/lib/jetty-webapp-9.3.24.v20180605.jar:/usr/hdp/current/hadoop-hdfs-client/lib/jsr305-3.0.0.jar:/usr/hdp/current/hadoop-hdfs-client/lib/jetty-xml-9.3.24.v20180605.jar:/usr/hdp/current/hadoop-hdfs-client/lib/jsch-0.1.54.jar:/usr/hdp/current/hadoop-hdfs-client/lib/listenablefuture-9999.0-empty-to-avoid-conflict-with-guava.jar:/usr/hdp/current/hadoop-hdfs-client/lib/zookeeper-3.4.6.3.1.4.0-315.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-api-3.1.1.3.1.4.0-315.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-api.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-server-web-proxy.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-applications-distributedshell-3.1.1.3.1.4.0-315.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-applications-distributedshell.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-applications-unmanaged-am-launcher-3.1.1.3.1.4.0-315.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-applications-unmanaged-am-launcher.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-client-3.1.1.3.1.4.0-315.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-client.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-common-3.1.1.3.1.4.0-315.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-common.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-registry-3.1.1.3.1.4.0-315.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-registry.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-services-api-3.1.1.3.1.4.0-315.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-server-applicationhistoryservice-3.1.1.3.1.4.0-315.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-server-applicationhistoryservice.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-server-common-3.1.1.3.1.4.0-315.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-server-common.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-server-nodemanager-3.1.1.3.1.4.0-315.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-server-nodemanager.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-server-resourcemanager-3.1.1.3.1.4.0-315.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-server-resourcemanager.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-server-router-3.1.1.3.1.4.0-315.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-server-router.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-server-sharedcachemanager-3.1.1.3.1.4.0-315.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-server-sharedcachemanager.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-server-tests-3.1.1.3.1.4.0-315.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-server-tests.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-services-api.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-server-timeline-pluginstorage-3.1.1.3.1.4.0-315.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-server-timeline-pluginstorage.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-server-web-proxy-3.1.1.3.1.4.0-315.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-services-core-3.1.1.3.1.4.0-315.jar:/usr/hdp/current/hadoop-yarn-client/hadoop-yarn-services-core.jar:/usr/hdp/current/hadoop-yarn-client/lib/HikariCP-java7-2.4.12.jar:/usr/hdp/current/hadoop-yarn-client/lib/aopalliance-1.0.jar:/usr/hdp/current/hadoop-yarn-client/lib/bcpkix-jdk15on-1.60.jar:/usr/hdp/current/hadoop-yarn-client/lib/bcprov-jdk15on-1.60.jar:/usr/hdp/current/hadoop-yarn-client/lib/dnsjava-2.1.7.jar:/usr/hdp/current/hadoop-yarn-client/lib/ehcache-3.3.1.jar:/usr/hdp/current/hadoop-yarn-client/lib/fst-2.50.jar:/usr/hdp/current/hadoop-yarn-client/lib/geronimo-jcache_1.0_spec-1.0-alpha-1.jar:/usr/hdp/current/hadoop-yarn-client/lib/guice-4.0.jar:/usr/hdp/current/hadoop-yarn-client/lib/guice-servlet-4.0.jar:/usr/hdp/current/hadoop-yarn-client/lib/jackson-jaxrs-base-2.9.9.jar:/usr/hdp/current/hadoop-yarn-client/lib/jackson-jaxrs-json-provider-2.9.9.jar:/usr/hdp/current/hadoop-yarn-client/lib/jackson-module-jaxb-annotations-2.9.9.jar:/usr/hdp/current/hadoop-yarn-client/lib/java-util-1.9.0.jar:/usr/hdp/current/hadoop-yarn-client/lib/javax.inject-1.jar:/usr/hdp/current/hadoop-yarn-client/lib/jersey-client-1.19.jar:/usr/hdp/current/hadoop-yarn-client/lib/jersey-guice-1.19.jar:/usr/hdp/current/hadoop-yarn-client/lib/json-io-2.5.1.jar:/usr/hdp/current/hadoop-yarn-client/lib/metrics-core-3.2.4.jar:/usr/hdp/current/hadoop-yarn-client/lib/mssql-jdbc-6.2.1.jre7.jar:/usr/hdp/current/hadoop-yarn-client/lib/objenesis-1.0.jar:/usr/hdp/current/hadoop-yarn-client/lib/snakeyaml-1.16.jar:/usr/hdp/current/hadoop-yarn-client/lib/swagger-annotations-1.5.4.jar
2020-12-16 15:52:21,754 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] - 
--------------------------------------------------------------------------------
2020-12-16 15:52:21,755 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] - Registered UNIX signal handlers for [TERM, HUP, INT]
2020-12-16 15:52:21,758 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] - Current working Directory: 
/data1/yarn/local/usercache/deploy/appcache/application_1600163418174_0105/container_1600163418174_0105_01_000003
2020-12-16 15:52:21,772 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: state.checkpoints.num-retained, 10
2020-12-16 15:52:21,772 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: 
restart-strategy.failure-rate.max-failures-per-interval, 3
2020-12-16 15:52:21,772 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: jobmanager.execution.failover-strategy, region
2020-12-16 15:52:21,772 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: high-availability.cluster-id, 
application_1600163418174_0105
2020-12-16 15:52:21,772 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: jobmanager.rpc.address, localhost
2020-12-16 15:52:21,772 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: state.savepoints.dir, hdfs:///stream/ckpts
2020-12-16 15:52:21,773 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: execution.savepoint.ignore-unclaimed-state, false
2020-12-16 15:52:21,773 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: yarn.provided.lib.dirs, 
hdfs:///flink-dists/flink-1.12.0/lib;hdfs:///flink-dists/flink-1.12.0/plugins
2020-12-16 15:52:21,773 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: $internal.application.program-args, rtdev.conf
2020-12-16 15:52:21,773 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: parallelism.default, 1
2020-12-16 15:52:21,773 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: taskmanager.numberOfTaskSlots, 4
2020-12-16 15:52:21,773 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: cluster.evenly-spread-out-slots, true
2020-12-16 15:52:21,773 WARN  
org.apache.flink.configuration.GlobalConfiguration           [] - Error while 
trying to split key and value in configuration file ./flink-conf.yaml:13: 
"pipeline.classpaths: "
2020-12-16 15:52:21,774 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: yarn.application.name, stream app
2020-12-16 15:52:21,774 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: restart-strategy.failure-rate.failure-rate-interval, 
3min
2020-12-16 15:52:21,774 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: metrics.reporter.prom.class, 
org.apache.flink.metrics.prometheus.PrometheusReporter
2020-12-16 15:52:21,774 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: taskmanager.memory.process.size, 10g
2020-12-16 15:52:21,774 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: metrics.reporter.prom.port, 9249-9300
2020-12-16 15:52:21,774 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: jobmanager.archive.fs.dir, hdfs:///flink-completed-jobs
2020-12-16 15:52:21,774 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: restart-strategy.failure-rate.delay, 20s
2020-12-16 15:52:21,774 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: state.backend.incremental, true
2020-12-16 15:52:21,774 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: execution.target, yarn-application
2020-12-16 15:52:21,774 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: jobmanager.memory.process.size, 1600m
2020-12-16 15:52:21,775 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: yarn.ship-files, rtdev.conf
2020-12-16 15:52:21,775 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: jobmanager.rpc.port, 6123
2020-12-16 15:52:21,775 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: execution.checkpointing.interval, 20min
2020-12-16 15:52:21,775 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: execution.attached, true
2020-12-16 15:52:21,775 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: internal.cluster.execution-mode, NORMAL
2020-12-16 15:52:21,775 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: execution.shutdown-on-attached-exit, false
2020-12-16 15:52:21,775 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: pipeline.jars, 
hdfs:///stream/jars/stream-calculator-0.1-SNAPSHOT.jar
2020-12-16 15:52:21,775 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: state.backend, rocksdb
2020-12-16 15:52:21,775 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: execution.checkpointing.min-pause, 1min
2020-12-16 15:52:21,775 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: restart-strategy, failure-rate
2020-12-16 15:52:21,776 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: $internal.deployment.config-dir, ./conf
2020-12-16 15:52:21,776 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: $internal.yarn.log-config-file, ./conf/log4j.properties
2020-12-16 15:52:21,776 INFO  
org.apache.flink.configuration.GlobalConfiguration           [] - Loading 
configuration property: state.checkpoints.dir, hdfs:///stream/ckpts
2020-12-16 15:52:21,776 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] - Current working/local Directory: 
/data1/yarn/local/usercache/deploy/appcache/application_1600163418174_0105
2020-12-16 15:52:21,790 INFO  
org.apache.flink.runtime.clusterframework.BootstrapTools     [] - Setting 
directories for temporary files to: 
/data1/yarn/local/usercache/deploy/appcache/application_1600163418174_0105
2020-12-16 15:52:21,791 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] - TM: local keytab path obtained null
2020-12-16 15:52:21,791 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] - TM: keytab principal obtained null
2020-12-16 15:52:21,797 INFO  org.apache.flink.yarn.YarnTaskExecutorRunner      
           [] - YARN daemon is running as: deploy Yarn client user obtainer: 
deploy
2020-12-16 15:52:21,916 INFO  
org.apache.flink.runtime.security.modules.HadoopModule       [] - Hadoop user 
set to deploy (auth:SIMPLE)
2020-12-16 15:52:21,925 INFO  
org.apache.flink.runtime.security.modules.JaasModule         [] - Jaas file 
will be created as 
/data1/yarn/local/usercache/deploy/appcache/application_1600163418174_0105/jaas-162693948539274386.conf.
2020-12-16 15:52:21,970 WARN  org.apache.flink.configuration.Configuration      
           [] - Config uses deprecated configuration key 'web.port' instead of 
proper key 'rest.port'
2020-12-16 15:52:21,974 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] - Using 
configured hostname/address for TaskManager: mobdata-devflink-dn03.dakao.io.
2020-12-16 15:52:21,976 INFO  
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils        [] - Trying to 
start actor system, external address mobdata-devflink-dn03.dakao.io:0, bind 
address 0.0.0.0:0.
2020-12-16 15:52:22,427 INFO  akka.event.slf4j.Slf4jLogger                      
           [] - Slf4jLogger started
2020-12-16 15:52:22,448 INFO  akka.remote.Remoting                              
           [] - Starting remoting
2020-12-16 15:52:22,559 INFO  akka.remote.Remoting                              
           [] - Remoting started; listening on addresses 
:[akka.tcp://fl...@mobdata-devflink-dn03.dakao.io:38681]
2020-12-16 15:52:22,829 INFO  
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils        [] - Actor system 
started at akka.tcp://fl...@mobdata-devflink-dn03.dakao.io:38681
2020-12-16 15:52:22,902 INFO  
org.apache.flink.metrics.prometheus.PrometheusReporter       [] - Started 
PrometheusReporter HTTP server on port 9259.
2020-12-16 15:52:22,903 INFO  
org.apache.flink.runtime.metrics.MetricRegistryImpl          [] - Reporting 
metrics for reporter prom of type 
org.apache.flink.metrics.prometheus.PrometheusReporter.
2020-12-16 15:52:22,907 INFO  
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils        [] - Trying to 
start actor system, external address mobdata-devflink-dn03.dakao.io:0, bind 
address 0.0.0.0:0.
2020-12-16 15:52:22,921 INFO  akka.event.slf4j.Slf4jLogger                      
           [] - Slf4jLogger started
2020-12-16 15:52:22,923 INFO  akka.remote.Remoting                              
           [] - Starting remoting
2020-12-16 15:52:22,929 INFO  akka.remote.Remoting                              
           [] - Remoting started; listening on addresses 
:[akka.tcp://flink-metr...@mobdata-devflink-dn03.dakao.io:42224]
2020-12-16 15:52:22,979 INFO  
org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils        [] - Actor system 
started at akka.tcp://flink-metr...@mobdata-devflink-dn03.dakao.io:42224
2020-12-16 15:52:22,992 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
           [] - Starting RPC endpoint for 
org.apache.flink.runtime.metrics.dump.MetricQueryService at 
akka://flink-metrics/user/rpc/MetricQueryService_container_1600163418174_0105_01_000003
 .
2020-12-16 15:52:23,003 INFO  org.apache.flink.runtime.blob.PermanentBlobCache  
           [] - Created BLOB cache storage directory 
/data1/yarn/local/usercache/deploy/appcache/application_1600163418174_0105/blobStore-94a1efa6-0056-4664-a1eb-5b09c90c10fb
2020-12-16 15:52:23,005 INFO  org.apache.flink.runtime.blob.TransientBlobCache  
           [] - Created BLOB cache storage directory 
/data1/yarn/local/usercache/deploy/appcache/application_1600163418174_0105/blobStore-705578c9-0e3c-4f6a-8d0a-17cf4236708b
2020-12-16 15:52:23,007 INFO  
org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - Enabled 
external resources: []
2020-12-16 15:52:23,007 INFO  
org.apache.flink.runtime.externalresource.ExternalResourceUtils [] - Enabled 
external resources: []
2020-12-16 15:52:23,008 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerRunner      [] - Starting 
TaskManager with ResourceID: 
container_1600163418174_0105_01_000003(mobdata-devflink-dn03.dakao.io:45454)
2020-12-16 15:52:23,038 INFO  
org.apache.flink.runtime.taskexecutor.TaskManagerServices    [] - Temporary 
file directory 
'/data1/yarn/local/usercache/deploy/appcache/application_1600163418174_0105': 
total 14899 GB, usable 14891 GB (99.95% usable)
2020-12-16 15:52:23,043 INFO  
org.apache.flink.runtime.io.disk.FileChannelManagerImpl      [] - 
FileChannelManager uses directory 
/data1/yarn/local/usercache/deploy/appcache/application_1600163418174_0105/flink-io-18fba940-649d-408e-94d3-031fd124ff69
 for spill files.
2020-12-16 15:52:23,052 INFO  
org.apache.flink.runtime.io.network.netty.NettyConfig        [] - NettyConfig 
[server address: /0.0.0.0, server port: 0, ssl enabled: false, memory segment 
size (bytes): 32768, transport type: AUTO, number of server threads: 4 
(manual), number of client threads: 4 (manual), server connect backlog: 0 (use 
Netty's default), client connect timeout (sec): 120, send/receive buffer size 
(bytes): 0 (use Netty's default)]
2020-12-16 15:52:23,055 INFO  
org.apache.flink.runtime.io.disk.FileChannelManagerImpl      [] - 
FileChannelManager uses directory 
/data1/yarn/local/usercache/deploy/appcache/application_1600163418174_0105/flink-netty-shuffle-97355cec-b6d3-494e-8422-ca9fe3a4a9f8
 for spill files.
2020-12-16 15:52:23,700 INFO  
org.apache.flink.runtime.io.network.buffer.NetworkBufferPool [] - Allocated 896 
MB for network buffer pool (number of memory segments: 28672, bytes per 
segment: 32768).
2020-12-16 15:52:23,712 INFO  
org.apache.flink.runtime.io.network.NettyShuffleEnvironment  [] - Starting the 
network environment and its components.
2020-12-16 15:52:23,774 INFO  
org.apache.flink.runtime.io.network.netty.NettyClient        [] - Transport 
type 'auto': using EPOLL.
2020-12-16 15:52:23,776 INFO  
org.apache.flink.runtime.io.network.netty.NettyClient        [] - Successful 
initialization (took 63 ms).
2020-12-16 15:52:23,782 INFO  
org.apache.flink.runtime.io.network.netty.NettyServer        [] - Transport 
type 'auto': using EPOLL.
2020-12-16 15:52:23,817 INFO  
org.apache.flink.runtime.io.network.netty.NettyServer        [] - Successful 
initialization (took 39 ms). Listening on SocketAddress /0.0.0.0:43174.
2020-12-16 15:52:23,818 INFO  
org.apache.flink.runtime.taskexecutor.KvStateService         [] - Starting the 
kvState service and its components.
2020-12-16 15:52:23,915 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcService  
           [] - Starting RPC endpoint for 
org.apache.flink.runtime.taskexecutor.TaskExecutor at 
akka://flink/user/rpc/taskmanager_0 .
2020-12-16 15:52:23,929 INFO  
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Start job 
leader service.
2020-12-16 15:52:23,931 INFO  org.apache.flink.runtime.filecache.FileCache      
           [] - User file cache uses directory 
/data1/yarn/local/usercache/deploy/appcache/application_1600163418174_0105/flink-dist-cache-f8fb3c56-f6b1-42cf-addc-0328c87009ed
2020-12-16 15:52:23,933 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Connecting to 
ResourceManager 
akka.tcp://fl...@mobdata-devflink-dn04.dakao.io:46401/user/rpc/resourcemanager_*(00000000000000000000000000000000).
2020-12-16 15:52:24,086 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Resolved 
ResourceManager address, beginning registration
2020-12-16 15:52:24,142 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Successful 
registration at resource manager 
akka.tcp://fl...@mobdata-devflink-dn04.dakao.io:46401/user/rpc/resourcemanager_*
 under registration id 67b2b227d1fdcdbcd01333d9fa7e865d.
2020-12-16 15:52:24,157 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Receive slot 
request 92f2faca112a3a2522b83bb1671c33fb for job 
04d16056ff91cfeabe50094dcd0b70c9 from resource manager with leader id 
00000000000000000000000000000000.
2020-12-16 15:52:24,162 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Allocated 
slot for 92f2faca112a3a2522b83bb1671c33fb.
2020-12-16 15:52:24,163 INFO  
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Add job 
04d16056ff91cfeabe50094dcd0b70c9 for job leader monitoring.
2020-12-16 15:52:24,164 INFO  
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Try to 
register at job manager 
akka.tcp://fl...@mobdata-devflink-dn04.dakao.io:46401/user/rpc/jobmanager_2 
with leader id 00000000-0000-0000-0000-000000000000.
2020-12-16 15:52:24,175 INFO  
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Resolved 
JobManager address, beginning registration
2020-12-16 15:52:24,187 INFO  
org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Successful 
registration at job manager 
akka.tcp://fl...@mobdata-devflink-dn04.dakao.io:46401/user/rpc/jobmanager_2 for 
job 04d16056ff91cfeabe50094dcd0b70c9.
2020-12-16 15:52:24,188 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Establish 
JobManager connection for job 04d16056ff91cfeabe50094dcd0b70c9.
2020-12-16 15:52:24,191 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Offer 
reserved slots to the leader of job 04d16056ff91cfeabe50094dcd0b70c9.
2020-12-16 15:52:24,207 INFO  
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 
92f2faca112a3a2522b83bb1671c33fb.
2020-12-16 15:52:24,229 INFO  
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 
92f2faca112a3a2522b83bb1671c33fb.
2020-12-16 15:52:24,271 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received task 
enricher (1/1)#0 (032aff32afe042524cfb0639a3bc4b64), deploy into slot with 
allocation id 92f2faca112a3a2522b83bb1671c33fb.
2020-12-16 15:52:24,272 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - enricher (1/1)#0 (032aff32afe042524cfb0639a3bc4b64) switched 
from CREATED to DEPLOYING.
2020-12-16 15:52:24,276 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - Loading JAR files for task enricher (1/1)#0 
(032aff32afe042524cfb0639a3bc4b64) [DEPLOYING].
2020-12-16 15:52:24,276 INFO  
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 
92f2faca112a3a2522b83bb1671c33fb.
2020-12-16 15:52:24,279 INFO  org.apache.flink.runtime.blob.BlobClient          
           [] - Downloading 
04d16056ff91cfeabe50094dcd0b70c9/p-ad05abf47895f79601474d976f6b3a3d57c22c87-0bb80d2e3367581dc17772f3fab4fe36
 from mobdata-devflink-dn04.dakao.io/10.93.0.221:35086
2020-12-16 15:52:24,280 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received task 
session-window -> (Sink: kafka-sink, Sink: session-window-late-data) (1/1)#0 
(ee9fc1aa21833c749e3c271fd52cbfd4), deploy into slot with allocation id 
92f2faca112a3a2522b83bb1671c33fb.
2020-12-16 15:52:24,280 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - session-window -> (Sink: kafka-sink, Sink: 
session-window-late-data) (1/1)#0 (ee9fc1aa21833c749e3c271fd52cbfd4) switched 
from CREATED to DEPLOYING.
2020-12-16 15:52:24,281 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - Loading JAR files for task session-window -> (Sink: kafka-sink, 
Sink: session-window-late-data) (1/1)#0 (ee9fc1aa21833c749e3c271fd52cbfd4) 
[DEPLOYING].
2020-12-16 15:52:24,612 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - Registering task at network: enricher (1/1)#0 
(032aff32afe042524cfb0639a3bc4b64) [DEPLOYING].
2020-12-16 15:52:24,612 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - Registering task at network: session-window -> (Sink: 
kafka-sink, Sink: session-window-late-data) (1/1)#0 
(ee9fc1aa21833c749e3c271fd52cbfd4) [DEPLOYING].
2020-12-16 15:52:24,660 INFO  
org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Using 
job/cluster config to configure application-defined state backend: 
RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 
'hdfs:/stream/ckpts', savepoints: 'hdfs:/stream/ckpts', asynchronous: TRUE, 
fileStateThreshold: 20480), localRocksDbDirectories=null, 
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1, 
writeBatchSize=2097152}
2020-12-16 15:52:24,660 INFO  
org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Using 
job/cluster config to configure application-defined state backend: 
RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 
'hdfs:/stream/ckpts', savepoints: 'hdfs:/stream/ckpts', asynchronous: TRUE, 
fileStateThreshold: 20480), localRocksDbDirectories=null, 
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1, 
writeBatchSize=2097152}
2020-12-16 15:52:24,661 INFO  
org.apache.flink.contrib.streaming.state.RocksDBStateBackend [] - Using 
predefined options: DEFAULT.
2020-12-16 15:52:24,661 INFO  
org.apache.flink.contrib.streaming.state.RocksDBStateBackend [] - Using 
predefined options: DEFAULT.
2020-12-16 15:52:24,661 INFO  
org.apache.flink.contrib.streaming.state.RocksDBStateBackend [] - Using 
application-defined options factory: 
DefaultConfigurableOptionsFactory{configuredOptions={}}.
2020-12-16 15:52:24,661 INFO  
org.apache.flink.contrib.streaming.state.RocksDBStateBackend [] - Using 
application-defined options factory: 
DefaultConfigurableOptionsFactory{configuredOptions={}}.
2020-12-16 15:52:24,661 INFO  
org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Using 
application-defined state backend: 
RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 
'hdfs:/stream/ckpts', savepoints: 'hdfs:/stream/ckpts', asynchronous: TRUE, 
fileStateThreshold: 20480), localRocksDbDirectories=null, 
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1, 
writeBatchSize=2097152}
2020-12-16 15:52:24,661 INFO  
org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Using 
application-defined state backend: 
RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 
'hdfs:/stream/ckpts', savepoints: 'hdfs:/stream/ckpts', asynchronous: TRUE, 
fileStateThreshold: 20480), localRocksDbDirectories=null, 
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1, 
writeBatchSize=2097152}
2020-12-16 15:52:25,165 WARN  
org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory      [] - The 
short-circuit local reads feature cannot be used because libhadoop cannot be 
loaded.
2020-12-16 15:52:25,182 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - session-window -> (Sink: kafka-sink, Sink: 
session-window-late-data) (1/1)#0 (ee9fc1aa21833c749e3c271fd52cbfd4) switched 
from DEPLOYING to RUNNING.
2020-12-16 15:52:25,182 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - enricher (1/1)#0 (032aff32afe042524cfb0639a3bc4b64) switched 
from DEPLOYING to RUNNING.
2020-12-16 15:52:25,323 INFO  
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction [] - 
FlinkKafkaProducer 1/1 - no state to restore
2020-12-16 15:52:25,346 INFO  org.apache.kafka.clients.producer.ProducerConfig  
           [] - ProducerConfig values: 
        acks = 1
        batch.size = 16384
        bootstrap.servers = [dev.daumkakao.io:9092]
        buffer.memory = 33554432
        client.dns.lookup = default
        client.id = 
        compression.type = none
        connections.max.idle.ms = 540000
        delivery.timeout.ms = 120000
        enable.idempotence = false
        interceptor.classes = []
        key.serializer = class 
org.apache.kafka.common.serialization.ByteArraySerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 11000000
        metadata.max.age.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class 
org.apache.kafka.clients.producer.internals.DefaultPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 2147483647
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.endpoint.identification.algorithm = https
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLS
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 3600000
        transactional.id = null
        value.serializer = class 
org.apache.kafka.common.serialization.ByteArraySerializer

2020-12-16 15:52:25,355 INFO  org.apache.kafka.clients.producer.KafkaProducer   
           [] - [Producer clientId=producer-1] Closing the Kafka producer with 
timeoutMillis = 0 ms.
2020-12-16 15:52:25,364 WARN  org.apache.flink.runtime.taskmanager.Task         
           [] - session-window -> (Sink: kafka-sink, Sink: 
session-window-late-data) (1/1)#0 (ee9fc1aa21833c749e3c271fd52cbfd4) switched 
from RUNNING to FAILED.
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
        at 
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432) 
~[stream-calculator-0.1-SNAPSHOT.jar:?]
        at 
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298) 
~[stream-calculator-0.1-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:78)
 ~[stream-calculator-0.1-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1158)
 ~[stream-calculator-0.1-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1259)
 ~[stream-calculator-0.1-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1255)
 ~[stream-calculator-0.1-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:950)
 ~[stream-calculator-0.1-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:100)
 ~[stream-calculator-0.1-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:398)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:389)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1128)
 ~[stream-calculator-0.1-SNAPSHOT.jar:?]
        at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:185)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:167)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:107)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:264)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:400)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:507)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:501)
 ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:531) 
~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) 
[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) 
[flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_222]
Caused by: org.apache.kafka.common.KafkaException: class 
org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of 
org.apache.kafka.common.serialization.Serializer
        at 
org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:374)
 ~[stream-calculator-0.1-SNAPSHOT.jar:?]
        at 
org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:392)
 ~[stream-calculator-0.1-SNAPSHOT.jar:?]
        at 
org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:359) 
~[stream-calculator-0.1-SNAPSHOT.jar:?]
        ... 23 more
2020-12-16 15:52:25,371 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - Freeing task resources for session-window -> (Sink: kafka-sink, 
Sink: session-window-late-data) (1/1)#0 (ee9fc1aa21833c749e3c271fd52cbfd4).
2020-12-16 15:52:25,378 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - 
Un-registering task and sending final execution state FAILED to JobManager for 
task session-window -> (Sink: kafka-sink, Sink: session-window-late-data) 
(1/1)#0 ee9fc1aa21833c749e3c271fd52cbfd4.
2020-12-16 15:52:25,422 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - Attempting to cancel task enricher (1/1)#0 
(032aff32afe042524cfb0639a3bc4b64).
2020-12-16 15:52:25,422 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - enricher (1/1)#0 (032aff32afe042524cfb0639a3bc4b64) switched 
from RUNNING to CANCELING.
2020-12-16 15:52:25,422 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - Triggering cancellation of task code enricher (1/1)#0 
(032aff32afe042524cfb0639a3bc4b64).
2020-12-16 15:52:25,742 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - enricher (1/1)#0 (032aff32afe042524cfb0639a3bc4b64) switched 
from CANCELING to CANCELED.
2020-12-16 15:52:25,742 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - Freeing task resources for enricher (1/1)#0 
(032aff32afe042524cfb0639a3bc4b64).
2020-12-16 15:52:25,743 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - 
Un-registering task and sending final execution state CANCELED to JobManager 
for task enricher (1/1)#0 032aff32afe042524cfb0639a3bc4b64.
2020-12-16 15:52:45,437 INFO  
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 
92f2faca112a3a2522b83bb1671c33fb.
2020-12-16 15:52:45,441 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received task 
Source: kafka-source -> json-to-input -> wm-gen -> filter-drive-type (1/3)#1 
(5a99127bd6ee3bd816323586fb168e3c), deploy into slot with allocation id 
92f2faca112a3a2522b83bb1671c33fb.
2020-12-16 15:52:45,441 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - Source: kafka-source -> json-to-input -> wm-gen -> 
filter-drive-type (1/3)#1 (5a99127bd6ee3bd816323586fb168e3c) switched from 
CREATED to DEPLOYING.
2020-12-16 15:52:45,441 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - Loading JAR files for task Source: kafka-source -> 
json-to-input -> wm-gen -> filter-drive-type (1/3)#1 
(5a99127bd6ee3bd816323586fb168e3c) [DEPLOYING].
2020-12-16 15:52:45,442 INFO  
org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Activate slot 
92f2faca112a3a2522b83bb1671c33fb.
2020-12-16 15:52:45,442 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - Registering task at network: Source: kafka-source -> 
json-to-input -> wm-gen -> filter-drive-type (1/3)#1 
(5a99127bd6ee3bd816323586fb168e3c) [DEPLOYING].
2020-12-16 15:52:45,444 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - Received task 
input-to-idata (1/1)#1 (f4299ed4cc3e532be710737686596ba7), deploy into slot 
with allocation id 92f2faca112a3a2522b83bb1671c33fb.
2020-12-16 15:52:45,445 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - input-to-idata (1/1)#1 (f4299ed4cc3e532be710737686596ba7) 
switched from CREATED to DEPLOYING.
2020-12-16 15:52:45,445 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - Loading JAR files for task input-to-idata (1/1)#1 
(f4299ed4cc3e532be710737686596ba7) [DEPLOYING].
2020-12-16 15:52:45,445 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - Registering task at network: input-to-idata (1/1)#1 
(f4299ed4cc3e532be710737686596ba7) [DEPLOYING].
2020-12-16 15:52:45,446 INFO  
org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Using 
job/cluster config to configure application-defined state backend: 
RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 
'hdfs:/stream/ckpts', savepoints: 'hdfs:/stream/ckpts', asynchronous: TRUE, 
fileStateThreshold: 20480), localRocksDbDirectories=null, 
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1, 
writeBatchSize=2097152}
2020-12-16 15:52:45,446 INFO  
org.apache.flink.contrib.streaming.state.RocksDBStateBackend [] - Using 
predefined options: DEFAULT.
2020-12-16 15:52:45,446 INFO  
org.apache.flink.contrib.streaming.state.RocksDBStateBackend [] - Using 
application-defined options factory: 
DefaultConfigurableOptionsFactory{configuredOptions={}}.
2020-12-16 15:52:45,446 INFO  
org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Using 
application-defined state backend: 
RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 
'hdfs:/stream/ckpts', savepoints: 'hdfs:/stream/ckpts', asynchronous: TRUE, 
fileStateThreshold: 20480), localRocksDbDirectories=null, 
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1, 
writeBatchSize=2097152}
2020-12-16 15:52:45,447 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - Source: kafka-source -> json-to-input -> wm-gen -> 
filter-drive-type (1/3)#1 (5a99127bd6ee3bd816323586fb168e3c) switched from 
DEPLOYING to RUNNING.
2020-12-16 15:52:45,447 INFO  
org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Using 
job/cluster config to configure application-defined state backend: 
RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 
'hdfs:/stream/ckpts', savepoints: 'hdfs:/stream/ckpts', asynchronous: TRUE, 
fileStateThreshold: 20480), localRocksDbDirectories=null, 
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1, 
writeBatchSize=2097152}
2020-12-16 15:52:45,447 INFO  
org.apache.flink.contrib.streaming.state.RocksDBStateBackend [] - Using 
predefined options: DEFAULT.
2020-12-16 15:52:45,447 INFO  
org.apache.flink.contrib.streaming.state.RocksDBStateBackend [] - Using 
application-defined options factory: 
DefaultConfigurableOptionsFactory{configuredOptions={}}.
2020-12-16 15:52:45,448 INFO  
org.apache.flink.streaming.runtime.tasks.StreamTask          [] - Using 
application-defined state backend: 
RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 
'hdfs:/stream/ckpts', savepoints: 'hdfs:/stream/ckpts', asynchronous: TRUE, 
fileStateThreshold: 20480), localRocksDbDirectories=null, 
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1, 
writeBatchSize=2097152}
2020-12-16 15:52:45,448 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - input-to-idata (1/1)#1 (f4299ed4cc3e532be710737686596ba7) 
switched from DEPLOYING to RUNNING.
2020-12-16 15:52:45,604 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - Attempting to cancel task input-to-idata (1/1)#1 
(f4299ed4cc3e532be710737686596ba7).
2020-12-16 15:52:45,604 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - input-to-idata (1/1)#1 (f4299ed4cc3e532be710737686596ba7) 
switched from RUNNING to CANCELING.
2020-12-16 15:52:45,604 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - Triggering cancellation of task code input-to-idata (1/1)#1 
(f4299ed4cc3e532be710737686596ba7).
2020-12-16 15:52:45,607 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - Attempting to cancel task Source: kafka-source -> json-to-input 
-> wm-gen -> filter-drive-type (1/3)#1 (5a99127bd6ee3bd816323586fb168e3c).
2020-12-16 15:52:45,607 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - Source: kafka-source -> json-to-input -> wm-gen -> 
filter-drive-type (1/3)#1 (5a99127bd6ee3bd816323586fb168e3c) switched from 
RUNNING to CANCELING.
2020-12-16 15:52:45,607 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - Triggering cancellation of task code Source: kafka-source -> 
json-to-input -> wm-gen -> filter-drive-type (1/3)#1 
(5a99127bd6ee3bd816323586fb168e3c).
2020-12-16 15:52:45,627 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - input-to-idata (1/1)#1 (f4299ed4cc3e532be710737686596ba7) 
switched from CANCELING to CANCELED.
2020-12-16 15:52:45,628 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - Freeing task resources for input-to-idata (1/1)#1 
(f4299ed4cc3e532be710737686596ba7).
2020-12-16 15:52:45,628 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - 
Un-registering task and sending final execution state CANCELED to JobManager 
for task input-to-idata (1/1)#1 f4299ed4cc3e532be710737686596ba7.
2020-12-16 15:52:45,639 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - Source: kafka-source -> json-to-input -> wm-gen -> 
filter-drive-type (1/3)#1 (5a99127bd6ee3bd816323586fb168e3c) switched from 
CANCELING to CANCELED.
2020-12-16 15:52:45,639 INFO  org.apache.flink.runtime.taskmanager.Task         
           [] - Freeing task resources for Source: kafka-source -> 
json-to-input -> wm-gen -> filter-drive-type (1/3)#1 
(5a99127bd6ee3bd816323586fb168e3c).
2020-12-16 15:52:45,640 INFO  
org.apache.flink.runtime.taskexecutor.TaskExecutor           [] - 
Un-registering task and sending final execution state CANCELED to JobManager 
for task Source: kafka-source -> json-to-input -> wm-gen -> filter-drive-type 
(1/3)#1 5a99127bd6ee3bd816323586fb168e3c.

Reply via email to