Hi Caizhi, This is our program printing out the status code, but it doesn't really matter. The point is that, at the moment, I have no way to run a StatementSet through the WebSubmission REST API without blocking.
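For reference, the polling approach suggested earlier in the thread (GET /jobs/:jobid and watch the job state) could be sketched roughly as below. This is only a sketch under assumptions: the object name JobStatusPoller, the restBase URL and the polling interval are made up for illustration, the job ID is assumed to be known already (for example from the "Job ... is submitted" log line), and the "state" field is pulled out with a regex rather than a proper JSON parser. It only observes the job; it assumes the cluster itself stays up.

```scala
import scala.annotation.tailrec
import scala.io.Source

// Hypothetical helper, not part of Flink: polls the REST endpoint
// GET /jobs/:jobid until the job reaches a terminal state.
object JobStatusPoller {
  // Job states treated as terminal here (assumption: these three suffice).
  val TerminalStates: Set[String] = Set("FINISHED", "FAILED", "CANCELED")

  // Matches the "state" field of the job details response.
  private val StatePattern = "\"state\"\\s*:\\s*\"([A-Z_]+)\"".r

  // Extracts the job state from the raw JSON body, if present.
  def extractState(json: String): Option[String] =
    StatePattern.findFirstMatchIn(json).map(_.group(1))

  // Performs one GET against <restBase>/jobs/<jobId> and returns the state.
  def fetchState(restBase: String, jobId: String): Option[String] = {
    val src = Source.fromURL(s"$restBase/jobs/$jobId", "UTF-8")
    try extractState(src.mkString) finally src.close()
  }

  // Blocks the calling thread until a terminal state is reported.
  @tailrec
  def awaitTerminal(restBase: String, jobId: String, pollMillis: Long = 5000L): String =
    fetchState(restBase, jobId) match {
      case Some(state) if TerminalStates(state) => state
      case _ =>
        Thread.sleep(pollMillis)
        awaitTerminal(restBase, jobId, pollMillis)
    }
}
```

Usage would be something like JobStatusPoller.awaitTerminal("http://localhost:8081", jobId), where the address is whatever the cluster's REST endpoint happens to be. HTTP errors are not handled here and would surface as exceptions from Source.fromURL.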
On Wed, Dec 22, 2021 at 1:39 PM Caizhi Weng <tsreape...@gmail.com> wrote: > Hi! > > This "Finished successfully with value: 0" seems quite suspicious. If you > search through the code base no log is printing such information. Could you > please check which component is printing this log and determine which > process this exit code belongs to? > > Yuval Itzchakov <yuva...@gmail.com> wrote on Wed, Dec 22, 2021 at 15:48: > >> The job construction itself is a bit complex, but it can either be a >> StatementSet that's being filled, or there is some kind of conversion Table >> -> DataStream and then we put the transformations on the DataStream itself. >> Invocation looks like this: >> >> executionEffect = >> if (...) >> FlinkTask.lockedEffect(flink.execute(jobName)) >> else FlinkTask.lockedEffect(statementSet.execute()) >> >> If I don't infinitely block on this, it terminates right after starting >> the execution: >> >> 2021-12-22 09:25:25,522 INFO o.a.f.a.j.t.TypeExtractor class ... does not >> contain a setter for field partitionKey >> 2021-12-22 09:25:25,522 INFO o.a.f.a.j.t.TypeExtractor Class class ... >> cannot be used as a POJO type because not all fields are valid POJO fields, >> and must be processed as GenericType. Please read the Flink documentation >> on "Data Types & Serialization" for details of the effect on performance. >> 2021-12-22 09:25:25,533 INFO o.a.f.a.j.t.TypeExtractor class ... does not >> contain a setter for field stage >> 2021-12-22 09:25:25,533 INFO o.a.f.a.j.t.TypeExtractor Class class ... >> cannot be used as a POJO type because not all fields are valid POJO fields, >> and must be processed as GenericType. Please read the Flink documentation >> on "Data Types & Serialization" for details of the effect on performance. 
>> 2021-12-22 09:25:27,678 WARN o.a.f.c.Configuration Config uses deprecated >> configuration key 'akka.client.timeout' instead of proper key >> 'client.timeout' >> 2021-12-22 09:25:27,841 INFO o.a.f.c.d.a.e.EmbeddedExecutor Job >> 492c9f07d8b3458a52595ab49f636205 is submitted. >> 2021-12-22 09:25:27,842 INFO o.a.f.c.d.a.e.EmbeddedExecutor Submitting >> Job with JobId=492c9f07d8b3458a52595ab49f636205. >> 2021-12-22 09:25:28,491 INFO o.a.f.r.d.Dispatcher Received JobGraph >> submission '....' (492c9f07d8b3458a52595ab49f636205). >> 2021-12-22 09:25:28,491 INFO o.a.f.r.d.Dispatcher Submitting job '....' >> (492c9f07d8b3458a52595ab49f636205). >> 2021-12-22 09:25:28,519 INFO o.a.f.r.r.a.AkkaRpcService Starting RPC >> endpoint for org.apache.flink.runtime.jobmaster.JobMaster at >> akka://flink/user/rpc/jobmanager_2 . >> 2021-12-22 09:25:28,528 INFO o.a.f.r.j.JobMaster Initializing job '...' >> (492c9f07d8b3458a52595ab49f636205). >> 2021-12-22 09:25:28,554 INFO o.a.f.r.s.DefaultSchedulerFactory Using >> restart back off time strategy >> FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=1, >> backoffTimeMS=10000) for ... (492c9f07d8b3458a52595ab49f636205). >> 2021-12-22 09:25:28,599 INFO o.a.f.r.e.DefaultExecutionGraphBuilder >> Running initialization on master for job ... >> (492c9f07d8b3458a52595ab49f636205). >> 2021-12-22 09:25:28,600 INFO o.a.f.r.e.DefaultExecutionGraphBuilder >> Successfully ran initialization on master in 0 ms. 
>> 2021-12-22 09:25:28,621 INFO o.a.f.r.s.a.DefaultExecutionTopology Built 1 >> pipelined regions in 0 ms >> 2021-12-22 09:25:28,679 INFO o.a.f.r.s.StateBackendLoader No state >> backend has been configured, using default (HashMap) >> org.apache.flink.runtime.state.hashmap.HashMapStateBackend@4c81fc2e >> 2021-12-22 09:25:28,680 INFO o.a.f.r.s.StateBackendLoader State backend >> loader loads the state backend as HashMapStateBackend >> 2021-12-22 09:25:28,681 INFO o.a.f.r.s.CheckpointStorageLoader Checkpoint >> storage is set to 'jobmanager' >> 2021-12-22 09:25:28,701 INFO o.a.f.r.c.CheckpointCoordinator No >> checkpoint found during restore. >> 2021-12-22 09:25:28,702 INFO o.a.f.r.c.CheckpointCoordinator Starting job >> 492c9f07d8b3458a52595ab49f636205 from savepoint .... (allowing non restored >> state) >> 2021-12-22 09:25:28,727 INFO o.a.f.r.c.CheckpointCoordinator Reset the >> checkpoint ID of job 492c9f07d8b3458a52595ab49f636205 to 8400. >> 2021-12-22 09:25:28,728 INFO o.a.f.r.c.CheckpointCoordinator Restoring >> job 492c9f07d8b3458a52595ab49f636205 from Savepoint 8399 @ 0 for >> 492c9f07d8b3458a52595ab49f636205 located at file:.. >> 2021-12-22 09:25:28,745 INFO o.a.f.r.c.h.MasterHooks No master state to >> restore >> 2021-12-22 09:25:28,750 INFO o.a.f.r.s.DefaultScheduler Using failover >> strategy >> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@3114dd24 >> for ... (492c9f07d8b3458a52595ab49f636205). >> 2021-12-22 09:25:28,764 INFO o.a.f.r.j.JobMaster Starting execution of >> job '...' (492c9f07d8b3458a52595ab49f636205) under job master id >> 00000000000000000000000000000000. >> 2021-12-22 09:25:28,765 INFO o.a.f.r.s.DefaultScheduler Starting >> scheduling with scheduling strategy >> [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy] >> 2021-12-22 09:25:28,766 INFO o.a.f.r.e.DefaultExecutionGraph Job ... >> (492c9f07d8b3458a52595ab49f636205) switched from state CREATED to RUNNING. 
>> 2021-12-22 09:25:28,772 INFO o.a.f.r.e.Execution Source: ... (1/1) >> (3c3260f3f0c7d82452a46fc383ceb932) switched from CREATED to SCHEDULED. >> 2021-12-22 09:25:28,772 INFO o.a.f.r.e.Execution ... -> Map -> >> Calc(select=[...]) -> Map (1/3) (3c05f0bd5ca1bd4903398bb39b5992fa) switched >> from CREATED to SCHEDULED. >> 2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ... -> Map -> >> Calc(select=[...]) -> Map (2/3) (644722a664ac6a9797b8638a225dbbf9) switched >> from CREATED to SCHEDULED. >> 2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ... -> Map -> >> (Calc(select=[...]) (3/3) (00cf0b3a6d6c1cd1f0cf48ad8b393921) switched from >> CREATED to SCHEDULED. >> 2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink: >> SnowflakeSinkProvider(...) (1/3) (bcaeb5103effbbddc2b4fc7ad801abbf) >> switched from CREATED to SCHEDULED. >> 2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink: >> SnowflakeSinkProvider(...) (2/3) (8d046c60a84900cba31877ec28f81124) >> switched from CREATED to SCHEDULED. >> 2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink: >> SnowflakeSinkProvider(...) (3/3) (39a1afd89f627816f018fa9652865887) >> switched from CREATED to SCHEDULED. >> 2021-12-22 09:25:28,790 INFO o.a.f.r.j.JobMaster Connecting to >> ResourceManager akka.tcp://flink@localhost >> :6123/user/rpc/resourcemanager_*(00000000000000000000000000000000) >> 2021-12-22 09:25:28,794 INFO o.a.f.r.r.RetryingRegistration Resolved >> ResourceManager address, beginning registration >> 2021-12-22 09:25:28,796 INFO o.a.f.r.r.ResourceManager Registering job >> manager >> 00000000000000000000000000000...@akka.tcp://flink@localhost:6123/user/rpc/jobmanager_2 >> for job 492c9f07d8b3458a52595ab49f636205. >> 2021-12-22 09:25:28,800 INFO o.a.f.r.r.ResourceManager Registered job >> manager >> 00000000000000000000000000000...@akka.tcp://flink@localhost:6123/user/rpc/jobmanager_2 >> for job 492c9f07d8b3458a52595ab49f636205. 
>> 2021-12-22 09:25:28,801 INFO o.a.f.r.j.JobMaster JobManager successfully >> registered at ResourceManager, leader id: 00000000000000000000000000000000. >> 2021-12-22 09:25:28,803 INFO o.a.f.r.r.s.DeclarativeSlotManager Received >> resource requirements from job 492c9f07d8b3458a52595ab49f636205: >> [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN}, >> numberOfRequiredSlots=3}] >> 2021-12-22 09:25:28,872 INFO o.a.f.r.e.Execution Source: ... (1/1) >> (3c3260f3f0c7d82452a46fc383ceb932) switched from SCHEDULED to DEPLOYING. >> 2021-12-22 09:25:28,872 INFO o.a.f.r.e.Execution Deploying Source: ... >> (1/1) (attempt #0) with attempt id 3c3260f3f0c7d82452a46fc383ceb932 to >> ...:64216-650fc2 @ ... (dataPort=64218) with allocation id >> 2e63675e30c595a8538f7a006fe0678d >> 2021-12-22 09:25:28,880 INFO o.a.f.r.e.Execution ... -> Map -> >> Calc(select=[...]) -> Map (1/3) (3c05f0bd5ca1bd4903398bb39b5992fa) switched >> from SCHEDULED to DEPLOYING. >> 2021-12-22 09:25:28,880 INFO o.a.f.r.e.Execution Deploying c... -> Map -> >> Calc(select=[...]) -> Map (1/3) (attempt #0) with attempt id >> 3c05f0bd5ca1bd4903398bb39b5992fa to ... (dataPort=64218) with allocation id >> 2e63675e30c595a8538f7a006fe0678d >> 2021-12-22 09:25:28,886 INFO o.a.f.r.e.Execution ... -> Map -> >> Calc(select=[...]) -> Map (2/3) (644722a664ac6a9797b8638a225dbbf9) switched >> from SCHEDULED to DEPLOYING. >> 2021-12-22 09:25:28,886 INFO o.a.f.r.e.Execution Deploying ... -> Map -> >> Calc(select=[...]) -> Map (2/3) (attempt #0) with attempt id >> 644722a664ac6a9797b8638a225dbbf9 to ... (dataPort=64218) with allocation id >> fe0a5941283557538901c8a9774a2584 >> 2021-12-22 09:25:28,887 INFO o.a.f.r.e.Execution ... -> Map -> >> Calc(select=[...]) -> Map (3/3) (00cf0b3a6d6c1cd1f0cf48ad8b393921) switched >> from SCHEDULED to DEPLOYING. >> 2021-12-22 09:25:28,887 INFO o.a.f.r.e.Execution Deploying ... 
-> Map -> >> Calc(select=[...]) -> Map (3/3) (attempt #0) with attempt id >> 00cf0b3a6d...d8b393921 to ... (dataPort=64218) with allocation id >> 026dabf16a12ddf35399938466a27572 >> 2021-12-22 09:25:28,888 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink: >> SnowflakeSinkProvider(...) (1/3) (bcaeb5103effbbddc2b4fc7ad801abbf) >> switched from SCHEDULED to DEPLOYING. >> 2021-12-22 09:25:28,888 INFO o.a.f.r.e.Execution Deploying ...-batch -> >> Sink: SnowflakeSinkProvider(...) (1/3) (attempt #0) with attempt id >> bcaeb5103effbbddc2b4fc7ad801abbf to ... (dataPort=64218) with allocation id >> 2e63675e30c595a8538f7a006fe0678d >> 2021-12-22 09:25:28,889 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink: >> SnowflakeSinkProvider(...) (2/3) (8d046c60a84900cba31877ec28f81124) >> switched from SCHEDULED to DEPLOYING. >> 2021-12-22 09:25:28,889 INFO o.a.f.r.e.Execution Deploying ...-sink-batch >> -> Sink: SnowflakeSinkProvider(...) (2/3) (attempt #0) with attempt id >> 8d046c60a84900cba31877ec28f81124 to ... (dataPort=64218) with allocation id >> fe0a5941283557538901c8a9774a2584 >> 2021-12-22 09:25:28,890 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink: >> SnowflakeSinkProvider(...) (3/3) (39a1afd89f627816f018fa9652865887) >> switched from SCHEDULED to DEPLOYING. >> 2021-12-22 09:25:28,890 INFO o.a.f.r.e.Execution Deploying ...-sink-batch >> -> Sink: SnowflakeSinkProvider(...) (3/3) (attempt #0) with attempt id >> 39a1afd89f627816f018fa9652865887 to ... (dataPort=64218) with allocation id >> 026dabf16a12ddf35399938466a27572 >> 2021-12-22 09:25:28,917 INFO Finished successfully with value: 0 >> 2021-12-22 09:25:28,922 INFO o.a.f.r.e.ClusterEntrypoint Shutting >> StandaloneSessionClusterEntrypoint down with application status UNKNOWN. >> Diagnostics Cluster entrypoint has been closed externally.. >> 2021-12-22 09:25:28,923 INFO o.a.f.r.r.RestServerEndpoint Shutting down >> rest endpoint. 
>> 2021-12-22 09:25:28,943 INFO o.a.f.r.b.BlobServer Stopped BLOB server at >> 0.0.0.0:64213 >> >> Process finished with exit code 239 >> >> On Wed, Dec 22, 2021 at 8:47 AM Yuval Itzchakov <yuva...@gmail.com> >> wrote: >> >>> I mean it finishes successfully and exits with status code 0. Both when >>> running locally and submitting to the cluster. >>> >>> On Wed, Dec 22, 2021, 08:36 Caizhi Weng <tsreape...@gmail.com> wrote: >>> >>>> Hi! >>>> >>>> By "the streaming job stops" do you mean the job ends with CANCELED >>>> state instead of FINISHED state? Which kind of job are you running? Is it a >>>> select job or an insert job? Insert jobs should run continuously once >>>> they're submitted. Could you share your user code if possible? >>>> >>>> Yuval Itzchakov <yuva...@gmail.com> wrote on Wed, Dec 22, 2021 at 14:11: >>>> >>>>> Hi Caizhi, >>>>> >>>>> If I don't block on statementSet.execute, the job finishes immediately >>>>> with exit code 0 and the streaming job stops, and that's not what I want. >>>>> I somehow need to block. >>>>> >>>>> >>>>> >>>>> On Wed, Dec 22, 2021, 03:43 Caizhi Weng <tsreape...@gmail.com> wrote: >>>>> >>>>>> Hi! >>>>>> >>>>>> You can poll the status of that job with the REST API [1]. You can tell >>>>>> that the job successfully finishes by the FINISHED state and that the job >>>>>> fails by the FAILED state. >>>>>> >>>>>> [1] >>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid >>>>>> >>>>>> Yuval Itzchakov <yuva...@gmail.com> wrote on Wed, Dec 22, 2021 at 02:36: >>>>>> >>>>>>> Hi, >>>>>>> >>>>>>> Flink 1.14.2 >>>>>>> Scala 2.12 >>>>>>> >>>>>>> I have a streaming job that executes and I want to wait indefinitely >>>>>>> for its completion, or until an exception is thrown during initialization. 
>>>>>>> When using *statementSet.execute().await()*, I get an error: >>>>>>> >>>>>>> Caused by: org.apache.flink.util.FlinkRuntimeException:* The Job >>>>>>> Result cannot be fetched through the Job Client when in Web Submission.* >>>>>>> at >>>>>>> org.apache.flink.client.deployment.application.WebSubmissionJobClient.getJobExecutionResult(WebSubmissionJobClient.java:88) >>>>>>> at >>>>>>> org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:54) >>>>>>> ... 7 more >>>>>>> >>>>>>> This is because the Web Submission via the REST API is using >>>>>>> the WebSubmissionJobClient. >>>>>>> >>>>>>> How can I wait on my Flink SQL streaming job when submitting through >>>>>>> the REST API? >>>>>>> -- >>>>>>> Best Regards, >>>>>>> Yuval Itzchakov >>>>>>> >>>>>> >> >> -- >> Best Regards, >> Yuval Itzchakov. >> > -- Best Regards, Yuval Itzchakov.