Hi David, I was able to get this working using your suggestion:
1) Deploy a Flink YARN session cluster, noting the host + port of the session's JobManager.
2) Submit the Flink job using the session's details, i.e. submit the job with the '-m host:port' option.

Thanks for clearing things up.

// ah

From: David Morávek <d...@apache.org>
Sent: Tuesday, August 17, 2021 4:37 AM
To: Hailu, Andreas [Engineering] <andreas.ha...@ny.email.gs.com>
Cc: Ravichandran, Soorya Prasanna [Engineering] <soorya.ravichand...@ny.email.gs.com>; user@flink.apache.org
Subject: Re: Upgrading from Flink on YARN 1.9 to 1.11

Hi Andreas,

the problem here is that the command you're using starts a per-job cluster (which is apparent from the deployment method in the stack trace, "YarnClusterDescriptor.deployJobCluster"). The `-m yarn-cluster` flag is deprecated and no longer supported; I think this is something we should remove completely in the near future. It was also always supposed to start your job in per-job mode, but unfortunately in older versions this was simulated using a session cluster, so I'd say it just worked by accident (an "undocumented bug / feature").

What you really want to do is start a session cluster upfront and then use the `yarn-session` deployment target (where you need to provide the YARN application id so Flink can find the active JobManager). This is well documented in the YARN section of the docs [1]. Can you please try this approach and let me know whether it helped?
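For reference, the two steps can be sketched with the Flink CLI. This is a hedged sketch: the host:port, application id, resource sizes, and jar path below are placeholders, not values taken from this thread.

```shell
# 1) Start a detached YARN session cluster (resource flags are illustrative).
./bin/yarn-session.sh -d -nm app-name -jm 4096m -tm 12288m -s 2

# 2a) Submit against the session JobManager printed at session start-up:
./bin/flink run -m jobmanager-host:port -p 2 /path/to/artifact.jar <app args>

# 2b) Or, on Flink 1.11+, use the yarn-session deployment target with the
#     session's YARN application id:
./bin/flink run -t yarn-session -Dyarn.application.id=application_XXXX_YYYY \
  /path/to/artifact.jar <app args>
```

With either form, the client attaches to the already-running session JobManager instead of deploying a fresh per-job cluster for every execute() call.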
[1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/yarn/#session-mode

Best,
D.

On Mon, Aug 16, 2021 at 8:52 PM Hailu, Andreas [Engineering] <andreas.ha...@gs.com> wrote:

Hi David,

You're correct about the classpathing problems – thanks for your help in spotting them. I was able to get past that exception by removing some conflicting packages from my shaded JAR, but I'm now seeing something else that's interesting. With the two threads trying to submit jobs, one thread is able to submit and process data successfully, while the other fails. Log snippet:

```
2021-08-16 13:43:12,893 [thread-1] INFO YarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=4096, taskManagerMemoryMB=18432, slotsPerTaskManager=2}
2021-08-16 13:43:12,893 [thread-2] INFO YarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=4096, taskManagerMemoryMB=18432, slotsPerTaskManager=2}
2021-08-16 13:43:12,897 [thread-2] WARN PluginConfig - The plugins directory [plugins] does not exist.
2021-08-16 13:43:12,897 [thread-1] WARN PluginConfig - The plugins directory [plugins] does not exist.
2021-08-16 13:43:13,104 [thread-2] WARN PluginConfig - The plugins directory [plugins] does not exist.
2021-08-16 13:43:13,104 [thread-1] WARN PluginConfig - The plugins directory [plugins] does not exist.
2021-08-16 13:43:20,475 [thread-1] INFO YarnClusterDescriptor - Adding delegation token to the AM container.
2021-08-16 13:43:20,488 [thread-1] INFO DFSClient - Created HDFS_DELEGATION_TOKEN token 56247060 for delp on ha-hdfs:d279536
2021-08-16 13:43:20,512 [thread-1] INFO TokenCache - Got dt for hdfs://d279536; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:d279536, Ident: (HDFS_DELEGATION_TOKEN token 56247060 for delp)
2021-08-16 13:43:20,513 [thread-1] INFO Utils - Attempting to obtain Kerberos security token for HBase
2021-08-16 13:43:20,513 [thread-1] INFO Utils - HBase is not available (not packaged with this application): ClassNotFoundException : "org.apache.hadoop.hbase.HBaseConfiguration".
2021-08-16 13:43:20,564 [thread-2] WARN YarnClusterDescriptor - Add job graph to local resource fail.
2021-08-16 13:43:20,570 [thread-1] INFO YarnClusterDescriptor - Submitting application master application_1628992879699_11275
2021-08-16 13:43:20,570 [thread-2] ERROR FlowDataBase - Exception running data flow for thread-2
org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.
	at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:431)
	at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70)
	at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:973)
	at org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:124)
	at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:72)
	at com.gs.ep.da.lake.refinerlib.flink.ExecutionEnvironmentWrapper.execute(ExecutionEnvironmentWrapper.java:49)
	...
Caused by: java.io.IOException: Filesystem closed
	at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:826)
	at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:2152)
	...
2021-08-16 13:43:20,979 [thread-1] INFO TimelineClientImpl - Timeline service address: http://d279536-003.dc.gs.com:8188/ws/v1/timeline/
2021-08-16 13:43:21,377 [thread-1] INFO YarnClientImpl - Submitted application application_1628992879699_11275
2021-08-16 13:43:21,377 [thread-1] INFO YarnClusterDescriptor - Waiting for the cluster to be allocated
2021-08-16 13:43:21,379 [thread-1] INFO YarnClusterDescriptor - Deploying cluster, current state ACCEPTED
2021-08-16 13:43:28,435 [thread-1] INFO YarnClusterDescriptor - YARN application has been deployed successfully.
2021-08-16 13:43:28,436 [thread-1] INFO YarnClusterDescriptor - Found Web Interface d279536-023.dc.gs.com:41019 of application 'application_1628992879699_11275'.
2021-08-16 13:43:28,443 [thread-1] INFO AbstractJobClusterExecutor - Job has been submitted with JobID e15bac7f9fb4b8b0d60f996c3a0880f3
Job has been submitted with JobID e15bac7f9fb4b8b0d60f996c3a0880f3
2021-08-16 13:43:38,629 [FlinkJobSubmitter.Poll] INFO FlinkJobSubmitter$2 - job completed for thread-2 with parallelism 1
Program execution finished
Job with JobID e15bac7f9fb4b8b0d60f996c3a0880f3 has finished.
```

I've generated and sent you a signup link to our firm's secure document-sharing app called Lockbox. In the repository, I've uploaded both our full client and YARN app logs (named half_failure-client_log and half_failure-yarn-log, respectively) in a directory named Flink support logs/Flink 1.11/1.11.2_POC. The logs are quite brief – would you be able to have a look and see if there's something we're doing that's clearly wrong? Something I did notice is that with the upgrade, our submissions now go through the newly introduced ContextEnvironment#executeAsync method. If it means anything, our client doesn't require asynchronous job submission.
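As a side note on the conflicting packages mentioned above: a quick, hypothetical way to spot duplicate copies of the offending Jersey classes is to list the shaded artifact's contents (artifact.jar and lib/ are placeholder paths, not ones from this thread):

```shell
# Which entries in the shaded jar provide the class from the stack trace?
jar tf artifact.jar | grep 'com/sun/jersey/core/spi/factory/MessageBodyFactory'

# Scan every jar shipped with the client for bundled Jersey classes;
# hits from more than one artifact usually mean conflicting versions.
for j in lib/*.jar; do
  unzip -l "$j" | grep -q 'com/sun/jersey' && echo "$j bundles Jersey"
done
```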
// ah

From: David Morávek <d...@apache.org>
Sent: Monday, August 16, 2021 6:28 AM
To: Hailu, Andreas [Engineering] <andreas.ha...@ny.email.gs.com>
Cc: user@flink.apache.org
Subject: Re: Upgrading from Flink on YARN 1.9 to 1.11

Hi Andreas,

Per-job and session deployment modes should not be affected by this FLIP. Application mode is just a new deployment mode (where the job driver runs embedded within the JobManager) that co-exists with these two.

From the information you've provided, I'd say your actual problem is this exception:

```
Caused by: java.lang.ExceptionInInitializerError
	at com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:182)
	at com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:175)
	at com.sun.jersey.core.spi.factory.MessageBodyFactory.init(MessageBodyFactory.java:162)
	at com.sun.jersey.api.client.Client.init(Client.java:342)
	at com.sun.jersey.api.client.Client.access$000(Client.java:118)
	at com.sun.jersey.api.client.Client$1.f(Client.java:191)
	at com.sun.jersey.api.client.Client$1.f(Client.java:187)
	at com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:193)
	at com.sun.jersey.api.client.Client.<init>(Client.java:187)
	at com.sun.jersey.api.client.Client.<init>(Client.java:170)
	at org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl.serviceInit(TimelineClientImpl.java:285)
```

I've seen this exception a few times with Hadoop already, and it's usually a dependency / classpath problem. If you google for it, you'll find many references.

Best,
D.

On Fri, Aug 13, 2021 at 9:40 PM Hailu, Andreas [Engineering] <andreas.ha...@gs.com> wrote:

Hello folks! We're looking to upgrade from 1.9 to 1.11. Our Flink applications run on YARN and each have their own clusters, with each application having multiple jobs submitted.
Our current submission command looks like this:

```
$ run -m yarn-cluster --class com.class.name.Here -p 2 -yqu queue-name -ynm app-name -yn 1 -ys 2 -yjm 4096 -ytm 12288 /path/to/artifact.jar -application-args-go-here
```

The behavior observed in versions <= 1.9 is the following:
1. A Flink cluster gets deployed to YARN
2. Our application code runs, building graphs and submitting jobs

When we rebuild and submit using 1.11.2, we now observe the following:
1. Our application code runs, building graphs and submitting jobs
2. A Flink cluster gets deployed to YARN once execute() is invoked

I presume that this is a result of FLIP-85 [1]? This change in behavior proves to be a problem for us, as our application is multi-threaded and each thread submits its own job to the Flink cluster. What we see is that the first thread to call execute() submits a job to YARN, and the others fail with a ClusterDeploymentException:

```
2021-08-13 14:47:42,299 [flink-thread-#1] INFO YarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=4096, taskManagerMemoryMB=18432, slotsPerTaskManager=2}
2021-08-13 14:47:42,299 [flink-thread-#2] INFO YarnClusterDescriptor - Cluster specification: ClusterSpecification{masterMemoryMB=4096, taskManagerMemoryMB=18432, slotsPerTaskManager=2}
2021-08-13 14:47:42,304 [flink-thread-#1] WARN PluginConfig - The plugins directory [plugins] does not exist.
2021-08-13 14:47:42,304 [flink-thread-#2] WARN PluginConfig - The plugins directory [plugins] does not exist.
Listening for transport dt_socket at address: 5005
2021-08-13 14:47:46,716 [flink-thread-#2] WARN PluginConfig - The plugins directory [plugins] does not exist.
2021-08-13 14:47:46,716 [flink-thread-#1] WARN PluginConfig - The plugins directory [plugins] does not exist.
2021-08-13 14:47:54,820 [flink-thread-#1] INFO YarnClusterDescriptor - Adding delegation token to the AM container.
2021-08-13 14:47:54,837 [flink-thread-#1] INFO DFSClient - Created HDFS_DELEGATION_TOKEN token 56208379 for delp on ha-hdfs:d279536
2021-08-13 14:47:54,860 [flink-thread-#1] INFO TokenCache - Got dt for hdfs://d279536; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:d279536, Ident: (HDFS_DELEGATION_TOKEN token 56208379 for user)
2021-08-13 14:47:54,860 [flink-thread-#1] INFO Utils - Attempting to obtain Kerberos security token for HBase
2021-08-13 14:47:54,861 [flink-thread-#1] INFO Utils - HBase is not available (not packaged with this application): ClassNotFoundException : "org.apache.hadoop.hbase.HBaseConfiguration".
2021-08-13 14:47:54,901 [flink-thread-#1] INFO YarnClusterDescriptor - Submitting application master application_1628393898291_71530
2021-08-13 14:47:54,904 [flink-thread-#2] ERROR FlowDataBase - Exception running data flow for flink-thread-#2
org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.
	at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:431)
	at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70)
	at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:973)
	at org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:124)
	at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:72)
	...
Caused by: java.io.IOException: Filesystem closed
	at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:826)
	at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:2152)
	at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:2138)
	at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:919)
	at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:114)
	...
Caused by: java.lang.ExceptionInInitializerError
	at com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:182)
	at com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:175)
	at com.sun.jersey.core.spi.factory.MessageBodyFactory.init(MessageBodyFactory.java:162)
	at com.sun.jersey.api.client.Client.init(Client.java:342)
	at com.sun.jersey.api.client.Client.access$000(Client.java:118)
	at com.sun.jersey.api.client.Client$1.f(Client.java:191)
	at com.sun.jersey.api.client.Client$1.f(Client.java:187)
	at com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:193)
	at com.sun.jersey.api.client.Client.<init>(Client.java:187)
	at com.sun.jersey.api.client.Client.<init>(Client.java:170)
	at org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl.serviceInit(TimelineClientImpl.java:285)
```

Is the only solution here to move to application mode [2]? Doing so would imply a migration requirement (which may have its own set of problems.)

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-85+Flink+Application+Mode
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html#run-an-application-in-application-mode

Best,
Andreas

________________________________
Your
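Aside from switching to a session cluster, one generic way to keep concurrent per-job deployments from colliding inside a single client JVM is to funnel every submission through a single-threaded executor. The sketch below is a plain-Java illustration of that pattern under stated assumptions: deployAndSubmit is a hypothetical stand-in for the real graph-building and execute() call, not a Flink API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class SerializedSubmitter {

    // Hypothetical stand-in for the real per-job deployment + submission
    // (building the graph, calling execute()); NOT a Flink API.
    static String deployAndSubmit(String jobName) {
        return jobName + ":submitted";
    }

    // Funnel every submission through one thread so no two per-job
    // deployments run concurrently in the same JVM.
    public static List<String> submitAll(List<String> jobNames) throws Exception {
        ExecutorService submitter = Executors.newSingleThreadExecutor();
        try {
            List<Future<String>> pending = new ArrayList<>();
            for (String name : jobNames) {
                pending.add(submitter.submit(() -> deployAndSubmit(name)));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> f : pending) {
                results.add(f.get()); // blocks until that submission finishes
            }
            return results;
        } finally {
            submitter.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(submitAll(List.of("thread-1-job", "thread-2-job")));
    }
}
```

This serializes only the deployment step; once the jobs target a shared session cluster, as suggested above, the workaround becomes unnecessary.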
Personal Data: We may collect and process information about you that may be subject to data protection laws. For more information about how we use and disclose your personal data, how we protect your information, our legal basis to use your information, your rights and who you can contact, please refer to: www.gs.com/privacy-notices