Hi David, I was able to get this working using your suggestion:

1)    Deploy a Flink YARN Session Cluster, noting the host + port of the 
session’s Job Manager.

2)    Submit a Flink job using the session’s details, i.e submitting Flink job 
with ‘-m host:port’ option.

Thanks for clearing things up.

// ah

From: David Morávek <d...@apache.org>
Sent: Tuesday, August 17, 2021 4:37 AM
To: Hailu, Andreas [Engineering] <andreas.ha...@ny.email.gs.com>
Cc: Ravichandran, Soorya Prasanna [Engineering] 
<soorya.ravichand...@ny.email.gs.com>; user@flink.apache.org
Subject: Re: Upgrading from Flink on YARN 1.9 to 1.11

Hi Andreas,

the problem here is that the command you're using is starting a per-job cluster 
(which is obvious from the used deployment method 
"YarnClusterDescriptor.deployJobCluster"). Apparently the `-m yarn-cluster` 
flag is deprecated and no longer supported, I think this is something we should 
completely remove in the near future. Also this was always supposed to start 
your job in per-job mode, but unfortunately in older versions this was kind of 
simulated using session cluster, so I'd say it has just worked by an accident 
(a.k.a "undocumented bug / feature").

What you really want to do is to start a session cluster upfront and than use a 
`yarn-session` deployment target (where you need to provide yarn application id 
so Flink can search for the active JobManager). This is well documented in the 
yarn section of the 
docs<https://urldefense.proofpoint.com/v2/url?u=https-3A__ci.apache.org_projects_flink_flink-2Ddocs-2Dmaster_docs_deployment_resource-2Dproviders_yarn_-23session-2Dmode&d=DwMFaQ&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=hRr4SA7BtUvKoMBP6VDhfisy2OJ1ZAzai-pcCC6TFXM&m=iu5vv8EZhy9VwahC4h6axF6B3ID6YDDFOzJcKLO8-Tw&s=QDBi2Ei2xYUfeKmx2aBFVcrAAOvtM3_iMT6GKr0aG80&e=>
 [1].

Can you please try this approach a let me know if that helped?

[1] 
https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/yarn/#session-mode<https://urldefense.proofpoint.com/v2/url?u=https-3A__ci.apache.org_projects_flink_flink-2Ddocs-2Dmaster_docs_deployment_resource-2Dproviders_yarn_-23session-2Dmode&d=DwMFaQ&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=hRr4SA7BtUvKoMBP6VDhfisy2OJ1ZAzai-pcCC6TFXM&m=iu5vv8EZhy9VwahC4h6axF6B3ID6YDDFOzJcKLO8-Tw&s=QDBi2Ei2xYUfeKmx2aBFVcrAAOvtM3_iMT6GKr0aG80&e=>

Best,
D.

On Mon, Aug 16, 2021 at 8:52 PM Hailu, Andreas [Engineering] 
<andreas.ha...@gs.com<mailto:andreas.ha...@gs.com>> wrote:
Hi David,

You’re correct about classpathing problems – thanks for your help in spotting 
them. I was able to get past that exception by removing some conflicting 
packages in my shaded JAR, but I’m seeing something else that’s interesting. 
With the 2 threads trying to submit jobs, one of the threads is able submit and 
process data successfully, while the other thread fails.

Log snippet:
2021-08-16 13:43:12,893 [thread-1] INFO  YarnClusterDescriptor - Cluster 
specification: ClusterSpecification{masterMemoryMB=4096, 
taskManagerMemoryMB=18432, slotsPerTaskManager=2}
2021-08-16 13:43:12,893 [thread-2] INFO  YarnClusterDescriptor - Cluster 
specification: ClusterSpecification{masterMemoryMB=4096, 
taskManagerMemoryMB=18432, slotsPerTaskManager=2}
2021-08-16 13:43:12,897 [thread-2] WARN  PluginConfig - The plugins directory 
[plugins] does not exist.
2021-08-16 13:43:12,897 [thread-1] WARN  PluginConfig - The plugins directory 
[plugins] does not exist.
2021-08-16 13:43:13,104 [thread-2] WARN  PluginConfig - The plugins directory 
[plugins] does not exist.
2021-08-16 13:43:13,104 [thread-1] WARN  PluginConfig - The plugins directory 
[plugins] does not exist.
2021-08-16 13:43:20,475 [thread-1] INFO  YarnClusterDescriptor - Adding 
delegation token to the AM container.
2021-08-16 13:43:20,488 [thread-1] INFO  DFSClient - Created 
HDFS_DELEGATION_TOKEN token 56247060 for delp on ha-hdfs:d279536
2021-08-16 13:43:20,512 [thread-1] INFO  TokenCache - Got dt for 
hdfs://d279536; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:d279536, Ident: 
(HDFS_DELEGATION_TOKEN token 56247060 for delp)
2021-08-16 13:43:20,513 [thread-1] INFO  Utils - Attempting to obtain Kerberos 
security token for HBase
2021-08-16 13:43:20,513 [thread-1] INFO  Utils - HBase is not available (not 
packaged with this application): ClassNotFoundException : 
"org.apache.hadoop.hbase.HBaseConfiguration".
2021-08-16 13:43:20,564 [thread-2] WARN  YarnClusterDescriptor - Add job graph 
to local resource fail.
2021-08-16 13:43:20,570 [thread-1] INFO  YarnClusterDescriptor - Submitting 
application master application_1628992879699_11275
2021-08-16 13:43:20,570 [thread-2] ERROR FlowDataBase - Exception running data 
flow for thread-2
org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy 
Yarn job cluster.
            at 
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:431)
            at 
org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70)
            at 
org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:973)
            at 
org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:124)
            at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:72)
            at 
com.gs.ep.da.lake.refinerlib.flink.ExecutionEnvironmentWrapper.execute(ExecutionEnvironmentWrapper.java:49)
...
Caused by: java.io.IOException: Filesystem closed
            at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:826)
            at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:2152)
...
2021-08-16 13:43:20,979 [thread-1] INFO  TimelineClientImpl - Timeline service 
address: http://d279536-003.dc.gs.com:8188/ws/v1/timeline/
2021-08-16 13:43:21,377 [thread-1] INFO  YarnClientImpl - Submitted application 
application_1628992879699_11275
2021-08-16 13:43:21,377 [thread-1] INFO  YarnClusterDescriptor - Waiting for 
the cluster to be allocated
2021-08-16 13:43:21,379 [thread-1] INFO  YarnClusterDescriptor - Deploying 
cluster, current state ACCEPTED
2021-08-16 13:43:28,435 [thread-1] INFO  YarnClusterDescriptor - YARN 
application has been deployed successfully.
2021-08-16 13:43:28,436 [thread-1] INFO  YarnClusterDescriptor - Found Web 
Interface d279536-023.dc.gs.com:41019<http://d279536-023.dc.gs.com:41019> of 
application 'application_1628992879699_11275'.
2021-08-16 13:43:28,443 [thread-1] INFO  AbstractJobClusterExecutor - Job has 
been submitted with JobID e15bac7f9fb4b8b0d60f996c3a0880f3
Job has been submitted with JobID e15bac7f9fb4b8b0d60f996c3a0880f3
2021-08-16 13:43:38,629 [FlinkJobSubmitter.Poll] INFO  FlinkJobSubmitter$2 - 
job completed for thread-2 with parallelism 1
Program execution finished
Job with JobID e15bac7f9fb4b8b0d60f996c3a0880f3 has finished.

I’ve generated and sent you a signup link to our firm’s secure document-sharing 
app called Lockbox. In the repository, I’ve uploaded both our full client and 
YARN app logs (named half_failure-client_log and half_failure-yarn-log, 
respectively) in a directory named Flink support logs/Flink 1.11/1.11.2_POC. 
The logs are quite brief – would you be able to have a look at see if you can 
see if there’s something we’re doing that’s clearly wrong?

Something I did notice is that with the upgrade, our submissions are now using 
the introduction of this ContextEnvironment#executeAsync method. If it means 
anything, our client doesn’t require asynchronous job submission.
// ah

From: David Morávek <d...@apache.org<mailto:d...@apache.org>>
Sent: Monday, August 16, 2021 6:28 AM
To: Hailu, Andreas [Engineering] 
<andreas.ha...@ny.email.gs.com<mailto:andreas.ha...@ny.email.gs.com>>
Cc: user@flink.apache.org<mailto:user@flink.apache.org>
Subject: Re: Upgrading from Flink on YARN 1.9 to 1.11

Hi Andreas,

Per-job and session deployment modes should not be affected by this FLIP. 
Application mode is just a new deployment mode (where job driver runs embedded 
within JM), that co-exists with these two.

From information you've provided, I'd say your actual problem is this exception:

```
Caused by: java.lang.ExceptionInInitializerError
        at 
com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:182)
        at 
com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:175)
        at 
com.sun.jersey.core.spi.factory.MessageBodyFactory.init(MessageBodyFactory.java:162)
        at com.sun.jersey.api.client.Client.init(Client.java:342)
        at com.sun.jersey.api.client.Client.access$000(Client.java:118)
        at com.sun.jersey.api.client.Client$1.f(Client.java:191)
        at com.sun.jersey.api.client.Client$1.f(Client.java:187)
        at com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:193)
        at com.sun.jersey.api.client.Client.<init>(Client.java:187)
       at com.sun.jersey.api.client.Client.<init>(Client.java:170)
        at 
org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl.serviceInit(TimelineClientImpl.java:285)
```

I've seen this exception a few times with Hadoop already and it's usually a 
dependency / class-path problem. If you google for this you'll find many 
references.

Best,
D.


On Fri, Aug 13, 2021 at 9:40 PM Hailu, Andreas [Engineering] 
<andreas.ha...@gs.com<mailto:andreas.ha...@gs.com>> wrote:
Hello folks!

We’re looking to upgrade from 1.9 to 1.11. Our Flink applications run on YARN 
and each have their own clusters, with each application having multiple jobs 
submitted.

Our current submission command looks like this:
$ run -m yarn-cluster --class com.class.name.Here -p 2 -yqu queue-name -ynm 
app-name -yn 1 -ys 2 -yjm 4096 -ytm 12288 /path/to/artifact.jar 
-application-args-go-here

The behavior observed in versions <= 1.9 is the following:

1.     A Flink cluster gets deployed to YARN

2.     Our application code is run, building graphs and submitting jobs

When we rebuilt and submit using 1.11.2, we now observe the following:

1.     Our application code is run, building graph and submitting jobs

2.     A Flink cluster gets deployed to YARN once execute() is invoked

I presume that this is a result of FLIP-85 [1] ?

This change in behavior proves to be a problem for us as our application is 
multi-threaded, and each thread submits its own job to the Flink cluster. What 
we see is the first thread to peexecute() submits a job to YARN, and others 
fail with a ClusterDeploymentException.

2021-08-13 14:47:42,299 [flink-thread-#1] INFO  YarnClusterDescriptor - Cluster 
specification: ClusterSpecification{masterMemoryMB=4096, 
taskManagerMemoryMB=18432, slotsPerTaskManager=2}
2021-08-13 14:47:42,299 [flink-thread-#2] INFO  YarnClusterDescriptor - Cluster 
specification: ClusterSpecification{masterMemoryMB=4096, 
taskManagerMemoryMB=18432, slotsPerTaskManager=2}
2021-08-13 14:47:42,304 [flink-thread-#1] WARN  PluginConfig - The plugins 
directory [plugins] does not exist.
2021-08-13 14:47:42,304 [flink-thread-#2] WARN  PluginConfig - The plugins 
directory [plugins] does not exist.
Listening for transport dt_socket at address: 5005
2021-08-13 14:47:46,716 [flink-thread-#2] WARN  PluginConfig - The plugins 
directory [plugins] does not exist.
2021-08-13 14:47:46,716 [flink-thread-#1] WARN  PluginConfig - The plugins 
directory [plugins] does not exist.
2021-08-13 14:47:54,820 [flink-thread-#1] INFO  YarnClusterDescriptor - Adding 
delegation token to the AM container.
2021-08-13 14:47:54,837 [flink-thread-#1] INFO  DFSClient - Created 
HDFS_DELEGATION_TOKEN token 56208379 for delp on ha-hdfs:d279536
2021-08-13 14:47:54,860 [flink-thread-#1] INFO  TokenCache - Got dt for 
hdfs://d279536; Kind: HDFS_DELEGATION_TOKEN, Service: ha-hdfs:d279536, Ident: 
(HDFS_DELEGATION_TOKEN token 56208379 for user)
2021-08-13 14:47:54,860 [flink-thread-#1] INFO  Utils - Attempting to obtain 
Kerberos security token for HBase
2021-08-13 14:47:54,861 [flink-thread-#1] INFO  Utils - HBase is not available 
(not packaged with this application): ClassNotFoundException : 
"org.apache.hadoop.hbase.HBaseConfiguration".
2021-08-13 14:47:54,901 [flink-thread-#1] INFO  YarnClusterDescriptor - 
Submitting application master application_1628393898291_71530
2021-08-13 14:47:54,904 [flink-thread-#2] ERROR FlowDataBase - Exception 
running data flow for flink-thread-#2
org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy 
Yarn job cluster.
        at 
org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:431)
        at 
org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70)
        at 
org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:973)
        at 
org.apache.flink.client.program.ContextEnvironment.executeAsync(ContextEnvironment.java:124)
        at 
org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:72)
        ...
Caused by: java.io.IOException: Filesystem closed
        at org.apache.hadoop.hdfs.DFSClient.checkOpen(DFSClient.java:826)
        at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:2152)
        at org.apache.hadoop.hdfs.DFSClient.listPaths(DFSClient.java:2138)
        at 
org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:919)
        at 
org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:114)
        ...
Caused by: java.lang.ExceptionInInitializerError
        at 
com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:182)
        at 
com.sun.jersey.core.spi.factory.MessageBodyFactory.initReaders(MessageBodyFactory.java:175)
        at 
com.sun.jersey.core.spi.factory.MessageBodyFactory.init(MessageBodyFactory.java:162)
        at com.sun.jersey.api.client.Client.init(Client.java:342)
        at com.sun.jersey.api.client.Client.access$000(Client.java:118)
        at com.sun.jersey.api.client.Client$1.f(Client.java:191)
        at com.sun.jersey.api.client.Client$1.f(Client.java:187)
        at com.sun.jersey.spi.inject.Errors.processWithErrors(Errors.java:193)
        at com.sun.jersey.api.client.Client.<init>(Client.java:187)
       at com.sun.jersey.api.client.Client.<init>(Client.java:170)
        at 
org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl.serviceInit(TimelineClientImpl.java:285)

Is the only solution here to move to application mode [2]? Doing so would imply 
a migration requirement (which may have its own set of problems.)

[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-85+Flink+Application+Mode<https://urldefense.proofpoint.com/v2/url?u=https-3A__cwiki.apache.org_confluence_display_FLINK_FLIP-2D85-2BFlink-2BApplication-2BMode&d=DwMFaQ&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=hRr4SA7BtUvKoMBP6VDhfisy2OJ1ZAzai-pcCC6TFXM&m=mnYz4bDVCgx2XkuVpk5Yd9zUWgu5ZpT__4F7X4Bw3fk&s=jNWOsyLnWqYqe1rrtYoAAvkFMqIMdw2hdO1oeAj58DM&e=>
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/yarn_setup.html#run-an-application-in-application-mode<https://urldefense.proofpoint.com/v2/url?u=https-3A__ci.apache.org_projects_flink_flink-2Ddocs-2Drelease-2D1.11_ops_deployment_yarn-5Fsetup.html-23run-2Dan-2Dapplication-2Din-2Dapplication-2Dmode&d=DwMFaQ&c=7563p3e2zaQw0AB1wrFVgyagb2IE5rTZOYPxLxfZlX4&r=hRr4SA7BtUvKoMBP6VDhfisy2OJ1ZAzai-pcCC6TFXM&m=mnYz4bDVCgx2XkuVpk5Yd9zUWgu5ZpT__4F7X4Bw3fk&s=TYm-DbNnyhKJ8xvjIZ1rhYJ8LjO86DYVa653ZlIuA2M&e=>

Best,
Andreas

________________________________

Your Personal Data: We may collect and process information about you that may 
be subject to data protection laws. For more information about how we use and 
disclose your personal data, how we protect your information, our legal basis 
to use your information, your rights and who you can contact, please refer to: 
www.gs.com/privacy-notices<http://www.gs.com/privacy-notices>

________________________________

Your Personal Data: We may collect and process information about you that may 
be subject to data protection laws. For more information about how we use and 
disclose your personal data, how we protect your information, our legal basis 
to use your information, your rights and who you can contact, please refer to: 
www.gs.com/privacy-notices<http://www.gs.com/privacy-notices>

________________________________

Your Personal Data: We may collect and process information about you that may 
be subject to data protection laws. For more information about how we use and 
disclose your personal data, how we protect your information, our legal basis 
to use your information, your rights and who you can contact, please refer to: 
www.gs.com/privacy-notices<http://www.gs.com/privacy-notices>

Reply via email to