Hi Debasish,

The OptimizerPlanEnvironment.ProgramAbortException should be caught in OptimizerPlanEnvironment#getOptimizedPlan, in its catch (Throwable t) branch.
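To make the mechanism concrete, here is a minimal, self-contained Java sketch of the general pattern: an internal "abort" signal is thrown from execute() once the plan has been recorded, and the caller swallows it in a catch (Throwable t) branch. This is only an illustration under stated assumptions, not Flink's code: PlanAbortSketch, AbortSignal, PlanCapturingEnv and getPlan are hypothetical names, and the real classes may differ in detail.

```java
import java.util.function.Consumer;

public class PlanAbortSketch {

    /** Hypothetical stand-in for an internal abort signal (not Flink's class). */
    static final class AbortSignal extends Error {
        AbortSignal() {
            super("plan captured, aborting user program");
        }
    }

    /** Hypothetical environment that records a plan instead of running the job. */
    static final class PlanCapturingEnv {
        String capturedPlan;

        void execute(String jobName) {
            // Record the plan first, then unwind the user's main method.
            capturedPlan = "plan-of-" + jobName;
            throw new AbortSignal();
        }
    }

    /** Runs the user's main logic and returns the plan it produced. */
    static String getPlan(Consumer<PlanCapturingEnv> userMain) throws Exception {
        PlanCapturingEnv env = new PlanCapturingEnv();
        try {
            userMain.accept(env);
        } catch (Throwable t) {
            // The abort signal lands here. If a plan was recorded, it is not an error.
            if (env.capturedPlan != null) {
                return env.capturedPlan;
            }
            // Anything thrown before a plan existed is reported as a wrapped failure.
            throw new Exception("user program failed before a plan was captured", t);
        }
        throw new Exception("user program never called execute()");
    }

    public static void main(String[] args) throws Exception {
        // Normal case: execute() aborts the program, but the plan is still returned.
        System.out.println(getPlan(env -> env.execute("taxi")));

        // Failure case: the program dies before execute(), so the caller sees a wrapped error.
        try {
            getPlan(env -> { throw new IllegalStateException("boom"); });
        } catch (Exception e) {
            System.out.println(e.getMessage() + " <- " + e.getCause());
        }
    }
}
```

In this sketch the abort signal never reaches the caller of getPlan as long as it is thrown inside the try block, which is why seeing it in a user-facing stack trace suggests the program was run through some other path.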
If any exception is thrown in the main method of your code, getOptimizedPlan should always throw a ProgramInvocationException instead of OptimizerPlanEnvironment.ProgramAbortException.

Another important question is how the code is executed (setting the context environment should be another Flink-internal operation), but since you submit the job via the Flink k8s operator, it may take some time to look at the operator's implementation.

However, given that we catch Throwable at the place where this exception is thrown, I strongly doubt that this is being executed by an official Flink release.

A complete version of the code and a description of the submission process would be helpful. Besides, what is the return type of buildExecutionGraph? I think it is not Flink's ExecutionGraph...

Best,
tison.

On Mon, Sep 23, 2019 at 8:21 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:

> This is the complete stack trace which we get from execution on Kubernetes using the Flink Kubernetes operator .. The boxed error comes from the fact that we complete a Promise with Success when it returns a JobExecutionResult and with Failure when we get an exception. And here we are getting an exception. So the real stack trace we have is the one below in Caused By.
>
> java.util.concurrent.ExecutionException: Boxed Error
>     at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
>     at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
>     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
>     at scala.concurrent.Promise.tryFailure(Promise.scala:112)
>     at scala.concurrent.Promise.tryFailure$(Promise.scala:112)
>     at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:187)
>     at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3(FlinkStreamlet.scala:186)
>     at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3$adapted(FlinkStreamlet.scala:186)
>     at scala.util.Failure.fold(Try.scala:240)
>     at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:187)
>     at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:153)
>     at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
>     at scala.util.Try$.apply(Try.scala:213)
>     at pipelines.runner.Runner$.run(Runner.scala:43)
>     at pipelines.runner.Runner$.main(Runner.scala:30)
>     at pipelines.runner.Runner.main(Runner.scala)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>     at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>     at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
>     at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>     at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>     at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:320)
>     at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$2(FlinkStreamlet.scala:184)
>     at scala.util.Try$.apply(Try.scala:213)
>     at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:184)
>     ... 20 more
>
> regards.
>
> On Mon, Sep 23, 2019 at 5:36 PM Dian Fu <dian0511...@gmail.com> wrote:
>
>> Regarding the code you pasted, I personally think nothing is wrong with it. The problem is how it's executed. As you can see from the implementation of StreamExecutionEnvironment.getExecutionEnvironment, it may create different StreamExecutionEnvironment implementations under different scenarios. Could you paste the full exception stack if it exists? It's difficult to figure out what's wrong with the current stack trace.
>>
>> Regards,
>> Dian
>>
>> On Sep 23, 2019, at 6:55 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>
>> Can it be the case that the threadLocal stuff in
>> https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1609
>> does not behave deterministically when we submit a job through a Kubernetes Flink operator? Utils also selects the factory to create the context based on either thread-local storage or a static mutable variable.
>>
>> Can these be the source of problems in our case?
>>
>> regards.
>>
>> On Mon, Sep 23, 2019 at 3:58 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>
>>> ah .. Ok .. I get the Throwable part. I am using
>>>
>>> import org.apache.flink.streaming.api.scala._
>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>
>>> How can this lead to a wrong StreamExecutionEnvironment? Any suggestion?
>>>
>>> regards.
>>>
>>> On Mon, Sep 23, 2019 at 3:53 PM Dian Fu <dian0511...@gmail.com> wrote:
>>>
>>>> Hi Debasish,
>>>>
>>>> As I said before, the exception is caught in [1]. It catches Throwable, so it would also catch "OptimizerPlanEnvironment.ProgramAbortException". Regarding the cause of this exception, I have the same feeling as Tison, and I also think that the wrong StreamExecutionEnvironment is used.
>>>>
>>>> Regards,
>>>> Dian
>>>>
>>>> [1]
>>>> https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
>>>>
>>>> On Sep 23, 2019, at 6:08 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>
>>>> Hi Tison -
>>>>
>>>> This is the code that builds the computation graph. readStream reads from Kafka and writeStream writes to Kafka.
>>>>
>>>> override def buildExecutionGraph = {
>>>>   val rides: DataStream[TaxiRide] =
>>>>     readStream(inTaxiRide)
>>>>       .filter { ride ⇒ ride.getIsStart().booleanValue }
>>>>       .keyBy("rideId")
>>>>
>>>>   val fares: DataStream[TaxiFare] =
>>>>     readStream(inTaxiFare)
>>>>       .keyBy("rideId")
>>>>
>>>>   val processed: DataStream[TaxiRideFare] =
>>>>     rides
>>>>       .connect(fares)
>>>>       .flatMap(new EnrichmentFunction)
>>>>
>>>>   writeStream(out, processed)
>>>> }
>>>>
>>>> I also checked that my code enters this function
>>>> https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
>>>> and then the exception is thrown. I tried to do a grep on the Flink code base to see where this exception is caught. If I leave out the tests, I don't see any catch of this exception ..
>>>>
>>>> $ find . -name "*.java" | xargs grep "OptimizerPlanEnvironment.ProgramAbortException"
>>>> ./flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>>> ./flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>>> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>>> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java:import org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException;
>>>> ./flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java: @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class, timeout = 30_000)
>>>>
>>>> What am I missing here?
>>>>
>>>> regards.
>>>>
>>>> On Mon, Sep 23, 2019 at 7:50 AM Zili Chen <wander4...@gmail.com> wrote:
>>>>
>>>>> Hi Debasish,
>>>>>
>>>>> As mentioned by Dian, it is an internal exception that should always be caught by Flink internally. I would suggest you share the job (abstractly). Generally this happens because you use StreamPlanEnvironment/OptimizerPlanEnvironment directly.
>>>>>
>>>>> Best,
>>>>> tison.
>>>>>
>>>>>
>>>>> On Mon, Sep 23, 2019 at 5:09 AM, Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:
>>>>>
>>>>>> Have you reached out to the FlinkK8sOperator team on Slack? They’re usually pretty active on there.
>>>>>>
>>>>>> Here’s the link:
>>>>>>
>>>>>> https://join.slack.com/t/flinkk8soperator/shared_invite/enQtNzIxMjc5NDYxODkxLTEwMThmN2I0M2QwYjM3ZDljYTFhMGRiNDUzM2FjZGYzNTRjYWNmYTE1NzNlNWM2YWM5NzNiNGFhMTkxZjA4OGU
>>>>>>
>>>>>> Best,
>>>>>> Austin
>>>>>>
>>>>>> On Sun, Sep 22, 2019 at 12:38 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>
>>>>>>> The problem is I am submitting Flink jobs to a Kubernetes cluster using a Flink Operator. Hence it's difficult to debug in the traditional sense of the term. And all I get is the exception that I reported ..
>>>>>>>
>>>>>>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>>>>     at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>>>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>>>
>>>>>>> I am thinking that this exception must be caused by some other exceptions, which are not reported BTW. I expected a Caused By portion in the stack trace. Any clue as to which area I should look into to debug this?
>>>>>>>
>>>>>>> regards.
>>>>>>>
>>>>>>> On Sat, Sep 21, 2019 at 8:10 AM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks for the pointer .. I will try debugging. I am getting this exception running my application on Kubernetes using the Flink operator from Lyft.
>>>>>>>>
>>>>>>>> regards.
>>>>>>>>
>>>>>>>> On Sat, 21 Sep 2019 at 6:44 AM, Dian Fu <dian0511...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> This exception is used internally to get the plan of a job before submitting it for execution. It's thrown for a special purpose, is caught internally in [1], and is usually not thrown to end users.
>>>>>>>>>
>>>>>>>>> You could check the following to find the cause of this problem:
>>>>>>>>> 1. Check the execution environment you used.
>>>>>>>>> 2. If you can debug, set a breakpoint at [2] to see if the type of the env wrapped in StreamPlanEnvironment is OptimizerPlanEnvironment. Usually it should be.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Dian
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
>>>>>>>>> [2]
>>>>>>>>> https://github.com/apache/flink/blob/15a7f978a2e591451c194c41408a12e64d04e9ba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
>>>>>>>>>
>>>>>>>>> On Sep 21, 2019, at 4:14 AM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi -
>>>>>>>>>
>>>>>>>>> When you get an exception stack trace like this ..
>>>>>>>>>
>>>>>>>>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>>>>>>     at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>>>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>>>>>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>>>>>
>>>>>>>>> what is the recommended approach to debugging? I mean, what kind of errors can potentially lead to such a stack trace? In my case it starts from env.execute(..) but does not give any information as to what can go wrong.
>>>>>>>>>
>>>>>>>>> Any help will be appreciated.
>>>>>>>>>
>>>>>>>>> regards.
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Debasish Ghosh
>>>>>>>>> http://manning.com/ghosh2
>>>>>>>>> http://manning.com/ghosh
>>>>>>>>>
>>>>>>>>> Twttr: @debasishg
>>>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>>>> Code: http://github.com/debasishg