Hello folks!

We're looking to upgrade from 1.9 to 1.11. Our Flink applications run on YARN, 
each in its own cluster, and each application submits multiple jobs.

Our current submission command looks like this:
$ flink run -m yarn-cluster --class com.class.name.Here -p 2 -yqu queue-name -ynm app-name -yn 1 -ys 2 -yjm 4096 -ytm 12288 /path/to/artifact.jar -application-args-go-here

The behavior observed in versions <= 1.9 is the following:

1.     A Flink cluster gets deployed to YARN

2.     Our application code is run, building graphs and submitting jobs

After rebuilding and submitting with 1.11.2, we now observe the following:

1.     Our application code is run, building graphs and submitting jobs

2.     A Flink cluster gets deployed to YARN once execute() is invoked

I presume this is a result of FLIP-85 [1]?

This change in behavior is a problem for us because our application is 
multi-threaded, and each thread submits its own job to the Flink cluster. What 
we see is that the first thread to call execute() submits a job to YARN, and the 
others fail with a ClusterDeploymentException.
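
For reference, here is a minimal sketch of the pattern I'm describing (the class 
name, the trivial data set, and the job name are made up; our real flows are of 
course more involved):

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;

public class MultiThreadedSubmit {
    public static void main(String[] args) throws Exception {
        Runnable flow = () -> {
            try {
                // Each thread builds its own graph and calls execute().
                // With the 1.9-style submission these jobs all went to the
                // cluster the CLI had already deployed; with 1.11 per-job mode
                // each execute() attempts to deploy its own YARN cluster.
                ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                env.fromElements(1, 2, 3)
                   .output(new DiscardingOutputFormat<>());
                env.execute("per-thread job");
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
        Thread t1 = new Thread(flow, "flink-thread-#1");
        Thread t2 = new Thread(flow, "flink-thread-#2");
        t1.start();
        t2.start();
        t1.join();
        t2.join();
    }
}

The log output from one of the failing runs: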

2021-08-13 14:47:42,299 [flink-thread-#1] INFO  YarnClusterDescriptor - Cluster 
specification: ClusterSpecification{masterMemoryMB=4096, 
taskManagerMemoryMB=18432, slotsPerTaskManager=2}
2021-08-13 14:47:42,299 [flink-thread-#2] INFO  YarnClusterDescriptor - Cluster 
specification: ClusterSpecification{masterMemoryMB=4096, 
taskManagerMemoryMB=18432, slotsPerTaskManager=2}
2021-08-13 14:47:42,304 [flink-thread-#1] WARN  PluginConfig - The plugins 
directory [plugins] does not exist.
2021-08-13 14:47:42,304 [flink-thread-#2] WARN  PluginConfig - The plugins 
directory [plugins] does not exist.
Listening for transport dt_socket at address: 5005
2021-08-13 14:47:46,716 [flink-thread-#2] WARN  PluginConfig - The plugins 
directory [plugins] does not exist.
2021-08-13 14:47:46,716 [flink-thread-#1] WARN  PluginConfig - The plugins 
directory [plugins] does not exist.
2021-08-13 14:47:54,820 [flink-thread-#1] INFO  YarnClusterDescriptor - Adding 
delegation token to the AM container.
2021-08-13 14:47:54,837 [flink-thread-#1] INFO  DFSClient - Created 
HDFS_DELEGATION_TOKEN token 56208379 for delp on ha-hdfs:d279536
2021-08-13 14:47:54,860 [flink-thread-#1] INFO  TokenCache - Got dt for 
hdfs://d279536; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:d279536, Ident: 
(HDFS_DELEGATION_TOKEN token 56208379 for user)
2021-08-13 14:47:54,860 [flink-thread-#1] INFO  Utils - Attempting to obtain 
Kerberos security token for HBase
2021-08-13 14:47:54,861 [flink-thread-#1] INFO  Utils - HBase is not available 
(not packaged with this application): ClassNotFoundException : 
"org.apache.hadoop.hbase.HBaseConfiguration".
2021-08-13 14:47:54,901 [flink-thread-#1] INFO  YarnClusterDescriptor - 
Submitting application master application_1628393898291_71530
2021-08-13 14:47:54,904 [flink-thread-#2] ERROR FlowDataBase - Exception 
running data flow for flink-thread-#2
org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy 
Yarn job cluster.
        at 
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:431)
        at 
org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70)
        at 
org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:973)
        at 
org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:124)
        at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:72)
        ...
Caused by: java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:826)
        at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:2152)
        at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:2138)
        at 
org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:919)
        at 
org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:114)
        ...
Caused by: java.lang.ExceptionInInitializerError
        at 
com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:182)
        at 
com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:175)
        at 
com.sun.jersey.core.spi.factory.MessageBodyFactory.init(MessageBodyFactory.java:162)
        at com.sun.jersey.api.client.Client.init(Client.java:342)
        at com.sun.jersey.api.client.Client.access$000(Client.java:118)
        at com.sun.jersey.api.client.Client$1.f(Client.java:191)
        at com.sun.jersey.api.client.Client$1.f(Client.java:187)
        at com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:193)
        at com.sun.jersey.api.client.Client.<init>(Client.java:187)
        at com.sun.jersey.api.client.Client.<init>(Client.java:170)
        at 
org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl.serviceInit(TimelineClientImpl.java:285)

Is the only solution here to move to application mode [2]? Doing so would imply 
a migration effort on our side (which may bring its own set of problems).
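
For context, my (untested) understanding from [2] is that an application-mode 
submission would look roughly like this, with main() then running inside the 
cluster rather than on the submitting host:

$ flink run-application -t yarn-application \
    -Dyarn.application.name=app-name \
    -Dyarn.application.queue=queue-name \
    /path/to/artifact.jar -application-args-go-here

Since our main() is multi-threaded and submits several jobs, I'm not sure how 
well that pattern maps onto application mode.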

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-85+Flink+Application+Mode
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html#run-an-application-in-application-mode

Best,
Andreas
