Hey Andreas,
thanks a lot for providing me with the full logs.

The JobManager actually received 2 job submissions.
There are 2 relevant log messages:
1. "Received JobGraph submission xxx"
2. "Submitting job"
Message 1 is logged right after the dispatcher receives the JobGraph, before
the duplicate-submission check is done. Message 2 is logged only once we know
that the job is not a duplicate.
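
To make that ordering concrete, here is a deliberately simplified model of the check. It is a sketch, not Flink's actual Dispatcher code, and the `DispatcherModel` class and its methods are made up for illustration. A duplicate produces message 1 but never message 2:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of the dispatcher's submission path (not Flink code).
class DispatcherModel {
    private final Set<String> submittedJobIds = new HashSet<>();
    private final List<String> log = new ArrayList<>();

    synchronized void submitJob(String jobId) {
        // Message 1: logged for every request, before the duplicate check.
        log.add("Received JobGraph submission " + jobId);
        if (!submittedJobIds.add(jobId)) {
            // Duplicate: rejected, and no "Submitting job" line is written.
            throw new IllegalStateException("Job has already been submitted.");
        }
        // Message 2: logged only when the job ID has not been seen before.
        log.add("Submitting job " + jobId);
    }

    synchronized List<String> logLines() {
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        DispatcherModel dispatcher = new DispatcherModel();
        String jobId = "8e1c2fdd68feee100d8fee003efef3e2";
        dispatcher.submitJob(jobId);
        try {
            dispatcher.submitJob(jobId);
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
        dispatcher.logLines().forEach(System.out::println);
    }
}
```

Running `main` prints the rejection followed by three log lines: Received, Submitting, Received. That is the same pattern as in your JobManager log.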

We have the following log messages (which you actually already posted here on
the list):

*TYPE 1:* 2021-01-20 14:06:58,225 INFO  [flink-akka.actor.default-dispatcher-91] org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Received JobGraph submission 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 20 14:01:42 EST 2021).
*TYPE 2:* 2021-01-20 14:06:58,225 INFO  [flink-akka.actor.default-dispatcher-91] org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Submitting job 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 20 14:01:42 EST 2021).
*TYPE 1:* 2021-01-20 14:08:45,199 INFO  [flink-akka.actor.default-dispatcher-30] org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Received JobGraph submission 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 20 14:01:42 EST 2021).

2021-01-20 14:09:19,981 INFO  [flink-akka.actor.default-dispatcher-90] org.apache.flink.runtime.jobmaster.JobMaster                  - Stopping the JobMaster for job Flink Java Job at Wed Jan 20 14:01:42 EST 2021(8e1c2fdd68feee100d8fee003efef3e2).
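
If you want to check the full JobManager log for other jobs with the same pattern, a small scanner like the following sketch should do. It assumes the default one-entry-per-line layout shown above (logger name, padding, `- `, message); the `DuplicateSubmissionScanner` name is hypothetical. A job with more "Received" than "Submitting" lines most likely had a submission rejected as a duplicate:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: counts dispatcher "Received" vs "Submitting" lines per job ID.
class DuplicateSubmissionScanner {
    // Assumes the log layout quoted in this thread: "...Dispatcher      - <message> <jobId>".
    private static final Pattern DISPATCHER_MSG = Pattern.compile(
            "Dispatcher\\s+-\\s+(Received JobGraph submission|Submitting job) ([0-9a-f]{32})");

    /** Returns {received, submitted} counts per job ID. */
    static Map<String, int[]> scan(List<String> lines) {
        Map<String, int[]> counts = new TreeMap<>();
        for (String line : lines) {
            Matcher m = DISPATCHER_MSG.matcher(line);
            if (!m.find()) {
                continue;
            }
            int[] c = counts.computeIfAbsent(m.group(2), k -> new int[2]);
            if (m.group(1).startsWith("Received")) {
                c[0]++;
            } else {
                c[1]++;
            }
        }
        return counts;
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: DuplicateSubmissionScanner <jobmanager.log>");
            return;
        }
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        // Report only jobs that were received more often than they were accepted.
        scan(lines).forEach((jobId, c) -> {
            if (c[0] > c[1]) {
                System.out.printf("%s: received %d, accepted %d%n", jobId, c[0], c[1]);
            }
        });
    }
}
```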


So at 14:06 you submit the job, and at 14:09 it fails.
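
For reference, the gaps between the relevant timestamps (including the client-side "Submitting job" entry at 14:01:55,775 from your first mail) can be computed straight from the log prefixes. A minimal sketch with `java.time`; the `SubmissionTimeline` class name is made up:

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

class SubmissionTimeline {
    // Matches the timestamp prefix of the log lines quoted in this thread.
    private static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss,SSS");

    static Duration between(String from, String to) {
        return Duration.between(LocalDateTime.parse(from, LOG_TS), LocalDateTime.parse(to, LOG_TS));
    }

    public static void main(String[] args) {
        String clientSubmit = "2021-01-20 14:01:55,775";   // client: RestClusterClient "Submitting job"
        String firstReceived = "2021-01-20 14:06:58,225";  // JobManager: first "Received JobGraph submission"
        String secondReceived = "2021-01-20 14:08:45,199"; // JobManager: second "Received JobGraph submission"
        String jobMasterStop = "2021-01-20 14:09:19,981";  // JobManager: "Stopping the JobMaster"

        System.out.println(between(clientSubmit, firstReceived).toMillis());   // 302450
        System.out.println(between(firstReceived, secondReceived).toMillis()); // 106974
        System.out.println(between(secondReceived, jobMasterStop).toMillis()); // 34782
    }
}
```

That is about 5 minutes from the client log entry to the first "Received", about 1 minute 47 seconds until the second "Received", and about 35 seconds more until the JobMaster is stopped.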

In between (at 14:08), you try to submit the job again, and that submission
gets (rightfully) rejected as a duplicate. It seems that the second submission
didn't get logged properly in your client.
I don't think there's a bug on Flink's side of things.
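
To illustrate how a single submission in your own code can still reach the dispatcher twice, here is a sketch under assumptions: somewhere between your thread pool and the dispatcher, a layer (for example a proxy or wrapper with retries, as asked earlier in this thread) resends a request when it gets no reply in time, even though the first request was already accepted. `Server` and `submitWithRetry` are hypothetical and not Flink APIs:

```java
import java.util.HashSet;
import java.util.Set;

class RetryDuplicateDemo {
    /** Hypothetical stand-in for the dispatcher: accepts each job ID once. */
    static final class Server {
        private final Set<String> acceptedJobIds = new HashSet<>();
        int received = 0;

        String submit(String jobId) {
            received++;
            if (!acceptedJobIds.add(jobId)) {
                throw new IllegalStateException("Job has already been submitted.");
            }
            return "accepted " + jobId;
        }
    }

    /**
     * Hypothetical retrying caller. If loseFirstResponse is true, the first request
     * reaches the server and is accepted, but its reply is treated as lost (e.g. a
     * timeout), so the same job ID is sent again.
     */
    static String submitWithRetry(Server server, String jobId, int maxAttempts, boolean loseFirstResponse) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                String response = server.submit(jobId);
                if (!(loseFirstResponse && attempt == 1)) {
                    return response;
                }
                // The server already acted on this request; only the reply went missing.
                lastFailure = new RuntimeException("attempt " + attempt + ": no response in time");
            } catch (RuntimeException e) {
                lastFailure = e;
            }
        }
        throw lastFailure;
    }

    public static void main(String[] args) {
        Server server = new Server();
        try {
            submitWithRetry(server, "8e1c2fdd68feee100d8fee003efef3e2", 2, true);
        } catch (RuntimeException e) {
            System.out.println("caller sees: " + e.getMessage());
        }
        System.out.println("server received " + server.received + " submissions");
    }
}
```

With `loseFirstResponse = true`, the caller only ever sees "Job has already been submitted." while the server counts two submissions, which matches the shape of your logs. If such a layer exists, logging each attempt there (not just the first) would make the second submission visible in your client log.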


On Thu, Jan 21, 2021 at 7:17 PM Hailu, Andreas [Engineering] <andreas.ha...@gs.com> wrote:

> Hi Robert,
>
> I sent you an email with instructions to create an account to view the
> logs through our secure repository. I’ve included the JobManager and client
> application logs there.
>
> We have a thread pool that we use to submit multiple jobs in parallel, but
> there’s no retry logic in it: if any one job fails, the entire application
> fails. Regarding the gap between when the client app logs show the job as
> initially submitted and when the JobManager logs it as received: we’re
> submitting a large number of jobs as part of this application. Is it
> possible that the JobManager is busy processing other jobs?
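
Regarding the thread pool: one cheap safeguard is to count and log every submission attempt per job ID inside the pool itself, so that a repeated ID cannot go unnoticed. A minimal sketch, assuming a hypothetical `submit` callback that stands in for whatever actually sends the job (the `ParallelSubmission.submitAll` name is made up too). Like your setup, it has no retries, and any failed job fails the whole run:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;

class ParallelSubmission {
    /** Submits all jobs on a fixed-size pool; returns how often each job ID was handed to submit. */
    static Map<String, Integer> submitAll(List<String> jobIds, Consumer<String> submit, int threads) {
        Map<String, Integer> attempts = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<?>> futures = new ArrayList<>();
            for (String jobId : jobIds) {
                futures.add(pool.submit(() -> {
                    // Count and log every attempt, so a repeated job ID shows up in the client log.
                    int attempt = attempts.merge(jobId, 1, Integer::sum);
                    System.out.printf("[%s] submitting %s (attempt %d)%n",
                            Thread.currentThread().getName(), jobId, attempt);
                    submit.accept(jobId);
                }));
            }
            for (Future<?> future : futures) {
                future.get(); // rethrows a task's failure, wrapped in ExecutionException
            }
        } catch (ExecutionException e) {
            // No retry: any failed job fails the whole application.
            throw new IllegalStateException("job submission failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for submissions", e);
        } finally {
            pool.shutdownNow(); // interrupts and abandons any submissions still outstanding
        }
        return attempts;
    }

    public static void main(String[] args) {
        Map<String, Integer> attempts =
                submitAll(Arrays.asList("job-a", "job-b", "job-c"), jobId -> { }, 2);
        System.out.println(attempts);
    }
}
```

Each job ID should map to 1 in the returned map; anything higher means the same job was handed to `submit` more than once.
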
>
> // ah
>
> *From:* Robert Metzger <rmetz...@apache.org>
> *Sent:* Thursday, January 21, 2021 10:00 AM
> *To:* Hailu, Andreas [Engineering] <andreas.ha...@ny.email.gs.com>
> *Cc:* user@flink.apache.org
> *Subject:* Re: org.apache.flink.runtime.client.JobSubmissionException: Job has already been submitted
>
> Thanks a lot for your message.
>
> Why is there a 5-minute difference between the timestamp of the job
> submission on the client and the timestamp at which the JobManager receives
> the submission?
>
> Is there any service or custom logic involved in the job submission (e.g.
> a proxy in between that has some retry mechanism, or custom code that does
> retries)?
>
> Could you provide the full JobManager logs of that timeframe, not just
> those messages filtered for 8e1c2fdd68feee100d8fee003efef3e2?
>
> On Wed, Jan 20, 2021 at 10:20 PM Hailu, Andreas [Engineering] <andreas.ha...@gs.com> wrote:
>
> Hello,
>
> We’re running Flink 1.9.2 on YARN and are seeing some interesting behavior
> when submitting jobs in a multi-threaded fashion to an application’s Flink
> cluster. The error reported in the client application logs is the
> following:
>
> org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: 8e1c2fdd68feee100d8fee003efef3e2)
>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
>     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
>     ...
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
>     at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:391)
>     at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>     at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>     at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:263)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>     at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
>     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>     ... 3 more
> Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
> org.apache.flink.runtime.client.JobSubmissionException: Job has already been submitted.
>
> Looking through the JobManager logs, I see this interesting sequence for
> JobID 8e1c2fdd68feee100d8fee003efef3e2:
>
> 2021-01-20 14:06:58,225 INFO  [flink-akka.actor.default-dispatcher-91] org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Received JobGraph submission 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 20 14:01:42 EST 2021).
> 2021-01-20 14:06:58,225 INFO  [flink-akka.actor.default-dispatcher-91] org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Submitting job 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 20 14:01:42 EST 2021).
> 2021-01-20 14:07:25,843 INFO  [flink-akka.actor.default-dispatcher-16] org.apache.flink.runtime.jobmaster.JobMaster                  - Initializing job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2).
> 2021-01-20 14:07:26,109 INFO  [flink-akka.actor.default-dispatcher-16] org.apache.flink.runtime.jobmaster.JobMaster                  - Using restart strategy NoRestartStrategy for Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2).
> 2021-01-20 14:07:28,705 INFO  [flink-akka.actor.default-dispatcher-16] org.apache.flink.runtime.jobmaster.JobMaster                  - Running initialization on master for job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2).
> 2021-01-20 14:07:29,821 INFO  [flink-akka.actor.default-dispatcher-16] org.apache.flink.runtime.jobmaster.JobManagerRunner           - JobManager runner for job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2) was granted leadership with session id 00000000-0000-0000-0000-000000000000 at akka.tcp://fl...@d43723-191.dc.gs.com:37966/user/jobmanager_124.
> 2021-01-20 14:07:29,821 INFO  [flink-akka.actor.default-dispatcher-64] org.apache.flink.runtime.jobmaster.JobMaster                  - Starting execution of job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2) under job master id 00000000000000000000000000000000.
> 2021-01-20 14:07:29,821 INFO  [flink-akka.actor.default-dispatcher-64] org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2) switched from state CREATED to RUNNING.
> 2021-01-20 14:07:29,822 INFO  [flink-akka.actor.default-dispatcher-2] org.apache.flink.yarn.YarnResourceManager                     - Registering job manager 00000000000000000000000000000...@akka.tcp://fl...@d43723-191.dc.gs.com:37966/user/jobmanager_124 for job 8e1c2fdd68feee100d8fee003efef3e2.
> 2021-01-20 14:07:29,822 INFO  [flink-akka.actor.default-dispatcher-2] org.apache.flink.yarn.YarnResourceManager                     - Registered job manager 00000000000000000000000000000...@akka.tcp://fl...@d43723-191.dc.gs.com:37966/user/jobmanager_124 for job 8e1c2fdd68feee100d8fee003efef3e2.
> 2021-01-20 14:07:29,822 INFO  [flink-akka.actor.default-dispatcher-64] org.apache.flink.yarn.YarnResourceManager                     - Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=-1, nativeMemoryInMB=-1, networkMemoryInMB=-1, managedMemoryInMB=-1} for job 8e1c2fdd68feee100d8fee003efef3e2 with allocation id 5bca3bde577f93169928e04749b45343.
> 2021-01-20 14:08:45,199 INFO  [flink-akka.actor.default-dispatcher-30] org.apache.flink.runtime.dispatcher.StandaloneDispatcher      - Received JobGraph submission 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 20 14:01:42 EST 2021).
> 2021-01-20 14:09:19,981 INFO  [flink-akka.actor.default-dispatcher-90] org.apache.flink.runtime.jobmaster.JobMaster                  - Stopping the JobMaster for job Flink Java Job at Wed Jan 20 14:01:42 EST 2021(8e1c2fdd68feee100d8fee003efef3e2).
> 2021-01-20 14:09:19,982 INFO  [flink-akka.actor.default-dispatcher-90] org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2) switched from state RUNNING to FAILING.
>
> It would appear that the cluster somehow received the submission request
> for job ID 8e1c2fdd68feee100d8fee003efef3e2 twice. The client log only
> shows a single submission for this job:
>
> 2021-01-20 14:01:55,775 [ProductHistory-18359] INFO  RestClusterClient - Submitting job 8e1c2fdd68feee100d8fee003efef3e2 (detached: false).
>
> So while the job is submitted only once, the dispatcher somehow tries to
> perform two submissions, resulting in a failure. How does this happen?
>
> ____________
>
> *Andreas Hailu*
>
> *Data Lake Engineering *| Goldman Sachs & Co.
>
> ------------------------------
>
> Your Personal Data: We may collect and process information about you that
> may be subject to data protection laws. For more information about how we
> use and disclose your personal data, how we protect your information, our
> legal basis to use your information, your rights and who you can contact,
> please refer to: www.gs.com/privacy-notices
>