Hello folks! We're looking to upgrade from Flink 1.9 to 1.11. Our Flink applications run on YARN, each with its own cluster, and each application submits multiple jobs.
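For context, this is roughly the pattern our application follows. The sketch below uses made-up class and job names and a trivial graph, but the structure is what matters: several threads, each building its own graph and calling execute() on its own environment (more on the threading issue further down).

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.io.DiscardingOutputFormat;

    // Stripped-down sketch of our submission pattern (names are illustrative only).
    // Each worker thread builds its own graph and submits its own job.
    public class MultiJobApp {

        public static void main(String[] args) throws Exception {
            ExecutorService pool = Executors.newFixedThreadPool(4);
            for (int i = 0; i < 4; i++) {
                final int jobIndex = i;
                pool.submit(() -> {
                    try {
                        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                        env.fromElements(1, 2, 3)
                           .map(x -> x * jobIndex)
                           .output(new DiscardingOutputFormat<>());
                        // On <= 1.9 the YARN cluster already exists at this point;
                        // on 1.11 each execute() tries to deploy its own per-job cluster.
                        env.execute("flink-thread-#" + jobIndex);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            }
            pool.shutdown();
        }
    }

In the real application each thread builds a much larger graph from our own configuration, but the one-graph-and-one-execute()-per-thread structure is the same.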
Our current submission command looks like this:

$ run -m yarn-cluster --class com.class.name.Here -p 2 -yqu queue-name -ynm app-name -yn 1 -ys 2 -yjm 4096 -ytm 12288 /path/to/artifact.jar -application-args-go-here

The behavior observed in versions <= 1.9 is the following:
1. A Flink cluster gets deployed to YARN
2. Our application code runs, building graphs and submitting jobs

When we rebuild and submit using 1.11.2, we now observe the following:
1. Our application code runs, building graphs and submitting jobs
2. A Flink cluster gets deployed to YARN once execute() is invoked

I presume that this is a result of FLIP-85 [1]?

This change in behavior is a problem for us because our application is multi-threaded and each thread submits its own job to the Flink cluster. What we see is that the first thread to call execute() submits a job to YARN, and the others fail with a ClusterDeploymentException:

2021-08-13 14:47:42,299 [flink-thread-#1] INFO YarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=4096, taskManagerMemoryMB=18432, slotsPerTaskManager=2}
2021-08-13 14:47:42,299 [flink-thread-#2] INFO YarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=4096, taskManagerMemoryMB=18432, slotsPerTaskManager=2}
2021-08-13 14:47:42,304 [flink-thread-#1] WARN PluginConfig - The plugins directory [plugins] does not exist.
2021-08-13 14:47:42,304 [flink-thread-#2] WARN PluginConfig - The plugins directory [plugins] does not exist.
Listening for transport dt_socket at address: 5005
2021-08-13 14:47:46,716 [flink-thread-#2] WARN PluginConfig - The plugins directory [plugins] does not exist.
2021-08-13 14:47:46,716 [flink-thread-#1] WARN PluginConfig - The plugins directory [plugins] does not exist.
2021-08-13 14:47:54,820 [flink-thread-#1] INFO YarnClusterDescriptor - Adding delegation token to the AM container.
2021-08-13 14:47:54,837 [flink-thread-#1] INFO DFSClient - Created HDFS_DELEGATION_TOKEN token 56208379 for delp on ha-hdfs:d279536
2021-08-13 14:47:54,860 [flink-thread-#1] INFO TokenCache - Got dt for hdfs://d279536; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:d279536, Ident: (HDFS_DELEGATION_TOKEN token 56208379 for user)
2021-08-13 14:47:54,860 [flink-thread-#1] INFO Utils - Attempting to obtain Kerberos security token for HBase
2021-08-13 14:47:54,861 [flink-thread-#1] INFO Utils - HBase is not available (not packaged with this application): ClassNotFoundException : "org.apache.hadoop.hbase.HBaseConfiguration".
2021-08-13 14:47:54,901 [flink-thread-#1] INFO YarnClusterDescriptor - Submitting application master application_1628393898291_71530
2021-08-13 14:47:54,904 [flink-thread-#2] ERROR FlowDataBase - Exception running data flow for flink-thread-#2
org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.
    at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:431)
    at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70)
    at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:973)
    at org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:124)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:72)
    ...
Caused by: java.io.IOException: Filesystem closed
    at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:826)
    at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:2152)
    at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:2138)
    at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:919)
    at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:114)
    ...
Caused by: java.lang.ExceptionInInitializerError
    at com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:182)
    at com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:175)
    at com.sun.jersey.core.spi.factory.MessageBodyFactory.init(MessageBodyFactory.java:162)
    at com.sun.jersey.api.client.Client.init(Client.java:342)
    at com.sun.jersey.api.client.Client.access$000(Client.java:118)
    at com.sun.jersey.api.client.Client$1.f(Client.java:191)
    at com.sun.jersey.api.client.Client$1.f(Client.java:187)
    at com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:193)
    at com.sun.jersey.api.client.Client.<init>(Client.java:187)
    at com.sun.jersey.api.client.Client.<init>(Client.java:170)
    at org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl.serviceInit(TimelineClientImpl.java:285)

Is the only solution here to move to Application Mode [2]? Doing so would imply a migration requirement, which may bring its own set of problems.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-85+Flink+Application+Mode
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html#run-an-application-in-application-mode

Best,
Andreas
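P.S. If I'm reading the docs in [2] correctly, the application-mode equivalent of our current submission would look roughly like the command below. The -D keys are my attempt at mapping our existing -p/-yqu/-ynm/-yjm/-ytm/-ys options, so please treat it as a sketch rather than a tested command:

$ ./bin/flink run-application -t yarn-application \
    -Dyarn.application.queue=queue-name \
    -Dyarn.application.name=app-name \
    -Djobmanager.memory.process.size=4096m \
    -Dtaskmanager.memory.process.size=12288m \
    -Dtaskmanager.numberOfTaskSlots=2 \
    -Dparallelism.default=2 \
    --class com.class.name.Here \
    /path/to/artifact.jar -application-args-go-here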