Hi Debasish,

As I said before, the exception is caught in [1]. It catches the Throwable and 
so it could also catch "OptimizerPlanEnvironment.ProgramAbortException". 
Regarding to the cause of this exception, I have the same feeling with Tison 
and I also think that the wrong StreamExecutionEnvironment is used.

Regards,
Dian

[1] 
https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
 
<https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76>
> 在 2019年9月23日,下午6:08,Debasish Ghosh <ghosh.debas...@gmail.com> 写道:
> 
> Hi Tison -
> 
> This is the code that builds the computation graph. readStream reads from 
> Kafka and writeStream writes to Kafka.
> 
>     override def buildExecutionGraph = {
>       val rides: DataStream[TaxiRide] =
>         readStream(inTaxiRide)
>           .filter { ride ⇒ ride.getIsStart().booleanValue }
>           .keyBy("rideId")
> 
>       val fares: DataStream[TaxiFare] =
>         readStream(inTaxiFare)
>           .keyBy("rideId")
> 
>       val processed: DataStream[TaxiRideFare] =
>         rides
>           .connect(fares)
>           .flatMap(new EnrichmentFunction)
> 
>       writeStream(out, processed)
>     }
> 
> I also checked that my code enters this function 
> https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
>  
> <https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57>
>  and then the exception is thrown. I tried to do a grep on the Flink code 
> base to see where this exception is caught. If I take off the tests, I don't 
> see any catch of this exception ..
> 
> $ find . -name "*.java" | xargs grep 
> "OptimizerPlanEnvironment.ProgramAbortException"
> ./flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java:
>    throw new OptimizerPlanEnvironment.ProgramAbortException();
> ./flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java:
>         throw new OptimizerPlanEnvironment.ProgramAbortException();
> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java:
>     throw new OptimizerPlanEnvironment.ProgramAbortException();
> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java:
>     throw new OptimizerPlanEnvironment.ProgramAbortException();
> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java:
>         } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java:
>         } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java:
>         } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java:
>         } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java:
>         } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java:
>         } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java:
>        } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java:
>        } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java:
>        } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java:
>        } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java:
>        } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java:
>        } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java:
>        } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java:
>         } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java:import
>  
> org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException;
> ./flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java:
>         @Test(expected = 
> OptimizerPlanEnvironment.ProgramAbortException.class, timeout = 30_000)
> 
> What am I missing here ?
> 
> regards.
> 
> On Mon, Sep 23, 2019 at 7:50 AM Zili Chen <wander4...@gmail.com 
> <mailto:wander4...@gmail.com>> wrote:
> Hi Debasish,
> 
> As mentioned by Dian, it is an internal exception that should be always 
> caught by
> Flink internally. I would suggest you share the job(abstractly). Generally it 
> is because
> you use StreamPlanEnvironment/OptimizerPlanEnvironment directly.
> 
> Best,
> tison.
> 
> 
> Austin Cawley-Edwards <austin.caw...@gmail.com 
> <mailto:austin.caw...@gmail.com>> 于2019年9月23日周一 上午5:09写道:
> Have you reached out to the FlinkK8sOperator team on Slack? They’re usually 
> pretty active on there. 
> 
> Here’s the link:
> https://join.slack.com/t/flinkk8soperator/shared_invite/enQtNzIxMjc5NDYxODkxLTEwMThmN2I0M2QwYjM3ZDljYTFhMGRiNDUzM2FjZGYzNTRjYWNmYTE1NzNlNWM2YWM5NzNiNGFhMTkxZjA4OGU
>  
> <https://join.slack.com/t/flinkk8soperator/shared_invite/enQtNzIxMjc5NDYxODkxLTEwMThmN2I0M2QwYjM3ZDljYTFhMGRiNDUzM2FjZGYzNTRjYWNmYTE1NzNlNWM2YWM5NzNiNGFhMTkxZjA4OGU>
>  
> 
> 
> Best,
> Austin
> 
> On Sun, Sep 22, 2019 at 12:38 PM Debasish Ghosh <ghosh.debas...@gmail.com 
> <mailto:ghosh.debas...@gmail.com>> wrote:
> The problem is I am submitting Flink jobs to Kubernetes cluster using a Flink 
> Operator. Hence it's difficult to debug in the traditional sense of the term. 
> And all I get is the exception that I reported ..
> 
> Caused by: 
> org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
> at 
> org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
> at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
> at 
> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
> 
> I am thinking that this exception must be coming because of some other 
> exceptions, which are not reported BTW. I expected a Caused By portion in the 
> stack trace. Any clue as to which area I should look into to debug this.
> 
> regards.
> 
> On Sat, Sep 21, 2019 at 8:10 AM Debasish Ghosh <ghosh.debas...@gmail.com 
> <mailto:ghosh.debas...@gmail.com>> wrote:
> Thanks for the pointer .. I will try debugging. I am getting this exception 
> running my application on Kubernetes using the Flink operator from Lyft. 
> 
> regards.
> 
> On Sat, 21 Sep 2019 at 6:44 AM, Dian Fu <dian0511...@gmail.com 
> <mailto:dian0511...@gmail.com>> wrote:
> This exception is used internally to get the plan of a job before submitting 
> it for execution. It's thrown with special purpose and will be caught 
> internally in [1] and will not be thrown to end users usually. 
> 
> You could check the following places to find out the cause to this problem:
> 1. Check the execution environment you used
> 2. If you can debug, set a breakpoint at[2] to see if the type of the env 
> wrapped in StreamPlanEnvironment is OptimizerPlanEnvironment. Usually it 
> should be.
> 
> Regards,
> Dian
> 
> [1] 
> https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
>  
> <https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76>
> [2] 
> https://github.com/apache/flink/blob/15a7f978a2e591451c194c41408a12e64d04e9ba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
>  
> <https://github.com/apache/flink/blob/15a7f978a2e591451c194c41408a12e64d04e9ba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57>
> 
>> 在 2019年9月21日,上午4:14,Debasish Ghosh <ghosh.debas...@gmail.com 
>> <mailto:ghosh.debas...@gmail.com>> 写道:
>> 
>> Hi -
>> 
>> When you get an exception stack trace like this ..
>> 
>> Caused by: 
>> org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>> at 
>> org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>> at 
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>> at 
>> org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>> 
>> what is the recommended approach of debugging ? I mean what kind of errors 
>> can potentially lead to such a stacktrace ? In my case it starts from 
>> env.execute(..) but does not give any information as to what can go wrong.
>> 
>> Any help will be appreciated.
>> 
>> regards.
>> 
>> -- 
>> Debasish Ghosh
>> http://manning.com/ghosh2 <http://manning.com/ghosh2>
>> http://manning.com/ghosh <http://manning.com/ghosh>
>> 
>> Twttr: @debasishg
>> Blog: http://debasishg.blogspot.com <http://debasishg.blogspot.com/>
>> Code: http://github.com/debasishg <http://github.com/debasishg>
> -- 
> Sent from my iPhone
> 
> 
> -- 
> Debasish Ghosh
> http://manning.com/ghosh2 <http://manning.com/ghosh2>
> http://manning.com/ghosh <http://manning.com/ghosh>
> 
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com <http://debasishg.blogspot.com/>
> Code: http://github.com/debasishg <http://github.com/debasishg>
> 
> -- 
> Debasish Ghosh
> http://manning.com/ghosh2 <http://manning.com/ghosh2>
> http://manning.com/ghosh <http://manning.com/ghosh>
> 
> Twttr: @debasishg
> Blog: http://debasishg.blogspot.com <http://debasishg.blogspot.com/>
> Code: http://github.com/debasishg <http://github.com/debasishg>

Reply via email to