Re: Wrong result of MATCH_RECOGNIZE clause

2019-09-06 Thread Dian Fu
I have created ticket https://issues.apache.org/jira/browse/FLINK-13999 
 to track it.
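For context, the aggregation example in [2] that is discussed below boils down to a query of roughly the following shape (a sketch reconstructed from the pattern, measures, and AFTER MATCH clause quoted in this thread; the Ticker table, its columns, and the tEnv variable are assumptions, and details may differ from the documentation):

    // Flink 1.9 Table API (Java); tEnv is an assumed StreamTableEnvironment
    Table result = tEnv.sqlQuery(
        "SELECT * " +
        "FROM Ticker " +
        "MATCH_RECOGNIZE ( " +
        "  PARTITION BY symbol " +
        "  ORDER BY rowtime " +
        "  MEASURES " +
        "    FIRST(A.rowtime) AS start_tstamp, " +
        "    LAST(A.rowtime)  AS end_tstamp, " +
        "    AVG(A.price)     AS avgPrice " +
        "  ONE ROW PER MATCH " +
        "  AFTER MATCH SKIP TO FIRST B " +
        "  PATTERN (A+ B) " +
        "  DEFINE " +
        "    A AS AVG(A.price) < 15 " +   // B is implicitly every row that is not A
        ") MR");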

> On Sep 6, 2019, at 8:58 PM, Dian Fu wrote:
> 
> Hi Dongwon,
> 
> I guess you are right and the example is wrong. The new matching sequence 
> should start from line "18". 
> 
> Regards,
> Dian
> 
>> On Sep 5, 2019, at 8:28 PM, Dongwon Kim wrote:
>> 
>> Oops, I think I explained something wrong in the previous email.
>> B means not A.
>> Therefore, after the completed match, there must be no new partial match 
>> starting from there.
>> There's nothing wrong with the implementation, but the example in [2] is 
>> wrong.
>> 
>> Am I right?
>> 
>> Best,
>> Dongwon
>> 
>> On Thu, Sep 5, 2019 at 9:15 PM Dongwon Kim wrote:
>> Hi, 
>> I'm using Flink 1.9 and testing MATCH_RECOGNIZE by following [1].
>> While testing the query in [2] on myself, I've got the different result from 
>> [2]
>> The query result from [2] is as follows:
>> symbol     start_tstamp         end_tstamp           avgPrice
>> ========   ==================   ==================   ========
>> ACME       01-APR-11 10:00:00   01-APR-11 10:00:03   14.5
>> ACME       01-APR-11 10:00:04   01-APR-11 10:00:09   13.5
>> The other query result from the attached maven project (which only contains 
>> a sample program that executes the query in [2]) is as follows:
>> ACME,1970-01-01 00:00:01.0,1970-01-01 00:00:04.0,14.5
>> There's just one entry, not two.
>> (As you might notice, the time of the first record in the attached maven 
>> project is 1970-01-01 00:00:01 for testing. The other numbers are the same.)
>> 
>> I dug into the internal implementation of CepOperator and got the following:
>> INPUT : ACME,1000,12.0,1
>> PARTIAL MATCH : [A*1]
>> 
>> INPUT : ACME,2000,17.0,2
>> PARTIAL MATCH : [A*2]
>> 
>> INPUT : ACME,3000,13.0,1
>> PARTIAL MATCH : [A*3]
>> PARTIAL MATCH : [A*1]
>> 
>> INPUT : ACME,4000,16.0,3
>> PARTIAL MATCH : [A*4]
>> PARTIAL MATCH : [A*2]
>> 
>> INPUT : ACME,5000,25.0,2
>> COMPLETED MATCH : [A*4, B*1]
>> 
>> INPUT : ACME,6000,2.0,1
>> PARTIAL MATCH : [A*1]
>> 
>> INPUT : ACME,7000,4.0,1
>> PARTIAL MATCH : [A*2]
>> PARTIAL MATCH : [A*1]
>> 
>> INPUT : ACME,8000,10.0,2
>> PARTIAL MATCH : [A*3]
>> PARTIAL MATCH : [A*2]
>> PARTIAL MATCH : [A*1]
>> 
>> INPUT : ACME,9000,15.0,2
>> PARTIAL MATCH : [A*4]
>> PARTIAL MATCH : [A*3]
>> PARTIAL MATCH : [A*2]
>> 
>> INPUT : ACME,10000,25.0,2
>> PARTIAL MATCH : [A*5]
>> PARTIAL MATCH : [A*4]
>> 
>> INPUT : ACME,11000,30.0,1
>> PARTIAL MATCH : [A*6]
>> 
>> My observation is that, when "ACME,5000,25.0,2" comes in (line 15), we get a 
>> completed match (line 16) but no partial match (which is [A*1] in my 
>> notation) starting from it.
>> According to the definition of "AFTER MATCH SKIP TO FIRST B", as 
>> "ACME,5000,25.0,2" is B, a new match should start from "ACME,5000,25.0,2".
>> However, a new match starts from the next one (lines 18, 19) in the above 
>> trace.
>> Therefore, when the last one "ACME,11000,30.0,1" comes in, the average at 
>> that point is 14.33 (= (2+4+10+15+25+30)/6), which is less than 15, so 
>> "ACME,11000,30.0,1" belongs to A, not B as the example suggests.
>> 
>> Is it a bug? or did I miss something conceptually?
>> 
>> p.s. how do you load rows from a local csv file with rowtime configured? I 
>> don't like the way I implemented my custom table source in the attached file 
>> which I use for testing.
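One way to get rows with a rowtime attribute from a local CSV file without a custom table source is to go through the DataStream API and declare the rowtime when converting to a Table. A minimal sketch for Flink 1.9 (the file path, field layout, and ascending-timestamp assumption are illustrative only):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

    DataStream<Tuple3<String, Long, Double>> ticks = env
        .readTextFile("file:///path/to/ticker.csv")              // assumed path
        .map(line -> {
            String[] f = line.split(",");
            return Tuple3.of(f[0], Long.parseLong(f[1]), Double.parseDouble(f[2]));
        })
        .returns(Types.TUPLE(Types.STRING, Types.LONG, Types.DOUBLE))
        .assignTimestampsAndWatermarks(
            new AscendingTimestampExtractor<Tuple3<String, Long, Double>>() {
                @Override
                public long extractAscendingTimestamp(Tuple3<String, Long, Double> t) {
                    return t.f1; // epoch millis taken from the file
                }
            });

    // appending "rowtime.rowtime" derives an event-time attribute from the assigned timestamps
    Table ticker = tEnv.fromDataStream(ticks, "symbol, ts, price, rowtime.rowtime");

The MATCH_RECOGNIZE query can then ORDER BY rowtime on the registered table.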
>> 
>> Best,
>> Dongwon
>> 
>> [1] 
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/match_recognize.html
>>  
>> 
>> [2] 
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/match_recognize.html#aggregations
>>  
>> 



Re: Post-processing batch JobExecutionResult

2019-09-06 Thread Zili Chen
Attach the missing link.

[1]
https://docs.google.com/document/d/1E-8UjOLz4QPUTxetGWbU23OlsIH9VIdodpTsxwoQTs0/edit#heading=h.s8r4pkyalskt
[2]
https://lists.apache.org/x/thread.html/ce99cba4a10b9dc40eb729d39910f315ae41d80ec74f09a356c73938@


Zili Chen wrote on Sat, Sep 7, 2019 at 12:52 AM:

> Besides, if you submit the job via the Jar Run REST API,
> OptimizerPlanEnvironment is used as well. So again, there is _no_
> post-processing support at the moment.
>
>
> Zili Chen wrote on Sat, Sep 7, 2019 at 12:51 AM:
>
>> Hi spoganshev,
>>
>> If you deploy in per-job mode, OptimizerPlanEnvironment is used, and
>> thus, as you pointed out, there is _no_ way to post-process the
>> JobExecutionResult.
>> We in the community regard this situation as a shortcoming and are working
>> on an enhancement to let you get a JobClient as the return value of
>> #execute in all deployment and execution modes. Take a look at [1] and [2]
>> for a preview and feel free to describe your requirements so that a
>> following version can satisfy them.
>>
>> Besides, if you deploy in session mode, which might be more natural for
>> batch cases, ContextEnvironment is used at the moment, which executes
>> normally and returns a JobExecutionResult that you can make use of.
>>
>> To sum up, you can try session-mode deployment to see if it satisfies
>> your requirement for post-processing.
>>
>> Best,
>> tison.
>>
>>
>> Zhu Zhu wrote on Sat, Sep 7, 2019 at 12:07 AM:
>>
>>> Hi spoganshev,
>>>
>>> The *OptimizerPlanEnvironment* is for creating the optimized plan only, as
>>> described in its javadoc:
>>> "An {@link ExecutionEnvironment} that never executes a job but only
>>> creates the optimized plan."
>>> Its execute() is invoked with some internal handling so that it only
>>> generates the optimized plan and does not actually submit a job.
>>> Some other execution environment will execute the job instead.
>>>
>>> How did you create your ExecutionEnvironment?
>>> Usually, for DataSet jobs, it should be created as below:
>>> "final ExecutionEnvironment env =
>>> ExecutionEnvironment.getExecutionEnvironment();"
>>>
>>> Thanks,
>>> Zhu Zhu
>>>
>>> spoganshev wrote on Fri, Sep 6, 2019 at 11:39 PM:
>>>
 Because OptimizerPlanEnvironment.execute() throws an exception on its last
 line, there is no way to post-process the batch job execution result, like:

 JobExecutionResult r = env.execute(); // execute batch job
 analyzeResult(r); // this will never get executed due to plan
 optimization


 https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L54

 Is there any way to allow such post-processing in batch jobs?




 --
 Sent from:
 http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/

>>>


Re: Post-processing batch JobExecutionResult

2019-09-06 Thread Zili Chen
Besides, if you submit the job by Jar Run REST API, it is also
OptimizerPlanEnvironment to be used. So again, _no_ post
processing support at the moment.


Zili Chen wrote on Sat, Sep 7, 2019 at 12:51 AM:

> Hi spoganshev,
>
> If you deploy in per-job mode, OptimizerPlanEnvironment is used, and thus,
> as you pointed out, there is _no_ way to post-process the
> JobExecutionResult.
> We in the community regard this situation as a shortcoming and are working
> on an enhancement to let you get a JobClient as the return value of
> #execute in all deployment and execution modes. Take a look at [1] and [2]
> for a preview and feel free to describe your requirements so that a
> following version can satisfy them.
>
> Besides, if you deploy in session mode, which might be more natural for
> batch cases, ContextEnvironment is used at the moment, which executes
> normally and returns a JobExecutionResult that you can make use of.
>
> To sum up, you can try session-mode deployment to see if it satisfies your
> requirement for post-processing.
>
> Best,
> tison.
>
>
> Zhu Zhu wrote on Sat, Sep 7, 2019 at 12:07 AM:
>
>> Hi spoganshev,
>>
>> The *OptimizerPlanEnvironment* is for creating the optimized plan only, as
>> described in its javadoc:
>> "An {@link ExecutionEnvironment} that never executes a job but only
>> creates the optimized plan."
>> Its execute() is invoked with some internal handling so that it only
>> generates the optimized plan and does not actually submit a job.
>> Some other execution environment will execute the job instead.
>>
>> How did you create your ExecutionEnvironment?
>> Usually, for DataSet jobs, it should be created as below:
>> "final ExecutionEnvironment env =
>> ExecutionEnvironment.getExecutionEnvironment();"
>>
>> Thanks,
>> Zhu Zhu
>>
>> spoganshev wrote on Fri, Sep 6, 2019 at 11:39 PM:
>>
>>> Because OptimizerPlanEnvironment.execute() throws an exception on its last
>>> line, there is no way to post-process the batch job execution result, like:
>>>
>>> JobExecutionResult r = env.execute(); // execute batch job
>>> analyzeResult(r); // this will never get executed due to plan
>>> optimization
>>>
>>>
>>> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L54
>>>
>>> Is there any way to allow such post-processing in batch jobs?
>>>
>>>
>>>
>>>
>>> --
>>> Sent from:
>>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>>
>>


Re: Post-processing batch JobExecutionResult

2019-09-06 Thread Zhu Zhu
Hi spoganshev,

The *OptimizerPlanEnvironment* is for creating the optimized plan only, as
described in its javadoc:
"An {@link ExecutionEnvironment} that never executes a job but only creates
the optimized plan."
Its execute() is invoked with some internal handling so that it only
generates the optimized plan and does not actually submit a job.
Some other execution environment will execute the job instead.

How did you create your ExecutionEnvironment?
Usually, for DataSet jobs, it should be created as below:
"final ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();"
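When the job is executed by an environment that actually returns the result (e.g., on a session cluster as discussed above), client-side post-processing could look roughly like this (a sketch; the input path is a placeholder and the map step is only there to give the job something to do):

    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    env.readTextFile("file:///path/to/input")        // placeholder input
       .map(String::toUpperCase)                      // trivial transformation
       .output(new DiscardingOutputFormat<>());

    JobExecutionResult result = env.execute("my batch job");

    // post-processing on the client side, only possible when execute() really returns
    System.out.println("net runtime: " + result.getNetRuntime() + " ms");
    System.out.println("accumulators: " + result.getAllAccumulatorResults());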

Thanks,
Zhu Zhu

spoganshev wrote on Fri, Sep 6, 2019 at 11:39 PM:

> Because OptimizerPlanEnvironment.execute() throws an exception on its last
> line, there is no way to post-process the batch job execution result, like:
>
> JobExecutionResult r = env.execute(); // execute batch job
> analyzeResult(r); // this will never get executed due to plan optimization
>
>
> https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L54
>
> Is there any way to allow such post-processing in batch jobs?
>
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Post-processing batch JobExecutionResult

2019-09-06 Thread spoganshev
Because OptimizerPlanEnvironment.execute() throws an exception on its last line,
there is no way to post-process the batch job execution result, like:

JobExecutionResult r = env.execute(); // execute batch job
analyzeResult(r); // this will never get executed due to plan optimization

https://github.com/apache/flink/blob/master/flink-clients/src/main/java/org/apache/flink/client/program/OptimizerPlanEnvironment.java#L54

Is there any way to allow such post-processing in batch jobs?




--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Making broadcast state queryable?

2019-09-06 Thread Oytun Tez
Hi Yu,

Excuse my late reply... We simply want Flink to be our centralized stream
analysis platform, where we 1) consume data, 2) generate analysis, 3)
present analysis. I honestly don't want "stream analysis" to spill out to
other components in our ecosystem (e.g., sinking insights into a DB-like
place).

So the case for QS for us is centralization of input, output, presentation.
State Processor API for instance also counts as a presentation tool for us
(on top of migration tool).

This kind of all-in-one (in, out, ui) packaging helps with small teams.

---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com


On Wed, Aug 14, 2019 at 3:13 AM Yu Li  wrote:

> Good to know your thoughts and about the coming talk at Flink Forward Berlin,
> Oytun. Interesting topic and great job! And it's great to hear the voice
> from the application perspective.
>
> Could you share, if possible, the reason why you need to open the state to
> the outside instead of writing the result to a sink and querying it there? In
> another thread there's a case where the sink writes to different Kafka topics,
> so state is the only place to get a global view. Is this the same case you're
> facing, or some different requirement? I believe more attention will be
> drawn to QS if more and more user requirements emerge (smile).
>
> Thanks.
>
> Best Regards,
> Yu
>
>
> On Wed, 14 Aug 2019 at 00:50, Oytun Tez  wrote:
>
>> Thank you for the honest response, Yu!
>>
>> There is so much that comes to mind when we look at Flink as an
>> "application framework" (my talk at Flink Forward in Berlin will be about
>> this). QS is one of them (need-wise, not QS itself necessarily). I opened
>> the design doc Vino Yang created; I'll try to contribute to that.
>>
>> Meanwhile, for the sake of opening our state to outside, we will put a
>> stupid simple operator in between to keep a *duplicate* of the state...
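A minimal sketch of that kind of workaround, i.e., mirroring the value into keyed state in a downstream operator and marking that state as queryable (the Rule type and the state/query names are assumptions; the broadcast state itself stays non-queryable):

    public class MirrorToQueryableState extends KeyedProcessFunction<String, Rule, Rule> {

        private transient ValueState<Rule> mirror;

        @Override
        public void open(Configuration parameters) {
            ValueStateDescriptor<Rule> desc =
                new ValueStateDescriptor<>("rule-mirror", Rule.class);  // assumed value type
            desc.setQueryable("rules");   // name used later by the QueryableStateClient
            mirror = getRuntimeContext().getState(desc);
        }

        @Override
        public void processElement(Rule value, Context ctx, Collector<Rule> out) throws Exception {
            mirror.update(value);   // keep a duplicate of the latest value per key
            out.collect(value);
        }
    }

Since queryable state has to be keyed, the stream would need a keyBy (e.g., on the rule id) in front of this operator.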
>>
>> Thanks again!
>>
>>
>>
>>
>>
>> ---
>> Oytun Tez
>>
>> *M O T A W O R D*
>> The World's Fastest Human Translation Platform.
>> oy...@motaword.com — www.motaword.com
>>
>>
>> On Tue, Aug 13, 2019 at 6:29 PM Yu Li  wrote:
>>
>>> Hi Oytun,
>>>
>>> Sorry, but TBH such support will probably not be added in the foreseeable
>>> future due to lack of committer bandwidth (not only support for queryable
>>> broadcast state but everything about the QueryableState module), as pointed
>>> out in other threads [1] [2].
>>>
>>> However, I think you could open a JIRA for this so that when things change
>>> it can be taken care of. Thanks.
>>>
>>> [1] https://s.apache.org/MaOl
>>> [2] https://s.apache.org/r8k8a
>>>
>>> Best Regards,
>>> Yu
>>>
>>>
>>> On Tue, 13 Aug 2019 at 20:34, Oytun Tez  wrote:
>>>
 Hi there,

 Can we set a broadcast state as queryable? I've looked around, and there isn't
 much to find about it. I am receiving UnknownKvStateLocation when I try to
 query with the descriptor/state name I give to the broadcast state.

 If it doesn't work, what could be the alternative? My mind goes to
 ctx.getBroadcastState and making it queryable at the operator level (I'd
 rather not).

 ---
 Oytun Tez

 *M O T A W O R D*
 The World's Fastest Human Translation Platform.
 oy...@motaword.com — www.motaword.com

>>>


Re: [ANNOUNCE] Kostas Kloudas joins the Flink PMC

2019-09-06 Thread Kenny Gorman
Grats Kostas! ^5^5

-kg

From: Fabian Hueske 
Date: Friday, September 6, 2019 at 7:56 AM
To: dev , user 
Subject: [ANNOUNCE] Kostas Kloudas joins the Flink PMC

Hi everyone,

I'm very happy to announce that Kostas Kloudas is joining the Flink PMC.

Kostas has been contributing to Flink for many years and puts lots of effort
into helping our users and growing the Flink community.

Please join me in congratulating Kostas!

Cheers,

Fabian



Re: understanding task manager logs

2019-09-06 Thread Vishwas Siravara
Thanks, I'll check it out.

On Thu, Sep 5, 2019 at 5:22 AM Fabian Hueske  wrote:

> Hi Vishwas,
>
> This is a log statement from Kafka [1].
> I'm not sure exactly when the AppInfoParser is created (the log message is
> written by its constructor).
>
> For Kafka versions > 1.0, I'd recommend the universal connector [2].
>
> Not sure how well it works if producers and consumers have different
> versions.
> Maybe Gordon (in CC) has some experience with that.
>
> Best, Fabian
>
> [1]
> https://github.com/axbaretto/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/utils/AppInfoParser.java#L117
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/connectors/kafka.html#kafka-100-connector
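For reference, switching to the universal connector from [2] looks roughly like this (a sketch; broker address, topics, and group id are placeholders, and the Kerberos/JAAS settings visible in the log above would still need to be set in the properties):

    // from the flink-connector-kafka ("universal") module
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker1:9092");   // placeholder
    props.setProperty("group.id", "my-consumer-group");       // placeholder

    FlinkKafkaConsumer<String> source =
        new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props);
    FlinkKafkaProducer<String> sink =
        new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), props);

    env.addSource(source)
       .addSink(sink);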
>
> On Tue, Sep 3, 2019 at 04:04, Vishwas Siravara <vsirav...@gmail.com> wrote:
>
>> Hi guys,
>> I am using Flink 1.7.2. My application consumes from a Kafka topic and
>> publishes to another Kafka topic that lives in its own Kafka environment
>> running a different Kafka version. I am using FlinkKafkaConsumer010 from
>> this dependency:
>> "org.apache.flink" %% "flink-connector-kafka-0.10" % flinkVersion
>>
>> In the task manager log I see these lines:
>>
>> 2019-09-02 02:57:59,840 INFO  
>> org.apache.kafka.common.security.authenticator.AbstractLogin  - Successfully 
>> logged in.
>> 2019-09-02 02:57:59,841 INFO  
>> org.apache.kafka.common.security.kerberos.KerberosLogin   - 
>> [Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT refresh 
>> thread started.
>> 2019-09-02 02:57:59,842 INFO  
>> org.apache.kafka.common.security.kerberos.KerberosLogin   - 
>> [Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT valid 
>> starting at: Mon Sep 02 02:57:59 GMT 2019
>> 2019-09-02 02:57:59,843 INFO  
>> org.apache.kafka.common.security.kerberos.KerberosLogin   - 
>> [Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT expires: Mon 
>> Sep 02 12:57:59 GMT 2019
>> 2019-09-02 02:57:59,843 INFO  
>> org.apache.kafka.common.security.kerberos.KerberosLogin   - 
>> [Principal=kafka/sl73rspapd035.visa@corpdev.visa.com]: TGT refresh 
>> sleeping until: Mon Sep 02 11:14:13 GMT 2019
>> 2019-09-02 02:57:59,919 WARN  org.apache.kafka.clients.consumer.ConsumerConfig - The configuration 'zookeeper.connect' was supplied but isn't a known config.
>> 2019-09-02 02:57:59,919 INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.0
>> 2019-09-02 02:57:59,919 INFO  org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : 576d93a8dc0cf421
>>
>> Here, as you can see, the Kafka version is 0.10.2.0. Is this the version the
>> broker is running, or is this coming from Flink? I have forced the
>> kafka-clients version to be 2.2.0:
>>
>> "org.apache.kafka" % "kafka-clients" % "2.2.0" force()
>>
>> I also don't see 0.10.2.0 in the dependency tree of my build.
>>
>> Also, will flink-connector-kafka-0.10 work for Kafka versions > 1.0? What
>> should I do if the consumer broker and producer broker are on different
>> versions of Kafka?
>>
>>
>> Thanks,
>>
>> Vishwas
>>
>>
>> Thanks,
>>
>> Vishwas
>>
>>
>>
>>


Re: [ANNOUNCE] Kostas Kloudas joins the Flink PMC

2019-09-06 Thread Jeff Zhang
Congrats Klou!

Zili Chen wrote on Fri, Sep 6, 2019 at 9:51 PM:

> Congrats Klou!
>
> Best,
> tison.
>
>
> Till Rohrmann wrote on Fri, Sep 6, 2019 at 9:23 PM:
>
>> Congrats Klou!
>>
>> Cheers,
>> Till
>>
>> On Fri, Sep 6, 2019 at 3:00 PM Dian Fu  wrote:
>>
>>> Congratulations Kostas!
>>>
>>> Regards,
>>> Dian
>>>
>>> > On Sep 6, 2019, at 8:58 PM, Wesley Peng wrote:
>>> >
>>> > On 2019/9/6 8:55 PM, Fabian Hueske wrote:
>>> >> I'm very happy to announce that Kostas Kloudas is joining the Flink
>>> PMC.
>>> >> Kostas has been contributing to Flink for many years and puts lots of
>>> effort into helping our users and growing the Flink community.
>>> >> Please join me in congratulating Kostas!
>>> >
>>> > Congratulations, Kostas!
>>> >
>>> > regards.
>>>
>>>

-- 
Best Regards

Jeff Zhang


Re: [ANNOUNCE] Kostas Kloudas joins the Flink PMC

2019-09-06 Thread Oytun Tez
It was only natural! I already thought Kostas was a PMC member!

---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com


On Fri, Sep 6, 2019 at 9:51 AM Zili Chen  wrote:

> Congrats Klou!
>
> Best,
> tison.
>
>
> Till Rohrmann wrote on Fri, Sep 6, 2019 at 9:23 PM:
>
>> Congrats Klou!
>>
>> Cheers,
>> Till
>>
>> On Fri, Sep 6, 2019 at 3:00 PM Dian Fu  wrote:
>>
>>> Congratulations Kostas!
>>>
>>> Regards,
>>> Dian
>>>
>>> > On Sep 6, 2019, at 8:58 PM, Wesley Peng wrote:
>>> >
>>> > On 2019/9/6 8:55 PM, Fabian Hueske wrote:
>>> >> I'm very happy to announce that Kostas Kloudas is joining the Flink
>>> PMC.
>>> >> Kostas has been contributing to Flink for many years and puts lots of
>>> effort into helping our users and growing the Flink community.
>>> >> Please join me in congratulating Kostas!
>>> >
>>> > Congratulations, Kostas!
>>> >
>>> > regards.
>>>
>>>


Re: [ANNOUNCE] Kostas Kloudas joins the Flink PMC

2019-09-06 Thread Zili Chen
Congrats Klou!

Best,
tison.


Till Rohrmann wrote on Fri, Sep 6, 2019 at 9:23 PM:

> Congrats Klou!
>
> Cheers,
> Till
>
> On Fri, Sep 6, 2019 at 3:00 PM Dian Fu  wrote:
>
>> Congratulations Kostas!
>>
>> Regards,
>> Dian
>>
>> > On Sep 6, 2019, at 8:58 PM, Wesley Peng wrote:
>> >
>> > On 2019/9/6 8:55 PM, Fabian Hueske wrote:
>> >> I'm very happy to announce that Kostas Kloudas is joining the Flink
>> PMC.
>> >> Kostas has been contributing to Flink for many years and puts lots of effort
>> into helping our users and growing the Flink community.
>> >> Please join me in congratulating Kostas!
>> >
>> > Congratulations, Kostas!
>> >
>> > regards.
>>
>>


Re: Implementing CheckpointableInputFormat

2019-09-06 Thread Fabian Hueske
Hi,

CheckpointableInputFormat is only relevant if you plan to use the
InputFormat in a MonitoringFileSource, i.e., in a streaming application.
If you plan to use it in a DataSet (batch) program, InputFormat is fine.
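For illustration, the interface essentially asks the format to expose and restore its position within a split, roughly like this (a sketch only; the record-count bookkeeping is illustrative, and concrete formats track their own notion of state):

    import org.apache.flink.api.common.io.CheckpointableInputFormat;
    import org.apache.flink.api.common.io.FileInputFormat;
    import org.apache.flink.core.fs.FileInputSplit;
    import java.io.IOException;

    public abstract class MyCheckpointableFormat extends FileInputFormat<String>
            implements CheckpointableInputFormat<FileInputSplit, Long> {

        // expected to be incremented by the concrete nextRecord() implementation
        protected long recordsReadInCurrentSplit;

        @Override
        public Long getCurrentState() throws IOException {
            // snapshot of how far we have read into the current split
            return recordsReadInCurrentSplit;
        }

        @Override
        public void reopen(FileInputSplit split, Long state) throws IOException {
            open(split);
            // skip forward to the checkpointed position after recovery
            for (long i = 0; i < state; i++) {
                nextRecord(null);
            }
            recordsReadInCurrentSplit = state;
        }
    }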

Btw. the latest release Flink 1.9.0 has major improvements for the recovery
of batch jobs.

Best, Fabian

On Thu, Sep 5, 2019 at 19:01, Lu Niu wrote:

> Hi, Team
>
> I am implementing a custom InputFormat. Shall I implement the
> CheckpointableInputFormat interface? If I don't, does that mean the whole
> job has to restart if only one task fails? I ask because I found that all
> InputFormats implement CheckpointableInputFormat, which confuses me.
> Thank you!
>
> Best
> Lu
>


Re: suggestion of FLINK-10868

2019-09-06 Thread Till Rohrmann
Hi Anyang,

thanks for your suggestions.

1) I guess one needs to make this interval configurable. A session cluster
could theoretically execute batch as well as streaming tasks and, hence, I
doubt that there is an optimal value. Maybe the default could be a bit
longer than 1 min, though.

2) Which component do you want to terminate immediately?

I think we can consider your input while reviewing the PR. If it would be a
bigger change, then it would be best to create a follow up issue once
FLINK-10868 has been merged.

Cheers,
Till

On Fri, Sep 6, 2019 at 11:42 AM Anyang Hu  wrote:

> Thank you for the reply; I look forward to Till's advice.
>
> Anyang
>
> Peter Huang wrote on Thu, Sep 5, 2019 at 11:53 PM:
>
>> Hi Anyang,
>>
>> Thanks for raising it up. I think it is reasonable as what you requested
>> is needed for batch. Let's wait for Till to give some more input.
>>
>>
>>
>> Best Regards
>> Peter Huang
>>
>> On Thu, Sep 5, 2019 at 7:02 AM Anyang Hu  wrote:
>>
>>> Hi Peter:
>>>
>>> As commented in the issue, we have introduced the FLINK-10868 patch
>>> (mainly for batch jobs) in our online environment. What do you think of
>>> the following two suggestions:
>>>
>>> 1) Make the time interval configurable via a parameter. At present, the
>>> default time interval of 1 min is used, which is too short for batch jobs;
>>>
>>> 2) Add a parameter to control whether, when the number of failed containers
>>> reaches MAXIMUM_WORKERS_FAILURE_RATE and the JM disconnects, onFatalError
>>> is performed so that batch jobs can exit as soon as possible.
>>>
>>> Best regards,
>>> Anyang
>>>
>>


Re: TABLE API + DataStream outsourcing schema or Pojo?

2019-09-06 Thread Fabian Hueske
Hi Steve,

The in-memory catalog does not persist metadata and needs to be repopulated
every time.
However, you can implement a catalog that persists the metadata to a file
or a database.

There is also an effort to implement a Catalog backed by Hive's Metastore;
a preview is available in the latest release (1.9.0).
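For example, registering the Hive-backed catalog that ships as a preview in 1.9 could look roughly like this (a sketch; catalog name, database, Hive conf directory, and Hive version are placeholders):

    // requires the flink-connector-hive and Hive dependencies on the classpath
    HiveCatalog hive = new HiveCatalog(
        "myhive",           // catalog name (placeholder)
        "default",          // default database
        "/opt/hive/conf",   // directory containing hive-site.xml (placeholder)
        "2.3.4");           // Hive version (placeholder)

    tableEnv.registerCatalog("myhive", hive);
    tableEnv.useCatalog("myhive");
    // tables registered through this catalog are persisted in the Hive Metastore
    // and are therefore visible across jobs and sessions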

Best, Fabian

On Thu, Sep 5, 2019 at 14:52, Steve Robert <contact.steverob...@gmail.com> wrote:

> Hi Fabian ,
>
> Thank you for your answer; it is indeed the solution that I am currently
> testing.
> I use TypeInformation<Row> convert = JsonRowSchemaConverter.convert(JSON_SCHEMA);
> provided by flink-json and pass that TypeInformation to the operator stream.
> It looks like it works :) With this solution my schema can live outside my
> package.
>
> One additional question about GenericInMemoryCatalog
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/catalogs.html :
> can a catalog be used across multiple jobs running on the same cluster, or
> is the catalog scoped to the job session only?
>
> DataStream<JsonNode> dataStreamJson = dataStream.map(new MapFunction<String, JsonNode>() {
>     @Override
>     public JsonNode map(String s) throws Exception {
>         ObjectMapper objectMapper = new ObjectMapper();
>         JsonNode node = objectMapper.readTree(s);
>         return node;
>     }
> });
> DataStream<Row> dataStreamRow = dataStreamJson.map(new MapFunction<JsonNode, Row>() {
>     @Override
>     public Row map(JsonNode jsonNode) throws Exception {
>         int pos = 0;
>         Row row = new Row(jsonNode.size());
>         Iterator<String> iterator = jsonNode.fieldNames();
>         while (iterator.hasNext()) {
>             String key = iterator.next();
>             row.setField(pos, jsonNode.get(key).asText());
>             pos++;
>         }
>         return row;
>     }
> }).returns(convert);
>
> Table tableA = tEnv.fromDataStream(dataStreamRow);
>
>
> On Thu, Sep 5, 2019 at 13:23, Fabian Hueske wrote:
>
>> Hi Steve,
>>
>> Maybe you could implement a custom TableSource that queries the data from
>> the rest API and converts the JSON directly into a Row data type.
>> This would also avoid going through the DataStream API just for ingesting
>> the data.
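A rough sketch of that custom TableSource idea (Flink 1.9 APIs; RestPollingSource is a hypothetical SourceFunction that calls the REST endpoint and turns each JSON payload into a Row matching the given type information):

    public class RestJsonTableSource implements StreamTableSource<Row> {

        private final TypeInformation<Row> rowType;  // e.g. JsonRowSchemaConverter.convert(JSON_SCHEMA)

        public RestJsonTableSource(TypeInformation<Row> rowType) {
            this.rowType = rowType;
        }

        @Override
        public TypeInformation<Row> getReturnType() {
            return rowType;
        }

        @Override
        public TableSchema getTableSchema() {
            return TableSchema.fromTypeInfo(rowType);
        }

        @Override
        public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
            // RestPollingSource is hypothetical; it polls the REST API and emits Rows
            return execEnv.addSource(new RestPollingSource(rowType)).returns(rowType);
        }
    }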
>>
>> Best, Fabian
>>
>> On Wed, Sep 4, 2019 at 15:57, Steve Robert <contact.steverob...@gmail.com> wrote:
>>
>>> Hi guys ,
>>>
>>> I've been studying the Table API for a while for integration into my
>>> system.
>>> When I take a look at this documentation:
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/connect.html#connectors
>>>
>>> I understand that it is possible to apply a JSON format on the connector
>>> and apply a JSON schema without any hardcoded Java POJO:
>>> .jsonSchema(
>>>   "{" +
>>>   "  type: 'object'," +
>>>   "  properties: {" +
>>>   "lon: {" +
>>>   "  type: 'number'" +
>>>   "}," +
>>>   "rideTime: {" +
>>>   "  type: 'string'," +
>>>   "  format: 'date-time'" +
>>>   "}" +
>>>   "  }" +
>>>   "}"
>>> )
>>>
>>>
>>> But my problem is the following: my data comes from a REST API, so
>>> I have to process the data and transmit it via a DataStream.
>>> The problem is that the conversion between a DataStream and a
>>> Table must pass through a Java POJO: DataStream input;
>>> Table table = tEnv.fromDataStream(input);
>>> I tried a trick of converting my JSON to Avro
>>> using a GenericRecord, but it does not seem possible.
>>>
>>> My use case is being able to add REST API processing at runtime
>>> and to outsource and dynamically load my POJO/schema without
>>> hardcoding a Java POJO class.
>>>
>>>
>>> Do you have an approach to suggest?
>>>
>>>
>>> Thanks a lot
>>>
>>


Re: [ANNOUNCE] Kostas Kloudas joins the Flink PMC

2019-09-06 Thread Wesley Peng

On 2019/9/6 8:55 PM, Fabian Hueske wrote:

I'm very happy to announce that Kostas Kloudas is joining the Flink PMC.
Kostas has been contributing to Flink for many years and puts lots of effort
into helping our users and growing the Flink community.


Please join me in congratulating Kostas!


Congratulations, Kostas!

regards.


[ANNOUNCE] Kostas Kloudas joins the Flink PMC

2019-09-06 Thread Fabian Hueske
Hi everyone,

I'm very happy to announce that Kostas Kloudas is joining the Flink PMC.
Kostas has been contributing to Flink for many years and puts lots of effort
into helping our users and growing the Flink community.

Please join me in congratulating Kostas!

Cheers,
Fabian


Re: Looking for a pre-built Flink distribution (version 1.6 or above)

2019-09-06 Thread Zili Chen
Hi, the official download page has pre-built binary releases. Since your
request is for versions *above* 1.6, the website provides binaries for 1.7.2,
1.8.1, and 1.9.0.

See https://flink.apache.org/downloads.html for details.

Best,
tison.


Wesley Peng wrote on Fri, Sep 6, 2019 at 5:16 PM:

> Hello,
>
> guaishushu1...@163.com wrote:
> > Because I'm running Flink in a virtual machine on Windows, I can never
> > compile the Flink source successfully, so I hope to get a pre-built copy
> > of Flink 1.6 or above.
>
> There is a Docker image for Flink 1.9. Since you are using a virtual
> machine, Docker might be an option.
>
> regards
>


Re: suggestion of FLINK-10868

2019-09-06 Thread Anyang Hu
Thank you for the reply; I look forward to Till's advice.

Anyang

Peter Huang wrote on Thu, Sep 5, 2019 at 11:53 PM:

> Hi Anyang,
>
> Thanks for raising it up. I think it is reasonable as what you requested
> is needed for batch. Let's wait for Till to give some more input.
>
>
>
> Best Regards
> Peter Huang
>
> On Thu, Sep 5, 2019 at 7:02 AM Anyang Hu  wrote:
>
>> Hi Peter:
>>
>> As commented in the issue, we have introduced the FLINK-10868 patch
>> (mainly for batch jobs) in our online environment. What do you think of
>> the following two suggestions:
>>
>> 1) Make the time interval configurable via a parameter. At present, the
>> default time interval of 1 min is used, which is too short for batch jobs;
>>
>> 2) Add a parameter to control whether, when the number of failed containers
>> reaches MAXIMUM_WORKERS_FAILURE_RATE and the JM disconnects, onFatalError
>> is performed so that batch jobs can exit as soon as possible.
>>
>> Best regards,
>> Anyang
>>
>


Re: checkpoint failure suddenly even state size less than 1 mb

2019-09-06 Thread Sushant Sawant
Hi Yun,
I have captured the heap dump, which includes the thread stacks.
There is a lock held by a thread in the Elasticsearch sink operator.
Screenshot from JProfiler:
https://github.com/sushantbprise/flink-dashboard/tree/master/thread-blocked

And I see a lot of threads in the waiting state.

I found this link, which is quite similar to my problem:
http://mail-archives.apache.org/mod_mbox/flink-user/201710.mbox/%3cfdd0a701-d6fe-433f-b343-19fd24cb3...@data-artisans.com%3e

How can I overcome this condition?


Thanks & Regards,
Sushant Sawant

On Fri, 30 Aug 2019, 12:48 Yun Tang,  wrote:

> Hi Sushant
>
> What confuses me is why the source task cannot complete the checkpoint in 3
> minutes [1]. If no sub-task has ever completed the checkpoint, that means
> even the source task cannot complete it, although the source task would not
> need to buffer any data. From what I see, it might be blocked on acquiring
> the lock held by the stream task's main thread to process elements [2]. Could
> you use jstack to capture your Java process's threads to see what happened
> when the checkpoint failed?
>
> [1]
> https://github.com/sushantbprise/flink-dashboard/blob/master/failed-checkpointing/state2.png
> [2]
> https://github.com/apache/flink/blob/ccc7eb431477059b32fb924104c17af953620c74/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L758
>
> Best
> Yun Tang
> --
> *From:* Sushant Sawant 
> *Sent:* Tuesday, August 27, 2019 15:01
> *To:* user 
> *Subject:* Re: checkpoint failure suddenly even state size less than 1 mb
>
> Hi team,
> Could anyone help or offer a suggestion? We have now stopped all input in
> Kafka, so there is no processing and no sink activity, but checkpointing is
> still failing.
> Is it the case that once a checkpoint fails it keeps failing forever until
> the job restarts?
>
> Help appreciated.
>
> Thanks & Regards,
> Sushant Sawant
>
> On 23 Aug 2019 12:56 p.m., "Sushant Sawant" 
> wrote:
>
> Hi all,
> I'm facing two issues which I believe are correlated.
> 1. Kafka source shows high back pressure.
> 2. Sudden checkpoint failure for entire day until restart.
>
> My job does the following:
> a. Read from Kafka
> b. Asyncio to external system
> c. Dumping in Cassandra, Elasticsearch
>
> Checkpointing uses the file system.
> This Flink job is proven under high load,
> around 5000/sec throughput.
> But recently we scaled down parallelism since there wasn't any load in
> production, and these issues started.
>
> Please find the status shown by the Flink dashboard.
> This GitHub folder contains images from when there was high back pressure
> and checkpoint failures:
>
> https://github.com/sushantbprise/flink-dashboard/tree/master/failed-checkpointing
> and the "everything is fine" images after restart are in this folder:
>
> https://github.com/sushantbprise/flink-dashboard/tree/master/working-checkpointing
>
> --
> Could anyone point me in the direction of what might have gone wrong and
> how to troubleshoot?
>
>
> Thanks & Regards,
> Sushant Sawant
>
>
>


Re: Flink Savepoint timeout

2019-09-06 Thread SJMSTER
I looked around and didn't see any other errors; there is only the exception
I pasted above... because this is the error reported when running the CLI...

On Fri, Sep 6, 2019 at 4:51 PM Wesley Peng  wrote:

>
>
> SJMSTER wrote:
> > Checkpoints have always been successful.
> > Today I tried "cancel job with savepoint" again and it succeeded...
> > I don't know why my earlier attempts all timed out...
>
> are there any log items for diagnosis?
>
> regards.
>


Re: Looking for a pre-built Flink distribution (version 1.6 or above)

2019-09-06 Thread Wesley Peng

Hello,

guaishushu1...@163.com wrote:

Because I'm running Flink in a virtual machine on Windows, I can never compile
the Flink source successfully, so I hope to get a pre-built copy of Flink 1.6
or above.

There is a Docker image for Flink 1.9. Since you are using a virtual
machine, Docker might be an option.


regards


Looking for a pre-built Flink distribution (version 1.6 or above)

2019-09-06 Thread guaishushu1...@163.com

Because I'm running Flink in a virtual machine on Windows, I can never compile the Flink source successfully, so I hope to get a pre-built copy of Flink 1.6 or above.


guaishushu1...@163.com


Re: Flink Savepoint timeout

2019-09-06 Thread Wesley Peng




SJMSTER wrote:

Checkpoints have always been successful.
Today I tried "cancel job with savepoint" again and it succeeded...
I don't know why my earlier attempts all timed out...


are there any log items for diagnosis?

regards.


How do I start a Flink application on my Flink+Mesos cluster?

2019-09-06 Thread Felipe Gutierrez
Hi,

I am running Mesos without DC/OS [1] with Flink on it. When I start my
cluster I receive some messages that suggest everything was started.
However, I see 0 slots available on the Flink web dashboard. But I suppose
that Mesos will allocate slots and TaskManagers dynamically. Is that right?

$ ./bin/mesos-appmaster.sh &
[1] 16723
flink@r03:~/flink-1.9.0$ I0906 10:22:45.080328 16943 sched.cpp:239]
Version: 1.9.0
I0906 10:22:45.082672 16996 sched.cpp:343] New master detected at
mas...@xxx.xxx.xxx.xxx:5050
I0906 10:22:45.083276 16996 sched.cpp:363] No credentials provided.
Attempting to register without authentication
I0906 10:22:45.086840 16997 sched.cpp:751] Framework registered with
22f6a553-e8ac-42d4-9a90-96a8d5f002f0-0003

Then I deploy my Flink application. When I use the first command, the
application starts; however, the tasks remain CREATED until Flink throws a
timeout exception. In other words, they never turn to RUNNING.
When I use the second command, the application does not start and I receive
the exception "Could not allocate all requires slots within timeout of 30
ms. Slots required: 2". The full stack trace is below.

$ /home/flink/flink-1.9.0/bin/flink run
/home/flink/hello-flink-mesos/target/hello-flink-mesos.jar &
$ ./bin/mesos-appmaster-job.sh run
/home/flink/hello-flink-mesos/target/hello-flink-mesos.jar &


[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/mesos.html#mesos-without-dcos
ps.: my application runs normally on a standalone Flink cluster.


 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: Job failed.
(JobID: 7ad8d71faaceb1ac469353452c43dc2a)
at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338)
at
org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:60)
at
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
at org.hello_flink_mesos.App.(App.java:35)
at org.hello_flink_mesos.App.main(App.java:285)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:576)
at
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:274)
at
org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:746)
at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:273)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
at
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
at
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
execution failed.
at
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
at
org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:259)
... 22 more
Caused by:
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
Could not allocate all requires slots within timeout of 30 ms. Slots
required: 2, slots allocated: 0, previous allocation IDs: [], execution
status: completed exceptionally: java.util.concurrent.CompletionException:
java.util.concurrent.CompletionException:
java.util.concurrent.TimeoutException/java.util.concurrent.CompletableFuture@b520de3[Completed
exceptionally], incomplete: java.util.concurrent.CompletableFuture@36f3d30c[Not
completed, 1 dependents]
at
org.apache.flink.runtime.executiongraph.SchedulingUtils.lambda$scheduleEager$1(SchedulingUtils.java:194)
at
java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
at
java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
at
java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at
java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
at

Flink Savepoint timeout

2019-09-06 Thread Jimmy.Shao
Has anyone run into a timeout exception when manually triggering a Flink
savepoint from the CLI?
Trying to cancel the job with a savepoint hits the same timeout error.
The savepoint is already configured to be stored on HDFS,
and Flink itself runs on YARN.
I saw a parameter "akka.client.timeout" on the website but am not sure it
applies here; for it to take effect it has to be configured in
flink-conf.yaml, and there is no way to pass it in via the CLI.
So the job cannot be cancelled and the Flink cluster cannot be restarted;
it is a dead loop.
Thanks!

Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR was set.
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/opt/flink-1.6.0-hdp/lib/phoenix-4.7.0.2.6.3.0-235-client.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/opt/flink-1.6.0-hdp/lib/slf4j-log4j12-1.7.7.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> 2019-09-05 10:45:41,807 INFO
>  org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn
> properties file under /tmp/.yarn-properties-hive.
> 2019-09-05 10:45:41,807 INFO
>  org.apache.flink.yarn.cli.FlinkYarnSessionCli - Found Yarn
> properties file under /tmp/.yarn-properties-hive.
> 2019-09-05 10:45:42,056 INFO
>  org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN
> properties set default parallelism to 1
> 2019-09-05 10:45:42,056 INFO
>  org.apache.flink.yarn.cli.FlinkYarnSessionCli - YARN
> properties set default parallelism to 1
> YARN properties set default parallelism to 1
> 2019-09-05 10:45:42,269 INFO  org.apache.hadoop.yarn.client.AHSProxy
>  - Connecting to Application History server at
> ac13ghdpt2m01.lab-rot.saas.sap.corp/10.116.201.103:10200
> 2019-09-05 10:45:42,276 INFO
>  org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path
> for the flink jar passed. Using the location of class
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2019-09-05 10:45:42,276 INFO
>  org.apache.flink.yarn.cli.FlinkYarnSessionCli - No path
> for the flink jar passed. Using the location of class
> org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2019-09-05 10:45:42,282 WARN
>  org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Neither
> the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set.The
> Flink YARN Client needs one of these to be set to properly load the Hadoop
> configuration for accessing YARN.
> 2019-09-05 10:45:42,284 INFO
>  org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider  -
> Looking for the active RM in [rm1, rm2]...
> 2019-09-05 10:45:42,341 INFO
>  org.apache.hadoop.yarn.client.RequestHedgingRMFailoverProxyProvider  -
> Found active RM [rm1]
> 2019-09-05 10:45:42,345 INFO
>  org.apache.flink.yarn.AbstractYarnClusterDescriptor   - Found
> application JobManager host name 'ac13ghdpt2dn01.lab-rot.saas.sap.corp' and
> port '40192' from supplied application id 'application_1559153472177_52202'
> 2019-09-05 10:45:42,689 WARN
>  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory   - The
> short-circuit local reads feature cannot be used because libhadoop cannot
> be loaded.
> Triggering savepoint for job 6399ec2e8fdf4cb7d8481890019554f6.
> Waiting for response...
> 
>  The program finished with the following exception:
> org.apache.flink.util.FlinkException: Triggering a savepoint for the job
> 6399ec2e8fdf4cb7d8481890019554f6 failed.
> at
> org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:714)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:692)
> at
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:979)
> at
> org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:689)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1059)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1120)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1866)
> at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1120)
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException:
> Could not complete the operation. Exception is not retryable.
> at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:213)
> at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
> at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
> at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> at
> 

Re: Re: Flink 1.9 Blink planner time attribute issue

2019-09-06 Thread hb
That doesn't work:
Caused by: org.apache.flink.table.api.ValidationException: Rowtime attribute 
'_rowtime' is not of type SQL_TIMESTAMP.
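For reference, the descriptor pattern from the connector documentation that pairs a Long source field with a TIMESTAMP rowtime attribute looks roughly like this (a sketch with assumed connector and field names; whether this resolves the Blink-planner error above is exactly what is being discussed in this thread):

    tEnv.connect(new Kafka()                  // connector details assumed/omitted
            .version("universal")
            .topic("ticks"))
        .withFormat(new Json().deriveSchema())
        .withSchema(new Schema()
            .field("symbol", Types.STRING())
            .field("price", Types.DOUBLE())
            // the rowtime attribute itself is declared as SQL_TIMESTAMP ...
            .field("_rowtime", Types.SQL_TIMESTAMP())
                .rowtime(new Rowtime()
                    // ... but its value is derived from a Long epoch-millis field in the input
                    .timestampsFromField("eventTime")
                    .watermarksPeriodicBounded(2000)))
        .inAppendMode()
        .registerTableSource("Ticks");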



On 2019-09-06 10:48:02, "Jark Wu" wrote:
>Probably because you declared eventTime as a timestamp type in the schema; you could try declaring it as long.
>.field("_rowtime", Types.LONG())
>
>> On Sep 5, 2019, at 15:11, hb <343122...@163.com> wrote:
>> 
>> In real applications, the most common time field is a Long millisecond timestamp; is that really not supported?
>


Re: How to measure data processing delay

2019-09-06 Thread Jary Zhen
hi,

First, as I understand it, that difference is more precisely the "ingestion
delay". Processing time usually means the span from when data is ingested into
the system until processing finishes, i.e., the span from addSource to addSink.
For tracking delayed data, you can take a look at the side output API [1]

[1]:
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/side_output.html
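For the metric itself, one minimal way to expose the event-time-vs-wall-clock difference is a gauge in a pass-through function (a sketch; MyEvent and its getEventTime() accessor are assumptions, and as noted above this measures ingestion delay rather than full end-to-end processing time):

    public class DelayMetricFunction extends ProcessFunction<MyEvent, MyEvent> {

        private transient volatile long lastDelayMs;

        @Override
        public void open(Configuration parameters) {
            getRuntimeContext().getMetricGroup()
                .gauge("eventTimeDelayMs", (Gauge<Long>) () -> lastDelayMs);
        }

        @Override
        public void processElement(MyEvent value, Context ctx, Collector<MyEvent> out) {
            // wall clock minus the event timestamp carried in the record
            lastDelayMs = System.currentTimeMillis() - value.getEventTime();
            out.collect(value);
        }
    }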

On Thu, 5 Sep 2019 at 16:26, 陈赋赟  wrote:

> Hi all,
> I currently want to add a metric to a Flink job that measures data
> processing delay. My current idea is to take the time carried in the data
> (event time) and subtract it from the current node's time
> (System.currentTimeMillis); the resulting value would be the processing
> delay. I'm not sure whether this idea is correct and feasible, and would
> appreciate your thoughts and suggestions.