Robert is right. We can only support single-job submission in application
mode when HA is enabled.
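
For context, the limitation applies whenever the cluster is configured for
high availability, e.g. with a ZooKeeper-based setup like this (illustrative
flink-conf.yaml snippet; the quorum addresses and storage path are
placeholders):

```yaml
# With HA enabled like this, Application Mode supports only a single
# execute()/executeAsync() call per application.
high-availability: zookeeper
high-availability.zookeeper.quorum: zk-host-1:2181,zk-host-2:2181
high-availability.storageDir: hdfs:///flink/ha/
```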

This is a known limitation of the current application mode implementation.

Best,
Yang

Robert Metzger <rmetz...@apache.org> wrote on Thu, Jun 24, 2021 at 3:54 AM:

> Thanks a lot for checking again. I just started Flink in Application mode
> with a jar that contains two "executeAsync" submissions, and indeed two
> jobs are running.
>
> I think the problem in your case is that you are using High Availability
> (I guess, because there are log statements from the
> ZooKeeperLeaderRetrievalService). As you can see from the documentation [1]:
>
> The Application Mode allows for multi-execute() applications but
>> High-Availability is not supported in these cases. High-Availability in
>> Application Mode is only supported for single-execute() applications.
>
>
> See also: https://issues.apache.org/jira/browse/FLINK-19358
>
> Sorry again that I gave you incorrect information in my first answer.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/overview/
>
>
>
>
> On Wed, Jun 23, 2021 at 8:50 AM Qihua Yang <yang...@gmail.com> wrote:
>
>> Hi Robert,
>>
>> But I saw the Flink doc says application mode can run multiple jobs. Or
>> did I misunderstand it?
>>
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/
>>
>>
>>
>> Compared to the Per-Job mode, the Application Mode allows the submission
>> of applications consisting of multiple jobs. The order of job execution
>> is not affected by the deployment mode but by the call used to launch the
>> job. Using execute(), which is blocking, establishes an order and it will
>> lead to the execution of the "next" job being postponed until "this" job
>> finishes. Using executeAsync(), which is non-blocking, will lead to the
>> "next" job starting before "this" job finishes.
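
The blocking vs. non-blocking ordering described in that passage can be
sketched with plain Java and no Flink dependency. This is only an analogy:
`run` and its event strings are stand-ins invented here, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Analogy only: models the ordering difference between execute()
// (blocking) and executeAsync() (non-blocking) inside one application.
public class SubmissionOrder {
    public static List<String> run(boolean blocking) {
        List<String> events = new ArrayList<>();
        // Each "job" just records when it finishes.
        Runnable job1 = () -> events.add("job1 finished");
        Runnable job2 = () -> events.add("job2 finished");

        events.add("job1 submitted");
        if (blocking) {
            // execute(): the "next" job is postponed until "this" one finishes.
            job1.run();
            events.add("job2 submitted");
            job2.run();
        } else {
            // executeAsync(): the "next" job is submitted before "this" one finishes.
            events.add("job2 submitted");
            job1.run();
            job2.run();
        }
        return events;
    }
}
```

With `blocking = true`, "job2 submitted" only appears after "job1 finished";
with `blocking = false`, both submissions precede either completion.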
>>
>>
>> On Tue, Jun 22, 2021 at 11:13 PM Robert Metzger <rmetz...@apache.org>
>> wrote:
>>
>>> Hi Qihua,
>>>
>>> Application Mode is meant for executing one job at a time, not multiple
>>> jobs on the same JobManager.
>>> If you want to do that, you need to use session mode, which allows
>>> managing multiple jobs on the same JobManager.
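>>>
>>> For a standalone cluster, session mode boils down to starting one
>>> long-running cluster and submitting each jar against it (illustrative
>>> commands; the jar names are placeholders):
>>>
>>> ```shell
>>> # Start a standalone session cluster (one JobManager, shared by all jobs).
>>> ./bin/start-cluster.sh
>>>
>>> # Submit several jobs to the same session cluster; -d detaches the client.
>>> ./bin/flink run -d job-one.jar
>>> ./bin/flink run -d job-two.jar
>>> ```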
>>>
>>> On Tue, Jun 22, 2021 at 10:43 PM Qihua Yang <yang...@gmail.com> wrote:
>>>
>>>> Hi Arvid,
>>>>
>>>> Do you know if I can start multiple jobs for a single Flink application?
>>>>
>>>> Thanks,
>>>> Qihua
>>>>
>>>>
>>>> On Thu, Jun 17, 2021 at 12:11 PM Qihua Yang <yang...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I am using application mode.
>>>>>
>>>>> Thanks,
>>>>> Qihua
>>>>>
>>>>> On Thu, Jun 17, 2021 at 12:09 PM Arvid Heise <ar...@apache.org> wrote:
>>>>>
>>>>>> Hi Qihua,
>>>>>>
>>>>>> Which execution mode are you using?
>>>>>>
>>>>>> On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang <yang...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Thank you for your reply. What I want is a Flink app with multiple
>>>>>>> jobs, each job managing one stream. Currently our Flink app has only
>>>>>>> one job that manages multiple streams.
>>>>>>> I did try env.executeAsync(), but it still doesn't work. From the
>>>>>>> log, when the second executeAsync() was called, it shows "Job
>>>>>>> 00000000000000000000000000000000 was recovered successfully."
>>>>>>> Looks like the second executeAsync() recovered the first job instead
>>>>>>> of starting a second one.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Qihua
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jun 17, 2021 at 7:43 AM Arvid Heise <ar...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> env.execute("Job 1"); is a blocking call. You either have to use
>>>>>>>> executeAsync or use a separate thread to submit the second job. If
>>>>>>>> Job 1 finishes, this would also work as sequential execution.
>>>>>>>>
>>>>>>>> However, I think what you actually want is to use the same env with
>>>>>>>> 2 topologies and a single execute, like this:
>>>>>>>>
>>>>>>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>> DataStream<String> stream1 = env.addSource(new SourceFunction<String>());
>>>>>>>> stream1.addSink(new FlinkKafkaProducer());
>>>>>>>> DataStream<String> stream2 = env.addSource(new SourceFunction<String>());
>>>>>>>> stream2.addSink(new DiscardingSink<>());
>>>>>>>> env.execute("Job 1+2");
>>>>>>>>
>>>>>>>> On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang <yang...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>> Does anyone know how to run multiple jobs in the same Flink
>>>>>>>>> application?
>>>>>>>>> I did a simple test. The first job started and I saw its log
>>>>>>>>> message. But the second job never started, even though I saw the
>>>>>>>>> log message for it.
>>>>>>>>>
>>>>>>>>> public void testJobs() throws Exception {
>>>>>>>>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>     DataStream<String> stream1 = env.addSource(new SourceFunction<String>());
>>>>>>>>>     stream1.addSink(new FlinkKafkaProducer());
>>>>>>>>>     printf("### first job");
>>>>>>>>>     env.execute("Job 1");
>>>>>>>>>
>>>>>>>>>     env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>>>>     DataStream<String> stream2 = env.addSource(new SourceFunction<String>());
>>>>>>>>>     stream2.addSink(new DiscardingSink<>());
>>>>>>>>>     printf("### second job");
>>>>>>>>>     env.execute("Job 2");
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> Here is the log:
>>>>>>>>> ### first job
>>>>>>>>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  - Job 00000000000000000000000000000000 is submitted.
>>>>>>>>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  - Submitting Job with JobId=00000000000000000000000000000000.
>>>>>>>>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Received JobGraph submission 00000000000000000000000000000000 (job1).
>>>>>>>>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Submitting job 00000000000000000000000000000000 (job1).
>>>>>>>>>
>>>>>>>>> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
>>>>>>>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting execution of job job1 (00000000000000000000000000000000) under job master id b03cde9dc2aebdb39c46cda4c2a94c07.
>>>>>>>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]
>>>>>>>>> INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job job1 (00000000000000000000000000000000) switched from state CREATED to RUNNING.
>>>>>>>>>
>>>>>>>>> ### second job
>>>>>>>>> WARN  com.doordash.flink.common.utils.LoggerUtil  - Class - IndexWriter : ### second job
>>>>>>>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Resolved ResourceManager address, beginning registration
>>>>>>>>> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - Starting ZooKeeperLeaderRetrievalService /leader/00000000000000000000000000000000/job_manager_lock.
>>>>>>>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Registering job manager b03cde9dc2aebdb39c46cda4c2a94c07@akka.tcp://flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job 00000000000000000000000000000000.
>>>>>>>>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  - Job 00000000000000000000000000000000 was recovered successfully.
>>>>>>>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Registered job manager b03cde9dc2aebdb39c46cda4c2a94c07@akka.tcp://flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job 00000000000000000000000000000000.
>>>>>>>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - JobManager successfully registered at ResourceManager, leader id: 956d4431ca90d45d92c027046cd0404e.
>>>>>>>>> INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  - Requesting new slot [SlotRequestId{8980dce0c0ef6a933d73051c58534489}] and profile ResourceProfile{UNKNOWN} from resource manager.
>>>>>>>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Request slot with profile ResourceProfile{UNKNOWN} for job 00000000000000000000000000000000 with allocation id 21134414fc60d4ef3e940609cef960b6.
>>>>>>>>> INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  - Requesting new slot [SlotRequestId{e37cc684758e49f3cce76c4687b1d1a3}] and profile ResourceProfile{UNKNOWN} from resource manager.
>>>>>>>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  - Request slot with profile ResourceProfile{UNKNOWN} for job 00000000000000000000000000000000 with allocation id 650bd9100a35ef5086fd4614f5253b55.
>>>>>>>>
