Re: [ANNOUNCE] Apache Flink 1.8.3 released

2019-12-11 Thread Wei Zhong
Thanks Hequn for being the release manager. Great work!

Best,
Wei

> On Dec 12, 2019, at 15:27, Jingsong Li wrote:
> 
> Thanks Hequn for your driving, 1.8.3 fixed a lot of issues and it is very 
> useful to users.
> Great work!
> 
> Best,
> Jingsong Lee
> 
> On Thu, Dec 12, 2019 at 3:25 PM jincheng sun wrote:
> Thanks for being the release manager and the great work Hequn :)
> Also thanks to the community making this release possible!
> 
> Best,
> Jincheng
> 
> On Thu, Dec 12, 2019 at 3:23 PM Jark Wu <imj...@gmail.com> wrote:
> Thanks Hequn for helping out this release and being the release manager.
> Great work!
> 
> Best,
> Jark
> 
> On Thu, 12 Dec 2019 at 15:02, Jeff Zhang wrote:
> 
> > Great work, Hequn
> >
> > On Thu, Dec 12, 2019 at 2:32 PM Dian Fu <dian0511...@gmail.com> wrote:
> >
> >> Thanks Hequn for being the release manager and everyone who contributed
> >> to this release.
> >>
> >> Regards,
> >> Dian
> >>
> >> On Dec 12, 2019, at 2:24 PM, Hequn Cheng wrote:
> >>
> >> Hi,
> >>
> >> The Apache Flink community is very happy to announce the release of
> >> Apache Flink 1.8.3, which is the third bugfix release for the Apache Flink
> >> 1.8 series.
> >>
> >> Apache Flink® is an open-source stream processing framework for
> >> distributed, high-performing, always-available, and accurate data streaming
> >> applications.
> >>
> >> The release is available for download at:
> >> https://flink.apache.org/downloads.html 
> >> 
> >>
> >> Please check out the release blog post for an overview of the
> >> improvements for this bugfix release:
> >> https://flink.apache.org/news/2019/12/11/release-1.8.3.html 
> >> 
> >>
> >> The full release notes are available in Jira:
> >> https://issues.apache.org/jira/projects/FLINK/versions/12346112 
> >> 
> >>
> >> We would like to thank all contributors of the Apache Flink community who
> >> made this release possible!
> >> Great thanks to @Jincheng as a mentor during this release.
> >>
> >> Regards,
> >> Hequn
> >>
> >>
> >>
> >
> > --
> > Best Regards
> >
> > Jeff Zhang
> >
> 
> 
> -- 
> Best, Jingsong Lee



Re: Flink on Kubernetes seems to ignore log4j.properties

2019-12-11 Thread ouywl






Hi Yang,

Could you give more detail, such as the content of your log4j.properties and the k8s YAML? Are you using the Dockerfile in flink-container? When I tested with the default per-job YAML in flink-container, the logs only showed up in the Docker log output, not in /opt/flink/log.

ouywl
ou...@139.com

On 12/12/2019 13:47,Yang Wang wrote: 


Hi Peng,

What I mean is to use `docker exec` to get into the running pod and `ps` to see the real
command that is running for the jobmanager. Have you checked that /opt/flink/conf/log4j.properties is right?

I have tested standalone per-job on my Kubernetes cluster, and the logs show up as expected.

Best,
Yang

On Thu, Dec 12, 2019 at 2:59 AM Li Peng wrote:

Hey Yang, here are the commands:

"/opt/flink/bin/taskmanager.sh",
"start-foreground",
"-Djobmanager.rpc.address={{ .Chart.Name }}-job-manager",
"-Dtaskmanager.numberOfTaskSlots=1"

"/opt/flink/bin/standalone-job.sh",
"start-foreground",
"-Djobmanager.rpc.address={{ .Chart.Name }}-job-manager",
"-Dparallelism.default={{ .Values.task.replicaCount }}"

Yes it's very curious that I don't see any logs actually written to /opt/flink/log.

On Tue, Dec 10, 2019 at 11:17 PM Yang Wang wrote:

Could you find the logs under /opt/flink/log/jobmanager.log? If not, please share the
commands the JobManager and TaskManager are using. If the command is correct
and the log4j under /opt/flink/conf is as expected, it is curious that we cannot get the logs.

Best,
Yang

On Wed, Dec 11, 2019 at 1:24 PM Li Peng wrote:

Ah I see. I think the Flink app is reading files from /opt/flink/conf correctly as it is, since changes I make to flink-conf are picked up as expected, it's just the log4j properties that are either not being used, or don't apply to stdout or whatever source k8 uses for its logs? Given that the pods don't seem to have logs written to file anywhere, contrary to the properties, I'm inclined to say it's the former and that the log4j properties just aren't being picked up. Still have no idea why though.

On Tue, Dec 10, 2019 at 6:56 PM Yun Tang wrote:







Sure, /opt/flink/conf is mounted as a volume from the configmap.
 
Best
Yun Tang
 

From: Li Peng 
Date: Wednesday, December 11, 2019 at 9:37 AM
To: Yang Wang 
Cc: vino yang , user 
Subject: Re: Flink on Kubernetes seems to ignore log4j.properties


 


1. Hey Yun, I'm calling /opt/flink/bin/standalone-job.sh and /opt/flink/bin/taskmanager.sh on my job and task managers respectively. It's based on the setup described here: http://shzhangji.com/blog/2019/08/24/deploy-flink-job-cluster-on-kubernetes/ .
 I haven't tried the configmap approach yet, does it also replace the conf files in /opt/flink/conf?

2. Hey Vino, here's a sample of the kubernetes: https://pastebin.com/fqJrgjZu  I didn't change any patterns from the default, so the string patterns should look the same, but as you can see it's full of info checkpoint
 logs that I originally was trying to suppress. Based on my log4j.properties, the level should be set to WARN. I couldn't actually find any .out files on the pod, this is from the kubectl logs command. I also didn't see any files in /opt/flink/log, which I
 thought my log4j was specified to do, hence me thinking that the properties weren't actually being consumed. I also have the same properties in my src/main/resources folder.

3. Hey Yang, yes this is a standalone session cluster. I did specify in the docker file to copy the log4j.properties to the /opt/flink/conf folder on the image, and I confirmed that the properties are correct when I bash'd into the pod and viewed them manually.


 


Incidentally, I also tried passing the -Dlog4j.configuration argument to the programs, and it doesn't work either. And based on what I'm reading on jira, that option is not really supported anymore?

 


Thanks for your responses, folks!


Li



 


On Mon, Dec 9, 2019 at 7:10 PM Yang Wang  wrote:



Hi Li Peng, 

 


You are running standalone session cluster or per-job cluster on kubernetes. Right?


If so, i think you need to check your log4j.properties in the image, not local. The log is


stored to /opt/flink/log/jobmanager.log by default. 


 


If you are running active Kubernetes integration for a fresh taste. The following cli option


could be used to 

Re: [ANNOUNCE] Apache Flink 1.8.3 released

2019-12-11 Thread Jingsong Li
Thanks Hequn for your driving, 1.8.3 fixed a lot of issues and it is very
useful to users.
Great work!

Best,
Jingsong Lee

On Thu, Dec 12, 2019 at 3:25 PM jincheng sun 
wrote:

> Thanks for being the release manager and the great work Hequn :)
> Also thanks to the community making this release possible!
>
> Best,
> Jincheng
>
> On Thu, Dec 12, 2019 at 3:23 PM Jark Wu wrote:
>
>> Thanks Hequn for helping out this release and being the release manager.
>> Great work!
>>
>> Best,
>> Jark
>>
>> On Thu, 12 Dec 2019 at 15:02, Jeff Zhang  wrote:
>>
>> > Great work, Hequn
>> >
>> > On Thu, Dec 12, 2019 at 2:32 PM Dian Fu wrote:
>> >
>> >> Thanks Hequn for being the release manager and everyone who contributed
>> >> to this release.
>> >>
>> >> Regards,
>> >> Dian
>> >>
>> >> On Dec 12, 2019, at 2:24 PM, Hequn Cheng wrote:
>> >>
>> >> Hi,
>> >>
>> >> The Apache Flink community is very happy to announce the release of
>> >> Apache Flink 1.8.3, which is the third bugfix release for the Apache
>> Flink
>> >> 1.8 series.
>> >>
>> >> Apache Flink® is an open-source stream processing framework for
>> >> distributed, high-performing, always-available, and accurate data
>> streaming
>> >> applications.
>> >>
>> >> The release is available for download at:
>> >> https://flink.apache.org/downloads.html
>> >>
>> >> Please check out the release blog post for an overview of the
>> >> improvements for this bugfix release:
>> >> https://flink.apache.org/news/2019/12/11/release-1.8.3.html
>> >>
>> >> The full release notes are available in Jira:
>> >> https://issues.apache.org/jira/projects/FLINK/versions/12346112
>> >>
>> >> We would like to thank all contributors of the Apache Flink community
>> who
>> >> made this release possible!
>> >> Great thanks to @Jincheng as a mentor during this release.
>> >>
>> >> Regards,
>> >> Hequn
>> >>
>> >>
>> >>
>> >
>> > --
>> > Best Regards
>> >
>> > Jeff Zhang
>> >
>>
>

-- 
Best, Jingsong Lee


Re: [ANNOUNCE] Apache Flink 1.8.3 released

2019-12-11 Thread jincheng sun
Thanks for being the release manager and the great work Hequn :)
Also thanks to the community making this release possible!

Best,
Jincheng

On Thu, Dec 12, 2019 at 3:23 PM Jark Wu wrote:

> Thanks Hequn for helping out this release and being the release manager.
> Great work!
>
> Best,
> Jark
>
> On Thu, 12 Dec 2019 at 15:02, Jeff Zhang  wrote:
>
> > Great work, Hequn
> >
> > On Thu, Dec 12, 2019 at 2:32 PM Dian Fu wrote:
> >
> >> Thanks Hequn for being the release manager and everyone who contributed
> >> to this release.
> >>
> >> Regards,
> >> Dian
> >>
> >> On Dec 12, 2019, at 2:24 PM, Hequn Cheng wrote:
> >>
> >> Hi,
> >>
> >> The Apache Flink community is very happy to announce the release of
> >> Apache Flink 1.8.3, which is the third bugfix release for the Apache
> Flink
> >> 1.8 series.
> >>
> >> Apache Flink® is an open-source stream processing framework for
> >> distributed, high-performing, always-available, and accurate data
> streaming
> >> applications.
> >>
> >> The release is available for download at:
> >> https://flink.apache.org/downloads.html
> >>
> >> Please check out the release blog post for an overview of the
> >> improvements for this bugfix release:
> >> https://flink.apache.org/news/2019/12/11/release-1.8.3.html
> >>
> >> The full release notes are available in Jira:
> >> https://issues.apache.org/jira/projects/FLINK/versions/12346112
> >>
> >> We would like to thank all contributors of the Apache Flink community
> who
> >> made this release possible!
> >> Great thanks to @Jincheng as a mentor during this release.
> >>
> >> Regards,
> >> Hequn
> >>
> >>
> >>
> >
> > --
> > Best Regards
> >
> > Jeff Zhang
> >
>


Re: [ANNOUNCE] Apache Flink 1.8.3 released

2019-12-11 Thread Jark Wu
Thanks Hequn for helping out this release and being the release manager.
Great work!

Best,
Jark

On Thu, 12 Dec 2019 at 15:02, Jeff Zhang  wrote:

> Great work, Hequn
>
> On Thu, Dec 12, 2019 at 2:32 PM Dian Fu wrote:
>
>> Thanks Hequn for being the release manager and everyone who contributed
>> to this release.
>>
>> Regards,
>> Dian
>>
>> On Dec 12, 2019, at 2:24 PM, Hequn Cheng wrote:
>>
>> Hi,
>>
>> The Apache Flink community is very happy to announce the release of
>> Apache Flink 1.8.3, which is the third bugfix release for the Apache Flink
>> 1.8 series.
>>
>> Apache Flink® is an open-source stream processing framework for
>> distributed, high-performing, always-available, and accurate data streaming
>> applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the
>> improvements for this bugfix release:
>> https://flink.apache.org/news/2019/12/11/release-1.8.3.html
>>
>> The full release notes are available in Jira:
>> https://issues.apache.org/jira/projects/FLINK/versions/12346112
>>
>> We would like to thank all contributors of the Apache Flink community who
>> made this release possible!
>> Great thanks to @Jincheng as a mentor during this release.
>>
>> Regards,
>> Hequn
>>
>>
>>
>
> --
> Best Regards
>
> Jeff Zhang
>


Re: [ANNOUNCE] Apache Flink 1.8.3 released

2019-12-11 Thread Jeff Zhang
Great work, Hequn

On Thu, Dec 12, 2019 at 2:32 PM Dian Fu wrote:

> Thanks Hequn for being the release manager and everyone who contributed to
> this release.
>
> Regards,
> Dian
>
> On Dec 12, 2019, at 2:24 PM, Hequn Cheng wrote:
>
> Hi,
>
> The Apache Flink community is very happy to announce the release of Apache
> Flink 1.8.3, which is the third bugfix release for the Apache Flink 1.8
> series.
>
> Apache Flink® is an open-source stream processing framework for
> distributed, high-performing, always-available, and accurate data streaming
> applications.
>
> The release is available for download at:
> https://flink.apache.org/downloads.html
>
> Please check out the release blog post for an overview of the improvements
> for this bugfix release:
> https://flink.apache.org/news/2019/12/11/release-1.8.3.html
>
> The full release notes are available in Jira:
> https://issues.apache.org/jira/projects/FLINK/versions/12346112
>
> We would like to thank all contributors of the Apache Flink community who
> made this release possible!
> Great thanks to @Jincheng as a mentor during this release.
>
> Regards,
> Hequn
>
>
>

-- 
Best Regards

Jeff Zhang


Re: [ANNOUNCE] Apache Flink 1.8.3 released

2019-12-11 Thread Dian Fu
Thanks Hequn for being the release manager and everyone who contributed to this 
release.

Regards,
Dian

> On Dec 12, 2019, at 2:24 PM, Hequn Cheng wrote:
> 
> Hi,
>  
> The Apache Flink community is very happy to announce the release of Apache 
> Flink 1.8.3, which is the third bugfix release for the Apache Flink 1.8 
> series.
>  
> Apache Flink® is an open-source stream processing framework for distributed, 
> high-performing, always-available, and accurate data streaming applications.
>  
> The release is available for download at:
> https://flink.apache.org/downloads.html 
> 
>  
> Please check out the release blog post for an overview of the improvements 
> for this bugfix release:
> https://flink.apache.org/news/2019/12/11/release-1.8.3.html 
> 
>  
> The full release notes are available in Jira:
> https://issues.apache.org/jira/projects/FLINK/versions/12346112 
> 
>  
> We would like to thank all contributors of the Apache Flink community who 
> made this release possible!
> Great thanks to @Jincheng as a mentor during this release.
>  
> Regards,
> Hequn 



Re: Flink on Kubernetes seems to ignore log4j.properties

2019-12-11 Thread Yang Wang
Hi Peng,

What I mean is to use `docker exec` to get into the running pod and `ps` to see
the real command that is running for the jobmanager.
Have you checked that /opt/flink/conf/log4j.properties is right?

I have tested standalone per-job on my Kubernetes cluster, and the logs show up
as expected.


Best,
Yang

On Thu, Dec 12, 2019 at 2:59 AM Li Peng wrote:

> Hey Yang, here are the commands:
>
> "/opt/flink/bin/taskmanager.sh",
> "start-foreground",
> "-Djobmanager.rpc.address={{ .Chart.Name }}-job-manager",
> "-Dtaskmanager.numberOfTaskSlots=1"
>
> "/opt/flink/bin/standalone-job.sh",
> "start-foreground",
> "-Djobmanager.rpc.address={{ .Chart.Name }}-job-manager",
> "-Dparallelism.default={{ .Values.task.replicaCount }}"
>
> Yes it's very curious that I don't see any logs actually written to
> /opt/flink/log.
>
> On Tue, Dec 10, 2019 at 11:17 PM Yang Wang  wrote:
>
>> Could you find the logs under /opt/flink/log/jobmanager.log? If not,
>> please share the
>> commands the JobManager and TaskManager are using? If the command is
>> correct
>> and the log4j under /opt/flink/conf is expected, it is so curious why we
>> could not get the logs.
>>
>>
>> Best,
>> Yang
>>
>> On Wed, Dec 11, 2019 at 1:24 PM Li Peng wrote:
>>
>>> Ah I see. I think the Flink app is reading files from
>>> /opt/flink/conf correctly as it is, since changes I make to flink-conf are
>>> picked up as expected, it's just the log4j properties that are either not
>>> being used, or don't apply to stdout or whatever source k8 uses for its
>>> logs? Given that the pods don't seem to have logs written to file
>>> anywhere, contrary to the properties, I'm inclined to say it's the former
>>> and that the log4j properties just aren't being picked up. Still have no
>>> idea why though.
>>>
>>> On Tue, Dec 10, 2019 at 6:56 PM Yun Tang  wrote:
>>>
 Sure, /opt/flink/conf is mounted as a volume from the configmap.



 Best

 Yun Tang



 *From: *Li Peng 
 *Date: *Wednesday, December 11, 2019 at 9:37 AM
 *To: *Yang Wang 
 *Cc: *vino yang , user 
 *Subject: *Re: Flink on Kubernetes seems to ignore log4j.properties



 1. Hey Yun, I'm calling /opt/flink/bin/standalone-job.sh and
 /opt/flink/bin/taskmanager.sh on my job and task managers respectively.
 It's based on the setup described here:
 http://shzhangji.com/blog/2019/08/24/deploy-flink-job-cluster-on-kubernetes/
  .
 I haven't tried the configmap approach yet, does it also replace the conf
 files in /opt/flink/conf?

 2. Hey Vino, here's a sample of the kubernetes:
 https://pastebin.com/fqJrgjZu  I didn't change any patterns from the
 default, so the string patterns should look the same, but as you can see
 it's full of info checkpoint logs that I originally was trying to suppress.
 Based on my log4j.properties, the level should be set to WARN. I couldn't
 actually find any .out files on the pod, this is from the kubectl logs
 command. I also didn't see any files in /opt/flink/log, which I thought my
 log4j was specified to do, hence me thinking that the properties weren't
 actually being consumed. I also have the same properties in my
 src/main/resources folder.

 3. Hey Yang, yes this is a standalone session cluster. I did specify in
 the docker file to copy the log4j.properties to the /opt/flink/conf folder
 on the image, and I confirmed that the properties are correct when I bash'd
 into the pod and viewed them manually.



 Incidentally, I also tried passing the -Dlog4j.configuration argument
 to the programs, and it doesn't work either. And based on what I'm reading
 on jira, that option is not really supported anymore?



 Thanks for your responses, folks!

 Li



 On Mon, Dec 9, 2019 at 7:10 PM Yang Wang  wrote:

 Hi Li Peng,



 You are running standalone session cluster or per-job cluster on
 kubernetes. Right?

 If so, i think you need to check your log4j.properties in the image,
 not local. The log is

 stored to /opt/flink/log/jobmanager.log by default.



 If you are running active Kubernetes integration for a fresh taste. The
 following cli option

 could be used to remove the redirect.

 -Dkubernetes.container-start-command-template="%java% %classpath%
 %jvmmem% %jvmopts% %logging% %class% %args%"



 Best,

 Yang



 On Tue, Dec 10, 2019 at 10:55 AM vino yang wrote:

 Hi Li,



 A potential reason could be conflicting logging frameworks. Can you
 share the log in your .out file and let us know if the print format of the
 log is the same as the configuration file you gave.



 Best,

 Vino



 On Tue, Dec 10, 2019 at 10:09 AM Li Peng wrote:

 Hey folks, I noticed that my kubernetes flink logs (reached via *kubectl

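Re: Flink continuous query for the number of users who logged in to the site in the past 30 minutes

2019-12-11 Thread Yuan,Youjun
First, use a user-defined table function to turn each input message into three
messages. Assuming the input message has time ts, the three output messages
are (ts-1, 0), (ts+1, 1), (ts+31, 0).
Then use a 30-minute RANGE OVER window to SUM the numeric part (0 or 1).

Yuan Youjun

-----Original Message-----
From: 陈帅
Sent: Wednesday, December 11, 2019 9:31 PM
To: user-zh@flink.apache.org
Subject: Flink continuous query for the number of users who logged in to the site in the past 30 minutes

For example, users log in at the following times: none, 12:02, 12:13, 12:15, 12:31, 12:40, 12:45, none

Then I expect the following result counts at these points in time (the actual
query may happen at any point in time):
12:01 (0),  12:03 (1),  12:14 (2),  12:16 (3), 12:30 (4), 12:35 (4), 12:41 (5),
12:46 (4), 13:16 (0)

That is, each element gets a 30-minute expiration time when it arrives, and the
window state is the set of elements that have not yet expired.

How would this be implemented with the Flink stream API and with Flink SQL,
respectively? If timerService is used, would TTLs on too many elements cause
performance problems?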
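
The "each element expires 30 minutes after it arrives" idea from the question can be sketched without any Flink API at all, just to make the state handling concrete. This is only a minimal sketch: `SlidingLoginCounter` is a hypothetical name, timestamps are plain minutes since midnight, and logins are assumed to arrive in time order. Whether a login that is exactly 30 minutes old still counts is a choice made here, not something the thread specifies.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical helper, not a Flink API: counts logins that have not yet expired. */
final class SlidingLoginCounter {
    private final long windowMinutes;
    // Login times in arrival order; the oldest login sits at the head.
    private final Deque<Long> loginTimes = new ArrayDeque<>();

    SlidingLoginCounter(long windowMinutes) {
        this.windowMinutes = windowMinutes;
    }

    /** Records a login. Assumption: calls arrive with non-decreasing timestamps. */
    void login(long minuteOfDay) {
        loginTimes.addLast(minuteOfDay);
    }

    /** Drops logins that are at least windowMinutes old, then returns how many remain. */
    int countAt(long queryMinuteOfDay) {
        while (!loginTimes.isEmpty()
                && queryMinuteOfDay - loginTimes.peekFirst() >= windowMinutes) {
            loginTimes.removeFirst();
        }
        return loginTimes.size();
    }
}
```

Because expired entries are only removed when a query comes in, this variant needs no timer per element. In a real job the same state would have to live in keyed state, which is outside the scope of this sketch.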


Re: Flink on Kubernetes seems to ignore log4j.properties

2019-12-11 Thread Li Peng
Hey Yang, here are the commands:

"/opt/flink/bin/taskmanager.sh",
"start-foreground",
"-Djobmanager.rpc.address={{ .Chart.Name }}-job-manager",
"-Dtaskmanager.numberOfTaskSlots=1"

"/opt/flink/bin/standalone-job.sh",
"start-foreground",
"-Djobmanager.rpc.address={{ .Chart.Name }}-job-manager",
"-Dparallelism.default={{ .Values.task.replicaCount }}"

Yes it's very curious that I don't see any logs actually written to
/opt/flink/log.

On Tue, Dec 10, 2019 at 11:17 PM Yang Wang  wrote:

> Could you find the logs under /opt/flink/log/jobmanager.log? If not,
> please share the
> commands the JobManager and TaskManager are using? If the command is
> correct
> and the log4j under /opt/flink/conf is expected, it is so curious why we
> could not get the logs.
>
>
> Best,
> Yang
>
> On Wed, Dec 11, 2019 at 1:24 PM Li Peng wrote:
>
>> Ah I see. I think the Flink app is reading files from
>> /opt/flink/conf correctly as it is, since changes I make to flink-conf are
>> picked up as expected, it's just the log4j properties that are either not
>> being used, or don't apply to stdout or whatever source k8 uses for its
>> logs? Given that the pods don't seem to have logs written to file
>> anywhere, contrary to the properties, I'm inclined to say it's the former
>> and that the log4j properties just aren't being picked up. Still have no
>> idea why though.
>>
>> On Tue, Dec 10, 2019 at 6:56 PM Yun Tang  wrote:
>>
>>> Sure, /opt/flink/conf is mounted as a volume from the configmap.
>>>
>>>
>>>
>>> Best
>>>
>>> Yun Tang
>>>
>>>
>>>
>>> *From: *Li Peng 
>>> *Date: *Wednesday, December 11, 2019 at 9:37 AM
>>> *To: *Yang Wang 
>>> *Cc: *vino yang , user 
>>> *Subject: *Re: Flink on Kubernetes seems to ignore log4j.properties
>>>
>>>
>>>
>>> 1. Hey Yun, I'm calling /opt/flink/bin/standalone-job.sh and
>>> /opt/flink/bin/taskmanager.sh on my job and task managers respectively.
>>> It's based on the setup described here:
>>> http://shzhangji.com/blog/2019/08/24/deploy-flink-job-cluster-on-kubernetes/
>>>  .
>>> I haven't tried the configmap approach yet, does it also replace the conf
>>> files in /opt/flink/conf?
>>>
>>> 2. Hey Vino, here's a sample of the kubernetes:
>>> https://pastebin.com/fqJrgjZu  I didn't change any patterns from the
>>> default, so the string patterns should look the same, but as you can see
>>> it's full of info checkpoint logs that I originally was trying to suppress.
>>> Based on my log4j.properties, the level should be set to WARN. I couldn't
>>> actually find any .out files on the pod, this is from the kubectl logs
>>> command. I also didn't see any files in /opt/flink/log, which I thought my
>>> log4j was specified to do, hence me thinking that the properties weren't
>>> actually being consumed. I also have the same properties in my
>>> src/main/resources folder.
>>>
>>> 3. Hey Yang, yes this is a standalone session cluster. I did specify in
>>> the docker file to copy the log4j.properties to the /opt/flink/conf folder
>>> on the image, and I confirmed that the properties are correct when I bash'd
>>> into the pod and viewed them manually.
>>>
>>>
>>>
>>> Incidentally, I also tried passing the -Dlog4j.configuration argument to
>>> the programs, and it doesn't work either. And based on what I'm reading on
>>> jira, that option is not really supported anymore?
>>>
>>>
>>>
>>> Thanks for your responses, folks!
>>>
>>> Li
>>>
>>>
>>>
>>> On Mon, Dec 9, 2019 at 7:10 PM Yang Wang  wrote:
>>>
>>> Hi Li Peng,
>>>
>>>
>>>
>>> You are running standalone session cluster or per-job cluster on
>>> kubernetes. Right?
>>>
>>> If so, i think you need to check your log4j.properties in the image, not
>>> local. The log is
>>>
>>> stored to /opt/flink/log/jobmanager.log by default.
>>>
>>>
>>>
>>> If you are running active Kubernetes integration for a fresh taste. The
>>> following cli option
>>>
>>> could be used to remove the redirect.
>>>
>>> -Dkubernetes.container-start-command-template="%java% %classpath%
>>> %jvmmem% %jvmopts% %logging% %class% %args%"
>>>
>>>
>>>
>>> Best,
>>>
>>> Yang
>>>
>>>
>>>
>>> On Tue, Dec 10, 2019 at 10:55 AM vino yang wrote:
>>>
>>> Hi Li,
>>>
>>>
>>>
>>> A potential reason could be conflicting logging frameworks. Can you
>>> share the log in your .out file and let us know if the print format of the
>>> log is the same as the configuration file you gave.
>>>
>>>
>>>
>>> Best,
>>>
>>> Vino
>>>
>>>
>>>
>>> On Tue, Dec 10, 2019 at 10:09 AM Li Peng wrote:
>>>
>>> Hey folks, I noticed that my kubernetes flink logs (reached via *kubectl
>>> logs *) completely ignore any of the configurations I put
>>> into /flink/conf/. I set the logger level to WARN, yet I still see INFO
>>> level logging from flink loggers
>>> like org.apache.flink.runtime.checkpoint.CheckpointCoordinator. I even made
>>> copied the same properties to /flink/conf/log4j-console.properties
>>> and log4j-cli.properties.
>>>
>>>
>>>
>>> From what I can tell, kubernetes just listens to stdout and stderr, so
>>> shouldn't the log4j.properties control output to them? 
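
When checking whether a given properties file is "right", it can help to look at what the file itself declares for the root logger before worrying about which file the process picked up. The following is a rough sketch under assumptions: `Log4jRootLoggerCheck` is a hypothetical helper (not part of Flink or log4j), and the file is assumed to use the log4j 1.x `log4j.rootLogger=LEVEL, appender1, ...` convention. It says nothing about which configuration file the JVM actually loaded.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical diagnostic helper: reads the root logger line of log4j 1.x style properties. */
final class Log4jRootLoggerCheck {

    /** Returns the declared root level (e.g. "WARN"), or null if no root logger is declared. */
    static String rootLevel(String propertiesText) {
        String[] parts = rootLoggerParts(propertiesText);
        return parts.length == 0 ? null : parts[0];
    }

    /** Returns the appender names listed after the level, in declaration order. */
    static List<String> rootAppenders(String propertiesText) {
        String[] parts = rootLoggerParts(propertiesText);
        List<String> appenders = new ArrayList<>();
        for (int i = 1; i < parts.length; i++) {
            appenders.add(parts[i]);
        }
        return appenders;
    }

    // Splits "WARN, file" into ["WARN", "file"]; returns an empty array if the key is missing.
    private static String[] rootLoggerParts(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        String value = props.getProperty("log4j.rootLogger");
        if (value == null || value.trim().isEmpty()) {
            return new String[0];
        }
        return value.trim().split("\\s*,\\s*");
    }
}
```

If the level comes back as WARN and an appender is listed while the pod still prints INFO lines, that would point toward a different configuration file being in effect rather than a mistake inside this one.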

Re: Interval Join Late Record Metrics

2019-12-11 Thread Chris Gillespie
Thanks Congxian, I made a JIRA to track this request.
https://issues.apache.org/jira/browse/FLINK-15202

On Wed, Dec 11, 2019 at 12:56 AM Congxian Qiu 
wrote:

> Hi Chris
>
> From the code[1], currently, IntervalJoin will ignore the late data
> silently, maybe you can create an issue to track this.
>
> [1]
> https://github.com/apache/flink/blob/5c89d12849ea2aa332126b32808e363f12d436a2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L222
> 
> Best,
> Congxian
>
>
> On Wed, Dec 11, 2019 at 8:09 AM Chris Gillespie wrote:
>
>> Hello Flink users, first time poster here.
>>
>> I'm using an interval join in my Flink project, however I haven't found
>> where late records get logged in metrics. Window Joins have
>> "numLateRecordsDropped" implemented, but is there an equivalent within
>> an interval join?
>>
>> My main use case is to track how often a record falls outside of the
>> lower and upper bounds when trying to join two streams. Interval Join looks
>> like it simply short circuits when there is a late record? Maybe I am not
>> understanding what defines a late record in this situation.
>>
>> Is there a good way to monitor when an interval join fails to join two
>> streams? Currently I'm looking at the delta between two operator metrics,
>> but it hasn't looked that reliable so far.
>>
>> Thanks,
>> Chris Gillespie
>>
>
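
Until such a metric exists, one way to reason about what "late" means here is to model the check in isolation. The sketch below is an assumption-laden illustration, not Flink's `IntervalJoinOperator`: `LateRecordCounter` is a hypothetical class, and it assumes a record counts as late when its timestamp is strictly behind the current watermark, which is my reading of the check in the code linked above.

```java
/** Hypothetical model of a late-record check with a counter; not Flink code. */
final class LateRecordCounter {
    // Long.MIN_VALUE means "no watermark seen yet", so nothing can be late.
    private long currentWatermark = Long.MIN_VALUE;
    private long lateRecords;

    /** Watermarks only move forward; an older value is ignored. */
    void advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
    }

    /**
     * Returns true if the record is on time and would be joined.
     * Returns false and increments the counter if it is late (assumed: timestamp < watermark).
     */
    boolean accept(long recordTimestamp) {
        boolean late = currentWatermark != Long.MIN_VALUE && recordTimestamp < currentWatermark;
        if (late) {
            lateRecords++;
        }
        return !late;
    }

    long lateRecords() {
        return lateRecords;
    }
}
```

Note that this only covers records dropped for lateness. A record that is on time but finds no partner within the lower and upper bounds is a different case and would need its own accounting.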


Re: Scala case class TypeInformation and Serializer

2019-12-11 Thread Yun Tang
Hi

Could you please share the related code? I think it might be due to
insufficient type information hints.

Best
Yun Tang



From: 杨光 
Date: Wednesday, December 11, 2019 at 7:20 PM
To: user 
Subject: Scala case class TypeInformation and Serializer

Hi, I'm writing a Flink streaming job with the Scala API. How can I find out
which classes are serialized by Flink's type serializers and which fall back
to the generic Kryo serializer?
And if a class falls back to the Kryo serializer, how can I extend Flink's
TypeInfo classes or make other customisations to improve performance?

Below are some errors I got when I set disableGenericTypes, so I know it will
fall back to Kryo:

Exception in thread "main" java.lang.UnsupportedOperationException: Generic types have been disabled in the ExecutionConfig and type scala.Tuple2 is treated as a generic type.
    at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
    at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:258)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:649)


Re: Open Method is not being called in case of AggregateFunction UDFs

2019-12-11 Thread Jark Wu
Hi Arujit,

Thanks for reporting this. Are you using this UDF in window aggregation in
old planner ?
AFAIK, open() method of UDAF is only not called in window aggregation in
old planner,
because old planner uses DataStream WindowOperator which will not call
open() on AggregateFunction [1].

I also tested it in master branch, and it works for other aggregation (e.g.
over aggs, group aggs) in old planner,
and works for any aggregations in blink planner.

If you are using v1.9,  you can switch to blink planner and have a try.

Best,
Jark

[1]:
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java#L804

On Wed, 11 Dec 2019 at 19:01, Timo Walther  wrote:

> At least I hope it has been fixed. Which version and planner are you using?
>
>
> On 11.12.19 11:47, Arujit Pradhan wrote:
> > Hi Timo,
> >
> > Thanks for the bug reference.
> >
> > You mentioned that this bug has been fixed. Is the fix available for
> > flink 1.9+ and default query planner.
> >
> > Thanks and regards,
> > /Arujit/
> >
> > On Wed, Dec 11, 2019 at 3:56 PM Timo Walther wrote:
> >
> > I remember that we fixed some bug around this topic recently. The
> > legacy
> > planner should not be affected.
> >
> > There is another user reporting this:
> > https://issues.apache.org/jira/browse/FLINK-15040
> >
> > Regards,
> > Timo
> >
> > On 11.12.19 10:34, Dawid Wysakowicz wrote:
> >  > Hi Arujit,
> >  >
> >  > Could you also share the query where you use this UDF? It would
> also
> >  > help if you said which version of Flink you are using and which
> > planner.
> >  >
> >  > Best,
> >  >
> >  > Dawid
> >  >
> >  > On 11/12/2019 10:21, Arujit Pradhan wrote:
> >  >> Hi all,
> >  >>
> >  >> So we are creating some User Defined Functions of type
> >  >> AggregateFunction. And we want to send some static metrics from the
> >  >> *open()* method of the UDFs as we can get *MetricGroup *by
> >  >> *FunctionContext *which is only exposed in the open method. Our code
> >  >> looks something like this(Which is an implementation of count distinct
> >  >> in SQL) :
> >  >>
> >  >> public class DistinctCount extends AggregateFunction<Integer, DistinctCountAccumulator> {
> >  >>     @Override
> >  >>     public DistinctCountAccumulator createAccumulator() {
> >  >>         return new DistinctCountAccumulator();
> >  >>     }
> >  >>
> >  >>     @Override
> >  >>     public void open(FunctionContext context) throws Exception {
> >  >>         super.open(context);
> >  >>         MetricGroup metricGroup = context.getMetricGroup();
> >  >>         // add some metric to the group here
> >  >>         System.out.println("in the open of UDF");
> >  >>     }
> >  >>
> >  >>     @Override
> >  >>     public void close() throws Exception {
> >  >>         super.close();
> >  >>     }
> >  >>
> >  >>     @Override
> >  >>     public Integer getValue(DistinctCountAccumulator distinctCountAccumulator) {
> >  >>         System.out.println("in the udf");
> >  >>         return distinctCountAccumulator.count();
> >  >>     }
> >  >>
> >  >>     public void accumulate(DistinctCountAccumulator distinctCountAccumulator, String item) {
> >  >>         if (item == null) {
> >  >>             return;
> >  >>         }
> >  >>         distinctCountAccumulator.add(item);
> >  >>     }
> >  >> }
> >  >>
> >  >> But when we use this UDF in FlinkSQL, it seems like the open method is
> >  >> not being called at all.
> >  >>
> >  >> From the Flink UDF documentation we find :
> >  >>
> >  >> *The |open()| method is called once before the evaluation method. The
> >  >> |close()| method after the last call to the evaluation method.*
> >  >>
> >  >> *The |open()| method provides a |FunctionContext| that contains
> >  >> information about the context in which user-defined functions are
> >  >> executed, such as the metric group, the distributed cache files, or
> >  >> the global job parameters.*
> >  >>
> >  >> Then is there any reason that open is not working in
> >  >> AggregateFunctions? Btw it works fine in case of ScalarFunctions. Is
> >  >> there any alternative scope where we can register some static metrics
> >  >> in a UDF?
> >  >>
> >  >>
> >  >> Thanks and regards,
> >  >> /Arujit/
> >  >>
> >
>
>


Re: Apache Flink - Clarifications about late side output

2019-12-11 Thread M Singh
Thanks David for your detailed answers.

Mans

On Wednesday, December 11, 2019, 08:12:51 AM EST, David Anderson wrote:

If we have allowed lateness to be greater than 0 (say 5), then if an event 
which arrives at window end + 3 (within allowed lateness), 

    (a) it is considered late and included in the window function as a late 
firing ?
An event with a timestamp that falls within the window's boundaries that 
arrives when the current watermark is at window end + 3 will be included as a 
late event that has arrived within the allowed lateness.

Actually, I'm not sure I got this right -- on this point I recommend some 
experimentation, or careful reading of the code.
On Wed, Dec 11, 2019 at 2:08 PM David Anderson  wrote:

I'll attempt to answer your questions.

1. If we have allowed lateness to be greater than 0 (say 5), then if an event
which arrives at window end + 3 (within allowed lateness),

    (a) it is considered late and included in the window function as a late
firing ?

An event with a timestamp that falls within the window's boundaries that
arrives when the current watermark is at window end + 3 will be included as a
late event that has arrived within the allowed lateness.

    (b) Are the late firings under the control of the trigger ?

Yes, the trigger is involved in all firings, late or not.

    (c) If there are many events like this - are there multiple window function
invocations ?

With the default event time trigger, each late event causes a late firing. You
could use a custom trigger to implement other behaviors.

    (d) Are these events (still within window end + allowed lateness) also
emitted via the side output late data ?

No. The side output for late events is only used to collect events that fall
outside the allowed lateness.

2. If an event arrives after the window end + allowed lateness -

    (a) Is it excluded from the window function but still emitted from the side
output late data ?

Yes.

    (b) And if it is emitted is there any attribute which indicates for which
window it was a late event ?

No, the event is emitted without any additional information.

    (c) Is there any time limit while the late side output remains active for a
particular window or all late events channeled to it ?

There is no time limit; the late side output remains operative indefinitely.

Hope that helps,
David
On Wed, Dec 11, 2019 at 1:40 PM M Singh  wrote:

Thanks Timo for your answer.  I will try the prototype but was wondering if I 
can find some theoretical documentation to give me a sound understanding.

Mans

On Wednesday, December 11, 2019, 05:44:15 AM EST, Timo Walther wrote:

Little mistake: The key must be any constant instead of `e`.


On 11.12.19 11:42, Timo Walther wrote:
> Hi Mans,
> 
> I would recommend to create a little prototype to answer most of your 
> questions in action.
> 
> You can simple do:
> 
> stream = env.fromElements(1L, 2L, 3L, 4L)
>     .assignTimestampsAndWatermarks(
>     new AssignerWithPunctuatedWatermarks{
>     extractTimestamp(e) = e,
>     checkAndGetNextWatermark(e, ts) = new Watermark(e)
>     })
> 
> stream.keyBy(e -> e).window(...).print()
> env.execute()
> 
> This allows to quickly create a stream of event time for testing the 
> semantics.
> 
> I hope this helps. Otherwise of course we can help you in finding the 
> answers to the remaining questions.
> 
> Regards,
> Timo
> 
> 
> 
> On 10.12.19 20:32, M Singh wrote:
>> Hi:
>>
>> I have a few questions about the side output late data.
>>
>> Here is the API
>>
>> |stream .keyBy(...) <- keyed versus non-keyed windows .window(...) <- 
>> required: "assigner" [.trigger(...)] <- optional: "trigger" (else 
>> default trigger) [.evictor(...)] <- optional: "evictor" (else no 
>> evictor) [.allowedLateness(...)] <- optional: "lateness" (else zero) 
>> [.sideOutputLateData(...)] <- optional: "output tag" (else no side 
>> output for late data) .reduce/aggregate/fold/apply() <- required: 
>> "function" [.getSideOutput(...)] <- optional: "output tag"|
>>
>>
>>
>> Apache Flink 1.9 Documentation: Windows
>>
>> Here is the documentation:
>>
>>
>>   Late elements
>>      
>> considerations
>>  
>>
>>
>> When specifying an allowed lateness greater than 0, the window along 
>> with its content is kept after the watermark passes the end of the 
>> window. In these cases, when a late but not dropped element arrives, 
>> it could trigger another firing for the window. These firings are 
>> called |late 

Re: flink savepoint checkpoint

2019-12-11 Thread 陈帅
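Flink 1.9 supports the "cancel job with savepoint" feature:
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/savepoints.html#cancel-job-with-savepoint
A checkpoint may be incremental, but a savepoint is always a full snapshot. For the detailed differences, see
https://www.ververica.com/blog/differences-between-savepoints-and-checkpoints-in-flink


On Wed, Dec 11, 2019 at 11:56 AM lucas.wu wrote:

> Hi all,
>
> I would like to discuss a question: why do Flink savepoints have to be triggered manually? If no savepoint is taken when the program is stopped, I cannot use the previously saved state when I restart it. Why not follow Spark's approach: take checkpoints periodically, then specify the checkpoint location at startup to resume from where the previous run left off?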


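Flink: continuously querying the number of users who logged in to a website in the past 30 minutes

2019-12-11 Thread 陈帅
For example, users log in at the following times: none, 12:02, 12:13, 12:15, 12:31, 12:40, 12:45, none

I then expect to get the following result counts at these query times (an actual query may happen at any time):
12:01 (0),  12:03 (1),  12:14 (2),  12:16 (3), 12:30 (4), 12:35 (4), 12:41
(5), 12:46 (4), 13:16 (0)

That is, every incoming element is given a 30-minute expiration time, and the window state is the set of elements that have not yet expired.

How would this be implemented with the Flink DataStream API and with Flink SQL, respectively? If timerService is used, would TTL timers for too many elements cause performance problems?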


Re: Flink State expiration cleanup (TTL) question

2019-12-11 Thread 陈帅
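We have run into a similar problem. It may be that the state growth caused by the incoming data volume is faster than the cleanup of expired state. Also, are there any metrics that monitor the size and duration of each expired-state cleanup?

On Tue, Dec 10, 2019 at 8:30 PM Yun Tang wrote: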

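> Hi 王磊
>
> The timestamps of the data in the savepoint directory are not updated to the current time on restore; they keep their previous values. Judging from the code, cleanupFullSnapshot takes effect if you have configured it. In addition, configuring
> cleanupInRocksdbCompactFilter
> lets the expiration check run in the background. As far as I know this feature is reliable. Have you tried observing it over a longer period? Also, is your volume of newly added data constant?
>
> Best,
> 唐云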
>
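> On 12/10/19, 10:16 AM, "wangl...@geekplus.com.cn" <
> wangl...@geekplus.com.cn> wrote:
>
> Hi 唐云,
>
> My cluster has been upgraded to 1.8.2, and I have tried both cleanupFullSnapshot and
> cleanupInRocksdbCompactFilter.
> But after stopping with cancel -s, the generated savepoint directory still does not get smaller. The procedure was:
>
> 1. Stop with cancel -s; the savepoint directory is 100 MB.
> 2. Change the code, replacing the original setUpdateType with cleanupFullSnapshot.
> 3. Restore the new code from the savepoint directory of step 1.
> 4. Run the new code for about a day, then cancel -s again; the new savepoint directory is larger.
>
> Could it be that on every flink run -s, the updateTime of all data restored from the existing savepoint directory is set to the current time?
>
> Thanks,
> 王磊
>
>
>
> wangl...@geekplus.com.cn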
>
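> Sender: Yun Tang
> Send Time: 2019-11-01 01:38
> Receiver: user-zh@flink.apache.org
> Subject: Re: Flink State expiration cleanup (TTL) question
> Hi 王磊
>
> Judging from your configuration and your use of Flink 1.7, active cleanup of expired data is not enabled [1]. I suggest configuring
> cleanupFullSnapshot on the StateTtlConfig, so that when you take a full
> snapshot (that is, a savepoint), expired data is not written into the savepoint. If you do not enable active cleanup, expired data is only cleaned up when it is read, which may be why your savepoint keeps growing.
>
> Also, I recommend upgrading to Flink 1.8+, which has better support for state TTL; see the Chinese-language article [2].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/state/state.html#cleanup-of-expired-state
> [2] https://www.infoq.cn/article/L_66ga2G9oLWypvOLtIv
>
> Best,
> 唐云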
>
>
> On 10/31/19, 10:25 AM, "wangl...@geekplus.com.cn" <
> wangl...@geekplus.com.cn> wrote:
>
> With flink-1.7.2, I use the following cleanup policy:
>
> StateTtlConfig ttlConfig = StateTtlConfig
>     .newBuilder(Time.days(3))
>     .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
>     .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
>     .build();
> ValueStateDescriptor descriptor = new
> ValueStateDescriptor("OrderState", TypeInformation.of(OrderState.class));
> descriptor.enableTimeToLive(ttlConfig);
>
> When updating the program I use savepoints: flink cancel -s saves to the savepoint directory, and I then restore from
> that savepoint directory.
> My program has been running for a while, far longer than 3 days, and the savepoint directory generated by each flink cancel -s
> keeps growing. Is the expiration cleanup policy not taking effect?
>
> Thanks,
> 王磊
>
>
>
> wangl...@geekplus.com.cn
>
>
>
>
>
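
The difference Yun Tang describes in this thread, between expired state that is only dropped when it is read and expired state that is filtered out while a full snapshot is written, can be illustrated with a small toy model. This is only a sketch in plain Java and not Flink's implementation: the class `TtlStateModel`, its methods, and the boolean `cleanupFullSnapshot` parameter are hypothetical names chosen to mirror the option discussed above.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of TTL state. NOT Flink code; every name here is hypothetical. */
final class TtlStateModel {
    /** A stored value together with the time it was last written. */
    private static final class Timestamped {
        final String value;
        final long lastUpdate;

        Timestamped(String value, long lastUpdate) {
            this.value = value;
            this.lastUpdate = lastUpdate;
        }
    }

    private final long ttlMillis;
    private final Map<String, Timestamped> entries = new HashMap<>();

    TtlStateModel(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    private boolean isExpired(Timestamped t, long now) {
        return t.lastUpdate + ttlMillis <= now;
    }

    /** Writing refreshes the timestamp (similar in spirit to OnCreateAndWrite). */
    void put(String key, String value, long now) {
        entries.put(key, new Timestamped(value, now));
    }

    /** Lazy cleanup: an expired entry is removed only when it is read. */
    String get(String key, long now) {
        Timestamped t = entries.get(key);
        if (t == null) {
            return null;
        }
        if (isExpired(t, now)) {
            entries.remove(key);
            return null;
        }
        return t.value;
    }

    int storedSize() {
        return entries.size();
    }

    /** Full snapshot; with cleanupFullSnapshot=true expired entries are left out of the copy. */
    Map<String, String> snapshot(long now, boolean cleanupFullSnapshot) {
        Map<String, String> copy = new HashMap<>();
        for (Map.Entry<String, Timestamped> e : entries.entrySet()) {
            if (cleanupFullSnapshot && isExpired(e.getValue(), now)) {
                continue;
            }
            copy.put(e.getKey(), e.getValue().value);
        }
        return copy;
    }
}
```

In this model, keys that are written once and never read again stay in every snapshot unless the snapshot-time filter is enabled, which matches the growing-savepoint symptom reported above. It also shows why a steadily increasing input volume can still make the snapshot grow even when cleanup works.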


Re: Apache Flink - Clarifications about late side output

2019-12-11 Thread David Anderson
>
> If we have allowed lateness to be greater than 0 (say 5), then if an event
> which arrives at window end + 3 (within allowed lateness),
>
> (a) it is considered late and included in the window function as a
> late firing ?
> An event with a timestamp that falls within the window's boundaries that
> arrives when the current watermark is at window end + 3 will be included as
> a late event that has arrived within the allowed lateness.

Actually, I'm not sure I got this right -- on this point I recommend some
experimentation, or careful reading of the code.

On Wed, Dec 11, 2019 at 2:08 PM David Anderson  wrote:

> I'll attempt to answer your questions.
>
> If we have allowed lateness to be greater than 0 (say 5), then if an event
>> which arrives at window end + 3 (within allowed lateness),
>> (a) it is considered late and included in the window function as a
>> late firing ?
>>
>
> An event with a timestamp that falls within the window's boundaries that
> arrives when the current watermark is at window end + 3 will be included as
> a late event that has arrived within the allowed lateness.
>
>
>> (b) Are the late firings under the control of the trigger ?
>>
>
> Yes, the trigger is involved in all firings, late or not.
>
>
>> (c) If there are may events like this - are there multiple window
>> function invocations ?
>
>
> With the default event time trigger, each late event causes a late firing.
> You could use a custom trigger to implement other behaviors.
>
>
>> (d) Are these events (still within window end + allowed lateness)
>> also emitted via the side output late data ?
>>
>
> No. The side output for late events is only used to collect events that
> fall outside the allowed lateness.
>
>
>> 2. If an event arrives after the window end + allowed lateness -
>> (a) Is it excluded from the window function but still emitted from
>> the side output late data ?
>>
>
> Yes.
>
>
>> (b) And if it is emitted is there any attribute which indicates for
>> which window it was a late event ?
>>
>
> No, the event is emitted without any additional information.
>
> (c) Is there any time limit while the late side output remains active
>> for a particular window or all late events channeled to it ?
>
>
> There is no time limit; the late side output remains operative
> indefinitely.
>
> Hope that helps,
> David
>
> On Wed, Dec 11, 2019 at 1:40 PM M Singh  wrote:
>
>> Thanks Timo for your answer.  I will try the prototype but was wondering
>> if I can find some theoretical documentation to give me a sound
>> understanding.
>>
>> Mans
>>
>> On Wednesday, December 11, 2019, 05:44:15 AM EST, Timo Walther <
>> twal...@apache.org> wrote:
>>
>>
>> Little mistake: The key must be any constant instead of `e`.
>>
>>
>> On 11.12.19 11:42, Timo Walther wrote:
>> > Hi Mans,
>> >
>> > I would recommend to create a little prototype to answer most of your
>> > questions in action.
>> >
>> > You can simple do:
>> >
>> > stream = env.fromElements(1L, 2L, 3L, 4L)
>> > .assignTimestampsAndWatermarks(
>> > new AssignerWithPunctuatedWatermarks{
>> > extractTimestamp(e) = e,
>> > checkAndGetNextWatermark(e, ts) = new Watermark(e)
>> > })
>> >
>> > stream.keyBy(e -> e).window(...).print()
>> > env.execute()
>> >
>> > This allows to quickly create a stream of event time for testing the
>> > semantics.
>> >
>> > I hope this helps. Otherwise of course we can help you in finding the
>> > answers to the remaining questions.
>> >
>> > Regards,
>> > Timo
>> >
>> >
>> >
>> > On 10.12.19 20:32, M Singh wrote:
>> >> Hi:
>> >>
>> >> I have a few questions about the side output late data.
>> >>
>> >> Here is the API
>> >>
>> >> |stream .keyBy(...) <- keyed versus non-keyed windows .window(...) <-
>> >> required: "assigner" [.trigger(...)] <- optional: "trigger" (else
>> >> default trigger) [.evictor(...)] <- optional: "evictor" (else no
>> >> evictor) [.allowedLateness(...)] <- optional: "lateness" (else zero)
>> >> [.sideOutputLateData(...)] <- optional: "output tag" (else no side
>> >> output for late data) .reduce/aggregate/fold/apply() <- required:
>> >> "function" [.getSideOutput(...)] <- optional: "output tag"|
>> >>
>> >>
>> >>
>> >> Apache Flink 1.9 Documentation: Windows
>> >> <https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html#late-elements-considerations>
>> >>
>> >>
>> >> Here is the documentation:
>> >>
>> >>
>> >>   Late elements
>> >>
>> >> considerations<
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html#late-elements-considerations>
>>
>> >>
>> >>
>> >> When specifying an allowed lateness greater than 0, the window along
>> >> with its 

Re: Apache Flink - Clarifications about late side output

2019-12-11 Thread David Anderson
I'll attempt to answer your questions.

If we have allowed lateness to be greater than 0 (say 5), then if an event
> which arrives at window end + 3 (within allowed lateness),
> (a) it is considered late and included in the window function as a
> late firing ?
>

An event with a timestamp that falls within the window's boundaries that
arrives when the current watermark is at window end + 3 will be included as
a late event that has arrived within the allowed lateness.


> (b) Are the late firings under the control of the trigger ?
>

Yes, the trigger is involved in all firings, late or not.


> (c) If there are may events like this - are there multiple window
> function invocations ?


With the default event time trigger, each late event causes a late firing.
You could use a custom trigger to implement other behaviors.


> (d) Are these events (still within window end + allowed lateness) also
> emitted via the side output late data ?
>

No. The side output for late events is only used to collect events that
fall outside the allowed lateness.


> 2. If an event arrives after the window end + allowed lateness -
> (a) Is it excluded from the window function but still emitted from the
> side output late data ?
>

Yes.


> (b) And if it is emitted is there any attribute which indicates for
> which window it was a late event ?
>

No, the event is emitted without any additional information.

(c) Is there any time limit while the late side output remains active
> for a particular window or all late events channeled to it ?


There is no time limit; the late side output remains operative
indefinitely.
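
As a way to experiment with the answers above, here is a minimal sketch in plain Java of the rule as I understand it. It assumes an event whose timestamp falls inside an event-time window `[start, windowEnd)`, and it assumes the window counts as expired once the watermark reaches `windowEnd - 1 + allowedLateness`. The class `LateEventModel` and its names are hypothetical and not part of Flink, and the exact boundary conditions should be verified against the window operator code or a small prototype.

```java
/** Simplified model of how an arriving event is treated; NOT Flink code, names are hypothetical. */
final class LateEventModel {
    enum Outcome {
        ON_TIME,               // added to the window; the main firing is still to come
        LATE_FIRING,           // added to the window; the default trigger fires again
        DROPPED_OR_SIDE_OUTPUT // window already cleaned up; goes to the late side output if one is set
    }

    /**
     * @param windowEnd        exclusive end of the window the event belongs to
     * @param allowedLateness  configured allowed lateness (same unit as the timestamps)
     * @param currentWatermark watermark at the moment the event arrives
     */
    static Outcome classify(long windowEnd, long allowedLateness, long currentWatermark) {
        long maxTimestamp = windowEnd - 1; // last timestamp that still belongs to the window
        if (maxTimestamp + allowedLateness <= currentWatermark) {
            return Outcome.DROPPED_OR_SIDE_OUTPUT;
        }
        if (maxTimestamp <= currentWatermark) {
            return Outcome.LATE_FIRING;
        }
        return Outcome.ON_TIME;
    }

    public static void main(String[] args) {
        // Window [0, 10) with allowed lateness 5, watermark at window end + 3.
        System.out.println(classify(10L, 5L, 13L));
    }
}
```

Under these assumptions, question 1 corresponds to the `LATE_FIRING` branch and question 2 to the `DROPPED_OR_SIDE_OUTPUT` branch.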

Hope that helps,
David

On Wed, Dec 11, 2019 at 1:40 PM M Singh  wrote:

> Thanks Timo for your answer.  I will try the prototype but was wondering
> if I can find some theoretical documentation to give me a sound
> understanding.
>
> Mans
>
> On Wednesday, December 11, 2019, 05:44:15 AM EST, Timo Walther <
> twal...@apache.org> wrote:
>
>
> Little mistake: The key must be any constant instead of `e`.
>
>
> On 11.12.19 11:42, Timo Walther wrote:
> > Hi Mans,
> >
> > I would recommend to create a little prototype to answer most of your
> > questions in action.
> >
> > You can simple do:
> >
> > stream = env.fromElements(1L, 2L, 3L, 4L)
> > .assignTimestampsAndWatermarks(
> > new AssignerWithPunctuatedWatermarks{
> > extractTimestamp(e) = e,
> > checkAndGetNextWatermark(e, ts) = new Watermark(e)
> > })
> >
> > stream.keyBy(e -> e).window(...).print()
> > env.execute()
> >
> > This allows to quickly create a stream of event time for testing the
> > semantics.
> >
> > I hope this helps. Otherwise of course we can help you in finding the
> > answers to the remaining questions.
> >
> > Regards,
> > Timo
> >
> >
> >
> > On 10.12.19 20:32, M Singh wrote:
> >> Hi:
> >>
> >> I have a few questions about the side output late data.
> >>
> >> Here is the API
> >>
> >> |stream .keyBy(...) <- keyed versus non-keyed windows .window(...) <-
> >> required: "assigner" [.trigger(...)] <- optional: "trigger" (else
> >> default trigger) [.evictor(...)] <- optional: "evictor" (else no
> >> evictor) [.allowedLateness(...)] <- optional: "lateness" (else zero)
> >> [.sideOutputLateData(...)] <- optional: "output tag" (else no side
> >> output for late data) .reduce/aggregate/fold/apply() <- required:
> >> "function" [.getSideOutput(...)] <- optional: "output tag"|
> >>
> >>
> >>
> >> Apache Flink 1.9 Documentation: Windows
> >> <https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html#late-elements-considerations>
> >>
> >>
> >> Here is the documentation:
> >>
> >>
> >>   Late elements
> >>
> >> considerations<
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/operators/windows.html#late-elements-considerations>
>
> >>
> >>
> >> When specifying an allowed lateness greater than 0, the window along
> >> with its content is kept after the watermark passes the end of the
> >> window. In these cases, when a late but not dropped element arrives,
> >> it could trigger another firing for the window. These firings are
> >> called |late firings|, as they are triggered by late events and in
> >> contrast to the |main firing| which is the first firing of the window.
> >> In case of session windows, late firings can further lead to merging
> >> of windows, as they may “bridge” the gap between two pre-existing,
> >> unmerged windows.
> >>
> >> Attention You should be aware that the elements emitted by a late
> >> firing should be treated as updated results of a previous computation,
> >> i.e., your data stream will contain multiple results for the same
> >> computation. Depending on your application, you 

Re: Order events by filed that does not represent time

2019-12-11 Thread David Anderson
Krzysztof,

Note that if you want to have Flink treat these sequence numbers as event
time timestamps, you probably can, so long as they are generally
increasing, and there's some bound on how out-of-order they can be.

The advantage to doing this is that you might be able to use Flink SQL's
event time sorting directly, rather than implementing something yourself.
To get this to work you will need to be able to specify watermarking --
which should be feasible, so long as there's some bound on the
out-of-orderness of the sequence numbers.

David

On Tue, Dec 10, 2019 at 5:09 PM KristoffSC 
wrote:

> Hi,
> Is it possible to use a field that does not represent a timestamp to order
> events in Flink's pipeline?
>
> In other words, I will receive a stream of events that will have a sequence
> number (gaps are possible).
> Can I maintain the order of those events based on this field, the same as I
> would for a time-representing field?
>
> Regards,
> Krzysztof
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: Apache Flink - Retries for async processing

2019-12-11 Thread M Singh
Thanks Zhu for your advice.

Mans

On Tuesday, December 10, 2019, 09:32:01 PM EST, Zhu Zhu wrote:

Hi M Singh,

I think you would be able to know the request failure cause and whether it is 
recoverable or not. You can handle the error as you like. For example, if you 
think the error is unrecoverable, you can complete the ResultFuture 
exceptionally to expose this failure to the Flink framework. If the error is 
recoverable, you can just retry (or refresh the token), and only complete the 
ResultFuture once it succeeds (or the timeout is reached).

Thanks,
Zhu Zhu
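
The retry pattern described above can be sketched in plain Java with `CompletableFuture`, independent of Flink. Everything here is an assumption made for illustration: `AsyncRetrySketch`, `RecoverableException`, `callWithRetry`, and the `refreshToken` callback are hypothetical names, and the attempt limit stands in for the async operator's timeout. In a real async function, the returned future's completion would be forwarded to the ResultFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Supplier;

/** Retry sketch for async calls; hypothetical helper, not part of Flink. */
final class AsyncRetrySketch {
    /** Hypothetical marker for failures worth retrying, e.g. an expired token. */
    static final class RecoverableException extends RuntimeException {
        RecoverableException(String message) {
            super(message);
        }
    }

    /** Retries recoverable failures up to maxAttempts; fails fast on anything else. */
    static <T> CompletableFuture<T> callWithRetry(
            Supplier<CompletableFuture<T>> request, Runnable refreshToken, int maxAttempts) {
        CompletableFuture<T> result = new CompletableFuture<>();
        attempt(request, refreshToken, maxAttempts, result);
        return result;
    }

    private static <T> void attempt(
            Supplier<CompletableFuture<T>> request,
            Runnable refreshToken,
            int attemptsLeft,
            CompletableFuture<T> result) {
        CompletableFuture<T> call;
        try {
            call = request.get();
        } catch (RuntimeException e) {
            // A synchronous failure is treated like an asynchronous one.
            call = new CompletableFuture<>();
            call.completeExceptionally(e);
        }
        call.whenComplete((value, error) -> {
            if (error == null) {
                result.complete(value);
                return;
            }
            Throwable cause = unwrap(error);
            if (cause instanceof RecoverableException && attemptsLeft > 1) {
                try {
                    refreshToken.run(); // e.g. fetch a new security token
                } catch (RuntimeException e) {
                    result.completeExceptionally(e);
                    return;
                }
                attempt(request, refreshToken, attemptsLeft - 1, result);
            } else {
                // Unrecoverable, or out of attempts: expose the failure to the caller.
                result.completeExceptionally(cause);
            }
        });
    }

    private static Throwable unwrap(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }
}
```

A production version would likely add a backoff delay between attempts and refresh the token on a separate executor so that the callback thread is never blocked.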
On Tue, Dec 10, 2019 at 8:51 PM M Singh wrote:

Thanks Jingsong for sharing your solution.

Since both refreshing the token and the actual API request can fail with either 
recoverable or unrecoverable exceptions, are there any patterns for retrying 
both and making the code robust to failures?

Thanks again.

On Monday, December 9, 2019, 10:08:39 PM EST, Jingsong Li wrote:
Hi M Singh,

We have this scenario internally too. As far as I know, Flink 1.9 does not 
have a built-in mechanism for this either. I can share my solution:

- In the async function, start a thread factory.
- Send the call to the thread factory when the call has failed. Refresh the 
  security token there too.

Actually, you can deal with anything in the function, as long as you finally 
call the relevant methods of ResultFuture.

Best,
Jingsong Lee

On Tue, Dec 10, 2019 at 3:25 AM M Singh wrote:

Hi Folks:
I am working on a project where I will be using Flink's async processing 
capabilities.  The job has to make http request using a token.  The token 
expires periodically and needs to be refreshed.
So, I was looking for patterns for handling async call failures and retries 
when the token expires.  I found this link Re: Backoff strategies for async IO 
functions? and it appears that Flink does not support retries and periodically 
refresh a security token.  I am using 1.6 at the moment but am planning to 
migrate to 1.9 soon.

 
If there are any patterns on how to deal with this scenario, please let me know.
Thanks
Mans



-- 
Best, Jingsong Lee  
  

Re: Apache Flink - Clarifications about late side output

2019-12-11 Thread M Singh
Thanks Timo for your answer.  I will try the prototype but was wondering if I 
can find some theoretical documentation to give me a sound understanding.

Mans

On Wednesday, December 11, 2019, 05:44:15 AM EST, Timo Walther wrote:

Little mistake: The key must be any constant instead of `e`.


On 11.12.19 11:42, Timo Walther wrote:
> Hi Mans,
> 
> I would recommend to create a little prototype to answer most of your 
> questions in action.
> 
> You can simple do:
> 
> stream = env.fromElements(1L, 2L, 3L, 4L)
>     .assignTimestampsAndWatermarks(
>     new AssignerWithPunctuatedWatermarks{
>     extractTimestamp(e) = e,
>     checkAndGetNextWatermark(e, ts) = new Watermark(e)
>     })
> 
> stream.keyBy(e -> e).window(...).print()
> env.execute()
> 
> This allows to quickly create a stream of event time for testing the 
> semantics.
> 
> I hope this helps. Otherwise of course we can help you in finding the 
> answers to the remaining questions.
> 
> Regards,
> Timo
> 
> 
> 
> On 10.12.19 20:32, M Singh wrote:
>> Hi:
>>
>> I have a few questions about the side output late data.
>>
>> Here is the API
>>
>> |stream .keyBy(...) <- keyed versus non-keyed windows .window(...) <- 
>> required: "assigner" [.trigger(...)] <- optional: "trigger" (else 
>> default trigger) [.evictor(...)] <- optional: "evictor" (else no 
>> evictor) [.allowedLateness(...)] <- optional: "lateness" (else zero) 
>> [.sideOutputLateData(...)] <- optional: "output tag" (else no side 
>> output for late data) .reduce/aggregate/fold/apply() <- required: 
>> "function" [.getSideOutput(...)] <- optional: "output tag"|
>>
>>
>>
>> Apache Flink 1.9 Documentation: Windows
>>
>>
>> Here is the documentation:
>>
>>
>>   Late elements
>>      
>> considerations
>>  
>>
>>
>> When specifying an allowed lateness greater than 0, the window along 
>> with its content is kept after the watermark passes the end of the 
>> window. In these cases, when a late but not dropped element arrives, 
>> it could trigger another firing for the window. These firings are 
>> called |late firings|, as they are triggered by late events and in 
>> contrast to the |main firing| which is the first firing of the window. 
>> In case of session windows, late firings can further lead to merging 
>> of windows, as they may “bridge” the gap between two pre-existing, 
>> unmerged windows.
>>
>> Attention You should be aware that the elements emitted by a late 
>> firing should be treated as updated results of a previous computation, 
>> i.e., your data stream will contain multiple results for the same 
>> computation. Depending on your application, you need to take these 
>> duplicated results into account or deduplicate them.
>>
>>
>> Questions:
>>
>> 1. If we have allowed lateness to be greater than 0 (say 5), then if 
>> an event which arrives at window end + 3 (within allowed lateness),
>>      (a) it is considered late and  included in the window function as 
>> a late firing ?
>>      (b) Are the late firings under the control of the trigger ?
>>      (c) If there are may events like this - are there multiple window 
>> function invocations ?
>>      (d) Are these events (still within window end + allowed lateness) 
>> also emitted via the side output late data ?
>> 2. If an event arrives after the window end + allowed lateness -
>>      (a) Is it excluded from the window function but still emitted 
>> from the side output late data ?
>>      (b) And if it is emitted is there any attribute which indicates 
>> for which window it was a late event ?
>>      (c) Is there any time limit while the late side output remains 
>> active for a particular window or all late events channeled to it ?
>>
>> Thanks
>>
>> Thanks
>>
>> Mans
>>
>>
>>
>>
>>

  

Re: Localenvironment jobcluster ha High availability

2019-12-11 Thread Gary Yao
Hi Eric,

What you say should be possible because your job will be executed in a
MiniCluster [1] which has HA support. I have not tried this out myself,
and I am not aware that people are doing this in production. However,
there are integration tests that use MiniCluster + ZooKeeper [2].

Best,
Gary

[1]
https://github.com/apache/flink/blob/eddad99123525211c900102206384dacaf8385fc/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
[2]
https://github.com/apache/flink/blob/eddad99123525211c900102206384dacaf8385fc/flink-tests/src/test/java/org/apache/flink/test/checkpointing/EventTimeWindowCheckpointingITCase.java

On Tue, Dec 3, 2019 at 7:21 PM Eric HOFFMANN <
eric.hoffman...@thalesdigital.io> wrote:

> Hi, i use a jobcluster (1 manager and 1 worker) in kubernetes for
> streaming application, i would like to have the lightest possible solution,
> is it possible to use a localenvironment (manager and worker embeded) and
> still have HA with zookeeper in this mode?, I mean kubernetes will restart
> the job, in the case of jobcluster, metadata are retrieve from zookeeper
> and data from S3 or hdfs, is this pattern the same in localenvironment ?
> Thx
> Eric
>
> This message contains confidential information and is intended only for
> the individual(s) addressed in the message. If you are not the named
> addressee, you should not disseminate, distribute, or copy this e-mail. If
> you are not the intended recipient, you are notified that disclosing,
> distributing, or copying this e-mail is strictly prohibited.
>


Scala case class TypeInformation and Serializer

2019-12-11 Thread 杨光
Hi, I'm writing a Flink streaming job with the Scala API. How can I find out
which classes are serialized by Flink's type serializers and which fall back
to the generic Kryo serializer?
And if a class falls back to the Kryo serializer, how can I extend Flink's
TypeInfo classes or make other customisations to improve performance?

Below are some errors I got when I set disableGenericTypes, so I know it will
fall back to Kryo:

Exception in thread "main" java.lang.UnsupportedOperationException: Generic
types have been disabled in the ExecutionConfig and type scala.Tuple2 is
treated as a generic type.
    at org.apache.flink.api.java.typeutils.GenericTypeInfo.createSerializer(GenericTypeInfo.java:86)
    at org.apache.flink.streaming.api.graph.StreamGraph.addOperator(StreamGraph.java:258)
    at org.apache.flink.streaming.api.graph.StreamGraphGenerator.transformOneInputTransform(StreamGraphGenerator.java:649)


Re: Open Method is not being called in case of AggregateFunction UDFs

2019-12-11 Thread Timo Walther

At least I hope it has been fixed. Which version and planner are you using?


On 11.12.19 11:47, Arujit Pradhan wrote:

Hi Timo,

Thanks for the bug reference.

You mentioned that this bug has been fixed. Is the fix available for 
flink 1.9+ and default query planner.


Thanks and regards,
/Arujit/

On Wed, Dec 11, 2019 at 3:56 PM Timo Walther > wrote:


I remember that we fixed some bug around this topic recently. The
legacy
planner should not be affected.

There is another user reporting this:
https://issues.apache.org/jira/browse/FLINK-15040

Regards,
Timo

On 11.12.19 10:34, Dawid Wysakowicz wrote:
 > Hi Arujit,
 >
 > Could you also share the query where you use this UDF? It would also
 > help if you said which version of Flink you are using and which
planner.
 >
 > Best,
 >
 > Dawid
 >
 > On 11/12/2019 10:21, Arujit Pradhan wrote:
 >> Hi all,
 >>
 >> So we are creating some User Defined Functions of type
 >> AggregateFunction. And we want to send some static metrics from the
 >> *open()* method of the UDFs as we can get *MetricGroup *by
 >> *FunctionContext *which is only exposed in the open method. Our code
 >> looks something like this (which is an implementation of count distinct
 >> in SQL):
 >>
 >> public class DistinctCount extends AggregateFunction<Integer, DistinctCountAccumulator> {
 >>     @Override
 >>     public DistinctCountAccumulator createAccumulator() {
 >>         return new DistinctCountAccumulator();
 >>     }
 >>     @Override
 >>     public void open(FunctionContext context) throws Exception {
 >>         super.open(context);
 >>         MetricGroup metricGroup = context.getMetricGroup();
 >>         // add some metric to the group here
 >>         System.out.println("in the open of UDF");
 >>     }
 >>     @Override
 >>     public void close() throws Exception {
 >>         super.close();
 >>     }
 >>     @Override
 >>     public Integer getValue(DistinctCountAccumulator distinctCountAccumulator) {
 >>         System.out.println("in the udf");
 >>         return distinctCountAccumulator.count();
 >>     }
 >>     public void accumulate(DistinctCountAccumulator distinctCountAccumulator, String item) {
 >>         if (item == null) { return; }
 >>         distinctCountAccumulator.add(item);
 >>     }
 >> }
 >>
 >> But when we use this UDF in FlinkSQL, it seems like the open
 >> method is not being called at all.
 >>
 >> From the Flink UDF documentation we find:
 >>
 >> *The |open()| method is called once before the evaluation method.
 >> The |close()| method after the last call to the evaluation method.*
 >>
 >> *The |open()| method provides a |FunctionContext| that contains
 >> information about the context in which user-defined functions are
 >> executed, such as the metric group, the distributed cache files, or
 >> the global job parameters.*
 >>
 >> Then is there any reason that open is not working in
 >> AggregateFunctions? Btw, it works fine in the case of
 >> ScalarFunctions. Is there any alternative scope where we can
 >> register some static metrics in a UDF?
 >>
 >>
 >> Thanks and regards,
 >> /Arujit/
 >>





Re: Order events by filed that does not represent time

2019-12-11 Thread Timo Walther

Hi Krzysztof,

first of all Flink does not sort events based on timestamp. The concept 
of watermarks just postpones the triggering of a time operation until 
the watermark says all events until a time t have arrived.


For your problem, you can simply use a ProcessFunction and buffer the 
events in state until some condition is met. Once the condition is met, 
you sort the data and emit what is allowed to be emitted.
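
The buffer-then-sort idea above can be sketched without any Flink dependency. The sketch below is only an illustration under stated assumptions: sequence numbers are `long` values, and an event can arrive at most `maxOutOfOrderness` positions after a higher-numbered one. `SequenceReorderBuffer` and its methods are hypothetical names. In a real job the queue would be kept in keyed state inside a ProcessFunction rather than in a field.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Reorders events by sequence number under a bounded out-of-orderness assumption. */
final class SequenceReorderBuffer {
    private final long maxOutOfOrderness;
    private final PriorityQueue<Long> buffer = new PriorityQueue<>();
    private long maxSeen = Long.MIN_VALUE;

    SequenceReorderBuffer(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** Buffers the event and returns, in order, every buffered event that is now safe to emit. */
    List<Long> onEvent(long sequenceNumber) {
        buffer.add(sequenceNumber);
        maxSeen = Math.max(maxSeen, sequenceNumber);
        // Plays the role of a watermark: nothing at or below this value is expected anymore.
        long safeUpTo = maxSeen - maxOutOfOrderness;
        List<Long> out = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek() <= safeUpTo) {
            out.add(buffer.poll());
        }
        return out;
    }

    /** Emits everything that is still buffered, e.g. at end of input. */
    List<Long> flush() {
        List<Long> out = new ArrayList<>();
        while (!buffer.isEmpty()) {
            out.add(buffer.poll());
        }
        return out;
    }
}
```

Gaps in the sequence are harmless here because the emit condition depends only on the highest number seen so far, not on the numbers being contiguous.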


You can also take a look at how Flink SQL's event time sort is 
implemented. Maybe not the easiest implementation but useful for 
understanding the concepts of time and state.


org.apache.flink.table.runtime.aggregate.RowTimeSortProcessFunction

I hope this helps.

Timo



On 10.12.19 17:15, KristoffSC wrote:

Hi,
Is it possible to use a field that does not represent a timestamp to order
events in Flink's pipeline?

In other words, I will receive a stream of events that will have a sequence
number (gaps are possible).
Can I maintain the order of those events based on this field, the same as I
would for a time-representing field?

Regards,
Krzysztof



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Apache Flink - Clarifications about late side output

2019-12-11 Thread Timo Walther

Little mistake: The key must be any constant instead of `e`.


On 11.12.19 11:42, Timo Walther wrote:

Hi Mans,

I would recommend creating a little prototype to answer most of your 
questions in action.

You can simply do:

stream = env.fromElements(1L, 2L, 3L, 4L)
    .assignTimestampsAndWatermarks(
    new AssignerWithPunctuatedWatermarks{
    extractTimestamp(e) = e,
    checkAndGetNextWatermark(e, ts) = new Watermark(e)
    })

stream.keyBy(e -> e).window(...).print()
env.execute()

This allows you to quickly create an event-time stream for testing the 
semantics.


I hope this helps. Otherwise of course we can help you in finding the 
answers to the remaining questions.


Regards,
Timo



On 10.12.19 20:32, M Singh wrote:

Hi:

I have a few questions about the side output late data.

Here is the API

stream
       .keyBy(...)                     <- keyed versus non-keyed windows
       .window(...)                    <- required: "assigner"
      [.trigger(...)]                  <- optional: "trigger" (else default trigger)
      [.evictor(...)]                  <- optional: "evictor" (else no evictor)
      [.allowedLateness(...)]          <- optional: "lateness" (else zero)
      [.sideOutputLateData(...)]       <- optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()  <- required: "function"
      [.getSideOutput(...)]            <- optional: "output tag"




Apache Flink 1.9 Documentation: Windows

 




Here is the documentation:


  Late elements considerations



When specifying an allowed lateness greater than 0, the window along 
with its content is kept after the watermark passes the end of the 
window. In these cases, when a late but not dropped element arrives, 
it could trigger another firing for the window. These firings are 
called |late firings|, as they are triggered by late events and in 
contrast to the |main firing| which is the first firing of the window. 
In case of session windows, late firings can further lead to merging 
of windows, as they may “bridge” the gap between two pre-existing, 
unmerged windows.


Attention You should be aware that the elements emitted by a late 
firing should be treated as updated results of a previous computation, 
i.e., your data stream will contain multiple results for the same 
computation. Depending on your application, you need to take these 
duplicated results into account or deduplicate them.



Questions:

1. If allowed lateness is greater than 0 (say 5) and an event arrives at 
window end + 3 (within the allowed lateness):
     (a) Is it considered late and included in the window function as 
a late firing?
     (b) Are the late firings under the control of the trigger?
     (c) If there are many events like this, are there multiple window 
function invocations?
     (d) Are these events (still within window end + allowed lateness) 
also emitted via the side output for late data?

2. If an event arrives after window end + allowed lateness:
     (a) Is it excluded from the window function but still emitted 
via the side output for late data?
     (b) If it is emitted, is there any attribute that indicates 
for which window it was late?
     (c) Is there a time limit on how long the late side output stays 
active for a particular window, or are all late events channeled to it?


Thanks

Mans









Re: Apache Flink - Clarifications about late side output

2019-12-11 Thread Timo Walther

Hi Mans,

I would recommend creating a small prototype to answer most of your 
questions in practice.


You can simply do:

DataStream<Long> stream = env.fromElements(1L, 2L, 3L, 4L)
    .assignTimestampsAndWatermarks(
        new AssignerWithPunctuatedWatermarks<Long>() {
            @Override
            public long extractTimestamp(Long e, long previousTimestamp) { return e; }

            @Override
            public Watermark checkAndGetNextWatermark(Long e, long ts) { return new Watermark(e); }
        });

stream.keyBy(e -> e).window(...).print();
env.execute();

This lets you quickly create an event-time stream for testing the 
semantics.


I hope this helps. Otherwise of course we can help you in finding the 
answers to the remaining questions.


Regards,
Timo



On 10.12.19 20:32, M Singh wrote:

Hi:

I have a few questions about the side output late data.

Here is the API

stream
       .keyBy(...)                     <- keyed versus non-keyed windows
       .window(...)                    <- required: "assigner"
      [.trigger(...)]                  <- optional: "trigger" (else default trigger)
      [.evictor(...)]                  <- optional: "evictor" (else no evictor)
      [.allowedLateness(...)]          <- optional: "lateness" (else zero)
      [.sideOutputLateData(...)]       <- optional: "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()  <- required: "function"
      [.getSideOutput(...)]            <- optional: "output tag"




Apache Flink 1.9 Documentation: Windows




Here is the documentation:


  Late elements considerations

When specifying an allowed lateness greater than 0, the window along 
with its content is kept after the watermark passes the end of the 
window. In these cases, when a late but not dropped element arrives, it 
could trigger another firing for the window. These firings are called 
|late firings|, as they are triggered by late events and in contrast to 
the |main firing| which is the first firing of the window. In case of 
session windows, late firings can further lead to merging of windows, as 
they may “bridge” the gap between two pre-existing, unmerged windows.


Attention You should be aware that the elements emitted by a late firing 
should be treated as updated results of a previous computation, i.e., 
your data stream will contain multiple results for the same computation. 
Depending on your application, you need to take these duplicated results 
into account or deduplicate them.
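
The rule described in the quoted documentation can be modelled in a few lines of plain Java. This is a simplified sketch under stated assumptions, not Flink code: it assumes an event-time window whose last timestamp is windowEnd - 1, the default event-time trigger, and that an element is discarded (and only then routed to the late side output, if one is configured) once the watermark has reached the last timestamp plus the allowed lateness. The class and enum names are hypothetical.

```java
/** Simplified model of allowed lateness; hypothetical names, not Flink API. */
class LatenessModel {
    enum Outcome { ON_TIME, LATE_FIRING, DROPPED_OR_SIDE_OUTPUT }

    /**
     * Classifies an element that belongs to the window ending at windowEnd (exclusive)
     * when it arrives while the operator's watermark is at the given value.
     */
    static Outcome classify(long windowEnd, long allowedLateness, long watermark) {
        long maxTimestamp = windowEnd - 1;                   // last timestamp inside the window
        if (maxTimestamp + allowedLateness <= watermark) {
            return Outcome.DROPPED_OR_SIDE_OUTPUT;           // window state is already cleaned up
        }
        if (maxTimestamp <= watermark) {
            return Outcome.LATE_FIRING;                      // window still kept, fires again
        }
        return Outcome.ON_TIME;                              // contributes to the main firing
    }

    public static void main(String[] args) {
        // Window [0, 10) with allowed lateness 5.
        System.out.println(classify(10, 5, 5));   // ON_TIME
        System.out.println(classify(10, 5, 12));  // LATE_FIRING
        System.out.println(classify(10, 5, 14));  // DROPPED_OR_SIDE_OUTPUT
    }
}
```

Under these assumptions, elements in the LATE_FIRING range go into the window function again and are not sent to the side output; running the prototype suggested above is the way to confirm this for a specific trigger.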



Questions:

1. If allowed lateness is greater than 0 (say 5) and an event arrives at 
window end + 3 (within the allowed lateness):
     (a) Is it considered late and included in the window function as a 
late firing?
     (b) Are the late firings under the control of the trigger?
     (c) If there are many events like this, are there multiple window 
function invocations?
     (d) Are these events (still within window end + allowed lateness) 
also emitted via the side output for late data?

2. If an event arrives after window end + allowed lateness:
     (a) Is it excluded from the window function but still emitted via 
the side output for late data?
     (b) If it is emitted, is there any attribute that indicates for 
which window it was late?
     (c) Is there a time limit on how long the late side output stays 
active for a particular window, or are all late events channeled to it?


Thanks

Mans









Re: Open Method is not being called in case of AggregateFunction UDFs

2019-12-11 Thread Timo Walther
I remember that we fixed some bug around this topic recently. The legacy 
planner should not be affected.


There is another user reporting this:
https://issues.apache.org/jira/browse/FLINK-15040

Regards,
Timo

On 11.12.19 10:34, Dawid Wysakowicz wrote:

Hi Arujit,

Could you also share the query where you use this UDF? It would also 
help if you said which version of Flink you are using and which planner.


Best,

Dawid

On 11/12/2019 10:21, Arujit Pradhan wrote:

Hi all,

We are creating some user-defined functions of type 
AggregateFunction, and we want to send some static metrics from the 
*open()* method of the UDFs, since the *MetricGroup* is available through 
the *FunctionContext*, which is only exposed in the open method. Our code 
looks something like this (an implementation of count distinct 
in SQL):


public class DistinctCount extends AggregateFunction<Integer, DistinctCountAccumulator> {
    @Override
    public DistinctCountAccumulator createAccumulator() {
        return new DistinctCountAccumulator();
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
        MetricGroup metricGroup = context.getMetricGroup();
        // add some metric to the group here
        System.out.println("in the open of UDF");
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    @Override
    public Integer getValue(DistinctCountAccumulator distinctCountAccumulator) {
        System.out.println("in the udf");
        return distinctCountAccumulator.count();
    }

    public void accumulate(DistinctCountAccumulator distinctCountAccumulator, String item) {
        if (item == null) {
            return;
        }
        distinctCountAccumulator.add(item);
    }
}


But when we use this UDF in FlinkSQL, it seems like the open method is 
not being called at all.


From the Flink UDF documentation we find:

*The |open()| method is called once before the evaluation method. The 
|close()| method after the last call to the evaluation method.*


*The |open()| method provides a |FunctionContext| that contains 
information about the context in which user-defined functions are 
executed, such as the metric group, the distributed cache files, or 
the global job parameters.*


Is there any reason that open() is not called for 
AggregateFunctions? By the way, it works fine for ScalarFunctions. Is 
there any alternative scope where we can register some static metrics 
in a UDF?



Thanks and regards,
/Arujit/





Re: Thread access and broadcast state initialization in BroadcastProcessFunction

2019-12-11 Thread Timo Walther
1. Yes, methods will only be called by one thread. The Flink API aims to 
abstract all concurrency topics away when using the provided methods and 
state.


2. The open() method should always be the first method being called. If 
this is not the case, this is definitely a bug. Which Flink version are 
you using? If it is 1.9, could you verify the behavior with 1.8? The 
community recently simplified the architecture under the hood.


Thanks for your feedback.

Regards,
Timo

On 11.12.19 10:41, KristoffSC wrote:

Hi Vino,
Thank you for your response and provided links.

Just to clarify, with a small follow-up:

1. Methods will be called by only one thread, right?

2. The links you provided are tackling a case when we got a "fast stream"
element before we received broadcast stream element. In my case we had
Broadcast element first, before we got any "fast stream" element. Because
open method was not called (I've observed it will be called only before
first processElement method call, so before processing the first "fast
stream" element) we don't have the state descriptor which would be
initialized in open method. So we actually cannot "store/process" this
broadcast element in our broadcast state.


    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        processingRulesDesc = new MapStateDescriptor<>(
                "RulesBroadcastState",
                Types.VOID,
                TypeInformation.of(new TypeHint<ProcessingRule>() {
                }));
    }

In this case, bcState will be null since the open method was not yet called.

    public void processBroadcastElement(ProcessingRule rule, Context ctx,
            Collector out) throws Exception {
        // store the new pattern by updating the broadcast state
        BroadcastState<Void, ProcessingRule> bcState =
                ctx.getBroadcastState(processingRulesDesc);
        bcState.put(null, rule);
    }











Re: Need help using AggregateFunction instead of FoldFunction

2019-12-11 Thread Arvid Heise
Hi Devin,

for event-time based windows, you need to give Flink two types of
information:
- timestamp of records, which I assume is in your case already embedded
into the Pulsar records
- and a watermark assigner.

The watermarks help Flink determine when windows can be closed with
respect to out-of-order and late events. This is highly use-case-specific
and usually cannot be inferred automatically. So you need to specify a
watermark assigner for event time windows to work. Pulsar offers a similar
API to Kafka, so that you can simply refer to the respective documentation
[1]. The other sections of this page give you a more general overview of
the options, which may be interesting for future use cases where you want
to aggregate event time-based records.
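
To illustrate why an event-time session window produces no output without watermarks, here is a small plain-Java model (no Flink dependency). It assumes a bounded-out-of-orderness watermark, that is, the largest timestamp seen minus a fixed bound, and that a session can fire once the watermark reaches the session's last timestamp (last event + gap - 1). The class name, method names, and the numbers are illustrative assumptions.

```java
/** Simplified model of event-time session windows and watermarks; not Flink API. */
class SessionWatermarkModel {
    /** Watermark under a bounded-out-of-orderness assumption. */
    static long watermark(long maxSeenTimestamp, long maxOutOfOrderness) {
        return maxSeenTimestamp - maxOutOfOrderness;
    }

    /** A session ending at lastEvent + gap (exclusive) can fire once the watermark reaches its last timestamp. */
    static boolean sessionCanFire(long lastEventInSession, long gap, long watermark) {
        return watermark >= lastEventInSession + gap - 1;
    }

    public static void main(String[] args) {
        long gap = 5_000;          // session gap in ms, as in withGap(Time.seconds(5))
        long lastEvent = 10_000;   // timestamp of the last event in the session
        long bound = 1_000;        // assumed out-of-orderness bound

        // No watermark assigner: the watermark stays at Long.MIN_VALUE, so nothing fires.
        System.out.println(sessionCanFire(lastEvent, gap, Long.MIN_VALUE));          // false
        // A later event at 12_000 moves the watermark to 11_000: still inside the gap.
        System.out.println(sessionCanFire(lastEvent, gap, watermark(12_000, bound))); // false
        // An event at 16_000 moves the watermark to 15_000: the session can fire.
        System.out.println(sessionCanFire(lastEvent, gap, watermark(16_000, bound))); // true
    }
}
```

The model also shows that waiting in wall-clock time is not enough: the watermark only advances when newer records arrive and a watermark assigner is configured.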

Best,

Arvid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/event_timestamps_watermarks.html#timestamps-per-kafka-partition

On Tue, Dec 10, 2019 at 9:45 PM Devin Bost  wrote:

> I did confirm that I got no resulting output after 20 seconds and after
> sending additional data after waiting over a minute between batches of
> data.
>
> My code looks like this:
>
> PulsarSourceBuilder<String> builder = PulsarSourceBuilder
>   .builder(new SimpleStringSchema())
>   .serviceUrl(SERVICE_URL)
>   .topic(INPUT_TOPIC)
>   .subscriptionName(SUBSCRIPTION_NAME);
> SourceFunction<String> src = builder.build();
> DataStream<String> dataStream = env.addSource(src);
>
> DataStream<String> combinedEnvelopes = dataStream
>   .map(new MapFunction<String, Tuple2<String, String>>() {
>      @Override
>      public Tuple2<String, String> map(String incomingMessage) throws Exception {
>         return mapToTuple(incomingMessage);
>      }
>   })
>   .keyBy(0)
>   //.timeWindow(Time.seconds(5))
>   .window(EventTimeSessionWindows.withGap(Time.seconds(5)))
>   .aggregate(new JsonConcatenator());
> //dataStream.print();
>
> Logger logger = LoggerFactory.getLogger(SplinklerJob.class);
> logger.info("Ran dataStream. Adding sink next");
> combinedEnvelopes.addSink(new FlinkPulsarProducer<>(
>   SERVICE_URL,
>   OUTPUT_TOPIC,
>   new AuthenticationDisabled(), // probably need to fix // AuthenticationTls()
>   combinedData -> combinedData.toString().getBytes(UTF_8),
>   combinedData -> "test")
> );
> logger.info("Added sink. Executing job.");
> // execute program
> env.execute("Flink Streaming Java API Skeleton");
>
>
> Here is the JsonConcatenator class:
>
> private static class JsonConcatenator
>   implements AggregateFunction<Tuple2<String, String>, Tuple2<String, String>, String> {
>    Logger logger = LoggerFactory.getLogger(SplinklerJob.class);
>
>    @Override
>    public Tuple2<String, String> createAccumulator() {
>       return new Tuple2<>("", "");
>    }
>
>    @Override
>    public Tuple2<String, String> add(Tuple2<String, String> value, Tuple2<String, String> accumulator) {
>       logger.info("Running Add on value.f0: " + value.f0 + " and value.f1: " + value.f1);
>       return new Tuple2<>(value.f0, accumulator.f1 + ", " + value.f1);
>    }
>
>    @Override
>    public String getResult(Tuple2<String, String> accumulator) {
>       logger.info("Running getResult on accumulator.f1: " + accumulator.f1);
>       return "[" + accumulator.f1.substring(1) + "]";
>    }
>
>    @Override
>    public Tuple2<String, String> merge(Tuple2<String, String> a, Tuple2<String, String> b) {
>       // merge() is called when windows are merged, e.g. for session windows.
>       logger.info("Running merge on (a.f0: " + a.f0 + " and a.f1: " + a.f1 + " and b.f1: " + b.f1);
>       if (b.f1.charAt(0) == '[') {
>          logger.info("During merge, we detected the right message starts with the '[' character. Removing it.");
>          b.f1 = b.f1.substring(1);
>       }
>       return new Tuple2<>(a.f0, a.f1 + ", " + b.f1);
>    }
> }
>
>
> Devin G. Bost
>
> Re:
>
> getResult will only be called when the window is triggered. For a
>> fixed-time window, it triggers at the end of the window.
>> However, for EventTimeSessionWindows you need to have gaps in the data.
>> Can you verify that there is actually a 20-second pause in between data points
>> for your keys?
>> Additionally, it may also be an issue with extracting the event time from
>> the sources. Could you post the relevant code as well?
>> Best,
>> Arvid
>
>
> On Tue, Dec 10, 2019 at 3:22 AM Arvid Heise  wrote:
>
>> getResult will only be called when the window is triggered. For a
>> fixed-time window, it triggers at the end of the window.
>>
>> However, for EventTimeSessionWindows you need to have gaps in the data.
>> Can you verify that there is actually a 20-second pause in between data points
>> for your keys?
>> Additionally, it may also be an issue with extracting the event time from
>> the sources. Could you post the relevant code as well?
>>
>> Best,
>>
>> Arvid
>>
>> On Mon, Dec 9, 2019 at 8:51 AM vino yang  wrote:
>>
>>> Hi dev,
>>>
>>> The time of the window may have different semantics.
>>> In the session window, it's only a time gap, the size of the window is
>>> driven via activity events.
>>> In the tumbling or sliding window, it means the size of the window.
>>>
>>> For more 

Re: Flink 'Job Cluster' mode Ui Access

2019-12-11 Thread Chesnay Schepler

Would it be possible for you to provide us with the full debug log file?

On 10/12/2019 18:07, Jatin Banger wrote:

Yes, I did.

On Tue, Dec 10, 2019 at 3:47 PM Arvid Heise > wrote:


Hi Jatin,

just to be sure. Did you increase the log level to debug [1]
before checking for *StaticFileServerHandler*?

Best,

Arvid

[1]

https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/logging.html#configuring-log4j

On Mon, Dec 9, 2019 at 7:54 AM Jatin Banger
mailto:bangerjatinrm...@gmail.com>>
wrote:

Hi,

I have searched the logs for the keyword
*StaticFileServerHandler*, but there were no such log entries
for the "Flink Job Cluster".
Then I checked the Flink Session Cluster, and there I was able to find
log entries for the *StaticFileServerHandler* keyword.

Can I raise this as a bug?

Best Regards,
Jatin


On Thu, Dec 5, 2019 at 8:59 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

Ok, it's good to know that the WebUI files are there.

Please enable DEBUG logging and try again, searching for
messages from the StaticFileServerHandler.

This handler logs every file that is requested (which
effectively happens when the WebUI is being served); let's
see what is actually being requested.

On 05/12/2019 05:57, Jatin Banger wrote:

I have already tried that using
"$FLINK_HOME/bin/jobmanager.sh" start-foreground
The UI comes up fine with this one,
which means web/index.html is present.


On Wed, Dec 4, 2019 at 9:01 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

hmm...this is quite odd.

Let's try to narrow things down a bit.

Could you try starting a local cluster (using the
same distribution) and checking whether the UI is
accessible?

Could you also check whether the flink-dist.jar in
/lib contains web/index.html?

On 04/12/2019 06:02, Jatin Banger wrote:

Hi,

I am using flink binary directly.

I am using this command to deploy the script.

"$FLINK_HOME/bin/standalone-job.sh" start-foreground
--job-classname ${ARGS_FOR_JOB}
where ARGS_FOR_JOB contain job class name and all
other necessary details needed by the job.

Best regards,
Jatin


On Fri, Nov 29, 2019 at 4:18 PM Chesnay Schepler
mailto:ches...@apache.org>> wrote:

To clarify, you ran "mvn package -pl flink-dist
-am" to build Flink?

If so, could you run that again and provide us
with the maven output?

On 29/11/2019 11:23, Jatin Banger wrote:

Hi,

@vino yang   I am
using flink 1.8.1

I am using the following procedure for the
deployment:

https://github.com/apache/flink/blob/master/flink-container/docker/README.md

And i tried accessing the path you mentioned:

# curl :4081/#/overview
{"errors":["Not found."]}

Best Regards,
Jatin

On Thu, Nov 28, 2019 at 10:21 PM Chesnay
Schepler mailto:ches...@apache.org>> wrote:

Could you try accessing :/#/overview ?

The REST API is obviously accessible, and
hence the WebUI should be too.

How did you setup the session cluster? Are
you using some custom Flink build or
something, which potentially excluded
flink-runtime-web from the classpath?

On 28/11/2019 10:02, Jatin Banger wrote:

Hi,

I checked the log file; there is no error.
I also checked the pod's internal ports
using the REST API.

# curl : 4081
{"errors":["Not found."]}
4081 is the UI port

# curl :4081/config
{"refresh-interval":3000,"timezone-name":"Coordinated Universal Time","timezone-offset":0,"flink-version":"","flink-revision":"ceba8af @ 11.02.2019 @ 22:17:09 CST"}

# curl :4081/jobs

Re: Processing Events by custom rules kept in Broadcast State

2019-12-11 Thread Timo Walther

Hi,

I think the question "What data type should I put in state?" should 
usually be answered with a well-defined data structure that allows for 
future state upgrades, much like defining a database schema. So I would 
not put "arbitrary" classes such as Jackson's ObjectNode in there.


Whether to put a JSON string or an object like your RuleParams into state 
depends on the performance. If the JSON format changes frequently, it 
might be better to just store the string there. But reparsing might be 
expensive too, so keeping a transient variable as a cache for the 
broadcast state should work.
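
As a rough sketch of the "string in state, transient parsed cache" idea, here is a plain-Java model without any Flink or JSON library. The rawJson field stands in for the value kept in broadcast state, and the parser only understands a flat JSON object of string pairs without escapes. All names (RuleCacheSketch, onBroadcast, rules) are hypothetical and chosen for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Model of keeping the raw rule message as state plus a lazily rebuilt parsed cache. */
class RuleCacheSketch {
    // Matches "name":"value" pairs in a flat JSON object of strings (no escape handling).
    private static final Pattern PAIR =
            Pattern.compile("\"([^\"]*)\"\\s*:\\s*\"([^\"]*)\"");

    private String rawJson;                        // stand-in for the value kept in broadcast state
    private transient Map<String, String> parsed;  // cache; lost on restore, rebuilt on demand

    /** Called for every new rule message: store the string and invalidate the cache. */
    void onBroadcast(String json) {
        rawJson = json;
        parsed = null;
    }

    /** Returns the parsed rules, reparsing only when the cache is empty. */
    Map<String, String> rules() {
        if (parsed == null) {
            Map<String, String> result = new LinkedHashMap<>();
            if (rawJson != null) {
                Matcher matcher = PAIR.matcher(rawJson);
                while (matcher.find()) {
                    result.put(matcher.group(1), matcher.group(2));
                }
            }
            parsed = result;
        }
        return parsed;
    }

    public static void main(String[] args) {
        RuleCacheSketch cache = new RuleCacheSketch();
        cache.onBroadcast("{\"ruleA\": \"x\", \"ruleB\": \"y\"}");
        System.out.println(cache.rules());
    }
}
```

The stored format stays a plain string, so it survives changes to the RuleParams class, while the per-element path only pays for parsing after an update or a restore.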


Regards,
Timo

On 11.12.19 04:21, vino yang wrote:

Hi KristoffSC,

It seems the main differences are when to parse your rules and what 
could be put into the broadcast state.


IMO, any of these solutions can work. I prefer option 3. I'd like 
to parse the rules ASAP and turn them into a real rule event stream (not a 
ruleset stream) at the source, and then do the real parsing in 
processBroadcastElement.


This is just my personal opinion.

Best,
Vino

KristoffSC wrote on Wed, Dec 11, 2019 at 6:26 AM:


Hi,
I think this is a very basic use case for the Broadcast State Pattern,
but I would like to know the best approaches to solving this problem.

I have an operator that extends BroadcastProcessFunction. The
broadcast element is a JSON-formatted message sent by Kafka. It
describes processing rules as key/value mappings, like so: ruleName -
ruleValue (both strings).

In the processElement method I'm delegating to my custom
RuleEngineService. It is a class that has the "rule engine" logic and
accepts the received event and a "set of processing rules" in some form.

What would be the best approach?
1. Keep the original JSON string in broadcast state. Whenever a new set
of rules is streamed by Kafka, parse this JSON in the
processBroadcastElement method, map it to some RuleParams abstraction,
and keep it as a transient field in my BroadcastProcessFunction
operator. Save the JSON in broadcast state. Pass RuleParams to the rule
engine service.

2. Same as 1, but instead of keeping the raw JSON string in broadcast
state, keep an already parsed JSON object, something like ObjectNode
from the KafkaConnector lib.

3. Keep each pair of ruleName - ruleValue (both strings) separately in
broadcast state. In the processBroadcastElement method, parse the
received JSON and update the state. In the processElement method, take
all rules, build a RuleParams object (basically a map), and pass it to
the rule engine.

4. Parse the JSON in the processBroadcastElement method, map it to the
RuleParams abstraction, which keeps the rules in a HashMap, and keep
this RuleParams in broadcast state.

5. Any other...










Re: Open Method is not being called in case of AggregateFunction UDFs

2019-12-11 Thread Dawid Wysakowicz
Hi Arujit,

Could you also share the query where you use this UDF? It would also
help if you said which version of Flink you are using and which planner.

Best,

Dawid

On 11/12/2019 10:21, Arujit Pradhan wrote:
> Hi all,
>
> We are creating some user-defined functions of type
> AggregateFunction, and we want to send some static metrics from the
> *open()* method of the UDFs, since the *MetricGroup* is available through
> the *FunctionContext*, which is only exposed in the open method. Our code
> looks something like this (an implementation of count distinct
> in SQL):
>
> public class DistinctCount extends AggregateFunction<Integer, DistinctCountAccumulator> {
>     @Override
>     public DistinctCountAccumulator createAccumulator() {
>         return new DistinctCountAccumulator();
>     }
>
>     @Override
>     public void open(FunctionContext context) throws Exception {
>         super.open(context);
>         MetricGroup metricGroup = context.getMetricGroup();
>         // add some metric to the group here
>         System.out.println("in the open of UDF");
>     }
>
>     @Override
>     public void close() throws Exception {
>         super.close();
>     }
>
>     @Override
>     public Integer getValue(DistinctCountAccumulator distinctCountAccumulator) {
>         System.out.println("in the udf");
>         return distinctCountAccumulator.count();
>     }
>
>     public void accumulate(DistinctCountAccumulator distinctCountAccumulator, String item) {
>         if (item == null) {
>             return;
>         }
>         distinctCountAccumulator.add(item);
>     }
> }
>
> But when we use this UDF in FlinkSQL, it seems like the open method is
> not being called at all.
>
> From the Flink UDF documentation we find:
>
> *The |open()| method is called once before the evaluation method. The
> |close()| method after the last call to the evaluation method.*
>
> *The |open()| method provides a |FunctionContext| that contains
> information about the context in which user-defined functions are
> executed, such as the metric group, the distributed cache files, or
> the global job parameters.*
>
> Is there any reason that open() is not called for
> AggregateFunctions? By the way, it works fine for ScalarFunctions. Is
> there any alternative scope where we can register some static metrics
> in a UDF?
>
>
> Thanks and regards,
> /Arujit/
>




Re: Thread access and broadcast state initialization in BroadcastProcessFunction

2019-12-11 Thread KristoffSC
Hi Vino,
Thank you for your response and provided links.

Just to clarify, with a small follow-up:

1. Methods will be called by only one thread, right?

2. The links you provided are tackling a case when we got a "fast stream"
element before we received broadcast stream element. In my case we had
Broadcast element first, before we got any "fast stream" element. Because
open method was not called (I've observed it will be called only before
first processElement method call, so before processing the first "fast
stream" element) we don't have the state descriptor which would be
initialized in open method. So we actually cannot "store/process" this
broadcast element in our broadcast state.


    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        processingRulesDesc = new MapStateDescriptor<>(
                "RulesBroadcastState",
                Types.VOID,
                TypeInformation.of(new TypeHint<ProcessingRule>() {
                }));
    }

In this case, bcState will be null since the open method was not yet called.

    public void processBroadcastElement(ProcessingRule rule, Context ctx,
            Collector out) throws Exception {
        // store the new pattern by updating the broadcast state
        BroadcastState<Void, ProcessingRule> bcState =
                ctx.getBroadcastState(processingRulesDesc);
        bcState.put(null, rule);
    }








Re: Request for removal from subscription

2019-12-11 Thread Tom Blackwood
Please send a message to: user-unsubscr...@flink.apache.org for
unsubscribing.

On Wed, Dec 11, 2019 at 1:39 PM L Jainkeri, Suman (Nokia - IN/Bangalore) <
suman.l_jaink...@nokia.com> wrote:

> Unsubscribe
>


Open Method is not being called in case of AggregateFunction UDFs

2019-12-11 Thread Arujit Pradhan
Hi all,

We are creating some user-defined functions of type AggregateFunction,
and we want to send some static metrics from the *open()* method of the
UDFs, since the *MetricGroup* is available through the *FunctionContext*,
which is only exposed in the open method. Our code looks something like
this (an implementation of count distinct in SQL):

public class DistinctCount extends AggregateFunction<Integer, DistinctCountAccumulator> {
    @Override
    public DistinctCountAccumulator createAccumulator() {
        return new DistinctCountAccumulator();
    }

    @Override
    public void open(FunctionContext context) throws Exception {
        super.open(context);
        MetricGroup metricGroup = context.getMetricGroup();
        // add some metric to the group here
        System.out.println("in the open of UDF");
    }

    @Override
    public void close() throws Exception {
        super.close();
    }

    @Override
    public Integer getValue(DistinctCountAccumulator distinctCountAccumulator) {
        System.out.println("in the udf");
        return distinctCountAccumulator.count();
    }

    public void accumulate(DistinctCountAccumulator distinctCountAccumulator, String item) {
        if (item == null) {
            return;
        }
        distinctCountAccumulator.add(item);
    }
}


But when we use this UDF in FlinkSQL, it seems like the open method is not
being called at all.

From the Flink UDF documentation we find:

*The open() method is called once before the evaluation method. The close()
method after the last call to the evaluation method.*

*The open() method provides a FunctionContext that contains information
about the context in which user-defined functions are executed, such as the
metric group, the distributed cache files, or the global job parameters.*
Is there any reason that open() is not called for AggregateFunctions?
By the way, it works fine for ScalarFunctions. Is there any alternative
scope where we can register some static metrics in a UDF?


Thanks and regards,
*Arujit*


Re: Flink ML feature

2019-12-11 Thread Till Rohrmann
Hi guys,

it is true that we dropped Flink-ML with 1.9. The reason is that the
community started working on a new ML library which you can find under
flink-ml-parent [1]. This module contains the framework for building ML
pipelines but not yet too many algorithms iirc. The plan is to extend this
library with algorithms from Alink in the near future to grow Flink's
machine learning library.

[1] https://github.com/apache/flink/tree/master/flink-ml-parent

Cheers,
Till

On Wed, Dec 11, 2019 at 3:42 AM vino yang  wrote:

> Hi Benoit,
>
> I can only try to ping @Till Rohrmann  @Kurt Young
>   who may know more information to answer this question.
>
> Best,
> Vino
>
> Benoît Paris wrote on Tue, Dec 10, 2019 at 7:06 PM:
>
>> Is there any information as to whether Alink is going to be contributed
>> to Apache Flink as the official ML Lib?
>>
>>
>> On Tue, Dec 10, 2019 at 7:11 AM vino yang  wrote:
>>
>>> Hi Chandu,
>>>
>>> AFAIK, there is a project named Alink[1] which is the Machine Learning
>>> algorithm platform based on Flink, developed by the PAI team of Alibaba
>>> computing platform. FYI
>>>
>>> Best,
>>> Vino
>>>
>>> [1]: https://github.com/alibaba/Alink
>>>
>>> Tom Blackwood wrote on Tue, Dec 10, 2019 at 2:07 PM:
>>>
>>>> You may try Spark ML, which is a production-ready library for ML stuff.
>>>>
>>>> regards.
>>>>
>>>> On Tue, Dec 10, 2019 at 1:04 PM chandu soa wrote:
>>>>
>>>>> Hello Community,
>>>>>
>>>>> Can you please give me some pointers for implementing Machine Learning
>>>>> using Flink.
>>>>>
>>>>> I see Flink ML libraries were dropped in v1.9. It looks like the ML
>>>>> feature in Flink is going to be enhanced.
>>>>>
>>>>> What is the recommended approach for implementing production-grade ML
>>>>> based apps using Flink? Is v1.9 OK, or should I wait for 1.10?
>>>>>
>>>>> Thanks,
>>>>> Chandu
>>>>>

>>
>> --
>> Benoît Paris
>> Ingénieur Machine Learning Explicable
>> Tél : +33 6 60 74 23 00
>> http://benoit.paris
>> http://explicable.ml
>>
>


Re: Interval Join Late Record Metrics

2019-12-11 Thread Congxian Qiu
Hi Chris

From the code [1], IntervalJoin currently ignores late data
silently. Maybe you can create an issue to track this.

[1]
https://github.com/apache/flink/blob/5c89d12849ea2aa332126b32808e363f12d436a2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/IntervalJoinOperator.java#L222
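
To show what "ignoring late data silently" means and what a counter could look like, here is a plain-Java model. It assumes, based on my reading of the linked operator, that a record counts as late when a watermark has been seen and the record's timestamp is smaller than the current watermark. The lateRecords counter is hypothetical: it is exactly the piece that the operator does not expose today.

```java
/** Plain-Java model of the late-record check in an interval join; not the Flink operator itself. */
class IntervalJoinLateModel {
    private long currentWatermark = Long.MIN_VALUE; // no watermark seen yet
    private long lateRecords = 0;                   // hypothetical metric, not present in Flink

    /** Watermarks only move forward. */
    void advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
    }

    /** Returns true if the record would be joined, false if it is skipped as late. */
    boolean offer(long timestamp) {
        boolean late = currentWatermark != Long.MIN_VALUE && timestamp < currentWatermark;
        if (late) {
            lateRecords++;   // the linked code returns here without counting or logging
            return false;
        }
        return true;
    }

    long lateRecords() {
        return lateRecords;
    }

    public static void main(String[] args) {
        IntervalJoinLateModel join = new IntervalJoinLateModel();
        System.out.println(join.offer(5));   // true: no watermark yet
        join.advanceWatermark(10);
        System.out.println(join.offer(9));   // false: behind the watermark
        System.out.println(join.offer(10));  // true: not behind the watermark
        System.out.println(join.lateRecords());
    }
}
```

Note that this is different from a record that is on time but simply finds no partner inside the lower and upper bounds; that case would need its own counter.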
Best,
Congxian


Chris Gillespie wrote on Wed, Dec 11, 2019 at 8:09 AM:

> Hello Flink users, first time poster here.
>
> I'm using an interval join in my Flink project, however I haven't found
> where late records get logged in metrics. Window Joins have
> "numLateRecordsDropped" implemented, but is there an equivalent within
> an interval join?
>
> My main use case is to track how often a record falls outside of the lower
> and upper bounds when trying to join two streams. Interval Join looks like
> it simply short circuits when there is a late record? Maybe I am not
> understanding what defines a late record in this situation.
>
> Is there a good way to monitor when an interval join fails to join two
> streams? Currently I'm looking at the delta between two operator metrics,
> but it hasn't looked that reliable so far.
>
> Thanks,
> Chris Gillespie
>


Re: Re: Window deduplication

2019-12-11 Thread yanggang_it_job
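I think you can handle it this way:
1. First, register your stream as a table (whether it is one stream or several).
2. Then express your business logic against this table with Flink SQL.
3. Finally, use the windowing function provided by Flink SQL to specify the field you want to deduplicate on.
Note: keep the size of the state under control.
Reference documentation: https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/sql.html#deduplication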
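
For the DataStream side of the question quoted below, the two scenarios can be sketched in plain Java (no Flink dependency) using the sample data from the original post. Timestamps are reduced to minutes of the day, windows are 5-minute tumbling windows, and the names WindowDedupSketch, dedup, and hm are hypothetical. In a real job the map would be keyed state scoped to (window, id).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Model of per-window deduplication; each event is {minuteOfDay, id}. */
class WindowDedupSketch {
    static final long WINDOW_MINUTES = 5;

    /** Converts hours and minutes to minutes of the day. */
    static long hm(int hours, int minutes) {
        return hours * 60L + minutes;
    }

    /**
     * keepEarliest = false keeps the first arrival per (window, id)  (scenario 1);
     * keepEarliest = true keeps the smallest timestamp per (window, id) (scenario 2).
     */
    static List<long[]> dedup(List<long[]> events, boolean keepEarliest) {
        Map<String, long[]> perWindowAndId = new LinkedHashMap<>();
        for (long[] event : events) {
            long windowStart = event[0] - (event[0] % WINDOW_MINUTES);
            String key = windowStart + "/" + event[1];
            long[] kept = perWindowAndId.get(key);
            if (kept == null || (keepEarliest && event[0] < kept[0])) {
                perWindowAndId.put(key, event);   // replacing keeps the original position
            }
        }
        return new ArrayList<>(perWindowAndId.values());
    }

    public static void main(String[] args) {
        List<long[]> events = new ArrayList<>();
        events.add(new long[] {hm(16, 24), 1});
        events.add(new long[] {hm(16, 22), 1});
        events.add(new long[] {hm(16, 23), 2});
        events.add(new long[] {hm(16, 21), 1});
        events.add(new long[] {hm(16, 29), 2});
        events.add(new long[] {hm(16, 27), 3});
        events.add(new long[] {hm(16, 26), 2});
        for (boolean keepEarliest : new boolean[] {false, true}) {
            StringBuilder line = new StringBuilder();
            for (long[] e : dedup(events, keepEarliest)) {
                line.append(String.format("{ts: %02d:%02d id: %d} ", e[0] / 60, e[0] % 60, e[1]));
            }
            System.out.println(line.toString().trim());
        }
    }
}
```

One design point follows from the model: in scenario 1 the first arrival can be emitted immediately and only a "seen" flag needs to be kept, so there is no added latency, whereas scenario 2 cannot emit a final answer before the window ends because an earlier timestamp may still arrive.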
At 2019-12-11 15:53:00, "Jimmy Wong" wrote:
>They belong to different windows; deduplication happens within a window, not across windows.
>
>
>Jimmy Wong
>wangzmk...@163.com
>
>
>On Dec 11, 2019 at 12:08, 梁溪 wrote:
>If the data is deduplicated, why are there still two records with id 2?
>
>
>梁溪
>Email: lx_la...@163.com
>
>
>On Dec 11, 2019 at 11:19, Jimmy Wong wrote:
>Hi Yuan,Youjun, thank you. Your approach is from the SQL angle, right? How would I handle this with DataStream operators?
>
>
>Jimmy Wong
>wangzmk...@163.com
>
>
>On Dec 11, 2019 at 09:04, Yuan,Youjun wrote:
>For the first case, use an aggregate function such as firstvalue; for the second case, use the min aggregate function and then group by id. Is that the result you want?
>
>-----Original Message-----
>From: Jimmy Wong
>Sent: Tuesday, December 10, 2019 4:40 PM
>To: user-zh@flink.apache.org
>Subject: Window deduplication
>
>Hi all,
>I have a question. We have a real-time scenario: the data within every 5 minutes needs to be deduplicated and then written to a sink.
>For example:
>Data:
>{ts: 2019-12-10 16:24:00 id: 1}
>{ts: 2019-12-10 16:22:00 id: 1}
>{ts: 2019-12-10 16:23:00 id: 2}
>{ts: 2019-12-10 16:21:00 id: 1}
>{ts: 2019-12-10 16:29:00 id: 2}
>{ts: 2019-12-10 16:27:00 id: 3}
>{ts: 2019-12-10 16:26:00 id: 2}
>
>
>Scenario 1: deduplicate without considering time. The result is:
>{ts: 2019-12-10 16:24:00 id: 1}
>{ts: 2019-12-10 16:23:00 id: 2}
>{ts: 2019-12-10 16:29:00 id: 2}
>{ts: 2019-12-10 16:27:00 id: 3}
>
>
>Scenario 2: deduplicate taking time into account. The result is:
>{ts: 2019-12-10 16:21:00 id: 1}
>{ts: 2019-12-10 16:23:00 id: 2}
>{ts: 2019-12-10 16:26:00 id: 2}
>{ts: 2019-12-10 16:27:00 id: 3}
>
>
>For these two scenarios, is there an efficient real-time solution for each? Thanks. I thought of using a 5-minute window with a ProcessWindowFunction, which would work, but the ProcessWindowFunction has to buffer the 5 minutes of window data, so there is latency.
>
>
>Jimmy Wong
>wangzmk...@163.com