Re: [Discuss] Add JobListener (hook) in flink job lifecycle

2020-11-06 Thread Flavio Pompermaier
I think it's OK. I also suggest adding JobStatus to onJobExecuted() so you
can immediately know whether the job finished successfully, failed, or was
canceled.

Thanks for the help,
Flavio

On Fri, Nov 6, 2020 at 10:41 AM Kostas Kloudas  wrote:

> Hi Flavio,
>
> Could this https://issues.apache.org/jira/browse/FLINK-20020 help?
>
> Cheers,
> Kostas
>
> On Thu, Nov 5, 2020 at 9:39 PM Flavio Pompermaier 
> wrote:
> >
> > Hi everybody,
> > I was trying to use the JobListener in my job, but with onJobExecuted() on
> > Flink 1.11.0 I can't tell whether the job succeeded or not.
> > The Javadoc of JobListener.onJobExecuted() [1] says "Callback on job
> > execution finished, successfully or unsuccessfully", but I can't find any
> > simple way to infer whether the job finished successfully.
> > Do I need to perform another remote call from the client to get the job
> > details using the job id?
> > I'm quite surprised that the execution result (FINISHED / CANCELED /
> > FAILED) is not available in the JobExecutionResult.
> > Another strange thing is that jobExecutionResult.getJobExecutionResult()
> > returns itself. Is that correct?
> >
> > Thanks in advance,
> > Flavio
> >
> > [1]
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/execution/JobListener.html
> >
> > On Fri, Oct 9, 2020 at 1:09 PM Matthias  wrote:
> >>
> >> Reviving this thread again after I came across FLINK-12214 [1] since
> there
> >> are use cases which might benefit from this feature. Was there some
> >> conclusion on public APIs in the meantime? Should we proceed with the
> >> discussion here?
> >>
> >> Best,
> >> Matthias
> >>
> >> [1] https://issues.apache.org/jira/browse/FLINK-12214
> >>
> >>
> >>
> >> --
> >> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> >
> >
> >
> > --
> > Flavio Pompermaier
> > Development Department
> >
> > OKKAM S.r.l.
> > Tel. +(39) 0461 041809


Re: [Discuss] Add JobListener (hook) in flink job lifecycle

2020-11-06 Thread Kostas Kloudas
Hi Flavio,

Could this https://issues.apache.org/jira/browse/FLINK-20020 help?

Cheers,
Kostas

On Thu, Nov 5, 2020 at 9:39 PM Flavio Pompermaier  wrote:
>
> Hi everybody,
> I was trying to use the JobListener in my job, but with onJobExecuted() on
> Flink 1.11.0 I can't tell whether the job succeeded or not.
> The Javadoc of JobListener.onJobExecuted() [1] says "Callback on job
> execution finished, successfully or unsuccessfully", but I can't find any
> simple way to infer whether the job finished successfully.
> Do I need to perform another remote call from the client to get the job
> details using the job id?
> I'm quite surprised that the execution result (FINISHED / CANCELED /
> FAILED) is not available in the JobExecutionResult.
> Another strange thing is that jobExecutionResult.getJobExecutionResult()
> returns itself. Is that correct?
>
> Thanks in advance,
> Flavio
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/execution/JobListener.html
>
> On Fri, Oct 9, 2020 at 1:09 PM Matthias  wrote:
>>
>> Reviving this thread again after I came across FLINK-12214 [1] since there
>> are use cases which might benefit from this feature. Was there some
>> conclusion on public APIs in the meantime? Should we proceed with the
>> discussion here?
>>
>> Best,
>> Matthias
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-12214
>>
>>
>>
>> --
>> Sent from: 
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>
>
>
> --
> Flavio Pompermaier
> Development Department
>
> OKKAM S.r.l.
> Tel. +(39) 0461 041809


Re: [Discuss] Add JobListener (hook) in flink job lifecycle

2020-11-05 Thread Flavio Pompermaier
Hi everybody,
I was trying to use the JobListener in my job, but with onJobExecuted() on
Flink 1.11.0 I can't tell whether the job succeeded or not.
The Javadoc of JobListener.onJobExecuted() [1] says "Callback on job
execution finished, successfully or unsuccessfully", but I can't find any
simple way to infer whether the job finished successfully.
Do I need to perform another remote call from the client to get the job
details using the job id?
I'm quite surprised that the execution result (FINISHED / CANCELED /
FAILED) is not available in the JobExecutionResult.
Another strange thing is that jobExecutionResult.getJobExecutionResult()
returns itself. Is that correct?
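
For illustration only, here is a minimal sketch of how a listener could separate success from non-success, assuming the callback receives a nullable result together with a nullable Throwable (the shape of onJobExecuted(JobExecutionResult, Throwable) in Flink 1.11). SketchJobResult, SketchJobListener and OutcomeRecorder are hypothetical stand-ins so that the snippet compiles without Flink on the classpath. It does not tell CANCELED apart from FAILED, which is exactly the gap described above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a job result; this is not a Flink class.
final class SketchJobResult {
    final String jobId;
    final long runtimeMillis;

    SketchJobResult(String jobId, long runtimeMillis) {
        this.jobId = jobId;
        this.runtimeMillis = runtimeMillis;
    }
}

// Hypothetical stand-in mirroring a (result, error) callback shape.
interface SketchJobListener {
    // Assumption of this sketch: error is null when the job succeeded.
    void onJobExecuted(SketchJobResult result, Throwable error);
}

// Records one line per finished job; a null error is treated as success.
final class OutcomeRecorder implements SketchJobListener {
    final List<String> outcomes = new ArrayList<>();

    @Override
    public void onJobExecuted(SketchJobResult result, Throwable error) {
        if (error == null && result != null) {
            outcomes.add("SUCCEEDED " + result.jobId);
        } else {
            // In this sketch, failure and cancellation look the same.
            String reason = (error == null) ? "no result" : error.getMessage();
            outcomes.add("NOT SUCCEEDED: " + reason);
        }
    }
}
```

With only these two arguments, anything finer than "succeeded or not" would still need an extra status value or a separate lookup by job id.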

Thanks in advance,
Flavio

[1]
https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/core/execution/JobListener.html

On Fri, Oct 9, 2020 at 1:09 PM Matthias  wrote:

> Reviving this thread again after I came across FLINK-12214 [1] since there
> are use cases which might benefit from this feature. Was there some
> conclusion on public APIs in the meantime? Should we proceed with the
> discussion here?
>
> Best,
> Matthias
>
> [1] https://issues.apache.org/jira/browse/FLINK-12214
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


-- 
Flavio Pompermaier
Development Department

OKKAM S.r.l.
Tel. +(39) 0461 041809


Re: [Discuss] Add JobListener (hook) in flink job lifecycle

2020-10-09 Thread Matthias
Reviving this thread again after I came across FLINK-12214 [1] since there
are use cases which might benefit from this feature. Was there some
conclusion on public APIs in the meantime? Should we proceed with the
discussion here?

Best,
Matthias

[1] https://issues.apache.org/jira/browse/FLINK-12214



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: [Discuss] Add JobListener (hook) in flink job lifecycle

2019-05-24 Thread Chesnay Schepler
This issue is another case where we have problems figuring out the 
boundaries and responsibilities between the ExecutionEnvironments and 
the ClusterClient.


I believe we should figure this out first, and decide whether the 
ClusterClient (or anything based on it) should be made public to 
accommodate use cases such as this.


Personally, I believe the environments to be overloaded as is and would 
very much not want more features to be added to them.


On 09/05/2019 10:13, Flavio Pompermaier wrote:

Hi everybody,
any news on this? For us it would be VERY helpful to have such a feature
because we need to execute a call to a REST service once a job ends.
Right now we do this after env.execute(), but this works only if the job
is submitted via the CLI client; the REST client doesn't execute anything
after env.execute().

Best,
Flavio




On Thu, Apr 25, 2019 at 3:12 PM Jeff Zhang  wrote:


Hi  Beckett,

Thanks for your feedback, See my comments inline


  How do user specify the listener? *

What I proposal is to register JobListener in ExecutionEnvironment. I
don't think we should make ClusterClient as public api.


Where should the listener run? *

I don't think it is proper to run listener in JobMaster. The listener is
user code, and usually it is depends on user's other component. So running
it in client side make more sense to me.


What should be reported to the Listener? *

I am open to add other api in this JobListener. But for now, I am afraid
the ExecutionEnvironment is not aware of failover, so it is not possible to
report failover event.


What can the listeners do on notifications? *

Do you mean to pass JobGraph to these methods ? like following ( I am
afraid JobGraph is not a public and stable api, we should not expose it to
users)

public interface JobListener {

void onJobSubmitted(JobGraph graph, JobID jobId);

void onJobExecuted(JobGraph graph, JobExecutionResult jobResult);

void onJobCanceled(JobGraph graph, JobID jobId, String savepointPath);
}


On Thu, Apr 25, 2019 at 7:40 PM Becket Qin wrote:


Thanks for the proposal, Jeff. Adding a listener to allow users handle
events during the job lifecycle makes a lot of sense to me.

Here are my two cents.

* How do user specify the listener? *
It is not quite clear to me whether we consider ClusterClient as a public
interface? From what I understand ClusterClient is not a public interface
right now. In contrast, ExecutionEnvironment is the de facto interface for
administrative work. After job submission, it is essentially bound to a job
as an administrative handle. Given this current state, personally I feel
acceptable to have the listener registered to the ExecutionEnvironment.

* Where should the listener run? *
If the listener runs on the client side, the client have to be always
connected to the Flink cluster. This does not quite work if the Job is a
streaming job. Should we provide the option to run the listener in
JobMaster as well?

* What should be reported to the Listener? *
Besides the proposed APIs, does it make sense to also report events such
as failover?

* What can the listeners do on notifications? *
If the listeners are expected to do anything on the job, should some
helper class to manipulate the jobs be passed to the listener method?
Otherwise users may not be able to easily take action.

Thanks,

Jiangjie (Becket) Qin




On Wed, Apr 24, 2019 at 2:43 PM Jeff Zhang  wrote:


Hi Till,

IMHO, allow adding hooks involves 2 steps.
1. Provide hook interface, and call these hook in flink (ClusterClient)
at the right place. This should be done by framework (flink)
2. Implement new hook implementation and add/register them into
framework(flink)

What I am doing is step 1 which should be done by flink, step 2 is done
by users. But IIUC, your suggestion of using custom ClusterClient seems
mixing these 2 steps together. Say I'd like to add new hooks, I have to
implement a new custom ClusterClient, add new hooks and call them in the
custom ClusterClient at the right place.
This doesn't make sense to me. For a user who want to add hooks, he is
not supposed to understand the mechanism of ClusterClient, and should not
touch ClusterClient. What do you think ?




On Tue, Apr 23, 2019 at 4:24 PM Till Rohrmann wrote:


I think we should not expose the ClusterClient configuration via the
ExecutionEnvironment (env.getClusterClient().addJobListener) because this
is effectively the same as exposing the JobListener interface directly on
the ExecutionEnvironment. Instead I think it could be possible to provide a
ClusterClient factory which is picked up from the Configuration or some
other mechanism for example. That way it would not need to be exposed via
the ExecutionEnvironment at all.

Cheers,
Till

On Fri, Apr 19, 2019 at 11:12 AM Jeff Zhang  wrote:


  The ExecutionEnvironment is usually used by the user who writes

the code and this person (I assume) would not be really interested in these
callbacks.

Usually ExecutionEnvironment is used by the user who write the 

Re: [Discuss] Add JobListener (hook) in flink job lifecycle

2019-05-09 Thread Flavio Pompermaier
Hi everybody,
any news on this? For us it would be VERY helpful to have such a feature
because we need to execute a call to a REST service once a job ends.
Right now we do this after env.execute(), but this works only if the job
is submitted via the CLI client; the REST client doesn't execute anything
after env.execute().
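
As a rough sketch of the kind of call meant here, the snippet below builds the HTTP request that a client-side hook could send when a job ends. It uses only the JDK's java.net.http API (Java 11+). The class name JobEndNotification, the endpoint and the JSON shape are hypothetical and only serve the illustration.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Hypothetical helper: builds the notification a hook could send on job end.
// The endpoint and the JSON payload shape are illustrative assumptions.
final class JobEndNotification {

    // Builds a tiny JSON payload by hand to avoid extra dependencies.
    static String jsonBody(String jobId, boolean succeeded) {
        return "{\"jobId\":\"" + jobId + "\",\"succeeded\":" + succeeded + "}";
    }

    // Creates a POST request; nothing is sent until a client executes it.
    static HttpRequest build(URI endpoint, String jobId, boolean succeeded) {
        return HttpRequest.newBuilder(endpoint)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jsonBody(jobId, succeeded)))
                .build();
    }
}
```

Sending it would then be a matter of HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.discarding()) from whatever code runs when the job ends.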

Best,
Flavio




On Thu, Apr 25, 2019 at 3:12 PM Jeff Zhang  wrote:

> Hi  Beckett,
>
> Thanks for your feedback, See my comments inline
>
> >>>  How do user specify the listener? *
> What I proposal is to register JobListener in ExecutionEnvironment. I
> don't think we should make ClusterClient as public api.
>
> >>> Where should the listener run? *
> I don't think it is proper to run listener in JobMaster. The listener is
> user code, and usually it is depends on user's other component. So running
> it in client side make more sense to me.
>
> >>> What should be reported to the Listener? *
> I am open to add other api in this JobListener. But for now, I am afraid
> the ExecutionEnvironment is not aware of failover, so it is not possible to
> report failover event.
>
> >>> What can the listeners do on notifications? *
> Do you mean to pass JobGraph to these methods ? like following ( I am
> afraid JobGraph is not a public and stable api, we should not expose it to
> users)
>
> public interface JobListener {
>
> void onJobSubmitted(JobGraph graph, JobID jobId);
>
> void onJobExecuted(JobGraph graph, JobExecutionResult jobResult);
>
> void onJobCanceled(JobGraph graph, JobID jobId, String savepointPath);
> }
>
>
> On Thu, Apr 25, 2019 at 7:40 PM Becket Qin wrote:
>
>> Thanks for the proposal, Jeff. Adding a listener to allow users handle
>> events during the job lifecycle makes a lot of sense to me.
>>
>> Here are my two cents.
>>
>> * How do user specify the listener? *
>> It is not quite clear to me whether we consider ClusterClient as a public
>> interface? From what I understand ClusterClient is not a public interface
>> right now. In contrast, ExecutionEnvironment is the de facto interface for
>> administrative work. After job submission, it is essentially bound to a job
>> as an administrative handle. Given this current state, personally I feel
>> acceptable to have the listener registered to the ExecutionEnvironment.
>>
>> * Where should the listener run? *
>> If the listener runs on the client side, the client have to be always
>> connected to the Flink cluster. This does not quite work if the Job is a
>> streaming job. Should we provide the option to run the listener in
>> JobMaster as well?
>>
>> * What should be reported to the Listener? *
>> Besides the proposed APIs, does it make sense to also report events such
>> as failover?
>>
>> * What can the listeners do on notifications? *
>> If the listeners are expected to do anything on the job, should some
>> helper class to manipulate the jobs be passed to the listener method?
>> Otherwise users may not be able to easily take action.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>>
>>
>>
>> On Wed, Apr 24, 2019 at 2:43 PM Jeff Zhang  wrote:
>>
>>> Hi Till,
>>>
>>> IMHO, allow adding hooks involves 2 steps.
>>> 1. Provide hook interface, and call these hook in flink (ClusterClient)
>>> at the right place. This should be done by framework (flink)
>>> 2. Implement new hook implementation and add/register them into
>>> framework(flink)
>>>
>>> What I am doing is step 1 which should be done by flink, step 2 is done
>>> by users. But IIUC, your suggestion of using custom ClusterClient seems
>>> mixing these 2 steps together. Say I'd like to add new hooks, I have to
>>> implement a new custom ClusterClient, add new hooks and call them in the
>>> custom ClusterClient at the right place.
>>> This doesn't make sense to me. For a user who want to add hooks, he is
>>> not supposed to understand the mechanism of ClusterClient, and should not
>>> touch ClusterClient. What do you think ?
>>>
>>>
>>>
>>>
>>> On Tue, Apr 23, 2019 at 4:24 PM Till Rohrmann wrote:
>>>
>>>> I think we should not expose the ClusterClient configuration via the
>>>> ExecutionEnvironment (env.getClusterClient().addJobListener) because this
>>>> is effectively the same as exposing the JobListener interface directly on
>>>> the ExecutionEnvironment. Instead I think it could be possible to provide a
>>>> ClusterClient factory which is picked up from the Configuration or some
>>>> other mechanism for example. That way it would not need to be exposed via
>>>> the ExecutionEnvironment at all.
>>>>
>>>> Cheers,
>>>> Till
>>>>
>>>> On Fri, Apr 19, 2019 at 11:12 AM Jeff Zhang  wrote:
>>>>
>>>>> >>> The ExecutionEnvironment is usually used by the user who writes
>>>>> the code and this person (I assume) would not be really interested in
>>>>> these callbacks.
>>>>>
>>>>> Usually the ExecutionEnvironment is used by the user who writes the code,
>>>>> but it doesn't need to be created and configured by this person. E.g. in a
>>>>> Zeppelin notebook, the ExecutionEnvironment is created by Zeppelin, user just

Re: [Discuss] Add JobListener (hook) in flink job lifecycle

2019-04-25 Thread Jeff Zhang
Hi Becket,

Thanks for your feedback. See my comments inline.

>>> * How do users specify the listener? *
What I propose is to register the JobListener in the ExecutionEnvironment. I
don't think we should make ClusterClient a public API.

>>> * Where should the listener run? *
I don't think it is proper to run the listener in the JobMaster. The listener
is user code, and it usually depends on the user's other components, so
running it on the client side makes more sense to me.

>>> * What should be reported to the Listener? *
I am open to adding other methods to this JobListener. But for now, I am afraid
the ExecutionEnvironment is not aware of failover, so it is not possible to
report failover events.

>>> * What can the listeners do on notifications? *
Do you mean passing the JobGraph to these methods, like the following? (I am
afraid JobGraph is not a public and stable API, so we should not expose it to
users.)

public interface JobListener {

    void onJobSubmitted(JobGraph graph, JobID jobId);

    void onJobExecuted(JobGraph graph, JobExecutionResult jobResult);

    void onJobCanceled(JobGraph graph, JobID jobId, String savepointPath);
}


On Thu, Apr 25, 2019 at 7:40 PM Becket Qin wrote:

> Thanks for the proposal, Jeff. Adding a listener to allow users handle
> events during the job lifecycle makes a lot of sense to me.
>
> Here are my two cents.
>
> * How do user specify the listener? *
> It is not quite clear to me whether we consider ClusterClient as a public
> interface? From what I understand ClusterClient is not a public interface
> right now. In contrast, ExecutionEnvironment is the de facto interface for
> administrative work. After job submission, it is essentially bound to a job
> as an administrative handle. Given this current state, personally I feel
> acceptable to have the listener registered to the ExecutionEnvironment.
>
> * Where should the listener run? *
> If the listener runs on the client side, the client have to be always
> connected to the Flink cluster. This does not quite work if the Job is a
> streaming job. Should we provide the option to run the listener in
> JobMaster as well?
>
> * What should be reported to the Listener? *
> Besides the proposed APIs, does it make sense to also report events such
> as failover?
>
> * What can the listeners do on notifications? *
> If the listeners are expected to do anything on the job, should some
> helper class to manipulate the jobs be passed to the listener method?
> Otherwise users may not be able to easily take action.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
>
> On Wed, Apr 24, 2019 at 2:43 PM Jeff Zhang  wrote:
>
>> Hi Till,
>>
>> IMHO, allow adding hooks involves 2 steps.
>> 1. Provide hook interface, and call these hook in flink (ClusterClient)
>> at the right place. This should be done by framework (flink)
>> 2. Implement new hook implementation and add/register them into
>> framework(flink)
>>
>> What I am doing is step 1 which should be done by flink, step 2 is done
>> by users. But IIUC, your suggestion of using custom ClusterClient seems
>> mixing these 2 steps together. Say I'd like to add new hooks, I have to
>> implement a new custom ClusterClient, add new hooks and call them in the
>> custom ClusterClient at the right place.
>> This doesn't make sense to me. For a user who want to add hooks, he is
>> not supposed to understand the mechanism of ClusterClient, and should not
>> touch ClusterClient. What do you think ?
>>
>>
>>
>>
>> On Tue, Apr 23, 2019 at 4:24 PM Till Rohrmann wrote:
>>
>>> I think we should not expose the ClusterClient configuration via the
>>> ExecutionEnvironment (env.getClusterClient().addJobListener) because this
>>> is effectively the same as exposing the JobListener interface directly on
>>> the ExecutionEnvironment. Instead I think it could be possible to provide a
>>> ClusterClient factory which is picked up from the Configuration or some
>>> other mechanism for example. That way it would not need to be exposed via
>>> the ExecutionEnvironment at all.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Apr 19, 2019 at 11:12 AM Jeff Zhang  wrote:
>>>
>>>> >>> The ExecutionEnvironment is usually used by the user who writes
>>>> the code and this person (I assume) would not be really interested in these
>>>> callbacks.
>>>>
>>>> Usually the ExecutionEnvironment is used by the user who writes the code,
>>>> but it doesn't need to be created and configured by this person. E.g. in a
>>>> Zeppelin notebook, the ExecutionEnvironment is created by Zeppelin, and the
>>>> user just uses the ExecutionEnvironment to write a Flink program. You are
>>>> right that the end user would not be interested in these callbacks, but a
>>>> third-party library that integrates with Zeppelin would be.
>>>>
>>>> >>> In your case, it could be sufficient to offer some hooks for the
>>>> ClusterClient or being able to provide a custom ClusterClient.
>>>>
>>>> Actually in my initial PR (https://github.com/apache/flink/pull/8190),
>>>> I do pass the JobListener to the ClusterClient and invoke it there.
 

Re: [Discuss] Add JobListener (hook) in flink job lifecycle

2019-04-25 Thread Becket Qin
Thanks for the proposal, Jeff. Adding a listener to allow users to handle
events during the job lifecycle makes a lot of sense to me.

Here are my two cents.

* How do users specify the listener? *
It is not quite clear to me whether we consider ClusterClient a public
interface. From what I understand, ClusterClient is not a public interface
right now. In contrast, ExecutionEnvironment is the de facto interface for
administrative work. After job submission, it is essentially bound to a job
as an administrative handle. Given this current state, I personally find it
acceptable to have the listener registered on the ExecutionEnvironment.

* Where should the listener run? *
If the listener runs on the client side, the client has to stay
connected to the Flink cluster. This does not quite work if the job is a
streaming job. Should we provide the option to run the listener in the
JobMaster as well?

* What should be reported to the Listener? *
Besides the proposed APIs, does it make sense to also report events such as
failover?

* What can the listeners do on notifications? *
If the listeners are expected to do anything on the job, should some helper
class to manipulate the jobs be passed to the listener method? Otherwise
users may not be able to easily take action.

Thanks,

Jiangjie (Becket) Qin




On Wed, Apr 24, 2019 at 2:43 PM Jeff Zhang  wrote:

> Hi Till,
>
> IMHO, allow adding hooks involves 2 steps.
> 1. Provide hook interface, and call these hook in flink (ClusterClient) at
> the right place. This should be done by framework (flink)
> 2. Implement new hook implementation and add/register them into
> framework(flink)
>
> What I am doing is step 1 which should be done by flink, step 2 is done by
> users. But IIUC, your suggestion of using custom ClusterClient seems mixing
> these 2 steps together. Say I'd like to add new hooks, I have to implement
> a new custom ClusterClient, add new hooks and call them in the custom
> ClusterClient at the right place.
> This doesn't make sense to me. For a user who want to add hooks, he is not
> supposed to understand the mechanism of ClusterClient, and should not touch
> ClusterClient. What do you think ?
>
>
>
>
> On Tue, Apr 23, 2019 at 4:24 PM Till Rohrmann wrote:
>
>> I think we should not expose the ClusterClient configuration via the
>> ExecutionEnvironment (env.getClusterClient().addJobListener) because this
>> is effectively the same as exposing the JobListener interface directly on
>> the ExecutionEnvironment. Instead I think it could be possible to provide a
>> ClusterClient factory which is picked up from the Configuration or some
>> other mechanism for example. That way it would not need to be exposed via
>> the ExecutionEnvironment at all.
>>
>> Cheers,
>> Till
>>
>> On Fri, Apr 19, 2019 at 11:12 AM Jeff Zhang  wrote:
>>
>>> >>>  The ExecutionEnvironment is usually used by the user who writes the
>>> code and this person (I assume) would not be really interested in these
>>> callbacks.
>>>
>>> Usually ExecutionEnvironment is used by the user who write the code, but
>>> it doesn't needs to be created and configured by this person. e.g. in
>>> Zeppelin notebook, ExecutionEnvironment is created by Zeppelin, user just
>>> use ExecutionEnvironment to write flink program.  You are right that the
>>> end user would not be interested in these callback, but the third party
>>> library that integrate with zeppelin would be interested in these callbacks.
>>>
>>> >>> In your case, it could be sufficient to offer some hooks for the
>>> ClusterClient or being able to provide a custom ClusterClient.
>>>
>>> Actually in my initial PR (https://github.com/apache/flink/pull/8190),
>>> I do pass JobListener to ClusterClient and invoke it there.
>>> But IMHO, ClusterClient is not supposed be a public api for users.
>>> Instead JobClient is the public api that user should use to control job. So
>>> adding hooks to ClusterClient directly and provide a custom ClusterClient
>>> doesn't make sense to me. IIUC, you are suggesting the following approach
>>>  env.getClusterClient().addJobListener(jobListener)
>>> but I don't see its benefit compared to this.
>>>  env.addJobListener(jobListener)
>>>
>>> Overall, I think adding hooks is orthogonal with fine grained job
>>> control. And I agree that we should refactor the flink client component,
>>> but I don't think it would affect the JobListener interface. What do you
>>> think ?
>>>
>>>
>>>
>>>
>>> On Thu, Apr 18, 2019 at 8:57 PM Till Rohrmann wrote:
>>>
>>>> Thanks for starting this discussion Jeff. I can see the need for
>>>> additional hooks for third party integrations.
>>>>
>>>> The thing I'm wondering is whether we really need/want to expose a
>>>> JobListener via the ExecutionEnvironment. The ExecutionEnvironment is
>>>> usually used by the user who writes the code and this person (I assume)
>>>> would not be really interested in these callbacks. If he would, then one
>>>> should rather think about a better programmatic job control where the
 

Re: [Discuss] Add JobListener (hook) in flink job lifecycle

2019-04-24 Thread Jeff Zhang
Hi Till,

IMHO, allowing users to add hooks involves 2 steps.
1. Provide the hook interface, and call these hooks in Flink (ClusterClient) at
the right place. This should be done by the framework (Flink).
2. Implement new hooks and add/register them with the framework (Flink).

What I am doing is step 1, which should be done by Flink; step 2 is done by
users. But IIUC, your suggestion of using a custom ClusterClient seems to mix
these 2 steps together. Say I'd like to add new hooks: I have to implement
a new custom ClusterClient, add the new hooks and call them in the custom
ClusterClient at the right place.
This doesn't make sense to me. A user who wants to add hooks is not
supposed to understand the mechanism of ClusterClient, and should not touch
ClusterClient. What do you think?
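
To make the split between the two steps concrete, here is a small self-contained sketch. All names in it (LifecycleHook, SketchEnvironment, EventLog) are hypothetical and are not Flink classes; the only point is that the framework owns the places where hooks are called, while the user only implements and registers a hook.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Step 1 (framework side): the hook interface. Hypothetical, not a Flink API.
interface LifecycleHook {
    void onSubmitted(String jobId);

    // error is null when the job finished successfully.
    void onFinished(String jobId, Throwable error);
}

// Step 1 (framework side): the framework decides where hooks are invoked.
final class SketchEnvironment {
    private final List<LifecycleHook> hooks = new ArrayList<>();

    void addJobListener(LifecycleHook hook) {
        hooks.add(hook);
    }

    <T> T execute(String jobId, Supplier<T> job) {
        for (LifecycleHook hook : hooks) {
            hook.onSubmitted(jobId);
        }
        T result;
        try {
            result = job.get();
        } catch (RuntimeException e) {
            // Notify listeners about the failure, then rethrow to the caller.
            for (LifecycleHook hook : hooks) {
                hook.onFinished(jobId, e);
            }
            throw e;
        }
        for (LifecycleHook hook : hooks) {
            hook.onFinished(jobId, null);
        }
        return result;
    }
}

// Step 2 (user side): implement a hook; no knowledge of client internals needed.
final class EventLog implements LifecycleHook {
    final List<String> events = new ArrayList<>();

    @Override
    public void onSubmitted(String jobId) {
        events.add("submitted:" + jobId);
    }

    @Override
    public void onFinished(String jobId, Throwable error) {
        events.add((error == null ? "ok:" : "failed:") + jobId);
    }
}
```

Registration is then a single call such as env.addJobListener(new EventLog()), and the user code never touches the component that submits the job.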




On Tue, Apr 23, 2019 at 4:24 PM Till Rohrmann wrote:

> I think we should not expose the ClusterClient configuration via the
> ExecutionEnvironment (env.getClusterClient().addJobListener) because this
> is effectively the same as exposing the JobListener interface directly on
> the ExecutionEnvironment. Instead I think it could be possible to provide a
> ClusterClient factory which is picked up from the Configuration or some
> other mechanism for example. That way it would not need to be exposed via
> the ExecutionEnvironment at all.
>
> Cheers,
> Till
>
> On Fri, Apr 19, 2019 at 11:12 AM Jeff Zhang  wrote:
>
>> >>>  The ExecutionEnvironment is usually used by the user who writes the
>> code and this person (I assume) would not be really interested in these
>> callbacks.
>>
>> Usually ExecutionEnvironment is used by the user who write the code, but
>> it doesn't needs to be created and configured by this person. e.g. in
>> Zeppelin notebook, ExecutionEnvironment is created by Zeppelin, user just
>> use ExecutionEnvironment to write flink program.  You are right that the
>> end user would not be interested in these callback, but the third party
>> library that integrate with zeppelin would be interested in these callbacks.
>>
>> >>> In your case, it could be sufficient to offer some hooks for the
>> ClusterClient or being able to provide a custom ClusterClient.
>>
>> Actually in my initial PR (https://github.com/apache/flink/pull/8190), I
>> do pass JobListener to ClusterClient and invoke it there.
>> But IMHO, ClusterClient is not supposed be a public api for users.
>> Instead JobClient is the public api that user should use to control job. So
>> adding hooks to ClusterClient directly and provide a custom ClusterClient
>> doesn't make sense to me. IIUC, you are suggesting the following approach
>>  env.getClusterClient().addJobListener(jobListener)
>> but I don't see its benefit compared to this.
>>  env.addJobListener(jobListener)
>>
>> Overall, I think adding hooks is orthogonal with fine grained job
>> control. And I agree that we should refactor the flink client component,
>> but I don't think it would affect the JobListener interface. What do you
>> think ?
>>
>>
>>
>>
>> On Thu, Apr 18, 2019 at 8:57 PM Till Rohrmann wrote:
>>
>>> Thanks for starting this discussion Jeff. I can see the need for
>>> additional hooks for third party integrations.
>>>
>>> The thing I'm wondering is whether we really need/want to expose a
>>> JobListener via the ExecutionEnvironment. The ExecutionEnvironment is
>>> usually used by the user who writes the code and this person (I assume)
>>> would not be really interested in these callbacks. If he would, then one
>>> should rather think about a better programmatic job control where the
>>> `ExecutionEnvironment#execute` call returns a `JobClient` instance.
>>> Moreover, we would effectively make this part of the public API and every
>>> implementation would need to offer it.
>>>
>>> In your case, it could be sufficient to offer some hooks for the
>>> ClusterClient or being able to provide a custom ClusterClient. The
>>> ClusterClient is the component responsible for the job submission and
>>> retrieval of the job result and, hence, would be able to signal when a job
>>> has been submitted or completed.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Apr 18, 2019 at 8:57 AM vino yang  wrote:
>>>
>>>> Hi Jeff,
>>>>
>>>> I personally like this proposal. From the perspective of
>>>> programmability, the JobListener can make the third program more
>>>> appreciable.
>>>>
>>>> The scene where I need the listener is the Flink cube engine for Apache
>>>> Kylin. In the case, the Flink job program is embedded into the Kylin's
>>>> executable context.
>>>>
>>>> If we could have this listener, it would be easier to integrate with
>>>> Kylin.
>>>>
>>>> Best,
>>>> Vino
>>>>
>>>> On Thu, Apr 18, 2019 at 1:30 PM Jeff Zhang wrote:
>>>>
>>>>>
>>>>> Hi All,
>>>>>
>>>>> I created FLINK-12214 for adding
>>>>> JobListener (hook) in flink job lifecycle. Since this is a new public api
>>>>> for flink, so I'd like to discuss it more widely in community to get more
>>>>> feedback.
>>>>>
>>>>> The background

Re: [Discuss] Add JobListener (hook) in flink job lifecycle

2019-04-23 Thread Till Rohrmann
I think we should not expose the ClusterClient configuration via the
ExecutionEnvironment (env.getClusterClient().addJobListener) because this
is effectively the same as exposing the JobListener interface directly on
the ExecutionEnvironment. Instead I think it could be possible to provide a
ClusterClient factory which is picked up from the Configuration or some
other mechanism for example. That way it would not need to be exposed via
the ExecutionEnvironment at all.
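
One possible reading of "a factory which is picked up from the Configuration" is sketched below. This is an assumption-laden illustration, not the Flink implementation: SketchClient, SketchClientFactory, the two factory classes and the key "client.factory.class" are all hypothetical, and a plain Map stands in for the configuration object.

```java
import java.util.Map;

// Hypothetical stand-ins; none of these are Flink classes.
interface SketchClient {
    String submit(String jobName);
}

interface SketchClientFactory {
    SketchClient create();
}

// Default factory used when the configuration names nothing else.
final class DefaultClientFactory implements SketchClientFactory {
    @Override
    public SketchClient create() {
        return jobName -> "submitted " + jobName;
    }
}

// A third-party factory that wraps the default client with a callback.
final class NotifyingClientFactory implements SketchClientFactory {
    @Override
    public SketchClient create() {
        SketchClient delegate = new DefaultClientFactory().create();
        return jobName -> delegate.submit(jobName) + " (listener notified)";
    }
}

final class ClientFactories {
    // Hypothetical configuration key, chosen for this sketch only.
    static final String FACTORY_KEY = "client.factory.class";

    // Looks up the factory class name in the configuration and instantiates it.
    static SketchClientFactory fromConfiguration(Map<String, String> config) {
        String className =
                config.getOrDefault(FACTORY_KEY, DefaultClientFactory.class.getName());
        try {
            return (SketchClientFactory) Class.forName(className)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot load client factory " + className, e);
        }
    }
}
```

In such a setup the integrator selects the factory through configuration, and nothing listener-related appears on the environment the job author sees.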

Cheers,
Till

On Fri, Apr 19, 2019 at 11:12 AM Jeff Zhang  wrote:

> >>> The ExecutionEnvironment is usually used by the user who writes the
> code and this person (I assume) would not be really interested in these
> callbacks.
>
> Usually the ExecutionEnvironment is used by the user who writes the code, but
> it doesn't need to be created and configured by this person. E.g. in a
> Zeppelin notebook, the ExecutionEnvironment is created by Zeppelin, and the
> user just uses the ExecutionEnvironment to write a Flink program. You are
> right that the end user would not be interested in these callbacks, but a
> third-party library that integrates with Zeppelin would be.
>
> >>> In your case, it could be sufficient to offer some hooks for the
> ClusterClient or being able to provide a custom ClusterClient.
>
> Actually in my initial PR (https://github.com/apache/flink/pull/8190), I
> do pass the JobListener to the ClusterClient and invoke it there.
> But IMHO, ClusterClient is not supposed to be a public API for users. Instead,
> JobClient is the public API that users should use to control jobs. So adding
> hooks to ClusterClient directly and providing a custom ClusterClient doesn't
> make sense to me. IIUC, you are suggesting the following approach:
>  env.getClusterClient().addJobListener(jobListener)
> but I don't see its benefit compared to this:
>  env.addJobListener(jobListener)
>
> Overall, I think adding hooks is orthogonal to fine-grained job
> control. And I agree that we should refactor the Flink client component,
> but I don't think it would affect the JobListener interface. What do you
> think?
>
>
>
>
> Till Rohrmann  于2019年4月18日周四 下午8:57写道:
>
>> Thanks for starting this discussion Jeff. I can see the need for
>> additional hooks for third party integrations.
>>
>> The thing I'm wondering is whether we really need/want to expose a
>> JobListener via the ExecutionEnvironment. The ExecutionEnvironment is
>> usually used by the user who writes the code and this person (I assume)
>> would not be really interested in these callbacks. If he would, then one
>> should rather think about a better programmatic job control where the
>> `ExecutionEnvironment#execute` call returns a `JobClient` instance.
>> Moreover, we would effectively make this part of the public API and every
>> implementation would need to offer it.
>>
>> In your case, it could be sufficient to offer some hooks for the
>> ClusterClient or being able to provide a custom ClusterClient. The
>> ClusterClient is the component responsible for the job submission and
>> retrieval of the job result and, hence, would be able to signal when a job
>> has been submitted or completed.
>>
>> Cheers,
>> Till
>>
>> On Thu, Apr 18, 2019 at 8:57 AM vino yang  wrote:
>>
>>> Hi Jeff,
>>>
>>> I personally like this proposal. From the perspective of
>>> programmability, the JobListener can make the third program more
>>> appreciable.
>>>
>>> The scene where I need the listener is the Flink cube engine for Apache
>>> Kylin. In the case, the Flink job program is embedded into the Kylin's
>>> executable context.
>>>
>>> If we could have this listener, it would be easier to integrate with
>>> Kylin.
>>>
>>> Best,
>>> Vino
>>>
>>> Jeff Zhang  于2019年4月18日周四 下午1:30写道:
>>>

 Hi All,

 I created FLINK-12214
  for adding
 JobListener (hook) in flink job lifecycle. Since this is a new public api
 for flink, so I'd like to discuss it more widely in community to get more
 feedback.

 The background and motivation is that I am integrating flink into apache
 zeppelin (which is a notebook in case you
 don't know). And I'd like to capture some job context (like jobId) in the
 lifecycle of flink job (submission, executed, cancelled) so that I can
 manipulate job in more fined grained control (e.g. I can capture the jobId
 when job is submitted, and then associate it with one paragraph, and when
 user click the cancel button, I can call the flink cancel api to cancel
 this job)

 I believe other projects which integrate flink would need similar
 mechanism. I plan to add api addJobListener in
 ExecutionEnvironment/StreamExecutionEnvironment so that user can add
 customized hook in flink job lifecycle.

 Here's draft interface JobListener.

 public interface JobListener {

 void onJobSubmitted(JobID jobId);

 void 

Re: [Discuss] Add JobListener (hook) in flink job lifecycle

2019-04-19 Thread Jeff Zhang
>>> The ExecutionEnvironment is usually used by the user who writes the
code and this person (I assume) would not be really interested in these
callbacks.

Usually the ExecutionEnvironment is used by the user who writes the code, but
it doesn't need to be created and configured by this person. E.g. in a
Zeppelin notebook, the ExecutionEnvironment is created by Zeppelin, and the
user just uses it to write the Flink program. You are right that the end
user would not be interested in these callbacks, but a third-party library
that integrates with Zeppelin would be.

>>> In your case, it could be sufficient to offer some hooks for the
ClusterClient or being able to provide a custom ClusterClient.

Actually, in my initial PR (https://github.com/apache/flink/pull/8190), I do
pass the JobListener to the ClusterClient and invoke it there.
But IMHO, ClusterClient is not supposed to be a public API for users. Instead,
JobClient is the public API that users should use to control a job. So adding
hooks to ClusterClient directly and providing a custom ClusterClient doesn't
make sense to me. IIUC, you are suggesting the following approach:
 env.getClusterClient().addJobListener(jobListener)
but I don't see its benefit compared to this.
 env.addJobListener(jobListener)
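
To make the second option more concrete, here is a rough sketch of what registering the listener on the environment could look like. It is only an illustration under stated assumptions: SketchExecutionEnvironment is a hypothetical stand-in for ExecutionEnvironment, the listener is trimmed down to two callbacks, and job ids are plain strings rather than JobID instances.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Trimmed-down, hypothetical listener with only two of the hooks.
interface JobListener {
    void onJobSubmitted(String jobId);
    void onJobExecuted(String jobId);
}

// Hypothetical stand-in for ExecutionEnvironment. The platform (e.g. the
// notebook) creates it and registers listeners; the end user only calls execute.
final class SketchExecutionEnvironment {
    private final List<JobListener> listeners = new CopyOnWriteArrayList<>();
    private int submittedJobs;

    void addJobListener(JobListener listener) {
        listeners.add(listener);
    }

    // Pretends to run a job and notifies the listeners around it.
    String execute(String jobName) {
        String jobId = jobName + "-" + (++submittedJobs);
        for (JobListener listener : listeners) {
            listener.onJobSubmitted(jobId);
        }
        // A real environment would submit the job and wait for its result here.
        for (JobListener listener : listeners) {
            listener.onJobExecuted(jobId);
        }
        return jobId;
    }
}
```

The point of the sketch is that the code calling execute never touches the listener; only whoever creates the environment does.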

Overall, I think adding hooks is orthogonal to fine-grained job control.
And I agree that we should refactor the Flink client component, but I don't
think it would affect the JobListener interface. What do you think?





-- 
Best Regards

Jeff Zhang


Re: [Discuss] Add JobListener (hook) in flink job lifecycle

2019-04-18 Thread Till Rohrmann
Thanks for starting this discussion Jeff. I can see the need for additional
hooks for third party integrations.

The thing I'm wondering is whether we really need/want to expose a
JobListener via the ExecutionEnvironment. The ExecutionEnvironment is
usually used by the user who writes the code, and this person (I assume)
would not really be interested in these callbacks. If they were, one
should rather think about better programmatic job control, where the
`ExecutionEnvironment#execute` call returns a `JobClient` instance.
Moreover, we would effectively make this part of the public API and every
implementation would need to offer it.
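
A minimal sketch of what such programmatic job control might look like, assuming an execute call that returns a handle instead of blocking. The JobClient interface, the JobStatus enum and SketchEnvironment below are hypothetical stand-ins written for this illustration; they are not taken from Flink.

```java
import java.util.concurrent.CompletableFuture;

// Simplified stand-in for the final state of a job.
enum JobStatus { FINISHED, CANCELED }

// Hypothetical handle returned by execute.
interface JobClient {
    String getJobId();
    CompletableFuture<JobStatus> getResult();
    void cancel();
}

// Hypothetical environment whose execute call returns immediately.
final class SketchEnvironment {
    JobClient execute(String jobName, Runnable job) {
        String jobId = "job-" + jobName;
        // Run the job in the background; the future completes when it is done.
        CompletableFuture<JobStatus> result = CompletableFuture.supplyAsync(() -> {
            job.run();
            return JobStatus.FINISHED;
        });
        return new JobClient() {
            @Override public String getJobId() { return jobId; }
            @Override public CompletableFuture<JobStatus> getResult() { return result; }
            // Only marks the result as canceled; it does not interrupt the job itself.
            @Override public void cancel() { result.complete(JobStatus.CANCELED); }
        };
    }
}
```

A caller interested in the outcome could then attach a callback to getResult() itself, without any listener being registered on the environment.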

In your case, it could be sufficient to offer some hooks for the
ClusterClient or to allow providing a custom ClusterClient. The
ClusterClient is the component responsible for the job submission and
retrieval of the job result and, hence, would be able to signal when a job
has been submitted or completed.

Cheers,
Till



Re: [Discuss] Add JobListener (hook) in flink job lifecycle

2019-04-18 Thread vino yang
Hi Jeff,

I personally like this proposal. From the perspective of programmability,
the JobListener can make the third-party program more appreciable.

The scenario where I need the listener is the Flink cube engine for Apache
Kylin. In this case, the Flink job program is embedded into Kylin's
executable context.

If we could have this listener, it would be easier to integrate with Kylin.

Best,
Vino



[Discuss] Add JobListener (hook) in flink job lifecycle

2019-04-17 Thread Jeff Zhang
Hi All,

I created FLINK-12214 (https://issues.apache.org/jira/browse/FLINK-12214) for
adding a JobListener (hook) to the Flink job lifecycle. Since this is a new
public API for Flink, I'd like to discuss it more widely in the community to
get more feedback.

The background and motivation is that I am integrating Flink into Apache
Zeppelin (which is a notebook, in case you
don't know). I'd like to capture some job context (like the jobId) during the
lifecycle of a Flink job (submission, execution, cancellation) so that I can
control the job at a finer granularity (e.g. I can capture the jobId
when the job is submitted and associate it with one paragraph, and when
the user clicks the cancel button, I can call the Flink cancel API to cancel
this job).

I believe other projects which integrate Flink would need a similar
mechanism. I plan to add an addJobListener API to
ExecutionEnvironment/StreamExecutionEnvironment so that users can add
customized hooks to the Flink job lifecycle.

Here's the draft JobListener interface.

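public interface JobListener {

    // Called when the job has been submitted.
    void onJobSubmitted(JobID jobId);

    // Called when the job has been executed.
    void onJobExecuted(JobExecutionResult jobResult);

    // Called when the job has been canceled.
    void onJobCanceled(JobID jobId, String savepointPath);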
}
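
As an illustration of the notebook use case described above, a listener implementing this draft interface might look roughly like the sketch below. JobID and JobExecutionResult are replaced by simplified stand-ins so the snippet compiles without Flink on the classpath, and ParagraphJobTracker as well as the paragraph-to-job map are hypothetical names made up for this example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified stand-in for Flink's JobID (assumption for this sketch).
final class JobID {
    private final String value;
    JobID(String value) { this.value = value; }
    @Override public boolean equals(Object o) {
        return o instanceof JobID && ((JobID) o).value.equals(value);
    }
    @Override public int hashCode() { return value.hashCode(); }
    @Override public String toString() { return value; }
}

// Simplified stand-in for Flink's JobExecutionResult (assumption for this sketch).
final class JobExecutionResult {
    private final JobID jobId;
    JobExecutionResult(JobID jobId) { this.jobId = jobId; }
    JobID getJobID() { return jobId; }
}

// The draft interface from this proposal.
interface JobListener {
    void onJobSubmitted(JobID jobId);
    void onJobExecuted(JobExecutionResult jobResult);
    void onJobCanceled(JobID jobId, String savepointPath);
}

// Hypothetical listener that remembers which job belongs to a notebook
// paragraph, so that a cancel button can later look up the job id.
final class ParagraphJobTracker implements JobListener {
    private final String paragraphId;
    private final Map<String, JobID> runningJobs;

    ParagraphJobTracker(String paragraphId, Map<String, JobID> runningJobs) {
        this.paragraphId = paragraphId;
        this.runningJobs = runningJobs;
    }

    @Override
    public void onJobSubmitted(JobID jobId) {
        runningJobs.put(paragraphId, jobId);
    }

    @Override
    public void onJobExecuted(JobExecutionResult jobResult) {
        // Drop the entry only if it still refers to the job that just ended.
        runningJobs.remove(paragraphId, jobResult.getJobID());
    }

    @Override
    public void onJobCanceled(JobID jobId, String savepointPath) {
        runningJobs.remove(paragraphId, jobId);
    }
}
```

The cancel button would then read the job id for its paragraph from the map and pass it to whatever cancel API is available.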

Let me know your comment and concern, thanks.


-- 
Best Regards

Jeff Zhang