Hi,

I am using application mode.

Thanks,
Qihua

On Thu, Jun 17, 2021 at 12:09 PM Arvid Heise <ar...@apache.org> wrote:

> Hi Qihua,
>
> Which execution mode are you using?
>
> On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang <yang...@gmail.com> wrote:
>
>> Hi,
>>
>> Thank you for your reply. What I want is a Flink app with multiple jobs,
>> each managing one stream. Currently our Flink app has only one job, which
>> manages multiple streams.
>> I did try env.executeAsync(), but it still doesn't work. From the log,
>> when the second executeAsync() was called, it shows " *Job
>> 00000000000000000000000000000000 was recovered successfully.*"
>> It looks like the second executeAsync() recovers the first job instead of
>> starting a second job.
>>
>> Thanks,
>> Qihua
>>
>>
>> On Thu, Jun 17, 2021 at 7:43 AM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> env.execute("Job 1"); is a blocking call. You either have to use
>>> executeAsync or use a separate thread to submit the second job. If Job 1
>>> finishes, the second execute would also run, which gives you sequential
>>> execution.
>>>
>>> However, I think what you actually want is to use the same env with two
>>> topologies and a single execute, like this:
>>>
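>>> StreamExecutionEnvironment env =
>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>> // First topology (the source and sink constructors are placeholders)
>>> DataStream<String> stream1 = env.addSource(new SourceFunction<String>());
>>> stream1.addSink(new FlinkKafkaProducer());
>>> // Second topology, built on the same environment
>>> DataStream<String> stream2 = env.addSource(new SourceFunction<String>());
>>> stream2.addSink(new DiscardingSink<>());
>>> // A single execute() submits both topologies as one job
>>> env.execute("Job 1+2");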
>>>
>>> On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang <yang...@gmail.com> wrote:
>>>
>>>> Hi,
>>>> Does anyone know how to run multiple jobs in the same Flink application?
>>>> I did a simple test. The first job started, and I saw its log message.
>>>> The second job did not appear to start, even though its log message
>>>> ("### second job") was printed.
>>>>
>>>> public void testJobs() throws Exception {
>>>>     StreamExecutionEnvironment env =
>>>>         StreamExecutionEnvironment.getExecutionEnvironment();
>>>>     DataStream<String> stream1 = env.addSource(new SourceFunction<String>());
>>>>     stream1.addSink(new FlinkKafkaProducer());
>>>>     printf("### first job");
>>>>     env.execute("Job 1");
>>>>
>>>>     env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>     DataStream<String> stream2 = env.addSource(new SourceFunction<String>());
>>>>     stream2.addSink(new DiscardingSink<>());
>>>>     printf("### second job");
>>>>     env.execute("Job 2");
>>>> }
>>>>
>>>> Here is the log:
>>>> ### first job
>>>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  - Job
>>>> 00000000000000000000000000000000 is submitted.
>>>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  -
>>>> Submitting Job with JobId=00000000000000000000000000000000.
>>>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>>>> Received JobGraph submission 00000000000000000000000000000000 (job1).
>>>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>>>> Submitting job 00000000000000000000000000000000 (job1).
>>>>
>>>> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>>>> Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
>>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting
>>>> execution of job job1 (00000000000000000000000000000000) under job master
>>>> id b03cde9dc2aebdb39c46cda4c2a94c07.
>>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting
>>>> scheduling with scheduling strategy
>>>> [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]
>>>> INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job
>>>> job1 (00000000000000000000000000000000) switched from state CREATED to
>>>> RUNNING.
>>>>
>>>> ### second job
>>>> WARN  com.doordash.flink.common.utils.LoggerUtil  - Class - IndexWriter
>>>> : ### second job
>>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Resolved
>>>> ResourceManager address, beginning registration
>>>> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>>>> Starting ZooKeeperLeaderRetrievalService
>>>> /leader/00000000000000000000000000000000/job_manager_lock.
>>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>> Registering job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp://
>>>> flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job
>>>> 00000000000000000000000000000000.
>>>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  - Job
>>>> 00000000000000000000000000000000 was recovered successfully.
>>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>> Registered job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp://
>>>> flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job
>>>> 00000000000000000000000000000000.
>>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - JobManager
>>>> successfully registered at ResourceManager, leader id:
>>>> 956d4431ca90d45d92c027046cd0404e.
>>>> INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  -
>>>> Requesting new slot [SlotRequestId{8980dce0c0ef6a933d73051c58534489}] and
>>>> profile ResourceProfile{UNKNOWN} from resource manager.
>>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>> Request slot with profile ResourceProfile{UNKNOWN} for job
>>>> 00000000000000000000000000000000 with allocation id
>>>> 21134414fc60d4ef3e940609cef960b6.
>>>> INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  -
>>>> Requesting new slot [SlotRequestId{e37cc684758e49f3cce76c4687b1d1a3}] and
>>>> profile ResourceProfile{UNKNOWN} from resource manager.
>>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>> Request slot with profile ResourceProfile{UNKNOWN} for job
>>>> 00000000000000000000000000000000 with allocation id
>>>> 650bd9100a35ef5086fd4614f5253b55.
>>>>
>>>
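
For reference, the point made in the thread that a blocking submit keeps the second job from ever being submitted, while an asynchronous submit returns immediately, can be illustrated with a minimal plain-Java sketch. This is not the Flink API and does not reproduce the application-mode behaviour from the log; SubmitSketch, submitAsync and runBoth are hypothetical names, and each "job" is just a task that runs until it is told to stop.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SubmitSketch {
    // Hypothetical stand-in for a non-blocking submit: it returns at once
    // while the "job" keeps running on a pool thread until stop is released.
    static CompletableFuture<Void> submitAsync(String name, List<String> started,
            CountDownLatch running, CountDownLatch stop, ExecutorService pool) {
        return CompletableFuture.runAsync(() -> {
            started.add(name);
            running.countDown();
            try {
                stop.await(); // simulates an unbounded stream
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, pool);
    }

    // Submits two jobs back to back and returns the names of the jobs that started.
    static List<String> runBoth() throws Exception {
        List<String> started = new CopyOnWriteArrayList<>();
        CountDownLatch running = new CountDownLatch(2);
        CountDownLatch stop = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<Void> job1 = submitAsync("Job 1", started, running, stop, pool);
            // Calling job1.get() here would block for as long as Job 1 runs,
            // so Job 2 would not be submitted in the meantime.
            CompletableFuture<Void> job2 = submitAsync("Job 2", started, running, stop, pool);
            if (!running.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("jobs did not start in time");
            }
            stop.countDown(); // end both simulated jobs
            CompletableFuture.allOf(job1, job2).get(5, TimeUnit.SECONDS);
            return started;
        } finally {
            stop.countDown();
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("started: " + runBoth().size());
    }
}
```

Both simulated jobs are running at the same time before either is stopped, which is the effect the executeAsync or separate-thread suggestion aims for. Whether two real Flink jobs can run side by side also depends on the deployment mode, which this sketch does not model.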
