Hi Zili, Great to hear that! Hope to see the new client soon!
Thanks,
Biao /'bɪ.aʊ/

On Tue, 24 Sep 2019 at 19:23, Zili Chen <wander4...@gmail.com> wrote:

> Actually there is an ongoing client API refactoring on this stuff[1], and
> one of its main purposes is eliminating the hijacking of env.execute...
>
> Best,
> tison.
>
> [1]
> https://lists.apache.org/x/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
>
>
> Biao Liu <mmyy1...@gmail.com> wrote on Tue, 24 Sep 2019 at 7:12 PM:
>
>> So I believe (I didn't test it) the solution for this case is keeping the
>> original exception thrown from `env.execute()` and throwing this exception
>> out of the main method.
>> It's a bit tricky; maybe we could have a better design for this scenario.
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>>
>> On Tue, 24 Sep 2019 at 18:55, Biao Liu <mmyy1...@gmail.com> wrote:
>>
>>> The key point of this case is in `PackagedProgram#callMainMethod`.
>>> The `ProgramAbortException` is expected when executing the main method
>>> here. The thrown `ProgramAbortException` is wrapped in an
>>> `InvocationTargetException` by the Java reflection layer [1]. There is a
>>> piece of code handling `InvocationTargetException`:
>>>
>>> try {
>>>     mainMethod.invoke(null, (Object) args);
>>> }
>>> catch (...
>>> catch (InvocationTargetException e) {
>>>     Throwable exceptionInMethod = e.getTargetException();
>>>     if (exceptionInMethod instanceof Error) {
>>>         // ------> `ProgramAbortException` would be caught here, as expected.
>>>         throw (Error) exceptionInMethod;
>>>     } else if (exceptionInMethod instanceof ProgramParametrizationException) {
>>>         throw (ProgramParametrizationException) exceptionInMethod;
>>>     } else if (exceptionInMethod instanceof ProgramInvocationException) {
>>>         throw (ProgramInvocationException) exceptionInMethod;
>>>     } else {
>>>         // ------> If I'm right, the wrapping exception (Boxed Error or
>>>         // something else) changes the exception's type, so it is caught here instead.
>>>         throw new ProgramInvocationException("The main method caused an error: " + exceptionInMethod.getMessage(), exceptionInMethod);
>>>     }
>>>
>>> The `ProgramInvocationException` is handled specially in
>>> `OptimizerPlanEnvironment`:
>>>
>>> try {
>>>     prog.invokeInteractiveModeForExecution();
>>> }
>>> catch (ProgramInvocationException e) {
>>>     throw e;               // ------> The submission fails here in this case.
>>> }
>>> catch (Throwable t) {
>>>     // the invocation gets aborted with the preview plan
>>>     if (optimizerPlan != null) {
>>>         return optimizerPlan;   // ------> Normally it should end up here.
>>>     } else {
>>>         throw new ProgramInvocationException("The program caused an error: ", t);
>>>     } ...
>>>
>>> 1.
>>> https://stackoverflow.com/questions/6020719/what-could-cause-java-lang-reflect-invocationtargetexception
>>>
>>> Thanks,
>>> Biao /'bɪ.aʊ/
>>>
>>>
>>> On Tue, 24 Sep 2019 at 17:35, Debasish Ghosh <ghosh.debas...@gmail.com>
>>> wrote:
>>>
>>>> Well, I think I got the solution though I am not yet sure of the
>>>> problem .. The original code looked like this ..
>>>>
>>>> Try {
>>>>   // from a parent class called Runner which runs a streamlet
>>>>   // run returns an abstraction which completes a Promise depending on
>>>>   // whether the Job was successful or not
>>>>   val streamletExecution = loadedStreamlet.streamlet.run(withPodRuntimeConfig)
>>>>
>>>>   // the runner waits for the execution to complete
>>>>   // In normal circumstances it will run forever for a streaming data
>>>>   // source unless stopped forcibly or any of the queries faces an exception
>>>>   Await.result(streamletExecution.completed, Duration.Inf)
>>>> } match { //..
>>>>
>>>> and then streamlet.run(..) in turn finally invoked the following ..
>>>>
>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>
>>>> // creates datastreams and reads from / writes to Kafka
>>>> // I pasted the body of this earlier in the thread
>>>> buildExecutionGraph()
>>>>
>>>> env.execute(..)
>>>>
>>>> This DID NOT run and failed with the exception I reported earlier. But
>>>> when I change the code to move the run statement out of the Try block,
>>>> things run fine .. like this ..
>>>>
>>>> // from a parent class called Runner which runs a streamlet
>>>> // run returns an abstraction which completes a Promise depending on
>>>> // whether the Job was successful or not
>>>> val streamletExecution = loadedStreamlet.streamlet.run(withPodRuntimeConfig)
>>>>
>>>> Try {
>>>>   // the runner waits for the execution to complete
>>>>   // In normal circumstances it will run forever for a streaming data
>>>>   // source unless stopped forcibly or any of the queries faces an exception
>>>>   Await.result(streamletExecution.completed, Duration.Inf)
>>>> } match { //..
>>>>
>>>> Apparently the exception I was facing earlier leaked through the Flink
>>>> engine, and Try caught it and it got logged. But moving it out of Try
>>>> now enables Flink to catch it again and follow the course that it should.
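The behaviour described above can be illustrated in isolation: Flink's `ProgramAbortException` extends `Error`, so a wrapper that captures every `Throwable` (for example, to complete a promise with the failure) prevents it from reaching Flink's own handler. A minimal Java sketch of the two variants, with hypothetical names standing in for the actual Runner code:

```java
// Sketch (hypothetical names): why a broad catch around execute() breaks
// Flink's plan extraction, which aborts main() with an Error on purpose.
public class AbortPropagation {

    // Stand-in for OptimizerPlanEnvironment.ProgramAbortException
    static class ProgramAbortException extends Error {}

    // Variant 1: swallow everything, like completing a Promise with the failure.
    // The abort signal never escapes main(), so plan extraction fails.
    static String runSwallowing(Runnable execute) {
        try {
            execute.run();
            return "success";
        } catch (Throwable t) {
            return "captured: " + t.getClass().getSimpleName();
        }
    }

    // Variant 2: handle ordinary exceptions but let Errors pass through,
    // which is what Flink's interception mechanism relies on.
    static String runRethrowingErrors(Runnable execute) {
        try {
            execute.run();
            return "success";
        } catch (Error e) {
            throw e;   // let the control-flow Error escape main()
        } catch (Exception e) {
            return "captured: " + e.getClass().getSimpleName();
        }
    }
}
```

With variant 1 the sentinel `Error` is reported as an application failure; with variant 2 it propagates out of the main method, where Flink can catch it.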
But I am not sure if this is a cogent explanation and am looking
>>>> forward to a more accurate one from the experts. Note there is no
>>>> asynchrony or concurrency going on here - the Runner code may look a bit
>>>> over-engineered but there is a context to this. The Runner code handles
>>>> not only Flink but other types of streaming engines as well, like Spark
>>>> and Akka Streams.
>>>>
>>>> regards.
>>>>
>>>>
>>>> On Tue, Sep 24, 2019 at 10:17 AM Biao Liu <mmyy1...@gmail.com> wrote:
>>>>
>>>>> Hi Zili,
>>>>>
>>>>> Thanks for pointing that out.
>>>>> I didn't realize that it's a REST API based case. Debasish's case has
>>>>> been discussed not only in this thread...
>>>>>
>>>>> It's really hard to analyze the case without the full picture.
>>>>>
>>>>> I think the reason why `ProgramAbortException` is not caught is that he
>>>>> did something outside `env.execute`, like executing this piece of code
>>>>> inside a Scala future.
>>>>>
>>>>> I guess the scenario is that he is submitting the job through the REST
>>>>> API, but in the main method he wraps `env.execute` in a Scala future
>>>>> instead of executing it directly.
>>>>> The reason env has been set to `StreamPlanEnvironment` is that
>>>>> `JarHandlerUtils` retrieves the job graph through it.
>>>>> And the `ProgramAbortException` is not thrown out, because the Scala
>>>>> future intercepts this exception.
>>>>> So retrieving the job graph fails due to an unrecognized exception
>>>>> (Boxed Error).
>>>>>
>>>>> Thanks,
>>>>> Biao /'bɪ.aʊ/
>>>>>
>>>>>
>>>>> On Tue, 24 Sep 2019 at 10:44, Zili Chen <wander4...@gmail.com> wrote:
>>>>>
>>>>>> Hi Biao,
>>>>>>
>>>>>> The log below already infers that the job was submitted via REST API
>>>>>> and I don't think it matters.
>>>>>>
>>>>>> at org.apache.flink.runtime.webmonitor.handlers.utils.
JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
>>>>>> at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
>>>>>>
>>>>>> What I don't understand is that flink DOES catch the exception at the
>>>>>> point it is reported thrown...
>>>>>>
>>>>>> Best,
>>>>>> tison.
>>>>>>
>>>>>>
>>>>>> Biao Liu <mmyy1...@gmail.com> wrote on Tue, 24 Sep 2019 at 10:34 AM:
>>>>>>
>>>>>>>
>>>>>>> > We submit the code through Kubernetes Flink Operator which uses
>>>>>>> the REST API to submit the job to the Job Manager
>>>>>>>
>>>>>>> So you are submitting the job through the REST API, not the Flink
>>>>>>> client? Could you explain more about this?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Biao /'bɪ.aʊ/
>>>>>>>
>>>>>>>
>>>>>>> On Tue, 24 Sep 2019 at 03:44, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Dian -
>>>>>>>>
>>>>>>>> We submit one job through the operator. We just use the following
>>>>>>>> to complete a promise when the job completes ..
>>>>>>>>
>>>>>>>> Try {
>>>>>>>>   createLogic.executeStreamingQueries(ctx.env)
>>>>>>>> }.fold(
>>>>>>>>   th ⇒ completionPromise.tryFailure(th),
>>>>>>>>   _ ⇒ completionPromise.trySuccess(Dun)
>>>>>>>> )
>>>>>>>>
>>>>>>>> If we totally do away with the promise and future stuff then we
>>>>>>>> don't get the boxed error - only the exception reported in Caused By.
>>>>>>>>
>>>>>>>> regards.
>>>>>>>>
>>>>>>>> On Mon, Sep 23, 2019 at 10:20 PM Dian Fu <dian0511...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Debasish,
>>>>>>>>>
>>>>>>>>> In which case does the exception occur? Does it occur when you
>>>>>>>>> submit one job at a time or when multiple jobs are submitted at
>>>>>>>>> the same time? I'm asking this because I noticed that you used a
>>>>>>>>> Future to execute the job in a non-blocking way. I guess
>>>>>>>>> ThreadLocal doesn't work well in this case.
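Dian's `ThreadLocal` point can be demonstrated in isolation: a value stashed in thread-scoped state (the way Flink keeps its context environment factory) is invisible to work handed to another thread, for example via a future. A small self-contained Java sketch, with illustrative names that are not Flink's:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch: a ThreadLocal set on the submitting thread is not visible from a
// task that runs on a different (pool) thread.
public class ThreadLocalScope {

    // stand-in for a thread-scoped context environment
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    static String readFromAnotherThread() throws Exception {
        CONTEXT.set("StreamPlanEnvironment");   // set on the caller's thread
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // the pool thread has its own (empty) ThreadLocal slot,
            // so CONTEXT.get() returns null there
            return pool.submit(() -> String.valueOf(CONTEXT.get())).get();
        } finally {
            pool.shutdown();
        }
    }
}
```

The caller's thread still sees its own value afterwards; only the other thread misses it, which is analogous to running `env.execute` inside a future on a different thread than the one the framework prepared.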
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Dian
>>>>>>>>>
>>>>>>>>> On Sep 23, 2019, at 11:57 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi tison -
>>>>>>>>>
>>>>>>>>> Please find my response below in >>.
>>>>>>>>>
>>>>>>>>> regards.
>>>>>>>>>
>>>>>>>>> On Mon, Sep 23, 2019 at 6:20 PM Zili Chen <wander4...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Debasish,
>>>>>>>>>>
>>>>>>>>>> The OptimizerPlanEnvironment.ProgramAbortException should be
>>>>>>>>>> caught at OptimizerPlanEnvironment#getOptimizedPlan
>>>>>>>>>> in its catch (Throwable t) branch.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> >> True, but what I get is a StreamPlanEnvironment. From my code I
>>>>>>>>> am only doing val env =
>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment.
>>>>>>>>>
>>>>>>>>>> It should always throw a ProgramInvocationException instead of
>>>>>>>>>> OptimizerPlanEnvironment.ProgramAbortException if any exception
>>>>>>>>>> is thrown in the main method of your code.
>>>>>>>>>>
>>>>>>>>>> Another important question is how the code is executed (setting
>>>>>>>>>> the context environment should be another Flink-internal
>>>>>>>>>> operation), but given that you submit the job via the Flink k8s
>>>>>>>>>> operator it might take time to look at the k8s operator
>>>>>>>>>> implementation.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> >> We submit the code through the Kubernetes Flink Operator, which
>>>>>>>>> uses the REST API to submit the job to the Job Manager.
>>>>>>>>>
>>>>>>>>>> However, given we catch Throwable at the place this exception is
>>>>>>>>>> thrown, I highly suspect whether it is executed by an official
>>>>>>>>>> flink release.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> >> It is an official Flink release, 1.9.0.
>>>>>>>>>
>>>>>>>>>> A complete version of the code and the submission process would
>>>>>>>>>> be helpful. Besides, what is buildExecutionGraph's return type?
>>>>>>>>>> I think it is not ExecutionGraph in flink...
>>>>>>>>>> >>>>>>>>> >>>>>>>>> >> buildExecutionGraph is our function which returns a Unit. It's >>>>>>>>> not ExecutionGraph. It builds the DataStream s by reading from Kafka >>>>>>>>> and >>>>>>>>> then finally writes to Kafka. >>>>>>>>> >>>>>>>>>> >>>>>>>>>> Best, >>>>>>>>>> tison. >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> Debasish Ghosh <ghosh.debas...@gmail.com> 于2019年9月23日周一 下午8:21写道: >>>>>>>>>> >>>>>>>>>>> This is the complete stack trace which we get from execution on >>>>>>>>>>> Kubernetes using the Flink Kubernetes operator .. The boxed error >>>>>>>>>>> comes >>>>>>>>>>> from the fact that we complete a Promise with Success when it >>>>>>>>>>> returns a >>>>>>>>>>> JobExecutionResult and with Failure when we get an exception. And >>>>>>>>>>> here we r >>>>>>>>>>> getting an exception. So the real stack trace we have is the one >>>>>>>>>>> below in >>>>>>>>>>> Caused By. >>>>>>>>>>> >>>>>>>>>>> java.util.concurrent.ExecutionException: Boxed Error >>>>>>>>>>> at scala.concurrent.impl.Promise$.resolver(Promise.scala:87) >>>>>>>>>>> at >>>>>>>>>>> scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79) >>>>>>>>>>> at >>>>>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) >>>>>>>>>>> at scala.concurrent.Promise.tryFailure(Promise.scala:112) >>>>>>>>>>> at scala.concurrent.Promise.tryFailure$(Promise.scala:112) >>>>>>>>>>> at >>>>>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:187) >>>>>>>>>>> at >>>>>>>>>>> pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3(FlinkStreamlet.scala:186) >>>>>>>>>>> at >>>>>>>>>>> pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3$adapted(FlinkStreamlet.scala:186) >>>>>>>>>>> at scala.util.Failure.fold(Try.scala:240) >>>>>>>>>>> at >>>>>>>>>>> pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:187) >>>>>>>>>>> at 
pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:153) >>>>>>>>>>> at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44) >>>>>>>>>>> at scala.util.Try$.apply(Try.scala:213) >>>>>>>>>>> at pipelines.runner.Runner$.run(Runner.scala:43) >>>>>>>>>>> at pipelines.runner.Runner$.main(Runner.scala:30) >>>>>>>>>>> at pipelines.runner.Runner.main(Runner.scala) >>>>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) >>>>>>>>>>> at >>>>>>>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) >>>>>>>>>>> at >>>>>>>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) >>>>>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498) >>>>>>>>>>> at >>>>>>>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576) >>>>>>>>>>> at >>>>>>>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438) >>>>>>>>>>> at >>>>>>>>>>> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83) >>>>>>>>>>> at >>>>>>>>>>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80) >>>>>>>>>>> at >>>>>>>>>>> org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126) >>>>>>>>>>> at >>>>>>>>>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142) >>>>>>>>>>> at >>>>>>>>>>> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590) >>>>>>>>>>> at >>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) >>>>>>>>>>> at >>>>>>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) >>>>>>>>>>> at java.lang.Thread.run(Thread.java:748) >>>>>>>>>>> Caused by: >>>>>>>>>>> 
org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>>>>>>>> at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>>>>>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>>>>>>> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>>>>>>> at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:320)
>>>>>>>>>>> at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$2(FlinkStreamlet.scala:184)
>>>>>>>>>>> at scala.util.Try$.apply(Try.scala:213)
>>>>>>>>>>> at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:184)
>>>>>>>>>>> ... 20 more
>>>>>>>>>>>
>>>>>>>>>>> regards.
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Sep 23, 2019 at 5:36 PM Dian Fu <dian0511...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Regarding the code you pasted, personally I think nothing is
>>>>>>>>>>>> wrong. The problem is how it's executed. As you can see from
>>>>>>>>>>>> the implementation of
>>>>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment, it may
>>>>>>>>>>>> create different StreamExecutionEnvironment implementations
>>>>>>>>>>>> under different scenarios. Could you paste the full exception
>>>>>>>>>>>> stack if it exists? It's difficult to figure out what's wrong
>>>>>>>>>>>> with the current stack trace.
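What Dian describes can be sketched abstractly: `getExecutionEnvironment` is a factory whose result depends on a context installed by whoever launches the program, so the same user code gets a different environment in a local run versus plan extraction over REST. A hypothetical miniature, not Flink's actual implementation:

```java
import java.util.Optional;
import java.util.function.Supplier;

// Sketch (hypothetical names): the environment factory consults a context
// installed by the launcher before invoking the user's main().
public class EnvFactory {

    // plan extraction installs a factory; a plain local run does not
    static Optional<Supplier<String>> contextFactory = Optional.empty();

    static String getExecutionEnvironment() {
        // with no context installed, fall back to a local environment
        return contextFactory.map(Supplier::get).orElse("LocalStreamEnvironment");
    }
}
```

This is why the same `val env = StreamExecutionEnvironment.getExecutionEnvironment` line can yield a `StreamPlanEnvironment` when the job graph is being extracted by the REST handler.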
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Dian
>>>>>>>>>>>>
>>>>>>>>>>>> On Sep 23, 2019, at 6:55 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Can it be the case that the threadLocal stuff in
>>>>>>>>>>>> https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1609
>>>>>>>>>>>> does not behave deterministically when we submit the job
>>>>>>>>>>>> through a Kubernetes Flink operator? Utils also selects the
>>>>>>>>>>>> factory to create the context based on either thread-local
>>>>>>>>>>>> storage or a static mutable variable.
>>>>>>>>>>>>
>>>>>>>>>>>> Can these be sources of problems in our case?
>>>>>>>>>>>>
>>>>>>>>>>>> regards.
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Sep 23, 2019 at 3:58 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> ah .. Ok .. I get the Throwable part. I am using
>>>>>>>>>>>>>
>>>>>>>>>>>>> import org.apache.flink.streaming.api.scala._
>>>>>>>>>>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>>>>>
>>>>>>>>>>>>> How can this lead to a wrong StreamExecutionEnvironment? Any
>>>>>>>>>>>>> suggestion?
>>>>>>>>>>>>>
>>>>>>>>>>>>> regards.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Sep 23, 2019 at 3:53 PM Dian Fu <dian0511...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Debasish,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> As I said before, the exception is caught in [1]. It catches
>>>>>>>>>>>>>> Throwable and so it could also catch
>>>>>>>>>>>>>> "OptimizerPlanEnvironment.ProgramAbortException". Regarding
>>>>>>>>>>>>>> the cause of this exception, I have the same feeling as
>>>>>>>>>>>>>> Tison and I also think that the wrong
>>>>>>>>>>>>>> StreamExecutionEnvironment is used.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>> https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sep 23, 2019, at 6:08 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Tison -
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is the code that builds the computation graph.
>>>>>>>>>>>>>> readStream reads from Kafka and writeStream writes to Kafka.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> override def buildExecutionGraph = {
>>>>>>>>>>>>>>   val rides: DataStream[TaxiRide] =
>>>>>>>>>>>>>>     readStream(inTaxiRide)
>>>>>>>>>>>>>>       .filter { ride ⇒ ride.getIsStart().booleanValue }
>>>>>>>>>>>>>>       .keyBy("rideId")
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>   val fares: DataStream[TaxiFare] =
>>>>>>>>>>>>>>     readStream(inTaxiFare)
>>>>>>>>>>>>>>       .keyBy("rideId")
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>   val processed: DataStream[TaxiRideFare] =
>>>>>>>>>>>>>>     rides
>>>>>>>>>>>>>>       .connect(fares)
>>>>>>>>>>>>>>       .flatMap(new EnrichmentFunction)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>   writeStream(out, processed)
>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I also checked that my code enters this function
>>>>>>>>>>>>>> https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
>>>>>>>>>>>>>> and then the exception is thrown. I tried to do a grep on
>>>>>>>>>>>>>> the Flink code base to see where this exception is caught.
>>>>>>>>>>>>>> Excluding the tests, I don't see any catch of this
>>>>>>>>>>>>>> exception ..
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> $ find .
-name "*.java" | xargs grep >>>>>>>>>>>>>> "OptimizerPlanEnvironment.ProgramAbortException" >>>>>>>>>>>>>> ./flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java: >>>>>>>>>>>>>> throw new OptimizerPlanEnvironment.ProgramAbortException(); >>>>>>>>>>>>>> ./flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java: >>>>>>>>>>>>>> throw new OptimizerPlanEnvironment.ProgramAbortException(); >>>>>>>>>>>>>> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: >>>>>>>>>>>>>> throw new OptimizerPlanEnvironment.ProgramAbortException(); >>>>>>>>>>>>>> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: >>>>>>>>>>>>>> throw new OptimizerPlanEnvironment.ProgramAbortException(); >>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: >>>>>>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: >>>>>>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: >>>>>>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: >>>>>>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: >>>>>>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: >>>>>>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >>>>>>>>>>>>>> 
./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: >>>>>>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: >>>>>>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: >>>>>>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: >>>>>>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: >>>>>>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: >>>>>>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java: >>>>>>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java: >>>>>>>>>>>>>> } catch (OptimizerPlanEnvironment.ProgramAbortException pae) { >>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java:import >>>>>>>>>>>>>> org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException; >>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java: >>>>>>>>>>>>>> @Test(expected = >>>>>>>>>>>>>> OptimizerPlanEnvironment.ProgramAbortException.class, >>>>>>>>>>>>>> timeout = 30_000) >>>>>>>>>>>>>> >>>>>>>>>>>>>> What am I 
missing here ? >>>>>>>>>>>>>> >>>>>>>>>>>>>> regards. >>>>>>>>>>>>>> >>>>>>>>>>>>>> On Mon, Sep 23, 2019 at 7:50 AM Zili Chen < >>>>>>>>>>>>>> wander4...@gmail.com> wrote: >>>>>>>>>>>>>> >>>>>>>>>>>>>>> Hi Debasish, >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> As mentioned by Dian, it is an internal exception that >>>>>>>>>>>>>>> should be always caught by >>>>>>>>>>>>>>> Flink internally. I would suggest you share the >>>>>>>>>>>>>>> job(abstractly). Generally it is because >>>>>>>>>>>>>>> you use StreamPlanEnvironment/OptimizerPlanEnvironment >>>>>>>>>>>>>>> directly. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>> tison. >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> >>>>>>>>>>>>>>> Austin Cawley-Edwards <austin.caw...@gmail.com> >>>>>>>>>>>>>>> 于2019年9月23日周一 上午5:09写道: >>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Have you reached out to the FlinkK8sOperator team on Slack? >>>>>>>>>>>>>>>> They’re usually pretty active on there. >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Here’s the link: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> https://join.slack.com/t/flinkk8soperator/shared_invite/enQtNzIxMjc5NDYxODkxLTEwMThmN2I0M2QwYjM3ZDljYTFhMGRiNDUzM2FjZGYzNTRjYWNmYTE1NzNlNWM2YWM5NzNiNGFhMTkxZjA4OGU >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> Best, >>>>>>>>>>>>>>>> Austin >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> On Sun, Sep 22, 2019 at 12:38 PM Debasish Ghosh < >>>>>>>>>>>>>>>> ghosh.debas...@gmail.com> wrote: >>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> The problem is I am submitting Flink jobs to Kubernetes >>>>>>>>>>>>>>>>> cluster using a Flink Operator. Hence it's difficult to debug >>>>>>>>>>>>>>>>> in the >>>>>>>>>>>>>>>>> traditional sense of the term. And all I get is the exception >>>>>>>>>>>>>>>>> that I >>>>>>>>>>>>>>>>> reported .. 
>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Caused by: >>>>>>>>>>>>>>>>> org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66) >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507) >>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654) >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> I am thinking that this exception must be coming because >>>>>>>>>>>>>>>>> of some other exceptions, which are not reported BTW. I >>>>>>>>>>>>>>>>> expected a Caused >>>>>>>>>>>>>>>>> By portion in the stack trace. Any clue as to which area I >>>>>>>>>>>>>>>>> should look into >>>>>>>>>>>>>>>>> to debug this. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> regards. >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> On Sat, Sep 21, 2019 at 8:10 AM Debasish Ghosh < >>>>>>>>>>>>>>>>> ghosh.debas...@gmail.com> wrote: >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> Thanks for the pointer .. I will try debugging. I am >>>>>>>>>>>>>>>>>> getting this exception running my application on Kubernetes >>>>>>>>>>>>>>>>>> using the Flink >>>>>>>>>>>>>>>>>> operator from Lyft. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> regards. >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>> On Sat, 21 Sep 2019 at 6:44 AM, Dian Fu < >>>>>>>>>>>>>>>>>> dian0511...@gmail.com> wrote: >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> This exception is used internally to get the plan of a >>>>>>>>>>>>>>>>>>> job before submitting it for execution. It's thrown with >>>>>>>>>>>>>>>>>>> special purpose >>>>>>>>>>>>>>>>>>> and will be caught internally in [1] and will not be thrown >>>>>>>>>>>>>>>>>>> to end users >>>>>>>>>>>>>>>>>>> usually. 
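The internal mechanism Dian refers to can be sketched as follows: during plan extraction the environment records the graph inside `execute()`, then aborts the main method with a sentinel `Error` that the framework catches in order to recover the plan without running the job. A minimal sketch with hypothetical names (not Flink's actual classes):

```java
// Sketch (hypothetical names) of the plan-preview trick: execute() records
// the plan and then aborts main() with a sentinel Error; the framework
// catches that Error and recovers the recorded plan.
public class PlanPreview {

    static class AbortException extends Error {}   // stand-in sentinel

    static String capturedPlan;

    // Stand-in for the user's main(): builds the graph, then calls execute(),
    // which here records the plan and aborts instead of running the job.
    static void userMain() {
        capturedPlan = "graph(rides -> fares -> out)";
        throw new AbortException();
    }

    // Stand-in for the framework's plan extractor.
    static String extractPlan() {
        try {
            userMain();
        } catch (AbortException expected) {
            return capturedPlan;   // normal path: plan recovered
        }
        throw new IllegalStateException("main returned without calling execute()");
    }
}
```

If user code intercepts the sentinel before it reaches the extractor, the normal path above is never taken, which matches the behaviour discussed in this thread.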
>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> You could check the following places to find out the >>>>>>>>>>>>>>>>>>> cause to this problem: >>>>>>>>>>>>>>>>>>> 1. Check the execution environment you used >>>>>>>>>>>>>>>>>>> 2. If you can debug, set a breakpoint at[2] to see if >>>>>>>>>>>>>>>>>>> the type of the env wrapped in StreamPlanEnvironment is >>>>>>>>>>>>>>>>>>> OptimizerPlanEnvironment. >>>>>>>>>>>>>>>>>>> Usually it should be. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Regards, >>>>>>>>>>>>>>>>>>> Dian >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> [1] >>>>>>>>>>>>>>>>>>> https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76 >>>>>>>>>>>>>>>>>>> [2] >>>>>>>>>>>>>>>>>>> https://github.com/apache/flink/blob/15a7f978a2e591451c194c41408a12e64d04e9ba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57 >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> 在 2019年9月21日,上午4:14,Debasish Ghosh < >>>>>>>>>>>>>>>>>>> ghosh.debas...@gmail.com> 写道: >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Hi - >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> When you get an exception stack trace like this .. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Caused by: >>>>>>>>>>>>>>>>>>> org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507) >>>>>>>>>>>>>>>>>>> at >>>>>>>>>>>>>>>>>>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654) >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> what is the recommended approach of debugging ? 
I mean >>>>>>>>>>>>>>>>>>> what kind of errors can potentially lead to such a >>>>>>>>>>>>>>>>>>> stacktrace ? In my case >>>>>>>>>>>>>>>>>>> it starts from env.execute(..) but does not give any >>>>>>>>>>>>>>>>>>> information as to what >>>>>>>>>>>>>>>>>>> can go wrong. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Any help will be appreciated. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> regards. >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> -- >>>>>>>>>>>>>>>>>>> Debasish Ghosh >>>>>>>>>>>>>>>>>>> http://manning.com/ghosh2 >>>>>>>>>>>>>>>>>>> http://manning.com/ghosh >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> Twttr: @debasishg >>>>>>>>>>>>>>>>>>> Blog: http://debasishg.blogspot.com >>>>>>>>>>>>>>>>>>> Code: http://github.com/debasishg >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>> -- >>>>>>>>>>>>>>>>>> Sent from my iPhone >>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> -- >>>>>>>>>>>>>>>>> Debasish Ghosh >>>>>>>>>>>>>>>>> http://manning.com/ghosh2 >>>>>>>>>>>>>>>>> http://manning.com/ghosh >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> Twttr: @debasishg >>>>>>>>>>>>>>>>> Blog: http://debasishg.blogspot.com >>>>>>>>>>>>>>>>> Code: http://github.com/debasishg >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> -- >>>>>>>>>>>>>> Debasish Ghosh >>>>>>>>>>>>>> http://manning.com/ghosh2 >>>>>>>>>>>>>> http://manning.com/ghosh >>>>>>>>>>>>>> >>>>>>>>>>>>>> Twttr: @debasishg >>>>>>>>>>>>>> Blog: http://debasishg.blogspot.com >>>>>>>>>>>>>> Code: http://github.com/debasishg >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>>> >>>>>>>>>>>>> >>>>>>>>>>>>> -- >>>>>>>>>>>>> Debasish Ghosh >>>>>>>>>>>>> http://manning.com/ghosh2 >>>>>>>>>>>>> http://manning.com/ghosh >>>>>>>>>>>>> >>>>>>>>>>>>> Twttr: @debasishg >>>>>>>>>>>>> Blog: http://debasishg.blogspot.com >>>>>>>>>>>>> Code: http://github.com/debasishg >>>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> -- >>>>>>>>>>>> Debasish Ghosh >>>>>>>>>>>> http://manning.com/ghosh2 >>>>>>>>>>>> 
http://manning.com/ghosh >>>>>>>>>>>> >>>>>>>>>>>> Twttr: @debasishg >>>>>>>>>>>> Blog: http://debasishg.blogspot.com >>>>>>>>>>>> Code: http://github.com/debasishg >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> -- >>>>>>>>>>> Debasish Ghosh >>>>>>>>>>> http://manning.com/ghosh2 >>>>>>>>>>> http://manning.com/ghosh >>>>>>>>>>> >>>>>>>>>>> Twttr: @debasishg >>>>>>>>>>> Blog: http://debasishg.blogspot.com >>>>>>>>>>> Code: http://github.com/debasishg >>>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>>> -- >>>>>>>>> Debasish Ghosh >>>>>>>>> http://manning.com/ghosh2 >>>>>>>>> http://manning.com/ghosh >>>>>>>>> >>>>>>>>> Twttr: @debasishg >>>>>>>>> Blog: http://debasishg.blogspot.com >>>>>>>>> Code: http://github.com/debasishg >>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>> >>>>>>>> -- >>>>>>>> Debasish Ghosh >>>>>>>> http://manning.com/ghosh2 >>>>>>>> http://manning.com/ghosh >>>>>>>> >>>>>>>> Twttr: @debasishg >>>>>>>> Blog: http://debasishg.blogspot.com >>>>>>>> Code: http://github.com/debasishg >>>>>>>> >>>>>>> >>>> >>>> -- >>>> Debasish Ghosh >>>> http://manning.com/ghosh2 >>>> http://manning.com/ghosh >>>> >>>> Twttr: @debasishg >>>> Blog: http://debasishg.blogspot.com >>>> Code: http://github.com/debasishg >>>> >>>