Robert is right. We can only support single-job submission in application mode when HA mode is enabled.
This is a known limitation of the current application mode implementation.

Best,
Yang

On Thu, Jun 24, 2021 at 3:54 AM Robert Metzger <rmetz...@apache.org> wrote:

> Thanks a lot for checking again. I just started Flink in Application mode
> with a jar that contains two "executeAsync" submissions, and indeed two
> jobs are running.
>
> I think the problem in your case is that you are using High Availability
> (I guess, because there are log statements from the
> ZooKeeperLeaderRetrievalService). As you can see from the documentation [1]:
>
> The Application Mode allows for multi-execute() applications but
>> High-Availability is not supported in these cases. High-Availability in
>> Application Mode is only supported for single-execute() applications.
>
>
> See also: https://issues.apache.org/jira/browse/FLINK-19358
>
> Sorry again that I gave you invalid information in my first answer.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/overview/
>
>
> On Wed, Jun 23, 2021 at 8:50 AM Qihua Yang <yang...@gmail.com> wrote:
>
>> Hi Robert,
>>
>> But the Flink docs say that application mode can run multiple jobs? Or
>> did I misunderstand?
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/
>>
>>
>> *Compared to the Per-Job mode, the Application Mode allows the submission of
>> applications consisting of multiple jobs. The order of job execution is not
>> affected by the deployment mode but by the call used to launch the job.
>> Using execute(), which is blocking, establishes an order and it will lead to
>> the execution of the "next" job being postponed until "this" job finishes.
>> Using executeAsync(), which is non-blocking, will lead to the "next" job
>> starting before "this" job finishes.*
>>
>>
>> On Tue, Jun 22, 2021 at 11:13 PM Robert Metzger <rmetz...@apache.org>
>> wrote:
>>
>>> Hi Qihua,
>>>
>>> Application Mode is meant for executing one job at a time, not multiple
>>> jobs on the same JobManager.
>>> If you want to do that, you need to use session mode, which allows
>>> managing multiple jobs on the same JobManager.
>>>
>>> On Tue, Jun 22, 2021 at 10:43 PM Qihua Yang <yang...@gmail.com> wrote:
>>>
>>>> Hi Arvid,
>>>>
>>>> Do you know if I can start multiple jobs for a single Flink application?
>>>>
>>>> Thanks,
>>>> Qihua
>>>>
>>>>
>>>> On Thu, Jun 17, 2021 at 12:11 PM Qihua Yang <yang...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am using application mode.
>>>>>
>>>>> Thanks,
>>>>> Qihua
>>>>>
>>>>> On Thu, Jun 17, 2021 at 12:09 PM Arvid Heise <ar...@apache.org> wrote:
>>>>>
>>>>>> Hi Qihua,
>>>>>>
>>>>>> Which execution mode are you using?
>>>>>>
>>>>>> On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang <yang...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Thank you for your reply. What I want is for the Flink app to have
>>>>>>> multiple jobs, each managing one stream. Currently our Flink app has
>>>>>>> only one job that manages multiple streams.
>>>>>>> I did try env.executeAsync(), but it still doesn't work. From the
>>>>>>> log, when the second executeAsync() was called, it shows "*Job
>>>>>>> 00000000000000000000000000000000 was recovered successfully.*"
>>>>>>> It looks like the second executeAsync() recovers the first job
>>>>>>> instead of starting a second job.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Qihua
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jun 17, 2021 at 7:43 AM Arvid Heise <ar...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> env.execute("Job 1"); is a blocking call. You either have to use
>>>>>>>> executeAsync or use a separate thread to submit the second job. If
>>>>>>>> Job 1 finishes, this would also work by having sequential execution.
>>>>>>>>
>>>>>>>> However, I think what you actually want to do is to use the same
>>>>>>>> env with 2 topologies and 1 single execute like this.
>>>>>>>>
>>>>>>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>> DataStream<String> stream1 = env.addSource(new SourceFunction<String>());
>>>>>>>> stream1.addSink(new FlinkKafkaProducer());
>>>>>>>> DataStream<String> stream2 = env.addSource(new SourceFunction<String>());
>>>>>>>> stream2.addSink(new DiscardingSink<>());
>>>>>>>> env.execute("Job 1+2");
>>>>>>>>
>>>>>>>> On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang <yang...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> Does anyone know how to run multiple jobs in the same Flink
>>>>>>>>> application?
>>>>>>>>> I did a simple test. The first job was started. I did see the log
>>>>>>>>> message, but I didn't see the second job start, even though I saw
>>>>>>>>> its log message.
>>>>>>>>>
>>>>>>>>> public void testJobs() throws Exception {
>>>>>>>>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>     DataStream<String> stream1 = env.addSource(new SourceFunction<String>());
>>>>>>>>>     stream1.addSink(new FlinkKafkaProducer());
>>>>>>>>>     printf("### first job");
>>>>>>>>>     env.execute("Job 1");
>>>>>>>>>
>>>>>>>>>     env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>     DataStream<String> stream2 = env.addSource(new SourceFunction<String>());
>>>>>>>>>     stream2.addSink(new DiscardingSink<>());
>>>>>>>>>     printf("### second job");
>>>>>>>>>     env.execute("Job 2");
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> Here is the log:
>>>>>>>>> ### first job
>>>>>>>>> INFO o.a.f.c.deployment.application.executors.EmbeddedExecutor - Job 00000000000000000000000000000000 is submitted.
>>>>>>>>> INFO o.a.f.c.deployment.application.executors.EmbeddedExecutor - Submitting Job with JobId=00000000000000000000000000000000.
>>>>>>>>> INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Received JobGraph submission 00000000000000000000000000000000 (job1).
>>>>>>>>> INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Submitting job 00000000000000000000000000000000 (job1).
>>>>>>>>>
>>>>>>>>> INFO o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
>>>>>>>>> INFO org.apache.flink.runtime.jobmaster.JobMaster - Starting execution of job job1 (00000000000000000000000000000000) under job master id b03cde9dc2aebdb39c46cda4c2a94c07.
>>>>>>>>> INFO org.apache.flink.runtime.jobmaster.JobMaster - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]
>>>>>>>>> INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job job1 (00000000000000000000000000000000) switched from state CREATED to RUNNING.
>>>>>>>>>
>>>>>>>>> ### second job
>>>>>>>>> WARN com.doordash.flink.common.utils.LoggerUtil - Class - IndexWriter : ### second job
>>>>>>>>> INFO org.apache.flink.runtime.jobmaster.JobMaster - Resolved ResourceManager address, beginning registration
>>>>>>>>> INFO o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService /leader/00000000000000000000000000000000/job_manager_lock.
>>>>>>>>> INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - Registering job manager b03cde9dc2aebdb39c46cda4c2a94c07 @akka.tcp://flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job 00000000000000000000000000000000.
>>>>>>>>> INFO o.a.f.c.deployment.application.executors.EmbeddedExecutor - Job 00000000000000000000000000000000 was recovered successfully.
>>>>>>>>> INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - Registered job manager b03cde9dc2aebdb39c46cda4c2a94c07 @akka.tcp://flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job 00000000000000000000000000000000.
>>>>>>>>> INFO org.apache.flink.runtime.jobmaster.JobMaster - JobManager successfully registered at ResourceManager, leader id: 956d4431ca90d45d92c027046cd0404e.
>>>>>>>>> INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Requesting new slot [SlotRequestId{8980dce0c0ef6a933d73051c58534489}] and profile ResourceProfile{UNKNOWN} from resource manager.
>>>>>>>>> INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - Request slot with profile ResourceProfile{UNKNOWN} for job 00000000000000000000000000000000 with allocation id 21134414fc60d4ef3e940609cef960b6.
>>>>>>>>> INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Requesting new slot [SlotRequestId{e37cc684758e49f3cce76c4687b1d1a3}] and profile ResourceProfile{UNKNOWN} from resource manager.
>>>>>>>>> INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - Request slot with profile ResourceProfile{UNKNOWN} for job 00000000000000000000000000000000 with allocation id 650bd9100a35ef5086fd4614f5253b55.
>>>>>>>>>
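
A note on the blocking vs. non-blocking submission discussed in this thread: the ordering effect can be illustrated with a toy model in plain Java. This is only a sketch under stated assumptions. SubmissionOrderSketch, submitBlocking, and submitAsync are hypothetical names that imitate the calling behaviour of execute() and executeAsync(); none of this is Flink code or Flink's implementation, and it does not model the HA restriction described at the top of the thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

// Toy model only: these helpers imitate blocking vs. non-blocking submission.
// They are hypothetical and are NOT Flink classes.
class SubmissionOrderSketch {

    // Non-blocking submit: run the "job" on another thread and return
    // as soon as it has started. The job runs until `stop` is released.
    static CompletableFuture<Void> submitAsync(String name, CountDownLatch stop, List<String> log) {
        CountDownLatch started = new CountDownLatch(1);
        CompletableFuture<Void> done = CompletableFuture.runAsync(() -> {
            log.add(name + " started");
            started.countDown();
            awaitQuietly(stop);
            log.add(name + " finished");
        });
        awaitQuietly(started);
        return done;
    }

    // Blocking submit: same job, but the caller waits until it has finished.
    static void submitBlocking(String name, CountDownLatch stop, List<String> log) {
        submitAsync(name, stop, log).join();
    }

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Two blocking submits in a row: the second starts only after the first ends.
    static List<String> runBlocking() {
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch alreadyReleased = new CountDownLatch(0); // bounded jobs that end at once
        submitBlocking("job1", alreadyReleased, log);
        submitBlocking("job2", alreadyReleased, log);
        return log;
    }

    // Two non-blocking submits: both are started before either finishes.
    static List<String> runAsync() {
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch stop = new CountDownLatch(1);
        CompletableFuture<Void> job1 = submitAsync("job1", stop, log);
        CompletableFuture<Void> job2 = submitAsync("job2", stop, log);
        List<String> whileRunning = new ArrayList<>(log); // snapshot while both still run
        stop.countDown();                                  // let both jobs finish
        CompletableFuture.allOf(job1, job2).join();
        return whileRunning;
    }

    public static void main(String[] args) {
        System.out.println("blocking: " + runBlocking());
        System.out.println("async:    " + runAsync());
    }
}
```

In the blocking run the second toy job starts only after the first has finished, so a first job that never finishes (such as an unbounded streaming job) would keep the second submit from ever being reached. In the non-blocking run both toy jobs are started before either one finishes.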