Hi,

env.execute("Job 1") is a blocking call. To run both jobs concurrently, you
have to either use executeAsync or submit the second job from a separate
thread. If Job 1 finishes, your code also works as written, with the two
jobs executing sequentially.
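
To illustrate the separate-thread option, here is a minimal sketch in plain
Java with no Flink dependency. runBlockingJob is a hypothetical stand-in for
"build a topology and call env.execute(name)", and the class and method names
are made up for this example. It only shows that blocking calls started on
their own threads do not hold each other up. In a real application each task
would build its own environment and call env.execute itself.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ParallelSubmit {

    // Hypothetical stand-in for building a topology and calling env.execute(name):
    // it signals that the job has started, then blocks until told to stop.
    static void runBlockingJob(CountDownLatch started, CountDownLatch stop)
            throws InterruptedException {
        started.countDown();
        stop.await();
    }

    // Starts each job on its own thread and returns true if all of them
    // were running at the same time.
    static boolean startAll(int jobs) throws InterruptedException {
        CountDownLatch started = new CountDownLatch(jobs);
        CountDownLatch stop = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(jobs);
        try {
            for (int i = 0; i < jobs; i++) {
                pool.submit(() -> {
                    runBlockingJob(started, stop);
                    return null; // Callable form, so the checked exception is allowed
                });
            }
            // Succeeds only if no job had to wait for another one to finish.
            return started.await(5, TimeUnit.SECONDS);
        } finally {
            stop.countDown(); // let the stand-in jobs finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("both jobs running: " + startAll(2));
    }
}
```

With executeAsync you would not need the extra threads, since the call
returns right after submission instead of waiting for the job to finish.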

However, I think what you actually want is to use the same env with two
topologies and a single execute, like this:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Topology 1 (source and Kafka producer constructors are placeholders, as in your snippet)
DataStream<String> stream1 = env.addSource(new SourceFunction<String>());
stream1.addSink(new FlinkKafkaProducer());

// Topology 2
DataStream<String> stream2 = env.addSource(new SourceFunction<String>());
stream2.addSink(new DiscardingSink<>());

// A single execute submits both topologies as one job
env.execute("Job 1+2");

On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang <yang...@gmail.com> wrote:

> Hi,
> Does anyone know how to run multiple jobs in the same Flink application?
> I did a simple test. The first job was started, and I saw its log message.
> But I didn't see the second job start, even though I saw its log message.
>
> public void testJobs() throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     DataStream<String> stream1 = env.addSource(new SourceFunction<String>());
>     stream1.addSink(new FlinkKafkaProducer());
>     printf("### first job");
>     env.execute("Job 1");
>
>     env = StreamExecutionEnvironment.getExecutionEnvironment();
>     DataStream<String> stream2 = env.addSource(new SourceFunction<String>());
>     stream2.addSink(new DiscardingSink<>());
>     printf("### second job");
>     env.execute("Job 2");
> }
>
> Here is the log:
> ### first job
> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  - Job
> 00000000000000000000000000000000 is submitted.
> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  -
> Submitting Job with JobId=00000000000000000000000000000000.
> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Received
> JobGraph submission 00000000000000000000000000000000 (job1).
> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
> Submitting job 00000000000000000000000000000000 (job1).
>
> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - Starting
> ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting execution
> of job job1 (00000000000000000000000000000000) under job master id
> b03cde9dc2aebdb39c46cda4c2a94c07.
> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting scheduling
> with scheduling strategy
> [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]
> INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job job1
> (00000000000000000000000000000000) switched from state CREATED to RUNNING.
>
> ### second job
> WARN  com.doordash.flink.common.utils.LoggerUtil  - Class - IndexWriter :
> ### second job
> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Resolved
> ResourceManager address, beginning registration
> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  - Starting
> ZooKeeperLeaderRetrievalService
> /leader/00000000000000000000000000000000/job_manager_lock.
> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Registering job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp://
> flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job
> 00000000000000000000000000000000.
> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  - Job
> 00000000000000000000000000000000 was recovered successfully.
> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Registered job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp://
> flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job
> 00000000000000000000000000000000.
> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - JobManager
> successfully registered at ResourceManager, leader id:
> 956d4431ca90d45d92c027046cd0404e.
> INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  -
> Requesting new slot [SlotRequestId{8980dce0c0ef6a933d73051c58534489}] and
> profile ResourceProfile{UNKNOWN} from resource manager.
> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Request slot with profile ResourceProfile{UNKNOWN} for job
> 00000000000000000000000000000000 with allocation id
> 21134414fc60d4ef3e940609cef960b6.
> INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  -
> Requesting new slot [SlotRequestId{e37cc684758e49f3cce76c4687b1d1a3}] and
> profile ResourceProfile{UNKNOWN} from resource manager.
> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
> Request slot with profile ResourceProfile{UNKNOWN} for job
> 00000000000000000000000000000000 with allocation id
> 650bd9100a35ef5086fd4614f5253b55.
>
