Hi Caizhi,
That line is printed by our own program, which reports the status code, but
it doesn't really matter. The point is that, at the moment, I have no way to
run a StatementSet through the Web Submission REST API without blocking.
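
A minimal sketch of the REST polling approach suggested further down the thread, for reference. It is an illustration only: `JobStatusPoller`, its method names, and the 5-second timeouts are hypothetical, and the regex assumes that the job-level "state" field is the first "state" key in the `/jobs/:jobid` response. It is meant to run in a client outside the submitted program, because the job client available inside a web submission cannot fetch the job result.

```scala
import java.net.{HttpURLConnection, URL}
import scala.annotation.tailrec
import scala.io.Source

// Hypothetical helper, not part of Flink: polls GET /jobs/:jobid until the
// job reaches a globally terminal state.
object JobStatusPoller {
  // Globally terminal job states.
  val TerminalStates: Set[String] = Set("FINISHED", "FAILED", "CANCELED")

  // Assumption: the first "state" key in the response is the job-level state.
  private val StatePattern = "\"state\"\\s*:\\s*\"([A-Z_]+)\"".r

  // Extracts the job state from the raw JSON body, if present.
  def extractState(json: String): Option[String] =
    StatePattern.findFirstMatchIn(json).map(_.group(1))

  // Fetches the raw JSON for one job; restBase is e.g. "http://host:port".
  def fetchJobJson(restBase: String, jobId: String): String = {
    val conn = new URL(s"$restBase/jobs/$jobId")
      .openConnection()
      .asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("GET")
    conn.setConnectTimeout(5000)
    conn.setReadTimeout(5000)
    val source = Source.fromInputStream(conn.getInputStream, "UTF-8")
    try source.mkString
    finally {
      source.close()
      conn.disconnect()
    }
  }

  // Calls fetch repeatedly, sleeping intervalMs between attempts, and returns
  // the first terminal state it sees. The fetch function is a parameter so the
  // loop can be exercised without a running cluster.
  @tailrec
  def awaitTerminalState(fetch: () => String, intervalMs: Long): String =
    extractState(fetch()) match {
      case Some(state) if TerminalStates(state) => state
      case _ =>
        Thread.sleep(intervalMs)
        awaitTerminalState(fetch, intervalMs)
    }
}
```

A call might look like `JobStatusPoller.awaitTerminalState(() => JobStatusPoller.fetchJobJson(restBase, jobId), 5000L)`, where `restBase` and `jobId` are placeholders for the cluster's REST address and the ID of the submitted job. For a streaming job that never finishes, this blocks until the job fails or is canceled.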

On Wed, Dec 22, 2021 at 1:39 PM Caizhi Weng <tsreape...@gmail.com> wrote:

> Hi!
>
> This "Finished successfully with value: 0" message seems quite suspicious.
> No log statement in the code base prints such a message. Could you
> please check which component prints this log and determine which
> process this exit code belongs to?
>
> On Wed, Dec 22, 2021 at 15:48 Yuval Itzchakov <yuva...@gmail.com> wrote:
>
>> The job construction itself is a bit complex, but it can either be a
>> StatementSet that's being filled, or there is some kind of conversion Table
>> -> DataStream and then we put the transformations on the DataStream itself.
>> Invocation looks like this:
>>
>>       executionEffect =
>>         if (...)
>>           FlinkTask.lockedEffect(flink.execute(jobName))
>>         else FlinkTask.lockedEffect(statementSet.execute())
>>
>> If I don't infinitely block on this, it terminates right after starting
>> the execution:
>>
>> 2021-12-22 09:25:25,522 INFO o.a.f.a.j.t.TypeExtractor class ... does not
>> contain a setter for field partitionKey
>> 2021-12-22 09:25:25,522 INFO o.a.f.a.j.t.TypeExtractor Class class ...
>> cannot be used as a POJO type because not all fields are valid POJO fields,
>> and must be processed as GenericType. Please read the Flink documentation
>> on "Data Types & Serialization" for details of the effect on performance.
>> 2021-12-22 09:25:25,533 INFO o.a.f.a.j.t.TypeExtractor class ... does not
>> contain a setter for field stage
>> 2021-12-22 09:25:25,533 INFO o.a.f.a.j.t.TypeExtractor Class class ...
>> cannot be used as a POJO type because not all fields are valid POJO fields,
>> and must be processed as GenericType. Please read the Flink documentation
>> on "Data Types & Serialization" for details of the effect on performance.
>> 2021-12-22 09:25:27,678 WARN o.a.f.c.Configuration Config uses deprecated
>> configuration key 'akka.client.timeout' instead of proper key
>> 'client.timeout'
>> 2021-12-22 09:25:27,841 INFO o.a.f.c.d.a.e.EmbeddedExecutor Job
>> 492c9f07d8b3458a52595ab49f636205 is submitted.
>> 2021-12-22 09:25:27,842 INFO o.a.f.c.d.a.e.EmbeddedExecutor Submitting
>> Job with JobId=492c9f07d8b3458a52595ab49f636205.
>> 2021-12-22 09:25:28,491 INFO o.a.f.r.d.Dispatcher Received JobGraph
>> submission '....' (492c9f07d8b3458a52595ab49f636205).
>> 2021-12-22 09:25:28,491 INFO o.a.f.r.d.Dispatcher Submitting job '....'
>> (492c9f07d8b3458a52595ab49f636205).
>> 2021-12-22 09:25:28,519 INFO o.a.f.r.r.a.AkkaRpcService Starting RPC
>> endpoint for org.apache.flink.runtime.jobmaster.JobMaster at
>> akka://flink/user/rpc/jobmanager_2 .
>> 2021-12-22 09:25:28,528 INFO o.a.f.r.j.JobMaster Initializing job '...'
>> (492c9f07d8b3458a52595ab49f636205).
>> 2021-12-22 09:25:28,554 INFO o.a.f.r.s.DefaultSchedulerFactory Using
>> restart back off time strategy
>> FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=1,
>> backoffTimeMS=10000) for ... (492c9f07d8b3458a52595ab49f636205).
>> 2021-12-22 09:25:28,599 INFO o.a.f.r.e.DefaultExecutionGraphBuilder
>> Running initialization on master for job ...
>> (492c9f07d8b3458a52595ab49f636205).
>> 2021-12-22 09:25:28,600 INFO o.a.f.r.e.DefaultExecutionGraphBuilder
>> Successfully ran initialization on master in 0 ms.
>> 2021-12-22 09:25:28,621 INFO o.a.f.r.s.a.DefaultExecutionTopology Built 1
>> pipelined regions in 0 ms
>> 2021-12-22 09:25:28,679 INFO o.a.f.r.s.StateBackendLoader No state
>> backend has been configured, using default (HashMap)
>> org.apache.flink.runtime.state.hashmap.HashMapStateBackend@4c81fc2e
>> 2021-12-22 09:25:28,680 INFO o.a.f.r.s.StateBackendLoader State backend
>> loader loads the state backend as HashMapStateBackend
>> 2021-12-22 09:25:28,681 INFO o.a.f.r.s.CheckpointStorageLoader Checkpoint
>> storage is set to 'jobmanager'
>> 2021-12-22 09:25:28,701 INFO o.a.f.r.c.CheckpointCoordinator No
>> checkpoint found during restore.
>> 2021-12-22 09:25:28,702 INFO o.a.f.r.c.CheckpointCoordinator Starting job
>> 492c9f07d8b3458a52595ab49f636205 from savepoint .... (allowing non restored
>> state)
>> 2021-12-22 09:25:28,727 INFO o.a.f.r.c.CheckpointCoordinator Reset the
>> checkpoint ID of job 492c9f07d8b3458a52595ab49f636205 to 8400.
>> 2021-12-22 09:25:28,728 INFO o.a.f.r.c.CheckpointCoordinator Restoring
>> job 492c9f07d8b3458a52595ab49f636205 from Savepoint 8399 @ 0 for
>> 492c9f07d8b3458a52595ab49f636205 located at file:..
>> 2021-12-22 09:25:28,745 INFO o.a.f.r.c.h.MasterHooks No master state to
>> restore
>> 2021-12-22 09:25:28,750 INFO o.a.f.r.s.DefaultScheduler Using failover
>> strategy
>> org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@3114dd24
>> for ... (492c9f07d8b3458a52595ab49f636205).
>> 2021-12-22 09:25:28,764 INFO o.a.f.r.j.JobMaster Starting execution of
>> job '...' (492c9f07d8b3458a52595ab49f636205) under job master id
>> 00000000000000000000000000000000.
>> 2021-12-22 09:25:28,765 INFO o.a.f.r.s.DefaultScheduler Starting
>> scheduling with scheduling strategy
>> [org.apache.flink.runtime.scheduler.strategy.PipelinedRegionSchedulingStrategy]
>> 2021-12-22 09:25:28,766 INFO o.a.f.r.e.DefaultExecutionGraph Job ...
>> (492c9f07d8b3458a52595ab49f636205) switched from state CREATED to RUNNING.
>> 2021-12-22 09:25:28,772 INFO o.a.f.r.e.Execution Source: ... (1/1)
>> (3c3260f3f0c7d82452a46fc383ceb932) switched from CREATED to SCHEDULED.
>> 2021-12-22 09:25:28,772 INFO o.a.f.r.e.Execution ... -> Map ->
>> Calc(select=[...]) -> Map (1/3) (3c05f0bd5ca1bd4903398bb39b5992fa) switched
>> from CREATED to SCHEDULED.
>> 2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ... -> Map ->
>> Calc(select=[...]) -> Map (2/3) (644722a664ac6a9797b8638a225dbbf9) switched
>> from CREATED to SCHEDULED.
>> 2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ... -> Map ->
>> (Calc(select=[...]) (3/3) (00cf0b3a6d6c1cd1f0cf48ad8b393921) switched from
>> CREATED to SCHEDULED.
>> 2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink:
>> SnowflakeSinkProvider(...) (1/3) (bcaeb5103effbbddc2b4fc7ad801abbf)
>> switched from CREATED to SCHEDULED.
>> 2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink:
>> SnowflakeSinkProvider(...) (2/3) (8d046c60a84900cba31877ec28f81124)
>> switched from CREATED to SCHEDULED.
>> 2021-12-22 09:25:28,773 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink:
>> SnowflakeSinkProvider(...) (3/3) (39a1afd89f627816f018fa9652865887)
>> switched from CREATED to SCHEDULED.
>> 2021-12-22 09:25:28,790 INFO o.a.f.r.j.JobMaster Connecting to
>> ResourceManager akka.tcp://flink@localhost
>> :6123/user/rpc/resourcemanager_*(00000000000000000000000000000000)
>> 2021-12-22 09:25:28,794 INFO o.a.f.r.r.RetryingRegistration Resolved
>> ResourceManager address, beginning registration
>> 2021-12-22 09:25:28,796 INFO o.a.f.r.r.ResourceManager Registering job
>> manager 
>> 00000000000000000000000000000...@akka.tcp://flink@localhost:6123/user/rpc/jobmanager_2
>> for job 492c9f07d8b3458a52595ab49f636205.
>> 2021-12-22 09:25:28,800 INFO o.a.f.r.r.ResourceManager Registered job
>> manager 
>> 00000000000000000000000000000...@akka.tcp://flink@localhost:6123/user/rpc/jobmanager_2
>> for job 492c9f07d8b3458a52595ab49f636205.
>> 2021-12-22 09:25:28,801 INFO o.a.f.r.j.JobMaster JobManager successfully
>> registered at ResourceManager, leader id: 00000000000000000000000000000000.
>> 2021-12-22 09:25:28,803 INFO o.a.f.r.r.s.DeclarativeSlotManager Received
>> resource requirements from job 492c9f07d8b3458a52595ab49f636205:
>> [ResourceRequirement{resourceProfile=ResourceProfile{UNKNOWN},
>> numberOfRequiredSlots=3}]
>> 2021-12-22 09:25:28,872 INFO o.a.f.r.e.Execution Source: ... (1/1)
>> (3c3260f3f0c7d82452a46fc383ceb932) switched from SCHEDULED to DEPLOYING.
>> 2021-12-22 09:25:28,872 INFO o.a.f.r.e.Execution Deploying Source: ...
>> (1/1) (attempt #0) with attempt id 3c3260f3f0c7d82452a46fc383ceb932 to
>> ...:64216-650fc2 @ ... (dataPort=64218) with allocation id
>> 2e63675e30c595a8538f7a006fe0678d
>> 2021-12-22 09:25:28,880 INFO o.a.f.r.e.Execution ... -> Map ->
>> Calc(select=[...]) -> Map (1/3) (3c05f0bd5ca1bd4903398bb39b5992fa) switched
>> from SCHEDULED to DEPLOYING.
>> 2021-12-22 09:25:28,880 INFO o.a.f.r.e.Execution Deploying c... -> Map ->
>> Calc(select=[...]) -> Map (1/3) (attempt #0) with attempt id
>> 3c05f0bd5ca1bd4903398bb39b5992fa to ... (dataPort=64218) with allocation id
>> 2e63675e30c595a8538f7a006fe0678d
>> 2021-12-22 09:25:28,886 INFO o.a.f.r.e.Execution ... -> Map ->
>> Calc(select=[...]) -> Map (2/3) (644722a664ac6a9797b8638a225dbbf9) switched
>> from SCHEDULED to DEPLOYING.
>> 2021-12-22 09:25:28,886 INFO o.a.f.r.e.Execution Deploying ... -> Map ->
>> Calc(select=[...]) -> Map (2/3) (attempt #0) with attempt id
>> 644722a664ac6a9797b8638a225dbbf9 to ... (dataPort=64218) with allocation id
>> fe0a5941283557538901c8a9774a2584
>> 2021-12-22 09:25:28,887 INFO o.a.f.r.e.Execution ... -> Map ->
>> Calc(select=[...]) -> Map (3/3) (00cf0b3a6d6c1cd1f0cf48ad8b393921) switched
>> from SCHEDULED to DEPLOYING.
>> 2021-12-22 09:25:28,887 INFO o.a.f.r.e.Execution Deploying ... -> Map ->
>> Calc(select=[...]) -> Map (3/3) (attempt #0) with attempt id
>> 00cf0b3a6d...d8b393921 to ... (dataPort=64218) with allocation id
>> 026dabf16a12ddf35399938466a27572
>> 2021-12-22 09:25:28,888 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink:
>> SnowflakeSinkProvider(...) (1/3) (bcaeb5103effbbddc2b4fc7ad801abbf)
>> switched from SCHEDULED to DEPLOYING.
>> 2021-12-22 09:25:28,888 INFO o.a.f.r.e.Execution Deploying ...-batch ->
>> Sink: SnowflakeSinkProvider(...) (1/3) (attempt #0) with attempt id
>> bcaeb5103effbbddc2b4fc7ad801abbf to ... (dataPort=64218) with allocation id
>> 2e63675e30c595a8538f7a006fe0678d
>> 2021-12-22 09:25:28,889 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink:
>> SnowflakeSinkProvider(...) (2/3)  (8d046c60a84900cba31877ec28f81124)
>> switched from SCHEDULED to DEPLOYING.
>> 2021-12-22 09:25:28,889 INFO o.a.f.r.e.Execution Deploying ...-sink-batch
>> -> Sink: SnowflakeSinkProvider(...) (2/3)  (attempt #0) with attempt id
>> 8d046c60a84900cba31877ec28f81124 to ... (dataPort=64218) with allocation id
>> fe0a5941283557538901c8a9774a2584
>> 2021-12-22 09:25:28,890 INFO o.a.f.r.e.Execution ...-sink-batch -> Sink:
>> SnowflakeSinkProvider(...) (3/3)  (39a1afd89f627816f018fa9652865887)
>> switched from SCHEDULED to DEPLOYING.
>> 2021-12-22 09:25:28,890 INFO o.a.f.r.e.Execution Deploying ...-sink-batch
>> -> Sink: SnowflakeSinkProvider(...) (3/3)  (attempt #0) with attempt id
>> 39a1afd89f627816f018fa9652865887 to ... (dataPort=64218) with allocation id
>> 026dabf16a12ddf35399938466a27572
>> 2021-12-22 09:25:28,917 INFO Finished successfully with value: 0
>> 2021-12-22 09:25:28,922 INFO o.a.f.r.e.ClusterEntrypoint Shutting
>> StandaloneSessionClusterEntrypoint down with application status UNKNOWN.
>> Diagnostics Cluster entrypoint has been closed externally..
>> 2021-12-22 09:25:28,923 INFO o.a.f.r.r.RestServerEndpoint Shutting down
>> rest endpoint.
>> 2021-12-22 09:25:28,943 INFO o.a.f.r.b.BlobServer Stopped BLOB server at
>> 0.0.0.0:64213
>>
>> Process finished with exit code 239
>>
>> On Wed, Dec 22, 2021 at 8:47 AM Yuval Itzchakov <yuva...@gmail.com>
>> wrote:
>>
>>> I mean it finishes successfully and exits with status code 0, both when
>>> running locally and when submitting to the cluster.
>>>
>>> On Wed, Dec 22, 2021, 08:36 Caizhi Weng <tsreape...@gmail.com> wrote:
>>>
>>>> Hi!
>>>>
>>>> By "the streaming job stops" do you mean the job ends with CANCELED
>>>> state instead of FINISHED state? Which kind of job are you running? Is it a
>>>> select job or an insert job? Insert jobs should run continuously once
>>>> they're submitted. Could you share your user code if possible?
>>>>
>>>> On Wed, Dec 22, 2021 at 14:11 Yuval Itzchakov <yuva...@gmail.com> wrote:
>>>>
>>>>> Hi Caizhi,
>>>>>
>>>>> If I don't block on statementSet.execute(), the job finishes immediately
>>>>> with exit code 0 and the streaming job stops, which is not what I want.
>>>>> I somehow need to block.
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Dec 22, 2021, 03:43 Caizhi Weng <tsreape...@gmail.com> wrote:
>>>>>
>>>>>> Hi!
>>>>>>
>>>>>> You can poll the status of that job with the REST API [1]. A FINISHED
>>>>>> state means the job completed successfully, and a FAILED state means
>>>>>> the job failed.
>>>>>>
>>>>>> [1]
>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jobs-jobid
>>>>>>
>>>>>> On Wed, Dec 22, 2021 at 02:36 Yuval Itzchakov <yuva...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Flink 1.14.2
>>>>>>> Scala 2.12
>>>>>>>
>>>>>>> I have a streaming job, and I want to wait indefinitely for its
>>>>>>> completion or for an exception thrown during initialization.
>>>>>>> When using *statementSet.execute().await()*, I get an error:
>>>>>>>
>>>>>>> Caused by: org.apache.flink.util.FlinkRuntimeException: The Job
>>>>>>> Result cannot be fetched through the Job Client when in Web Submission.
>>>>>>> at
>>>>>>> org.apache.flink.client.deployment.application.WebSubmissionJobClient.getJobExecutionResult(WebSubmissionJobClient.java:88)
>>>>>>> at
>>>>>>> org.apache.flink.table.api.internal.InsertResultIterator.hasNext(InsertResultIterator.java:54)
>>>>>>> ... 7 more
>>>>>>>
>>>>>>> This is because the Web Submission via the REST API is using
>>>>>>> the WebSubmissionJobClient.
>>>>>>>
>>>>>>> How can I wait on my Flink SQL streaming job when submitting through
>>>>>>> the REST API?
>>>>>>> --
>>>>>>> Best Regards,
>>>>>>> Yuval Itzchakov
>>>>>>>
>>>>>>
>>
>> --
>> Best Regards,
>> Yuval Itzchakov.
>>
>

-- 
Best Regards,
Yuval Itzchakov.
