flink connector kafka,单条 msg 为 7kb,poll 较慢
hi all 最近遇到一个问题,flink消费 kafka,kafka单条 msg 大概在 7kb(json 格式,value值大),流量较大,很快会产生堆积。处理逻辑已经过调优,目前的瓶颈是在 poll 消息的时候,1w条消息(总共70mb)大概要3s才能拉取结束,已经尝试调节过consumer配置,没有较好的性能提升,请问有较好解决的方案吗? props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "10485760"); //10m props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "1000"); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000"); | | xuhaiLong | | xiagu...@163.com |
回复: 如何监控kafka延迟
参考下kafka_exporter,获取所有的 group 的消费情况,然后配置不同的规则去监控。 在2021年7月28日 17:39,laohu<2372554...@qq.com.INVALID> 写道: Hi comsir kafka的控制台能力比较弱,想知道延迟只能自己维护。 维护方式: 1. 每个服务的topic的offset 减去 groupid的offset 2. 尽量可以计算出各种消费速度 3. rocketmq控制台,可看到消费进度,可以参照下。 在 2021/7/28 上午11:02, 龙逸尘 写道: Hi comsir, 采用 kafka 集群元数据 的 offset 信息和当前 group offset 相减得到的 lag 是比较准确的。 group id 需要自己维护。 comsir <609326...@qq.com.invalid> 于2021年7月20日周二 下午12:41写道: hi all 以kafka为source的flink任务,各位都是如何监控kafka的延迟情况?? 监控这个延迟的目的:1.大盘展示,2.延迟后报警 小问题: 1.发现flink原生的相关metric指标很多,研究后都不是太准确,大家都用哪个指标? 2.怎么获取groupId呢,多个group消费的话,如何区分呀? 3.能通过kafka集群侧的元数据,和当前offset做减法,计算lag吗? 4.有比较优雅的实现方式吗? 非常感谢 期待解答 感谢感谢
回复:将每个tm的slot数从2降低到1,任务反而无法启动
hi flink 1 slot != 1 core 可以看下 yarn.containers.vcores 这个参数设置为多少。 如果该值为1,tm slot为2,那么每启动一个tm容器就会占用1core,但是每个tm 会有两个slot,反之,如果该值为1,每个tm slot 也为1,就会需要max parallelism core 数量。 在2021年3月11日 14:34,lzwang 写道: 您好: 任务的拓扑图如下,parallelism的设置是140,但是中间有个操作的并行度设置成了50。 集群剩余的slot总数是195。 如果将每个tm的slot数设置为2,任务能够正常启动,并且分配了70个tm和140个slot,符合预期。 可如果将每个tm的slot数设置为1,便只分配了115个slot。任务会卡在creating状态,并且几分钟后,会抛出异常,“Could not allocate all requires slots within timeout of 30 ms. Slots required: 470, slots allocated: 388” 这里面有几个问题: 1. 将slot数设置为1后,异常中提示“Slots required: 470”,这个470似乎完全没有考虑slot share(我们并没有手动设置SlotSharingGroup)。这是为啥? 2. 将slot数设置为1后,异常中提示“slots allocated: 388”,而整个集群剩余的slot其实只有195个,这个388怎么来的? 3. 最大的并行度应是140,为何只分配了115个slot呢? 我们使用的flink版本是1.6.2。 期待你们的回复~ --- 科大讯飞股份有限公司 AI营销平台 汪李之 Tel:15209882175 QQ/WeChat:992424538 Add:安徽省合肥市望江西路666号
flink sql tumble window 时区问题
hi all: 使用flink sql发现一个时区问题,在flink 1.11.3,flink 1.10 都有发现。 使用eventtime,datestream 转换为table,对times字段使用 rowtime。数据为 161421840,执行完rowtime 后变成 161418960 直接就少了8小时,导致后续的开窗都有问题。 代码参考:https://paste.ubuntu.com/p/xYpWNrR9MT/
Re: Flink 1.10 table setIdleStateRetentionTime
抱歉..弄错了 On 1/27/2021 11:39,xuhaiLong wrote: hi, Flink 1.10.1 on yarn,测试发现,`tableEnv.getConfig.setIdleStateRetentionTime(Time.days(180), Time.days(181))` 修改为 `tableEnv.getConfig.setIdleStateRetentionTime(Time.days(30), Time.days(31))`。在savePoInt中恢复 job 启动正常,运行的时候 在webUI Exception 中有连接 kafka 异常,Timeout expired while fetching topic metadata。不太明白是什么原因产生这个?是由于maxTime 问题吗?
Flink 1.10 table setIdleStateRetentionTime
hi, Flink 1.10.1 on yarn,测试发现,`tableEnv.getConfig.setIdleStateRetentionTime(Time.days(180), Time.days(181))` 修改为 `tableEnv.getConfig.setIdleStateRetentionTime(Time.days(30), Time.days(31))`。在savePoInt中恢复 job 启动正常,运行的时候 在webUI Exception 中有连接 kafka 异常,Timeout expired while fetching topic metadata。不太明白是什么原因产生这个?是由于maxTime 问题吗?
回复: 根据业务需求选择合适的flink state
可以试试这样,mysql 中 设置唯一键为窗口的 startTime 和 userId,然后对用户的每个窗口做停留时间的计算,最终会同一个用户在一天会产生多条记录,不过窗口的 startTime 不同,取值的时候sum 试试? 在2021年1月21日 18:24,张锴 写道: 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和 context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。 下面是我的部分代码逻辑: val ds = dataStream .filter(_.liveType == 1) .keyBy(1, 2) .window(EventTimeSessionWindows.withGap(Time.minutes(1))) .process(new myProcessWindow()).uid("process-id") class myProcessWindow() extends ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple, TimeWindow] { override def process(key: Tuple, context: Context, elements: Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit = { var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间 var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间 val currentDate = DateUtil.currentDate val created_time = currentDate val modified_time = currentDate 。。。 val join_time: String = DateUtil.convertTimeStamp2DateStr(startTime, DateUtil.SECOND_DATE_FORMAT) val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime, DateUtil.SECOND_DATE_FORMAT) val duration = (endTime - startTime) / 1000 //停留多少秒 val duration_time = DateUtil.secondsToFormat(duration) //停留时分秒 out.collect(CloudliveWatcher(id, partnerId, courseId, customerId, courseNumber, nickName, ip, device_type, net_opretor, net_type, area, join_time, leave_time, created_time, modified_time , liveType, plat_form, duration, duration_time, network_operator, role, useragent, uid, eventTime)) CloudliveWatcher(id, partnerId, courseId, customerId, courseNumber, nickName, ip, device_type, net_opretor, net_type, area, join_time, leave_time, created_time, modified_time , liveType, plat_form, duration, duration_time, network_operator, role, useragent, uid, eventTime) } 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中? 赵一旦 于2020年12月28日周一 下午7:12写道: 按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。 session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。 张锴 于2020年12月28日周一 下午5:35写道: 能描述一下用session window的考虑吗 Akisaya 于2020年12月28日周一 下午5:00写道: 这个可以用 session window 吧 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows news_...@163.com 于2020年12月28日周一 下午2:15写道: 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。 news_...@163.com 发件人: 张锴 发送时间: 2020-12-28 13:35 收件人: user-zh 主题: 根据业务需求选择合适的flink state 各位大佬帮我分析下如下需求应该怎么写 需求说明: 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。 我的想法: 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。 flink 版本1.10.1
回复: 根据业务需求选择合适的flink state
Hi, 看了下你的代码,用session window 时长为1分钟,表示的是user1 的窗口在1分钟内没收到数据就进行一个触发计算,所以最终得到的结果应该是需要你把 user1 产生的每条记录的时长做一个sum,如果只看单条维度是不全的 在2021年1月21日 18:24,张锴 写道: 你好,之前我用了你上诉的方法出现一个问题,我并没有用min/max,我是在procss方法里用的context.window.getStart和 context.window.getEnd作为开始和结束时间的,感觉这样也能获得最大和最小值,但是出来的数据最长停留了4分多钟,我跑的离线任务停留的时长有几个小时的都有,感觉出来的效果有问题。 下面是我的部分代码逻辑: val ds = dataStream .filter(_.liveType == 1) .keyBy(1, 2) .window(EventTimeSessionWindows.withGap(Time.minutes(1))) .process(new myProcessWindow()).uid("process-id") class myProcessWindow() extends ProcessWindowFunction[CloudLiveLogOnLine, CloudliveWatcher, Tuple, TimeWindow] { override def process(key: Tuple, context: Context, elements: Iterable[CloudLiveLogOnLine], out: Collector[CloudliveWatcher]): Unit = { var startTime = context.window.getStart //定义第一个元素进入窗口的开始时间 var endTime = context.window.getEnd //定义最后一个元素进入窗口的时间 val currentDate = DateUtil.currentDate val created_time = currentDate val modified_time = currentDate 。。。 val join_time: String = DateUtil.convertTimeStamp2DateStr(startTime, DateUtil.SECOND_DATE_FORMAT) val leave_time:String = DateUtil.convertTimeStamp2DateStr(endTime, DateUtil.SECOND_DATE_FORMAT) val duration = (endTime - startTime) / 1000 //停留多少秒 val duration_time = DateUtil.secondsToFormat(duration) //停留时分秒 out.collect(CloudliveWatcher(id, partnerId, courseId, customerId, courseNumber, nickName, ip, device_type, net_opretor, net_type, area, join_time, leave_time, created_time, modified_time , liveType, plat_form, duration, duration_time, network_operator, role, useragent, uid, eventTime)) CloudliveWatcher(id, partnerId, courseId, customerId, courseNumber, nickName, ip, device_type, net_opretor, net_type, area, join_time, leave_time, created_time, modified_time , liveType, plat_form, duration, duration_time, network_operator, role, useragent, uid, eventTime) } 这样写是否合适,如果要用min/max应该如何代入上诉逻辑当中? 赵一旦 于2020年12月28日周一 下午7:12写道: 按直播间ID和用户ID分组,使用session window,使用1min作为gap,统计key+window内的count即可,即sum(1)。 或者感觉你打点实际不一定肯定是1min、2min这种整时间点,可以统计key+window内的min/max,然后输出的时候做个减法。 session window的作用就是根据连续2个元素的gap决定是否将2个元素放入同一个window中。 张锴 于2020年12月28日周一 下午5:35写道: 能描述一下用session window的考虑吗 Akisaya 于2020年12月28日周一 下午5:00写道: 这个可以用 session window 吧 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows news_...@163.com 于2020年12月28日周一 下午2:15写道: 这么做的前提是每条记录是顺序进入KAFKA的才行,但真实场景不能保证这条,有可能较早的打点记录却较晚进入kafka队列。 news_...@163.com 发件人: 张锴 发送时间: 2020-12-28 13:35 收件人: user-zh 主题: 根据业务需求选择合适的flink state 各位大佬帮我分析下如下需求应该怎么写 需求说明: 公司是做直播业务方面的,现在需要实时统计用户的在线时长,来源kafka,每分钟产生直播打点记录,数据中包含eventTime字段。举个例子,用户A 在1,2,3分钟一直产生打点记录,那么他的停留时长就是3分钟,第5,6分钟又来了,那他的停留时长就是2分钟,只要是连续的打点记录就合成一条记录,每个直播间每个用户同理。 我的想法: 我现在的想法是按直播间ID和用户ID分组,然后process,想通过state方式来做,通过截取每条记录的event Time中的分钟数 减去上一条的分钟数,如果他们差值等于1,说明是连续关系,则继续。如果不为1,说明断开了,直接输出这条记录,同时情况当前key的状态。 不知道各位大佬对我的这个想法有什么建议,或者说有更好的处理方式。 flink 版本1.10.1
回复:flink版本升级问题咨询
描述的不太对,具体可以参考下这个 https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/libs/state_processor_api.html 在2021年1月11日 20:12,xuhaiLong 写道: 我试过 flink 1.7 升级到 1.10。如果使用到了 table api 涉及 group By 的话 savePoint恢复有问题,其他没发现过什么问题。或者可以使用下 process api 写一份数据,启动 job 在2021年1月7日 09:50,zhang hao 写道: 目前现状:公司flink任务都是跑在flin1.7当中,通过公司开发的流计算平台进行提交到flink yarn集群,flink on yarn 基于flink session进行部署,运行任务有接近500个流式任务,许多都是有状态应用,现在如果想把flink集群升级到1.11或者1.12,如何平滑的进行版本升级,而不影响现有的任务?
回复:flink版本升级问题咨询
我试过 flink 1.7 升级到 1.10。如果使用到了 table api 涉及 group By 的话 savePoint恢复有问题,其他没发现过什么问题。或者可以使用下 process api 写一份数据,启动 job 在2021年1月7日 09:50,zhang hao 写道: 目前现状:公司flink任务都是跑在flin1.7当中,通过公司开发的流计算平台进行提交到flink yarn集群,flink on yarn 基于flink session进行部署,运行任务有接近500个流式任务,许多都是有状态应用,现在如果想把flink集群升级到1.11或者1.12,如何平滑的进行版本升级,而不影响现有的任务?
回复: 回复: re:Re: 回复:一个关于实时合并数据的问题
这个我也不太清楚,没有做过对应的是测试。 @吴磊 想到一个问题,如果 process 中使用了 agg state,keyBy(userId % 10) 后会有问题吧?使用 mapState 做 agg 操作? 在2020年12月6日 20:30,赵一旦 写道: 所以说,ckpt的性能/时间和key的数量有关对吗?即使总体数据量不变,key少些,每个key的状态变大,会降低ckpt时间? 按照你们的分析来看?? bradyMk 于2020年12月4日周五 下午7:23写道: 对对对,可以取hashCode,我短路了,谢谢哈~ - Best Wishes -- Sent from: http://apache-flink.147419.n8.nabble.com/
回复: re:Re: 回复:一个关于实时合并数据的问题
id 是字符串 走哈希取余试试看? 在2020年12月4日 18:12,502347601<502347...@qq.com> 写道: hello~不能按照keyId来keyby,这样state的个数也就10亿个了,checkpoint会有性能问题。你可以先求余一下,比如求余分成10组。类似这样keyid%10。 -- Original Message -- From: "bradyMk"; Date: 2020-12-04 18:05 To: "user-zh"; Subject: Re: re:Re: 回复:一个关于实时合并数据的问题 所以您说的这个思路应该是和我上面说的是一样的了吧,根据10亿id做keyby,不会有什么问题么? - Best Wishes -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: Flink 1.10 on Yarn
@Congxian Qiu Sorry,刚看到。 之前使用的 flink 1.7,没有出现过这个问题。升级到 flink 1.10 后这个问题必现,但是时间不定。 On 8/9/2020 15:00,Congxian Qiu wrote: Hi xuhaiLong 请问你这个作业在这个版本是是必然出现 NPE 问题吗?另外 1.10 之前的版本有出现过这个问题吗? Best, Congxian xuhaiLong 于2020年8月7日周五 下午3:14写道: 感谢回复!我这边的确是这个bug 引起的 On 8/7/2020 13:43,chenkaibit wrote: hi xuhaiLong,看日志发生的 checkpoint nullpointer 是个已知的问题,具体可以查看下面两个jira。 你用的jdk版本是多少呢?目前发现使用 jdk8_40/jdk8_60 + flink-1.10 会出现 checkpoint nullpointer,可以把jdk升级下版本试一下 https://issues.apache.org/jira/browse/FLINK-18196 https://issues.apache.org/jira/browse/FLINK-17479 在 2020-08-07 12:50:23,"xuhaiLong" 写道: sorry,我添加错附件了 是的,taskmanager.memory.jvm-metaspace.size 为默认配置 On 8/7/2020 11:43,Yangze Guo wrote: 日志没有贴成功,taskmanager.memory.jvm-metaspace.size目前是默认配置么? Best, Yangze Guo On Fri, Aug 7, 2020 at 11:38 AM xuhaiLong wrote: Hi 场景:1 tm 三个slot,run了三个job 三个job 运行的时候 出现了 ck 过程中空指针异常,导致任务一致重启。最终导致`Metaspace` 空间占满,出现 `java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error has occurred. This can mean two things: either the job requires a larger size of JVM metaspace to load classes or there is a class loading leak. In the first case 'taskmanager.memory.jvm-metaspace.size' configuration option should be increased. If the error persists (usually in cluster after several job (re-)submissions) then there is probably a class loading leak which has to be investigated and fixed. The task executor has to be shutdown... ` 附件为部分异常信息 疑问: 1. 为什么会在 ck 时候出现空指针?(三个 job 为同一个 kafka topic,通过ck 恢复 job 可以正常运行,应该不是数据的问题) 2. 通过日志看,是可以重启的,为什么自动重启后还存在这个问题,导致一直重启? 感谢~~~ 从网易邮箱大师发来的云附件 08-07error.txt(730.4KB,2020年8月22日 11:37 到期) 下载
回复:手动修改CK状态
可以参考下这个 stateProcess Api https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/libs/state_processor_api.html 在2020年9月9日 11:41,Shuai Xia 写道: Hi,各位大佬,如果我想手动读取修改CK存储的状态内容,可以使用什么办法,我记得之前有工具类可以支持
Flink 1.10.1 on Yarn
Hi datastream 转为 table。使用 `JDBCOutputFormat.buildJDBCOutputFormat()` 输出到 mysql,出现这个[1]异常 任务 failover, 2点58分开始,1小时一次。导致 任务出现 [2] 异常,metaspace 为 256M,猜测是由于启动过于频繁 classLoder 为同一个引起的。 期望解答: 关于[1] 异常,是什么原因引起的?有没有什么合适的解决方案。flink 1.10 有没有其他输出在 mysql 的 connector? 关于[2]异常,这个问题是我猜测的原因吗?flink 有没有对这个的解决方案 [1] java.lang.RuntimeException: Execution of JDBC statement failed. at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.flush(JDBCOutputFormat.java:102) at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:93) at org.apache.flink.api.java.io.jdbc.JDBCOutputFormat.writeRecord(JDBCOutputFormat.java:40) at org.apache.flink.streaming.api.functions.sink.OutputFormatSinkFunction.invoke(OutputFormatSinkFunction.java:87) at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52) at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:53) at com.netease.wm.trace.RecTraceV2$JdbcEnrichProcessFunction.processElement(RecTraceV2.scala:540) at com.netease.wm.trace.RecTraceV2$JdbcEnrichProcessFunction.processElement(RecTraceV2.scala:451) at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708) at
Re: Flink 1.10 on Yarn
感谢回复!我这边的确是这个bug 引起的 On 8/7/2020 13:43,chenkaibit wrote: hi xuhaiLong,看日志发生的 checkpoint nullpointer 是个已知的问题,具体可以查看下面两个jira。 你用的jdk版本是多少呢?目前发现使用 jdk8_40/jdk8_60 + flink-1.10 会出现 checkpoint nullpointer,可以把jdk升级下版本试一下 https://issues.apache.org/jira/browse/FLINK-18196 https://issues.apache.org/jira/browse/FLINK-17479 在 2020-08-07 12:50:23,"xuhaiLong" 写道: sorry,我添加错附件了 是的,taskmanager.memory.jvm-metaspace.size 为默认配置 On 8/7/2020 11:43,Yangze Guo wrote: 日志没有贴成功,taskmanager.memory.jvm-metaspace.size目前是默认配置么? Best, Yangze Guo On Fri, Aug 7, 2020 at 11:38 AM xuhaiLong wrote: Hi 场景:1 tm 三个slot,run了三个job 三个job 运行的时候 出现了 ck 过程中空指针异常,导致任务一致重启。最终导致`Metaspace` 空间占满,出现 `java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error has occurred. This can mean two things: either the job requires a larger size of JVM metaspace to load classes or there is a class loading leak. In the first case 'taskmanager.memory.jvm-metaspace.size' configuration option should be increased. If the error persists (usually in cluster after several job (re-)submissions) then there is probably a class loading leak which has to be investigated and fixed. The task executor has to be shutdown... ` 附件为部分异常信息 疑问: 1. 为什么会在 ck 时候出现空指针?(三个 job 为同一个 kafka topic,通过ck 恢复 job 可以正常运行,应该不是数据的问题) 2. 通过日志看,是可以重启的,为什么自动重启后还存在这个问题,导致一直重启? 感谢~~~ 从网易邮箱大师发来的云附件 08-07error.txt(730.4KB,2020年8月22日 11:37 到期) 下载
Flink 1.10 on Yarn
Hi 场景:1 tm 三个slot,run了三个job 三个job 运行的时候 出现了 ck 过程中空指针异常,导致任务一致重启。最终导致`Metaspace` 空间占满,出现 `java.lang.OutOfMemoryError: Metaspace. The metaspace out-of-memory error has occurred. This can mean two things: either the job requires a larger size of JVM metaspace to load classes or there is a class loading leak. In the first case 'taskmanager.memory.jvm-metaspace.size' configuration option should be increased. If the error persists (usually in cluster after several job (re-)submissions) then there is probably a class loading leak which has to be investigated and fixed. The task executor has to be shutdown... ` 附件为部分异常信息 疑问: 1. 为什么会在 ck 时候出现空指针?(三个 job 为同一个 kafka topic,通过ck 恢复 job 可以正常运行,应该不是数据的问题) 2. 通过日志看,是可以重启的,为什么自动重启后还存在这个问题,导致一直重启? 感谢~~~ 从网易邮箱大师发来的云附件 08-07error.txt(730.4KB,2020年8月22日 11:37 到期) 下载
Re: Position out of bounds.
感谢 没看明白这个bug引起的原因是什么,或者说有什么合适的解决方案? On 7/2/2020 18:39,夏帅 wrote: 你好,请问解决了么,我看了下源码,好像是一个bug DataOutputSerializer @Override public void write(int b) throws IOException { if (this.position >= this.buffer.length) { resize(1); } this.buffer[this.position++] = (byte) (b & 0xff); } 此处position应该自增 -- 发件人:xuhaiLong 发送时间:2020年7月2日(星期四) 17:46 收件人:flink 中文社区 主 题:Position out of bounds. flink 1.10 onYarn job 中 有一个MapState[Long,Bean] https://www.helloimg.com/image/Pe1QR 程序启动一段时间(20分钟)后出现了 附件中的异常 查看对应源码也没看懂是什么引起的异常 https://www.helloimg.com/image/Peqc5
回复:Position out of bounds.
感谢 没看明白这个bug引起的原因是什么,或者说有什么合适的解决方案? | | 夏* | | 邮箱:xiagu...@163.com | 签名由 网易邮箱大师 定制 在2020年07月02日 18:39,夏帅 写道: 你好,请问解决了么,我看了下源码,好像是一个bug DataOutputSerializer @Override public void write(int b) throws IOException { if (this.position >= this.buffer.length) { resize(1); } this.buffer[this.position++] = (byte) (b & 0xff); } 此处position应该自增 -- 发件人:xuhaiLong 发送时间:2020年7月2日(星期四) 17:46 收件人:flink 中文社区 主 题:Position out of bounds. flink 1.10 onYarn job 中 有一个MapState[Long,Bean] https://www.helloimg.com/image/Pe1QR 程序启动一段时间(20分钟)后出现了 附件中的异常 查看对应源码也没看懂是什么引起的异常 https://www.helloimg.com/image/Peqc5
Position out of bounds.
flink 1.10 onYarn job 中 有一个MapState[Long,Bean] https://www.helloimg.com/image/Pe1QR 程序启动一段时间(20分钟)后出现了 附件中的异常 查看对应源码也没看懂是什么引起的异常 https://www.helloimg.com/image/Peqc5 2020-07-02 17:06:19,409 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess -> Sink: Unnamed (2/3) (d847db42ed1d92ac373f9ccf27b846f0) switched from DEPLOYING to RUNNING. 2020-07-02 17:06:19,410 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess -> Sink: Unnamed (3/3) (ca825ba9712eb520ff6de6b0f9de4dc1) switched from DEPLOYING to RUNNING. 2020-07-02 17:06:19,426 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess -> Sink: Unnamed (3/3) (b05f5b66fd4c65a9032bb0140a4ce3d1) switched from DEPLOYING to RUNNING. 2020-07-02 17:06:19,427 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess -> Sink: Unnamed (2/3) (d5fe791177a64ea718eec61b82542e46) switched from DEPLOYING to RUNNING. 2020-07-02 17:06:19,472 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- groupBy: (userId, sourceUuid, categoryId, gender), select: (userId, sourceUuid, categoryId, gender, MAX(siteId) AS siteId, MAX(score) AS score) -> select: (userId, categoryId, gender, score, siteId) (1/3) (f8f49357b9121d97816d5f83569cd6ac) switched from DEPLOYING to RUNNING. 2020-07-02 17:06:19,472 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess -> Sink: Unnamed (1/3) (37befed4aefab35588e5f6d4c372b8c4) switched from DEPLOYING to RUNNING. 2020-07-02 17:06:19,492 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess -> Sink: Unnamed (1/3) (2de7f2a3809e8d7e97197cbc0f7c8b4b) switched from DEPLOYING to RUNNING. 2020-07-02 17:06:19,498 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess -> Sink: Unnamed (1/3) (1fe6456a019617839a573d55b1194541) switched from DEPLOYING to RUNNING. 2020-07-02 17:06:52,263 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph- KeyedProcess -> Sink: Unnamed (2/3) (d5fe791177a64ea718eec61b82542e46) switched from RUNNING to FAILED on org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@61d8240a. java.lang.IllegalArgumentException: Position out of bounds. at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:139) at org.apache.flink.core.memory.DataOutputSerializer.setPosition(DataOutputSerializer.java:368) at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.resetToKey(RocksDBSerializedCompositeKeyBuilder.java:189) at org.apache.flink.contrib.streaming.state.RocksDBSerializedCompositeKeyBuilder.buildCompositeKeyNamesSpaceUserKey(RocksDBSerializedCompositeKeyBuilder.java:144) at org.apache.flink.contrib.streaming.state.AbstractRocksDBState.serializeCurrentKeyWithGroupAndNamespacePlusUserKey(AbstractRocksDBState.java:149) at org.apache.flink.contrib.streaming.state.RocksDBMapState.get(RocksDBMapState.java:120) at org.apache.flink.runtime.state.ttl.TtlMapState.lambda$getWrapped$0(TtlMapState.java:61) at org.apache.flink.runtime.state.ttl.AbstractTtlDecorator.getWrappedWithTtlCheckAndUpdate(AbstractTtlDecorator.java:92) at org.apache.flink.runtime.state.ttl.TtlMapState.getWrapped(TtlMapState.java:60) at org.apache.flink.runtime.state.ttl.TtlMapState.contains(TtlMapState.java:93) at org.apache.flink.runtime.state.UserFacingMapState.contains(UserFacingMapState.java:72) at com.netease.wm.trace.usertag.RealTimeUserTag$UserTimeTradeProcess.processElement(RealTimeUserTag.scala:272) at com.netease.wm.trace.usertag.RealTimeUserTag$UserTimeTradeProcess.processElement(RealTimeUserTag.scala:237) at org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:85) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151) at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:310) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:485) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:469) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
Blink
hello,请教下 "org.apache.flink" %% "flink-table-api-scala-bridge" % "1.10.1", "org.apache.flink" %% "flink-table-planner-blink" % "1.10.1" % "provided", "org.apache.flink" % "flink-table" % "1.10.1" % "provided", 我在项目中添加了这三个依赖,在idea 中 运行的时候出现异常 `Could not instantiate the executor. Makesure a planner module is on the classpath` 而我添加上这个依赖 `"org.apache.flink" %% "flink-table-planner" % "1.10.1" % "provided",` 就可以了 但是我要使用 blink 中的ROW_NUMBER() 函数,是我的引入错了吗? 猜测是我没有正确引入 blink ?
Re: Flink DataStream
是我的问题,引用了old planner。感谢! On 6/23/2020 21:05,LakeShen wrote: Hi xuhaiLong, 看你的依赖,应该用的 old planner,你要使用 blink planner 才能使用row_number 函数。要使用 flink-table-planner-blink_2.11 具体文档参考: https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/#table-program-dependencies Best, LakeShen xuhaiLong 于2020年6月23日周二 下午8:14写道: "org.apache.flink" %% "flink-table-api-scala-bridge" % "1.10.1", "org.apache.flink" %% "flink-table-planner" % "1.10.1" % "provided", 看下粘贴的 sbt 依赖 On 6/23/2020 20:06,Jark Wu wrote: 图片无法查看,你可以把图片上传到某图床,然后将链接贴这里。 On Tue, 23 Jun 2020 at 19:59, xuhaiLong wrote: 使用的是1.10.1,在 table api 无法使用ROW_NUMBER On 6/23/2020 19:52,Jark Wu wrote: Hi xuhaiLong, 1.10 blink planner 是支持 ROW_NUMBER() over 的 (配合 rownum <= N 使用)。你是不是用的 old planner 呢? Best, Jark On Tue, 23 Jun 2020 at 19:44, LakeShen wrote: Hi xuhaiLong, 看下是否能够在 Flink SQL 中,通过自定义 UDTF 来满足你的需求。 Best, LakeShen xuhaiLong 于2020年6月23日周二 下午7:18写道: Hi 请教一个问题 我需要对一个类似这样的数据进行计算获取用户 categoryId | userId | articleID | categoryId | score | | 01 | A | 1 | 10 | | 01 | B | 1 | 20 | | 01 | C | 2 | 30 | 目前我的实现是使用tableAPI 根据 UserId和categoryID 分组做 score 聚合 再通过状态做TopN排序,有没有其他更好的方案来实现? 我使用过 ROW_NUMBER() over() ,在 flink 1.10 上,并无法使用。分组聚合除了使用Table API 还有其他方法实现吗? 感谢!!
Re: Flink DataStream
"org.apache.flink" %% "flink-table-api-scala-bridge" % "1.10.1", "org.apache.flink" %% "flink-table-planner" % "1.10.1" % "provided", 看下粘贴的 sbt 依赖 On 6/23/2020 20:06,Jark Wu wrote: 图片无法查看,你可以把图片上传到某图床,然后将链接贴这里。 On Tue, 23 Jun 2020 at 19:59, xuhaiLong wrote: 使用的是1.10.1,在 table api 无法使用ROW_NUMBER On 6/23/2020 19:52,Jark Wu wrote: Hi xuhaiLong, 1.10 blink planner 是支持 ROW_NUMBER() over 的 (配合 rownum <= N 使用)。你是不是用的 old planner 呢? Best, Jark On Tue, 23 Jun 2020 at 19:44, LakeShen wrote: Hi xuhaiLong, 看下是否能够在 Flink SQL 中,通过自定义 UDTF 来满足你的需求。 Best, LakeShen xuhaiLong 于2020年6月23日周二 下午7:18写道: Hi 请教一个问题 我需要对一个类似这样的数据进行计算获取用户 categoryId | userId | articleID | categoryId | score | | 01 | A | 1 | 10 | | 01 | B | 1 | 20 | | 01 | C | 2 | 30 | 目前我的实现是使用tableAPI 根据 UserId和categoryID 分组做 score 聚合 再通过状态做TopN排序,有没有其他更好的方案来实现? 我使用过 ROW_NUMBER() over() ,在 flink 1.10 上,并无法使用。分组聚合除了使用Table API 还有其他方法实现吗? 感谢!!
Re: Flink DataStream
使用的是1.10.1,在 table api 无法使用ROW_NUMBER On 6/23/2020 19:52,Jark Wu wrote: Hi xuhaiLong, 1.10 blink planner 是支持 ROW_NUMBER() over 的 (配合 rownum <= N 使用)。你是不是用的 old planner 呢? Best, Jark On Tue, 23 Jun 2020 at 19:44, LakeShen wrote: Hi xuhaiLong, 看下是否能够在 Flink SQL 中,通过自定义 UDTF 来满足你的需求。 Best, LakeShen xuhaiLong 于2020年6月23日周二 下午7:18写道: Hi 请教一个问题 我需要对一个类似这样的数据进行计算获取用户 categoryId | userId | articleID | categoryId | score | | 01 | A | 1 | 10 | | 01 | B | 1 | 20 | | 01 | C | 2 | 30 | 目前我的实现是使用tableAPI 根据 UserId和categoryID 分组做 score 聚合 再通过状态做TopN排序,有没有其他更好的方案来实现? 我使用过 ROW_NUMBER() over() ,在 flink 1.10 上,并无法使用。分组聚合除了使用Table API 还有其他方法实现吗? 感谢!!
Flink DataStream
Hi 请教一个问题 我需要对一个类似这样的数据进行计算获取用户 categoryId | userId | articleID | categoryId | score | | 01 | A | 1 | 10 | | 01 | B | 1 | 20 | | 01 | C | 2 | 30 | 目前我的实现是使用tableAPI 根据 UserId和categoryID 分组做 score 聚合 再通过状态做TopN排序,有没有其他更好的方案来实现? 我使用过 ROW_NUMBER() over() ,在 flink 1.10 上,并无法使用。分组聚合除了使用Table API 还有其他方法实现吗? 感谢!!