Hi Zili,

Great to hear that!
Hope to see the new client soon!

Thanks,
Biao /'bɪ.aʊ/



On Tue, 24 Sep 2019 at 19:23, Zili Chen <wander4...@gmail.com> wrote:

> Actually there is an ongoing client API refactoring on this stuff[1] and
> one of its main purposes is
> to eliminate the hijacking of env.execute...
>
> Best,
> tison.
>
> [1]
> https://lists.apache.org/x/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@%3Cdev.flink.apache.org%3E
>
>
> On Tue, 24 Sep 2019 at 19:12, Biao Liu <mmyy1...@gmail.com> wrote:
>
>> So I believe (I didn't test it) the solution for this case is to keep the
>> original exception thrown from `env.execute()` and throw it out of the
>> main method.
>> It's a bit tricky; maybe we could have a better design for this scenario.
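
The idea above can be sketched in plain Java. This is only an illustration, assuming the wrapper is a `java.util.concurrent.ExecutionException` as in the "Boxed Error" stack trace quoted further down. `AbortSignal` and `rethrowOriginalError` are hypothetical names and are not Flink classes.

```java
import java.util.concurrent.ExecutionException;

public class RethrowOriginal {
    // Hypothetical stand-in for an Error-based control-flow signal; not a Flink class.
    public static class AbortSignal extends Error {}

    // Walks the cause chain and rethrows the first Error it finds.
    // Returns normally if the chain contains no Error.
    public static void rethrowOriginalError(Throwable t) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (cur instanceof Error) {
                throw (Error) cur;
            }
        }
    }

    public static void main(String[] args) {
        // Simulates a wrapper that boxed the original signal.
        Throwable boxed = new ExecutionException("Boxed Error", new AbortSignal());
        try {
            rethrowOriginalError(boxed);
            System.out.println("no Error in the cause chain");
        } catch (AbortSignal original) {
            System.out.println("original signal recovered");
        }
    }
}
```

A main method that wraps `env.execute()` in its own error handling could call such a helper on the failure before returning, so the caller sees the original `Error` rather than the wrapper.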
>>
>> Thanks,
>> Biao /'bɪ.aʊ/
>>
>>
>>
>> On Tue, 24 Sep 2019 at 18:55, Biao Liu <mmyy1...@gmail.com> wrote:
>>
>>> The key point of this case is in `PackagedProgram#callMainMethod`.
>>> The `ProgramAbortException` is expected when executing the main method
>>> here. The thrown `ProgramAbortException` is wrapped in an
>>> `InvocationTargetException` by the Java reflection layer [1]. There is a
>>> piece of code that handles `InvocationTargetException`.
>>>
>>> try {
>>>   mainMethod.invoke(null, (Object) args);
>>> }
>>> catch (...
>>> catch (InvocationTargetException e) {
>>>   Throwable exceptionInMethod = e.getTargetException();
>>>   if (exceptionInMethod instanceof Error) {
>>>     // ------> `ProgramAbortException` would be caught here, as expected.
>>>     throw (Error) exceptionInMethod;
>>>   } else if (exceptionInMethod instanceof ProgramParametrizationException) {
>>>     throw (ProgramParametrizationException) exceptionInMethod;
>>>   } else if (exceptionInMethod instanceof ProgramInvocationException) {
>>>     throw (ProgramInvocationException) exceptionInMethod;
>>>   } else {
>>>     // ------> If I'm right, the wrapping (Boxed Error or something else)
>>>     // changes the exception type, so it is caught here.
>>>     throw new ProgramInvocationException("The main method caused an error: "
>>>         + exceptionInMethod.getMessage(), exceptionInMethod);
>>>   }
>>> }
>>>
>>> The `ProgramInvocationException` is handled specially in
>>> `OptimizerPlanEnvironment`.
>>>
>>> try {
>>>   prog.invokeInteractiveModeForExecution();
>>> }
>>> catch (ProgramInvocationException e) {
>>>   throw e;       // ------> The submission fails here in this case.
>>> }
>>> catch (Throwable t) {
>>>   // the invocation gets aborted with the preview plan
>>>   if (optimizerPlan != null) {
>>>     return optimizerPlan;       // ------> Normally it should end up here.
>>>   } else {
>>>     throw new ProgramInvocationException("The program caused an error: ", t);
>>>   } ...
>>>
>>> 1.
>>> https://stackoverflow.com/questions/6020719/what-could-cause-java-lang-reflect-invocationtargetexception
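
The dispatch described above can be reproduced with plain Java reflection. The sketch below is an assumption-laden illustration, not Flink code: `AbortSignal`, `directMain`, `boxedMain` and `classify` are hypothetical names, and the boxing is simulated with `java.util.concurrent.ExecutionException`.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.ExecutionException;

public class ReflectionUnwrapDemo {
    // Hypothetical stand-in for an Error-based abort signal; not a Flink class.
    public static class AbortSignal extends Error {}

    // Simulates a main method that lets the signal escape untouched.
    public static void directMain(String[] args) {
        throw new AbortSignal();
    }

    // Simulates a main method whose own wrapper boxes the signal before it escapes.
    public static void boxedMain(String[] args) throws Exception {
        try {
            throw new AbortSignal();
        } catch (Error e) {
            throw new ExecutionException("Boxed Error", e);
        }
    }

    // Invokes the named method reflectively and reports which branch of an
    // Error-versus-everything-else dispatch the target exception would take.
    public static String classify(String methodName) {
        try {
            Method m = ReflectionUnwrapDemo.class.getMethod(methodName, String[].class);
            m.invoke(null, (Object) new String[0]);
            return "no exception";
        } catch (InvocationTargetException e) {
            // Reflection wraps whatever the method threw; unwrap it first.
            Throwable target = e.getTargetException();
            return (target instanceof Error) ? "error branch" : "fallback branch";
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("reflection setup failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("directMain -> " + classify("directMain"));
        System.out.println("boxedMain  -> " + classify("boxedMain"));
    }
}
```

The signal itself is the same in both cases; only the outermost type seen after `getTargetException()` differs, and that is what decides the branch.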
>>>
>>> Thanks,
>>> Biao /'bɪ.aʊ/
>>>
>>>
>>>
>>> On Tue, 24 Sep 2019 at 17:35, Debasish Ghosh <ghosh.debas...@gmail.com>
>>> wrote:
>>>
>>>> Well, I think I got the solution though I am not yet sure of the
>>>> problem .. The original code looked like this ..
>>>>
>>>> Try {
>>>>   // from a parent class called Runner which runs a streamlet
>>>>   // run returns an abstraction which completes a Promise depending on
>>>>   // whether the Job was successful or not
>>>>   val streamletExecution =
>>>>     loadedStreamlet.streamlet.run(withPodRuntimeConfig)
>>>>
>>>>   // the runner waits for the execution to complete
>>>>   // In normal circumstances it will run forever for streaming data
>>>>   // source unless being stopped forcibly or any of the queries faces
>>>>   // an exception
>>>>   Await.result(streamletExecution.completed, Duration.Inf)
>>>> } match { //..
>>>>
>>>> and then the streamlet.run(..) in turn finally invoked the following ..
>>>>
>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>
>>>> // creates datastreams and read from / writes to Kafka
>>>> // I pasted the body of this earlier in the thread
>>>> buildExecutionGraph()
>>>>
>>>> env.execute(..)
>>>>
>>>> This DID NOT run and failed with the exception I reported earlier. But
>>>> when I change the code to get the run statement out of the Try block,
>>>> things run fine .. like this ..
>>>>
>>>> // from a parent class called Runner which runs a streamlet
>>>> // run returns an abstraction which completes a Promise depending on
>>>> // whether the Job was successful or not
>>>> val streamletExecution =
>>>>   loadedStreamlet.streamlet.run(withPodRuntimeConfig)
>>>>
>>>> Try {
>>>>   // the runner waits for the execution to complete
>>>>   // In normal circumstances it will run forever for streaming data
>>>>   // source unless being stopped forcibly or any of the queries faces
>>>>   // an exception
>>>>   Await.result(streamletExecution.completed, Duration.Inf)
>>>> } match { //..
>>>>
>>>> Apparently the exception that I was facing earlier leaked through the
>>>> Flink engine, and Try caught it and it got logged. Moving the run call
>>>> out of Try now lets Flink catch it again and follow the course that it
>>>> should. But I am not sure if this is a cogent explanation, and I am
>>>> looking forward to a more accurate one from the experts. Note there is no
>>>> asynchrony or concurrency going on here - the Runner code may look a bit
>>>> over-engineered but there is a context to this. The Runner code handles not
>>>> only Flink but other types of streaming engines as well, like Spark and Akka
>>>> Streams.
>>>>
>>>> regards.
>>>>
>>>>
>>>> On Tue, Sep 24, 2019 at 10:17 AM Biao Liu <mmyy1...@gmail.com> wrote:
>>>>
>>>>> Hi Zili,
>>>>>
>>>>> Thanks for pointing that out.
>>>>> I didn't realize that it's a REST API based case. Debasish's case has
>>>>> been discussed not only in this thread...
>>>>>
>>>>> It's really hard to analyze the case without the full picture.
>>>>>
>>>>> I think the reason why `ProgramAbortException` is not caught is
>>>>> that he did something outside `env.execute`, like executing this piece
>>>>> of code inside a Scala future.
>>>>>
>>>>> I guess the scenario is that he is submitting the job through the REST
>>>>> API. But in the main method, he wraps `env.execute` in a Scala future
>>>>> instead of executing it directly.
>>>>> The reason env has been set to `StreamPlanEnvironment` is that
>>>>> `JarHandlerUtils` retrieves the job graph through it.
>>>>> And the `ProgramAbortException` is not thrown out, because the Scala
>>>>> future handles this exception.
>>>>> So retrieving the job graph fails due to an unrecognized exception
>>>>> (Boxed Error).
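
The hypothesis above can be illustrated in plain Java, using `CompletableFuture` as a stand-in for a Scala future (an assumption, since the two are different APIs). `AbortSignal` and `observe` are hypothetical names and not part of Flink.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class FutureSwallowDemo {
    // Hypothetical stand-in for an Error-based abort signal; not a Flink class.
    public static class AbortSignal extends Error {}

    // Throws the signal inside a future and describes what the waiting caller sees.
    public static String observe() {
        CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
            throw new AbortSignal();
        });
        try {
            future.get();
            return "completed normally";
        } catch (ExecutionException e) {
            // The signal does not propagate as itself; it arrives as the cause.
            return "ExecutionException caused by " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(observe());
    }
}
```

The caller never sees the `Error` directly, which is the kind of type change that would defeat an `instanceof Error` check further up the stack.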
>>>>>
>>>>> Thanks,
>>>>> Biao /'bɪ.aʊ/
>>>>>
>>>>>
>>>>>
>>>>> On Tue, 24 Sep 2019 at 10:44, Zili Chen <wander4...@gmail.com> wrote:
>>>>>
>>>>>> Hi Biao,
>>>>>>
>>>>>> The log below already implies that the job was submitted via the REST
>>>>>> API, and I don't think it matters.
>>>>>>
>>>>>> at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
>>>>>> at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
>>>>>>
>>>>>> What I don't understand is that Flink DOES catch the exception at the
>>>>>> point it is reported thrown...
>>>>>>
>>>>>> Best,
>>>>>> tison.
>>>>>>
>>>>>>
>>>>>> On Tue, 24 Sep 2019 at 10:34, Biao Liu <mmyy1...@gmail.com> wrote:
>>>>>>
>>>>>>>
>>>>>>> > We submit the code through Kubernetes Flink Operator which uses
>>>>>>> the REST API to submit the job to the Job Manager
>>>>>>>
>>>>>>> So you are submitting the job through the REST API, not the Flink
>>>>>>> client? Could you explain more about this?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Biao /'bɪ.aʊ/
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, 24 Sep 2019 at 03:44, Debasish Ghosh <
>>>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Dian -
>>>>>>>>
>>>>>>>> We submit one job through the operator. We just use the following
>>>>>>>> to complete a promise when the job completes ..
>>>>>>>>
>>>>>>>>       Try {
>>>>>>>>         createLogic.executeStreamingQueries(ctx.env)
>>>>>>>>       }.fold(
>>>>>>>>         th ⇒ completionPromise.tryFailure(th),
>>>>>>>>         _ ⇒ completionPromise.trySuccess(Dun)
>>>>>>>>       )
>>>>>>>>
>>>>>>>> If we totally do away with the promise and future stuff then we
>>>>>>>> don't get the boxed error - only the exception reported in Caused By.
>>>>>>>>
>>>>>>>> regards.
>>>>>>>>
>>>>>>>> On Mon, Sep 23, 2019 at 10:20 PM Dian Fu <dian0511...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Hi Debasish,
>>>>>>>>>
>>>>>>>>> In which case does the exception occur? Does it occur when you
>>>>>>>>> submit one job at a time or when multiple jobs are submitted at the
>>>>>>>>> same time? I'm asking because I noticed that you used a Future to
>>>>>>>>> execute the job without blocking. I guess ThreadLocal doesn't work
>>>>>>>>> well in this case.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Dian
>>>>>>>>>
>>>>>>>>> On 23 Sep 2019, at 11:57 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi tison -
>>>>>>>>>
>>>>>>>>> Please find my response below in >>.
>>>>>>>>>
>>>>>>>>> regards.
>>>>>>>>>
>>>>>>>>> On Mon, Sep 23, 2019 at 6:20 PM Zili Chen <wander4...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Debasish,
>>>>>>>>>>
>>>>>>>>>> The OptimizerPlanEnvironment.ProgramAbortException should be
>>>>>>>>>> caught at OptimizerPlanEnvironment#getOptimizedPlan
>>>>>>>>>> in its catch (Throwable t) branch.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> >> true but what I get is a StreamPlanEnvironment. From my code I
>>>>>>>>> am only doing val env =
>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment.
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> It should always throw a ProgramInvocationException instead of
>>>>>>>>>> OptimizerPlanEnvironment.ProgramAbortException if any
>>>>>>>>>> exception is thrown in the main method of your code.
>>>>>>>>>>
>>>>>>>>>> Another important problem is how the code is executed, (set
>>>>>>>>>> context environment should be another flink internal operation)
>>>>>>>>>> but given that you submit the job via flink k8s operator it might
>>>>>>>>>> require time to take a look at k8s operator implementation.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> >> We submit the code through Kubernetes Flink Operator which uses
>>>>>>>>> the REST API to submit the job to the Job Manager
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> However, given that we catch Throwable in the place where this
>>>>>>>>>> exception is thrown, I strongly doubt that it is executed by an
>>>>>>>>>> official Flink release.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> >> It is an official Flink release 1.9.0
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> A complete version of the code and the submission process would be
>>>>>>>>>> helpful. Besides, what is the return type of buildExecutionGraph?
>>>>>>>>>> I think it is not ExecutionGraph in Flink...
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> >> buildExecutionGraph is our function which returns a Unit. It's
>>>>>>>>> not ExecutionGraph. It builds the DataStreams by reading from Kafka
>>>>>>>>> and then finally writes to Kafka.
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> tison.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, 23 Sep 2019 at 20:21, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> This is the complete stack trace which we get from execution on
>>>>>>>>>>> Kubernetes using the Flink Kubernetes operator .. The boxed error
>>>>>>>>>>> comes from the fact that we complete a Promise with Success when it
>>>>>>>>>>> returns a JobExecutionResult and with Failure when we get an
>>>>>>>>>>> exception. And here we are getting an exception. So the real stack
>>>>>>>>>>> trace we have is the one below in Caused By.
>>>>>>>>>>>
>>>>>>>>>>> java.util.concurrent.ExecutionException: Boxed Error
>>>>>>>>>>> at scala.concurrent.impl.Promise$.resolver(Promise.scala:87)
>>>>>>>>>>> at scala.concurrent.impl.Promise$.scala$concurrent$impl$Promise$$resolveTry(Promise.scala:79)
>>>>>>>>>>> at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
>>>>>>>>>>> at scala.concurrent.Promise.tryFailure(Promise.scala:112)
>>>>>>>>>>> at scala.concurrent.Promise.tryFailure$(Promise.scala:112)
>>>>>>>>>>> at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:187)
>>>>>>>>>>> at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3(FlinkStreamlet.scala:186)
>>>>>>>>>>> at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$3$adapted(FlinkStreamlet.scala:186)
>>>>>>>>>>> at scala.util.Failure.fold(Try.scala:240)
>>>>>>>>>>> at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:187)
>>>>>>>>>>> at pipelines.flink.FlinkStreamlet.run(FlinkStreamlet.scala:153)
>>>>>>>>>>> at pipelines.runner.Runner$.$anonfun$run$3(Runner.scala:44)
>>>>>>>>>>> at scala.util.Try$.apply(Try.scala:213)
>>>>>>>>>>> at pipelines.runner.Runner$.run(Runner.scala:43)
>>>>>>>>>>> at pipelines.runner.Runner$.main(Runner.scala:30)
>>>>>>>>>>> at pipelines.runner.Runner.main(Runner.scala)
>>>>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>>>>>>> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>>>>>>> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>>>>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>>>>>>> at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
>>>>>>>>>>> at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
>>>>>>>>>>> at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>>>>>>>>>>> at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
>>>>>>>>>>> at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toJobGraph(JarHandlerUtils.java:126)
>>>>>>>>>>> at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$6(JarRunHandler.java:142)
>>>>>>>>>>> at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>>>>>>>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>>>>>>>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>>>>>>>>>> at java.lang.Thread.run(Thread.java:748)
>>>>>>>>>>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>>>>>>>> at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>>>>>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>>>>>>> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>>>>>>> at pipelines.flink.FlinkStreamletLogic.executeStreamingQueries(FlinkStreamlet.scala:320)
>>>>>>>>>>> at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.$anonfun$execute$2(FlinkStreamlet.scala:184)
>>>>>>>>>>> at scala.util.Try$.apply(Try.scala:213)
>>>>>>>>>>> at pipelines.flink.FlinkStreamlet$ClusterFlinkJobExecutor$.execute(FlinkStreamlet.scala:184)
>>>>>>>>>>> ... 20 more
>>>>>>>>>>>
>>>>>>>>>>> regards.
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Sep 23, 2019 at 5:36 PM Dian Fu <dian0511...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Regarding the code you pasted, personally I think nothing is
>>>>>>>>>>>> wrong. The problem is how it's executed. As you can see from the
>>>>>>>>>>>> implementation of
>>>>>>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment, it may create
>>>>>>>>>>>> different StreamExecutionEnvironment implementations under
>>>>>>>>>>>> different scenarios. Could you paste the full exception stack if
>>>>>>>>>>>> it exists? It's difficult to figure out what's wrong with the
>>>>>>>>>>>> current stack trace.
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Dian
>>>>>>>>>>>>
>>>>>>>>>>>> On 23 Sep 2019, at 6:55 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Can it be the case that the threadLocal stuff in
>>>>>>>>>>>> https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java#L1609
>>>>>>>>>>>> does not behave deterministically when we submit a job through a
>>>>>>>>>>>> Kubernetes Flink operator ? Utils also selects the factory to create
>>>>>>>>>>>> the context based on either thread-local storage or a static mutable
>>>>>>>>>>>> variable.
>>>>>>>>>>>>
>>>>>>>>>>>> Can these be the source of problems in our case ?
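
The general concern behind this question is that a value stored in a `ThreadLocal` is visible only to the thread that set it. The sketch below shows just that language-level behavior. `FACTORY` and `readFromOtherThread` are hypothetical names, and nothing here claims that Flink's own lookup fails this way.

```java
public class ThreadLocalDemo {
    // Hypothetical per-thread slot, standing in for a thread-local factory.
    private static final ThreadLocal<String> FACTORY = new ThreadLocal<>();

    // Sets the slot on the calling thread, then reads it from a second thread.
    public static String readFromOtherThread() {
        FACTORY.set("plan-environment");
        String[] seen = new String[1];
        Thread other = new Thread(() -> seen[0] = FACTORY.get());
        other.start();
        try {
            other.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        FACTORY.remove();
        // String.valueOf turns a null reference into the text "null".
        return String.valueOf(seen[0]);
    }

    public static void main(String[] args) {
        System.out.println("value seen by the other thread: " + readFromOtherThread());
    }
}
```

Code that reads such a slot from a different thread than the one that populated it would fall back to whatever default the lookup provides.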
>>>>>>>>>>>>
>>>>>>>>>>>> regards.
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Sep 23, 2019 at 3:58 PM Debasish Ghosh <
>>>>>>>>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> ah .. Ok .. I get the Throwable part. I am using
>>>>>>>>>>>>>
>>>>>>>>>>>>> import org.apache.flink.streaming.api.scala._
>>>>>>>>>>>>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>>>>>>>>>>>
>>>>>>>>>>>>> How can this lead to a wrong StreamExecutionEnvironment ? Any
>>>>>>>>>>>>> suggestion ?
>>>>>>>>>>>>>
>>>>>>>>>>>>> regards.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Sep 23, 2019 at 3:53 PM Dian Fu <dian0511...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Debasish,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> As I said before, the exception is caught in [1]. It catches
>>>>>>>>>>>>>> Throwable, so it also catches
>>>>>>>>>>>>>> "OptimizerPlanEnvironment.ProgramAbortException". Regarding the
>>>>>>>>>>>>>> cause of this exception, I share Tison's feeling and also think
>>>>>>>>>>>>>> that the wrong StreamExecutionEnvironment is used.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>> https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On 23 Sep 2019, at 6:08 PM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Tison -
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is the code that builds the computation graph.
>>>>>>>>>>>>>> readStream reads from Kafka and writeStream writes to Kafka.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>     override def buildExecutionGraph = {
>>>>>>>>>>>>>>       val rides: DataStream[TaxiRide] =
>>>>>>>>>>>>>>         readStream(inTaxiRide)
>>>>>>>>>>>>>>           .filter { ride ⇒ ride.getIsStart().booleanValue }
>>>>>>>>>>>>>>           .keyBy("rideId")
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>       val fares: DataStream[TaxiFare] =
>>>>>>>>>>>>>>         readStream(inTaxiFare)
>>>>>>>>>>>>>>           .keyBy("rideId")
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>       val processed: DataStream[TaxiRideFare] =
>>>>>>>>>>>>>>         rides
>>>>>>>>>>>>>>           .connect(fares)
>>>>>>>>>>>>>>           .flatMap(new EnrichmentFunction)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>       writeStream(out, processed)
>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I also checked that my code enters this function
>>>>>>>>>>>>>> https://github.com/apache/flink/blob/release-1.9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
>>>>>>>>>>>>>> and then the exception is thrown. I tried to do a grep on the Flink
>>>>>>>>>>>>>> code base to see where this exception is caught. If I leave out the
>>>>>>>>>>>>>> tests, I don't see any catch of this exception ..
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> $ find . -name "*.java" | xargs grep "OptimizerPlanEnvironment.ProgramAbortException"
>>>>>>>>>>>>>> ./flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>>>>>>>>>>>>> ./flink-python/src/main/java/org/apache/flink/client/python/PythonDriver.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>>>>>>>>>>>>> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>>>>>>>>>>>>> ./flink-clients/src/main/java/org/apache/flink/client/program/PreviewPlanEnvironment.java: throw new OptimizerPlanEnvironment.ProgramAbortException();
>>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/PreviewPlanDumpTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/jsonplan/DumpCompiledPlanTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/KMeansSingleStepTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/examples/RelationalQueryCompilerTest.java: } catch (OptimizerPlanEnvironment.ProgramAbortException pae) {
>>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/optimizer/iterations/ConnectedComponentsCoGroupTest.java:import org.apache.flink.client.program.OptimizerPlanEnvironment.ProgramAbortException;
>>>>>>>>>>>>>> ./flink-tests/src/test/java/org/apache/flink/test/planning/LargePlanTest.java: @Test(expected = OptimizerPlanEnvironment.ProgramAbortException.class, timeout = 30_000)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> What am I missing here ?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> regards.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Sep 23, 2019 at 7:50 AM Zili Chen <
>>>>>>>>>>>>>> wander4...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Debasish,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> As mentioned by Dian, it is an internal exception that
>>>>>>>>>>>>>>> should always be caught by Flink internally. I would suggest
>>>>>>>>>>>>>>> you share the job (abstractly). Generally it is because
>>>>>>>>>>>>>>> you use StreamPlanEnvironment/OptimizerPlanEnvironment
>>>>>>>>>>>>>>> directly.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>> tison.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, 23 Sep 2019 at 05:09, Austin Cawley-Edwards <austin.caw...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Have you reached out to the FlinkK8sOperator team on Slack?
>>>>>>>>>>>>>>>> They’re usually pretty active on there.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Here’s the link:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> https://join.slack.com/t/flinkk8soperator/shared_invite/enQtNzIxMjc5NDYxODkxLTEwMThmN2I0M2QwYjM3ZDljYTFhMGRiNDUzM2FjZGYzNTRjYWNmYTE1NzNlNWM2YWM5NzNiNGFhMTkxZjA4OGU
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>> Austin
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sun, Sep 22, 2019 at 12:38 PM Debasish Ghosh <
>>>>>>>>>>>>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> The problem is I am submitting Flink jobs to a Kubernetes
>>>>>>>>>>>>>>>>> cluster using a Flink Operator. Hence it's difficult to debug
>>>>>>>>>>>>>>>>> in the traditional sense of the term. And all I get is the
>>>>>>>>>>>>>>>>> exception that I reported ..
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>>>>>>>>>>>>>> at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>>>>>>>>>>>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>>>>>>>>>>>>> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I am thinking that this exception must be caused by some
>>>>>>>>>>>>>>>>> other exceptions, which are not reported BTW. I expected a
>>>>>>>>>>>>>>>>> Caused By portion in the stack trace. Any clue as to which
>>>>>>>>>>>>>>>>> area I should look into to debug this?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> regards.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sat, Sep 21, 2019 at 8:10 AM Debasish Ghosh <
>>>>>>>>>>>>>>>>> ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks for the pointer .. I will try debugging. I am
>>>>>>>>>>>>>>>>>> getting this exception running my application on Kubernetes
>>>>>>>>>>>>>>>>>> using the Flink operator from Lyft.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> regards.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Sat, 21 Sep 2019 at 6:44 AM, Dian Fu <
>>>>>>>>>>>>>>>>>> dian0511...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> This exception is used internally to get the plan of a
>>>>>>>>>>>>>>>>>>> job before submitting it for execution. It is thrown for a
>>>>>>>>>>>>>>>>>>> special purpose, is caught internally in [1], and is usually
>>>>>>>>>>>>>>>>>>> not thrown to end users.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> You could check the following places to find out the
>>>>>>>>>>>>>>>>>>> cause of this problem:
>>>>>>>>>>>>>>>>>>> 1. Check the execution environment you used
>>>>>>>>>>>>>>>>>>> 2. If you can debug, set a breakpoint at [2] to see if
>>>>>>>>>>>>>>>>>>> the type of the env wrapped in StreamPlanEnvironment is
>>>>>>>>>>>>>>>>>>> OptimizerPlanEnvironment. Usually it should be.
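
The mechanism described above, an exception thrown on purpose so that the caller can collect a plan instead of running the job, can be sketched in plain Java. All names below (`AbortSignal`, `PlanEnv`, `extractPlan`) are hypothetical and are not Flink's API; the sketch only shows the control-flow pattern.

```java
import java.util.function.Consumer;

public class PlanCaptureDemo {
    // Hypothetical stand-in for an Error-based abort signal; not a Flink class.
    public static class AbortSignal extends Error {}

    // Hypothetical environment: execute() records a plan and aborts instead of running.
    public static class PlanEnv {
        public String plan;

        public void execute(String jobName) {
            plan = "plan-for-" + jobName;
            throw new AbortSignal();
        }
    }

    // Runs the user program against the environment and returns the captured plan.
    public static String extractPlan(Consumer<PlanEnv> userMain) {
        PlanEnv env = new PlanEnv();
        try {
            userMain.accept(env);
        } catch (Throwable t) {
            // The abort is expected; by now the plan has been recorded.
            if (env.plan != null) {
                return env.plan;
            }
            throw new IllegalStateException("program failed before producing a plan", t);
        }
        throw new IllegalStateException("program returned without aborting");
    }

    public static void main(String[] args) {
        System.out.println(extractPlan(env -> env.execute("demo")));
    }
}
```

The pattern relies on the signal reaching the caller; a user program that catches or rewraps it before it escapes changes what the caller sees.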
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>>>>>>>> Dian
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>>> https://github.com/apache/flink/blob/e2728c0dddafcfe7fac0652084be6c7fd9714d85/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L76
>>>>>>>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>>>>>>> https://github.com/apache/flink/blob/15a7f978a2e591451c194c41408a12e64d04e9ba/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java#L57
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On 21 Sep 2019, at 4:14 AM, Debasish Ghosh <ghosh.debas...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi -
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> When you get an exception stack trace like this ..
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Caused by: org.apache.flink.client.program.OptimizerPlanEnvironment$ProgramAbortException
>>>>>>>>>>>>>>>>>>> at org.apache.flink.streaming.api.environment.StreamPlanEnvironment.execute(StreamPlanEnvironment.java:66)
>>>>>>>>>>>>>>>>>>> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
>>>>>>>>>>>>>>>>>>> at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:654)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> what is the recommended approach to debugging ? I mean
>>>>>>>>>>>>>>>>>>> what kind of errors can potentially lead to such a
>>>>>>>>>>>>>>>>>>> stack trace ? In my case it starts from env.execute(..) but
>>>>>>>>>>>>>>>>>>> does not give any information as to what can go wrong.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Any help will be appreciated.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> regards.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>> Debasish Ghosh
>>>>>>>>>>>>>>>>>>> http://manning.com/ghosh2
>>>>>>>>>>>>>>>>>>> http://manning.com/ghosh
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Twttr: @debasishg
>>>>>>>>>>>>>>>>>>> Blog: http://debasishg.blogspot.com
>>>>>>>>>>>>>>>>>>> Code: http://github.com/debasishg
