Actually, there is an ongoing client API refactoring covering this area[1],
and one of its main purposes is to eliminate the hijacking of
env.execute...

Best,
tison.

[1]
https://lists.apache.org/x/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E


Biao Liu <mmyy1...@gmail.com> wrote on Tue, Sep 24, 2019 at 7:12 PM:

> So I believe (I didn't test it) the solution for this case is keeping the
> original exception thrown from `env.execute()` and throwing that exception
> out of the main method.
> It's a bit tricky; maybe we could have a better design for this scenario.
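>
> A minimal sketch of that idea (untested; the names `env` and
> `completionPromise` are borrowed from Debasish's code earlier in this
> thread). The key point is rethrowing Error subclasses such as Flink's
> ProgramAbortException instead of capturing them:
>
> import scala.util.{Failure, Success, Try}
>
> Try(env.execute()) match {
>   // Rethrow Errors so they escape the main method unchanged and Flink
>   // can catch them internally during plan extraction.
>   case Failure(error: Error) => throw error
>   case Failure(other)        => completionPromise.tryFailure(other)
>   case Success(result)       => completionPromise.trySuccess(result)
> }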
>
> Thanks,
> Biao /'bɪ.aʊ/
>
>
>
> On Tue, 24 Sep 2019 at 18:55, Biao Liu <mmyy1...@gmail.com> wrote:
>
>> The key point of this case is in `PackagedProgram#callMainMethod`.
>> The `ProgramAbortException` is expected when the main method is executed
>> here. The thrown `ProgramAbortException` is wrapped in an
>> `InvocationTargetException` by the Java reflection layer [1]. There is a
>> piece of code handling `InvocationTargetException`:
>>
>> try {
>>   mainMethod.invoke(null, (Object) args);
>> }
>> catch (...
>> catch (InvocationTargetException e) {
>>   Throwable exceptionInMethod = e.getTargetException();
>>   if (exceptionInMethod instanceof Error) {
>>     // ------> `ProgramAbortException` would be caught here, as expected.
>>     throw (Error) exceptionInMethod;
>>   } else if (exceptionInMethod instanceof ProgramParametrizationException) {
>>     throw (ProgramParametrizationException) exceptionInMethod;
>>   } else if (exceptionInMethod instanceof ProgramInvocationException) {
>>     throw (ProgramInvocationException) exceptionInMethod;
>>   } else {
>>     // ------> If I'm right, the wrapping (Boxed Error or something else)
>>     // changes the exception type, so it is caught here instead.
>>     throw new ProgramInvocationException("The main method caused an error: "
>>       + exceptionInMethod.getMessage(), exceptionInMethod);
>>   }
>>
>> The `ProgramInvocationException` is handled specially in
>> `OptimizerPlanEnvironment`:
>>
>> try {
>>   prog.invokeInteractiveModeForExecution();
>> }
>> catch (ProgramInvocationException e) {
>>   throw e;  // ------> In this case the submission fails here.
>> }
>> catch (Throwable t) {
>>   // the invocation gets aborted with the preview plan
>>   if (optimizerPlan != null) {
>>     return optimizerPlan;  // ------> Normally it should end up here.
>>   } else {
>>     throw new ProgramInvocationException("The program caused an error: ",
>>       t);
>>   } ...
>>
>> [1]
>> https://stackoverflow.com/questions/6020719/what-could-cause-java-lang-reflect-invocationtargetexception
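>>
>> For what it's worth, the reflection wrapping described above and in [1]
>> is easy to reproduce in a few standalone lines (my own illustration, not
>> Flink code):
>>
>> import java.lang.reflect.InvocationTargetException
>>
>> object ReflectionWrappingDemo {
>>   def boom(): Unit = throw new Error("stand-in for ProgramAbortException")
>>
>>   def main(args: Array[String]): Unit = {
>>     val method = ReflectionWrappingDemo.getClass.getMethod("boom")
>>     try method.invoke(ReflectionWrappingDemo)
>>     catch {
>>       case e: InvocationTargetException =>
>>         // getTargetException() recovers the original Error.
>>         println(e.getTargetException)
>>     }
>>   }
>> }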
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>>
>>
>> On Tue, 24 Sep 2019 at 17:35, Debasish Ghosh <ghosh.debas...@gmail.com>
>> wrote:
>>
>>> Well, I think I found the solution, though I am not yet sure of the
>>> problem .. The original code looked like this ..
>>>
>>> Try {
>>>   // from a parent class called Runner which runs a streamlet
>>>   // run returns an abstraction which completes a Promise depending on
>>> whether
>>>   // the Job was successful or not
>>>   val streamletExecution =
>>> loadedStreamlet.streamlet.run(withPodRuntimeConfig)
>>>
>>>   // the runner waits for the execution to complete
>>>   // In normal circumstances it will run forever for streaming data
>>> source unless
>>>   // being stopped forcibly or any of the queries faces an exception
>>>   Await.result(streamletExecution.completed, Duration.Inf)
>>> } match { //..
>>>
>>> and then the streamlet.run(..) in turn finally invoked the following ..
>>>
>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>
>>> // creates datastreams and read from / writes to Kafka
>>> // I pasted the body of this earlier in the thread
>>> buildExecutionGraph()
>>>
>>> env.execute(..)
>>>
>>> This DID NOT run and failed with the exception I reported earlier. But
>>> when I changed the code to move the run statement out of the Try block,
>>> things ran fine .. like this ..
>>>
>>> // from a parent class called Runner which runs a streamlet
>>> // run returns an abstraction which completes a Promise depending on
>>> whether
>>> // the Job was successful or not
>>> val streamletExecution =
>>> loadedStreamlet.streamlet.run(withPodRuntimeConfig)
>>>
>>> Try {
>>>   // the runner waits for the execution to complete
>>>   // In normal circumstances it will run forever for streaming data
>>> source unless
>>>   // being stopped forcibly or any of the queries faces an exception
>>>   Await.result(streamletExecution.completed, Duration.Inf)
>>> } match { //..
>>>
>>> Apparently the exception I was facing earlier leaked through the Flink
>>> engine, and the Try caught it and logged it. Moving the call out of the
>>> Try now enables Flink to catch it and follow the course that it should.
>>> But I am not sure this is a cogent explanation, and I'm looking forward
>>> to a more accurate one from the experts. Note there is no asynchrony or
>>> concurrency going on here - the Runner code may look a bit
>>> over-engineered but there is a context to this: the Runner handles not
>>> only Flink but other types of streaming engines as well, like Spark and
>>> Akka Streams.
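>>>
>>> For what it's worth, a tiny standalone snippet (my own experiment, not
>>> from our codebase) shows that scala.util.Try does catch plain Error
>>> subclasses - in recent Scala versions NonFatal only excludes
>>> VirtualMachineError, ThreadDeath, InterruptedException, LinkageError and
>>> ControlThrowable - and Flink's ProgramAbortException extends Error:
>>>
>>> import scala.util.Try
>>>
>>> // A plain Error is a stand-in for ProgramAbortException here.
>>> val caught = Try(throw new Error("stand-in for ProgramAbortException"))
>>> println(caught) // Failure(java.lang.Error: stand-in for ...)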
>>>
>>> regards.
>>>
>>>
>>> On Tue, Sep 24, 2019 at 10:17 AM Biao Liu <mmyy1...@gmail.com> wrote:
>>>
>>>> Hi Zili,
>>>>
>>>> Thanks for pointing that out.
>>>> I didn't realize that it's a REST API based case. Debasish's case has
>>>> been discussed not only in this thread...
>>>>
>>>> It's really hard to analyze the case without the full picture.
>>>>
>>>> I think the reason why `ProgramAbortException` is not caught is that
>>>> he did something outside `env.execute`, like executing this piece of
>>>> code inside a Scala future.
>>>>
>>>> I guess the scenario is that he is submitting the job through the REST
>>>> API, but in the main method he wraps `env.execute` in a Scala future
>>>> instead of executing it directly.
>>>> The reason the env has been set to `StreamPlanEnvironment` is that
>>>> `JarHandlerUtils` retrieves the job graph through it.
>>>> And the `ProgramAbortException` is not thrown out, because the Scala
>>>> future swallows this exception.
>>>> So retrieving the job graph fails due to an unrecognized exception
>>>> (Boxed Error).
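>>>>
>>>> The "Boxed Error" itself can be reproduced in a few standalone lines
>>>> (a sketch of standard Promise behaviour, not code from this thread):
>>>>
>>>> import scala.concurrent.{Await, Promise}
>>>> import scala.concurrent.duration.Duration
>>>>
>>>> val p = Promise[Unit]()
>>>> // Completing a Promise with an Error wraps it: the resolver turns any
>>>> // Error into ExecutionException("Boxed Error", error).
>>>> p.tryFailure(new Error("stand-in for ProgramAbortException"))
>>>> Await.result(p.future, Duration.Inf)
>>>> // throws java.util.concurrent.ExecutionException: Boxed Error
>>>> //   Caused by: java.lang.Error: stand-in for ProgramAbortException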
>>>>
>>>> Thanks,
>>>> Biao /'bɪ.aʊ/
>>>>
>>>>
>>>>
>>>> On Tue, 24 Sep 2019 at 10:44, Zili Chen <wander4...@gmail.com> wrote:
>>>>
>>>>> Hi Biao,
>>>>>
>>>>> The log below already implies that the job was submitted via the REST
>>>>> API, and I don't think it matters.
>>>>>
>>>>> at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$
>>>>> JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
>>>>> at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$
>>>>> getJobGraphAsync$6(JarRunHandler.java:142)
>>>>>
>>>>> What I don't understand is that Flink DOES catch the exception at the
>>>>> point it is reported thrown...
>>>>>
>>>>> Best,
>>>>> tison.
>>>>>
>>>>>
>>>>> Biao Liu <mmyy1...@gmail.com> wrote on Tue, Sep 24, 2019 at 10:34 AM:
>>>>>
>>>>>>
>>>>>> > We submit the code through Kubernetes Flink Operator which uses the
>>>>>> REST API to submit the job to the Job Manager
>>>>>>
>>>>>> So you are submitting the job through the REST API, not the Flink
>>>>>> client? Could you explain more about this?
>>>>>>
>>>>>> Thanks,
>>>>>> Biao /'bɪ.aʊ/
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, 24 Sep 2019 at 03:44, Debasish Ghosh <
>>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Dian -
>>>>>>>
>>>>>>> We submit one job through the operator. We just use the following to
>>>>>>> complete a promise when the job completes ..
>>>>>>>
>>>>>>>       Try {
>>>>>>>         createLogic.executeStreamingQueries(ctx.env)
>>>>>>>       }.fold(
>>>>>>>         th ⇒ completionPromise.tryFailure(th),
>>>>>>>         _ ⇒ completionPromise.trySuccess(Dun)
>>>>>>>       )
>>>>>>>
>>>>>>> If we totally do away with the promise and future stuff then we
>>>>>>> don't get the boxed error - only the exception reported in Caused By.
>>>>>>>
>>>>>>> regards.
>>>>>>>
>>>>>>> On Mon, Sep 23, 2019 at 10:20 PM Dian Fu <dian0511...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Debasish,
>>>>>>>>
>>>>>>>> In which case does the exception occur? Does it occur when you
>>>>>>>> submit one job at a time, or when multiple jobs are submitted at the
>>>>>>>> same time? I'm asking because I noticed that you used a Future to
>>>>>>>> execute the job in a non-blocking way. I guess ThreadLocal doesn't
>>>>>>>> work well in this case.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Dian
>>>>>>>>
>>>>>>>> On Sep 23, 2019, at 11:57 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hi tison -
>>>>>>>>
>>>>>>>> Please find my response below in >>.
>>>>>>>>
>>>>>>>> regards.
>>>>>>>>
>>>>>>>> On Mon, Sep 23, 2019 at 6:20 PM Zili Chen <wander4...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Debasish,
>>>>>>>>>
>>>>>>>>> The OptimizerPlanEnvironment.ProgramAbortException should be
>>>>>>>>> caught at OptimizerPlanEnvironment#getOptimizedPlan
>>>>>>>>> in its catch (Throwable t) branch.
>>>>>>>>>
>>>>>>>>
>>>>>>>> >> True, but what I get is a StreamPlanEnvironment. In my code I
>>>>>>>> am only doing val env =
>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment.
>>>>>>>>
>>>>>>>>>
>>>>>>>>> It should always throw a ProgramInvocationException instead of
>>>>>>>>> OptimizerPlanEnvironment.ProgramAbortException if any
>>>>>>>>> exception is thrown in the main method of your code.
>>>>>>>>>
>>>>>>>>> Another important question is how the code is executed (setting
>>>>>>>>> the context environment should be another Flink-internal
>>>>>>>>> operation), but given that you submit the job via the Flink k8s
>>>>>>>>> operator, it might take some time to look into the k8s operator
>>>>>>>>> implementation.
>>>>>>>>>
>>>>>>>>
>>>>>>>> >> We submit the code through the Kubernetes Flink Operator, which
>>>>>>>> uses the REST API to submit the job to the Job Manager.
>>>>>>>>
>>>>>>>>>
>>>>>>>>> However, given that we catch Throwable at the place this exception
>>>>>>>>> is thrown, I highly doubt whether it is executed by an official
>>>>>>>>> Flink release.
>>>>>>>>>
>>>>>>>>
>>>>>>>> >> It is an official Flink release 1.9.0
>>>>>>>>
>>>>>>>>>
>>>>>>>>> A complete version of the code and the submission process would be
>>>>>>>>> helpful. Besides, what is buildExecutionGraph's return type? I
>>>>>>>>> think it is not Flink's ExecutionGraph...
>>>>>>>>>
>>>>>>>>
>>>>>>>> >> buildExecutionGraph is our function; it returns Unit, not
>>>>>>>> ExecutionGraph. It builds the DataStreams by reading from Kafka and
>>>>>>>> then finally writes to Kafka.
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> tison.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Debasish Ghosh <ghosh.debas...@gmail.com> wrote on Mon, Sep 23, 2019 at 8:21 PM:
>>>>>>>>>
>>>>>>>>>> This is the complete stack trace which we get from execution on
>>>>>>>>>> Kubernetes using the Flink Kubernetes operator .. The boxed error
>>>>>>>>>> comes from the fact that we complete a Promise with Success when
>>>>>>>>>> it returns a JobExecutionResult and with Failure when we get an
>>>>>>>>>> exception. And here we are getting an exception, so the real stack
>>>>>>>>>> trace we have is the one below in Caused By.
>>>>>>>>>>
>>>>>>>>>> java.util.concurrent.ExecutionException: Boxed Error
>>>>>>>>>> at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
>>>>>>>>>> at
>>>>>>>>>> scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
>>>>>>>>>> at
>>>>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
>>>>>>>>>> at scala.concurrent.Promise.tryFailure(Promise.scala:112)
>>>>>>>>>> at scala.concurrent.Promise.tryFailure$(Promise.scala:112)
>>>>>>>>>> at
>>>>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:187)
>>>>>>>>>> at
>>>>>>>>>> pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3(FlinkStreamlet.scala:186)
>>>>>>>>>> at
>>>>>>>>>> pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3$adapted(FlinkStreamlet.scala:186)
>>>>>>>>>> at scala.util.Failure.fold(Try.scala:240)
>>>>>>>>>> at
>>>>>>>>>> pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:187)
>>>>>>>>>> at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:153)
>>>>>>>>>> at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
>>>>>>>>>> at scala.util.Try$.apply(Try.scala:213)
>>>>>>>>>> at pipelines.runner.Runner$.run(Runner.scala:43)
>>>>>>>>>> at pipelines.runner.Runner$.main(Runner.scala:30)
>>>>>>>>>> at pipelines.runner.Runner.main(Runner.scala)
>>>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>>> at
>>>>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>>>> at
>>>>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
>>>>>>>>>> at
>>>>>>>>>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>>>>>>>>> at
>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>>> at
>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>> Caused by:
>>>>>>>>>> org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>>>>>> at
>>>>>>>>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>>>>>> at
>>>>>>>>>> pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:320)
>>>>>>>>>> at
>>>>>>>>>> pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$2(FlinkStreamlet.scala:184)
>>>>>>>>>> at scala.util.Try$.apply(Try.scala:213)
>>>>>>>>>> at
>>>>>>>>>> pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:184)
>>>>>>>>>> ... 20 more
>>>>>>>>>>
>>>>>>>>>> regards.
>>>>>>>>>>
>>>>>>>>>> On Mon, Sep 23, 2019 at 5:36 PM Dian Fu <dian0511...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Regarding the code you pasted, personally I think nothing is
>>>>>>>>>>> wrong. The problem is how it's executed. As you can see from the
>>>>>>>>>>> implementation of StreamExecutionEnvironment.getExecutionEnvironment,
>>>>>>>>>>> it may create different StreamExecutionEnvironment implementations
>>>>>>>>>>> under different scenarios. Could you paste the full exception
>>>>>>>>>>> stack if it exists? It's difficult to figure out what's wrong
>>>>>>>>>>> with the current stack trace.
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Dian
>>>>>>>>>>>
>>>>>>>>>>> On Sep 23, 2019, at 6:55 PM, Debasish Ghosh
>>>>>>>>>>> <ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Can it be the case that the ThreadLocal logic in
>>>>>>>>>>> https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1609
>>>>>>>>>>> does not behave deterministically when we submit the job through
>>>>>>>>>>> a Kubernetes Flink operator? Utils also selects the factory that
>>>>>>>>>>> creates the context based on either thread-local storage or a
>>>>>>>>>>> static mutable variable.
>>>>>>>>>>>
>>>>>>>>>>> Can these be sources of problems in our case?
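>>>>>>>>>>>
>>>>>>>>>>> The thread-local part worries me because a value set on one
>>>>>>>>>>> thread is simply absent on another. A tiny standalone sketch
>>>>>>>>>>> (nothing Flink-specific) of what I mean:
>>>>>>>>>>>
>>>>>>>>>>> val ctx = new ThreadLocal[String]
>>>>>>>>>>> ctx.set("set on main thread")
>>>>>>>>>>>
>>>>>>>>>>> val worker = new Thread(() => {
>>>>>>>>>>>   // On a different thread the ThreadLocal is empty, so a factory
>>>>>>>>>>>   // keyed on it would fall back to some default.
>>>>>>>>>>>   println(s"worker sees: ${ctx.get}") // worker sees: null
>>>>>>>>>>> })
>>>>>>>>>>> worker.start(); worker.join()
>>>>>>>>>>> println(s"main sees: ${ctx.get}") // main sees: set on main thread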
>>>>>>>>>>>
>>>>>>>>>>> regards.
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Sep 23, 2019 at 3:58 PM Debasish Ghosh <
>>>>>>>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> ah .. Ok .. I get the Throwable part. I am using
>>>>>>>>>>>>
>>>>>>>>>>>> import org.apache.flink.streaming.api.scala._
>>>>>>>>>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>>>>
>>>>>>>>>>>> How can this lead to a wrong StreamExecutionEnvironment ? Any
>>>>>>>>>>>> suggestion ?
>>>>>>>>>>>>
>>>>>>>>>>>> regards.
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Sep 23, 2019 at 3:53 PM Dian Fu <dian0511...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Debasish,
>>>>>>>>>>>>>
>>>>>>>>>>>>> As I said before, the exception is caught in [1]. It catches
>>>>>>>>>>>>> Throwable and so it could also catch
>>>>>>>>>>>>> "OptimizerPlanEnvironment.ProgramAbortException". Regarding the
>>>>>>>>>>>>> cause of this exception, I have the same feeling as Tison, and
>>>>>>>>>>>>> I also think that the wrong StreamExecutionEnvironment is used.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1]
>>>>>>>>>>>>> https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sep 23, 2019, at 6:08 PM, Debasish Ghosh
>>>>>>>>>>>>> <ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Tison -
>>>>>>>>>>>>>
>>>>>>>>>>>>> This is the code that builds the computation graph. readStream
>>>>>>>>>>>>>  reads from Kafka and writeStream writes to Kafka.
>>>>>>>>>>>>>
>>>>>>>>>>>>>     override def buildExecutionGraph = {
>>>>>>>>>>>>>       val rides: DataStream[TaxiRide] =
>>>>>>>>>>>>>         readStream(inTaxiRide)
>>>>>>>>>>>>>           .filter { ride ⇒ ride.getIsStart().booleanValue }
>>>>>>>>>>>>>           .keyBy("rideId")
>>>>>>>>>>>>>
>>>>>>>>>>>>>       val fares: DataStream[TaxiFare] =
>>>>>>>>>>>>>         readStream(inTaxiFare)
>>>>>>>>>>>>>           .keyBy("rideId")
>>>>>>>>>>>>>
>>>>>>>>>>>>>       val processed: DataStream[TaxiRideFare] =
>>>>>>>>>>>>>         rides
>>>>>>>>>>>>>           .connect(fares)
>>>>>>>>>>>>>           .flatMap(new EnrichmentFunction)
>>>>>>>>>>>>>
>>>>>>>>>>>>>       writeStream(out, processed)
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>
>>>>>>>>>>>>> I also checked that my code enters this function
>>>>>>>>>>>>> https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
>>>>>>>>>>>>> and then the exception is thrown. I tried to grep the Flink
>>>>>>>>>>>>> code base to see where this exception is caught. If I exclude
>>>>>>>>>>>>> the tests, I don't see any catch of this exception ..
>>>>>>>>>>>>>
>>>>>>>>>>>>> $ find . -name "*.java" | xargs grep
>>>>>>>>>>>>> "OptimizerPlanEnvironment.ProgramAbortException"
>>>>>>>>>>>>> ./flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java:
>>>>>>>>>>>>> throw new OptimizerPlanEnvironment.ProgramAbortException();
>>>>>>>>>>>>> ./flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java:
>>>>>>>>>>>>> throw new OptimizerPlanEnvironment.ProgramAbortException();
>>>>>>>>>>>>> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java:
>>>>>>>>>>>>> throw new OptimizerPlanEnvironment.ProgramAbortException();
>>>>>>>>>>>>> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java:
>>>>>>>>>>>>> throw new OptimizerPlanEnvironment.ProgramAbortException();
>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java:
>>>>>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java:
>>>>>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java:
>>>>>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java:
>>>>>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java:
>>>>>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java:
>>>>>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java:
>>>>>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java:
>>>>>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java:
>>>>>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java:
>>>>>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java:
>>>>>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java:
>>>>>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java:
>>>>>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java:
>>>>>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java:import
>>>>>>>>>>>>> org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException;
>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java:
>>>>>>>>>>>>> @Test(expected = 
>>>>>>>>>>>>> OptimizerPlanEnvironment.ProgramAbortException.class,
>>>>>>>>>>>>> timeout = 30_000)
>>>>>>>>>>>>>
>>>>>>>>>>>>> What am I missing here ?
>>>>>>>>>>>>>
>>>>>>>>>>>>> regards.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Sep 23, 2019 at 7:50 AM Zili Chen <
>>>>>>>>>>>>> wander4...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Debasish,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> As mentioned by Dian, it is an internal exception that
>>>>>>>>>>>>>> should always be caught by Flink internally. I would suggest
>>>>>>>>>>>>>> you share the job (abstractly). Generally it is because you
>>>>>>>>>>>>>> use StreamPlanEnvironment/OptimizerPlanEnvironment directly.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> tison.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Austin Cawley-Edwards <austin.caw...@gmail.com>
>>>>>>>>>>>>>> wrote on Mon, Sep 23, 2019 at 5:09 AM:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Have you reached out to the FlinkK8sOperator team on Slack?
>>>>>>>>>>>>>>> They're usually pretty active there.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Here’s the link:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> https://join.slack.com/t/flinkk8soperator/shared_invite/enQtNzIxMjc5NDYxODkxLTEwMThmN2I0M2QwYjM3ZDljYTFhMGRiNDUzM2FjZGYzNTRjYWNmYTE1NzNlNWM2YWM5NzNiNGFhMTkxZjA4OGU
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Austin
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sun, Sep 22, 2019 at 12:38 PM Debasish Ghosh <
>>>>>>>>>>>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The problem is that I am submitting Flink jobs to a
>>>>>>>>>>>>>>>> Kubernetes cluster using a Flink Operator, so it's difficult
>>>>>>>>>>>>>>>> to debug in the traditional sense of the term. And all I get
>>>>>>>>>>>>>>>> is the exception that I reported ..
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Caused by:
>>>>>>>>>>>>>>>> org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I am thinking that this exception must be coming from some
>>>>>>>>>>>>>>>> other exception, which is not reported, BTW. I expected a
>>>>>>>>>>>>>>>> Caused By portion in the stack trace. Any clue as to which
>>>>>>>>>>>>>>>> area I should look into to debug this?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> regards.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sat, Sep 21, 2019 at 8:10 AM Debasish Ghosh <
>>>>>>>>>>>>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for the pointer .. I will try debugging. I am
>>>>>>>>>>>>>>>>> getting this exception running my application on Kubernetes 
>>>>>>>>>>>>>>>>> using the Flink
>>>>>>>>>>>>>>>>> operator from Lyft.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> regards.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sat, 21 Sep 2019 at 6:44 AM, Dian Fu <
>>>>>>>>>>>>>>>>> dian0511...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> This exception is used internally to get the plan of a
>>>>>>>>>>>>>>>>>> job before submitting it for execution. It's thrown for a
>>>>>>>>>>>>>>>>>> special purpose, is caught internally in [1], and will
>>>>>>>>>>>>>>>>>> usually not be thrown to end users.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> You could check the following places to find the cause of
>>>>>>>>>>>>>>>>>> this problem:
>>>>>>>>>>>>>>>>>> 1. Check the execution environment you used (a quick
>>>>>>>>>>>>>>>>>> sanity check is sketched below)
>>>>>>>>>>>>>>>>>> 2. If you can debug, set a breakpoint at [2] to see if the
>>>>>>>>>>>>>>>>>> type of the env wrapped in StreamPlanEnvironment is
>>>>>>>>>>>>>>>>>> OptimizerPlanEnvironment. Usually it should be.
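>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> For 1, a quick sanity check (an illustrative sketch only,
>>>>>>>>>>>>>>>>>> assuming the Scala API's getJavaEnv accessor for the
>>>>>>>>>>>>>>>>>> wrapped Java environment) is to log the concrete
>>>>>>>>>>>>>>>>>> environment class from the main method:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>>>>>>>>>> // During job-graph extraction this is expected to print
>>>>>>>>>>>>>>>>>> // ...StreamPlanEnvironment, not a local or remote
>>>>>>>>>>>>>>>>>> // environment.
>>>>>>>>>>>>>>>>>> println(s"env class: ${env.getJavaEnv.getClass.getName}")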
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>> https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
>>>>>>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>>>>>> https://github.com/apache/flink/blob/15a7f978a2e591451c194c41408a12e64d04e9ba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Sep 21, 2019, at 4:14 AM, Debasish Ghosh <
>>>>>>>>>>>>>>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi -
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> When you get an exception stack trace like this ..
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Caused by:
>>>>>>>>>>>>>>>>>> org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>>>>>>>>>>>>>> at
>>>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> what is the recommended approach to debugging? I mean,
>>>>>>>>>>>>>>>>>> what kind of errors can potentially lead to such a stack
>>>>>>>>>>>>>>>>>> trace? In my case it starts from env.execute(..) but does
>>>>>>>>>>>>>>>>>> not give any information as to what can go wrong.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Any help will be appreciated.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> regards.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>> Debasish Ghosh
>>>>>>>>>>>>>>>>>> http://manning.com/ghosh2
>>>>>>>>>>>>>>>>>> http://manning.com/ghosh
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Twttr: @debasishg
>>>>>>>>>>>>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>>>>>>>>>>>>> Code: http://github.com/debasishg
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>> Sent from my iPhone
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>> Debasish Ghosh
>>>>>>>>>>>>>>>> http://manning.com/ghosh2
>>>>>>>>>>>>>>>> http://manning.com/ghosh
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Twttr: @debasishg
>>>>>>>>>>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>>>>>>>>>>> Code: http://github.com/debasishg
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Debasish Ghosh
>>>>>>>>>>>>> http://manning.com/ghosh2
>>>>>>>>>>>>> http://manning.com/ghosh
>>>>>>>>>>>>>
>>>>>>>>>>>>> Twttr: @debasishg
>>>>>>>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>>>>>>>> Code: http://github.com/debasishg
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Debasish Ghosh
>>>>>>>>>>>> http://manning.com/ghosh2
>>>>>>>>>>>> http://manning.com/ghosh
>>>>>>>>>>>>
>>>>>>>>>>>> Twttr: @debasishg
>>>>>>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>>>>>>> Code: http://github.com/debasishg
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Debasish Ghosh
>>>>>>>>>>> http://manning.com/ghosh2
>>>>>>>>>>> http://manning.com/ghosh
>>>>>>>>>>>
>>>>>>>>>>> Twttr: @debasishg
>>>>>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>>>>>> Code: http://github.com/debasishg
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Debasish Ghosh
>>>>>>>>>> http://manning.com/ghosh2
>>>>>>>>>> http://manning.com/ghosh
>>>>>>>>>>
>>>>>>>>>> Twttr: @debasishg
>>>>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>>>>> Code: http://github.com/debasishg
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Debasish Ghosh
>>>>>>>> http://manning.com/ghosh2
>>>>>>>> http://manning.com/ghosh
>>>>>>>>
>>>>>>>> Twttr: @debasishg
>>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>>> Code: http://github.com/debasishg
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Debasish Ghosh
>>>>>>> http://manning.com/ghosh2
>>>>>>> http://manning.com/ghosh
>>>>>>>
>>>>>>> Twttr: @debasishg
>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>> Code: http://github.com/debasishg
>>>>>>>
>>>>>>
>>>
>>> --
>>> Debasish Ghosh
>>> http://manning.com/ghosh2
>>> http://manning.com/ghosh
>>>
>>> Twttr: @debasishg
>>> Blog: http://debasishg.blogspot.com
>>> Code: http://github.com/debasishg
>>>
>>
