Hi Zili,

Thanks for pointing that out.
I didn't realize that it's a REST API based case. Debasish's case has been
discussed not only in this thread...

It's really hard to analyze the case without the full picture.

I think the reason of why `ProgramAbortException` is not caught is that he
did something outside `env.execute`. Like executing this piece of codes
inside a Scala future.

I guess the scenario is that he is submitting job through REST API. But in
the main method, he wraps `env.execute` with Scala future, not executing it
directly.
The reason of env has been set to `StreamPlanEnvironment` is
`JarHandlerUtils` retrieves job graph through it.
And the `ProgramAbortException` is not thrown out, because the Scala future
tackles this exception.
So retrieving job graph fails due to an unrecognized exception (Boxed
Error).

Thanks,
Biao /'bɪ.aʊ/



On Tue, 24 Sep 2019 at 10:44, Zili Chen <wander4...@gmail.com> wrote:

> Hi Biao,
>
> The log below already infers that the job was submitted via REST API and I
> don't think it matters.
>
> at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$
> JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
> at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$
> getJobGraphAsync$6(JarRunHandler.java:142)
>
> What I don't understand it that flink DOES catch the exception at the
> point it is reported thrown...
>
> Best,
> tison.
>
>
> Biao Liu <mmyy1...@gmail.com> 于2019年9月24日周二 上午10:34写道:
>
>>
>> > We submit the code through Kubernetes Flink Operator which uses the
>> REST API to submit the job to the Job Manager
>>
>> So you are submitting job through REST API, not Flink client? Could you
>> explain more about this?
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>>
>>
>> On Tue, 24 Sep 2019 at 03:44, Debasish Ghosh <ghosh.debas...@gmail.com>
>> wrote:
>>
>>> Hi Dian -
>>>
>>> We submit one job through the operator. We just use the following to
>>> complete a promise when the job completes ..
>>>
>>>       Try {
>>>         createLogic.executeStreamingQueries(ctx.env)
>>>       }.fold(
>>>         th ⇒ completionPromise.tryFailure(th),
>>>         _ ⇒ completionPromise.trySuccess(Dun)
>>>       )
>>>
>>> If we totally do away with the promise and future stuff then we don't
>>> get the boxed error - only the exception reported in Caused By.
>>>
>>> regards.
>>>
>>> On Mon, Sep 23, 2019 at 10:20 PM Dian Fu <dian0511...@gmail.com> wrote:
>>>
>>>> Hi Debasish,
>>>>
>>>> In which case will the exception occur? Does it occur when you submit
>>>> one job at a time or when multiple jobs are submitted at the same time? I'm
>>>> asking this because I noticed that you used Future to execute the job
>>>> unblocking. I guess ThreadLocal doesn't work well in this case.
>>>>
>>>> Regards,
>>>> Dian
>>>>
>>>> 在 2019年9月23日,下午11:57,Debasish Ghosh <ghosh.debas...@gmail.com> 写道:
>>>>
>>>> Hi tison -
>>>>
>>>> Please find my response below in >>.
>>>>
>>>> regards.
>>>>
>>>> On Mon, Sep 23, 2019 at 6:20 PM Zili Chen <wander4...@gmail.com> wrote:
>>>>
>>>>> Hi Debasish,
>>>>>
>>>>> The OptimizerPlanEnvironment.ProgramAbortException should be caught
>>>>> at OptimizerPlanEnvironment#getOptimizedPlan
>>>>> in its catch (Throwable t) branch.
>>>>>
>>>>
>>>> >> true but what I get is a StreamPlanEnvironment. From my code I am
>>>> only doing val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>> .
>>>>
>>>>>
>>>>> It should always throw a ProgramInvocationException instead of
>>>>> OptimizerPlanEnvironment.ProgramAbortException if any
>>>>> exception thrown in the main method of your code.
>>>>>
>>>>> Another important problem is how the code is executed, (set context
>>>>> environment should be another flink internal operation)
>>>>> but given that you submit the job via flink k8s operator it might
>>>>> require time to take a look at k8s operator implementation.
>>>>>
>>>>
>>>> >> We submit the code through Kubernetes Flink Operator which uses the
>>>> REST API to submit the job to the Job Manager
>>>>
>>>>>
>>>>> However, given we catch Throwable in the place this exception thrown,
>>>>> I highly suspect whether it is executed by an official
>>>>> flink release.
>>>>>
>>>>
>>>> >> It is an official Flink release 1.9.0
>>>>
>>>>>
>>>>> A completed version of the code and the submission process is helpful.
>>>>> Besides, what is buildExecutionGraph return type,
>>>>> I think it is not ExecutionGraph in flink...
>>>>>
>>>>
>>>> >> buildExecutionGraph is our function which returns a Unit. It's not
>>>> ExecutionGraph. It builds the DataStream s by reading from Kafka and then
>>>> finally writes to Kafka.
>>>>
>>>>>
>>>>> Best,
>>>>> tison.
>>>>>
>>>>>
>>>>> Debasish Ghosh <ghosh.debas...@gmail.com> 于2019年9月23日周一 下午8:21写道:
>>>>>
>>>>>> This is the complete stack trace which we get from execution on
>>>>>> Kubernetes using the Flink Kubernetes operator .. The boxed error comes
>>>>>> from the fact that we complete a Promise with Success when it returns a
>>>>>> JobExecutionResult and with Failure when we get an exception. And here 
>>>>>> we r
>>>>>> getting an exception. So the real stack trace we have is the one below in
>>>>>> Caused By.
>>>>>>
>>>>>> java.util.concurrent.ExecutionException: Boxed Error
>>>>>> at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
>>>>>> at
>>>>>> scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
>>>>>> at
>>>>>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
>>>>>> at scala.concurrent.Promise.tryFailure(Promise.scala:112)
>>>>>> at scala.concurrent.Promise.tryFailure$(Promise.scala:112)
>>>>>> at
>>>>>> scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:187)
>>>>>> at
>>>>>> pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3(FlinkStreamlet.scala:186)
>>>>>> at
>>>>>> pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3$adapted(FlinkStreamlet.scala:186)
>>>>>> at scala.util.Failure.fold(Try.scala:240)
>>>>>> at
>>>>>> pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:187)
>>>>>> at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:153)
>>>>>> at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
>>>>>> at scala.util.Try$.apply(Try.scala:213)
>>>>>> at pipelines.runner.Runner$.run(Runner.scala:43)
>>>>>> at pipelines.runner.Runner$.main(Runner.scala:30)
>>>>>> at pipelines.runner.Runner.main(Runner.scala)
>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>> at
>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>> at
>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>> at
>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>>>>>> at
>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>>>>>> at
>>>>>> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>>>>>> at
>>>>>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
>>>>>> at
>>>>>> org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
>>>>>> at
>>>>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
>>>>>> at
>>>>>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>>>>> at
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>> at
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>> Caused by:
>>>>>> org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>>> at
>>>>>> org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>> at
>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>> at
>>>>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>> at
>>>>>> pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:320)
>>>>>> at
>>>>>> pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$2(FlinkStreamlet.scala:184)
>>>>>> at scala.util.Try$.apply(Try.scala:213)
>>>>>> at
>>>>>> pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:184)
>>>>>> ... 20 more
>>>>>>
>>>>>> regards.
>>>>>>
>>>>>> On Mon, Sep 23, 2019 at 5:36 PM Dian Fu <dian0511...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Regarding to the code you pasted, personally I think nothing is
>>>>>>> wrong. The problem is how it's executed. As you can see from the
>>>>>>> implementation of of 
>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment, it
>>>>>>> may created different StreamExecutionEnvironment implementations under
>>>>>>> different scenarios. Could you paste the full exception stack if it 
>>>>>>> exists?
>>>>>>> It's difficult to figure out what's wrong with the current stack trace.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Dian
>>>>>>>
>>>>>>> 在 2019年9月23日,下午6:55,Debasish Ghosh <ghosh.debas...@gmail.com> 写道:
>>>>>>>
>>>>>>> Can it be the case that the threadLocal stuff in
>>>>>>> https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1609
>>>>>>>  does
>>>>>>> not behave deterministically when we submit job through a Kubernetes 
>>>>>>> Flink
>>>>>>> operator ? Utils also selects the factory to create the context based on
>>>>>>> either Thread local storage or a static mutable variable.
>>>>>>>
>>>>>>> Can these be source of problems in our case ?
>>>>>>>
>>>>>>> regards.
>>>>>>>
>>>>>>> On Mon, Sep 23, 2019 at 3:58 PM Debasish Ghosh <
>>>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>>>
>>>>>>>> ah .. Ok .. I get the Throwable part. I am using
>>>>>>>>
>>>>>>>> import org.apache.flink.streaming.api.scala._
>>>>>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>
>>>>>>>> How can this lead to a wrong StreamExecutionEnvironment ? Any
>>>>>>>> suggestion ?
>>>>>>>>
>>>>>>>> regards.
>>>>>>>>
>>>>>>>> On Mon, Sep 23, 2019 at 3:53 PM Dian Fu <dian0511...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Debasish,
>>>>>>>>>
>>>>>>>>> As I said before, the exception is caught in [1]. It catches the
>>>>>>>>> Throwable and so it could also catch "
>>>>>>>>> OptimizerPlanEnvironment.ProgramAbortException". Regarding to the
>>>>>>>>> cause of this exception, I have the same feeling with Tison and I also
>>>>>>>>> think that the wrong StreamExecutionEnvironment is used.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Dian
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
>>>>>>>>>
>>>>>>>>> 在 2019年9月23日,下午6:08,Debasish Ghosh <ghosh.debas...@gmail.com> 写道:
>>>>>>>>>
>>>>>>>>> Hi Tison -
>>>>>>>>>
>>>>>>>>> This is the code that builds the computation graph. readStream reads
>>>>>>>>> from Kafka and writeStream writes to Kafka.
>>>>>>>>>
>>>>>>>>>     override def buildExecutionGraph = {
>>>>>>>>>       val rides: DataStream[TaxiRide] =
>>>>>>>>>         readStream(inTaxiRide)
>>>>>>>>>           .filter { ride ⇒ ride.getIsStart().booleanValue }
>>>>>>>>>           .keyBy("rideId")
>>>>>>>>>
>>>>>>>>>       val fares: DataStream[TaxiFare] =
>>>>>>>>>         readStream(inTaxiFare)
>>>>>>>>>           .keyBy("rideId")
>>>>>>>>>
>>>>>>>>>       val processed: DataStream[TaxiRideFare] =
>>>>>>>>>         rides
>>>>>>>>>           .connect(fares)
>>>>>>>>>           .flatMap(new EnrichmentFunction)
>>>>>>>>>
>>>>>>>>>       writeStream(out, processed)
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>> I also checked that my code enters this function
>>>>>>>>> https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
>>>>>>>>>  and
>>>>>>>>> then the exception is thrown. I tried to do a grep on the Flink code 
>>>>>>>>> base
>>>>>>>>> to see where this exception is caught. If I take off the tests, I 
>>>>>>>>> don't see
>>>>>>>>> any catch of this exception ..
>>>>>>>>>
>>>>>>>>> $ find . -name "*.java" | xargs grep
>>>>>>>>> "OptimizerPlanEnvironment.ProgramAbortException"
>>>>>>>>> ./flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java:
>>>>>>>>> throw new OptimizerPlanEnvironment.ProgramAbortException();
>>>>>>>>> ./flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java:
>>>>>>>>> throw new OptimizerPlanEnvironment.ProgramAbortException();
>>>>>>>>> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java:
>>>>>>>>> throw new OptimizerPlanEnvironment.ProgramAbortException();
>>>>>>>>> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java:
>>>>>>>>> throw new OptimizerPlanEnvironment.ProgramAbortException();
>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java:
>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java:
>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java:
>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java:
>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java:
>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java:
>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java:
>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java:
>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java:
>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java:
>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java:
>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java:
>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java:
>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java:
>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java:import
>>>>>>>>> org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException;
>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java:
>>>>>>>>> @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class,
>>>>>>>>> timeout = 30_000)
>>>>>>>>>
>>>>>>>>> What am I missing here ?
>>>>>>>>>
>>>>>>>>> regards.
>>>>>>>>>
>>>>>>>>> On Mon, Sep 23, 2019 at 7:50 AM Zili Chen <wander4...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Debasish,
>>>>>>>>>>
>>>>>>>>>> As mentioned by Dian, it is an internal exception that should be
>>>>>>>>>> always caught by
>>>>>>>>>> Flink internally. I would suggest you share the job(abstractly).
>>>>>>>>>> Generally it is because
>>>>>>>>>> you use StreamPlanEnvironment/OptimizerPlanEnvironment directly.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> tison.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Austin Cawley-Edwards <austin.caw...@gmail.com> 于2019年9月23日周一
>>>>>>>>>> 上午5:09写道:
>>>>>>>>>>
>>>>>>>>>>> Have you reached out to the FlinkK8sOperator team on Slack?
>>>>>>>>>>> They’re usually pretty active on there.
>>>>>>>>>>>
>>>>>>>>>>> Here’s the link:
>>>>>>>>>>>
>>>>>>>>>>> https://join.slack.com/t/flinkk8soperator/shared_invite/enQtNzIxMjc5NDYxODkxLTEwMThmN2I0M2QwYjM3ZDljYTFhMGRiNDUzM2FjZGYzNTRjYWNmYTE1NzNlNWM2YWM5NzNiNGFhMTkxZjA4OGU
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Austin
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Sep 22, 2019 at 12:38 PM Debasish Ghosh <
>>>>>>>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> The problem is I am submitting Flink jobs to Kubernetes cluster
>>>>>>>>>>>> using a Flink Operator. Hence it's difficult to debug in the 
>>>>>>>>>>>> traditional
>>>>>>>>>>>> sense of the term. And all I get is the exception that I reported 
>>>>>>>>>>>> ..
>>>>>>>>>>>>
>>>>>>>>>>>> Caused by:
>>>>>>>>>>>> org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>>>>>>>> at
>>>>>>>>>>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>>>>>>>>
>>>>>>>>>>>> I am thinking that this exception must be coming because of
>>>>>>>>>>>> some other exceptions, which are not reported BTW. I expected a 
>>>>>>>>>>>> Caused By
>>>>>>>>>>>> portion in the stack trace. Any clue as to which area I should 
>>>>>>>>>>>> look into to
>>>>>>>>>>>> debug this.
>>>>>>>>>>>>
>>>>>>>>>>>> regards.
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Sep 21, 2019 at 8:10 AM Debasish Ghosh <
>>>>>>>>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the pointer .. I will try debugging. I am getting
>>>>>>>>>>>>> this exception running my application on Kubernetes using the 
>>>>>>>>>>>>> Flink
>>>>>>>>>>>>> operator from Lyft.
>>>>>>>>>>>>>
>>>>>>>>>>>>> regards.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sat, 21 Sep 2019 at 6:44 AM, Dian Fu <dian0511...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> This exception is used internally to get the plan of a job
>>>>>>>>>>>>>> before submitting it for execution. It's thrown with special 
>>>>>>>>>>>>>> purpose and
>>>>>>>>>>>>>> will be caught internally in [1] and will not be thrown to end 
>>>>>>>>>>>>>> users
>>>>>>>>>>>>>> usually.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> You could check the following places to find out the cause to
>>>>>>>>>>>>>> this problem:
>>>>>>>>>>>>>> 1. Check the execution environment you used
>>>>>>>>>>>>>> 2. If you can debug, set a breakpoint at[2] to see if the
>>>>>>>>>>>>>> type of the env wrapped in StreamPlanEnvironment is 
>>>>>>>>>>>>>> OptimizerPlanEnvironment.
>>>>>>>>>>>>>> Usually it should be.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>> https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
>>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>> https://github.com/apache/flink/blob/15a7f978a2e591451c194c41408a12e64d04e9ba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 在 2019年9月21日,上午4:14,Debasish Ghosh <ghosh.debas...@gmail.com>
>>>>>>>>>>>>>> 写道:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi -
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> When you get an exception stack trace like this ..
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Caused by:
>>>>>>>>>>>>>> org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>>>>>>>>>> at
>>>>>>>>>>>>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> what is the recommended approach of debugging ? I mean what
>>>>>>>>>>>>>> kind of errors can potentially lead to such a stacktrace ? In my 
>>>>>>>>>>>>>> case it
>>>>>>>>>>>>>> starts from env.execute(..) but does not give any information as 
>>>>>>>>>>>>>> to what
>>>>>>>>>>>>>> can go wrong.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Any help will be appreciated.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> regards.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Debasish Ghosh
>>>>>>>>>>>>>> http://manning.com/ghosh2
>>>>>>>>>>>>>> http://manning.com/ghosh
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Twttr: @debasishg
>>>>>>>>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>>>>>>>>> Code: http://github.com/debasishg
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>> Sent from my iPhone
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Debasish Ghosh
>>>>>>>>>>>> http://manning.com/ghosh2
>>>>>>>>>>>> http://manning.com/ghosh
>>>>>>>>>>>>
>>>>>>>>>>>> Twttr: @debasishg
>>>>>>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>>>>>>> Code: http://github.com/debasishg
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Debasish Ghosh
>>>>>>>>> http://manning.com/ghosh2
>>>>>>>>> http://manning.com/ghosh
>>>>>>>>>
>>>>>>>>> Twttr: @debasishg
>>>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>>>> Code: http://github.com/debasishg
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Debasish Ghosh
>>>>>>>> http://manning.com/ghosh2
>>>>>>>> http://manning.com/ghosh
>>>>>>>>
>>>>>>>> Twttr: @debasishg
>>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>>> Code: http://github.com/debasishg
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Debasish Ghosh
>>>>>>> http://manning.com/ghosh2
>>>>>>> http://manning.com/ghosh
>>>>>>>
>>>>>>> Twttr: @debasishg
>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>> Code: http://github.com/debasishg
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Debasish Ghosh
>>>>>> http://manning.com/ghosh2
>>>>>> http://manning.com/ghosh
>>>>>>
>>>>>> Twttr: @debasishg
>>>>>> Blog: http://debasishg.blogspot.com
>>>>>> Code: http://github.com/debasishg
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Debasish Ghosh
>>>> http://manning.com/ghosh2
>>>> http://manning.com/ghosh
>>>>
>>>> Twttr: @debasishg
>>>> Blog: http://debasishg.blogspot.com
>>>> Code: http://github.com/debasishg
>>>>
>>>>
>>>>
>>>
>>> --
>>> Debasish Ghosh
>>> http://manning.com/ghosh2
>>> http://manning.com/ghosh
>>>
>>> Twttr: @debasishg
>>> Blog: http://debasishg.blogspot.com
>>> Code: http://github.com/debasishg
>>>
>>

Reply via email to