Actually, there is an ongoing client API refactoring in this area [1], and one of its main purposes is to eliminate the hijacking of env.execute...

Best,
tison.

[1] https://lists.apache.org/x/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E

Biao Liu <mmyy1...@gmail.com> wrote on Tue, 24 Sep 2019 at 7:12 PM:

> So I believe (I didn't test it) the solution for this case is to keep the
> original exception thrown from `env.execute()` and throw that exception
> out of the main method.
> It's a bit tricky; maybe we could come up with a better design for this
> scenario.
>
> Thanks,
> Biao /'bɪ.aʊ/
>
> On Tue, 24 Sep 2019 at 18:55, Biao Liu <mmyy1...@gmail.com> wrote:
>
>> The key point of this case is in `PackagedProgram#callMainMethod`.
>> The `ProgramAbortException` is expected when executing the main method
>> here. The thrown `ProgramAbortException` is wrapped in an
>> `InvocationTargetException` by the Java reflection layer [1]. There is a
>> piece of code handling `InvocationTargetException`:
>>
>>     try {
>>         mainMethod.invoke(null, (Object) args);
>>     }
>>     catch (...
>>     catch (InvocationTargetException e) {
>>         Throwable exceptionInMethod = e.getTargetException();
>>         if (exceptionInMethod instanceof Error) {
>>             // ------> `ProgramAbortException` would be caught here, as expected.
>>             throw (Error) exceptionInMethod;
>>         } else if (exceptionInMethod instanceof ProgramParametrizationException) {
>>             throw (ProgramParametrizationException) exceptionInMethod;
>>         } else if (exceptionInMethod instanceof ProgramInvocationException) {
>>             throw (ProgramInvocationException) exceptionInMethod;
>>         } else {
>>             // ------> If I'm right, the wrapping (Boxed Error or something
>>             // else) changes the exception type, so it is caught here.
>>             throw new ProgramInvocationException("The main method caused an error: "
>>                 + exceptionInMethod.getMessage(), exceptionInMethod);
>>         }
>>
>> The `ProgramInvocationException` is handled specially in
>> `OptimizerPlanEnvironment`:
>>
>>     try {
>>         prog.invokeInteractiveModeForExecution();
>>     }
>>     catch (ProgramInvocationException e) {
>>         throw e;  // ------> The submission fails here in this case
>>     }
>>     catch (Throwable t) {
>>         // the invocation gets aborted with the preview plan
>>         if (optimizerPlan != null) {
>>             return optimizerPlan;  // ------> Normally it should end up here
>>         } else {
>>             throw new ProgramInvocationException("The program caused an error: ", t);
>>         }
>>     ...
>>
>> 1. https://stackoverflow.com/questions/6020719/what-could-cause-java-lang-reflect-invocationtargetexception
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>> On Tue, 24 Sep 2019 at 17:35, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>
>>> Well, I think I got the solution, though I am not yet sure of the
>>> problem. The original code looked like this:
>>>
>>>     Try {
>>>       // from a parent class called Runner which runs a streamlet
>>>       // run returns an abstraction which completes a Promise depending on
>>>       // whether the Job was successful or not
>>>       val streamletExecution =
>>>         loadedStreamlet.streamlet.run(withPodRuntimeConfig)
>>>
>>>       // the runner waits for the execution to complete
>>>       // In normal circumstances it will run forever for streaming data
>>>       // source unless being stopped forcibly or any of the queries faces
>>>       // an exception
>>>       Await.result(streamletExecution.completed, Duration.Inf)
>>>     } match { //..
>>>
>>> and then streamlet.run(..) in turn finally invoked the following:
>>>
>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>
>>>     // creates datastreams and reads from / writes to Kafka
>>>     // I pasted the body of this earlier in the thread
>>>     buildExecutionGraph()
>>>
>>>     env.execute(..)
>>>
>>> This DID NOT run and failed with the exception I reported earlier. But
>>> when I change the code to move the run statement out of the Try block,
>>> things run fine, like this:
>>>
>>>     // from a parent class called Runner which runs a streamlet
>>>     // run returns an abstraction which completes a Promise depending on
>>>     // whether the Job was successful or not
>>>     val streamletExecution =
>>>       loadedStreamlet.streamlet.run(withPodRuntimeConfig)
>>>
>>>     Try {
>>>       // the runner waits for the execution to complete
>>>       // In normal circumstances it will run forever for streaming data
>>>       // source unless being stopped forcibly or any of the queries faces
>>>       // an exception
>>>       Await.result(streamletExecution.completed, Duration.Inf)
>>>     } match { //..
>>>
>>> It looks like the exception I was facing earlier leaked out of the Flink
>>> engine, Try caught it, and it got logged. Moving the run call out of the
>>> Try now lets Flink catch the exception again and follow the course that
>>> it should. I am not sure this is a cogent explanation, though, and I look
>>> forward to a more accurate one from the experts. Note that there is no
>>> asynchrony or concurrency going on here. The Runner code may look a bit
>>> over-engineered, but there is a context to this: it handles not only
>>> Flink but other streaming engines as well, like Spark and Akka Streams.
>>>
>>> regards.
>>>
>>> On Tue, Sep 24, 2019 at 10:17 AM Biao Liu <mmyy1...@gmail.com> wrote:
>>>
>>>> Hi Zili,
>>>>
>>>> Thanks for pointing that out.
>>>> I didn't realize that it's a REST API based case. Debasish's case has
>>>> been discussed not only in this thread...
>>>>
>>>> It's really hard to analyze the case without the full picture.
>>>>
>>>> I think the reason why `ProgramAbortException` is not caught is that
>>>> he did something outside `env.execute`, like executing this piece of
>>>> code inside a Scala future.
>>>>
>>>> I guess the scenario is that he is submitting the job through the REST
>>>> API, but in the main method he wraps `env.execute` in a Scala future
>>>> instead of executing it directly.
>>>> The reason the env has been set to `StreamPlanEnvironment` is that
>>>> `JarHandlerUtils` retrieves the job graph through it.
>>>> And the `ProgramAbortException` is not thrown out, because the Scala
>>>> future handles this exception.
>>>> So retrieving the job graph fails due to an unrecognized exception
>>>> (Boxed Error).
>>>>
>>>> Thanks,
>>>> Biao /'bɪ.aʊ/
>>>>
>>>> On Tue, 24 Sep 2019 at 10:44, Zili Chen <wander4...@gmail.com> wrote:
>>>>
>>>>> Hi Biao,
>>>>>
>>>>> The log below already implies that the job was submitted via the REST
>>>>> API, and I don't think it matters.
>>>>>
>>>>>     at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
>>>>>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
>>>>>
>>>>> What I don't understand is that Flink DOES catch the exception at the
>>>>> point it is reported thrown...
>>>>>
>>>>> Best,
>>>>> tison.
>>>>>
>>>>> Biao Liu <mmyy1...@gmail.com> wrote on Tue, 24 Sep 2019 at 10:34 AM:
>>>>>
>>>>>> > We submit the code through Kubernetes Flink Operator which uses the
>>>>>> > REST API to submit the job to the Job Manager
>>>>>>
>>>>>> So you are submitting the job through the REST API, not the Flink
>>>>>> client? Could you explain more about this?
>>>>>>
>>>>>> Thanks,
>>>>>> Biao /'bɪ.aʊ/
>>>>>>
>>>>>> On Tue, 24 Sep 2019 at 03:44, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Dian -
>>>>>>>
>>>>>>> We submit one job through the operator. We just use the following to
>>>>>>> complete a promise when the job completes:
>>>>>>>
>>>>>>>     Try {
>>>>>>>       createLogic.executeStreamingQueries(ctx.env)
>>>>>>>     }.fold(
>>>>>>>       th ⇒ completionPromise.tryFailure(th),
>>>>>>>       _ ⇒ completionPromise.trySuccess(Dun)
>>>>>>>     )
>>>>>>>
>>>>>>> If we totally do away with the promise and future stuff, then we
>>>>>>> don't get the boxed error, only the exception reported in Caused By.
>>>>>>>
>>>>>>> regards.
>>>>>>>
>>>>>>> On Mon, Sep 23, 2019 at 10:20 PM Dian Fu <dian0511...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Debasish,
>>>>>>>>
>>>>>>>> In which case does the exception occur? Does it occur when you
>>>>>>>> submit one job at a time, or when multiple jobs are submitted at the
>>>>>>>> same time? I'm asking because I noticed that you used a Future to
>>>>>>>> execute the job without blocking. I guess ThreadLocal doesn't work
>>>>>>>> well in this case.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Dian
>>>>>>>>
>>>>>>>> On Sep 23, 2019, at 11:57 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hi tison -
>>>>>>>>
>>>>>>>> Please find my responses below, marked with >>.
>>>>>>>>
>>>>>>>> regards.
>>>>>>>>
>>>>>>>> On Mon, Sep 23, 2019 at 6:20 PM Zili Chen <wander4...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Debasish,
>>>>>>>>>
>>>>>>>>> The OptimizerPlanEnvironment.ProgramAbortException should be
>>>>>>>>> caught at OptimizerPlanEnvironment#getOptimizedPlan
>>>>>>>>> in its catch (Throwable t) branch.
>>>>>>>>
>>>>>>>> >> True, but what I get is a StreamPlanEnvironment. From my code I
>>>>>>>> am only doing val env =
>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment.
>>>>>>>>
>>>>>>>>> It should always throw a ProgramInvocationException instead of
>>>>>>>>> OptimizerPlanEnvironment.ProgramAbortException if any
>>>>>>>>> exception is thrown in the main method of your code.
>>>>>>>>>
>>>>>>>>> Another important problem is how the code is executed (setting the
>>>>>>>>> context environment should be another Flink-internal operation),
>>>>>>>>> but given that you submit the job via the Flink k8s operator, it
>>>>>>>>> might require time to take a look at the k8s operator
>>>>>>>>> implementation.
>>>>>>>>
>>>>>>>> >> We submit the code through Kubernetes Flink Operator which uses
>>>>>>>> the REST API to submit the job to the Job Manager
>>>>>>>>
>>>>>>>>> However, given that we catch Throwable in the place where this
>>>>>>>>> exception is thrown, I highly doubt that it is executed by an
>>>>>>>>> official Flink release.
>>>>>>>>
>>>>>>>> >> It is an official Flink release 1.9.0
>>>>>>>>
>>>>>>>>> A complete version of the code and the submission process would be
>>>>>>>>> helpful. Besides, what is the return type of buildExecutionGraph?
>>>>>>>>> I think it is not ExecutionGraph in Flink...
>>>>>>>>
>>>>>>>> >> buildExecutionGraph is our function which returns a Unit. It's
>>>>>>>> not ExecutionGraph. It builds the DataStreams by reading from Kafka
>>>>>>>> and then finally writes to Kafka.
>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> tison.
>>>>>>>>>
>>>>>>>>> Debasish Ghosh <ghosh.debas...@gmail.com> wrote on Mon, 23 Sep 2019 at 8:21 PM:
>>>>>>>>>
>>>>>>>>>> This is the complete stack trace which we get from execution on
>>>>>>>>>> Kubernetes using the Flink Kubernetes operator. The boxed error
>>>>>>>>>> comes from the fact that we complete a Promise with Success when
>>>>>>>>>> it returns a JobExecutionResult and with Failure when we get an
>>>>>>>>>> exception. And here we are getting an exception. So the real stack
>>>>>>>>>> trace we have is the one below in Caused By.
>>>>>>>>>>
>>>>>>>>>> java.util.concurrent.ExecutionException: Boxed Error
>>>>>>>>>>     at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
>>>>>>>>>>     at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
>>>>>>>>>>     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
>>>>>>>>>>     at scala.concurrent.Promise.tryFailure(Promise.scala:112)
>>>>>>>>>>     at scala.concurrent.Promise.tryFailure$(Promise.scala:112)
>>>>>>>>>>     at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:187)
>>>>>>>>>>     at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3(FlinkStreamlet.scala:186)
>>>>>>>>>>     at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3$adapted(FlinkStreamlet.scala:186)
>>>>>>>>>>     at scala.util.Failure.fold(Try.scala:240)
>>>>>>>>>>     at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:187)
>>>>>>>>>>     at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:153)
>>>>>>>>>>     at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
>>>>>>>>>>     at scala.util.Try$.apply(Try.scala:213)
>>>>>>>>>>     at pipelines.runner.Runner$.run(Runner.scala:43)
>>>>>>>>>>     at pipelines.runner.Runner$.main(Runner.scala:30)
>>>>>>>>>>     at pipelines.runner.Runner.main(Runner.scala)
>>>>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>>>>>>>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>>>>>>>>>>     at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>>>>>>>>>>     at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
>>>>>>>>>>     at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
>>>>>>>>>>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
>>>>>>>>>>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>>>     at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>>>>>>>     at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>>>>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>>>>>>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>>>>>>     at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:320)
>>>>>>>>>>     at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$2(FlinkStreamlet.scala:184)
>>>>>>>>>>     at scala.util.Try$.apply(Try.scala:213)
>>>>>>>>>>     at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:184)
>>>>>>>>>>     ... 20 more
>>>>>>>>>>
>>>>>>>>>> regards.
>>>>>>>>>>
>>>>>>>>>> On Mon, Sep 23, 2019 at 5:36 PM Dian Fu <dian0511...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Regarding the code you pasted, personally I think nothing is
>>>>>>>>>>> wrong with it. The problem is how it's executed. As you can see
>>>>>>>>>>> from the implementation of
>>>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment, it may create
>>>>>>>>>>> different StreamExecutionEnvironment implementations under
>>>>>>>>>>> different scenarios. Could you paste the full exception stack if
>>>>>>>>>>> it exists? It's difficult to figure out what's wrong from the
>>>>>>>>>>> current stack trace.
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Dian
>>>>>>>>>>>
>>>>>>>>>>> On Sep 23, 2019, at 6:55 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Can it be the case that the threadLocal stuff in
>>>>>>>>>>> https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1609
>>>>>>>>>>> does not behave deterministically when we submit a job through a
>>>>>>>>>>> Kubernetes Flink operator? Utils also selects the factory to
>>>>>>>>>>> create the context based on either thread-local storage or a
>>>>>>>>>>> static mutable variable.
>>>>>>>>>>>
>>>>>>>>>>> Can these be a source of problems in our case?
>>>>>>>>>>>
>>>>>>>>>>> regards.
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Sep 23, 2019 at 3:58 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Ah, OK, I get the Throwable part. I am using
>>>>>>>>>>>>
>>>>>>>>>>>>     import org.apache.flink.streaming.api.scala._
>>>>>>>>>>>>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>>>>
>>>>>>>>>>>> How can this lead to a wrong StreamExecutionEnvironment? Any
>>>>>>>>>>>> suggestions?
>>>>>>>>>>>>
>>>>>>>>>>>> regards.
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Sep 23, 2019 at 3:53 PM Dian Fu <dian0511...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Debasish,
>>>>>>>>>>>>>
>>>>>>>>>>>>> As I said before, the exception is caught in [1]. It catches
>>>>>>>>>>>>> Throwable, so it also catches
>>>>>>>>>>>>> "OptimizerPlanEnvironment.ProgramAbortException". Regarding the
>>>>>>>>>>>>> cause of this exception, I have the same feeling as Tison, and
>>>>>>>>>>>>> I also think that the wrong StreamExecutionEnvironment is used.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1] https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sep 23, 2019, at 6:08 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Tison -
>>>>>>>>>>>>>
>>>>>>>>>>>>> This is the code that builds the computation graph. readStream
>>>>>>>>>>>>> reads from Kafka and writeStream writes to Kafka.
>>>>>>>>>>>>>
>>>>>>>>>>>>>     override def buildExecutionGraph = {
>>>>>>>>>>>>>       val rides: DataStream[TaxiRide] =
>>>>>>>>>>>>>         readStream(inTaxiRide)
>>>>>>>>>>>>>           .filter { ride ⇒ ride.getIsStart().booleanValue }
>>>>>>>>>>>>>           .keyBy("rideId")
>>>>>>>>>>>>>
>>>>>>>>>>>>>       val fares: DataStream[TaxiFare] =
>>>>>>>>>>>>>         readStream(inTaxiFare)
>>>>>>>>>>>>>           .keyBy("rideId")
>>>>>>>>>>>>>
>>>>>>>>>>>>>       val processed: DataStream[TaxiRideFare] =
>>>>>>>>>>>>>         rides
>>>>>>>>>>>>>           .connect(fares)
>>>>>>>>>>>>>           .flatMap(new EnrichmentFunction)
>>>>>>>>>>>>>
>>>>>>>>>>>>>       writeStream(out, processed)
>>>>>>>>>>>>>     }
>>>>>>>>>>>>>
>>>>>>>>>>>>> I also checked that my code enters this function
>>>>>>>>>>>>> https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
>>>>>>>>>>>>> and then the exception is thrown. I tried to do a grep on the
>>>>>>>>>>>>> Flink code base to see where this exception is caught. If I
>>>>>>>>>>>>> leave out the tests, I don't see any catch of this exception:
>>>>>>>>>>>>>
>>>>>>>>>>>>> $ find . -name "*.java" | xargs grep "OptimizerPlanEnvironment.ProgramAbortException"
>>>>>>>>>>>>> ./flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>>>>>>>>>>>> ./flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>>>>>>>>>>>> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>>>>>>>>>>>> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java:import org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException;
>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java: @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class, timeout = 30_000)
>>>>>>>>>>>>>
>>>>>>>>>>>>> What am I missing here?
>>>>>>>>>>>>>
>>>>>>>>>>>>> regards.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Sep 23, 2019 at 7:50 AM Zili Chen <wander4...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Debasish,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> As mentioned by Dian, it is an internal exception that should
>>>>>>>>>>>>>> always be caught by Flink internally. I would suggest you
>>>>>>>>>>>>>> share the job (abstractly). Generally it is because you use
>>>>>>>>>>>>>> StreamPlanEnvironment/OptimizerPlanEnvironment directly.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>> tison.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Austin Cawley-Edwards <austin.caw...@gmail.com> wrote on Mon, 23 Sep 2019 at 5:09 AM:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Have you reached out to the FlinkK8sOperator team on Slack?
>>>>>>>>>>>>>>> They're usually pretty active on there.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Here's the link:
>>>>>>>>>>>>>>> https://join.slack.com/t/flinkk8soperator/shared_invite/enQtNzIxMjc5NDYxODkxLTEwMThmN2I0M2QwYjM3ZDljYTFhMGRiNDUzM2FjZGYzNTRjYWNmYTE1NzNlNWM2YWM5NzNiNGFhMTkxZjA4OGU
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> Austin
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sun, Sep 22, 2019 at 12:38 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The problem is that I am submitting Flink jobs to a
>>>>>>>>>>>>>>>> Kubernetes cluster using a Flink Operator. Hence it's
>>>>>>>>>>>>>>>> difficult to debug in the traditional sense of the term.
>>>>>>>>>>>>>>>> And all I get is the exception that I reported:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>>>>>>>>>>>>>     at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>>>>>>>>>>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>>>>>>>>>>>>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I am thinking that this exception must be caused by some
>>>>>>>>>>>>>>>> other exception, which is not reported, BTW. I expected a
>>>>>>>>>>>>>>>> Caused By portion in the stack trace. Any clue as to which
>>>>>>>>>>>>>>>> area I should look into to debug this?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> regards.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sat, Sep 21, 2019 at 8:10 AM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks for the pointer. I will try debugging. I am getting
>>>>>>>>>>>>>>>>> this exception running my application on Kubernetes using
>>>>>>>>>>>>>>>>> the Flink operator from Lyft.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> regards.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sat, 21 Sep 2019 at 6:44 AM, Dian Fu <dian0511...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> This exception is used internally to get the plan of a
>>>>>>>>>>>>>>>>>> job before submitting it for execution. It's thrown for a
>>>>>>>>>>>>>>>>>> special purpose, will be caught internally in [1], and
>>>>>>>>>>>>>>>>>> usually will not be thrown to end users.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> You could check the following places to find out the
>>>>>>>>>>>>>>>>>> cause of this problem:
>>>>>>>>>>>>>>>>>> 1. Check the execution environment you used.
>>>>>>>>>>>>>>>>>> 2. If you can debug, set a breakpoint at [2] to see if the
>>>>>>>>>>>>>>>>>> type of the env wrapped in StreamPlanEnvironment is
>>>>>>>>>>>>>>>>>> OptimizerPlanEnvironment. Usually it should be.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> [1] https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
>>>>>>>>>>>>>>>>>> [2] https://github.com/apache/flink/blob/15a7f978a2e591451c194c41408a12e64d04e9ba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Sep 21, 2019, at 4:14 AM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi -
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> When you get an exception stack trace like this:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>>>>>>>>>>>>>>>     at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>>>>>>>>>>>>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>>>>>>>>>>>>>>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> what is the recommended approach to debugging?
>>>>>>>>>>>>>>>>>> I mean, what kind of errors can potentially lead to such a
>>>>>>>>>>>>>>>>>> stack trace? In my case it starts from env.execute(..) but
>>>>>>>>>>>>>>>>>> does not give any information as to what can go wrong.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Any help will be appreciated.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> regards.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>> Debasish Ghosh
>>>>>>>>>>>>>>>>>> http://manning.com/ghosh2
>>>>>>>>>>>>>>>>>> http://manning.com/ghosh
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Twttr: @debasishg
>>>>>>>>>>>>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>>>>>>>>>>>>> Code: http://github.com/debasishg
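
For readers following Biao's analysis of `PackagedProgram#callMainMethod` above, the dispatch he describes can be reproduced outside Flink. What follows is only a minimal sketch under stated assumptions, not Flink's code: `AbortSignalDemo`, `AbortSignal`, `DirectMain`, `BoxingMain`, and `classify` are hypothetical names invented for this illustration. `AbortSignal` stands in for `ProgramAbortException`, assumed here to be an `Error` subclass, as the `instanceof Error` annotation in the quoted snippet suggests. `BoxingMain` imitates user code that intercepts the signal and surfaces it boxed in an `ExecutionException`, which is the situation Biao hypothesizes.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.ExecutionException;

class AbortSignalDemo {

    // Hypothetical stand-in for ProgramAbortException (assumed to extend Error).
    static class AbortSignal extends Error {}

    // A user "main" that lets the abort signal propagate untouched.
    public static class DirectMain {
        public static void main(String[] args) {
            throw new AbortSignal();
        }
    }

    // A user "main" that intercepts the signal and rethrows it boxed,
    // imitating what a catch-all wrapper around env.execute() could do.
    public static class BoxingMain {
        public static void main(String[] args) throws Exception {
            try {
                throw new AbortSignal();
            } catch (Throwable t) {
                throw new ExecutionException("Boxed Error", t);
            }
        }
    }

    // Invokes main reflectively and reports which branch of an
    // instanceof-Error dispatch the exception from main would take.
    static String classify(Class<?> program) {
        try {
            Method main = program.getMethod("main", String[].class);
            main.invoke(null, (Object) new String[0]);
            return "returned normally";
        } catch (InvocationTargetException e) {
            // Reflection wraps whatever main threw; unwrap it first.
            Throwable inMain = e.getTargetException();
            if (inMain instanceof Error) {
                return "rethrown as-is: " + inMain.getClass().getSimpleName();
            }
            return "wrapped as invocation failure: " + inMain.getClass().getSimpleName();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(classify(DirectMain.class));
        System.out.println(classify(BoxingMain.class));
    }
}
```

Saved as `AbortSignalDemo.java` and run with `java AbortSignalDemo.java` (JDK 11 or later), this should print `rethrown as-is: AbortSignal` followed by `wrapped as invocation failure: ExecutionException`. The second line corresponds to the `else` branch in the quoted snippet: once the signal no longer reaches the reflection layer as an `Error`, a dispatch of this shape cannot recognize it, which is consistent with the suggestion to let the original exception from `env.execute()` escape the main method unchanged.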