Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?

2020-07-09 Thread Benchao Li
Hi Peihui,

正如Jark所说,FLINK-18002正是想解决这个问题,可以指定任意一个JsonNode为varchar类型。

当然,这个feature不能解决所有问题,比如你有一个字段,内容不太确定,而且也不需要额外处理,
主要是想保留这个字段,下游输出json的时候仍然还是这个字段。
如果用FLINK-18002的思路,输出到下游的时候,会把这部分数据整体作为一个json string,所以
从结果上来看,*还不能完全做到原封不动的输出到下游*。

不知道后面这个场景是不是你面对的场景。如果是的话,我们目前有两个思路解决这个问题:
1. 用RAW类型,这个需要json node类型对于flink来讲,都是可以序列化的
2. 用BINARY类型,因为现在已经有了对BINARY类型的处理,所以还需要额外加一个选项来指定对于BINARY类型
  的处理模式。我们可以把任意json node转成它的json字符串表达形式,再转成byte[]进行中间的传输和处理;在
  序列化的时候,再直接通过这个byte[]数据构造一个json node(这样可以保证它跟原来的json node一模一样)。

Jark Wu  于2020年7月10日周五 下午12:22写道:

> 社区有个 issue 正在解决这个问题,可以关注一下
> https://issues.apache.org/jira/browse/FLINK-18002
>
> Best,
> Jark
>
> On Fri, 10 Jul 2020 at 11:13, Leonard Xu  wrote:
>
> > Hi, Peihui
> >
> > 我理解你的需求是json中有一些复杂的字段,你不想解析,希望后续用UDTF在来解析,这个应该做不到的,现在的json format
> 的解析的底层实现
> > 就是按照json的标准格式解析(jackson)的,没法将一个
> > jsonObject解析成一个String。另外如果你jsonObject中的内容格式不确定,也不适合在Schema中声明,
> > 因为SQL 是预编译后执行的,不能做到schema里是三个field,执行时又能解析四个field。
> >
> > 一种做法是定义复杂的jsonObject对应的ROW
> > 将全部可能的字段包含进去,每条记录没有的字段解析出来的会是null,fail-on-missing-field 默认关闭的,
> > 另外一种推荐你把复杂的字段在上游就转义成一个String放到json的一个field中,这样Flink解析出来就是一个String,
> > 然后query里用UDTF处理。
> >
> >
> > 祝好
> > Leonard Xu
> >
> >
> >
> >
> > > 在 2020年7月10日,10:16,Peihui He  写道:
> > >
> > > Hello,
> > >
> > >   实际上,我也并不太关心这个字段的内容,能按string 保存下来就好了。
> > >
> > > Best wishes.
> > >
> > > Peihui He  于2020年7月10日周五 上午10:12写道:
> > >
> > >> Hello,
> > >>
> > >>   明白您的意思。但是当一个字段下的json 字段不确定,类似一个黑盒子一样的化,就不好定义了。
> > >>
> > >>
> > >> Best wishes.
> > >>
> > >> LakeShen  于2020年7月10日周五 上午10:03写道:
> > >>
> > >>> Hi Peihui,
> > >>>
> > >>> 如果消费的 Kafka json 中,json 比较复杂的话,比如存在嵌套,就像下面的格式:
> > >>>
> > >>> {
> > >>>"a":"b",
> > >>>"c":{
> > >>>"d":"e",
> > >>>"g":"f"
> > >>>}
> > >>> },
> > >>>
> > >>> 那么在 kafka table source 可以使用 row 来定义:
> > >>>
> > >>> create table xxx (
> > >>> a varchar,
> > >>> c row
> > >>> )
> > >>>
> > >>> 如果 还存在嵌套,可以继续再使用 Row 来定义。
> > >>>
> > >>> Best,
> > >>> LakeShen
> > >>>
> > >>> Peihui He  于2020年7月10日周五 上午9:12写道:
> > >>>
> >  Hello:
> > 
> > 在用flink
> > sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。
> > 
> >  有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。
> > 
> > 
> >  Best wishes.
> > 
> > >>>
> > >>
> >
> >
>


-- 

Best,
Benchao Li


回复:flink 1.11 on kubernetes 构建失败

2020-07-09 Thread SmileSmile
hi Yang

在1.10版本,running的作业点击拓普图中随便一个operation,有detail subtasks taskmanagers xxx x 
这行,taskmanagers这栏里的host,显示的是 podname:端口

在1.11变成ip:端口

目前我这边遇到的情况是,构建了一个有120slot的集群,作业并行度是120。 提交到jm后jm就失联了,jm timeout。观察jm日志,疯狂在刷


No hostname could be resolved for the IP address 10.35.160.5, using IP address 
as host name. Local input split assignment (such as for HDFS files) may be 
impacted


目前观察到的改变主要是这块podname和ip的区别,其他不确定


| |
a511955993
|
|
邮箱:a511955...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月10日 12:13,Yang Wang 写道:
我记得1.11里面对host这个地方应该是没有改动,taskmanager.network.bind-policy的
默认值一会都是ip。所以你说的UI上是podname,这个是哪里的?正常TM列表akka地址
都是ip地址的


Best,
Yang

SmileSmile  于2020年7月10日周五 上午10:42写道:

> hi yang wang
>
> 1.11版本的on kubernetes在hostname上有做什么变化吗?
>
> 作业运行的时候 flink ui上 tm变成ip:端口
> ,在1.10版本,ui上是 podname:端口。
>
> 作业启动的时候,jm日志一直在刷
>
> No hostname could be resolved for the IP address 10.35.160.5, using IP
> address as host name. Local input split assignment (such as for HDFS files)
> may be impacted
>
>
>
>
> | |
> a511955993
> |
> |
> 邮箱:a511955...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年07月09日 20:02,Yang Wang 写道:
> sed替换报错应该不是Pod启动失败的根本原因,因为目前的docker-entrypoint.sh做了修改
> 才会这样[1]
>
> 你这个报错看着是执行bash-java-utils.jar报的错,确认你用的是社区的yaml文件[2],我运行是没有问题的。
> 如果不是,需要你把你的yaml发出来
>
>
> [1].
> https://github.com/apache/flink-docker/blob/dev-master/docker-entrypoint.sh
> [2].
>
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/kubernetes.html
>
>
> Best,
> Yang
>
> SmileSmile  于2020年7月9日周四 下午1:40写道:
>
> > hi
> >
> > 按照新版本的部署文件[1],会部署失败.如果将部署文件改用1.10版本,只是修改镜像文件和log4j文件,可以成功构建[2]。
> >
> >
> > 目前看差别在于1.11启动jm和tm是通过args:
> >
> ["jobmanager"]的方法,通过docker-entrypoint.sh[3]看到调用set_common_options方法的时候会sed
> > 本地挂载的flink-configuration-configmap.yaml导致失败。
> >
> >
> > 1.10 版本是通过$FLINK_HOME/bin/jobmanager.sh启动。
> >
> > command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
> >  while :;
> >  do
> >if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
> >  then tail -f -n +1 log/*jobmanager*.log;
> >fi;
> >  done"]
> >
> >
> > 如果遇到该问题的,沿用1.10版本的部署方式部署1.11镜像可以成功。  1.11 版本的部署方式如果有大佬可以走通的,求分享。
> >
> >
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#session-cluster-resource-definitions
> > [2]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/kubernetes.html#session-cluster-resource-definitions
> > [3]
> >
> https://github.com/apache/flink-docker/blob/master/1.11/scala_2.11-debian/docker-entrypoint.sh
> >
> >
> >
> > | |
> > a511955993
> > |
> > |
> > 邮箱:a511955...@163.com
> > |
> >
> > 签名由 网易邮箱大师 定制
> >
> > 在2020年07月08日 16:38,SmileSmile 写道:
> > hi yun tang!
> >
> > 没有对 /opt/flink/config 目录下的文件做写操作。 只是按照官网上的配置文件进行部署,镜像用的也是社区的镜像。
> > best!
> >
> >
> >
> >
> > | |
> > a511955993
> > |
> > |
> > 邮箱:a511955...@163.com
> > |
> >
> > 签名由 网易邮箱大师 定制
> >
> > 在2020年07月08日 16:29,Yun Tang 写道:
> > Hi
> >
> > 你是不是对 /opt/flink/conf
> > 目录下的文件进行了sed相关写操作?社区文档中使用的方法是将configmap挂载成本地的flink-conf.yaml
> > 等文件,而这个挂载的目录其实是不可写的。
> > 直接修改configmap里面的内容,这样挂载时候就会自动更新了。
> >
> > 祝好
> > 唐云
> > 
> > From: SmileSmile 
> > Sent: Wednesday, July 8, 2020 13:03
> > To: Flink user-zh mailing list 
> > Subject: flink 1.11 on kubernetes 构建失败
> >
> > hi
> >
> > 按照文档[1]的方法部署session cluster on kubernetes,集群构建的时候出现了如下报错
> >
> >
> > Starting Task Manager
> > sed: couldn't open temporary file /opt/flink/conf/sedVdyy6Q: Read-only
> > file system
> > sed: couldn't open temporary file /opt/flink/conf/sedcj5VKQ: Read-only
> > file system
> > /docker-entrypoint.sh: 72: /docker-entrypoint.sh: cannot create
> > /opt/flink/conf/flink-conf.yaml: Permission denied
> > sed: couldn't open temporary file /opt/flink/conf/sedB5eynR: Read-only
> > file system
> > /docker-entrypoint.sh: 120: /docker-entrypoint.sh: cannot create
> > /opt/flink/conf/flink-conf.yaml.tmp: Read-only file system
> > [ERROR] The execution result is empty.
> > [ERROR] Could not get JVM parameters and dynamic configurations properly.
> >
> >
> > 是否有遇到同样的问题,支个招
> >
> >
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#session-cluster-resource-definitions
> >
> >
> > | |
> > a511955993
> > |
> > |
> > 邮箱:a511955...@163.com
> > |
> >
> > 签名由 网易邮箱大师 定制
> >
>


Re: pyflink udf中发送rest api会导致udf被调用两次

2020-07-09 Thread Dian Fu
好的,针对你这个case,这个是个已知问题:https://issues.apache.org/jira/browse/FLINK-15973 
,暂时还没有修复。


你可以这样改写一下,应该可以绕过去这个问题:

 table = st_env.scan("source") \
.where("action === 'Insert'") \
.window(Tumble.over("1.hour").on("actionTime").alias("hourlywindow")) \
.group_by("hourlywindow") \
.select("action.max as action1, conv_string(eventTime.collect) as 
etlist, hourlywindow.start as time1") \
.select("action1 as action, hbf_thres(etlist) as eventtime, time1as 
actiontime")

st_env.create_temporary_view("tmp", table)
st_env.scan("tmp").filter("eventtime.isNotNull").insert_into("alarm_ad")


> 在 2020年7月10日,上午10:08,lgs <9925...@qq.com> 写道:
> 
> 谢谢提示。
> 我打印出来explain,发现确实调用了两次udf,条件是那个eventtime.isNotNull:
> 
> 
> 
>st_env.scan("source") \
> .where("action === 'Insert'") \
> 
> .window(Tumble.over("1.hour").on("actionTime").alias("hourlywindow")) \
> .group_by("hourlywindow") \
> .select("action.max as action1, conv_string(eventTime.collect) as
> etlist, hourlywindow.start as time1") \
> .select("action1 as action, hbf_thres(etlist) as eventtime, time1
> as actiontime") \
> * .filter("eventtime.isNotNull") \
> * .insert_into("alarm_ad")
> 
> 
> LegacySink(name=[`default_catalog`.`default_database`.`alarm_ad`],
> fields=[action, eventtime, actiontime])
> +- Calc(select=[EXPR$0 AS action, f0 AS eventtime, EXPR$2 AS actiontime])
> *   +- PythonCalc(select=[EXPR$0, EXPR$2, simple_udf(f0) AS f0])
>  +- Calc(select=[EXPR$0, EXPR$2, UDFLength(EXPR$1) AS f0], where=[IS
> NOT NULL(f0)])
> * +- PythonCalc(select=[EXPR$0, EXPR$1, EXPR$2, simple_udf(f0) AS
> f0])
>+- Calc(select=[EXPR$0, EXPR$1, EXPR$2, UDFLength(EXPR$1) AS
> f0])
>   +-
> GroupWindowAggregate(window=[TumblingGroupWindow('hourlywindow, actionTime,
> 360)], properties=[EXPR$2], select=[MAX(action) AS EXPR$0,
> COLLECT(eventTime) AS EXPR$1, start('hourlywindow) AS EXPR$2])
>  +- Exchange(distribution=[single])
> +- Calc(select=[recordId, action, originalState,
> newState, originalCause, newCause, ser_name, enb, eventTime, ceasedTime,
> duration, acked, pmdId, pmdTime, actionTime], where=[=(action,
> _UTF-16LE'Insert')])
>+- Reused(reference_id=[1])
> 
> 我这里是想过滤python udf的返回,如果返回是空,我就不要sink。是我的sql写错了吗?
> 
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



Re: Flink DataStream 统计UV问题

2020-07-09 Thread tison
你这个需求貌似是要看一天的 UV 的实时更新量,可以看一下 sliding window。如果是每天 0 点清零,实时看今天的
UV,那就是另一个问题了,应该需要自己定义 trigger & evictor

每条触发一次 window...看你数据量吧

Best,
tison.


shizk233  于2020年7月10日周五 上午10:23写道:

> Hi Jiazhi,
>
>
> 1.如果数据流量不是很大的话,按每条数据触发也没问题。另外,基于事件时间的情况,提前触发可以选择ContinuousEventTimeTrigger,可以查看Trigger接口的实现找到你想要的trigger。
> 2.窗口结束后会自动释放。一般对于Global窗口需要手动设置TTL
>
> Best,
> shizk233
>
> ゞ野蠻遊戲χ  于2020年7月7日周二 下午10:27写道:
>
> > 大家好!
> >
> >  想问下,现在在用DataStream的api来统计每天的UV,代码如下,有2个使用问题:
> > 1、在使用Tumbling窗口的时候,由于使用窗口跨度是1天(Time.days(1)),只有以一天结束的时候,才能输出一个uv值,
> > 这样时间等待太长了,所以加了一个trigger,每来一条都触发一次窗口,不知道这样的用法没有问题。
> > 2、还有想问下在窗口结束后,里面的state状态会自动释放吗?还是要自己手动设置TTL的。
> >
> >
> > 谢谢!
> > Jiazhi
> >
>


Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?

2020-07-09 Thread Jark Wu
社区有个 issue 正在解决这个问题,可以关注一下
https://issues.apache.org/jira/browse/FLINK-18002

Best,
Jark

On Fri, 10 Jul 2020 at 11:13, Leonard Xu  wrote:

> Hi, Peihui
>
> 我理解你的需求是json中有一些复杂的字段,你不想解析,希望后续用UDTF在来解析,这个应该做不到的,现在的json format 的解析的底层实现
> 就是按照json的标准格式解析(jackson)的,没法将一个
> jsonObject解析成一个String。另外如果你jsonObject中的内容格式不确定,也不适合在Schema中声明,
> 因为SQL 是预编译后执行的,不能做到schema里是三个field,执行时又能解析四个field。
>
> 一种做法是定义复杂的jsonObject对应的ROW
> 将全部可能的字段包含进去,每条记录没有的字段解析出来的会是null,fail-on-missing-field 默认关闭的,
> 另外一种推荐你把复杂的字段在上游就转义成一个String放到json的一个field中,这样Flink解析出来就是一个String,
> 然后query里用UDTF处理。
>
>
> 祝好
> Leonard Xu
>
>
>
>
> > 在 2020年7月10日,10:16,Peihui He  写道:
> >
> > Hello,
> >
> >   实际上,我也并不太关心这个字段的内容,能按string 保存下来就好了。
> >
> > Best wishes.
> >
> > Peihui He  于2020年7月10日周五 上午10:12写道:
> >
> >> Hello,
> >>
> >>   明白您的意思。但是当一个字段下的json 字段不确定,类似一个黑盒子一样的化,就不好定义了。
> >>
> >>
> >> Best wishes.
> >>
> >> LakeShen  于2020年7月10日周五 上午10:03写道:
> >>
> >>> Hi Peihui,
> >>>
> >>> 如果消费的 Kafka json 中,json 比较复杂的话,比如存在嵌套,就像下面的格式:
> >>>
> >>> {
> >>>"a":"b",
> >>>"c":{
> >>>"d":"e",
> >>>"g":"f"
> >>>}
> >>> },
> >>>
> >>> 那么在 kafka table source 可以使用 row 来定义:
> >>>
> >>> create table xxx (
> >>> a varchar,
> >>> c row
> >>> )
> >>>
> >>> 如果 还存在嵌套,可以继续再使用 Row 来定义。
> >>>
> >>> Best,
> >>> LakeShen
> >>>
> >>> Peihui He  于2020年7月10日周五 上午9:12写道:
> >>>
>  Hello:
> 
> 在用flink
> sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。
> 
>  有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。
> 
> 
>  Best wishes.
> 
> >>>
> >>
>
>


Re:Re: flink 写入es失败导致数据丢失

2020-07-09 Thread sunfulin






hi,
感谢两位的回复。我先来本地复现下。如果有问题的话,会建个issue。











在 2020-07-10 11:43:33,"Congxian Qiu"  写道:
>Hi
>
>从官方文档的配置[1] 来看,对于 handle failure 来说,默认是 fail,也就是说 request 失败了会导致作业失败的,可以尝试在
>log 中看能否找到这个日志,或者显示的设置成 fail 看看。如果发现 handle failure 是 fail 的情况下不符合预期,可以想
>Leonard 说的那样建立一个 issue 来追踪这个问题
>
>[1]
>https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.html#connector-options
>Best,
>Congxian
>
>
>Leonard Xu  于2020年7月10日周五 上午11:33写道:
>
>> Hi,
>> 我理解作业应该失败才对,你本地可以复现吗?可以复现的话可以在社区建个issue。
>>
>> Best,
>> Leonard Xu
>>
>> > 在 2020年7月10日,11:20,sunfulin  写道:
>> >
>> > ,但是作业确实没失败。
>>
>>


Re: flink 1.11 on kubernetes 构建失败

2020-07-09 Thread Yang Wang
我记得1.11里面对host这个地方应该是没有改动,taskmanager.network.bind-policy的
默认值一会都是ip。所以你说的UI上是podname,这个是哪里的?正常TM列表akka地址
都是ip地址的


Best,
Yang

SmileSmile  于2020年7月10日周五 上午10:42写道:

> hi yang wang
>
> 1.11版本的on kubernetes在hostname上有做什么变化吗?
>
> 作业运行的时候 flink ui上 tm变成ip:端口
> ,在1.10版本,ui上是 podname:端口。
>
> 作业启动的时候,jm日志一直在刷
>
> No hostname could be resolved for the IP address 10.35.160.5, using IP
> address as host name. Local input split assignment (such as for HDFS files)
> may be impacted
>
>
>
>
> | |
> a511955993
> |
> |
> 邮箱:a511955...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2020年07月09日 20:02,Yang Wang 写道:
> sed替换报错应该不是Pod启动失败的根本原因,因为目前的docker-entrypoint.sh做了修改
> 才会这样[1]
>
> 你这个报错看着是执行bash-java-utils.jar报的错,确认你用的是社区的yaml文件[2],我运行是没有问题的。
> 如果不是,需要你把你的yaml发出来
>
>
> [1].
> https://github.com/apache/flink-docker/blob/dev-master/docker-entrypoint.sh
> [2].
>
> https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/kubernetes.html
>
>
> Best,
> Yang
>
> SmileSmile  于2020年7月9日周四 下午1:40写道:
>
> > hi
> >
> > 按照新版本的部署文件[1],会部署失败.如果将部署文件改用1.10版本,只是修改镜像文件和log4j文件,可以成功构建[2]。
> >
> >
> > 目前看差别在于1.11启动jm和tm是通过args:
> >
> ["jobmanager"]的方法,通过docker-entrypoint.sh[3]看到调用set_common_options方法的时候会sed
> > 本地挂载的flink-configuration-configmap.yaml导致失败。
> >
> >
> > 1.10 版本是通过$FLINK_HOME/bin/jobmanager.sh启动。
> >
> > command: ["/bin/bash", "-c", "$FLINK_HOME/bin/jobmanager.sh start;\
> >  while :;
> >  do
> >if [[ -f $(find log -name '*jobmanager*.log' -print -quit) ]];
> >  then tail -f -n +1 log/*jobmanager*.log;
> >fi;
> >  done"]
> >
> >
> > 如果遇到该问题的,沿用1.10版本的部署方式部署1.11镜像可以成功。  1.11 版本的部署方式如果有大佬可以走通的,求分享。
> >
> >
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#session-cluster-resource-definitions
> > [2]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/kubernetes.html#session-cluster-resource-definitions
> > [3]
> >
> https://github.com/apache/flink-docker/blob/master/1.11/scala_2.11-debian/docker-entrypoint.sh
> >
> >
> >
> > | |
> > a511955993
> > |
> > |
> > 邮箱:a511955...@163.com
> > |
> >
> > 签名由 网易邮箱大师 定制
> >
> > 在2020年07月08日 16:38,SmileSmile 写道:
> > hi yun tang!
> >
> > 没有对 /opt/flink/config 目录下的文件做写操作。 只是按照官网上的配置文件进行部署,镜像用的也是社区的镜像。
> > best!
> >
> >
> >
> >
> > | |
> > a511955993
> > |
> > |
> > 邮箱:a511955...@163.com
> > |
> >
> > 签名由 网易邮箱大师 定制
> >
> > 在2020年07月08日 16:29,Yun Tang 写道:
> > Hi
> >
> > 你是不是对 /opt/flink/conf
> > 目录下的文件进行了sed相关写操作?社区文档中使用的方法是将configmap挂载成本地的flink-conf.yaml
> > 等文件,而这个挂载的目录其实是不可写的。
> > 直接修改configmap里面的内容,这样挂载时候就会自动更新了。
> >
> > 祝好
> > 唐云
> > 
> > From: SmileSmile 
> > Sent: Wednesday, July 8, 2020 13:03
> > To: Flink user-zh mailing list 
> > Subject: flink 1.11 on kubernetes 构建失败
> >
> > hi
> >
> > 按照文档[1]的方法部署session cluster on kubernetes,集群构建的时候出现了如下报错
> >
> >
> > Starting Task Manager
> > sed: couldn't open temporary file /opt/flink/conf/sedVdyy6Q: Read-only
> > file system
> > sed: couldn't open temporary file /opt/flink/conf/sedcj5VKQ: Read-only
> > file system
> > /docker-entrypoint.sh: 72: /docker-entrypoint.sh: cannot create
> > /opt/flink/conf/flink-conf.yaml: Permission denied
> > sed: couldn't open temporary file /opt/flink/conf/sedB5eynR: Read-only
> > file system
> > /docker-entrypoint.sh: 120: /docker-entrypoint.sh: cannot create
> > /opt/flink/conf/flink-conf.yaml.tmp: Read-only file system
> > [ERROR] The execution result is empty.
> > [ERROR] Could not get JVM parameters and dynamic configurations properly.
> >
> >
> > 是否有遇到同样的问题,支个招
> >
> >
> >
> > [1]
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html#session-cluster-resource-definitions
> >
> >
> > | |
> > a511955993
> > |
> > |
> > 邮箱:a511955...@163.com
> > |
> >
> > 签名由 网易邮箱大师 定制
> >
>


Re: Re: flink1.10.1在yarn上无法写入kafka的问题

2020-07-09 Thread m...@sinoiov.com
hi:Congxian qiu:

topic没问题,用kafka指令创建,其他应用也能写入,我换一个kafka集群也不行





马琪
研发中心
北京中交兴路车联网科技有限公司


T. 010-50822710  M. 13701177502
F. 010-50822899  E. m...@sinoiov.com
地址:北京市海淀区东北旺西路8号中关村软件园27号院千方科技大厦A座(100085)



 
发件人: Congxian Qiu
发送时间: 2020-07-10 10:20
收件人: user-zh
主题: Re: Re: flink1.10.1在yarn上无法写入kafka的问题
Hi
从 org.apache.kafka.common.errors.InvalidTopicException: 这个异常来看,是 topic
invalid 导致,具体的可以看一下 InvalidTopicException 的介绍[1], 这上面说的有可能是
名字太长,或者有非法字符等,这也可以查看一下
 
[1]
https://www.javadoc.io/doc/org.apache.kafka/kafka-clients/2.0.0/org/apache/kafka/common/errors/InvalidTopicException.html
Best,
Congxian
 
 
LakeShen  于2020年7月10日周五 上午10:10写道:
 
> 你的 Yarn 环境,Flink 任务使用的 Kafka 地址,应该是 Yarn 环境的 kafka broker 地址。
>
> LakeShen  于2020年7月10日周五 上午10:08写道:
>
> > Hi,
> >
> > 从日志看出,应该是你提交到 Yarn 的环境,这个环境和你的测试环境的 kafka 连接不上,获取不到元数据。
> >
> > 这里你检查一下你的 Yarn 环境,Flink kafka broker 地址是否是测试环境的 kafka broker 地址。
> >
> > Best,
> > LakeShen
> >
> > m...@sinoiov.com  于2020年7月9日周四 下午9:21写道:
> >
> >> hi:zhisheng:
> >>
> >> 这是TM日志,在这之前没有任何错误日志,
> >>
> >> 代码逻辑很简单:
> >> SingleOutputStreamOperator>
> >> sourceStream = env.addSource(source)
> >> .setParallelism(2)
> >> .uid("DataProcessSource")
> >> .flatMap(new DataConvertFunction())
> >> .setParallelism(2)
> >> .uid("DataProcessDataCovert")
> >> .keyBy(new KeySelectorFunction())
> >> .process(new DataCleanFunction())
> >> .setParallelism(2)
> >> .uid("DataProcessDataProcess");
> >>
> >> AsyncDataStream.orderedWait(
> >> sourceStream,
> >> new AsyncDataCleanFunction(),
> >> EnvUtil.TOOL.getLong(Constant.ASYNC_TOMEOUT),
> >> TimeUnit.MILLISECONDS,
> >> EnvUtil.TOOL.getInt(Constant.ASYNC_CAPACITY)
> >> ).uid("DataProcessAsync")
> >> .setParallelism(2)
> >> .addSink(sink)
> >> .uid("DataProcessSinkKafka")
> >> .setParallelism(2);
> >>
> >> 2020-07-09 19:33:37,291 WARN
> >> org.apache.kafka.clients.consumer.ConsumerConfig - The configuration
> >> 'gps.kafka.sasl' was supplied but isn't a known config.
> >> 2020-07-09 19:33:37,291 WARN
> >> org.apache.kafka.clients.consumer.ConsumerConfig - The configuration
> >> 'java.ext.dirs' was supplied but isn't a known config.
> >> 2020-07-09 19:33:37,291 WARN
> >> org.apache.kafka.clients.consumer.ConsumerConfig - The configuration
> >> 'java.class.version' was supplied but isn't a known config.
> >> 2020-07-09 19:33:37,291 INFO org.apache.kafka.common.utils.AppInfoParser
> >> - Kafka version: 2.2.0
> >> 2020-07-09 19:33:37,291 INFO org.apache.kafka.common.utils.AppInfoParser
> >> - Kafka commitId: 05fcfde8f69b0349
> >> 2020-07-09 19:33:38,482 INFO com.sinoi.rt.common.protocol.HttpPoolUtil -
> >> http pool init,maxTotal:18,maxPerRoute:6
> >> 2020-07-09 19:33:38,971 WARN org.apache.kafka.clients.NetworkClient -
> >> [Producer clientId=producer-1] Error while fetching metadata with
> >> correlation id 8 : {=INVALID_TOPIC_EXCEPTION}
> >> 2020-07-09 19:33:38,974 INFO
> >> org.apache.kafka.clients.producer.KafkaProducer - [Producer
> >> clientId=producer-1] Closing the Kafka producer with timeoutMillis =
> >> 9223372036854775807 ms.
> >> 2020-07-09 19:33:41,612 INFO org.apache.flink.runtime.taskmanager.Task -
> >> async wait operator -> Sink: Unnamed (1/2)
> >> (cdbe008dcdb76813f88c4a48b9907d77) switched from RUNNING to FAILED.
> >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed
> >> to send data to Kafka:
> >> at
> >>
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1225)
> >> at
> >>
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:767)
> >> at
> >>
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
> >> at
> >>
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235)
> >> at
> >>
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> >> at
> >>
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
> >> at
> >>
> org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.emitResult(StreamRecordQueueEntry.java:65)
> >> at
> >>
> org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueue.emitCompletedElement(OrderedStreamElementQueue.java:71)
> >> at
> >>
> 

Re: flink 写入es失败导致数据丢失

2020-07-09 Thread Congxian Qiu
Hi

从官方文档的配置[1] 来看,对于 handle failure 来说,默认是 fail,也就是说 request 失败了会导致作业失败的,可以尝试在
log 中看能否找到这个日志,或者显示的设置成 fail 看看。如果发现 handle failure 是 fail 的情况下不符合预期,可以想
Leonard 说的那样建立一个 issue 来追踪这个问题

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.html#connector-options
Best,
Congxian


Leonard Xu  于2020年7月10日周五 上午11:33写道:

> Hi,
> 我理解作业应该失败才对,你本地可以复现吗?可以复现的话可以在社区建个issue。
>
> Best,
> Leonard Xu
>
> > 在 2020年7月10日,11:20,sunfulin  写道:
> >
> > ,但是作业确实没失败。
>
>


Re: flink 写入es失败导致数据丢失

2020-07-09 Thread Leonard Xu
Hi,
我理解作业应该失败才对,你本地可以复现吗?可以复现的话可以在社区建个issue。

Best,
Leonard Xu

> 在 2020年7月10日,11:20,sunfulin  写道:
> 
> ,但是作业确实没失败。



Re: flink1.10升级到flink1.11 提交到yarn失败

2020-07-09 Thread Leonard Xu
Hello,Zach

>>> Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException:
>>> Could not find a suitable table factory for
>>> 'org.apache.flink.table.factories.TableSourceFactory' in
>>> the classpath.
>>> 
>>> 
>>> Reason: Required context properties mismatch.
这个错误,一般是SQL 程序缺少了SQL connector 或 format的依赖,你pom里下面的这两个依赖,

  
   org.apache.flink
   flink-sql-connector-kafka_2.11
   ${flink.version}
   
   
   org.apache.flink
   flink-connector-kafka_2.11
   ${flink.version}
   

放在一起是会冲突的,flink-sql-connector-kafka_2.11 shaded 了kafka的依赖, 
flink-connector-kafka_2.11 是没有shade的。
你根据你的需要,如果是SQL 程序用第一个, 如果是 dataStream 作业 使用第二个。

祝好,
Leonard Xu


> 在 2020年7月10日,11:08,Shuiqiang Chen  写道:
> 
> Hi,
> 看样子是kafka table source没有成功创建,也许你需要将
> 
>org.apache.flink
>flink-sql-connector-kafka_2.11
>${flink.version}
> 
> 
> 这个jar 放到 FLINK_HOME/lib 目录下
> 
> Congxian Qiu  于2020年7月10日周五 上午10:57写道:
> 
>> Hi
>> 
>> 从异常看,可能是某个包没有引入导致的,和这个[1]比较像,可能你需要对比一下需要的是哪个包没有引入。
>> 
>> PS 从栈那里看到是 csv 相关的,可以优先考虑下 cvs 相关的包
>> 
>> ```
>> The following factories have been considered:
>> org.apache.flink.table.sources.CsvBatchTableSourceFactory
>> org.apache.flink.table.sources.CsvAppendTableSourceFactory
>> org.apache.flink.table.filesystem.FileSystemTableFactory
>> at
>> 
>> org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
>> at
>> 
>> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
>> at
>> 
>> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
>> at
>> 
>> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
>> at
>> 
>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46)
>> ... 37 more
>> ```
>> 
>> [1] http://apache-flink.147419.n8.nabble.com/flink-1-11-td4471.html
>> Best,
>> Congxian
>> 
>> 
>> Zhou Zach  于2020年7月10日周五 上午10:39写道:
>> 
>>> 日志贴全了的,这是从yarn ui贴的full log,用yarn logs命令也是这些log,太简短,看不出错误在哪。。。
>>> 
>>> 
>>> 我又提交了另外之前用flink1.10跑过的任务,现在用flink1.11跑,报了异常:
>>> 
>>> 
>>> SLF4J: Class path contains multiple SLF4J bindings.
>>> SLF4J: Found binding in
>>> 
>> [jar:file:/opt/flink-1.11.0/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: Found binding in
>>> 
>> [jar:file:/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/jars/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
>>> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
>>> explanation.
>>> SLF4J: Actual binding is of type
>>> [org.apache.logging.slf4j.Log4jLoggerFactory]
>>> 
>>> 
>>> 
>>> The program finished with the following exception:
>>> 
>>> 
>>> org.apache.flink.client.program.ProgramInvocationException: The main
>>> method caused an error: findAndCreateTableSource failed.
>>> at
>>> 
>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
>>> at
>>> 
>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
>>> at
>> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
>>> at
>>> 
>> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
>>> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
>>> at
>>> 
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
>>> at
>>> 
>> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
>>> at java.security.AccessController.doPrivileged(Native Method)
>>> at javax.security.auth.Subject.doAs(Subject.java:422)
>>> at
>>> 
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
>>> at
>>> 
>> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
>>> Caused by: org.apache.flink.table.api.TableException:
>>> findAndCreateTableSource failed.
>>> at
>>> 
>> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49)
>>> at
>>> 
>> org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:190)
>>> at
>>> 
>> org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:89)
>>> at
>>> 
>> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
>>> at
>>> 
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
>>> at
>>> 
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
>>> at
>>> 
>> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>>> at
>>> 
>> 

Re:Re: flink 写入es失败导致数据丢失

2020-07-09 Thread sunfulin



hi,Leonard
是的。es集群服务不可用。我能观察到写入es失败,但是作业确实没失败。等到es集群服务恢复后,作业也正常了,但是故障期间的数据有丢失。














在 2020-07-10 11:16:17,"Leonard Xu"  写道:
>Hello, fulin
>
>> es服务挂掉时,这段时间写入es失败,但是没有积压,而是数据丢失了。这个应该不符合预期。想问下可能是啥原因造成的。
>
>es 服务挂掉是指es 集群不可用吗?那这时应该是写入es应该失败,作业也会失败,你说的没有积压是指什么呢?
>
>Best
>Leonard Xu


Re: flink 写入es失败导致数据丢失

2020-07-09 Thread Leonard Xu
Hello, fulin

> es服务挂掉时,这段时间写入es失败,但是没有积压,而是数据丢失了。这个应该不符合预期。想问下可能是啥原因造成的。

es 服务挂掉是指es 集群不可用吗?那这时应该是写入es应该失败,作业也会失败,你说的没有积压是指什么呢?

Best
Leonard Xu

Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?

2020-07-09 Thread Leonard Xu
Hi, Peihui

我理解你的需求是json中有一些复杂的字段,你不想解析,希望后续用UDTF在来解析,这个应该做不到的,现在的json format 的解析的底层实现
就是按照json的标准格式解析(jackson)的,没法将一个 
jsonObject解析成一个String。另外如果你jsonObject中的内容格式不确定,也不适合在Schema中声明,
因为SQL 是预编译后执行的,不能做到schema里是三个field,执行时又能解析四个field。

一种做法是定义复杂的jsonObject对应的ROW 
将全部可能的字段包含进去,每条记录没有的字段解析出来的会是null,fail-on-missing-field 默认关闭的,
另外一种推荐你把复杂的字段在上游就转义成一个String放到json的一个field中,这样Flink解析出来就是一个String, 
然后query里用UDTF处理。


祝好
Leonard Xu




> 在 2020年7月10日,10:16,Peihui He  写道:
> 
> Hello,
> 
>   实际上,我也并不太关心这个字段的内容,能按string 保存下来就好了。
> 
> Best wishes.
> 
> Peihui He  于2020年7月10日周五 上午10:12写道:
> 
>> Hello,
>> 
>>   明白您的意思。但是当一个字段下的json 字段不确定,类似一个黑盒子一样的化,就不好定义了。
>> 
>> 
>> Best wishes.
>> 
>> LakeShen  于2020年7月10日周五 上午10:03写道:
>> 
>>> Hi Peihui,
>>> 
>>> 如果消费的 Kafka json 中,json 比较复杂的话,比如存在嵌套,就像下面的格式:
>>> 
>>> {
>>>"a":"b",
>>>"c":{
>>>"d":"e",
>>>"g":"f"
>>>}
>>> },
>>> 
>>> 那么在 kafka table source 可以使用 row 来定义:
>>> 
>>> create table xxx (
>>> a varchar,
>>> c row
>>> )
>>> 
>>> 如果 还存在嵌套,可以继续再使用 Row 来定义。
>>> 
>>> Best,
>>> LakeShen
>>> 
>>> Peihui He  于2020年7月10日周五 上午9:12写道:
>>> 
 Hello:
 
在用flink sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。
 
 有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。
 
 
 Best wishes.
 
>>> 
>> 



Re: Re: flink1.10升级到flink1.11 提交到yarn失败

2020-07-09 Thread Shuiqiang Chen
Hi,
看样子是kafka table source没有成功创建,也许你需要将

org.apache.flink
flink-sql-connector-kafka_2.11
${flink.version}
 

这个jar 放到 FLINK_HOME/lib 目录下

Congxian Qiu  于2020年7月10日周五 上午10:57写道:

> Hi
>
> 从异常看,可能是某个包没有引入导致的,和这个[1]比较像,可能你需要对比一下需要的是哪个包没有引入。
>
> PS 从栈那里看到是 csv 相关的,可以优先考虑下 cvs 相关的包
>
> ```
> The following factories have been considered:
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> org.apache.flink.table.filesystem.FileSystemTableFactory
> at
>
> org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
> at
>
> org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
> at
>
> org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
> at
>
> org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
> at
>
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46)
> ... 37 more
> ```
>
> [1] http://apache-flink.147419.n8.nabble.com/flink-1-11-td4471.html
> Best,
> Congxian
>
>
> Zhou Zach  于2020年7月10日周五 上午10:39写道:
>
> > 日志贴全了的,这是从yarn ui贴的full log,用yarn logs命令也是这些log,太简短,看不出错误在哪。。。
> >
> >
> > 我又提交了另外之前用flink1.10跑过的任务,现在用flink1.11跑,报了异常:
> >
> >
> > SLF4J: Class path contains multiple SLF4J bindings.
> > SLF4J: Found binding in
> >
> [jar:file:/opt/flink-1.11.0/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> > SLF4J: Found binding in
> >
> [jar:file:/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/jars/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> > explanation.
> > SLF4J: Actual binding is of type
> > [org.apache.logging.slf4j.Log4jLoggerFactory]
> >
> >
> > 
> >  The program finished with the following exception:
> >
> >
> > org.apache.flink.client.program.ProgramInvocationException: The main
> > method caused an error: findAndCreateTableSource failed.
> > at
> >
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> > at
> >
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> > at
> org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> > at
> >
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> > at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> > at
> >
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> > at
> >
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> > at java.security.AccessController.doPrivileged(Native Method)
> > at javax.security.auth.Subject.doAs(Subject.java:422)
> > at
> >
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> > at
> >
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> > at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> > Caused by: org.apache.flink.table.api.TableException:
> > findAndCreateTableSource failed.
> > at
> >
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49)
> > at
> >
> org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:190)
> > at
> >
> org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:89)
> > at
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
> > at
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
> > at
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
> > at
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> > at
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
> > at
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> > at
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> > at
> >
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
> > at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> >
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
> > at
> >
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
> > at
> >
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
> > at
> >
> 

Re:Re: flink 写入es失败导致数据丢失

2020-07-09 Thread sunfulin
hi
我使用社区默认的ES,主要配置如下:我使用flink 1.10.1,blink-planner。使用了ES6的sink。
我看了下文档,默认有个参数是 
connector.failure-handler,是fail。我也能在TM日志里看到连接es失败的报错,但是整个任务checkpoint并没有失败。数据丢了。


WITH (
'connector.type' = 'elasticsearch',
'connector.version' = '',
'connector.hosts' = '',
'connector.index' = 'realtime_fund_product_all_sell',
'connector.document-type' = '_doc',
'update-mode' = 'upsert',
'connector.key-delimiter' = '$',
'connector.key-null-literal' = 'n/a',
'connector.bulk-flush.interval' = '1000',
'format.type' = 'json'
)

















在 2020-07-10 11:00:53,"Congxian Qiu"  写道:
>Hi
>
>你 ES Sink 是自己写的,还是用的社区的呢?社区的使用了哪个版本,以及配置是啥样的呢
>
>Best,
>Congxian
>
>
>sunfulin  于2020年7月10日周五 上午10:51写道:
>
>> hi,
>>
>> 我们现在使用flink消费kafka数据写到es,今天发现在默认设置下,es服务挂掉时,这段时间写入es失败,但是没有积压,而是数据丢失了。这个应该不符合预期。想问下可能是啥原因造成的。


Re: map JSON to scala case class & off-heap optimization

2020-07-09 Thread Aaron Levin
Hi Georg, you can try using the circe library for this which has a way to
automatically generate JSON decoders for scala case classes.

As it was mentioned earlier, Flink does not come packaged with
JSON-decoding generators for Scala like spark does.

On Thu, Jul 9, 2020 at 4:45 PM Georg Heiler 
wrote:

> Great. Thanks.
> But would it be possible to automate this i.e. to have this work
> automatically for the case class / product?
>
> Am Do., 9. Juli 2020 um 20:21 Uhr schrieb Taher Koitawala <
> taher...@gmail.com>:
>
>> The performant way would be to apply a map function over the stream and
>> then use the Jackson ObjectMapper to convert to scala objects. In flink
>> there is no API like Spark to automatically get all fields.
>>
>> On Thu, Jul 9, 2020, 11:38 PM Georg Heiler 
>> wrote:
>>
>>> How can I use it with a scala case class?
>>> If I understand it correctly for better performance the Object Mapper is
>>> already initialized in each KafkaConsumer and returning ObjectNodes. So
>>> probably I should rephrase to: how can I then map these to case classes
>>> without handcoding it?  https://github.com/json4s/json4s or
>>> https://github.com/FasterXML/jackson-module-scala both only seem to
>>> consume strings.
>>>
>>> Best,
>>> Georg
>>>
>>> Am Do., 9. Juli 2020 um 19:17 Uhr schrieb Taher Koitawala <
>>> taher...@gmail.com>:
>>>
 You can try the Jackson ObjectMapper library and that will get you from
 json to object.

 Regards,
 Taher Koitawala

 On Thu, Jul 9, 2020, 9:54 PM Georg Heiler 
 wrote:

> Hi,
>
> I want to map a stream of JSON documents from Kafka to a scala
> case-class. How can this be accomplished using the
> JSONKeyValueDeserializationSchema?Is a manual mapping of object nodes
> required?
>
> I have a Spark background. There, such manual mappings usually are
> discouraged. Instead, they offer a nice API (dataset API) to perform such 
> a
> type of assignment.
> 1) this is concise
> 2) it operates on sparks off-heap memory representations (tungsten) to
> be faster
>
> In Flink, instead, such off-heap optimizations seem not to be talked
> much about (sorry if I miss something, I am a Flink newbie). Is there a
> reason why these optimizations are not necessary in Flink?
>
>
> How could I get the following example:
> val serializer = new JSONKeyValueDeserializationSchema(false)
> val stream = senv.addSource(
> new FlinkKafkaConsumer(
>   "tweets-raw-json",
>   serializer,
>   properties
> ).setStartFromEarliest() // TODO experiment with different start
> values
>   )
>
> to map to this Tweet class concisely, i.e. without manually iterating
> through all the attribute fields and parsing the keys from the object node
> tree.
>
> final case class Tweet(tweet_id: Option[String], text: Option[String],
> source: Option[String], geo: Option[String], place: Option[String], lang:
> Option[String], created_at: Option[String], timestamp_ms: Option[String],
> coordinates: Option[String], user_id: Option[Long], user_name:
> Option[String], screen_name: Option[String], user_created_at:
> Option[String], followers_count: Option[Long], friends_count: 
> Option[Long],
> user_lang: Option[String], user_location: Option[String], hashtags:
> Option[Seq[String]])
>
> Best,
> Georg
>



Re: flink 写入es失败导致数据丢失

2020-07-09 Thread Congxian Qiu
Hi

你 ES Sink 是自己写的,还是用的社区的呢?社区的使用了哪个版本,以及配置是啥样的呢

Best,
Congxian


sunfulin  于2020年7月10日周五 上午10:51写道:

> hi,
>
> 我们现在使用flink消费kafka数据写到es,今天发现在默认设置下,es服务挂掉时,这段时间写入es失败,但是没有积压,而是数据丢失了。这个应该不符合预期。想问下可能是啥原因造成的。


Re: Re: flink1.10升级到flink1.11 提交到yarn失败

2020-07-09 Thread Congxian Qiu
Hi

从异常看,可能是某个包没有引入导致的,和这个[1]比较像,可能你需要对比一下需要的是哪个包没有引入。

PS 从栈那里看到是 csv 相关的,可以优先考虑下 cvs 相关的包

```
The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.filesystem.FileSystemTableFactory
at
org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:322)
at
org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:190)
at
org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:143)
at
org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:96)
at
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:46)
... 37 more
```

[1] http://apache-flink.147419.n8.nabble.com/flink-1-11-td4471.html
Best,
Congxian


Zhou Zach  于2020年7月10日周五 上午10:39写道:

> 日志贴全了的,这是从yarn ui贴的full log,用yarn logs命令也是这些log,太简短,看不出错误在哪。。。
>
>
> 我又提交了另外之前用flink1.10跑过的任务,现在用flink1.11跑,报了异常:
>
>
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/opt/flink-1.11.0/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/jars/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type
> [org.apache.logging.slf4j.Log4jLoggerFactory]
>
>
> 
>  The program finished with the following exception:
>
>
> org.apache.flink.client.program.ProgramInvocationException: The main
> method caused an error: findAndCreateTableSource failed.
> at
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
> at
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
> at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
> at
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
> at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
> at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
> at
> org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
> at
> org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
> at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
> Caused by: org.apache.flink.table.api.TableException:
> findAndCreateTableSource failed.
> at
> org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49)
> at
> org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:190)
> at
> org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:89)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> at
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
> at
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527)
> at
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)
> at
> 

flink 写入es失败导致数据丢失

2020-07-09 Thread sunfulin
hi,
我们现在使用flink消费kafka数据写到es,今天发现在默认设置下,es服务挂掉时,这段时间写入es失败,但是没有积压,而是数据丢失了。这个应该不符合预期。想问下可能是啥原因造成的。

Re: Implications of taskmanager.memory.process.size

2020-07-09 Thread Vishal Santoshi
Got it. That was what I always thought but needed to be sure. Thank you for
confirming


On Thu, Jul 9, 2020 at 9:39 PM Xintong Song  wrote:

> Hi Vishal,
>
> If you have streaming jobs only and do not use RocksDB, you can tune the
> fraction (taskmanager.memory.managed.fraction) to 0. In this way, no more
> off-heap managed memory will be reserved for the user code execution.
> Please be aware that this does not mean the JVM heap will get all of the
> `process.size`. The Flink framework will still use some off-heap memory,
> for purposes like network buffering and JVM overhead.
>
>
> Thank you~
>
> Xintong Song
>
>
>
> On Thu, Jul 9, 2020 at 10:57 PM Vishal Santoshi 
> wrote:
>
>> ager.memory.process.size(none)MemorySizeTotal Process Memory size for
>> the TaskExecutors. This includes all the memory that a TaskExecutor
>> consumes, consisting of Total Flink Memory, JVM Metaspace, and JVM
>> Overhead. On containerized setups, this should be set to the container
>> memory. See also 'taskmanager.memory.flink.size' for total Flink memory
>> size configuration.
>>
>> The Total Flink Memory consists of off heap memory governed by fraction
>> .
>> In pure streaming case ( and non ROCKSDB state case )
>>
>> This is the size of off-heap memory managed by the memory manager,
>> reserved for sorting, hash tables, caching of intermediate results and
>> RocksDB state backend. Memory consumers can either allocate memory from the
>> memory manager in the form of MemorySegments, or reserve bytes from the
>> memory manager and keep their memory usage within that boundary.
>>
>> Is not used AFAIK . May be reduce the fraction to 0 ? We do not  use
>> offline heap ( aka batch jobs ) on our cluster ?
>>
>>
>> Any help will be appreciated.
>>
>> On Thu, Jul 9, 2020 at 9:25 AM Vishal Santoshi 
>> wrote:
>>
>>> Hello folks,
>>>   As established
>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#memory-configuration
>>>  ,
>>> I set the taskmanager.memory.process.size and 
>>> taskmanager.memory.task.off-heap.size
>>> in my flink-conf.yaml and I see the 2 properties being pulled in.
>>>
>>> * - Loading configuration property: taskmanager.memory.process.size,
>>> 8000m*
>>>
>>> * - Loading configuration property:
>>> taskmanager.memory.task.off-heap.size, 1024m*
>>>
>>> I am not sure how the -Xmx and -Xms are calculated but I see
>>>
>>> *Starting taskexecutor as a console application on host
>>> kafka-to-hdfs-taskmanager-dev-8597c78d9c-59dqw.*
>>>
>>> *VM settings:*
>>>
>>> *Min. Heap Size: 2.27G*
>>>
>>> *Max. Heap Size: 2.27G*
>>>
>>> *Using VM: OpenJDK 64-Bit Server VM*
>>>
>>>
>>> What gives ?
>>>
>>> I am looking through the scripts and am not sure I see any calculations
>>> based on taskmanager.memory.process.size
>>>
>>>
>>>


Re:Re: flink1.10升级到flink1.11 提交到yarn失败

2020-07-09 Thread Zhou Zach
日志贴全了的,这是从yarn ui贴的full log,用yarn logs命令也是这些log,太简短,看不出错误在哪。。。


我又提交了另外之前用flink1.10跑过的任务,现在用flink1.11跑,报了异常:


SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/opt/flink-1.11.0/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/jars/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]



 The program finished with the following exception:


org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: findAndCreateTableSource failed.
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:302)
at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
Caused by: org.apache.flink.table.api.TableException: findAndCreateTableSource 
failed.
at 
org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:49)
at 
org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.findAndCreateLegacyTableSource(LegacyCatalogSourceTable.scala:190)
at 
org.apache.flink.table.planner.plan.schema.LegacyCatalogSourceTable.toRel(LegacyCatalogSourceTable.scala:89)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:527)
at 
org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:204)
at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at 
org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:747)
at 
cn.ibobei.qile.dataflow.sql.FromKafkaSinkJdbcForUserUV$.main(FromKafkaSinkJdbcForUserUV.scala:78)
at 
cn.ibobei.qile.dataflow.sql.FromKafkaSinkJdbcForUserUV.main(FromKafkaSinkJdbcForUserUV.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
... 11 more
Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could 
not find a suitable table factory for 
'org.apache.flink.table.factories.TableSourceFactory' in
the classpath.


Reason: Required context properties mismatch.


The following properties are requested:
connector.properties.bootstrap.servers=cdh1:9092,cdh2:9092,cdh3:9092

Re: flink1.10升级到flink1.11 提交到yarn失败

2020-07-09 Thread Congxian Qiu
Hi

这个看上去是提交到 Yarn 了,具体的原因需要看下 JM log 是啥原因。另外是否是日志没有贴全,这里只看到本地 log,其他的就只有小部分
jobmanager.err 的 log。

Best,
Congxian


Zhou Zach  于2020年7月9日周四 下午9:23写道:

> hi all,
> 原来用1.10使用per job模式,可以提交的作业,现在用1.11使用应用模式提交失败,看日志,也不清楚原因,
> yarn log:
> Log Type: jobmanager.err
>
>
> Log Upload Time: Thu Jul 09 21:02:48 +0800 2020
>
>
> Log Length: 785
>
>
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/yarn/nm/usercache/hdfs/appcache/application_1594271580406_0010/filecache/11/data-flow-1.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/jars/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
> log4j:WARN No appenders could be found for logger
> (org.apache.flink.runtime.entrypoint.ClusterEntrypoint).
> log4j:WARN Please initialize the log4j system properly.
> log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for
> more info.
>
>
> Log Type: jobmanager.out
>
>
> Log Upload Time: Thu Jul 09 21:02:48 +0800 2020
>
>
> Log Length: 0
>
>
>
>
> Log Type: prelaunch.err
>
>
> Log Upload Time: Thu Jul 09 21:02:48 +0800 2020
>
>
> Log Length: 0
>
>
>
>
> Log Type: prelaunch.out
>
>
> Log Upload Time: Thu Jul 09 21:02:48 +0800 2020
>
>
> Log Length: 70
>
>
> Setting up env variables
> Setting up job resources
> Launching container
>
>
>
>
>
>
>
>
> 本地log:
> 2020-07-09 21:02:41,015 INFO  org.apache.flink.client.cli.CliFrontend
> [] -
> 
> 2020-07-09 21:02:41,020 INFO
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: jobmanager.rpc.address, localhost
> 2020-07-09 21:02:41,020 INFO
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: jobmanager.rpc.port, 6123
> 2020-07-09 21:02:41,021 INFO
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: jobmanager.memory.process.size, 1600m
> 2020-07-09 21:02:41,021 INFO
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: taskmanager.memory.process.size, 1728m
> 2020-07-09 21:02:41,021 INFO
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: taskmanager.numberOfTaskSlots, 1
> 2020-07-09 21:02:41,021 INFO
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: parallelism.default, 1
> 2020-07-09 21:02:41,021 INFO
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: jobmanager.execution.failover-strategy, region
> 2020-07-09 21:02:41,164 INFO
> org.apache.flink.runtime.security.modules.HadoopModule   [] - Hadoop
> user set to hdfs (auth:SIMPLE)
> 2020-07-09 21:02:41,172 INFO
> org.apache.flink.runtime.security.modules.JaasModule [] - Jaas file
> will be created as /tmp/jaas-2213111423022415421.conf.
> 2020-07-09 21:02:41,181 INFO  org.apache.flink.client.cli.CliFrontend
> [] - Running 'run-application' command.
> 2020-07-09 21:02:41,194 INFO
> org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer
> [] - Submitting application in 'Application Mode'.
> 2020-07-09 21:02:41,201 WARN
> org.apache.flink.yarn.configuration.YarnLogConfigUtil[] - The
> configuration directory ('/opt/flink-1.11.0/conf') already contains a LOG4J
> config file.If you want to use logback, then please delete or rename the
> log configuration file.
> 2020-07-09 21:02:41,537 INFO  org.apache.flink.yarn.YarnClusterDescriptor
> [] - No path for the flink jar passed. Using the location
> of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
> 2020-07-09 21:02:41,665 INFO
> org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider [] -
> Failing over to rm220
> 2020-07-09 21:02:41,717 INFO  org.apache.hadoop.conf.Configuration
>  [] - resource-types.xml not found
> 2020-07-09 21:02:41,718 INFO
> org.apache.hadoop.yarn.util.resource.ResourceUtils   [] - Unable to
> find 'resource-types.xml'.
> 2020-07-09 21:02:41,755 INFO  org.apache.flink.yarn.YarnClusterDescriptor
> [] - Cluster specification:
> ClusterSpecification{masterMemoryMB=2048, taskManagerMemoryMB=4096,
> slotsPerTaskManager=1}
> 2020-07-09 21:02:42,723 INFO  org.apache.flink.yarn.YarnClusterDescriptor
> [] - Submitting application master
> application_1594271580406_0010
> 2020-07-09 21:02:42,969 INFO
> org.apache.hadoop.yarn.client.api.impl.YarnClientImpl[] - Submitted
> application application_1594271580406_0010
> 2020-07-09 21:02:42,969 INFO  

Re: Flink DataStream 统计UV问题

2020-07-09 Thread shizk233
Hi Jiazhi,

1.如果数据流量不是很大的话,按每条数据触发也没问题。另外,基于事件时间的情况,提前触发可以选择ContinuousEventTimeTrigger,可以查看Trigger接口的实现找到你想要的trigger。
2.窗口结束后会自动释放。一般对于Global窗口需要手动设置TTL

Best,
shizk233

ゞ野蠻遊戲χ  于2020年7月7日周二 下午10:27写道:

> 大家好!
>
>  想问下,现在在用DataStream的api来统计每天的UV,代码如下,有2个使用问题:
> 1、在使用Tumbling窗口的时候,由于使用窗口跨度是1天(Time.days(1)),只有以一天结束的时候,才能输出一个uv值,
> 这样时间等待太长了,所以加了一个trigger,每来一条都触发一次窗口,不知道这样的用法没有问题。
> 2、还有想问下在窗口结束后,里面的state状态会自动释放吗?还是要自己手动设置TTL的。
>
>
> 谢谢!
> Jiazhi
>


Re: Re: flink1.10.1在yarn上无法写入kafka的问题

2020-07-09 Thread Congxian Qiu
Hi
从 org.apache.kafka.common.errors.InvalidTopicException: 这个异常来看,是 topic
invalid 导致,具体的可以看一下 InvalidTopicException 的介绍[1], 这上面说的有可能是
名字太长,或者有非法字符等,这也可以查看一下

[1]
https://www.javadoc.io/doc/org.apache.kafka/kafka-clients/2.0.0/org/apache/kafka/common/errors/InvalidTopicException.html
Best,
Congxian


LakeShen  于2020年7月10日周五 上午10:10写道:

> 你的 Yarn 环境,Flink 任务使用的 Kafka 地址,应该是 Yarn 环境的 kafka broker 地址。
>
> LakeShen  于2020年7月10日周五 上午10:08写道:
>
> > Hi,
> >
> > 从日志看出,应该是你提交到 Yarn 的环境,这个环境和你的测试环境的 kafka 连接不上,获取不到元数据。
> >
> > 这里你检查一下你的 Yarn 环境,Flink kafka broker 地址是否是测试环境的 kafka broker 地址。
> >
> > Best,
> > LakeShen
> >
> > m...@sinoiov.com  于2020年7月9日周四 下午9:21写道:
> >
> >> hi:zhisheng:
> >>
> >> 这是TM日志,在这之前没有任何错误日志,
> >>
> >> 代码逻辑很简单:
> >> SingleOutputStreamOperator>
> >> sourceStream = env.addSource(source)
> >> .setParallelism(2)
> >> .uid("DataProcessSource")
> >> .flatMap(new DataConvertFunction())
> >> .setParallelism(2)
> >> .uid("DataProcessDataCovert")
> >> .keyBy(new KeySelectorFunction())
> >> .process(new DataCleanFunction())
> >> .setParallelism(2)
> >> .uid("DataProcessDataProcess");
> >>
> >> AsyncDataStream.orderedWait(
> >> sourceStream,
> >> new AsyncDataCleanFunction(),
> >> EnvUtil.TOOL.getLong(Constant.ASYNC_TOMEOUT),
> >> TimeUnit.MILLISECONDS,
> >> EnvUtil.TOOL.getInt(Constant.ASYNC_CAPACITY)
> >> ).uid("DataProcessAsync")
> >> .setParallelism(2)
> >> .addSink(sink)
> >> .uid("DataProcessSinkKafka")
> >> .setParallelism(2);
> >>
> >> 2020-07-09 19:33:37,291 WARN
> >> org.apache.kafka.clients.consumer.ConsumerConfig - The configuration
> >> 'gps.kafka.sasl' was supplied but isn't a known config.
> >> 2020-07-09 19:33:37,291 WARN
> >> org.apache.kafka.clients.consumer.ConsumerConfig - The configuration
> >> 'java.ext.dirs' was supplied but isn't a known config.
> >> 2020-07-09 19:33:37,291 WARN
> >> org.apache.kafka.clients.consumer.ConsumerConfig - The configuration
> >> 'java.class.version' was supplied but isn't a known config.
> >> 2020-07-09 19:33:37,291 INFO org.apache.kafka.common.utils.AppInfoParser
> >> - Kafka version: 2.2.0
> >> 2020-07-09 19:33:37,291 INFO org.apache.kafka.common.utils.AppInfoParser
> >> - Kafka commitId: 05fcfde8f69b0349
> >> 2020-07-09 19:33:38,482 INFO com.sinoi.rt.common.protocol.HttpPoolUtil -
> >> http pool init,maxTotal:18,maxPerRoute:6
> >> 2020-07-09 19:33:38,971 WARN org.apache.kafka.clients.NetworkClient -
> >> [Producer clientId=producer-1] Error while fetching metadata with
> >> correlation id 8 : {=INVALID_TOPIC_EXCEPTION}
> >> 2020-07-09 19:33:38,974 INFO
> >> org.apache.kafka.clients.producer.KafkaProducer - [Producer
> >> clientId=producer-1] Closing the Kafka producer with timeoutMillis =
> >> 9223372036854775807 ms.
> >> 2020-07-09 19:33:41,612 INFO org.apache.flink.runtime.taskmanager.Task -
> >> async wait operator -> Sink: Unnamed (1/2)
> >> (cdbe008dcdb76813f88c4a48b9907d77) switched from RUNNING to FAILED.
> >> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed
> >> to send data to Kafka:
> >> at
> >>
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1225)
> >> at
> >>
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:767)
> >> at
> >>
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
> >> at
> >>
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235)
> >> at
> >>
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> >> at
> >>
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> >> at
> >>
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
> >> at
> >>
> org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.emitResult(StreamRecordQueueEntry.java:65)
> >> at
> >>
> org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueue.emitCompletedElement(OrderedStreamElementQueue.java:71)
> >> at
> >>
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.outputCompletedElement(AsyncWaitOperator.java:279)
> >> at
> >>
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.access$100(AsyncWaitOperator.java:76)
> >> at
> >>
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.processResults(AsyncWaitOperator.java:351)
> >> at
> >>
> 

Re: Re: flink1.10.1在yarn上无法写入kafka的问题

2020-07-09 Thread m...@sinoiov.com
hi ,LakeShen

对,测试环境包括yarn集群和kafka集群,他们想联通的

配置的是测试环境的kafka broker的地址

road.kafka.brokers=172.17.47.134:9092,172.17.47.135:9092,172.17.47.136:9092
road.kafka.topic=road-map
road.kafka.group.id=ins-001
road.kafka.transaction.timeout.ms=30





马琪
研发中心
北京中交兴路车联网科技有限公司


T. 010-50822710  M. 13701177502
F. 010-50822899  E. m...@sinoiov.com
地址:北京市海淀区东北旺西路8号中关村软件园27号院千方科技大厦A座(100085)



 
发件人: LakeShen
发送时间: 2020-07-10 10:10
收件人: user-zh
主题: Re: Re: flink1.10.1在yarn上无法写入kafka的问题
你的 Yarn 环境,Flink 任务使用的 Kafka 地址,应该是 Yarn 环境的 kafka broker 地址。
 
LakeShen  于2020年7月10日周五 上午10:08写道:
 
> Hi,
>
> 从日志看出,应该是你提交到 Yarn 的环境,这个环境和你的测试环境的 kafka 连接不上,获取不到元数据。
>
> 这里你检查一下你的 Yarn 环境,Flink kafka broker 地址是否是测试环境的 kafka broker 地址。
>
> Best,
> LakeShen
>
> m...@sinoiov.com  于2020年7月9日周四 下午9:21写道:
>
>> hi:zhisheng:
>>
>> 这是TM日志,在这之前没有任何错误日志,
>>
>> 代码逻辑很简单:
>> SingleOutputStreamOperator>
>> sourceStream = env.addSource(source)
>> .setParallelism(2)
>> .uid("DataProcessSource")
>> .flatMap(new DataConvertFunction())
>> .setParallelism(2)
>> .uid("DataProcessDataCovert")
>> .keyBy(new KeySelectorFunction())
>> .process(new DataCleanFunction())
>> .setParallelism(2)
>> .uid("DataProcessDataProcess");
>>
>> AsyncDataStream.orderedWait(
>> sourceStream,
>> new AsyncDataCleanFunction(),
>> EnvUtil.TOOL.getLong(Constant.ASYNC_TOMEOUT),
>> TimeUnit.MILLISECONDS,
>> EnvUtil.TOOL.getInt(Constant.ASYNC_CAPACITY)
>> ).uid("DataProcessAsync")
>> .setParallelism(2)
>> .addSink(sink)
>> .uid("DataProcessSinkKafka")
>> .setParallelism(2);
>>
>> 2020-07-09 19:33:37,291 WARN
>> org.apache.kafka.clients.consumer.ConsumerConfig - The configuration
>> 'gps.kafka.sasl' was supplied but isn't a known config.
>> 2020-07-09 19:33:37,291 WARN
>> org.apache.kafka.clients.consumer.ConsumerConfig - The configuration
>> 'java.ext.dirs' was supplied but isn't a known config.
>> 2020-07-09 19:33:37,291 WARN
>> org.apache.kafka.clients.consumer.ConsumerConfig - The configuration
>> 'java.class.version' was supplied but isn't a known config.
>> 2020-07-09 19:33:37,291 INFO org.apache.kafka.common.utils.AppInfoParser
>> - Kafka version: 2.2.0
>> 2020-07-09 19:33:37,291 INFO org.apache.kafka.common.utils.AppInfoParser
>> - Kafka commitId: 05fcfde8f69b0349
>> 2020-07-09 19:33:38,482 INFO com.sinoi.rt.common.protocol.HttpPoolUtil -
>> http pool init,maxTotal:18,maxPerRoute:6
>> 2020-07-09 19:33:38,971 WARN org.apache.kafka.clients.NetworkClient -
>> [Producer clientId=producer-1] Error while fetching metadata with
>> correlation id 8 : {=INVALID_TOPIC_EXCEPTION}
>> 2020-07-09 19:33:38,974 INFO
>> org.apache.kafka.clients.producer.KafkaProducer - [Producer
>> clientId=producer-1] Closing the Kafka producer with timeoutMillis =
>> 9223372036854775807 ms.
>> 2020-07-09 19:33:41,612 INFO org.apache.flink.runtime.taskmanager.Task -
>> async wait operator -> Sink: Unnamed (1/2)
>> (cdbe008dcdb76813f88c4a48b9907d77) switched from RUNNING to FAILED.
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed
>> to send data to Kafka:
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1225)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:767)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
>> at
>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235)
>> at
>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
>> at
>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
>> at
>> org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.emitResult(StreamRecordQueueEntry.java:65)
>> at
>> org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueue.emitCompletedElement(OrderedStreamElementQueue.java:71)
>> at
>> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.outputCompletedElement(AsyncWaitOperator.java:279)
>> at
>> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.access$100(AsyncWaitOperator.java:76)
>> at
>> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.processResults(AsyncWaitOperator.java:351)
>> at
>> 

Re: flink时间窗口

2020-07-09 Thread Congxian Qiu
Hi

对于这个问题,可以尝试看添加相关日志能否在线上(或者测试环境)排查,另外可以使用 watermark 相关的 metric[1] 查看下是否符合预期
如果上面的不行,可以尝试看能否在 IDE 中进行复现,这样可以 debug 进行追查

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#io
Best,
Congxian


忝忝向仧 <153488...@qq.com> 于2020年7月9日周四 下午11:36写道:

> new ProcessWindowFunction是怎么处理的?
> 是否设置了水印?
>
>
>
>
> --原始邮件--
> 发件人:
>   "user-zh"
> <
> aichiyuyiy...@163.com;
> 发送时间:2020年7月9日(星期四) 中午11:37
> 收件人:"user-zh"
> 主题:flink时间窗口
>
>
>
> 你好,我最近业务上需要处理一个流式的时间处理窗口,部分业务代码如下
> SingleOutputStreamOperator flatMap.keyBy(0,1)
> 
> .timeWindow(Time.minutes(1))
> 
> .process(new ProcessWindowFunction)
> 当我运行的时候只有第一分钟的时间窗口会有数据进来,之后便没有数据进来了,业务逻辑代码也没有报错,请问这是什么原因。


Re: Re: flink1.10.1在yarn上无法写入kafka的问题

2020-07-09 Thread LakeShen
你的 Yarn 环境,Flink 任务使用的 Kafka 地址,应该是 Yarn 环境的 kafka broker 地址。

LakeShen  于2020年7月10日周五 上午10:08写道:

> Hi,
>
> 从日志看出,应该是你提交到 Yarn 的环境,这个环境和你的测试环境的 kafka 连接不上,获取不到元数据。
>
> 这里你检查一下你的 Yarn 环境,Flink kafka broker 地址是否是测试环境的 kafka broker 地址。
>
> Best,
> LakeShen
>
> m...@sinoiov.com  于2020年7月9日周四 下午9:21写道:
>
>> hi:zhisheng:
>>
>> 这是TM日志,在这之前没有任何错误日志,
>>
>> 代码逻辑很简单:
>> SingleOutputStreamOperator>
>> sourceStream = env.addSource(source)
>> .setParallelism(2)
>> .uid("DataProcessSource")
>> .flatMap(new DataConvertFunction())
>> .setParallelism(2)
>> .uid("DataProcessDataCovert")
>> .keyBy(new KeySelectorFunction())
>> .process(new DataCleanFunction())
>> .setParallelism(2)
>> .uid("DataProcessDataProcess");
>>
>> AsyncDataStream.orderedWait(
>> sourceStream,
>> new AsyncDataCleanFunction(),
>> EnvUtil.TOOL.getLong(Constant.ASYNC_TOMEOUT),
>> TimeUnit.MILLISECONDS,
>> EnvUtil.TOOL.getInt(Constant.ASYNC_CAPACITY)
>> ).uid("DataProcessAsync")
>> .setParallelism(2)
>> .addSink(sink)
>> .uid("DataProcessSinkKafka")
>> .setParallelism(2);
>>
>> 2020-07-09 19:33:37,291 WARN
>> org.apache.kafka.clients.consumer.ConsumerConfig - The configuration
>> 'gps.kafka.sasl' was supplied but isn't a known config.
>> 2020-07-09 19:33:37,291 WARN
>> org.apache.kafka.clients.consumer.ConsumerConfig - The configuration
>> 'java.ext.dirs' was supplied but isn't a known config.
>> 2020-07-09 19:33:37,291 WARN
>> org.apache.kafka.clients.consumer.ConsumerConfig - The configuration
>> 'java.class.version' was supplied but isn't a known config.
>> 2020-07-09 19:33:37,291 INFO org.apache.kafka.common.utils.AppInfoParser
>> - Kafka version: 2.2.0
>> 2020-07-09 19:33:37,291 INFO org.apache.kafka.common.utils.AppInfoParser
>> - Kafka commitId: 05fcfde8f69b0349
>> 2020-07-09 19:33:38,482 INFO com.sinoi.rt.common.protocol.HttpPoolUtil -
>> http pool init,maxTotal:18,maxPerRoute:6
>> 2020-07-09 19:33:38,971 WARN org.apache.kafka.clients.NetworkClient -
>> [Producer clientId=producer-1] Error while fetching metadata with
>> correlation id 8 : {=INVALID_TOPIC_EXCEPTION}
>> 2020-07-09 19:33:38,974 INFO
>> org.apache.kafka.clients.producer.KafkaProducer - [Producer
>> clientId=producer-1] Closing the Kafka producer with timeoutMillis =
>> 9223372036854775807 ms.
>> 2020-07-09 19:33:41,612 INFO org.apache.flink.runtime.taskmanager.Task -
>> async wait operator -> Sink: Unnamed (1/2)
>> (cdbe008dcdb76813f88c4a48b9907d77) switched from RUNNING to FAILED.
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed
>> to send data to Kafka:
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1225)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:767)
>> at
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
>> at
>> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235)
>> at
>> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
>> at
>> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
>> at
>> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
>> at
>> org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.emitResult(StreamRecordQueueEntry.java:65)
>> at
>> org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueue.emitCompletedElement(OrderedStreamElementQueue.java:71)
>> at
>> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.outputCompletedElement(AsyncWaitOperator.java:279)
>> at
>> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.access$100(AsyncWaitOperator.java:76)
>> at
>> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.processResults(AsyncWaitOperator.java:351)
>> at
>> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.lambda$processInMailbox$0(AsyncWaitOperator.java:336)
>> at
>> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
>> at
>> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
>> at
>> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:255)
>> at
>> 

Re: pyflink udf中发送rest api会导致udf被调用两次

2020-07-09 Thread lgs
谢谢提示。
我打印出来explain,发现确实调用了两次udf,条件是那个eventtime.isNotNull:



st_env.scan("source") \
 .where("action === 'Insert'") \

.window(Tumble.over("1.hour").on("actionTime").alias("hourlywindow")) \
 .group_by("hourlywindow") \
 .select("action.max as action1, conv_string(eventTime.collect) as
etlist, hourlywindow.start as time1") \
 .select("action1 as action, hbf_thres(etlist) as eventtime, time1
as actiontime") \
* .filter("eventtime.isNotNull") \
* .insert_into("alarm_ad")


LegacySink(name=[`default_catalog`.`default_database`.`alarm_ad`],
fields=[action, eventtime, actiontime])
+- Calc(select=[EXPR$0 AS action, f0 AS eventtime, EXPR$2 AS actiontime])
*   +- PythonCalc(select=[EXPR$0, EXPR$2, simple_udf(f0) AS f0])
  +- Calc(select=[EXPR$0, EXPR$2, UDFLength(EXPR$1) AS f0], where=[IS
NOT NULL(f0)])
* +- PythonCalc(select=[EXPR$0, EXPR$1, EXPR$2, simple_udf(f0) AS
f0])
+- Calc(select=[EXPR$0, EXPR$1, EXPR$2, UDFLength(EXPR$1) AS
f0])
   +-
GroupWindowAggregate(window=[TumblingGroupWindow('hourlywindow, actionTime,
360)], properties=[EXPR$2], select=[MAX(action) AS EXPR$0,
COLLECT(eventTime) AS EXPR$1, start('hourlywindow) AS EXPR$2])
  +- Exchange(distribution=[single])
 +- Calc(select=[recordId, action, originalState,
newState, originalCause, newCause, ser_name, enb, eventTime, ceasedTime,
duration, acked, pmdId, pmdTime, actionTime], where=[=(action,
_UTF-16LE'Insert')])
+- Reused(reference_id=[1])

我这里是想过滤python udf的返回,如果返回是空,我就不要sink。是我的sql写错了吗?




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Re: flink1.10.1在yarn上无法写入kafka的问题

2020-07-09 Thread LakeShen
Hi,

从日志看出,应该是你提交到 Yarn 的环境,这个环境和你的测试环境的 kafka 连接不上,获取不到元数据。

这里你检查一下你的 Yarn 环境,Flink kafka broker 地址是否是测试环境的 kafka broker 地址。

Best,
LakeShen

m...@sinoiov.com  于2020年7月9日周四 下午9:21写道:

> hi:zhisheng:
>
> 这是TM日志,在这之前没有任何错误日志,
>
> 代码逻辑很简单:
> SingleOutputStreamOperator>
> sourceStream = env.addSource(source)
> .setParallelism(2)
> .uid("DataProcessSource")
> .flatMap(new DataConvertFunction())
> .setParallelism(2)
> .uid("DataProcessDataCovert")
> .keyBy(new KeySelectorFunction())
> .process(new DataCleanFunction())
> .setParallelism(2)
> .uid("DataProcessDataProcess");
>
> AsyncDataStream.orderedWait(
> sourceStream,
> new AsyncDataCleanFunction(),
> EnvUtil.TOOL.getLong(Constant.ASYNC_TOMEOUT),
> TimeUnit.MILLISECONDS,
> EnvUtil.TOOL.getInt(Constant.ASYNC_CAPACITY)
> ).uid("DataProcessAsync")
> .setParallelism(2)
> .addSink(sink)
> .uid("DataProcessSinkKafka")
> .setParallelism(2);
>
> 2020-07-09 19:33:37,291 WARN
> org.apache.kafka.clients.consumer.ConsumerConfig - The configuration
> 'gps.kafka.sasl' was supplied but isn't a known config.
> 2020-07-09 19:33:37,291 WARN
> org.apache.kafka.clients.consumer.ConsumerConfig - The configuration
> 'java.ext.dirs' was supplied but isn't a known config.
> 2020-07-09 19:33:37,291 WARN
> org.apache.kafka.clients.consumer.ConsumerConfig - The configuration
> 'java.class.version' was supplied but isn't a known config.
> 2020-07-09 19:33:37,291 INFO org.apache.kafka.common.utils.AppInfoParser -
> Kafka version: 2.2.0
> 2020-07-09 19:33:37,291 INFO org.apache.kafka.common.utils.AppInfoParser -
> Kafka commitId: 05fcfde8f69b0349
> 2020-07-09 19:33:38,482 INFO com.sinoi.rt.common.protocol.HttpPoolUtil -
> http pool init,maxTotal:18,maxPerRoute:6
> 2020-07-09 19:33:38,971 WARN org.apache.kafka.clients.NetworkClient -
> [Producer clientId=producer-1] Error while fetching metadata with
> correlation id 8 : {=INVALID_TOPIC_EXCEPTION}
> 2020-07-09 19:33:38,974 INFO
> org.apache.kafka.clients.producer.KafkaProducer - [Producer
> clientId=producer-1] Closing the Kafka producer with timeoutMillis =
> 9223372036854775807 ms.
> 2020-07-09 19:33:41,612 INFO org.apache.flink.runtime.taskmanager.Task -
> async wait operator -> Sink: Unnamed (1/2)
> (cdbe008dcdb76813f88c4a48b9907d77) switched from RUNNING to FAILED.
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
> send data to Kafka:
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1225)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:767)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
> at
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235)
> at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
> at
> org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.emitResult(StreamRecordQueueEntry.java:65)
> at
> org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueue.emitCompletedElement(OrderedStreamElementQueue.java:71)
> at
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.outputCompletedElement(AsyncWaitOperator.java:279)
> at
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.access$100(AsyncWaitOperator.java:76)
> at
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.processResults(AsyncWaitOperator.java:351)
> at
> org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.lambda$processInMailbox$0(AsyncWaitOperator.java:336)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:255)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
> at
> 

Re: flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?

2020-07-09 Thread LakeShen
Hi Peihui,

如果消费的 Kafka json 中,json 比较复杂的话,比如存在嵌套,就像下面的格式:

{
"a":"b",
"c":{
"d":"e",
"g":"f"
}
},

那么在 kafka table source 可以使用 row 来定义:

create table xxx (
a varchar,
c row
)

如果 还存在嵌套,可以继续再使用 Row 来定义。

Best,
LakeShen

Peihui He  于2020年7月10日周五 上午9:12写道:

> Hello:
>
> 在用flink sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。
>
>  有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。
>
>
> Best wishes.
>


Re: Implications of taskmanager.memory.process.size

2020-07-09 Thread Xintong Song
Hi Vishal,

If you have streaming jobs only and do not use RocksDB, you can tune the
fraction (taskmanager.memory.managed.fraction) to 0. In this way, no more
off-heap managed memory will be reserved for the user code execution.
Please be aware that this does not mean the JVM heap will get all of the
`process.size`. The Flink framework will still use some off-heap memory,
for purposes like network buffering and JVM overhead.


Thank you~

Xintong Song



On Thu, Jul 9, 2020 at 10:57 PM Vishal Santoshi 
wrote:

> ager.memory.process.size(none)MemorySizeTotal Process Memory size for the
> TaskExecutors. This includes all the memory that a TaskExecutor consumes,
> consisting of Total Flink Memory, JVM Metaspace, and JVM Overhead. On
> containerized setups, this should be set to the container memory. See also
> 'taskmanager.memory.flink.size' for total Flink memory size configuration.
>
> The Total Flink Memory consists of off heap memory governed by fraction
> .
> In pure streaming case ( and non ROCKSDB state case )
>
> This is the size of off-heap memory managed by the memory manager,
> reserved for sorting, hash tables, caching of intermediate results and
> RocksDB state backend. Memory consumers can either allocate memory from the
> memory manager in the form of MemorySegments, or reserve bytes from the
> memory manager and keep their memory usage within that boundary.
>
> Is not used AFAIK . May be reduce the fraction to 0 ? We do not  use
> offline heap ( aka batch jobs ) on our cluster ?
>
>
> Any help will be appreciated.
>
> On Thu, Jul 9, 2020 at 9:25 AM Vishal Santoshi 
> wrote:
>
>> Hello folks,
>>   As established
>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#memory-configuration
>>  ,
>> I set the taskmanager.memory.process.size and 
>> taskmanager.memory.task.off-heap.size
>> in my flink-conf.yaml and I see the 2 properties being pulled in.
>>
>> * - Loading configuration property: taskmanager.memory.process.size,
>> 8000m*
>>
>> * - Loading configuration property:
>> taskmanager.memory.task.off-heap.size, 1024m*
>>
>> I am not sure how the -Xmx and -Xms are calculated but I see
>>
>> *Starting taskexecutor as a console application on host
>> kafka-to-hdfs-taskmanager-dev-8597c78d9c-59dqw.*
>>
>> *VM settings:*
>>
>> *Min. Heap Size: 2.27G*
>>
>> *Max. Heap Size: 2.27G*
>>
>> *Using VM: OpenJDK 64-Bit Server VM*
>>
>>
>> What gives ?
>>
>> I am looking through the scripts and am not sure I see any calculations
>> based on taskmanager.memory.process.size
>>
>>
>>


Re: pyflink1.11.0window

2020-07-09 Thread Shuiqiang Chen
琴师你好,

你的source ddl里有指定time1为 time attribute吗?
create table source1(
id int,
time1 timestamp,
type string,
WATERMARK FOR time1 as time1 - INTERVAL '2' SECOND
) with (...)

奇怪的不朽琴师 <1129656...@qq.com> 于2020年7月10日周五 上午8:43写道:

> --原始邮件--
> 发件人:
>   "奇怪的不朽琴师"
> <
> 1129656...@qq.com;
> 发送时间:2020年7月9日(星期四) 下午5:08
> 收件人:"godfrey he"
> 主题:pyflink1.11.0window
>
>
>
> 你好:
>  我在使用pyflink1.11版本时,window开窗仍会报错
> : org.apache.flink.table.api.ValidationException: A group window expects a
> time attribute for grouping in a stream environment.
>
> 请问这个问题没有修复么?或者是我使用的方式不对,如果是使用不对,能提供一个正确的案例么?
> 代码如下
> 谢谢
>
>
> def from_kafka_to_kafka_demo():
>   s_env =
> StreamExecutionEnvironment.get_execution_environment()
>   s_env.set_parallelism(1)
>
>
>   # use blink table planner
>   st_env = StreamTableEnvironment.create(s_env)
>
>
>   # register source and sink
>   register_rides_source(st_env)
>   register_rides_sink(st_env)
>
>
>   st_env.from_path("source1")\
>
> .window(Tumble.over("1.secends").on("time1").alias("w")) \
> .group_by("w") \
> .select(" id, time1 , time1 ")\
> .insert_into("sink1")
>  
>   st_env.execute("2-from_kafka_to_kafka")
>
>
>
>
> def register_rides_source(st_env):
>   source_ddl = \
>   '''
>   create table source1(
> id int,
>   time1 timestamp,
>   type string
>   ) with (
>   'connector.type' = 'kafka',
>   'update-mode' = 'append',
>   'connector.topic' = 'tp1',
>   'connector.properties.bootstrap.servers' = 'localhost:9092'
>   )
>   '''
>   st_env.sql_update(source_ddl)
>
>
>
>
> def register_rides_sink(st_env):
>   sink_ddl = \
>   '''
>   create table sink1(
> id int,
>   time1 timestamp,
>   time2 timestamp
>   ) with (
>   'connector.type' = 'kafka',
>   'update-mode' = 'append',
>   'connector.topic' = 'tp3',
>   'connector.properties.bootstrap.servers' = 'localhost:9092'
>   )
>   '''
>   st_env.sql_update(sink_ddl)
>
>
>
>
> if __name__ == '__main__':
>   from_kafka_to_kafka_demo()
>
>
>  


flink 1.10 sql kafka format json 定制schema时, 一个字段的数据可以定义为类似json object不?

2020-07-09 Thread Peihui He
Hello:

在用flink sql从kafka消费数据时,有些json比较复杂,想直接定义为object,在后续通过udf转为string。

 有什么办法不?我在建表的时候有提示过json_object 类型,但是用了又提示其他类型。


Best wishes.


flink 1.11 使用sql写入hdfs无法自动提交分区

2020-07-09 Thread Jun Zhang
大家好:
 我在用flink 1.11 的sql从kafka消费然后写入hdfs的过程中,发现没法自动提交分区,请问这个是什么原因呢?谢谢

我的checkpoint设置了间隔10s,对于如下的配置,正常应该是每隔10在hdfs相应的分区下会有_SUCCESS文件,但是实际上过了好久也没有,ORC格式的结果数据是正常写入了。

public static void main(String[] args) throws Exception{
StreamExecutionEnvironment bsEnv =
StreamExecutionEnvironment.getExecutionEnvironment();
bsEnv.enableCheckpointing(1);
bsEnv.setParallelism(1);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);

String sqlSource = "CREATE TABLE  source_kafka (\n" +
  "appName  STRING,\n" +
  "appVersion STRING,\n" +
  "uploadTime STRING\n" +
  ") WITH (\n" +
  "  'connector.type' = 'kafka',   \n" +
  "  'connector.version' = '0.10',\n" +
  "  'connector.topic' = 'mytest',\n" +
  "  'connector.properties.zookeeper.connect' =
'localhost:2181',\n" +
  "  'connector.properties.bootstrap.servers' =
'localhost:9092',\n" +
  "  'connector.properties.group.id' = 'testGroup',\n" +
  "  'format.type'='json',\n" +
  "  'update-mode' = 'append' )";

tEnv.executeSql(sqlSource);

String sql = "CREATE TABLE fs_table (\n" +
"appName  STRING,\n" +
"appVersion STRING,\n" +
"uploadTime STRING,\n" +
"  dt STRING," +
"  h string" +
")  PARTITIONED BY (dt,h)  WITH (\n" +
"  'connector'='filesystem',\n" +
 "  'path'='hdfs://localhost/tmp/',\n" +
 " 'sink.partition-commit.policy.kind' =
'success-file', " +
 "  'format'='orc'\n" +
 ")";
tEnv.executeSql(sql);

String insertSql = "insert into  fs_table SELECT appName
,appVersion,uploadTime, " +
  " DATE_FORMAT(LOCALTIMESTAMP, '-MM-dd'),
DATE_FORMAT(LOCALTIMESTAMP, 'HH') FROM source_kafka";

tEnv.executeSql(insertSql);

}


??????pyflink1.11.0window

2020-07-09 Thread ??????????????
----
??: 
   "??" 
   
<1129656...@qq.com;
:2020??7??9??(??) 5:08
??:"godfrey he"

Re: FlinkKinesisProducer blocking ?

2020-07-09 Thread Vijay Balakrishnan
Hi Gordon,
ThreadPoolSize default is 10. I have parallelism of 80 spread out across 32
nodes.
Could it be that the 80 threads get bottlenecked on a common ThreadPool of
10 or is it spawning 80 * 10 threads in total. The Flink TaskManagers run
in separate slots/vCPUs and can be spread across 32 nodes in my case but
occupying 80 slots/vCPUs. Is my understanding correct and will this be the
reason that the KPL gets flooded with too many pending requests at regular
intervals ??

TIA,

On Thu, Jul 9, 2020 at 12:15 PM Vijay Balakrishnan 
wrote:

> Thanks,Gordon for your reply.
>
> I do not set a queueLimit and so the default unbounded queueSize is 
> 2147483647.
> So, it should just be dropping records being produced from the
> 80(parallelism) * 10 (ThreadPoolSize) = 800 threads based on Recordttl. I
> do not want backpressure as you said it effectively blocks all upstream
> operators.
>
> But from what you are saying, it will apply backpressure when the number
> of outstanding records accumulated exceeds the default queue limit of 
> 2147483647
> or* does it also do it if it is r**ate-limited* *to 1MB per second per
> shard by Kinesis* ? The 2nd case of Rate Limiting by Kinesis seems more
> probable.
>
> So, calculating Queue Limit:
> Based on this, my records size = 1600 bytes. I have 96 shards
> Assuming - With the default RecordMaxBufferedTime of 100ms, a queue size
> of 100kB per shard should be sufficient.So, Queue size/shard=100KB
> Queue Limit with 96 shards = (96 * 10^5)/ 1600 = 6000
> Queue Limit with 4 shards = (4 * 10^5)/ 1600 = 0.25
>
> Acc. to the docs:
>
> By default, FlinkKinesisProducer does not backpressure. Instead, records
> that cannot be sent because of the rate restriction of 1 MB per second per
> shard are buffered in an unbounded queue and dropped when their RecordTtl
> expires.
>
> To avoid data loss, you can enable backpressuring by restricting the size
> of the internal queue:
>
> // 200 Bytes per record, 1 shard
> kinesis.setQueueLimit(500);
>
>
> On Wed, Jul 8, 2020 at 12:20 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Vijay,
>>
>> The FlinkKinesisProducer does not use blocking calls to the AWS KDS API.
>> It does however apply backpressure (therefore effectively blocking all
>> upstream operators) when the number of outstanding records accumulated
>> exceeds a set limit, configured using the
>> FlinkKinesisProducer#setQueueLimit
>> method.
>>
>> For starters, you can maybe check if that was set appropriately.
>>
>> Cheers,
>> Gordon
>>
>>
>>
>> --
>> Sent from:
>> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>>
>


MalformedClassName for scala case class

2020-07-09 Thread Georg Heiler
Hi,

why can't I register the stream as a table and get a MalformedClassName
exception?

val serializer = new JSONKeyValueDeserializationSchema(false)
val stream = senv.addSource(
new FlinkKafkaConsumer(
  "tweets-raw-json",
  serializer,
  properties
).setStartFromEarliest() // TODO experiment with different start values
  )

case class Foo(lang: String, count: Int)
val r = stream
.map(e => {
  Foo(e.get("value").get("lang").asText(), 1)
})
.keyBy(_.lang)
.timeWindow(Time.seconds(10))
.sum("count")
r.print()
stenv.registerDataStream("tweets_json", r)

Best,
Georg


Re: map JSON to scala case class & off-heap optimization

2020-07-09 Thread Georg Heiler
Great. Thanks.
But would it be possible to automate this i.e. to have this work
automatically for the case class / product?

Am Do., 9. Juli 2020 um 20:21 Uhr schrieb Taher Koitawala <
taher...@gmail.com>:

> The performant way would be to apply a map function over the stream and
> then use the Jackson ObjectMapper to convert to scala objects. In flink
> there is no API like Spark to automatically get all fields.
>
> On Thu, Jul 9, 2020, 11:38 PM Georg Heiler 
> wrote:
>
>> How can I use it with a scala case class?
>> If I understand it correctly for better performance the Object Mapper is
>> already initialized in each KafkaConsumer and returning ObjectNodes. So
>> probably I should rephrase to: how can I then map these to case classes
>> without handcoding it?  https://github.com/json4s/json4s or
>> https://github.com/FasterXML/jackson-module-scala both only seem to
>> consume strings.
>>
>> Best,
>> Georg
>>
>> Am Do., 9. Juli 2020 um 19:17 Uhr schrieb Taher Koitawala <
>> taher...@gmail.com>:
>>
>>> You can try the Jackson ObjectMapper library and that will get you from
>>> json to object.
>>>
>>> Regards,
>>> Taher Koitawala
>>>
>>> On Thu, Jul 9, 2020, 9:54 PM Georg Heiler 
>>> wrote:
>>>
 Hi,

 I want to map a stream of JSON documents from Kafka to a scala
 case-class. How can this be accomplished using the
 JSONKeyValueDeserializationSchema?Is a manual mapping of object nodes
 required?

 I have a Spark background. There, such manual mappings usually are
 discouraged. Instead, they offer a nice API (dataset API) to perform such a
 type of assignment.
 1) this is concise
 2) it operates on sparks off-heap memory representations (tungsten) to
 be faster

 In Flink, instead, such off-heap optimizations seem not to be talked
 much about (sorry if I miss something, I am a Flink newbie). Is there a
 reason why these optimizations are not necessary in Flink?


 How could I get the following example:
 val serializer = new JSONKeyValueDeserializationSchema(false)
 val stream = senv.addSource(
 new FlinkKafkaConsumer(
   "tweets-raw-json",
   serializer,
   properties
 ).setStartFromEarliest() // TODO experiment with different start
 values
   )

 to map to this Tweet class concisely, i.e. without manually iterating
 through all the attribute fields and parsing the keys from the object node
 tree.

 final case class Tweet(tweet_id: Option[String], text: Option[String],
 source: Option[String], geo: Option[String], place: Option[String], lang:
 Option[String], created_at: Option[String], timestamp_ms: Option[String],
 coordinates: Option[String], user_id: Option[Long], user_name:
 Option[String], screen_name: Option[String], user_created_at:
 Option[String], followers_count: Option[Long], friends_count: Option[Long],
 user_lang: Option[String], user_location: Option[String], hashtags:
 Option[Seq[String]])

 Best,
 Georg

>>>


Re: Stateful Functions: Routing to remote functions

2020-07-09 Thread Jan Brusch

Hi Igal,

I got around to toying around with your proposed solutions today. 
Unfortunately I didn't get it to work. However, you asked me to share 
the code and I prepared an example that provides a minimal reproduction 
of my use case and the corresponding error.


Please find it here: 
https://github.com/Bruschkov/statefun-remote-counter-example


As I said in an earlier mail: I found a way to get my desired setup to 
work via a wrapper function. However, I am still interested if and how a 
"direct routing" solution would work.


If you find the time to look at the code and give me some feedback, I 
would really appreciate it.



Best regards

Jan

On 06.07.20 14:11, Igal Shilman wrote:

Hi Jan,

Two followup questions:

1. Looking at the stack trace provided in your email, it does seem 
like the function type is unavailable, and I'd like to follow up on 
that: can you please share your Dockerfile, so
we have the complete picture. If you are not comfortable sharing that, 
then you can please try to execute into the container 
and manually validate that the module.yaml is present
both on the "worker" image and the "master" image, and it defines the 
remote function name correctly?


2. In your original email, the provided router does not route messages 
of type Any, but it actually
forwards them as-in, the remote functions API requires that the 
message being sent to the remote function

is of type Any.  Can you try something like this:

final class EventRouter implements Router {


 static final FunctionType PYTHON_EVENT_COUNTER_TYPE = new
FunctionType("demo", "eventCounterPython");
 static final FunctionType JAVA_EVENT_COUNTER_TYPE = new
FunctionType("demo", "eventCounterJava");
 @Override
 public void route(com.google.protobuf.Message event,
Downstream downstream) {

downstream.forward(
 JAVA_EVENT_COUNTER_TYPE,
 "count",
 event)
 ;
 downstream.forward(
 new Address(
 PYTHON_EVENT_COUNTER_TYPE,
 "count"
 ),
Any.pack(event)
 );
 }
}



In addition you would have to change the definition of your ingress 
identifier to have a produced type of com.google.protobuf.Message

instead of an Event.


Good luck!
Igal

On Fri, Jul 3, 2020 at 10:09 PM Jan Brusch > wrote:


Hi Igal,

thanks for your reply. Initially I thought the same thing, but it
turns out I am able to call the remote function from an embedded
"wrapper" function using the exact same setup (Relevant Code
below). So that's one kind of solution to that Problem. But to me
it seems like it's a bit of a hack and not the idiomatic way to
solve this...

From my understanding of the address based communication within
Flink Stateful Functions, I feel like it should be possible to
call that function from the router directly. But I am probably
either using the Router wrong or misunderstand some of the ideas
behind address based communication...


EventRouter.java




final class EventRouter implements Router {

  @Override
  public void route(Event event, Downstream downstream) {
    downstream.forward(EventCounterWrapper.TYPE, "_", event);
  }
}


--


EventCounterWrapper.java


---

public class EventCounterWrapper implements StatefulFunction {

    static final FunctionType TYPE = new FunctionType("demo",
"eventCounterWrapper");
    public static final FunctionType REMOTE_FUNCTION_TYPE = new
FunctionType("demo/external", "eventCounterPython");

    @Override
    public void invoke(Context context, Object input) {
    if (input instanceof Event) {
    Event event = (Event) input;
    Any message = Any.pack(event);
    context.send(REMOTE_FUNCTION_TYPE, "_", message);
    }

    if (input instanceof Any) {
    final EventCount eventCount;
    try {
    eventCount = ((Any) input).unpack(EventCount.class);
    } catch (InvalidProtocolBufferException e) {
    throw new RuntimeException("Unexpected type", e);
    }
context.send(SessionIO.EVENT_COUNT_EGRESS_ID, eventCount);
    }
    }
}


---


worker.py

@functions.bind("demo/external/eventCounterPython")
def handle_event(context, _):
 state = 

Re: Asynchronous I/O poor performance

2020-07-09 Thread Arvid Heise
Hi Mark,

I already explained that this latency is only occurring because of the
shuffle step before async IO (e.g. data is sent over network).

If you replace

val x : DataStream[String] = someIntegers.map( _ =>
s"${System.currentTimeMillis()}")

with

val x : DataStream[String] = someIntegers.shuffle.map( _ =>
s"${System.currentTimeMillis()}")

You can see that latency between map and async IO becomes 0.

Throughput and latency are not directly related. You can have very high
throughput but also a high latency if you have many shuffle steps. Latency
is pretty much determined in how long a certain record lives in all network
buffers and the processing time of all involved operators. To understand
it, consider the following program

class AsyncDatabaseRequest extends RichAsyncFunction[String, (String, String)] {
  override def asyncInvoke(str: String, resultFuture:
ResultFuture[(String, String)]): Unit = {
val start = str.substring(3).toLong
val delta = System.currentTimeMillis() - start
resultFuture.complete(Iterable((start.toString, s"${delta}")))
  }
}

object Job {
  def main(args: Array[String]): Unit = {
  // set up the execution environment
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
  //env.enableCheckpointing(10)
  env.setParallelism(1)

  val someIntegers: DataStream[Long] = env.generateSequence(1, 100)
  //someIntegers.map { _ => System.currentTimeMillis()}.map{ s =>
System.currentTimeMillis()-s}.print()
val prefix = "*" * 3
  val x : DataStream[String] = someIntegers.map( _ =>
s"$prefix${System.currentTimeMillis()}")
  val resultStream: DataStream[(String, String)] =
AsyncDataStream.unorderedWait(x, new AsyncDatabaseRequest(), 10L,
TimeUnit.MILLISECONDS, 100)//.setParallelism(16)
  //AsyncDataStream.unorderedWait(data , new
AsyncDatabaseRequest,3L,TimeUnit.SECONDS)
  resultStream.print()
  println(env.getConfig.getAutoWatermarkInterval)
  env.execute("Flink Scala API Skeleton")
  }
}

It's your program in condensed form, however, each record is prefixed with
30k * characters. Thus, in one network buffer only 1-2 records fit and thus
the latency is now very little (1-2 ms on my machine), because the record
does not live very long in network buffers.

However, if you would compare how long it takes to process 1m records, you
would see that your initial version is much faster = higher throughput,
because each buffer fits 3.2k records instead of 1-2.

So after you have verified that latency is indeed not an issue here, please
evaluate the thread pool size of akka.

On Thu, Jul 9, 2020 at 9:27 PM Mark Zitnik  wrote:

> Hi Arvid,
>
> The http client is not my buttoleneck as I said before I check the async
> and I have a delay until it enters to asyncinvoke about 80 ms if some can
> explained me why we have such big delay I have attached a sample code in my
> previous email can some one explain the delay
>
> Thanks
>
> On Mon, 6 Jul 2020, 23:31 Arvid Heise,  wrote:
>
>> Hi Mark,
>>
>> Async wait operators cannot be chained to sources so the messages go
>> through the network stack. Thus, having some latency is normal and cannot
>> be avoided. It can be tuned though, but I don't think that this is the
>> issue at hand as it should mostly impact latency and affect throughput
>> less. Since external I/O calls are much more heavy weight than our internal
>> communication, both the drop of throughput and the increase in latency are
>> usually dwarfed by the external I/O call costs.
>>
>> Please try to increase the thread pool for akka as written in my previous
>> email and report back.
>>
>> On Mon, Jul 6, 2020 at 9:44 PM Mark Zitnik  wrote:
>>
>>> Hi Benchao,
>>>
>>> i have run this in the code:
>>>
>>> println(env.getConfig.getAutoWatermarkInterval)
>>>
>>> and got 200 i do fully understand how watermarks and AsyncOperator
>>> operator works, but
>>> i have decided to make a simple test that should evaluate the time it
>>> takes to enter to the asyncInvoke method  and it looks that it takes about
>>> 80ms witch is longer than the time it take to get a response from my
>>> micro-service
>>>
>>> code below
>>>
>>> class AsyncDatabaseRequest extends RichAsyncFunction[String, (String, 
>>> String)] {
>>>
>>>   implicit lazy val executor: ExecutionContext = 
>>> ExecutionContext.fromExecutor(Executors.directExecutor())
>>>
>>>   /*
>>>   implicit val actorSystem = ActorSystem.apply("test", None, None, 
>>> Some(executor))
>>>   implicit val materializer = ActorMaterializer()
>>>   implicit val executionContext = actorSystem.dispatcher
>>>
>>>
>>>   println(materializer.system.name)
>>>   println("start")
>>>   */
>>> // redis-streaming-dev-new.xwudy5.ng.0001.use1.cache.amazonaws.com
>>>
>>>   // redis-streaming-dev-001.xwudy5.0001.use1.cache.amazonaws.com
>>>   var actorSystem: ActorSystem = null
>>>   var materializer: ActorMaterializer = null
>>> 

Re: Asynchronous I/O poor performance

2020-07-09 Thread Mark Zitnik
Hi Arvid,

The http client is not my buttoleneck as I said before I check the async
and I have a delay until it enters to asyncinvoke about 80 ms if some can
explained me why we have such big delay I have attached a sample code in my
previous email can some one explain the delay

Thanks

On Mon, 6 Jul 2020, 23:31 Arvid Heise,  wrote:

> Hi Mark,
>
> Async wait operators cannot be chained to sources so the messages go
> through the network stack. Thus, having some latency is normal and cannot
> be avoided. It can be tuned though, but I don't think that this is the
> issue at hand as it should mostly impact latency and affect throughput
> less. Since external I/O calls are much more heavy weight than our internal
> communication, both the drop of throughput and the increase in latency are
> usually dwarfed by the external I/O call costs.
>
> Please try to increase the thread pool for akka as written in my previous
> email and report back.
>
> On Mon, Jul 6, 2020 at 9:44 PM Mark Zitnik  wrote:
>
>> Hi Benchao,
>>
>> i have run this in the code:
>>
>> println(env.getConfig.getAutoWatermarkInterval)
>>
>> and got 200 i do fully understand how watermarks and AsyncOperator
>> operator works, but
>> i have decided to make a simple test that should evaluate the time it
>> takes to enter to the asyncInvoke method  and it looks that it takes about
>> 80ms witch is longer than the time it take to get a response from my
>> micro-service
>>
>> code below
>>
>> class AsyncDatabaseRequest extends RichAsyncFunction[String, (String, 
>> String)] {
>>
>>   implicit lazy val executor: ExecutionContext = 
>> ExecutionContext.fromExecutor(Executors.directExecutor())
>>
>>   /*
>>   implicit val actorSystem = ActorSystem.apply("test", None, None, 
>> Some(executor))
>>   implicit val materializer = ActorMaterializer()
>>   implicit val executionContext = actorSystem.dispatcher
>>
>>
>>   println(materializer.system.name)
>>   println("start")
>>   */
>> // redis-streaming-dev-new.xwudy5.ng.0001.use1.cache.amazonaws.com
>>
>>   // redis-streaming-dev-001.xwudy5.0001.use1.cache.amazonaws.com
>>   var actorSystem: ActorSystem = null
>>   var materializer: ActorMaterializer = null
>>   var executionContext: ExecutionContextExecutor = null
>>   //var akkaHttp: HttpExt = null
>>
>>   override def open(parameters: Configuration): Unit = {
>> actorSystem = akka.actor.ActorSystem(UUID.randomUUID().toString, 
>> Some(ConfigFactory.load("application.conf")), None, Some(executor))
>> materializer = ActorMaterializer()(actorSystem)
>> executionContext = actorSystem.dispatcher
>> //akkaHttp = Http(actorSystem)
>>   }
>>
>>   override def close(): Unit = {
>> actorSystem.terminate()
>>   }
>>
>>   override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, 
>> String)]): Unit = {
>> val start = str.toLong
>> val delta = System.currentTimeMillis() - start
>> resultFuture.complete(Iterable((str, s"${delta}")))
>>   }
>> }
>>
>>
>> object Job {
>>   def main(args: Array[String]): Unit = {
>> // set up the execution environment
>> val env = StreamExecutionEnvironment.getExecutionEnvironment
>> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>> //env.enableCheckpointing(10)
>> env.setParallelism(1)
>>
>> val someIntegers: DataStream[Long] = env.generateSequence(1, 100)
>> //someIntegers.map { _ => System.currentTimeMillis()}.map{ s => 
>> System.currentTimeMillis()-s}.print()
>> val x : DataStream[String] = someIntegers.map( _ => 
>> s"${System.currentTimeMillis()}")
>> val resultStream: DataStream[(String, String)] = 
>> AsyncDataStream.unorderedWait(x, new AsyncDatabaseRequest(), 10L, 
>> TimeUnit.MILLISECONDS, 100)//.setParallelism(16)
>>   //AsyncDataStream.unorderedWait(data , new 
>> AsyncDatabaseRequest,3L,TimeUnit.SECONDS)
>> resultStream.print()
>> println(env.getConfig.getAutoWatermarkInterval)
>> env.execute("Flink Scala API Skeleton")
>>   }
>> }
>>
>> is this normal behavior?
>>
>>
>> On Mon, Jul 6, 2020 at 2:45 PM Benchao Li  wrote:
>>
>>> Hi Mark,
>>>
>>> According to your data, I think the config of AsyncOperator is OK.
>>> There is one more config that might affect the throughput of
>>> AsyncOperator, it's watermark.
>>> Because unordered async operator still keeps the order between
>>> watermarks, did you use
>>> event time in your job, and if yes, what's the watermark interval in
>>> your job?
>>>
>>> Mark Zitnik  于2020年7月5日周日 下午7:44写道:
>>>
 Hi Benchao

 The capacity is 100
 Parallelism is 8
 Rpc req is 20ms

 Thanks


 On Sun, 5 Jul 2020, 6:16 Benchao Li,  wrote:

> Hi Mark,
>
> Could you give more details about your Flink job?
> - the capacity of AsyncDataStream
> - the parallelism of AsyncDataStream operator
> - the time of per blocked rpc request
>
> Mark Zitnik  于2020年7月5日周日 上午3:48写道:
>
>> Hi
>>
>> In my flink 

Re: FlinkKinesisProducer blocking ?

2020-07-09 Thread Vijay Balakrishnan
Thanks,Gordon for your reply.

I do not set a queueLimit and so the default unbounded queueSize is 2147483647.
So, it should just be dropping records being produced from the
80(parallelism) * 10 (ThreadPoolSize) = 800 threads based on Recordttl. I
do not want backpressure as you said it effectively blocks all upstream
operators.

But from what you are saying, it will apply backpressure when the number of
outstanding records accumulated exceeds the default queue limit of 2147483647
or* does it also do it if it is r**ate-limited* *to 1MB per second per
shard by Kinesis* ? The 2nd case of Rate Limiting by Kinesis seems more
probable.

So, calculating Queue Limit:
Based on this, my records size = 1600 bytes. I have 96 shards
Assuming - With the default RecordMaxBufferedTime of 100ms, a queue size of
100kB per shard should be sufficient.So, Queue size/shard=100KB
Queue Limit with 96 shards = (96 * 10^5)/ 1600 = 6000
Queue Limit with 4 shards = (4 * 10^5)/ 1600 = 0.25

Acc. to the docs:

By default, FlinkKinesisProducer does not backpressure. Instead, records
that cannot be sent because of the rate restriction of 1 MB per second per
shard are buffered in an unbounded queue and dropped when their RecordTtl
expires.

To avoid data loss, you can enable backpressuring by restricting the size
of the internal queue:

// 200 Bytes per record, 1 shard
kinesis.setQueueLimit(500);


On Wed, Jul 8, 2020 at 12:20 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi Vijay,
>
> The FlinkKinesisProducer does not use blocking calls to the AWS KDS API.
> It does however apply backpressure (therefore effectively blocking all
> upstream operators) when the number of outstanding records accumulated
> exceeds a set limit, configured using the
> FlinkKinesisProducer#setQueueLimit
> method.
>
> For starters, you can maybe check if that was set appropriately.
>
> Cheers,
> Gordon
>
>
>
> --
> Sent from:
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
>


Re: AsyncFunction retries

2020-07-09 Thread Arvid Heise
Hi Gadi,

FutureUtils is not a public API, so there are no single guarantees that if
the method works now, it would work in any coming Flink version.

Rather, I'd first check if you can use httpcomponents client 5.0+, then you
could simply use the retry handler [1].
If not, then I'd probably copy the code of retryWithDelay and and adjust
the code to use the executor of httpcomponents (whenComplete instead of
whenCompleteAsync).

In general, as long as you are not synchronously calling in AsyncFunction,
you can choose any thread you want. I'm not aware that
FutureUtils.retryWithDelay actually uses the common pool (after all you
need to supply an executor).

[1]
https://hc.apache.org/httpcomponents-client-5.0.x/httpclient5/apidocs/org/apache/hc/client5/http/impl/async/H2AsyncClientBuilder.html#setRetryStrategy(org.apache.hc.client5.http.HttpRequestRetryStrategy)

On Thu, Jul 9, 2020 at 4:43 PM Gadi Katsovich 
wrote:

> Hi all,
> I have a job with the following diagram:
> source -> Flat Map -> Filter -> Filter -> Filter -> async wait operator ->
> Process -> sink
>
> The async operation sends an HTTP post (using Apache HttpAsyncClient).
> In case the HTTP post times out or fails, I want to retry a few times.
>
> Is using FutureUtils.retryWithDelay() acceptable in user code?
> I tried it with local tests and the application works as expected.
> However, the API requires a ScheduledExecutor. And even though I provide
> one I see that the code is executed on the ForkJoin common pool.
>
> I'm a bit confused as to how the threads work here, and I'm afraid to take
> up resources needed for Flink framework operation.
>
> Please advise.
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Validating my understanding of SHARD_DISCOVERY_INTERVAL_MILLIS

2020-07-09 Thread Vijay Balakrishnan
Hi,
I see these 2 constants- SHARD_GETRECORDS_INTERVAL_MILLIS &
SHARD_DISCOVERY_INTERVAL_MILLIS.

My understanding was SHARD_GETRECORDS_INTERVAL_MILLIS defines how often
records are fetched from Kinesis Data Stream(KDS). Code seems to be doing
this in ShardConsumer.run()-->getRecords()

SHARD_DISCOVERY_INTERVAL_MILLIS defines how often the KinesisConsmer checks
if there are any changes to shards. We don't change shards during our
Application run.I have changed it to a very high value to avoid this check
as I was running into ListShards issues with LimitExceedeException when
using 282 shards
Would this be a correct understanding of these 2 constants -especially the
SHARD_DISCOVERY_INTERVAL_MILLIS

My assumption that needs to be validated:
The SHARD_DISCOVERY_INTERVAL_MILLIS should not affect the fetching of
records as defined by SHARD_GETRECORDS_INTERVAL_MILLIS.

Code below:
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS,
getRecsIntervalMs);//2000

/*
We do not change shards while the app is running.
So, we can increase SHARD_DISCOVERY_INTERVAL_MILLIS to a very high value to
avoid any rateLimiting issues from the AWS API with the ListShards call.
Default is 10s. We can increase this to avoid this LimitExceededException
as we don't change shards in the middle.
 */

kinesisConsumerConfig.setProperty(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS,
shardDiscoveryInterval);//1800 ms


TIA,


Re: Avro and Kafka Schema Registry Client versions out of date

2020-07-09 Thread Arvid Heise
Hi Lucas,

1.9.2 [1] support depends on Hive upgrading as well [2] . You could cast a
vote on both tickets to accelerate it.

Schema registry 5.5.0 depends on Avro 1.9.2 and I'm not sure what the
implications of a downgrade are.

Of course, you could build the module yourself with 5.5.0, test, and report
back. [3]

[1] https://issues.apache.org/jira/browse/FLINK-12532
[2] https://issues.apache.org/jira/browse/HIVE-21737
[3]
https://github.com/apache/flink/blob/master/flink-formats/flink-avro-confluent-registry/pom.xml#L33

On Thu, Jul 9, 2020 at 8:09 PM Lucas Heimberg 
wrote:

> Hello,
>
> I noticed that even in Flink 1.11. Avro in flink-avro and the Kafka Schema
> Registry client in flink-avro-confluent-registry are still at version 1.8.2
> and 4.1.0, respectively.
>
> Avro 1.9.2 brings a lot of improvements and bugfixes, in particular in
> respect to logical types.
> The Kafka Schema Registry Client 5.5.0 finally supports schema references,
> i.e., schemas that are composed from different subjects of the Schema
> Registry, which is a very useful feature for the reuse of schemas.
>
> I would like to ask if there are plans to bump both version numbers in the
> near future, or whether there are specific obstacles for that?
>
> Thank you very much & kind regards,
> Lucas
>
> --
> Dr. Lucas Heimberg
>


-- 

Arvid Heise | Senior Java Developer



Follow us @VervericaData

--

Join Flink Forward  - The Apache Flink
Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--
Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
(Toni) Cheng


Re: map JSON to scala case class & off-heap optimization

2020-07-09 Thread Taher Koitawala
The performant way would be to apply a map function over the stream and
then use the Jackson ObjectMapper to convert to scala objects. In flink
there is no API like Spark to automatically get all fields.

On Thu, Jul 9, 2020, 11:38 PM Georg Heiler 
wrote:

> How can I use it with a scala case class?
> If I understand it correctly for better performance the Object Mapper is
> already initialized in each KafkaConsumer and returning ObjectNodes. So
> probably I should rephrase to: how can I then map these to case classes
> without handcoding it?  https://github.com/json4s/json4s or
> https://github.com/FasterXML/jackson-module-scala both only seem to
> consume strings.
>
> Best,
> Georg
>
> Am Do., 9. Juli 2020 um 19:17 Uhr schrieb Taher Koitawala <
> taher...@gmail.com>:
>
>> You can try the Jackson ObjectMapper library and that will get you from
>> json to object.
>>
>> Regards,
>> Taher Koitawala
>>
>> On Thu, Jul 9, 2020, 9:54 PM Georg Heiler 
>> wrote:
>>
>>> Hi,
>>>
>>> I want to map a stream of JSON documents from Kafka to a scala
>>> case-class. How can this be accomplished using the
>>> JSONKeyValueDeserializationSchema?Is a manual mapping of object nodes
>>> required?
>>>
>>> I have a Spark background. There, such manual mappings usually are
>>> discouraged. Instead, they offer a nice API (dataset API) to perform such a
>>> type of assignment.
>>> 1) this is concise
>>> 2) it operates on sparks off-heap memory representations (tungsten) to
>>> be faster
>>>
>>> In Flink, instead, such off-heap optimizations seem not to be talked
>>> much about (sorry if I miss something, I am a Flink newbie). Is there a
>>> reason why these optimizations are not necessary in Flink?
>>>
>>>
>>> How could I get the following example:
>>> val serializer = new JSONKeyValueDeserializationSchema(false)
>>> val stream = senv.addSource(
>>> new FlinkKafkaConsumer(
>>>   "tweets-raw-json",
>>>   serializer,
>>>   properties
>>> ).setStartFromEarliest() // TODO experiment with different start
>>> values
>>>   )
>>>
>>> to map to this Tweet class concisely, i.e. without manually iterating
>>> through all the attribute fields and parsing the keys from the object node
>>> tree.
>>>
>>> final case class Tweet(tweet_id: Option[String], text: Option[String],
>>> source: Option[String], geo: Option[String], place: Option[String], lang:
>>> Option[String], created_at: Option[String], timestamp_ms: Option[String],
>>> coordinates: Option[String], user_id: Option[Long], user_name:
>>> Option[String], screen_name: Option[String], user_created_at:
>>> Option[String], followers_count: Option[Long], friends_count: Option[Long],
>>> user_lang: Option[String], user_location: Option[String], hashtags:
>>> Option[Seq[String]])
>>>
>>> Best,
>>> Georg
>>>
>>


Fwd: Avro and Kafka Schema Registry Client versions out of date

2020-07-09 Thread Lucas Heimberg
Hello,

I noticed that even in Flink 1.11. Avro in flink-avro and the Kafka Schema
Registry client in flink-avro-confluent-registry are still at version 1.8.2
and 4.1.0, respectively.

Avro 1.9.2 brings a lot of improvements and bugfixes, in particular in
respect to logical types.
The Kafka Schema Registry Client 5.5.0 finally supports schema references,
i.e., schemas that are composed from different subjects of the Schema
Registry, which is a very useful feature for the reuse of schemas.

I would like to ask if there are plans to bump both version numbers in the
near future, or whether there are specific obstacles for that?

Thank you very much & kind regards,
Lucas

-- 
Dr. Lucas Heimberg


Re: map JSON to scala case class & off-heap optimization

2020-07-09 Thread Georg Heiler
How can I use it with a scala case class?
If I understand it correctly for better performance the Object Mapper is
already initialized in each KafkaConsumer and returning ObjectNodes. So
probably I should rephrase to: how can I then map these to case classes
without handcoding it?  https://github.com/json4s/json4s or
https://github.com/FasterXML/jackson-module-scala both only seem to consume
strings.

Best,
Georg

Am Do., 9. Juli 2020 um 19:17 Uhr schrieb Taher Koitawala <
taher...@gmail.com>:

> You can try the Jackson ObjectMapper library and that will get you from
> json to object.
>
> Regards,
> Taher Koitawala
>
> On Thu, Jul 9, 2020, 9:54 PM Georg Heiler 
> wrote:
>
>> Hi,
>>
>> I want to map a stream of JSON documents from Kafka to a scala
>> case-class. How can this be accomplished using the
>> JSONKeyValueDeserializationSchema?Is a manual mapping of object nodes
>> required?
>>
>> I have a Spark background. There, such manual mappings usually are
>> discouraged. Instead, they offer a nice API (dataset API) to perform such a
>> type of assignment.
>> 1) this is concise
>> 2) it operates on sparks off-heap memory representations (tungsten) to be
>> faster
>>
>> In Flink, instead, such off-heap optimizations seem not to be talked much
>> about (sorry if I miss something, I am a Flink newbie). Is there a reason
>> why these optimizations are not necessary in Flink?
>>
>>
>> How could I get the following example:
>> val serializer = new JSONKeyValueDeserializationSchema(false)
>> val stream = senv.addSource(
>> new FlinkKafkaConsumer(
>>   "tweets-raw-json",
>>   serializer,
>>   properties
>> ).setStartFromEarliest() // TODO experiment with different start
>> values
>>   )
>>
>> to map to this Tweet class concisely, i.e. without manually iterating
>> through all the attribute fields and parsing the keys from the object node
>> tree.
>>
>> final case class Tweet(tweet_id: Option[String], text: Option[String],
>> source: Option[String], geo: Option[String], place: Option[String], lang:
>> Option[String], created_at: Option[String], timestamp_ms: Option[String],
>> coordinates: Option[String], user_id: Option[Long], user_name:
>> Option[String], screen_name: Option[String], user_created_at:
>> Option[String], followers_count: Option[Long], friends_count: Option[Long],
>> user_lang: Option[String], user_location: Option[String], hashtags:
>> Option[Seq[String]])
>>
>> Best,
>> Georg
>>
>


Re: map JSON to scala case class & off-heap optimization

2020-07-09 Thread Taher Koitawala
You can try the Jackson ObjectMapper library and that will get you from
json to object.

Regards,
Taher Koitawala

On Thu, Jul 9, 2020, 9:54 PM Georg Heiler  wrote:

> Hi,
>
> I want to map a stream of JSON documents from Kafka to a scala case-class.
> How can this be accomplished using the JSONKeyValueDeserializationSchema?Is
> a manual mapping of object nodes required?
>
> I have a Spark background. There, such manual mappings usually are
> discouraged. Instead, they offer a nice API (dataset API) to perform such a
> type of assignment.
> 1) this is concise
> 2) it operates on sparks off-heap memory representations (tungsten) to be
> faster
>
> In Flink, instead, such off-heap optimizations seem not to be talked much
> about (sorry if I miss something, I am a Flink newbie). Is there a reason
> why these optimizations are not necessary in Flink?
>
>
> How could I get the following example:
> val serializer = new JSONKeyValueDeserializationSchema(false)
> val stream = senv.addSource(
> new FlinkKafkaConsumer(
>   "tweets-raw-json",
>   serializer,
>   properties
> ).setStartFromEarliest() // TODO experiment with different start values
>   )
>
> to map to this Tweet class concisely, i.e. without manually iterating
> through all the attribute fields and parsing the keys from the object node
> tree.
>
> final case class Tweet(tweet_id: Option[String], text: Option[String],
> source: Option[String], geo: Option[String], place: Option[String], lang:
> Option[String], created_at: Option[String], timestamp_ms: Option[String],
> coordinates: Option[String], user_id: Option[Long], user_name:
> Option[String], screen_name: Option[String], user_created_at:
> Option[String], followers_count: Option[Long], friends_count: Option[Long],
> user_lang: Option[String], user_location: Option[String], hashtags:
> Option[Seq[String]])
>
> Best,
> Georg
>


map JSON to scala case class & off-heap optimization

2020-07-09 Thread Georg Heiler
Hi,

I want to map a stream of JSON documents from Kafka to a scala case-class.
How can this be accomplished using the JSONKeyValueDeserializationSchema?Is
a manual mapping of object nodes required?

I have a Spark background. There, such manual mappings usually are
discouraged. Instead, they offer a nice API (dataset API) to perform such a
type of assignment.
1) this is concise
2) it operates on sparks off-heap memory representations (tungsten) to be
faster

In Flink, instead, such off-heap optimizations seem not to be talked much
about (sorry if I miss something, I am a Flink newbie). Is there a reason
why these optimizations are not necessary in Flink?


How could I get the following example:
val serializer = new JSONKeyValueDeserializationSchema(false)
val stream = senv.addSource(
new FlinkKafkaConsumer(
  "tweets-raw-json",
  serializer,
  properties
).setStartFromEarliest() // TODO experiment with different start values
  )

to map to this Tweet class concisely, i.e. without manually iterating
through all the attribute fields and parsing the keys from the object node
tree.

final case class Tweet(tweet_id: Option[String], text: Option[String],
source: Option[String], geo: Option[String], place: Option[String], lang:
Option[String], created_at: Option[String], timestamp_ms: Option[String],
coordinates: Option[String], user_id: Option[Long], user_name:
Option[String], screen_name: Option[String], user_created_at:
Option[String], followers_count: Option[Long], friends_count: Option[Long],
user_lang: Option[String], user_location: Option[String], hashtags:
Option[Seq[String]])

Best,
Georg


flink take single element from stream

2020-07-09 Thread Georg Heiler
How can I explore a stream in Flink interactively?

Spark has the concept of take/head to extract the first n elements of a
dataframe / table.

Is something similar available in Flink for a stream like:

val serializer = new JSONKeyValueDeserializationSchema(false)
val stream = senv.addSource(
new FlinkKafkaConsumer(
  "tweets-raw-json",
  serializer,
  properties
).setStartFromEarliest() // TODO experiment with different start values
  )

stream.head/take

does not seem to be implemented.


??????flink????????

2020-07-09 Thread ????????
new ProcessWindowFunction?
???




----
??: 
   "user-zh"



Re: Implications of taskmanager.memory.process.size

2020-07-09 Thread Vishal Santoshi
ager.memory.process.size(none)MemorySizeTotal Process Memory size for the
TaskExecutors. This includes all the memory that a TaskExecutor consumes,
consisting of Total Flink Memory, JVM Metaspace, and JVM Overhead. On
containerized setups, this should be set to the container memory. See also
'taskmanager.memory.flink.size' for total Flink memory size configuration.

The Total Flink Memory consists of off heap memory governed by fraction
.
In pure streaming case ( and non ROCKSDB state case )

This is the size of off-heap memory managed by the memory manager, reserved
for sorting, hash tables, caching of intermediate results and RocksDB state
backend. Memory consumers can either allocate memory from the memory
manager in the form of MemorySegments, or reserve bytes from the memory
manager and keep their memory usage within that boundary.

Is not used AFAIK . May be reduce the fraction to 0 ? We do not  use
offline heap ( aka batch jobs ) on our cluster ?


Any help will be appreciated.

On Thu, Jul 9, 2020 at 9:25 AM Vishal Santoshi 
wrote:

> Hello folks,
>   As established
> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#memory-configuration
>  ,
> I set the taskmanager.memory.process.size and 
> taskmanager.memory.task.off-heap.size
> in my flink-conf.yaml and I see the 2 properties being pulled in.
>
> * - Loading configuration property: taskmanager.memory.process.size, 8000m*
>
> * - Loading configuration property: taskmanager.memory.task.off-heap.size,
> 1024m*
>
> I am not sure how the -Xmx and -Xms are calculated but I see
>
> *Starting taskexecutor as a console application on host
> kafka-to-hdfs-taskmanager-dev-8597c78d9c-59dqw.*
>
> *VM settings:*
>
> *Min. Heap Size: 2.27G*
>
> *Max. Heap Size: 2.27G*
>
> *Using VM: OpenJDK 64-Bit Server VM*
>
>
> What gives ?
>
> I am looking through the scripts and am not sure I see any calculations
> based on taskmanager.memory.process.size
>
>
>


Fwd: Avro and Kafka Schema Registry Client versions out of date

2020-07-09 Thread Lucas Heimberg
Hello,

I noticed that even in Flink 1.11. Avro in flink-avro and the Kafka Schema
Registry client in flink-avro-confluent-registry are still at version 1.8.2
and 4.1.0, respectively.

Avro 1.9.2 brings a lot of improvements and bugfixes, in particular in
respect to logical types.
The Kafka Schema Registry Client 5.5.0 finally supports schema references,
i.e., schemas that are composed from different subjects of the Schema
Registry, which is a very useful feature for the reuse of schemas.

I would like to ask if there are plans to bump both version numbers in the
near future, or whether there are specific obstacles for that?

Thank you very much & kind regards,
Lucas

--
Dr. Lucas Heimberg


AsyncFunction retries

2020-07-09 Thread Gadi Katsovich
Hi all,
I have a job with the following diagram:
source -> Flat Map -> Filter -> Filter -> Filter -> async wait operator ->
Process -> sink

The async operation sends an HTTP post (using Apache HttpAsyncClient).
In case the HTTP post times out or fails, I want to retry a few times.

Is using FutureUtils.retryWithDelay() acceptable in user code?
I tried it with local tests and the application works as expected. However,
the API requires a ScheduledExecutor. And even though I provide one I see
that the code is executed on the ForkJoin common pool.

I'm a bit confused as to how the threads work here, and I'm afraid to take
up resources needed for Flink framework operation.

Please advise.


Re: Task recovery?

2020-07-09 Thread John Smith
Hi Robert is my assumption correct?

On Fri., Jul. 3, 2020, 12:42 p.m. John Smith, 
wrote:

> Here is one log
>
> https://www.dropbox.com/s/s8uom5uto708izf/flink-job-001.log?dl=0
>
> If I understand correctly on June 23rd it suspended the jobs? So at that
> point they would no longer show in the UI or be restarted?
>
> On Fri, 3 Jul 2020 at 12:05, John Smith  wrote:
>
>> I didn't restart the job manager. Let me see if I can dig up the logs...
>> Also I just realised it's possible that the retry attempts to recover may
>> have been exhausted.
>>
>


Implications of taskmanager.memory.process.size

2020-07-09 Thread Vishal Santoshi
Hello folks,
  As established
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#memory-configuration
,
I set the taskmanager.memory.process.size and
taskmanager.memory.task.off-heap.size
in my flink-conf.yaml and I see the 2 properties being pulled in.

* - Loading configuration property: taskmanager.memory.process.size, 8000m*

* - Loading configuration property: taskmanager.memory.task.off-heap.size,
1024m*

I am not sure how the -Xmx and -Xms are calculated but I see

*Starting taskexecutor as a console application on host
kafka-to-hdfs-taskmanager-dev-8597c78d9c-59dqw.*

*VM settings:*

*Min. Heap Size: 2.27G*

*Max. Heap Size: 2.27G*

*Using VM: OpenJDK 64-Bit Server VM*


What gives ?

I am looking through the scripts and am not sure I see any calculations
based on taskmanager.memory.process.size


flink1.10升级到flink1.11 提交到yarn失败

2020-07-09 Thread Zhou Zach
hi all,
原来用1.10使用per job模式,可以提交的作业,现在用1.11使用应用模式提交失败,看日志,也不清楚原因,
yarn log:
Log Type: jobmanager.err


Log Upload Time: Thu Jul 09 21:02:48 +0800 2020


Log Length: 785


SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/yarn/nm/usercache/hdfs/appcache/application_1594271580406_0010/filecache/11/data-flow-1.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/jars/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
log4j:WARN No appenders could be found for logger 
(org.apache.flink.runtime.entrypoint.ClusterEntrypoint).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more 
info.


Log Type: jobmanager.out


Log Upload Time: Thu Jul 09 21:02:48 +0800 2020


Log Length: 0




Log Type: prelaunch.err


Log Upload Time: Thu Jul 09 21:02:48 +0800 2020


Log Length: 0




Log Type: prelaunch.out


Log Upload Time: Thu Jul 09 21:02:48 +0800 2020


Log Length: 70


Setting up env variables
Setting up job resources
Launching container








本地log:
2020-07-09 21:02:41,015 INFO  org.apache.flink.client.cli.CliFrontend   
   [] - 

2020-07-09 21:02:41,020 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: jobmanager.rpc.address, localhost
2020-07-09 21:02:41,020 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: jobmanager.rpc.port, 6123
2020-07-09 21:02:41,021 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: jobmanager.memory.process.size, 1600m
2020-07-09 21:02:41,021 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: taskmanager.memory.process.size, 1728m
2020-07-09 21:02:41,021 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: taskmanager.numberOfTaskSlots, 1
2020-07-09 21:02:41,021 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: parallelism.default, 1
2020-07-09 21:02:41,021 INFO  
org.apache.flink.configuration.GlobalConfiguration   [] - Loading 
configuration property: jobmanager.execution.failover-strategy, region
2020-07-09 21:02:41,164 INFO  
org.apache.flink.runtime.security.modules.HadoopModule   [] - Hadoop user 
set to hdfs (auth:SIMPLE)
2020-07-09 21:02:41,172 INFO  
org.apache.flink.runtime.security.modules.JaasModule [] - Jaas file 
will be created as /tmp/jaas-2213111423022415421.conf.
2020-07-09 21:02:41,181 INFO  org.apache.flink.client.cli.CliFrontend   
   [] - Running 'run-application' command.
2020-07-09 21:02:41,194 INFO  
org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer 
[] - Submitting application in 'Application Mode'.
2020-07-09 21:02:41,201 WARN  
org.apache.flink.yarn.configuration.YarnLogConfigUtil[] - The 
configuration directory ('/opt/flink-1.11.0/conf') already contains a LOG4J 
config file.If you want to use logback, then please delete or rename the log 
configuration file.
2020-07-09 21:02:41,537 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
   [] - No path for the flink jar passed. Using the location of class 
org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2020-07-09 21:02:41,665 INFO  
org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider [] - Failing 
over to rm220
2020-07-09 21:02:41,717 INFO  org.apache.hadoop.conf.Configuration  
   [] - resource-types.xml not found
2020-07-09 21:02:41,718 INFO  
org.apache.hadoop.yarn.util.resource.ResourceUtils   [] - Unable to 
find 'resource-types.xml'.
2020-07-09 21:02:41,755 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
   [] - Cluster specification: 
ClusterSpecification{masterMemoryMB=2048, taskManagerMemoryMB=4096, 
slotsPerTaskManager=1}
2020-07-09 21:02:42,723 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
   [] - Submitting application master application_1594271580406_0010
2020-07-09 21:02:42,969 INFO  
org.apache.hadoop.yarn.client.api.impl.YarnClientImpl[] - Submitted 
application application_1594271580406_0010
2020-07-09 21:02:42,969 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
   [] - Waiting for the cluster to be allocated
2020-07-09 21:02:42,971 INFO  org.apache.flink.yarn.YarnClusterDescriptor   
   [] - Deploying cluster, current state ACCEPTED
2020-07-09 21:02:47,619 INFO  org.apache.flink.yarn.YarnClusterDescriptor 

Re: Re: flink1.10.1在yarn上无法写入kafka的问题

2020-07-09 Thread m...@sinoiov.com
hi:zhisheng:

这是TM日志,在这之前没有任何错误日志,

代码逻辑很简单:
SingleOutputStreamOperator> sourceStream = 
env.addSource(source)
.setParallelism(2)
.uid("DataProcessSource")
.flatMap(new DataConvertFunction())
.setParallelism(2)
.uid("DataProcessDataCovert")
.keyBy(new KeySelectorFunction())
.process(new DataCleanFunction())
.setParallelism(2)
.uid("DataProcessDataProcess");

AsyncDataStream.orderedWait(
sourceStream,
new AsyncDataCleanFunction(),
EnvUtil.TOOL.getLong(Constant.ASYNC_TOMEOUT),
TimeUnit.MILLISECONDS,
EnvUtil.TOOL.getInt(Constant.ASYNC_CAPACITY)
).uid("DataProcessAsync")
.setParallelism(2)
.addSink(sink)
.uid("DataProcessSinkKafka")
.setParallelism(2);

2020-07-09 19:33:37,291 WARN org.apache.kafka.clients.consumer.ConsumerConfig - 
The configuration 'gps.kafka.sasl' was supplied but isn't a known config.
2020-07-09 19:33:37,291 WARN org.apache.kafka.clients.consumer.ConsumerConfig - 
The configuration 'java.ext.dirs' was supplied but isn't a known config.
2020-07-09 19:33:37,291 WARN org.apache.kafka.clients.consumer.ConsumerConfig - 
The configuration 'java.class.version' was supplied but isn't a known config.
2020-07-09 19:33:37,291 INFO org.apache.kafka.common.utils.AppInfoParser - 
Kafka version: 2.2.0
2020-07-09 19:33:37,291 INFO org.apache.kafka.common.utils.AppInfoParser - 
Kafka commitId: 05fcfde8f69b0349
2020-07-09 19:33:38,482 INFO com.sinoi.rt.common.protocol.HttpPoolUtil - http 
pool init,maxTotal:18,maxPerRoute:6
2020-07-09 19:33:38,971 WARN org.apache.kafka.clients.NetworkClient - [Producer 
clientId=producer-1] Error while fetching metadata with correlation id 8 : 
{=INVALID_TOPIC_EXCEPTION}
2020-07-09 19:33:38,974 INFO org.apache.kafka.clients.producer.KafkaProducer - 
[Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 
9223372036854775807 ms.
2020-07-09 19:33:41,612 INFO org.apache.flink.runtime.taskmanager.Task - async 
wait operator -> Sink: Unnamed (1/2) (cdbe008dcdb76813f88c4a48b9907d77) 
switched from RUNNING to FAILED.
org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send 
data to Kafka: 
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1225)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:767)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
at 
org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
at 
org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.emitResult(StreamRecordQueueEntry.java:65)
at 
org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueue.emitCompletedElement(OrderedStreamElementQueue.java:71)
at 
org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.outputCompletedElement(AsyncWaitOperator.java:279)
at 
org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.access$100(AsyncWaitOperator.java:76)
at 
org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.processResults(AsyncWaitOperator.java:351)
at 
org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.lambda$processInMailbox$0(AsyncWaitOperator.java:336)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.run(StreamTaskActionExecutor.java:87)
at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:78)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:255)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.errors.InvalidTopicException: 
2020-07-09 19:33:41,615 INFO org.apache.flink.runtime.taskmanager.Task - 

Re: flink1.10.1在yarn上无法写入kafka的问题

2020-07-09 Thread zhisheng
hi,maqi

有完整的日志吗?在这个异常之前还有其他的异常信息吗?如果有,可以提供一下!

Best,
zhisheng

m...@sinoiov.com  于2020年7月9日周四 下午7:57写道:

>
> 请教各位:
> flink任务在本机写入测试环境kafka集群没问题,
>
> 但是上传到yarn环境,就是写不进去,其他job运行在yarn可以写入测试环境的kafka
>
> 异常信息如下:
>
> 2020-07-09 19:17:33,126 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph-
> KeyedProcess (1/2) (9449b1e3b758a40fb5e1e60cf84fd844) switched from
> DEPLOYING to RUNNING.
> 2020-07-09 19:17:33,164 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph-
> KeyedProcess (2/2) (bc6eefd911cf44412121939d0afa6a81) switched from
> DEPLOYING to RUNNING.
> 2020-07-09 19:17:39,049 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph- async wait
> operator -> Sink: Unnamed (1/2) (cfc31005099a8ad7e44a94dc617dd45f) switched
> from RUNNING to FAILE
> D.
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to
> send data to Kafka:
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1225)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:767)
> at
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.invoke(FlinkKafkaProducer.java:99)
> at
> org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:235)
> at
> org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
> at
> org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
> at
> org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.emitResult(StreamRecordQueueEntry.java:65)
> at
> org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueue.emitCompletedElement(OrderedStreamElementQueue.java:71)
>
>
>
>
>
>
>


Re: A query on Flink metrics in kubernetes

2020-07-09 Thread Chesnay Schepler
From Flink's perspective no metrics are aggregated, nor are metric 
requests forwarded to some other process.


Each TaskExecutor has its own reporter, that each must be scraped to get 
the full set of metrics.


On 09/07/2020 11:39, Manish G wrote:

Hi,

I have a query regarding prometheus scraping Flink metrics data with 
application running in kubernetes cluster.


If taskmanager is running on multiple nodes, and prometheus requests 
for the metrics data, then is that request directed to one of the 
nodes(based on some strategy, like round-robin) or is data aggregated 
from all the nodes?


With regards





Re: pyflink udf中发送rest api会导致udf被调用两次

2020-07-09 Thread Dian Fu
Table 
API的作业在执行之前会经过一系列的rule优化,最终的执行计划,存在一个UDF调用多次的可能,你可以把执行计划打印出来看看(TableEnvironment#explain)。

具体原因,需要看一下作业逻辑。可以发一下你的作业吗?可重现代码即可。

> 在 2020年7月9日,下午5:50,lgs <9925...@qq.com> 写道:
> 
> Hi,
> 
> 我观察到一个现象:我定义了一个tumble window,调用一个python udf,在这个udf里面使用requests发送rest api。
> log显示这个udf会被调用两次。相隔不到一秒。这个是什么原因?requests库跟beam冲突了?
> 
> 2020-07-09 17:44:17,501 INFO  flink_test_stream_time_kafka.py:22  
> 
> [] - start to ad
> 2020-07-09 17:44:17,530 INFO  flink_test_stream_time_kafka.py:63  
> 
> [] - start to send rest api.
> 2020-07-09 17:44:17,532 INFO  flink_test_stream_time_kafka.py:69  
> 
> [] - Receive: {"Received": "successful"}
> 2020-07-09 17:44:17,579 INFO 
> /home/sysadmin/miniconda3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:564
> [] - Creating insecure state channel for localhost:57954.
> 2020-07-09 17:44:17,580 INFO 
> /home/sysadmin/miniconda3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:571
> [] - State channel established.
> 2020-07-09 17:44:17,584 INFO 
> /home/sysadmin/miniconda3/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py:526
> [] - Creating client data channel for localhost:60902
> 2020-07-09 17:44:17,591 INFO 
> org.apache.beam.runners.fnexecution.data.GrpcDataService [] - Beam Fn
> Data client connected.
> 2020-07-09 17:44:17,761 INFO  flink_test_stream_time_kafka.py:22  
> 
> [] - start to ad
> 2020-07-09 17:44:17,810 INFO  flink_test_stream_time_kafka.py:63  
> 
> [] - start to send rest api.
> 2020-07-09 17:44:17,812 INFO  flink_test_stream_time_kafka.py:69  
> 
> [] - Receive: {"Received": "successful"}
> 
> 
> 
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/



pyflink udf中发送rest api会导致udf被调用两次

2020-07-09 Thread lgs
Hi,

我观察到一个现象:我定义了一个tumble window,调用一个python udf,在这个udf里面使用requests发送rest api。
log显示这个udf会被调用两次。相隔不到一秒。这个是什么原因?requests库跟beam冲突了?

2020-07-09 17:44:17,501 INFO  flink_test_stream_time_kafka.py:22
  
[] - start to ad
2020-07-09 17:44:17,530 INFO  flink_test_stream_time_kafka.py:63
  
[] - start to send rest api.
2020-07-09 17:44:17,532 INFO  flink_test_stream_time_kafka.py:69
  
[] - Receive: {"Received": "successful"}
2020-07-09 17:44:17,579 INFO 
/home/sysadmin/miniconda3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:564
[] - Creating insecure state channel for localhost:57954.
2020-07-09 17:44:17,580 INFO 
/home/sysadmin/miniconda3/lib/python3.7/site-packages/apache_beam/runners/worker/sdk_worker.py:571
[] - State channel established.
2020-07-09 17:44:17,584 INFO 
/home/sysadmin/miniconda3/lib/python3.7/site-packages/apache_beam/runners/worker/data_plane.py:526
[] - Creating client data channel for localhost:60902
2020-07-09 17:44:17,591 INFO 
org.apache.beam.runners.fnexecution.data.GrpcDataService [] - Beam Fn
Data client connected.
2020-07-09 17:44:17,761 INFO  flink_test_stream_time_kafka.py:22
  
[] - start to ad
2020-07-09 17:44:17,810 INFO  flink_test_stream_time_kafka.py:63
  
[] - start to send rest api.
2020-07-09 17:44:17,812 INFO  flink_test_stream_time_kafka.py:69
  
[] - Receive: {"Received": "successful"}



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Check pointing for simple pipeline

2020-07-09 Thread Dawid Wysakowicz
Hi Prasanna,

I'd like to add my two cents here. I would not say using the incremental
checkpoint is always the best choice. It might have its downsides when
restoring from the checkpoint as it will have to apply all the deltas.
Therefore restoring from a non-incremental checkpoint might be faster.


As Yun Tang, mentioned the incremental checkpoints are supported by
RocksDB only. You don't necessarily need the RocksDB state backend in
all cases. If you are sure that the state will fit into the memory (it
is probably the case for such a simple job, especially if the map
function is stateless), you should be good with the Filesystem state
backend[1]. This state backend should be faster as it does not need to
spill anything to disk and keeps everything in a deserialized form
during the runtime.


You might also find this short post[2] helpful.


Best,

Dawid


[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/state_backends.html#the-fsstatebackend

[2]
https://www.ververica.com/blog/stateful-stream-processing-apache-flink-state-backends


On 08/07/2020 05:25, Yun Tang wrote:
> Hi Prasanna
>
> Using incremental checkpoint is always better than not as this is
> faster and less memory consumed.
> However, incremental checkpoint is only supported by RocksDB
> state-backend.
>
>
> Best
> Yun Tang
> 
> *From:* Prasanna kumar 
> *Sent:* Tuesday, July 7, 2020 20:43
> *To:* d...@flink.apache.org ; user
> 
> *Subject:* Check pointing for simple pipeline
>  
> Hi ,
>
> I have pipeline. Source-> Map(JSON transform)-> Sink.. 
>
> Both source and sink are Kafka. 
>
> What is the best checkpoint ing mechanism?
>
>  Is setting checkpoints incremental a good option? What should be
> careful of? 
>
> I am running it on aws emr.
>
> Will checkpoint slow the speed? 
>
> Thanks,
> Prasanna.


signature.asc
Description: OpenPGP digital signature


A query on Flink metrics in kubernetes

2020-07-09 Thread Manish G
Hi,

I have a query regarding prometheus scraping Flink metrics data with
application running in kubernetes cluster.

If taskmanager is running on multiple nodes, and prometheus requests for
the metrics data, then is that request directed to one of the nodes(based
on some strategy, like round-robin) or is data aggregated from all the
nodes?

With regards


Re:flink 双流join报错,java.lang.AssertionError

2020-07-09 Thread sunfulin



hi,
我切到最新的1.11 release版本,跑同样的sql,没有抛出异常。想问下这有相关的issue么?想确认下原因。














在 2020-07-09 16:53:34,"sunfulin"  写道:
>hi,
>我使用flink 1.10.1 
>blink-planner。运行以下SQL时,抛出异常。其中A和B分别是两个Kafka消息流。任务使用processtime。如果我把join的B表的select
> 具体字段名 修改为 select *,貌似就可以执行。但是拿到的B表字段顺序貌似是错乱的。请问这个问题是bug么?
>
>
>select A.recvTime, A.khh, A.live_id, A.fund_code as product_code, A.fund_name 
>as product_name, cast(B.balance as double) as balance
>from (
>select toLong(behaviorTime, true) as recvTime, user_id,
>cast(regexp_extract(btnTitle, 'zbid=\{([^|]*)\}', 1) as int) as live_id,
>regexp_extract(btnTitle, 'fundname=\{([^|]*)\}', 1) as fund_name,
>regexp_extract(btnTitle, 'fundcode=\{([^|]*)\}', 1) as fund_code, proctime 
>from kafka_zl_etrack_event_stream
>where pageId = ''
>and eventId = 'click'
>and btnId = '
>and CHARACTER_LENGTH(user_id) > 4
>) A
>left join
>(
>select customerNumber, balance, fundCode, lastUpdateTime, proctime
>  from lscsp_sc_order_all
>   where `status` = '4'
> and businessType IN ('4','5','14','16','17','18')
> and fundCode IS NOT NULL
> and balance IS NOT NULL
> and lastUpdateTime IS NOT NULL
>) B
>on A.user_id = B.customerNumber and A.fund_code = B.fundCode
>group by  A.recvTime, A.user_id, A.live_id, A.fund_code, A.fund_name, 
>cast(B.balance as double)
>
>
>
>
>
>
>Exception in thread "main" java.lang.AssertionError
>at 
>org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.getRootField(SqlToRelConverter.java:4448)
>at 
>org.apache.calcite.sql2rel.SqlToRelConverter.adjustInputRef(SqlToRelConverter.java:3765)
>at 
>org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:3737)
>at 
>org.apache.calcite.sql2rel.SqlToRelConverter.access$2200(SqlToRelConverter.java:217)
>at 
>org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.visit(SqlToRelConverter.java:4796)
>at 
>org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.visit(SqlToRelConverter.java:4092)
>at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
>at 
>org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.convertExpression(SqlToRelConverter.java:4656)
>at 
>org.apache.calcite.sql2rel.StandardConvertletTable.convertCast(StandardConvertletTable.java:522)
>at 
>org.apache.calcite.sql2rel.SqlNodeToRexConverterImpl.convertCall(SqlNodeToRexConverterImpl.java:63)


Re: Heterogeneous or Dynamic Stream Processing

2020-07-09 Thread Dawid Wysakowicz
Hi Rob,

Maybe I am quite late to the party, but I think it might be worth having
a look at the Stateful functions API[1] as well. Especially your latest
approach reminds me a bit about the architecture of the Stateful
functions. There you can have arbitrary routing to functions. You can
also delegate some functions execution to external services.

[1]
https://ci.apache.org/projects/flink/flink-statefun-docs-release-2.1/concepts/distributed_architecture.html

Best,

Dawid

On 09/07/2020 10:17, Arvid Heise wrote:
> Hi Rob,
>
> your approach looks good, but I haven't used iterations in streams
> yet. If it works for you, it can't be bad anyways.
>
> If it is indeed working as expected, I'd recommend checking out
> broadcasts to distribute the rules [1]. This pattern will allow you to
> dynamically add new rules via a special input source (like Kafka topic).
> Also on datastream API level, you have the option to emit side-outputs
> [2]. That would be a natural choice for error handling.
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/side_output.html
>
> On Wed, Jul 8, 2020 at 2:28 AM Rob Shepherd  > wrote:
>
> Thank you for the excellent clarifications.
> I couldn't quite figure out how to map the above to my domain.
>
> Nevertheless i have a working demo that performs the following
> pseudo code:
>
> Let's say that each "channel" has slightly different stream
> requirements
> and we can look up the list of operations needed to be performed
> using a channel key.
> (an operation is our term for some element of processing, a FaaS
> call or local routine maybe)
>
> 1. extract channel key from incoming message
> 2. lookup channel info and enrich the stream object with channel
> info and a list of operations
> 3. i...n using the iterative stream API, loop around each
> operation in the list from (2).
> 4. sink
>
> 
> https://gist.github.com/robshep/bf38b7753062e9d49d365e505e86385e#file-dynamicstramjob-java-L52
>
> I've some work to do to understand storing and retrieving state,
> as my demo just stores the loop-state in my main stream object - I
> don't know whether this is bad or bad-practice.
>
> I'd be really grateful if anyone can cast their eye on this little
> demo and see if there are any gotchas or pitfalls I'm likely to
> succumb to with this approach.
>
> With thanks for your help getting started
>
>
>
> Rob Shepherd BEng PhD
>
>
>
> On Tue, 7 Jul 2020 at 19:26, Arvid Heise  > wrote:
>
> Hi Rob,
>
> 1. When you start a flink application, you actually just
> execute a Java main called the driver. This driver submits a
> job graph to the job manager, which executes the job. Since
> the driver is an ordinary Java program that uses the Flink
> API, you can compose the job graph in any way you want. Have a
> look at one example to see what I mean [1]. It's not hard to
> imagine that you can compose a query such as
>
> List extractorQueries = new ArrayList<>(); Table table = 
> tableEnvironment.from("testCatalog.`default`.testTable"); Table errors = 
> tableEnvironment.fromValues(); for (int index = 0; index < 
> extractorQueries.size(); index++) {
>String extractorQuery = extractorQueries.get(index); table = 
> table.addColumns($(extractorQuery).as("extractedValue" + index, "error")); 
> errors = errors.unionAll(table.filter($("error").isNotNull())); table = 
> table.filter($("error").isNull()).dropColumns($("error")); }
> // write table and errors
>
> This query first loads the data from a testTable and then
> successively applies sql expressions that calculate one value
> + one error column each. The value is stored in
> extractedValue0...99 (assuming 100 extractor queries). All
> values that cause errors, will have a value in the error
> column set. These are collected in the table "errors" for side
> output (very useful for debugging and improving the extractor
> queries). All good records (error IS NULL) are retained for
> further processing and the error column gets dropped.
>
> Btw there is also a Python entry point available, which offers
> you more or less the same. I just haven't tried it yet. [2]
>
> Lastly, currently all extractors are executed in succession.
> Of course, it is also possible to run them independently if
> you have different source streams. You can then later join /
> union them.
>
> 2. The downside of this driver approach is that changes in the
> configuration are not directly reflected. However, upon
> restart Flink will adapt the changes and recover 

flink 提交 offset 到 kafka

2020-07-09 Thread liangji
flink:1.6.2(部分集群未升级。)
kafka:0.11
作业从kafka中消费消息,并运行在yarn上,提供的作业未配置checkpoint,autoCommit设置为true。
作业刚启动时通过kafka-console-consumer.sh可以正常观察到提交的offset,大概50分钟左右,通过kafka-console-consumer.sh就看不到相应的offset信息了(期间没有新消息),请问下flink是有什么机制吗?另外在flink
web ui中看到的committed-offset metric一直显示的是 -915623761776,这是为什么?请大佬们指教,多谢。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink 双流join报错,java.lang.AssertionError

2020-07-09 Thread sunfulin
hi,
我使用flink 1.10.1 
blink-planner。运行以下SQL时,抛出异常。其中A和B分别是两个Kafka消息流。任务使用processtime。如果我把join的B表的select
 具体字段名 修改为 select *,貌似就可以执行。但是拿到的B表字段顺序貌似是错乱的。请问这个问题是bug么?


select A.recvTime, A.khh, A.live_id, A.fund_code as product_code, A.fund_name 
as product_name, cast(B.balance as double) as balance
from (
select toLong(behaviorTime, true) as recvTime, user_id,
cast(regexp_extract(btnTitle, 'zbid=\{([^|]*)\}', 1) as int) as live_id,
regexp_extract(btnTitle, 'fundname=\{([^|]*)\}', 1) as fund_name,
regexp_extract(btnTitle, 'fundcode=\{([^|]*)\}', 1) as fund_code, proctime from 
kafka_zl_etrack_event_stream
where pageId = ''
and eventId = 'click'
and btnId = '
and CHARACTER_LENGTH(user_id) > 4
) A
left join
(
select customerNumber, balance, fundCode, lastUpdateTime, proctime
  from lscsp_sc_order_all
   where `status` = '4'
 and businessType IN ('4','5','14','16','17','18')
 and fundCode IS NOT NULL
 and balance IS NOT NULL
 and lastUpdateTime IS NOT NULL
) B
on A.user_id = B.customerNumber and A.fund_code = B.fundCode
group by  A.recvTime, A.user_id, A.live_id, A.fund_code, A.fund_name, 
cast(B.balance as double)






Exception in thread "main" java.lang.AssertionError
at 
org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.getRootField(SqlToRelConverter.java:4448)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.adjustInputRef(SqlToRelConverter.java:3765)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:3737)
at 
org.apache.calcite.sql2rel.SqlToRelConverter.access$2200(SqlToRelConverter.java:217)
at 
org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.visit(SqlToRelConverter.java:4796)
at 
org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.visit(SqlToRelConverter.java:4092)
at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317)
at 
org.apache.calcite.sql2rel.SqlToRelConverter$Blackboard.convertExpression(SqlToRelConverter.java:4656)
at 
org.apache.calcite.sql2rel.StandardConvertletTable.convertCast(StandardConvertletTable.java:522)
at 
org.apache.calcite.sql2rel.SqlNodeToRexConverterImpl.convertCall(SqlNodeToRexConverterImpl.java:63)

Re: Heterogeneous or Dynamic Stream Processing

2020-07-09 Thread Arvid Heise
Hi Rob,

your approach looks good, but I haven't used iterations in streams yet. If
it works for you, it can't be bad anyways.

If it is indeed working as expected, I'd recommend checking out broadcasts
to distribute the rules [1]. This pattern will allow you to dynamically add
new rules via a special input source (like Kafka topic).
Also on datastream API level, you have the option to emit side-outputs [2].
That would be a natural choice for error handling.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/broadcast_state.html
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/stream/side_output.html

On Wed, Jul 8, 2020 at 2:28 AM Rob Shepherd  wrote:

> Thank you for the excellent clarifications.
> I couldn't quite figure out how to map the above to my domain.
>
> Nevertheless i have a working demo that performs the following pseudo code:
>
> Let's say that each "channel" has slightly different stream requirements
> and we can look up the list of operations needed to be performed using a
> channel key.
> (an operation is our term for some element of processing, a FaaS call or
> local routine maybe)
>
> 1. extract channel key from incoming message
> 2. lookup channel info and enrich the stream object with channel info and
> a list of operations
> 3. i...n using the iterative stream API, loop around each operation in the
> list from (2).
> 4. sink
>
>
> https://gist.github.com/robshep/bf38b7753062e9d49d365e505e86385e#file-dynamicstramjob-java-L52
>
> I've some work to do to understand storing and retrieving state, as my
> demo just stores the loop-state in my main stream object - I don't know
> whether this is bad or bad-practice.
>
> I'd be really grateful if anyone can cast their eye on this little demo
> and see if there are any gotchas or pitfalls I'm likely to succumb to with
> this approach.
>
> With thanks for your help getting started
>
>
>
> Rob Shepherd BEng PhD
>
>
>
> On Tue, 7 Jul 2020 at 19:26, Arvid Heise  wrote:
>
>> Hi Rob,
>>
>> 1. When you start a flink application, you actually just execute a Java
>> main called the driver. This driver submits a job graph to the job manager,
>> which executes the job. Since the driver is an ordinary Java program that
>> uses the Flink API, you can compose the job graph in any way you want. Have
>> a look at one example to see what I mean [1]. It's not hard to imagine that
>> you can compose a query such as
>>
>> List extractorQueries = new ArrayList<>();
>> Table table = tableEnvironment.from("testCatalog.`default`.testTable");
>> Table errors = tableEnvironment.fromValues();
>> for (int index = 0; index < extractorQueries.size(); index++) {
>>String extractorQuery = extractorQueries.get(index);
>>table = table.addColumns($(extractorQuery).as("extractedValue" + index, 
>> "error"));
>>errors = errors.unionAll(table.filter($("error").isNotNull()));
>>table = table.filter($("error").isNull()).dropColumns($("error"));
>> }
>> // write table and errors
>>
>> This query first loads the data from a testTable and then successively
>> applies sql expressions that calculate one value + one error column each.
>> The value is stored in extractedValue0...99 (assuming 100 extractor
>> queries). All values that cause errors, will have a value in the error
>> column set. These are collected in the table "errors" for side output (very
>> useful for debugging and improving the extractor queries). All good records
>> (error IS NULL) are retained for further processing and the error column
>> gets dropped.
>>
>> Btw there is also a Python entry point available, which offers you more
>> or less the same. I just haven't tried it yet. [2]
>>
>> Lastly, currently all extractors are executed in succession. Of course,
>> it is also possible to run them independently if you have different source
>> streams. You can then later join / union them.
>>
>> 2. The downside of this driver approach is that changes in the
>> configuration are not directly reflected. However, upon restart Flink will
>> adapt the changes and recover from the last checkpoint [3] (= snapshot of
>> the current processing state, which can be done every second in your case
>> as the state is rather small). So now you just need to find a way to force
>> a restart.
>>
>> One approach is to kill it manually and start again, but that's not
>> scaling well. However, Flink's fault tolerance feature can be somewhat
>> exploited: You can have one part of your program fail on config change,
>> which will restart the whole application automatically if configured
>> correctly and thus using the latest configuration.
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/basics/StreamSQLExample.java#L77-L100
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-master/dev/table/python/python_udfs.html
>> [3]
>> 

回复:ddl es 报错

2020-07-09 Thread Evan
Hello,


这个报错,在flink 1.11 最新版本我也遇见了,跟你同样的操作
真正原因是这个ddl 是flink 的sink table,是数据写入端,不能打印数据。
而tableEnv.toRetractStream(table, Row.class).print();
这个打印的数据方法只适合flink 的Source Table,也就是数据输入端,比如kafka table就可以正常使用。


2020年7月9日15:31:56




--原始邮件--
发件人:"出发"<573693...@qq.com;
发送时间:2020年3月23日(星期一) 晚上11:30
收件人:"user-zh"http://localhost:9200 
connector.index=buy_cnt_per_hour connector.type=elasticsearch 
connector.version=6 format.type=json schema.0.data-type=BIGINT 
schema.0.name=hour_of_day schema.1.data-type=BIGINT schema.1.name=buy_cnt 
update-mode=append

Re: kafka connector问题

2020-07-09 Thread Benchao Li
首先,从checkpoint/savepoint
恢复的话,一定会以checkpoint/savepoint中的offset为准,所以它的优先级是最高的,
不管你配置哪种startup mode。
如果你没有开启checkpoint,那么如果你用了group-offsets,那它就会从保存在kafka中的offset进行启动。
提交offset到kafka这个应该是默认就开了的。

op <520075...@qq.com> 于2020年7月9日周四 上午11:25写道:

> 官网给的kafka table配置里的scan.startup.modeCREATE TABLE kafkaTable (
> user_id BIGINT,  item_id BIGINT,  category_id BIGINT,  behavior STRING,  ts
> TIMESTAMP(3) ) WITH (  'connector' = 'kafka',  'topic' = 'user_behavior',
> 'properties.bootstrap.servers' = 'localhost:9092',  'properties.group.id'
> = 'testGroup',  'format' = 'csv',  'scan.startup.mode' = 'earliest-offset'
> )看了总共有以下几总'earliest-offset','latest-offset','group-offsets','timestamp'and'specific-offsets'如果我作业重启的话选择group-offsets能否从上次消费到的位置开始?这种情况下需要配置提交offset到kafka
> broker相关的东西吗?有没有从savepoint保存的offset继续消费的配置?



-- 

Best,
Benchao Li


??????DataStream????uv????

2020-07-09 Thread Yichao Yang
Hi,


??uv??uv[1]??


[1]https://lists.apache.org/thread.html/rbe00ee38e2d07310d4e3c796de86c65205d1f5deecfc1678d9ebbdea%40%3Cuser-zh.flink.apache.org%3E




----
??:"?g???U?["

Re: 回复:flink时间窗口

2020-07-09 Thread Congxian Qiu
对于 window 来说,你需要判断下是没有数据进来,还是有数据进来但是 window 没有触发。
如果是数据没有进来,那么需要看 window 节点之前的逻辑,如果是数据进来了,但是没有触发,需要看下 wateramrk 是不是符合预期

Best,
Congxian


爱吃鱼  于2020年7月9日周四 下午1:42写道:

>
>
>
> Hi,
>
>
>
>
>
>
> 因为业务原因具体的keyby字段没有写清楚,我是根据warningPojo类里面的字段进行排序,源数据
> 是从kafka实时流传输过来的,每一分钟滑动窗口计算一次
>
>
> SingleOutputStreamOperator> operator1 =
> env.addSource(stringFlinkKafkaConsumerBase)
> .filter((String s) -> (s.split(",", -1).length == 34))
> .flatMap(new RichFlatMapFunction() {
> .keyBy("src", "msg")
> .timeWindow(Time.minutes(1))
> .process(new ProcessWindowFunction Tuple2, Tuple, TimeWindow>()
> .setParallelism(1);
>
>
>
>
> 每次执行这段流代码就只有第一次的一分钟时间窗口有数据传输到es,之后就没有数据了。
>
>
>
>
>
> 在 2020-07-09 13:09:32,"Yichao Yang" <1048262...@qq.com> 写道:
> >Hi,
> >
> >
> >根据你的keyby字段来看,你是根据 warningPojo + String 进行了keyby,可以看下是否相同的key只有一条相同数据。
> >并且可以看下使用到的是处理时间还是事件时间?
> >如果是事件时间,可以看下 timestamp assigner 是否正确,上游数据和时间戳是否符合预期。
> >
> >
> >Best,
> >Yichao Yang
> >
> >
> >
> >
> >--原始邮件--
> >发件人:"爱吃鱼" >发送时间:2020年7月9日(星期四) 中午11:37
> >收件人:"user-zh" >
> >主题:flink时间窗口
> >
> >
> >
> >你好,我最近业务上需要处理一个流式的时间处理窗口,部分业务代码如下
> >SingleOutputStreamOperator flatMap.keyBy(0,1)
> >
> .timeWindow(Time.minutes(1))
> >
> .process(new ProcessWindowFunction)
> >当我运行的时候只有第一分钟的时间窗口会有数据进来,之后便没有数据进来了,业务逻辑代码也没有报错,请问这是什么原因。
>