My final analysis is that the RestClusterClient lacks many methods
(jarUpload, jarRun and getExceptions, for example) and that submitJob (and
the JobSubmitHandler endpoint) is buggy or should be deprecated, because
it does not invoke the job listeners.
Indeed, if the JarRunHandler endpoint is invoked (e.g. from the Web UI), the
job listeners are invoked correctly.
To do what I want, I see the following options (correct me if I'm
wrong):

   1. Wait for JobSubmitHandler to be fixed (if not calling the job
   listeners is a bug; maybe it is not, but then it should at least be
   documented)
   2. Extend the RestClusterClient to replace the call to submitJob
   with jarUpload + jarRun (+ jarDelete, maybe)
   3. Obtain a JobID from ClientUtils.executeProgram(), but it is not
   clear at all to me how to do that. Is that possible?
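Option 2 could look roughly like the following minimal sketch. It assumes the jar has already been uploaded with `POST /jars/upload` and that its jar ID (the file name returned by the upload) is known. It then calls `POST /jars/{jarid}/run`, the endpoint served by JarRunHandler, and reads the `jobid` field of the response. The class and method names are hypothetical, the JSON handling is a deliberately naive regex, and the HTTP client is the JDK 11 `java.net.http` one, not anything from Flink.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: runs an already-uploaded jar and returns the submitted job ID. */
public class JarRunSketch {

    // Matches the "jobid" field of a response such as {"jobid":"c454a894d0524ccb69943b95838eea07"}.
    private static final Pattern JOB_ID = Pattern.compile("\"jobid\"\\s*:\\s*\"([0-9a-fA-F]+)\"");

    /** Builds e.g. http://localhost:8081/jars/<jarId>/run, tolerating a trailing slash in the base URL. */
    static URI runUri(String restBase, String jarId) {
        String base = restBase.endsWith("/") ? restBase.substring(0, restBase.length() - 1) : restBase;
        return URI.create(base + "/jars/" + jarId + "/run");
    }

    /** Naive extraction of the job ID; a real client should use a proper JSON library. */
    static String extractJobId(String json) {
        Matcher m = JOB_ID.matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("No jobid in response: " + json);
        }
        return m.group(1);
    }

    /** Sends the run request (empty JSON body, i.e. default program arguments) and returns the job ID. */
    static String runJar(HttpClient http, String restBase, String jarId) throws Exception {
        HttpRequest request = HttpRequest.newBuilder(runUri(restBase, jarId))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString("{}"))
                .build();
        HttpResponse<String> response = http.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IllegalStateException("Jar run failed: HTTP " + response.statusCode() + " " + response.body());
        }
        return extractJobId(response.body());
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: JarRunSketch <rest-url> <jar-id>");
            return;
        }
        System.out.println("Submitted job " + runJar(HttpClient.newHttpClient(), args[0], args[1]));
    }
}
```

Since the job then goes through the same path as the Web UI, this should reproduce the listener behaviour described above for JarRunHandler, but that is worth verifying on the Flink version in use.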

Thanks in advance for any support,
Flavio


On Fri, Nov 20, 2020 at 10:09 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:

> I think that the problem is that my REST service submits the job to
> the Flink standalone cluster and responds to the client with the
> submitted job ID.
> To achieve this, I was using the
> RestClusterClient<StandaloneClusterId> because with that I can use the
> following code and retrieve the JobID:
>
>     (1) JobID flinkJobId =
>             client.submitJob(jobGraph).thenApply(DetachedJobExecutionResult::new).get().getJobID();
>
> Unfortunately this does not trigger the job listener (which is quite
> surprising to me: I thought such a listener was triggered by the
> JobManager).
> So, after Aljoscha's answer, I took a deeper look at the Flink CLI code,
> and what it does is basically this:
>
>     (2) ClientUtils.executeProgram(new DefaultExecutorServiceLoader(),
>             flinkConf, packagedProgram, false, false);
>
> That works as expected (I wasn't aware of the ThreadLocal mechanism
> used by ContextEnvironment and StreamContextEnvironment: a very
> advanced programming technique), but it does not let me get back the
> job ID that I need. I can live with that, because I can save the Flink
> job ID in an external service when the job listener's onJobSubmitted
> method fires, but I find this mechanism quite odd.
>
> So my question is: is there a simple way to achieve my goal? Am I
> doing something wrong?
> For now I had to implement a job-status polling thread after line
> (1), but this looks like a workaround to me.
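For the polling workaround after line (1), a small generic helper at least keeps the loop out of the business code. This is a sketch under assumptions: `StatusPoller`, `pollUntil` and its parameters are hypothetical, and the fake status source in `main` stands in for a real cluster.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical helper: polls an async status source until it reports a terminal state. */
public class StatusPoller {

    static <S> S pollUntil(Supplier<CompletableFuture<S>> fetchStatus,
                           Predicate<S> isTerminal,
                           Duration interval,
                           Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        try {
            while (true) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    throw new IllegalStateException("No terminal state within " + timeout);
                }
                // Bound each request by the remaining time so a hung call cannot exceed the timeout.
                S status = fetchStatus.get().get(remaining, TimeUnit.NANOSECONDS);
                if (isTerminal.test(status)) {
                    return status;
                }
                Thread.sleep(interval.toMillis());
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while polling", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Status request failed or timed out", e);
        }
    }

    public static void main(String[] args) {
        // Fake status source: RUNNING twice, then FINISHED.
        int[] calls = {0};
        String last = pollUntil(
                () -> CompletableFuture.completedFuture(++calls[0] < 3 ? "RUNNING" : "FINISHED"),
                "FINISHED"::equals,
                Duration.ofMillis(10),
                Duration.ofSeconds(5));
        System.out.println(last + " after " + calls[0] + " calls"); // prints "FINISHED after 3 calls"
    }
}
```

With a RestClusterClient this would presumably be wired as `pollUntil(() -> client.getJobStatus(flinkJobId), JobStatus::isGloballyTerminalState, ...)`, since `ClusterClient#getJobStatus(JobID)` returns a `CompletableFuture<JobStatus>`. If only the final outcome matters, `ClusterClient#requestJobResult(JobID)` may avoid hand-written polling altogether; both are worth checking against the Javadoc of the Flink version in use.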
>
> Best,
> Flavio
>
> On Thu, Nov 19, 2020 at 4:28 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
> >
> > You're right. I removed my Flink dir, re-extracted it, and now it
> > works. Unfortunately I didn't keep the old version, so I can't tell
> > what the difference was, but the error was probably caused by a
> > previous version of WordCount.jar (without the listener) in the
> > Flink lib dir (in another dev session I was experimenting with
> > running the job with the user jar in the lib dir). Sorry for the
> > confusion.
> > Just one last question: is the listener executed on the client or on
> > the job server? This is not entirely clear to me.
> >
> > Best,
> > Flavio
> >
> > On Thu, Nov 19, 2020 at 1:53 PM Andrey Zagrebin <azagre...@apache.org> wrote:
> > >
> > > I also tried 1.11.0 and 1.11.2, both work for me.
> > >
> > > On Thu, Nov 19, 2020 at 3:39 PM Aljoscha Krettek <aljos...@apache.org> wrote:
> > >>
> > >> Hmm, there was this issue:
> > >> https://issues.apache.org/jira/browse/FLINK-17744, but it should be
> > >> fixed in your version.
> > >>
> > >> On 19.11.20 12:58, Flavio Pompermaier wrote:
> > >> > Which version are you using?
> > >> > I used the exact same commands on Flink 1.11.0 and I didn't get the
> > >> > job listener output.
> > >> >
> > >> > On Thu, Nov 19, 2020 at 12:53 Andrey Zagrebin <azagre...@apache.org> wrote:
> > >> >
> > >> >> Hi Flavio and Aljoscha,
> > >> >>
> > >> >> Sorry for the late heads-up. I could not actually reproduce the
> > >> >> reported problem with 'flink run' and a local standalone cluster on
> > >> >> master. I get the expected output with the suggested modification of
> > >> >> the WordCount program:
> > >> >>
> > >> >> $ bin/start-cluster.sh
> > >> >>
> > >> >> $ rm -rf out; bin/flink run flink/flink-examples/flink-examples-batch/target/WordCount.jar --output flink/build-target/out
> > >> >>
> > >> >> Executing WordCount example with default input data set.
> > >> >> Use --input to specify file input.
> > >> >> **************** SUBMITTED
> > >> >> Job has been submitted with JobID c454a894d0524ccb69943b95838eea07
> > >> >> Program execution finished
> > >> >> Job with JobID c454a894d0524ccb69943b95838eea07 has finished.
> > >> >> Job Runtime: 139 ms
> > >> >>
> > >> >> **************** EXECUTED
> > >> >>
> > >> >> Best,
> > >> >> Andrey
> > >> >>
> > >> >> On Thu, Nov 19, 2020 at 2:40 PM Aljoscha Krettek <aljos...@apache.org> wrote:
> > >> >>
> > >> >>> JobListener.onJobExecuted() is only invoked in
> > >> >>> ExecutionEnvironment.execute() and ContextEnvironment.execute(). If
> > >> >>> neither of these is still in the call chain with that setup, the
> > >> >>> listener will not be invoked.
> > >> >>>
> > >> >>> Also, this would only happen on the client, not on the broker (in
> > >> >>> your case) or the server (JobManager).
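Aljoscha's point can be illustrated with a toy model (hypothetical classes, not Flink's API): the listeners live in the client-side environment object and are called from its execute(), so any path that hands a job graph straight to the cluster never reaches them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Toy model (hypothetical, not Flink's API) of where job listeners are invoked. */
public class ListenerScopeSketch {

    interface Listener {
        void onJobSubmitted(String jobId);
        void onJobExecuted(String jobId);
    }

    /** Stand-in for the cluster (e.g. a REST submit): takes a job graph, returns an ID, knows no listeners. */
    static String submitToCluster(String jobGraph) {
        return UUID.randomUUID().toString();
    }

    /** Stand-in for the client-side environment: the only place listeners are registered and called. */
    static class Env {
        private final List<Listener> listeners = new ArrayList<>();

        void registerJobListener(Listener listener) {
            listeners.add(listener);
        }

        String execute(String jobGraph) {
            String jobId = submitToCluster(jobGraph);
            listeners.forEach(l -> l.onJobSubmitted(jobId));
            // An attached execution would block here until the job finishes.
            listeners.forEach(l -> l.onJobExecuted(jobId));
            return jobId;
        }
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        Env env = new Env();
        env.registerJobListener(new Listener() {
            @Override
            public void onJobSubmitted(String jobId) {
                events.add("SUBMITTED");
            }

            @Override
            public void onJobExecuted(String jobId) {
                events.add("EXECUTED");
            }
        });

        env.execute("graph");       // goes through the environment: both callbacks fire
        submitToCluster("graph");   // bypasses it: no callback fires
        System.out.println(events); // prints [SUBMITTED, EXECUTED]
    }
}
```

In this model the callbacks, and any logging inside them, run in the submitting process, which matches the statement that listeners run on the client rather than on the JobManager.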
> > >> >>>
> > >> >>> Does that help to debug the problem?
> > >> >>>
> > >> >>> Aljoscha
> > >> >>>
> > >> >>> On 19.11.20 09:49, Flavio Pompermaier wrote:
> > >> >>>> I have a Spring Boot job server that acts as a broker between our
> > >> >>>> application and a Flink session cluster. To submit a job I use the
> > >> >>>> FlinkRestClient (which, if I'm not wrong, is also the one the CLI
> > >> >>>> client uses for the run action). However, neither method triggers
> > >> >>>> the job listener.
> > >> >>>>
> > >> >>>> On Thu, Nov 19, 2020 at 09:39 Aljoscha Krettek <aljos...@apache.org> wrote:
> > >> >>>>
> > >> >>>>> @Flavio, when you say you're using the RestClusterClient, you are
> > >> >>>>> not actually using it manually, right? You're just submitting your
> > >> >>>>> job via "bin/flink run ...", right?
> > >> >>>>>
> > >> >>>>> What's the exact invocation of "bin/flink run" that you're using?
> > >> >>>>>
> > >> >>>>> On 19.11.20 09:29, Andrey Zagrebin wrote:
> > >> >>>>>> Hi Flavio,
> > >> >>>>>>
> > >> >>>>>> I think I can reproduce what you are reporting (assuming you also
> > >> >>>>>> pass '--output' to 'flink run').
> > >> >>>>>> I am not sure why it behaves like this. I would suggest filing a
> > >> >>>>>> Jira ticket for this.
> > >> >>>>>>
> > >> >>>>>> Best,
> > >> >>>>>> Andrey
> > >> >>>>>>
> > >> >>>>>> On Wed, Nov 18, 2020 at 9:45 AM Flavio Pompermaier <pomperma...@okkam.it> wrote:
> > >> >>>>>>
> > >> >>>>>>> Is this a bug, or is it a documentation problem?
> > >> >>>>>>>
> > >> >>>>>>> On Sat, Nov 14, 2020 at 18:44 Flavio Pompermaier <pomperma...@okkam.it> wrote:
> > >> >>>>>>>
> > >> >>>>>>>> I've also verified that the problem persists with a modified
> > >> >>>>>>>> version of the WordCount class.
> > >> >>>>>>>> If you add the code pasted at the end of this email to the end of
> > >> >>>>>>>> its main method, you can verify that the listener is called when
> > >> >>>>>>>> you run the program from the IDE, but not when you submit the job
> > >> >>>>>>>> with the CLI client using the command
> > >> >>>>>>>>
> > >> >>>>>>>>       - bin/flink run /home/okkam/git/flink/flink-examples/flink-examples-batch/target/WordCount.jar
> > >> >>>>>>>>
> > >> >>>>>>>> Maybe this is the expected behaviour, but I didn't find it
> > >> >>>>>>>> documented anywhere (neither in the Javadoc nor on the Flink
> > >> >>>>>>>> website, where I can't find any documentation about JobListener
> > >> >>>>>>>> at all).
> > >> >>>>>>>>
> > >> >>>>>>>> [Code to add to main()]
> > >> >>>>>>>>        // Required imports (Flink 1.11):
> > >> >>>>>>>>        //   org.apache.flink.api.common.JobExecutionResult
> > >> >>>>>>>>        //   org.apache.flink.core.execution.JobClient
> > >> >>>>>>>>        //   org.apache.flink.core.execution.JobListener
> > >> >>>>>>>>
> > >> >>>>>>>>        // emit result
> > >> >>>>>>>>        if (params.has("output")) {
> > >> >>>>>>>>          counts.writeAsCsv(params.get("output"), "\n", " ");
> > >> >>>>>>>>          // register a listener that prints on submission and on completion
> > >> >>>>>>>>          env.registerJobListener(new JobListener() {
> > >> >>>>>>>>
> > >> >>>>>>>>            @Override
> > >> >>>>>>>>            public void onJobSubmitted(JobClient jobClient, Throwable throwable) {
> > >> >>>>>>>>              System.out.println("**************** SUBMITTED");
> > >> >>>>>>>>            }
> > >> >>>>>>>>
> > >> >>>>>>>>            @Override
> > >> >>>>>>>>            public void onJobExecuted(JobExecutionResult result, Throwable throwable) {
> > >> >>>>>>>>              System.out.println("**************** EXECUTED");
> > >> >>>>>>>>            }
> > >> >>>>>>>>          });
> > >> >>>>>>>>          // execute program
> > >> >>>>>>>>          env.execute("WordCount Example");
> > >> >>>>>>>>        } else {
> > >> >>>>>>>>          System.out.println("Printing result to stdout. Use --output to specify output path.");
> > >> >>>>>>>>          counts.print();
> > >> >>>>>>>>        }
> > >> >>>>>>>>
> > >> >>>>>>>> On Fri, Nov 13, 2020 at 4:25 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
> > >> >>>>>>>>
> > >> >>>>>>>>> See inline.
> > >> >>>>>>>>>
> > >> >>>>>>>>> On Fri, Nov 13, 2020 at 14:31 Matthias Pohl <matth...@ververica.com> wrote:
> > >> >>>>>>>>>
> > >> >>>>>>>>>> Hi Flavio,
> > >> >>>>>>>>>> thanks for sharing this with the Flink community. Could you answer
> > >> >>>>>>>>>> the following questions, please:
> > >> >>>>>>>>>> - What's the code of your job's main method?
> > >> >>>>>>>>>>
> > >> >>>>>>>>>
> > >> >>>>>>>>> It's actually very simple: the main class creates a batch execution
> > >> >>>>>>>>> env using ExecutionEnvironment.getExecutionEnvironment(), I register
> > >> >>>>>>>>> a job listener on the env, and I do some stuff before calling
> > >> >>>>>>>>> env.execute().
> > >> >>>>>>>>> The listener is executed correctly, but if I use the
> > >> >>>>>>>>> RestClusterClient to submit the JobGraph extracted from that main
> > >> >>>>>>>>> (contained in a jar), the program is executed as usual but the job
> > >> >>>>>>>>> listener is not called.
> > >> >>>>>>>>>
> > >> >>>>>>>>>> - What cluster backend and application do you use to execute the
> > >> >>>>>>>>>> job?
> > >> >>>>>>>>>>
> > >> >>>>>>>>>
> > >> >>>>>>>>> I use a standalone session cluster for the moment
> > >> >>>>>>>>>
> > >> >>>>>>>>>> - Is there anything suspicious you can find in the logs that might
> > >> >>>>>>>>>> be related?
> > >> >>>>>>>>>>
> > >> >>>>>>>>>
> > >> >>>>>>>>> No, unfortunately.
> > >> >>>>>>>>>
> > >> >>>>>>>>>
> > >> >>>>>>>>>> Best,
> > >> >>>>>>>>>> Matthias
> > >> >>>>>>>>>>
> > >> >>>>>>>>>> On Thu, Nov 12, 2020 at 7:48 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
> > >> >>>>>>>>>>
> > >> >>>>>>>>>>> What I'm actually experiencing is that the JobListener is executed
> > >> >>>>>>>>>>> successfully if I run my main class from the IDE, while it is not
> > >> >>>>>>>>>>> fired at all if I submit the application's JobGraph to a cluster
> > >> >>>>>>>>>>> using the RestClusterClient.
> > >> >>>>>>>>>>> Am I doing something wrong?
> > >> >>>>>>>>>>>
> > >> >>>>>>>>>>> My main class ends with env.execute(), and I call
> > >> >>>>>>>>>>> env.registerJobListener() when I create the execution env
> > >> >>>>>>>>>>> via ExecutionEnvironment.getExecutionEnvironment().
> > >> >>>>>>>>>>>
> > >> >>>>>>>>>>> Thanks in advance for any help,
> > >> >>>>>>>>>>> Flavio
> > >> >>>>>>>>>>>
> > >> >>>>>>>>>>> On Thu, Nov 12, 2020 at 2:13 PM Flavio Pompermaier <pomperma...@okkam.it> wrote:
> > >> >>>>>>>>>>>
> > >> >>>>>>>>>>>> Hello everybody,
> > >> >>>>>>>>>>>> I'm trying to use the JobListener to track when a job finishes
> > >> >>>>>>>>>>>> (with Flink 1.11.0).
> > >> >>>>>>>>>>>> It works great, but the problem is that log statements inside
> > >> >>>>>>>>>>>> onJobExecuted are not logged anywhere. Is that normal?
> > >> >>>>>>>>>>>>
> > >> >>>>>>>>>>>> Best,
> > >> >>>>>>>>>>>> Flavio
> > >> >>>>>>>>>>>>
> > >> >>>>>>>>>>>
> > >> >>>>>>
> > >> >>>>>
> > >> >>>>>
> > >> >>>>
> > >> >>>
> > >> >>>
> > >> >
> > >>
