Hi, I am using application mode.
Thanks, Qihua On Thu, Jun 17, 2021 at 12:09 PM Arvid Heise <ar...@apache.org> wrote: > Hi Qihua, > > Which execution mode are you using? > > On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang <yang...@gmail.com> wrote: > >> Hi, >> >> Thank you for your reply. What I want is flink app has multiple jobs, >> each job manage a stream. Currently our flink app has only 1 job that >> manage multiple streams. >> I did try env.executeAsyc(), but it still doesn't work. From the log, >> when the second executeAsync() was called, it shows " *Job >> 00000000000000000000000000000000 was recovered successfully.*" >> Looks like the second executeAsync() recover the first job. Not start a >> second job. >> >> Thanks, >> Qihua >> >> >> On Thu, Jun 17, 2021 at 7:43 AM Arvid Heise <ar...@apache.org> wrote: >> >>> Hi, >>> >>> env.execute("Job 1"); is a blocking call. You either have to use >>> executeAsync or use a separate thread to submit the second job. If Job 1 >>> finishes then this would also work by having sequential execution. >>> >>> However, I think what you actually want to do is to use the same env >>> with 2 topologies and 1 single execute like this. >>> >>> StreamExecutionEnvironment env = >>> StreamExecutionEnvironment.getExecutionEnvironment(); >>> DataStream<String> stream1 = env.addSource(new SourceFunction<String>()); >>> stream1.addSink(new FlinkKafkaProducer()); >>> DataStream<String> stream2 = env.addSource(new SourceFunction<String>()); >>> stream2.addSink(new DiscardingSink<>()); >>> env.execute("Job 1+2"); >>> >>> On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang <yang...@gmail.com> wrote: >>> >>>> Hi, >>>> Does anyone know how to run multiple jobs in same flink application? >>>> I did a simple test. First job was started. I did see the log message, >>>> but I didn't see the second job was started, even I saw the log message. >>>> >>>> public void testJobs() throws Exception { >>>> StreamExecutionEnvironment env = >>>> StreamExecutionEnvironment.getExecutionEnvironment(); >>>> DataStream<String> stream1 = env.addSource(new >>>> SourceFunction<String>()); >>>> stream1.addSink(new FlinkKafkaProducer()); >>>> printf("### first job"); >>>> env.execute("Job 1"); >>>> >>>> env = StreamExecutionEnvironment.getExecutionEnvironment(); >>>> DataStream<String> stream2 = env.addSource(new >>>> SourceFunction<String>()); >>>> stream2.addSink(new DiscardingSink<>()); >>>> printf("### second job"); >>>> env.execute("Job 2"); >>>> } >>>> >>>> Here is the log: >>>> ### first job >>>> INFO o.a.f.c.deployment.application.executors.EmbeddedExecutor - Job >>>> 00000000000000000000000000000000 is submitted. >>>> INFO o.a.f.c.deployment.application.executors.EmbeddedExecutor - >>>> Submitting Job with JobId=00000000000000000000000000000000. >>>> INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - >>>> Received JobGraph submission 00000000000000000000000000000000 (job1). >>>> INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - >>>> Submitting job 00000000000000000000000000000000 (job1). >>>> >>>> INFO o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService - >>>> Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock. >>>> INFO org.apache.flink.runtime.jobmaster.JobMaster - Starting >>>> execution of job job1 (00000000000000000000000000000000) under job master >>>> id b03cde9dc2aebdb39c46cda4c2a94c07. >>>> INFO org.apache.flink.runtime.jobmaster.JobMaster - Starting >>>> scheduling with scheduling strategy >>>> [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy] >>>> INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job >>>> job1 (00000000000000000000000000000000) switched from state CREATED to >>>> RUNNING. >>>> >>>> ### second job >>>> WARN com.doordash.flink.common.utils.LoggerUtil - Class - IndexWriter >>>> : ### second job >>>> INFO org.apache.flink.runtime.jobmaster.JobMaster - Resolved >>>> ResourceManager address, beginning registration >>>> INFO o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService - >>>> Starting ZooKeeperLeaderRetrievalService >>>> /leader/00000000000000000000000000000000/job_manager_lock. >>>> INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - >>>> Registering job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp:// >>>> flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job >>>> 00000000000000000000000000000000. >>>> INFO o.a.f.c.deployment.application.executors.EmbeddedExecutor - Job >>>> 00000000000000000000000000000000 was recovered successfully. >>>> INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - >>>> Registered job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp:// >>>> flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job >>>> 00000000000000000000000000000000. >>>> INFO org.apache.flink.runtime.jobmaster.JobMaster - JobManager >>>> successfully registered at ResourceManager, leader id: >>>> 956d4431ca90d45d92c027046cd0404e. >>>> INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - >>>> Requesting new slot [SlotRequestId{8980dce0c0ef6a933d73051c58534489}] and >>>> profile ResourceProfile{UNKNOWN} from resource manager. >>>> INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - >>>> Request slot with profile ResourceProfile{UNKNOWN} for job >>>> 00000000000000000000000000000000 with allocation id >>>> 21134414fc60d4ef3e940609cef960b6. >>>> INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - >>>> Requesting new slot [SlotRequestId{e37cc684758e49f3cce76c4687b1d1a3}] and >>>> profile ResourceProfile{UNKNOWN} from resource manager. >>>> INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - >>>> Request slot with profile ResourceProfile{UNKNOWN} for job >>>> 00000000000000000000000000000000 with allocation id >>>> 650bd9100a35ef5086fd4614f5253b55. >>>> >>>