Re: assignTimestampsAndWatermarks does not work after KeyedStream.process

2019-04-18 Thread Guowei Ma
Hi,
After the keyBy, it is possible that only some of the parallel
BoundedOutOfOrdernessTimestampExtractor instances receive elements (trips). An
extractor instance that receives no elements never emits a watermark, and since
the timeWindowAll operator only advances on the minimum watermark of all its
inputs, it cannot be triggered.
You could add a shuffle() before the assignTimestampsAndWatermarks in your
second case and check whether the window is triggered. If it is triggered,
check how the elements generated by the source are distributed across the keys.
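A minimal sketch of that suggestion, reusing the names from your second
snippet (untested):

```java
DataStream<Trip> trips = env.addSource(consumer);
DataStream<Trip> featurizedUserTrips = trips
        .keyBy(trip -> trip.userId)
        .map(trip -> trip)   // or your keyed processing
        .shuffle()           // redistribute so every extractor subtask receives elements
        .assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
                    @Override
                    public long extractTimestamp(Trip trip) {
                        return trip.endTime.getTime();
                    }
                });
```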

Best,
Guowei


On Fri, Apr 19, 2019 at 4:10 AM an0...@gmail.com wrote:

> I don't think it is the watermark. I see the same watermarks from the two
> versions of code.
>
> The processing on the keyed stream doesn't change event time at all. I can
> simply change my code to use `map` on the keyed stream to return the input
> data unchanged, so that the window operator receives exactly the same data. The
> only difference is where I do `assignTimestampsAndWatermarks`. The result is
> the same: `assignTimestampsAndWatermarks` before `keyBy` works:
> ```java
> DataStream trips =
> env.addSource(consumer).assignTimestampsAndWatermarks(new
> BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> @Override
> public long extractTimestamp(Trip trip) {
> return trip.endTime.getTime();
> }
> });
> KeyedStream userTrips = trips.keyBy(trip -> trip.userId);
> DataStream featurizedUserTrips = userTrips.map(trip -> trip);
> AllWindowedStream windowedUserTrips =
> featurizedUserTrips.timeWindowAll(Time.days(7),
> Time.days(1));
> ```
>
> `assignTimestampsAndWatermarks` after `keyBy` doesn't work:
> ```java
> DataStream trips = env.addSource(consumer);
> KeyedStream userTrips = trips.keyBy(trip -> trip.userId);
> DataStream featurizedUserTrips =
> userTrips.map(trip -> trip).assignTimestampsAndWatermarks(new
> BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> @Override
> public long extractTimestamp(Trip trip) {
> return trip.endTime.getTime();
> }
> });
> AllWindowedStream windowedUserTrips =
> featurizedUserTrips.timeWindowAll(Time.days(7),
> Time.days(1));
> ```
>
> It feels like a bug to me, but I want to confirm it before I file the bug
> report.
>
> On 2019/04/18 03:38:34, Paul Lam  wrote:
> > Hi,
> >
> > Could you check the watermark of the window operator? One possible
> situation would be some of the keys are not getting enough inputs, so their
> watermarks remain below the window end time and hold the window operator
> watermark back. IMO, it’s a good practice to assign watermark earlier in
> the data pipeline.
> >
> > Best,
> > Paul Lam
> >
> > > On Apr 17, 2019, at 23:04, an0...@gmail.com wrote:
> > >
> > > `assignTimestampsAndWatermarks` before `keyBy` works:
> > > ```java
> > > DataStream trips =
> > >env.addSource(consumer).assignTimestampsAndWatermarks(new
> BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> > >@Override
> > >public long extractTimestamp(Trip trip) {
> > >return trip.endTime.getTime();
> > >}
> > >});
> > > KeyedStream userTrips = trips.keyBy(trip -> trip.userId);
> > > DataStream featurizedUserTrips = userTrips.process(new
> Featurization());
> > > AllWindowedStream windowedUserTrips =
> > >featurizedUserTrips.timeWindowAll(Time.days(7),
> > >Time.days(1));
> > > ```
> > >
> > > But not after `keyBy` and `process`:
> > > ```java
> > > DataStream trips = env.addSource(consumer);
> > > KeyedStream userTrips = trips.keyBy(trip -> trip.userId);
> > > DataStream featurizedUserTrips =
> > >userTrips.process(new
> Featurization()).assignTimestampsAndWatermarks(new
> BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
> > >@Override
> > >public long extractTimestamp(FeaturizedTrip trip) {
> > >return trip.endTime.getTime();
> > >}
> > >});
> > > AllWindowedStream windowedUserTrips =
> > >featurizedUserTrips.timeWindowAll(Time.days(7),
> > >Time.days(1));
> > > ```
> > > Windows are never triggered.
> > >
> > > Is it a bug or expected behavior? If the latter, where is it
> documented?
> > >
> >
> >
>


Re: Identify orphan records after joining two streams

2019-04-18 Thread Averell
Thank you Hecheng.

I just tried the Table API as you suggested, and it almost worked (it works,
but with the two issues below):
- I only get the output when my event-time watermark passes the end of the
tumbling window. But because I know there are at most 2 records per window (one
from each stream), I would like to collect my output record as soon as I have
received two input records. With the low-level API, I believe I can do this
with a Trigger (see the sketch after this list). Can I achieve a similar result
with the Table API?
- In the UDAggF documentation, I saw a recommendation to use Java instead of
Scala. Does this apply to the low-level API functions as well?
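A rough sketch of such a trigger with the low-level API (class and stream names
are illustrative only, untested):

```java
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

// Fires the event-time window as soon as it holds two elements, and still fires at the
// end-of-window watermark so that single-record (orphan) windows are emitted as well.
public class TwoElementsOrEndOfWindowTrigger extends Trigger<Object, TimeWindow> {

    private final ReducingStateDescriptor<Long> countDesc =
            new ReducingStateDescriptor<>("count", (a, b) -> a + b, LongSerializer.INSTANCE);

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window,
                                   TriggerContext ctx) throws Exception {
        ctx.registerEventTimeTimer(window.maxTimestamp());
        ReducingState<Long> count = ctx.getPartitionedState(countDesc);
        count.add(1L);
        return count.get() >= 2 ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(countDesc).clear();
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }
}

// usage (placeholders): joinedStream.keyBy(...).window(TumblingEventTimeWindows.of(...))
//         .trigger(new TwoElementsOrEndOfWindowTrigger()).apply(...);
```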

Thanks and best regards,
Averell





host name in REST API

2019-04-18 Thread burgesschen
Hi everyone,

I am having a problem and hope someone can provide insights here.
I am trying to make use of the queryable state feature.
Due to https://issues.apache.org/jira/browse/FLINK-10225, I have to know on
which task managers the job is deployed.
My idea is to use the REST API (/jobs/:jobid/vertices/:vertexid) and look up
the task manager's host name from one of the vertex ids of that job to get the
address. However, for some reason the host name is an internal IP address and
is not reachable from outside of the cluster.
Is there a way to set the host name to a public IP?
I tried to use taskmanager.host but it did not help.

Thank you very much for your time!

Cheers,
- Chen





Re: assignTimestampsAndWatermarks does not work after KeyedStream.process

2019-04-18 Thread an00na
I don't think it is the watermark. I see the same watermarks from the two 
versions of code.

The processing on the keyed stream doesn't change event time at all. I can
simply change my code to use `map` on the keyed stream to return the input
data unchanged, so that the window operator receives exactly the same data. The
only difference is where I do `assignTimestampsAndWatermarks`. The result is the
same: `assignTimestampsAndWatermarks` before `keyBy` works:
```java
DataStream trips =
env.addSource(consumer).assignTimestampsAndWatermarks(new 
BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
@Override
public long extractTimestamp(Trip trip) {
return trip.endTime.getTime();
}
});
KeyedStream userTrips = trips.keyBy(trip -> trip.userId);
DataStream featurizedUserTrips = userTrips.map(trip -> trip);
AllWindowedStream windowedUserTrips = 
featurizedUserTrips.timeWindowAll(Time.days(7),
Time.days(1));
```

`assignTimestampsAndWatermarks` after `keyBy` doesn't work:
```java
DataStream trips = env.addSource(consumer);
KeyedStream userTrips = trips.keyBy(trip -> trip.userId);
DataStream featurizedUserTrips =
userTrips.map(trip -> trip).assignTimestampsAndWatermarks(new 
BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
@Override
public long extractTimestamp(Trip trip) {
return trip.endTime.getTime();
}
});
AllWindowedStream windowedUserTrips = 
featurizedUserTrips.timeWindowAll(Time.days(7),
Time.days(1));
```

It feels like a bug to me, but I want to confirm it before I file the bug report.

On 2019/04/18 03:38:34, Paul Lam  wrote: 
> Hi,
> 
> Could you check the watermark of the window operator? One possible situation 
> would be some of the keys are not getting enough inputs, so their watermarks 
> remain below the window end time and hold the window operator watermark back. 
> IMO, it’s a good practice to assign watermarks earlier in the data pipeline.
> 
> Best,
> Paul Lam
> 
> > On Apr 17, 2019, at 23:04, an0...@gmail.com wrote:
> > 
> > `assignTimestampsAndWatermarks` before `keyBy` works:
> > ```java
> > DataStream trips =
> >env.addSource(consumer).assignTimestampsAndWatermarks(new 
> > BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
> >@Override
> >public long extractTimestamp(Trip trip) {
> >return trip.endTime.getTime();
> >}
> >});
> > KeyedStream userTrips = trips.keyBy(trip -> trip.userId);
> > DataStream featurizedUserTrips = userTrips.process(new 
> > Featurization());
> > AllWindowedStream windowedUserTrips =
> >featurizedUserTrips.timeWindowAll(Time.days(7),
> >Time.days(1));
> > ```
> > 
> > But not after `keyBy` and `process`:
> > ```java
> > DataStream trips = env.addSource(consumer);
> > KeyedStream userTrips = trips.keyBy(trip -> trip.userId);
> > DataStream featurizedUserTrips =
> >userTrips.process(new 
> > Featurization()).assignTimestampsAndWatermarks(new 
> > BoundedOutOfOrdernessTimestampExtractor<FeaturizedTrip>(Time.days(1)) {
> >@Override
> >public long extractTimestamp(FeaturizedTrip trip) {
> >return trip.endTime.getTime();
> >}
> >});
> > AllWindowedStream windowedUserTrips =
> >featurizedUserTrips.timeWindowAll(Time.days(7),
> >Time.days(1));
> > ```
> > Windows are never triggered.
> > 
> > Is it a bug or expected behavior? If the latter, where is it documented?
> > 
> 
> 


PatternFlatSelectAdapter - Serialization issue after 1.8 upgrade

2019-04-18 Thread Oytun Tez
Hi all,

We are just migrating from 1.6 to 1.8. I encountered a serialization error
which, if memory serves, we didn't have before: "The implementation of the
PatternFlatSelectAdapter is not serializable. The object probably contains or
references non serializable fields."

The method below simply takes a PatternStream from CEP.pattern() and makes use
of the side output for timed-out events. Can you see anything weird here
(WorkerEvent is the input, but the collectors collect Project objects)?

protected DataStream<Project> getPending(PatternStream<WorkerEvent> patternStream) {
    OutputTag<Project> pendingProjectsTag =
            new OutputTag<Project>("invitation-pending-projects"){};

    return patternStream.flatSelect(
            pendingProjectsTag,
            new PatternFlatTimeoutFunction<WorkerEvent, Project>() {
                @Override
                public void timeout(Map<String, List<WorkerEvent>> map, long l,
                                    Collector<Project> collector) {
                }
            },
            new PatternFlatSelectFunction<WorkerEvent, Project>() {
                @Override
                public void flatSelect(Map<String, List<WorkerEvent>> pattern,
                                       Collector<Project> collector) {
                }
            }
    ).name("Select pending projects for invitation").getSideOutput(pendingProjectsTag);
}

---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com


Re: Does Flink have plans to support Deep Learning?

2019-04-18 Thread Manjusha Vuyyuru
Thanks Suneel for the information.

I see https://github.com/FlinkML/flink-tensorflow/wiki has support for a
Scala API.
Can you please confirm whether Java support exists for the same?

Regards,
Manju

On Thu, Apr 18, 2019 at 7:36 PM Suneel Marthi  wrote:

> That's a very open-ended question.
>
> There's been enough work done on using Flink for Deep Learning model
> inference - with TensorFlow (look at Eron Wright's Flink-Tensorflow
> project), with Amazon Sagemaker (I have code for that), or the work from
> LightBend on Flink Model serving.
>
> So yes, there's enough of that running in production, but don't expect any of
> that to be part of the Flink codebase yet.
>
> On Thu, Apr 18, 2019 at 10:01 AM Manjusha Vuyyuru 
> wrote:
>
>> Hello,
>>
>> Does Flink have any plans to support Deep Learning in the near future?
>>
>> Thanks,
>> Manju
>>
>>


Re: Does Flink have plans to support Deep Learning?

2019-04-18 Thread Suneel Marthi
That's a very open-ended question.

There's been enough work done on using Flink for Deep Learning model
inference - with TensorFlow (look at Eron Wright's Flink-Tensorflow
project), with Amazon Sagemaker (I have code for that), or the work from
LightBend on Flink Model serving.

So yes, there's enough of that running in production, but don't expect any of
that to be part of the Flink codebase yet.

On Thu, Apr 18, 2019 at 10:01 AM Manjusha Vuyyuru 
wrote:

> Hello,
>
> Does Flink have any plans to support Deep Learning in the near future?
>
> Thanks,
> Manju
>
>


Does Flink have plans to support Deep Learning?

2019-04-18 Thread Manjusha Vuyyuru
Hello,

Does Flink have any plans to support Deep Learning in the near future?

Thanks,
Manju


Submit user application jar in a programming way

2019-04-18 Thread Hai
Hi,


Recently I met an issue related to the class loader of the user application
and Flink's own class loader.


I want to solve this issue [1] by finding out the right class loader for the
user jar. If anyone could show me the class loader sequence that Flink uses at
start-up in production, I would appreciate it, and this issue may be simple to
solve.


If not, I want to reproduce this issue by writing a local application that
emulates the production submission process. Could anyone suggest a way, or the
best practice, to submit my application fat jar programmatically?
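One possible programmatic path, as a rough sketch (host, port, and jar path are
placeholders, untested):

```java
// The listed fat jar is shipped to the cluster together with the job graph
// when execute() is called on the remote environment.
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
        "jobmanager-host",           // placeholder JobManager host
        8081,                        // placeholder port (the REST port in recent Flink versions)
        "/path/to/my-app-fat.jar");  // placeholder path to the user fat jar

// build the job on env as usual, then:
env.execute("programmatic-submission-test");
```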


[1]https://issues.apache.org/jira/browse/FLINK-12163


Many thanks.

Re: [Discuss] Add JobListener (hook) in flink job lifecycle

2019-04-18 Thread Till Rohrmann
Thanks for starting this discussion Jeff. I can see the need for additional
hooks for third party integrations.

The thing I'm wondering is whether we really need/want to expose a
JobListener via the ExecutionEnvironment. The ExecutionEnvironment is
usually used by the user who writes the code and this person (I assume)
would not be really interested in these callbacks. If he would, then one
should rather think about a better programmatic job control where the
`ExecutionEnvironment#execute` call returns a `JobClient` instance.
Moreover, we would effectively make this part of the public API and every
implementation would need to offer it.

In your case, it could be sufficient to offer some hooks for the
ClusterClient or being able to provide a custom ClusterClient. The
ClusterClient is the component responsible for the job submission and
retrieval of the job result and, hence, would be able to signal when a job
has been submitted or completed.

Cheers,
Till

On Thu, Apr 18, 2019 at 8:57 AM vino yang  wrote:

> Hi Jeff,
>
> I personally like this proposal. From the perspective of programmability,
> the JobListener would make Flink friendlier to third-party programs.
>
> The scenario where I need the listener is the Flink cube engine for Apache
> Kylin. In that case, the Flink job program is embedded into Kylin's
> executable context.
>
> If we could have this listener, it would be easier to integrate with Kylin.
>
> Best,
> Vino
>
On Thu, Apr 18, 2019 at 1:30 PM Jeff Zhang wrote:
>
>>
>> Hi All,
>>
>> I created FLINK-12214  for
>> adding a JobListener (hook) to the Flink job lifecycle. Since this is a new
>> public API for Flink, I'd like to discuss it more widely in the community to
>> get more feedback.
>>
>> The background and motivation is that I am integrating flink into apache
>> zeppelin (which is a notebook in case you
>> don't know). And I'd like to capture some job context (like jobId) in the
>> lifecycle of a Flink job (submitted, executed, cancelled) so that I can
>> manipulate the job with more fine-grained control (e.g. I can capture the jobId
>> when the job is submitted, then associate it with one paragraph, and when the
>> user clicks the cancel button, I can call the Flink cancel API to cancel
>> this job).
>>
>> I believe other projects which integrate Flink would need a similar
>> mechanism. I plan to add an addJobListener API to
>> ExecutionEnvironment/StreamExecutionEnvironment so that users can add
>> customized hooks into the Flink job lifecycle.
>>
>> Here's draft interface JobListener.
>>
>> public interface JobListener {
>>
>> void onJobSubmitted(JobID jobId);
>>
>> void onJobExecuted(JobExecutionResult jobResult);
>>
>> void onJobCanceled(JobID jobId, String savepointPath);
>> }
>>
>> Let me know your comment and concern, thanks.
>>
>>
>> --
>> Best Regards
>>
>> Jeff Zhang
>>
>


Re: Service discovery on YARN - find out which port was dynamically assigned to the JobManager Web Interface

2019-04-18 Thread Till Rohrmann
Hi Olivier,

since version 1.8 you can set the rest bind port via `rest.bind-port` to a
single port or a range. This will now be respected by Yarn deployments.
With the next bug fix release of 1.7 you can do the same with `rest.port`
but this option only accepts a single port (might lead to port conflicts if
you have multiple Flink applications running on the same Yarn cluster with
the same configuration).
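For example, a flink-conf.yaml entry along these lines (the range is only an
illustration):

rest.bind-port: 50100-50200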

As Rong pointed out, the port might change if it is not fixed to a single
port value in case of container restarts. Thus, you would need to update
Consul now and then.

When Flink registers at the Yarn resource manager it sets its rest endpoint
as the forwarding destination when using Yarn's proxy. The problem is only
that some versions of Yarn only forward GET requests.

Additionally, Flink also registers the rest port at the Yarn resource
manager. You can retrieve it by querying the ApplicationReport from the
Yarn resource manager and then calling `getRpcPort` on it.
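A rough sketch of that lookup with the YARN client API (the application id is
a placeholder, untested):

```java
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(new YarnConfiguration());
yarnClient.start();

// id of the Flink YARN application, e.g. as shown by `yarn application -list`
ApplicationId appId = ApplicationId.fromString("application_1555000000000_0001");
ApplicationReport report = yarnClient.getApplicationReport(appId);

String restHost = report.getHost();   // node where the Flink JobManager runs
int restPort = report.getRpcPort();   // port Flink registered for its REST endpoint

yarnClient.stop();
```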

Cheers,
Till

On Thu, Apr 18, 2019 at 3:22 AM Rong Rong  wrote:

> As far as I know, the port will be set to a random binding.
>
> Yarn actually has the ability to translate the proxy link to the right
> node/port.
> If your goal is to avoid going through the YARN REST proxy, this could be a
> problem: there is a chance that the host/port will get changed by YARN without
> any notification back to Consul (or is there a way? I am not sure). This could
> happen either through YARN's native failure recovery mechanism (2nd attempt)
> or through HA.
>
> CCed till who might be able to answer more comprehensively regarding this.
>
> Thanks,
> Rong
>
> On Wed, Apr 17, 2019 at 7:14 AM Olivier Solliec 
> wrote:
>
>> Hello,
>>
>>
>> I want to be able to register a flink cluster into a service discovery
>> system (Consul in our case).
>>
>> This flink cluster is scheduled on YARN.
>>
>>
>> Is there a way to know which port was assigned to the rest interface ?
>>
>>
>> Via the rest API /jobmanager/config, I see a key "jobmanager.rpc.address"
>> which is the correct yarn node, but both "rest.port" and "web.port" have
>> "0" value.
>>
>>
>> The idea is to launch a cluster, retrieve its app id, use the yarn web ui
>> proxy to get the right node address/port, and register this into Consul.
>>
>>
>> Thank you,
>>
>>
>> Olivier
>>
>


Re: Fast restart of a job with a large state

2019-04-18 Thread Stefan Richter
Hi,

If rescaling is the problem, let me clarify that you can currently rescale from
savepoints and from all types of checkpoints (including incremental ones). If that
was the only problem, then there is nothing to worry about - the documentation is
only a bit conservative about this because we will not commit to an API guarantee
that all future types of checkpoints will be rescalable. But currently they all
are, and this is also very unlikely to change anytime soon.

Paul, just to comment on your suggestion as well: local recovery would only
help with failover. 1) It does not help for restarts triggered by the user, and
2) it also does not work for rescaling (2 is a consequence of 1, because failover
never rescales, it only restarts).

Best,
Stefan

> On 18. Apr 2019, at 12:07, Paul Lam  wrote:
> 
> The URL in my previous mail is wrong, and it should be: 
> 
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#task-local-recovery
>  
> 
> 
> Best,
> Paul Lam
> 
>> On Apr 18, 2019, at 18:04, Paul Lam wrote:
>> 
>> Hi,
>> 
>> Have you tried task local recovery [1]?
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints
>>  
>> 
>> 
>> Best,
>> Paul Lam
>> 
>>> On Apr 17, 2019, at 17:46, Sergey Zhemzhitsky wrote:
>>> 
>>> Hi Flinkers,
>>> 
>>> Operating different Flink jobs I've discovered that restarts of a job with
>>> a pretty large state (in my case up to 100GB+) take quite a lot of time.
>>> For example, to restart a job (e.g. to update it) a savepoint is created,
>>> and in the case of savepoints all the state seems to be pushed into the
>>> distributed store (HDFS in my case) when stopping the job and pulled back
>>> when starting the new version of the job.
>>> 
>>> What I've found so far while trying to speed up job restarts is:
>>> - using external retained checkpoints [1]; the drawback is that the
>>> job cannot be rescaled during the restart
>>> - using external state and storage with stateless jobs; the drawback is
>>> the necessity of additional network hops to this storage.
>>> 
>>> So I'm wondering whether there are any best practices the community knows
>>> and uses to cope with cases like this?
>>> 
>>> [1] 
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints
>>>  
>>> 
>> 
> 



Re: Fast restart of a job with a large state

2019-04-18 Thread Paul Lam
Hi,

Have you tried task local recovery [1]?

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints
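A minimal flink-conf.yaml sketch for enabling it:

state.backend.local-recovery: true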

Best,
Paul Lam

> On Apr 17, 2019, at 17:46, Sergey Zhemzhitsky wrote:
> 
> Hi Flinkers,
> 
> Operating different Flink jobs I've discovered that restarts of a job with
> a pretty large state (in my case up to 100GB+) take quite a lot of time.
> For example, to restart a job (e.g. to update it) a savepoint is created,
> and in the case of savepoints all the state seems to be pushed into the
> distributed store (HDFS in my case) when stopping the job and pulled back
> when starting the new version of the job.
> 
> What I've found so far while trying to speed up job restarts is:
> - using external retained checkpoints [1]; the drawback is that the
> job cannot be rescaled during the restart
> - using external state and storage with stateless jobs; the drawback is
> the necessity of additional network hops to this storage.
> 
> So I'm wondering whether there are any best practices the community knows
> and uses to cope with cases like this?
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints



Re: Fast restart of a job with a large state

2019-04-18 Thread Paul Lam
The URL in my previous mail is wrong, and it should be: 

https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/large_state_tuning.html#task-local-recovery
 


Best,
Paul Lam

> On Apr 18, 2019, at 18:04, Paul Lam wrote:
> 
> Hi,
> 
> Have you tried task local recovery [1]?
> 
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints
>  
> 
> 
> Best,
> Paul Lam
> 
>> On Apr 17, 2019, at 17:46, Sergey Zhemzhitsky wrote:
>> 
>> Hi Flinkers,
>> 
>> Operating different Flink jobs I've discovered that restarts of a job with
>> a pretty large state (in my case up to 100GB+) take quite a lot of time.
>> For example, to restart a job (e.g. to update it) a savepoint is created,
>> and in the case of savepoints all the state seems to be pushed into the
>> distributed store (HDFS in my case) when stopping the job and pulled back
>> when starting the new version of the job.
>> 
>> What I've found so far while trying to speed up job restarts is:
>> - using external retained checkpoints [1]; the drawback is that the
>> job cannot be rescaled during the restart
>> - using external state and storage with stateless jobs; the drawback is
>> the necessity of additional network hops to this storage.
>> 
>> So I'm wondering whether there are any best practices the community knows
>> and uses to cope with cases like this?
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints
>>  
>> 
> 



Re: Flink Metrics

2019-04-18 Thread Zhu Zhu
Hi Brian,

You can implement your own org.apache.flink.metrics.reporter.MetricReporter
and register it with Flink in the Flink configuration.

e.g.

metrics.reporters: my_reporter
metrics.reporter.my_reporter.class: xxx
metrics.reporter.my_reporter.config1: yyy
metrics.reporter.my_reporter.config2: zzz
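A minimal reporter sketch (the class name and the logging are just an
illustration, untested); its fully-qualified name is what would go into the
`metrics.reporter.my_reporter.class` entry above:

```java
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LoggingReporter implements MetricReporter {
    private static final Logger LOG = LoggerFactory.getLogger(LoggingReporter.class);

    @Override
    public void open(MetricConfig config) {
        // reporter-specific options (metrics.reporter.my_reporter.*) arrive here
    }

    @Override
    public void close() {
    }

    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        // called for every metric Flink registers, including the task IO metrics
        LOG.info("Added metric {}", group.getMetricIdentifier(metricName));
    }

    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
        LOG.info("Removed metric {}", group.getMetricIdentifier(metricName));
    }
}
```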


Thanks,
Zhu



On Thu, Apr 18, 2019 at 2:37 PM Brian Ramprasad wrote:

> Hi,
>
> I am trying to profile my Flink job. For example, I want to output the
> results of the TaskIOMetricGroup to a log file. Does anyone know if there
> is a way to access this object at runtime and call its methods to get the
> data from within the user code that I submit to Flink to start a job?
>
>
>
> Thanks
> Brian R


Re: [Discuss] Add JobListener (hook) in flink job lifecycle

2019-04-18 Thread vino yang
Hi Jeff,

I personally like this proposal. From the perspective of programmability,
the JobListener would make Flink friendlier to third-party programs.

The scenario where I need the listener is the Flink cube engine for Apache
Kylin. In that case, the Flink job program is embedded into Kylin's
executable context.

If we could have this listener, it would be easier to integrate with Kylin.

Best,
Vino

On Thu, Apr 18, 2019 at 1:30 PM Jeff Zhang wrote:

>
> Hi All,
>
> I created FLINK-12214  for
> adding a JobListener (hook) to the Flink job lifecycle. Since this is a new
> public API for Flink, I'd like to discuss it more widely in the community to
> get more feedback.
>
> The background and motivation is that I am integrating flink into apache
> zeppelin (which is a notebook in case you
> don't know). And I'd like to capture some job context (like jobId) in the
> lifecycle of a Flink job (submitted, executed, cancelled) so that I can
> manipulate the job with more fine-grained control (e.g. I can capture the jobId
> when the job is submitted, then associate it with one paragraph, and when the
> user clicks the cancel button, I can call the Flink cancel API to cancel
> this job).
>
> I believe other projects which integrate Flink would need a similar
> mechanism. I plan to add an addJobListener API to
> ExecutionEnvironment/StreamExecutionEnvironment so that users can add
> customized hooks into the Flink job lifecycle.
>
> Here's draft interface JobListener.
>
> public interface JobListener {
>
> void onJobSubmitted(JobID jobId);
>
> void onJobExecuted(JobExecutionResult jobResult);
>
> void onJobCanceled(JobID jobId, String savepointPath);
> }
>
> Let me know your comment and concern, thanks.
>
>
> --
> Best Regards
>
> Jeff Zhang
>


Flink Metrics

2019-04-18 Thread Brian Ramprasad
Hi,

I am trying to profile my Flink job. For example, I want to output the results
of the TaskIOMetricGroup to a log file. Does anyone know if there is a way to
access this object at runtime and call its methods to get the data from within
the user code that I submit to Flink to start a job?



Thanks
Brian R