Hi Debasish,

As I said before, the exception is caught in [1]. The catch block there catches Throwable, so it also catches OptimizerPlanEnvironment.ProgramAbortException. Regarding the cause of this exception, I agree with Tison: I also think that the wrong StreamExecutionEnvironment is being used.
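To make the mechanism concrete, here is a minimal stand-alone Java sketch of the pattern. It is not Flink's code: `AbortForPlanSketch`, `PlanEnvironment`, `getPlan` and the string "plan" are hypothetical stand-ins, and the nested `ProgramAbortException` is a local copy written for this example. The sketch assumes the abort signal is an `Error`, which is why only a catch of `Throwable` intercepts it.

```java
final class AbortForPlanSketch {

    /** Local stand-in for the abort signal; as an Error, `catch (Exception e)` would miss it. */
    static final class ProgramAbortException extends Error {
        private static final long serialVersionUID = 1L;
    }

    /** Hypothetical plan-capturing environment: execute() records a plan and never runs the job. */
    static final class PlanEnvironment {
        String capturedPlan; // stays null until execute() is called

        void execute(String jobName) {
            capturedPlan = "plan-of-" + jobName; // record the plan first
            throw new ProgramAbortException();   // then abort the user program on purpose
        }
    }

    /** Hypothetical helper: runs the user program only to extract its plan. */
    static String getPlan(PlanEnvironment env, Runnable userMain) {
        try {
            userMain.run();
        } catch (Throwable t) {
            // The abort is expected: if a plan was captured, hand it back.
            if (env.capturedPlan != null) {
                return env.capturedPlan;
            }
            throw new IllegalStateException("The program caused an error", t);
        }
        throw new IllegalStateException("The program finished without calling execute()");
    }

    public static void main(String[] args) {
        // Normal path: the abort is swallowed by getPlan and the plan is returned.
        PlanEnvironment env = new PlanEnvironment();
        System.out.println(getPlan(env, () -> env.execute("taxi-ride")));

        // Failure path: execute() is reached outside of getPlan, so nobody
        // catches the abort and it surfaces to the caller.
        try {
            new PlanEnvironment().execute("taxi-ride");
        } catch (ProgramAbortException e) {
            System.out.println("abort escaped: " + e.getClass().getSimpleName());
        }
    }
}
```

The second half of `main` is the situation I suspect here: `execute()` runs on a plan environment, but not underneath the code path that is prepared to catch the abort, so the exception reaches the user with no further cause attached.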

Regards,
Dian

[1] https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76

> On Sep 23, 2019, at 6:08 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>
> Hi Tison -
>
> This is the code that builds the computation graph. readStream reads from
> Kafka and writeStream writes to Kafka.
>
> override def buildExecutionGraph = {
>   val rides: DataStream[TaxiRide] =
>     readStream(inTaxiRide)
>       .filter { ride ⇒ ride.getIsStart().booleanValue }
>       .keyBy("rideId")
>
>   val fares: DataStream[TaxiFare] =
>     readStream(inTaxiFare)
>       .keyBy("rideId")
>
>   val processed: DataStream[TaxiRideFare] =
>     rides
>       .connect(fares)
>       .flatMap(new EnrichmentFunction)
>
>   writeStream(out, processed)
> }
>
> I also checked that my code enters this function
> https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
> and then the exception is thrown. I tried to do a grep on the Flink code
> base to see where this exception is caught. If I leave out the tests, I don't
> see any catch of this exception ..
>
> $ find . -name "*.java" | xargs grep "OptimizerPlanEnvironment.ProgramAbortException"
> ./flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java:
>     throw new OptimizerPlanEnvironment.ProgramAbortException();
> ./flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java:
>     throw new OptimizerPlanEnvironment.ProgramAbortException();
> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java:
>     throw new OptimizerPlanEnvironment.ProgramAbortException();
> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java:
>     throw new OptimizerPlanEnvironment.ProgramAbortException();
> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java:
>     } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java:
>     } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java:
>     } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java:
>     } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java:
>     } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java:
>     } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java:
>     } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java:
>     } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java:
>     } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java:
>     } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java:
>     } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java:
>     } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java:
>     } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java:
>     } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java:import
>     org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException;
> ./flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java:
>     @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class, timeout = 30_000)
>
> What am I missing here?
>
> regards.
>
> On Mon, Sep 23, 2019 at 7:50 AM Zili Chen <wander4...@gmail.com> wrote:
> Hi Debasish,
>
> As mentioned by Dian, it is an internal exception that should always be
> caught by Flink internally. I would suggest you share the job (abstractly).
> Generally it is because you use StreamPlanEnvironment/OptimizerPlanEnvironment
> directly.
>
> Best,
> tison.
>
> On Mon, Sep 23, 2019 at 5:09 AM Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:
> Have you reached out to the FlinkK8sOperator team on Slack? They’re usually
> pretty active on there.
>
> Here’s the link:
> https://join.slack.com/t/flinkk8soperator/shared_invite/enQtNzIxMjc5NDYxODkxLTEwMThmN2I0M2QwYjM3ZDljYTFhMGRiNDUzM2FjZGYzNTRjYWNmYTE1NzNlNWM2YWM5NzNiNGFhMTkxZjA4OGU
>
> Best,
> Austin
>
> On Sun, Sep 22, 2019 at 12:38 PM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
> The problem is I am submitting Flink jobs to a Kubernetes cluster using a Flink
> Operator. Hence it's difficult to debug in the traditional sense of the term.
> And all I get is the exception that I reported ..
>
> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>     at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>
> I am thinking that this exception must be coming because of some other
> exceptions, which are not reported BTW. I expected a Caused By portion in the
> stack trace. Any clue as to which area I should look into to debug this?
>
> regards.
>
> On Sat, Sep 21, 2019 at 8:10 AM Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
> Thanks for the pointer .. I will try debugging. I am getting this exception
> running my application on Kubernetes using the Flink operator from Lyft.
>
> regards.
>
> On Sat, 21 Sep 2019 at 6:44 AM, Dian Fu <dian0511...@gmail.com> wrote:
> This exception is used internally to get the plan of a job before submitting
> it for execution. It is thrown for a special purpose, is caught internally
> in [1], and is usually not thrown to end users.
>
> You could check the following places to find out the cause of this problem:
> 1. Check the execution environment you used.
> 2. If you can debug, set a breakpoint at [2] to see if the type of the env
>    wrapped in StreamPlanEnvironment is OptimizerPlanEnvironment. Usually it
>    should be.
>
> Regards,
> Dian
>
> [1] https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
> [2] https://github.com/apache/flink/blob/15a7f978a2e591451c194c41408a12e64d04e9ba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
>
>> On Sep 21, 2019, at 4:14 AM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>
>> Hi -
>>
>> When you get an exception stack trace like this ..
>>
>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>     at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>     at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>
>> what is the recommended approach to debugging? I mean, what kind of errors
>> can potentially lead to such a stack trace? In my case it starts from env.execute(..)
>> but does not give any information as to what can go wrong.
>>
>> Any help will be appreciated.
>>
>> regards.
>>
>> --
>> Debasish Ghosh
>> http://manning.com/ghosh2
>> http://manning.com/ghosh
>>
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com
>> Code: http://github.com/debasishg