Hi! This "Finished successfully with value: 0" message looks quite suspicious. If you search through the Flink code base, no log statement prints such a message. Could you please check which component prints this log and which process this exit code belongs to?
Yuval Itzchakov <yuva...@gmail.com> wrote on Wed, Dec 22, 2021 at 15:48:
> The job construction itself is a bit complex: it is either a StatementSet
> that gets filled, or some kind of Table -> DataStream conversion after which
> we put the transformations on the DataStream itself. The invocation looks
> like this:
>
>   executionEffect =
>     if (...)
>       FlinkTask.lockedEffect(flink.execute(jobName))
>     else FlinkTask.lockedEffect(statementSet.execute())
>
> If I don't block on this indefinitely, it terminates right after the
> execution starts:
>
> 2021-12-22 09:25:25,522 INFO o.a.f.a.j.t.TypeExtractor class ... does not contain a setter for field partitionKey
> 2021-12-22 09:25:25,522 INFO o.a.f.a.j.t.TypeExtractor Class class ... cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
> 2021-12-22 09:25:25,533 INFO o.a.f.a.j.t.TypeExtractor class ... does not contain a setter for field stage
> 2021-12-22 09:25:25,533 INFO o.a.f.a.j.t.TypeExtractor Class class ... cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
> 2021-12-22 09:25:27,678 WARN o.a.f.c.Configuration Config uses deprecated configuration key 'akka.client.timeout' instead of proper key 'client.timeout'
> 2021-12-22 09:25:27,841 INFO o.a.f.c.d.a.e.EmbeddedExecutor Job 492c9f07d8b3458a52595ab49f636205 is submitted.
> 2021-12-22 09:25:27,842 INFO o.a.f.c.d.a.e.EmbeddedExecutor Submitting Job with JobId=492c9f07d8b3458a52595ab49f636205.
> 2021-12-22 09:25:28,491 INFO o.a.f.r.d.Dispatcher Received JobGraph submission '....' (492c9f07d8b3458a52595ab49f636205).
> 2021-12-22 09:25:28,491 INFO o.a.f.r.d.Dispatcher Submitting job '....' (492c9f07d8b3458a52595ab49f636205).
> 2021-12-22 09:25:28,519 INFO o.a.f.r.r.a.AkkaRpcService Starting RPC endpoint for org.apache.flink.runtime.jobmaster.JobMaster at akka://flink/user/rpc/jobmanager_2 .
> 2021-12-22 09:25:28,528 INFO o.a.f.r.j.JobMaster Initializing job '...' (492c9f07d8b3458a52595ab49f636205).
> 2021-12-22 09:25:28,554 INFO o.a.f.r.s.DefaultSchedulerFactory Using restart back off time strategy FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=1, backoffTimeMS=10000) for ... (492c9f07d8b3458a52595ab49f636205).
> 2021-12-22 09:25:28,599 INFO o.a.f.r.e.DefaultExecutionGraphBuilder Running initialization on master for job ... (492c9f07d8b3458a52595ab49f636205).
> 2021-12-22 09:25:28,600 INFO o.a.f.r.e.DefaultExecutionGraphBuilder Successfully ran initialization on master in 0 ms.
> 2021-12-22 09:25:28,621 INFO o.a.f.r.s.a.DefaultExecutionTopology Built 1 pipelined regions in 0 ms
> 2021-12-22 09:25:28,679 INFO o.a.f.r.s.StateBackendLoader No state backend has been configured, using default (HashMap) org.apache.flink.runtime.state.hashmap.HashMapStateBackend@4c81fc2e
> 2021-12-22 09:25:28,680 INFO o.a.f.r.s.StateBackendLoader State backend loader loads the state backend as HashMapStateBackend
> 2021-12-22 09:25:28,681 INFO o.a.f.r.s.CheckpointStorageLoader Checkpoint storage is set to 'jobmanager'
> 2021-12-22 09:25:28,701 INFO o.a.f.r.c.CheckpointCoordinator No checkpoint found during restore.
> 2021-12-22 09:25:28,702 INFO o.a.f.r.c.CheckpointCoordinator Starting job 492c9f07d8b3458a52595ab49f636205 from savepoint .... (allowing non restored state)
> 2021-12-22 09:25:28,727 INFO o.a.f.r.c.CheckpointCoordinator Reset the checkpoint ID of job 492c9f07d8b3458a52595ab49f636205 to 8400.
> 2021-12-22 09:25:28,728 INFO o.a.f.r.c.CheckpointCoordinator Restoring job 492c9f07d8b3458a52595ab49f636205 from Savepoint 8399 @ 0 for 492c9f07d8b3458a52595ab49f636205 located at file:..
> 2021-12-22 09:25:28,745 INFO o.a.f.r.c.h.MasterHooks No master state to restore
> 2021-12-22 09:25:28,750 INFO o.a.f.r.s.DefaultScheduler Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@3114dd24 for ... (492c9f07d8b3458a52595ab49f636205).
> 2021-12-22 09:25:28,764 INFO o.a.f.r.j.JobMaster Starting execution of job '...' (492c9f07d8b3458a52595ab49f636205) under job master id 00000000000000000000000000000000.
> 2021-12-22 09:25:28,765 INFO o.a.f.r.s.DefaultScheduler Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
> 2021-12-22 09:25:28,766 INFO o.a.f.r.e.DefaultExecutionGraph Job ... (492c9f07d8b3458a52595ab49f636205) switched from state CREATED to RUNNING.
> 2021-12-22 09:25:28,772 INFO o.a.f.r.e.Execution Source: ... (1/1) (3c3260f3f0c7d82452a46fc383ceb932) switched from CREATED to SCHEDULED.
> 2021-12-22 09:25:28,772 INFO o.a.f.r.e.Execution ... -> Map -> Calc(select=[...]) -> Map (1/3) (3c05f0bd5ca1bd4903398bb39b5992fa) switched from CREATED to SCHEDULED.
> 2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ... -> Map -> Calc(select=[...]) -> Map (2/3) (644722a664ac6a9797b8638a225dbbf9) switched from CREATED to SCHEDULED.
> 2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ... -> Map -> (Calc(select=[...]) (3/3) (00cf0b3a6d6c1cd1f0cf48ad8b393921) switched from CREATED to SCHEDULED.
> 2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink: SnowflakeSinkProvider(...) (1/3) (bcaeb5103effbbddc2b4fc7ad801abbf) switched from CREATED to SCHEDULED.
> 2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink: SnowflakeSinkProvider(...) (2/3) (8d046c60a84900cba31877ec28f81124) switched from CREATED to SCHEDULED.
> 2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink: SnowflakeSinkProvider(...) (3/3) (39a1afd89f627816f018fa9652865887) switched from CREATED to SCHEDULED.
> 2021-12-22 09:25:28,790 INFO o.a.f.r.j.JobMaster Connecting to ResourceManager akka.tcp://flink@localhost:6123/user/rpc/resourcemanager_*(00000000000000000000000000000000)
> 2021-12-22 09:25:28,794 INFO o.a.f.r.r.RetryingRegistration Resolved ResourceManager address, beginning registration
> 2021-12-22 09:25:28,796 INFO o.a.f.r.r.ResourceManager Registering job manager 00000000000000000000000000000...@akka.tcp://flink@localhost:6123/user/rpc/jobmanager_2 for job 492c9f07d8b3458a52595ab49f636205.
> 2021-12-22 09:25:28,800 INFO o.a.f.r.r.ResourceManager Registered job manager 00000000000000000000000000000...@akka.tcp://flink@localhost:6123/user/rpc/jobmanager_2 for job 492c9f07d8b3458a52595ab49f636205.
> 2021-12-22 09:25:28,801 INFO o.a.f.r.j.JobMaster JobManager successfully registered at ResourceManager, leader id: 00000000000000000000000000000000.
> 2021-12-22 09:25:28,803 INFO o.a.f.r.r.s.DeclarativeSlotManager Received resource requirements from job 492c9f07d8b3458a52595ab49f636205: [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, numberOfRequiredSlots=3}]
> 2021-12-22 09:25:28,872 INFO o.a.f.r.e.Execution Source: ... (1/1) (3c3260f3f0c7d82452a46fc383ceb932) switched from SCHEDULED to DEPLOYING.
> 2021-12-22 09:25:28,872 INFO o.a.f.r.e.Execution Deploying Source: ... (1/1) (attempt #0) with attempt id 3c3260f3f0c7d82452a46fc383ceb932 to ...:64216-650fc2 @ ... (dataPort=64218) with allocation id 2e63675e30c595a8538f7a006fe0678d
> 2021-12-22 09:25:28,880 INFO o.a.f.r.e.Execution ... -> Map -> Calc(select=[...]) -> Map (1/3) (3c05f0bd5ca1bd4903398bb39b5992fa) switched from SCHEDULED to DEPLOYING.
> 2021-12-22 09:25:28,880 INFO o.a.f.r.e.Execution Deploying c... -> Map -> Calc(select=[...]) -> Map (1/3) (attempt #0) with attempt id 3c05f0bd5ca1bd4903398bb39b5992fa to ... (dataPort=64218) with allocation id 2e63675e30c595a8538f7a006fe0678d
> 2021-12-22 09:25:28,886 INFO o.a.f.r.e.Execution ... -> Map -> Calc(select=[...]) -> Map (2/3) (644722a664ac6a9797b8638a225dbbf9) switched from SCHEDULED to DEPLOYING.
> 2021-12-22 09:25:28,886 INFO o.a.f.r.e.Execution Deploying ... -> Map -> Calc(select=[...]) -> Map (2/3) (attempt #0) with attempt id 644722a664ac6a9797b8638a225dbbf9 to ... (dataPort=64218) with allocation id fe0a5941283557538901c8a9774a2584
> 2021-12-22 09:25:28,887 INFO o.a.f.r.e.Execution ... -> Map -> Calc(select=[...]) -> Map (3/3) (00cf0b3a6d6c1cd1f0cf48ad8b393921) switched from SCHEDULED to DEPLOYING.
> 2021-12-22 09:25:28,887 INFO o.a.f.r.e.Execution Deploying ... -> Map -> Calc(select=[...]) -> Map (3/3) (attempt #0) with attempt id 00cf0b3a6d...d8b393921 to ... (dataPort=64218) with allocation id 026dabf16a12ddf35399938466a27572
> 2021-12-22 09:25:28,888 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink: SnowflakeSinkProvider(...) (1/3) (bcaeb5103effbbddc2b4fc7ad801abbf) switched from SCHEDULED to DEPLOYING.
> 2021-12-22 09:25:28,888 INFO o.a.f.r.e.Execution Deploying ...-batch -> Sink: SnowflakeSinkProvider(...) (1/3) (attempt #0) with attempt id bcaeb5103effbbddc2b4fc7ad801abbf to ... (dataPort=64218) with allocation id 2e63675e30c595a8538f7a006fe0678d
> 2021-12-22 09:25:28,889 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink: SnowflakeSinkProvider(...) (2/3) (8d046c60a84900cba31877ec28f81124) switched from SCHEDULED to DEPLOYING.
> 2021-12-22 09:25:28,889 INFO o.a.f.r.e.Execution Deploying ...-sink-batch -> Sink: SnowflakeSinkProvider(...) (2/3) (attempt #0) with attempt id 8d046c60a84900cba31877ec28f81124 to ... (dataPort=64218) with allocation id fe0a5941283557538901c8a9774a2584
> 2021-12-22 09:25:28,890 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink: SnowflakeSinkProvider(...) (3/3) (39a1afd89f627816f018fa9652865887) switched from SCHEDULED to DEPLOYING.
> 2021-12-22 09:25:28,890 INFO o.a.f.r.e.Execution Deploying ...-sink-batch -> Sink: SnowflakeSinkProvider(...) (3/3) (attempt #0) with attempt id 39a1afd89f627816f018fa9652865887 to ... (dataPort=64218) with allocation id 026dabf16a12ddf35399938466a27572
> 2021-12-22 09:25:28,917 INFO Finished successfully with value: 0
> 2021-12-22 09:25:28,922 INFO o.a.f.r.e.ClusterEntrypoint Shutting StandaloneSessionClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
> 2021-12-22 09:25:28,923 INFO o.a.f.r.r.RestServerEndpoint Shutting down rest endpoint.
> 2021-12-22 09:25:28,943 INFO o.a.f.r.b.BlobServer Stopped BLOB server at 0.0.0.0:64213
>
> Process finished with exit code 239
>
> On Wed, Dec 22, 2021 at 8:47 AM Yuval Itzchakov <yuva...@gmail.com> wrote:
>
>> I mean it finishes successfully and exits with status code 0, both when
>> running locally and when submitting to the cluster.
>>
>> On Wed, Dec 22, 2021, 08:36 Caizhi Weng <tsreape...@gmail.com> wrote:
>>
>>> Hi!
>>>
>>> By "the streaming job stops", do you mean the job ends in the CANCELED
>>> state instead of the FINISHED state? Which kind of job are you running:
>>> a select job or an insert job? Insert jobs should run continuously once
>>> they're submitted. Could you share your user code if possible?
>>>
>>> Yuval Itzchakov <yuva...@gmail.com> wrote on Wed, Dec 22, 2021 at 14:11:
>>>
>>>> Hi Caizhi,
>>>>
>>>> If I don't block on statementSet.execute, the job finishes immediately
>>>> with exit code 0 and the streaming job stops, which is not what I want.
>>>> I somehow need to block.
>>>>
>>>> On Wed, Dec 22, 2021, 03:43 Caizhi Weng <tsreape...@gmail.com> wrote:
>>>>
>>>>> Hi!
>>>>>
>>>>> You can poll the status of that job with the REST API [1]. A job that
>>>>> finishes successfully ends in the FINISHED state, and a job that fails
>>>>> ends in the FAILED state.
>>>>> >>>>> [1] >>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid >>>>> >>>>> Yuval Itzchakov <yuva...@gmail.com> 于2021年12月22日周三 02:36写道: >>>>> >>>>>> Hi, >>>>>> >>>>>> Flink 1.14.2 >>>>>> Scala 2.12 >>>>>> >>>>>> I have a streaming job that executes and I want to infinitely wait >>>>>> for it's completion, or if an exception is thrown during initialization. >>>>>> When using *statementSet.execute().await()*, I get an error: >>>>>> >>>>>> Caused by: org.apache.flink.util.FlinkRuntimeException:* The Job >>>>>> Result cannot be fetched through the Job Client when in Web Submission.* >>>>>> at >>>>>> org.apache.flink.client.deployment.application.WebSubmissionJobClient.getJobExecutionResult(WebSubmissionJobClient.java:88) >>>>>> at >>>>>> org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:54) >>>>>> ... 7 more >>>>>> >>>>>> This is because the Web Submission via the REST API is using >>>>>> the WebSubmissionJobClient. >>>>>> >>>>>> How can I wait on my Flink SQL streaming job when submitting through >>>>>> the REST API? >>>>>> -- >>>>>> Best Regards, >>>>>> Yuval Itzchakov >>>>>> >>>>> > > -- > Best Regards, > Yuval Itzchakov. >