flink connector kafka,单条 msg 为 7kb,poll 较慢

2023-02-12 文章 xuhaiLong


hi all


最近遇到一个问题,flink消费 kafka,kafka单条 msg 大概在 7kb(json 
格式,value值大),流量较大,很快会产生堆积。处理逻辑已经过调优,目前的瓶颈是在 poll 
消息的时候,1w条消息(总共70mb)大概要3s才能拉取结束,已经尝试调节过consumer配置,没有较好的性能提升,请问有较好解决的方案吗?
props.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");

props.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "10485760"); //10m
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "1000");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");




| |
xuhaiLong
|
|
xiagu...@163.com
|

回复: 如何监控kafka延迟

2021-07-28 文章 xuhaiLong
参考下kafka_exporter,获取所有的 group 的消费情况,然后配置不同的规则去监控。


在2021年7月28日 17:39,laohu<2372554...@qq.com.INVALID> 写道:
Hi comsir

kafka的控制台能力比较弱,想知道延迟只能自己维护。

维护方式:

1. 每个服务的topic的offset 减去 groupid的offset

2. 尽量可以计算出各种消费速度

3. rocketmq控制台,可看到消费进度,可以参照下。


在 2021/7/28 上午11:02, 龙逸尘 写道:
Hi comsir,
采用 kafka 集群元数据 的 offset 信息和当前 group offset 相减得到的 lag 是比较准确的。
group  id 需要自己维护。

comsir <609326...@qq.com.invalid> 于2021年7月20日周二 下午12:41写道:

hi all
以kafka为source的flink任务,各位都是如何监控kafka的延迟情况??
监控这个延迟的目的:1.大盘展示,2.延迟后报警
小问题:
1.发现flink原生的相关metric指标很多,研究后都不是太准确,大家都用哪个指标?
2.怎么获取groupId呢,多个group消费的话,如何区分呀?
3.能通过kafka集群侧的元数据,和当前offset做减法,计算lag吗?
4.有比较优雅的实现方式吗?
非常感谢 期待解答 感谢感谢


回复:将每个tm的slot数从2降低到1,任务反而无法启动

2021-03-10 文章 xuhaiLong
hi


flink 1 slot != 1 core
可以看下 yarn.containers.vcores 这个参数设置为多少。
如果该值为1,tm slot为2,那么每启动一个tm容器就会占用1core,但是每个tm 会有两个slot,反之,如果该值为1,每个tm slot 
也为1,就会需要max parallelism core 数量。




在2021年3月11日 14:34,lzwang 写道:
您好:




任务的拓扑图如下,parallelism的设置是140,但是中间有个操作的并行度设置成了50。
集群剩余的slot总数是195。




如果将每个tm的slot数设置为2,任务能够正常启动,并且分配了70个tm和140个slot,符合预期。




可如果将每个tm的slot数设置为1,便只分配了115个slot。任务会卡在creating状态,并且几分钟后,会抛出异常,“Could not 
allocate all requires slots within timeout of 30 ms. Slots required: 470, 
slots allocated: 388”




这里面有几个问题:
1. 将slot数设置为1后,异常中提示“Slots required: 470”,这个470似乎完全没有考虑slot 
share(我们并没有手动设置SlotSharingGroup)。这是为啥?
2. 将slot数设置为1后,异常中提示“slots allocated: 388”,而整个集群剩余的slot其实只有195个,这个388怎么来的?
3. 最大的并行度应是140,为何只分配了115个slot呢?




我们使用的flink版本是1.6.2。


期待你们的回复~


---
科大讯飞股份有限公司
AI营销平台  汪李之
Tel:15209882175
QQ/WeChat:992424538
Add:安徽省合肥市望江西路666号



flink sql tumble window 时区问题

2021-02-24 文章 xuhaiLong
hi all:

使用flink sql发现一个时区问题,在flink 1.11.3,flink 1.10 都有发现。


使用eventtime,datestream 转换为table,对times字段使用 rowtime。数据为 161421840,执行完rowtime 
后变成 161418960 直接就少了8小时,导致后续的开窗都有问题。


代码参考:https://paste.ubuntu.com/p/xYpWNrR9MT/





Re: Flink 1.10 table setIdleStateRetentionTime

2021-01-26 文章 xuhaiLong
抱歉..弄错了


On 1/27/2021 11:39,xuhaiLong wrote:


hi,


Flink 1.10.1 on 
yarn,测试发现,`tableEnv.getConfig.setIdleStateRetentionTime(Time.days(180), 
Time.days(181))` 修改为 
`tableEnv.getConfig.setIdleStateRetentionTime(Time.days(30), 
Time.days(31))`。在savePoInt中恢复 job 启动正常,运行的时候 在webUI Exception 中有连接 kafka 
异常,Timeout expired while fetching topic metadata。不太明白是什么原因产生这个?是由于maxTime 问题吗?

Flink 1.10 table setIdleStateRetentionTime

2021-01-26 文章 xuhaiLong


hi,


Flink 1.10.1 on 
yarn,测试发现,`tableEnv.getConfig.setIdleStateRetentionTime(Time.days(180), 
Time.days(181))` 修改为 
`tableEnv.getConfig.setIdleStateRetentionTime(Time.days(30), 
Time.days(31))`。在savePoInt中恢复 job 启动正常,运行的时候 在webUI Exception 中有连接 kafka 
异常,Timeout expired while fetching topic metadata。不太明白是什么原因产生这个?是由于maxTime 问题吗?

回复: 根据业务需求选择合适的flink state

2021-01-21 文章 xuhaiLong
可以试试这样,mysql 中 设置唯一键为窗口的 startTime 和 
userId,然后对用户的每个窗口做停留时间的计算,最终会同一个用户在一天会产生多条记录,不过窗口的 startTime 不同,取值的时候sum 试试?


在2021年1月21日 18:24,张锴 写道:
你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和
context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。
下面是我的部分代码逻辑:

val ds = dataStream
.filter(_.liveType == 1)
.keyBy(1, 2)
.window(EventTimeSessionWindows.withGap(Time.minutes(1)))
.process(new myProcessWindow()).uid("process-id")

class myProcessWindow() extends
ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
TimeWindow] {

override def process(key: Tuple, context: Context, elements:
Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
= {
var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间
var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间

val currentDate = DateUtil.currentDate
val created_time = currentDate
val modified_time = currentDate
。。。

val join_time: String =
DateUtil.convertTimeStamp2DateStr(startTime,
DateUtil.SECOND_DATE_FORMAT)
val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
DateUtil.SECOND_DATE_FORMAT)
val duration = (endTime - startTime) / 1000  //停留多少秒
val duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
join_time, leave_time, created_time, modified_time
, liveType, plat_form, duration, duration_time,
network_operator, role, useragent, uid, eventTime))

CloudliveWatcher(id, partnerId, courseId, customerId,
courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
join_time, leave_time, created_time, modified_time
, liveType, plat_form, duration, duration_time,
network_operator, role, useragent, uid, eventTime)

}


这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?




赵一旦  于2020年12月28日周一 下午7:12写道:

按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。

或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。

session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。


张锴  于2020年12月28日周一 下午5:35写道:

能描述一下用session window的考虑吗

Akisaya  于2020年12月28日周一 下午5:00写道:

这个可以用 session window 吧



https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows

news_...@163.com  于2020年12月28日周一 下午2:15写道:

这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。



news_...@163.com

发件人: 张锴
发送时间: 2020-12-28 13:35
收件人: user-zh
主题: 根据业务需求选择合适的flink state
各位大佬帮我分析下如下需求应该怎么写

需求说明:

公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A




在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。

我的想法:
我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。

不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。

flink 版本1.10.1






回复: 根据业务需求选择合适的flink state

2021-01-21 文章 xuhaiLong
Hi, 看了下你的代码,用session window 时长为1分钟,表示的是user1 
的窗口在1分钟内没收到数据就进行一个触发计算,所以最终得到的结果应该是需要你把 user1 产生的每条记录的时长做一个sum,如果只看单条维度是不全的
在2021年1月21日 18:24,张锴 写道:
你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和
context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。
下面是我的部分代码逻辑:

val ds = dataStream
.filter(_.liveType == 1)
.keyBy(1, 2)
.window(EventTimeSessionWindows.withGap(Time.minutes(1)))
.process(new myProcessWindow()).uid("process-id")

class myProcessWindow() extends
ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple,
TimeWindow] {

override def process(key: Tuple, context: Context, elements:
Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit
= {
var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间
var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间

val currentDate = DateUtil.currentDate
val created_time = currentDate
val modified_time = currentDate
。。。

val join_time: String =
DateUtil.convertTimeStamp2DateStr(startTime,
DateUtil.SECOND_DATE_FORMAT)
val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime,
DateUtil.SECOND_DATE_FORMAT)
val duration = (endTime - startTime) / 1000  //停留多少秒
val duration_time = DateUtil.secondsToFormat(duration)  //停留时分秒
out.collect(CloudliveWatcher(id, partnerId, courseId, customerId,
courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
join_time, leave_time, created_time, modified_time
, liveType, plat_form, duration, duration_time,
network_operator, role, useragent, uid, eventTime))

CloudliveWatcher(id, partnerId, courseId, customerId,
courseNumber, nickName, ip, device_type, net_opretor, net_type, area,
join_time, leave_time, created_time, modified_time
, liveType, plat_form, duration, duration_time,
network_operator, role, useragent, uid, eventTime)

}


这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中?




赵一旦  于2020年12月28日周一 下午7:12写道:

按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。

或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。

session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。


张锴  于2020年12月28日周一 下午5:35写道:

能描述一下用session window的考虑吗

Akisaya  于2020年12月28日周一 下午5:00写道:

这个可以用 session window 吧



https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows

news_...@163.com  于2020年12月28日周一 下午2:15写道:

这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。



news_...@163.com

发件人: 张锴
发送时间: 2020-12-28 13:35
收件人: user-zh
主题: 根据业务需求选择合适的flink state
各位大佬帮我分析下如下需求应该怎么写

需求说明:

公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A




在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。

我的想法:
我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数
减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。

不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。

flink 版本1.10.1






回复:flink版本升级问题咨询

2021-01-12 文章 xuhaiLong
描述的不太对,具体可以参考下这个 
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/libs/state_processor_api.html
在2021年1月11日 20:12,xuhaiLong 写道:
我试过 flink 1.7 升级到 1.10。如果使用到了 table api 涉及 group By 的话 
savePoint恢复有问题,其他没发现过什么问题。或者可以使用下 process api 写一份数据,启动 job


在2021年1月7日 09:50,zhang hao 写道:
目前现状:公司flink任务都是跑在flin1.7当中,通过公司开发的流计算平台进行提交到flink yarn集群,flink on yarn
基于flink
session进行部署,运行任务有接近500个流式任务,许多都是有状态应用,现在如果想把flink集群升级到1.11或者1.12,如何平滑的进行版本升级,而不影响现有的任务?


回复:flink版本升级问题咨询

2021-01-11 文章 xuhaiLong
我试过 flink 1.7 升级到 1.10。如果使用到了 table api 涉及 group By 的话 
savePoint恢复有问题,其他没发现过什么问题。或者可以使用下 process api 写一份数据,启动 job


在2021年1月7日 09:50,zhang hao 写道:
目前现状:公司flink任务都是跑在flin1.7当中,通过公司开发的流计算平台进行提交到flink yarn集群,flink on yarn
基于flink
session进行部署,运行任务有接近500个流式任务,许多都是有状态应用,现在如果想把flink集群升级到1.11或者1.12,如何平滑的进行版本升级,而不影响现有的任务?


回复: 回复: re:Re: 回复:一个关于实时合并数据的问题

2020-12-06 文章 xuhaiLong
这个我也不太清楚,没有做过对应的是测试。


@吴磊 想到一个问题,如果 process 中使用了 agg state,keyBy(userId % 10) 后会有问题吧?使用 mapState 做 
agg 操作? 
在2020年12月6日 20:30,赵一旦 写道:
所以说,ckpt的性能/时间和key的数量有关对吗?即使总体数据量不变,key少些,每个key的状态变大,会降低ckpt时间?
按照你们的分析来看??

bradyMk  于2020年12月4日周五 下午7:23写道:

对对对,可以取hashCode,我短路了,谢谢哈~



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/



回复: re:Re: 回复:一个关于实时合并数据的问题

2020-12-04 文章 xuhaiLong
id 是字符串 走哈希取余试试看?


在2020年12月4日 18:12,502347601<502347...@qq.com> 写道:
hello~不能按照keyId来keyby,这样state的个数也就10亿个了,checkpoint会有性能问题。你可以先求余一下,比如求余分成10组。类似这样keyid%10。


-- Original Message --
From: "bradyMk";
Date: 2020-12-04 18:05
To: "user-zh";
Subject: Re: re:Re: 回复:一个关于实时合并数据的问题



所以您说的这个思路应该是和我上面说的是一样的了吧,根据10亿id做keyby,不会有什么问题么?
- Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink 1.10 on Yarn

2020-09-11 文章 xuhaiLong
 @Congxian Qiu Sorry,刚看到。

之前使用的 flink 1.7,没有出现过这个问题。升级到 flink 1.10 后这个问题必现,但是时间不定。


On 8/9/2020 15:00,Congxian Qiu wrote:
Hi xuhaiLong
请问你这个作业在这个版本是是必然出现 NPE 问题吗?另外 1.10 之前的版本有出现过这个问题吗?
Best,
Congxian


xuhaiLong  于2020年8月7日周五 下午3:14写道:

感谢回复!我这边的确是这个bug 引起的


On 8/7/2020 13:43,chenkaibit wrote:
hi xuhaiLong,看日志发生的 checkpoint nullpointer 是个已知的问题,具体可以查看下面两个jira。
你用的jdk版本是多少呢?目前发现使用 jdk8_40/jdk8_60 + flink-1.10 会出现 checkpoint
nullpointer,可以把jdk升级下版本试一下
https://issues.apache.org/jira/browse/FLINK-18196
https://issues.apache.org/jira/browse/FLINK-17479




在 2020-08-07 12:50:23,"xuhaiLong"  写道:

sorry,我添加错附件了


是的,taskmanager.memory.jvm-metaspace.size 为默认配置
On 8/7/2020 11:43,Yangze Guo wrote:
日志没有贴成功,taskmanager.memory.jvm-metaspace.size目前是默认配置么?

Best,
Yangze Guo

On Fri, Aug 7, 2020 at 11:38 AM xuhaiLong  wrote:



Hi


场景:1 tm 三个slot,run了三个job


三个job 运行的时候 出现了 ck 过程中空指针异常,导致任务一致重启。最终导致`Metaspace` 空间占满,出现
`java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error
has occurred. This can mean two things: either the job requires a larger
size of JVM metaspace to load classes or there is a class loading leak. In
the first case 'taskmanager.memory.jvm-metaspace.size' configuration option
should be increased. If the error persists (usually in cluster after
several job (re-)submissions) then there is probably a class loading leak
which has to be investigated and fixed. The task executor has to be
shutdown...
`


附件为部分异常信息


疑问:
1. 为什么会在 ck 时候出现空指针?(三个 job 为同一个 kafka topic,通过ck 恢复 job 可以正常运行,应该不是数据的问题)
2. 通过日志看,是可以重启的,为什么自动重启后还存在这个问题,导致一直重启?


感谢~~~
从网易邮箱大师发来的云附件
08-07error.txt(730.4KB,2020年8月22日 11:37 到期)
下载



回复:手动修改CK状态

2020-09-09 文章 xuhaiLong
可以参考下这个 stateProcess Api
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html


在2020年9月9日 11:41,Shuai Xia 写道:
Hi,各位大佬,如果我想手动读取修改CK存储的状态内容,可以使用什么办法,我记得之前有工具类可以支持



Flink 1.10.1 on Yarn

2020-08-20 文章 xuhaiLong


Hi


datastream 转为 table。使用 `JDBCOutputFormat.buildJDBCOutputFormat()` 输出到 
mysql,出现这个[1]异常 任务 failover, 
2点58分开始,1小时一次。导致 任务出现 [2] 异常,metaspace 为 256M,猜测是由于启动过于频繁 classLoder 为同一个引起的。


期望解答:
关于[1] 异常,是什么原因引起的?有没有什么合适的解决方案。flink 1.10 有没有其他输出在 mysql 的 connector?
关于[2]异常,这个问题是我猜测的原因吗?flink 有没有对这个的解决方案


[1] java.lang.RuntimeException: Execution of JDBC statement failed.
at 
org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:102)
at 
org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:93)
at 
org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:40)
at 
org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:87)
at 
org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
at 
org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at 
org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53)
at 
com.netease.wm.trace.RecTraceV2$JdbcEnrichProcessFunction.processElement(RecTraceV2.scala:540)
at 
com.netease.wm.trace.RecTraceV2$JdbcEnrichProcessFunction.processElement(RecTraceV2.scala:451)
at 
org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at 
org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at 
org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
at 

Re: Flink 1.10 on Yarn

2020-08-07 文章 xuhaiLong
感谢回复!我这边的确是这个bug 引起的


On 8/7/2020 13:43,chenkaibit wrote:
hi xuhaiLong,看日志发生的 checkpoint nullpointer 是个已知的问题,具体可以查看下面两个jira。
你用的jdk版本是多少呢?目前发现使用 jdk8_40/jdk8_60 + flink-1.10 会出现 checkpoint 
nullpointer,可以把jdk升级下版本试一下
https://issues.apache.org/jira/browse/FLINK-18196
https://issues.apache.org/jira/browse/FLINK-17479




在 2020-08-07 12:50:23,"xuhaiLong"  写道:

sorry,我添加错附件了


是的,taskmanager.memory.jvm-metaspace.size 为默认配置
On 8/7/2020 11:43,Yangze Guo wrote:
日志没有贴成功,taskmanager.memory.jvm-metaspace.size目前是默认配置么?

Best,
Yangze Guo

On Fri, Aug 7, 2020 at 11:38 AM xuhaiLong  wrote:



Hi


场景:1 tm 三个slot,run了三个job


三个job 运行的时候 出现了 ck 过程中空指针异常,导致任务一致重启。最终导致`Metaspace` 空间占满,出现 
`java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error has 
occurred. This can mean two things: either the job requires a larger size of 
JVM metaspace to load classes or there is a class loading leak. In the first 
case 'taskmanager.memory.jvm-metaspace.size' configuration option should be 
increased. If the error persists (usually in cluster after several job 
(re-)submissions) then there is probably a class loading leak which has to be 
investigated and fixed. The task executor has to be shutdown...
`


附件为部分异常信息


疑问:
1. 为什么会在 ck 时候出现空指针?(三个 job 为同一个 kafka topic,通过ck 恢复 job 可以正常运行,应该不是数据的问题)
2. 通过日志看,是可以重启的,为什么自动重启后还存在这个问题,导致一直重启?


感谢~~~
从网易邮箱大师发来的云附件
08-07error.txt(730.4KB,2020年8月22日 11:37 到期)
下载


Flink 1.10 on Yarn

2020-08-06 文章 xuhaiLong


Hi


场景:1 tm 三个slot,run了三个job


三个job 运行的时候 出现了 ck 过程中空指针异常,导致任务一致重启。最终导致`Metaspace` 空间占满,出现 
`java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error has 
occurred. This can mean two things: either the job requires a larger size of 
JVM metaspace to load classes or there is a class loading leak. In the first 
case 'taskmanager.memory.jvm-metaspace.size' configuration option should be 
increased. If the error persists (usually in cluster after several job 
(re-)submissions) then there is probably a class loading leak which has to be 
investigated and fixed. The task executor has to be shutdown...
`


附件为部分异常信息


疑问:
1. 为什么会在 ck 时候出现空指针?(三个 job 为同一个 kafka topic,通过ck 恢复 job 可以正常运行,应该不是数据的问题)
2. 通过日志看,是可以重启的,为什么自动重启后还存在这个问题,导致一直重启?


感谢~~~
从网易邮箱大师发来的云附件
08-07error.txt(730.4KB,2020年8月22日 11:37 到期)
下载

Re: Position out of bounds.

2020-07-02 文章 xuhaiLong
感谢
没看明白这个bug引起的原因是什么,或者说有什么合适的解决方案?


On 7/2/2020 18:39,夏帅 wrote:
你好,请问解决了么,我看了下源码,好像是一个bug
DataOutputSerializer

@Override
public void write(int b) throws IOException {
if (this.position >= this.buffer.length) {
resize(1);
}
this.buffer[this.position++] = (byte) (b & 0xff);
}
此处position应该自增


--
发件人:xuhaiLong 
发送时间:2020年7月2日(星期四) 17:46
收件人:flink 中文社区 
主 题:Position out of bounds.


flink 1.10  onYarn
job 中 有一个MapState[Long,Bean]
https://www.helloimg.com/image/Pe1QR
程序启动一段时间(20分钟)后出现了 附件中的异常
查看对应源码也没看懂是什么引起的异常
https://www.helloimg.com/image/Peqc5



回复:Position out of bounds.

2020-07-02 文章 xuhaiLong
感谢
没看明白这个bug引起的原因是什么,或者说有什么合适的解决方案?


| |
夏*
|
|
邮箱:xiagu...@163.com
|

签名由 网易邮箱大师 定制

在2020年07月02日 18:39,夏帅 写道:
你好,请问解决了么,我看了下源码,好像是一个bug
DataOutputSerializer

@Override
public void write(int b) throws IOException {
  if (this.position >= this.buffer.length) {
 resize(1);
  }
  this.buffer[this.position++] = (byte) (b & 0xff);
}
此处position应该自增


--
发件人:xuhaiLong 
发送时间:2020年7月2日(星期四) 17:46
收件人:flink 中文社区 
主 题:Position out of bounds.

 
flink 1.10  onYarn
job 中 有一个MapState[Long,Bean]
https://www.helloimg.com/image/Pe1QR
程序启动一段时间(20分钟)后出现了 附件中的异常
查看对应源码也没看懂是什么引起的异常
https://www.helloimg.com/image/Peqc5



Position out of bounds.

2020-07-02 文章 xuhaiLong
flink 1.10  onYarn
job 中 有一个MapState[Long,Bean]
 https://www.helloimg.com/image/Pe1QR
 程序启动一段时间(20分钟)后出现了 附件中的异常
查看对应源码也没看懂是什么引起的异常
https://www.helloimg.com/image/Peqc5

2020-07-02 17:06:19,409 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess -> 
Sink: Unnamed (2/3) (d847db42ed1d92ac373f9ccf27b846f0) switched from DEPLOYING 
to RUNNING.
2020-07-02 17:06:19,410 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess -> 
Sink: Unnamed (3/3) (ca825ba9712eb520ff6de6b0f9de4dc1) switched from DEPLOYING 
to RUNNING.
2020-07-02 17:06:19,426 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess -> 
Sink: Unnamed (3/3) (b05f5b66fd4c65a9032bb0140a4ce3d1) switched from DEPLOYING 
to RUNNING.
2020-07-02 17:06:19,427 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess -> 
Sink: Unnamed (2/3) (d5fe791177a64ea718eec61b82542e46) switched from DEPLOYING 
to RUNNING.
2020-07-02 17:06:19,472 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- groupBy: 
(userId, sourceUuid, categoryId, gender), select: (userId, sourceUuid, 
categoryId, gender, MAX(siteId) AS siteId, MAX(score) AS score) -> select: 
(userId, categoryId, gender, score, siteId) (1/3) 
(f8f49357b9121d97816d5f83569cd6ac) switched from DEPLOYING to RUNNING.
2020-07-02 17:06:19,472 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess -> 
Sink: Unnamed (1/3) (37befed4aefab35588e5f6d4c372b8c4) switched from DEPLOYING 
to RUNNING.
2020-07-02 17:06:19,492 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess -> 
Sink: Unnamed (1/3) (2de7f2a3809e8d7e97197cbc0f7c8b4b) switched from DEPLOYING 
to RUNNING.
2020-07-02 17:06:19,498 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess -> 
Sink: Unnamed (1/3) (1fe6456a019617839a573d55b1194541) switched from DEPLOYING 
to RUNNING.
2020-07-02 17:06:52,263 INFO  
org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess -> 
Sink: Unnamed (2/3) (d5fe791177a64ea718eec61b82542e46) switched from RUNNING to 
FAILED on 
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@61d8240a.
java.lang.IllegalArgumentException: Position out of bounds.
at 
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139)
at 
org.apache.flink.core.memory.DataOutputSerializer.setPosition(DataOutputSerializer.java:368)
at 
org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.resetToKey(RocksDBSerializedCompositeKeyBuilder.java:189)
at 
org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.buildCompositeKeyNamesSpaceUserKey(RocksDBSerializedCompositeKeyBuilder.java:144)
at 
org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeCurrentKeyWithGroupAndNamespacePlusUserKey(AbstractRocksDBState.java:149)
at 
org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:120)
at 
org.apache.flink.runtime.state.ttl.TtlMapState.lambda$getWrapped$0(TtlMapState.java:61)
at 
org.apache.flink.runtime.state.ttl.AbstractTtlDecorator.getWrappedWithTtlCheckAndUpdate(AbstractTtlDecorator.java:92)
at 
org.apache.flink.runtime.state.ttl.TtlMapState.getWrapped(TtlMapState.java:60)
at 
org.apache.flink.runtime.state.ttl.TtlMapState.contains(TtlMapState.java:93)
at 
org.apache.flink.runtime.state.UserFacingMapState.contains(UserFacingMapState.java:72)
at 
com.netease.wm.trace.usertag.RealTimeUserTag$UserTimeTradeProcess.processElement(RealTimeUserTag.scala:272)
at 
com.netease.wm.trace.usertag.RealTimeUserTag$UserTimeTradeProcess.processElement(RealTimeUserTag.scala:237)
at 
org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
at 
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
   

Blink

2020-06-29 文章 xuhaiLong


hello,请教下


  "org.apache.flink" %% "flink-table-api-scala-bridge" % "1.10.1",
  "org.apache.flink" %% "flink-table-planner-blink" % "1.10.1" % "provided",
  "org.apache.flink" % "flink-table" % "1.10.1" % "provided",


我在项目中添加了这三个依赖,在idea 中 运行的时候出现异常
`Could not instantiate the executor. Makesure a planner module is on the 
classpath`


而我添加上这个依赖
`"org.apache.flink" %% "flink-table-planner" % "1.10.1" % "provided",` 就可以了


但是我要使用 blink 中的ROW_NUMBER() 函数,是我的引入错了吗?


猜测是我没有正确引入 blink  ?

Re: Flink DataStream

2020-06-23 文章 xuhaiLong
是我的问题,引用了old planner。感谢!


On 6/23/2020 21:05,LakeShen wrote:
Hi xuhaiLong,

看你的依赖,应该用的 old planner,你要使用 blink planner 才能使用row_number 函数。要使用
flink-table-planner-blink_2.11
具体文档参考:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/#table-program-dependencies

Best,
LakeShen

xuhaiLong  于2020年6月23日周二 下午8:14写道:

"org.apache.flink" %% "flink-table-api-scala-bridge" % "1.10.1",
"org.apache.flink" %% "flink-table-planner" % "1.10.1" % "provided",
看下粘贴的 sbt 依赖
On 6/23/2020 20:06,Jark Wu wrote:
图片无法查看,你可以把图片上传到某图床,然后将链接贴这里。

On Tue, 23 Jun 2020 at 19:59, xuhaiLong  wrote:

使用的是1.10.1,在 table api 无法使用ROW_NUMBER
On 6/23/2020 19:52,Jark Wu  wrote:

Hi xuhaiLong,

1.10 blink planner 是支持 ROW_NUMBER() over 的 (配合 rownum <= N 使用)。你是不是用的 old
planner 呢?

Best,
Jark

On Tue, 23 Jun 2020 at 19:44, LakeShen  wrote:

Hi xuhaiLong,

看下是否能够在 Flink SQL 中,通过自定义 UDTF 来满足你的需求。

Best,
LakeShen

xuhaiLong  于2020年6月23日周二 下午7:18写道:

Hi

请教一个问题


我需要对一个类似这样的数据进行计算获取用户 categoryId
| userId | articleID | categoryId | score |
| 01 | A | 1 | 10 |
| 01 | B | 1 | 20 |
| 01 | C | 2 | 30 |




目前我的实现是使用tableAPI 根据 UserId和categoryID 分组做 score 聚合
再通过状态做TopN排序,有没有其他更好的方案来实现?


我使用过 ROW_NUMBER() over() ,在 flink 1.10 上,并无法使用。分组聚合除了使用Table API

还有其他方法实现吗?





感谢!!









Re: Flink DataStream

2020-06-23 文章 xuhaiLong
"org.apache.flink" %% "flink-table-api-scala-bridge" % "1.10.1",
"org.apache.flink" %% "flink-table-planner" % "1.10.1" % "provided",
 看下粘贴的 sbt 依赖
On 6/23/2020 20:06,Jark Wu wrote:
图片无法查看,你可以把图片上传到某图床,然后将链接贴这里。

On Tue, 23 Jun 2020 at 19:59, xuhaiLong  wrote:

使用的是1.10.1,在 table api 无法使用ROW_NUMBER
On 6/23/2020 19:52,Jark Wu  wrote:

Hi xuhaiLong,

1.10 blink planner 是支持 ROW_NUMBER() over 的 (配合 rownum <= N 使用)。你是不是用的 old
planner 呢?

Best,
Jark

On Tue, 23 Jun 2020 at 19:44, LakeShen  wrote:

Hi xuhaiLong,

看下是否能够在 Flink SQL 中,通过自定义 UDTF 来满足你的需求。

Best,
LakeShen

xuhaiLong  于2020年6月23日周二 下午7:18写道:

Hi

请教一个问题


我需要对一个类似这样的数据进行计算获取用户 categoryId
| userId | articleID | categoryId | score |
| 01 | A | 1 | 10 |
| 01 | B | 1 | 20 |
| 01 | C | 2 | 30 |




目前我的实现是使用tableAPI 根据 UserId和categoryID 分组做 score 聚合
再通过状态做TopN排序,有没有其他更好的方案来实现?


我使用过 ROW_NUMBER() over() ,在 flink 1.10 上,并无法使用。分组聚合除了使用Table API

还有其他方法实现吗?





感谢!!








Re: Flink DataStream

2020-06-23 文章 xuhaiLong
使用的是1.10.1,在 table api 无法使用ROW_NUMBER
On 6/23/2020 19:52,Jark Wu wrote:
Hi xuhaiLong,

1.10 blink planner 是支持 ROW_NUMBER() over 的 (配合 rownum <= N 使用)。你是不是用的 old
planner 呢?

Best,
Jark

On Tue, 23 Jun 2020 at 19:44, LakeShen  wrote:

Hi xuhaiLong,

看下是否能够在 Flink SQL 中,通过自定义 UDTF 来满足你的需求。

Best,
LakeShen

xuhaiLong  于2020年6月23日周二 下午7:18写道:

Hi

请教一个问题


我需要对一个类似这样的数据进行计算获取用户 categoryId
| userId | articleID | categoryId | score |
| 01 | A | 1 | 10 |
| 01 | B | 1 | 20 |
| 01 | C | 2 | 30 |




目前我的实现是使用tableAPI 根据 UserId和categoryID 分组做 score 聚合
再通过状态做TopN排序,有没有其他更好的方案来实现?


我使用过 ROW_NUMBER() over() ,在 flink 1.10 上,并无法使用。分组聚合除了使用Table API
还有其他方法实现吗?




感谢!!







Flink DataStream

2020-06-23 文章 xuhaiLong
Hi

请教一个问题


我需要对一个类似这样的数据进行计算获取用户 categoryId
| userId | articleID | categoryId | score |
| 01 | A | 1 | 10 |
| 01 | B | 1 | 20 |
| 01 | C | 2 | 30 |




目前我的实现是使用tableAPI 根据 UserId和categoryID 分组做 score 聚合 再通过状态做TopN排序,有没有其他更好的方案来实现?


我使用过 ROW_NUMBER() over() ,在 flink 1.10 上,并无法使用。分组聚合除了使用Table API 还有其他方法实现吗?




感谢!!