Hey Andreas,

thanks a lot for providing me with the full logs. The JobManager actually received 2 job submissions.
There are 2 relevant log messages:
1. "Received JobGraph submission xxx"
2. "Submitting job"

Message 1 is logged right after the dispatcher receives the JobGraph, before the duplicate-submission check is done. Message 2 is logged only once we know that the job is not a duplicate.
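To illustrate that ordering, here is a simplified Java sketch. It is a hypothetical model (the class `SketchDispatcher` and its methods are made up for illustration) and not Flink's actual Dispatcher code. It only assumes what the two log messages imply: message 1 is written for every incoming request, and message 2 only after the duplicate check passes, so a repeated submission leaves a "Received" line without a matching "Submitting" line.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of a dispatcher's submission path.
// This is NOT Flink's Dispatcher implementation; names are made up for illustration.
class SketchDispatcher {

    // IDs of jobs that already passed the duplicate check. A plain HashSet is
    // used on the assumption that all requests are handled on a single thread.
    private final Set<String> acceptedJobIds = new HashSet<>();

    // Stand-in for the JobManager log so the message sequence can be inspected.
    private final List<String> log = new ArrayList<>();

    // Accepts the job, or throws if the same job ID was already accepted.
    void submitJob(String jobId, String jobName) {
        // Message 1: written for every incoming request, before any check.
        log.add("Received JobGraph submission " + jobId + " (" + jobName + ").");

        // Duplicate check: Set.add returns false if the ID is already present.
        if (!acceptedJobIds.add(jobId)) {
            // In this sketch the rejection goes back to the caller, not to the log.
            throw new IllegalStateException("Job has already been submitted.");
        }

        // Message 2: written only once the job is known not to be a duplicate.
        log.add("Submitting job " + jobId + " (" + jobName + ").");
    }

    List<String> log() {
        return Collections.unmodifiableList(log);
    }

    public static void main(String[] args) {
        SketchDispatcher dispatcher = new SketchDispatcher();
        String id = "8e1c2fdd68feee100d8fee003efef3e2";

        dispatcher.submitJob(id, "Flink Java Job"); // first request (14:06:58 in the log)
        try {
            dispatcher.submitJob(id, "Flink Java Job"); // second request (14:08:45 in the log)
        } catch (IllegalStateException e) {
            System.out.println("Client sees: " + e.getMessage());
        }
        dispatcher.log().forEach(System.out::println);
    }
}
```

Running `main` prints the client-side error, followed by two "Received JobGraph submission" lines and a single "Submitting job" line: the same pattern as the 14:06:58 and 14:08:45 dispatcher entries in the JobManager log.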
We have the following log messages (which you actually already posted here on the list):

*TYPE 1:* 2021-01-20 14:06:58,225 INFO [flink-akka.actor.default-dispatcher-91] org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Received JobGraph submission 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 20 14:01:42 EST 2021).
*TYPE 2:* 2021-01-20 14:06:58,225 INFO [flink-akka.actor.default-dispatcher-91] org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Submitting job 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 20 14:01:42 EST 2021).
*TYPE 1:* 2021-01-20 14:08:45,199 INFO [flink-akka.actor.default-dispatcher-30] org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Received JobGraph submission 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 20 14:01:42 EST 2021).
2021-01-20 14:09:19,981 INFO [flink-akka.actor.default-dispatcher-90] org.apache.flink.runtime.jobmaster.JobMaster - Stopping the JobMaster for job Flink Java Job at Wed Jan 20 14:01:42 EST 2021(8e1c2fdd68feee100d8fee003efef3e2).

So at 14:06 you submit the job, and at 14:09 it fails. In between (14:08), you try to submit the same job again, and that second submission gets (rightfully) rejected as a duplicate. It seems that your client didn't log the second submission properly.
I don't think there's a bug on Flink's side of things.

On Thu, Jan 21, 2021 at 7:17 PM Hailu, Andreas [Engineering] <andreas.ha...@gs.com> wrote:
> Hi Robert,
>
> I sent you an email with instructions to create an account to view the logs through our secure repository. I've included the JobManager and client application logs there.
>
> We have a thread pool that we use to submit multiple jobs in parallel, but there's no retry logic in it: if any one job fails, the entire application fails.
> With regard to the time span between when the client app logs show the job as initially submitted and when the JobManager logs it as received: we're submitting a large number of jobs as part of this application. Is it possible that the JobManager is busy processing other jobs?
>
> // ah
>
> *From:* Robert Metzger <rmetz...@apache.org>
> *Sent:* Thursday, January 21, 2021 10:00 AM
> *To:* Hailu, Andreas [Engineering] <andreas.ha...@ny.email.gs.com>
> *Cc:* user@flink.apache.org
> *Subject:* Re: org.apache.flink.runtime.client.JobSubmissionException: Job has already been submitted
>
> Thanks a lot for your message.
>
> Why is there a difference of 5 minutes between the timestamp of the job submission on the client and the timestamp on the JobManager where the submission is received?
>
> Is there any service / custom logic involved in the job submission? (e.g. a proxy in between that has some retry mechanism, or some custom code that does retries?)
>
> Could you provide the full JobManager logs of that timeframe, not just those messages filtered for 8e1c2fdd68feee100d8fee003efef3e2?
>
> On Wed, Jan 20, 2021 at 10:20 PM Hailu, Andreas [Engineering] <andreas.ha...@gs.com> wrote:
>
> Hello,
>
> We're running 1.9.2 on YARN, and are seeing some interesting behavior when submitting jobs in a multi-threaded fashion to an application's Flink cluster. The error we see reported in the client application logs is the following:
>
> org.apache.flink.client.program.ProgramInvocationException: Could not retrieve the execution result. (JobID: 8e1c2fdd68feee100d8fee003efef3e2)
>     at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:255)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
>     at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:326)
>     at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:62)
>     ...
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Failed to submit JobGraph.
>     at org.apache.flink.client.program.rest.RestClusterClient.lambda$submitJob$8(RestClusterClient.java:391)
>     at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>     at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>     at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$8(FutureUtils.java:263)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>     at java.util.concurrent.CompletableFuture.postFire(CompletableFuture.java:561)
>     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:929)
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:442)
>     ... 3 more
> Caused by: org.apache.flink.runtime.rest.util.RestClientException: [Internal server error., <Exception on server side:
> org.apache.flink.runtime.client.JobSubmissionException: Job has already been submitted.
>
> Looking through the JobManager logs, I see this interesting sequence for JobID 8e1c2fdd68feee100d8fee003efef3e2:
>
> 2021-01-20 14:06:58,225 INFO [flink-akka.actor.default-dispatcher-91] org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Received JobGraph submission 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 20 14:01:42 EST 2021).
>
> 2021-01-20 14:06:58,225 INFO [flink-akka.actor.default-dispatcher-91] org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Submitting job 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 20 14:01:42 EST 2021).
>
> 2021-01-20 14:07:25,843 INFO [flink-akka.actor.default-dispatcher-16] org.apache.flink.runtime.jobmaster.JobMaster - Initializing job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2).
>
> 2021-01-20 14:07:26,109 INFO [flink-akka.actor.default-dispatcher-16] org.apache.flink.runtime.jobmaster.JobMaster - Using restart strategy NoRestartStrategy for Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2).
>
> 2021-01-20 14:07:28,705 INFO [flink-akka.actor.default-dispatcher-16] org.apache.flink.runtime.jobmaster.JobMaster - Running initialization on master for job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2).
>
> 2021-01-20 14:07:29,821 INFO [flink-akka.actor.default-dispatcher-16] org.apache.flink.runtime.jobmaster.JobManagerRunner - JobManager runner for job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2) was granted leadership with session id 00000000-0000-0000-0000-000000000000 at akka.tcp://fl...@d43723-191.dc.gs.com:37966/user/jobmanager_124.
>
> 2021-01-20 14:07:29,821 INFO [flink-akka.actor.default-dispatcher-64] org.apache.flink.runtime.jobmaster.JobMaster - Starting execution of job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2) under job master id 00000000000000000000000000000000.
>
> 2021-01-20 14:07:29,821 INFO [flink-akka.actor.default-dispatcher-64] org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2) switched from state CREATED to RUNNING.
>
> 2021-01-20 14:07:29,822 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.yarn.YarnResourceManager - Registering job manager 00000000000000000000000000000...@akka.tcp://fl...@d43723-191.dc.gs.com:37966/user/jobmanager_124 for job 8e1c2fdd68feee100d8fee003efef3e2.
>
> 2021-01-20 14:07:29,822 INFO [flink-akka.actor.default-dispatcher-2] org.apache.flink.yarn.YarnResourceManager - Registered job manager 00000000000000000000000000000...@akka.tcp://fl...@d43723-191.dc.gs.com:37966/user/jobmanager_124 for job 8e1c2fdd68feee100d8fee003efef3e2.
>
> 2021-01-20 14:07:29,822 INFO [flink-akka.actor.default-dispatcher-64] org.apache.flink.yarn.YarnResourceManager - Request slot with profile ResourceProfile{cpuCores=-1.0, heapMemoryInMB=-1, directMemoryInMB=-1, nativeMemoryInMB=-1, networkMemoryInMB=-1, managedMemoryInMB=-1} for job 8e1c2fdd68feee100d8fee003efef3e2 with allocation id 5bca3bde577f93169928e04749b45343.
>
> 2021-01-20 14:08:45,199 INFO [flink-akka.actor.default-dispatcher-30] org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Received JobGraph submission 8e1c2fdd68feee100d8fee003efef3e2 (Flink Java Job at Wed Jan 20 14:01:42 EST 2021).
>
> 2021-01-20 14:09:19,981 INFO [flink-akka.actor.default-dispatcher-90] org.apache.flink.runtime.jobmaster.JobMaster - Stopping the JobMaster for job Flink Java Job at Wed Jan 20 14:01:42 EST 2021(8e1c2fdd68feee100d8fee003efef3e2).
>
> 2021-01-20 14:09:19,982 INFO [flink-akka.actor.default-dispatcher-90] org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Java Job at Wed Jan 20 14:01:42 EST 2021 (8e1c2fdd68feee100d8fee003efef3e2) switched from state RUNNING to FAILING.
>
> It would appear that, for job ID 8e1c2fdd68feee100d8fee003efef3e2, the cluster somehow received the submission request twice? The client log only shows a single submission for this job:
>
> 2021-01-20 14:01:55,775 [ProductHistory-18359] INFO RestClusterClient - Submitting job 8e1c2fdd68feee100d8fee003efef3e2 (detached: false).
>
> So while the job is submitted a single time, the dispatcher somehow tries to perform two submissions, resulting in a failure. How does this happen?
>
> ____________
>
> *Andreas Hailu*
> *Data Lake Engineering* | Goldman Sachs & Co.
>
> ------------------------------
>
> Your Personal Data: We may collect and process information about you that may be subject to data protection laws. For more information about how we use and disclose your personal data, how we protect your information, our legal basis to use your information, your rights and who you can contact, please refer to: www.gs.com/privacy-notices