Flink 1.11.2版本 实时任务运行 报错 is running beyond physical memory limits. Current usage: 25.0 GB of 25 GB physical memory used; 28.3 GB of 52.5 GB virtual memory used. Killing container

2021-01-06 文章 Yang Peng
Hi, 大家好,咨询一个问题,我们有个实时任务运行在Flink1.11.2版本,使用rocksdbstatebackend,最近报警出现了物理内存超限被kill的异常信息,我们查看了监控taskmanager heap使用量没有超限,direct内存使用量也维持在一个平稳的范围内没有超限,也没有报oom,这种情况是非堆内存异常是吗?完整报错信息如下: Dump of the process-tree for container_e06_1603181034156_0137_01_02 : |- PID PPID PGRPID SESSID CMD_NAME

Re: Flink1.9设置TTL不生效

2020-12-04 文章 Yang Peng
没人遇到这种问题吗? Yang Peng 于2020年12月3日周四 下午8:49写道: > Hi,咨询一个问题 我们生产环境使用flink1.9版本,使用的statebackend为RocksDB,具体代码设置如下: > > private static final String EV_STATE_FLAG = "EV_EID_FLAG"; > > StateTtlConfig ttlConfig = StateTtlConfig > .newBuilder(Time.minutes(60)) >

Flink1.9设置TTL不生效

2020-12-03 文章 Yang Peng
Hi,咨询一个问题 我们生产环境使用flink1.9版本,使用的statebackend为RocksDB,具体代码设置如下: private static final String EV_STATE_FLAG = "EV_EID_FLAG"; StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Time.minutes(60)) .updateTtlOnCreateAndWrite() .neverReturnExpired()

Flink1.9设置TTL不生效

2020-12-03 文章 Yang Peng
Hi,咨询一个问题 我们生产环境使用flink1.9版本,使用的statebackend为RocksDB,具体代码设置如下: private static final String EV_STATE_FLAG = "EV_EID_FLAG"; StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Time.minutes(60)) .updateTtlOnCreateAndWrite() .neverReturnExpired()

Flink实时两个stream进行union,结果只有其中一个流的数据发送到了下游

2020-11-20 文章 Yang Peng
Hi,请教一个问题 我们的一个Flink实时任务中对两个流A和B进行union操作,然后和C流进行connect操作,数据流程大概是这样的:D = A.union(B) C.connect(D).keyby(C.key,D.key).process().addsink(kafkaProducer); A流和B流的数据类型是一样的,但是并发不一样,A流的并发大,B流的并发小,在某一时刻A流出现一波流量尖刺,之后发现输出到kafka的的数据中没有A流的数据都是B流的

Re: Re: Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒

2020-09-30 文章 Yang Peng
> Best, > tison. > > > Yang Peng 于2020年9月30日周三 下午5:26写道: > > > 感谢回复,是的,之前确实怀疑是业务逻辑导致的 > > 但是重启任务之后数据输出恢复了,而且让任务从故障点重新消费也没发现问题,我们这个任务已经跑了几个月了第一次遇到这种问题 > > > > tison 于2020年9月30日周三 下午2:33写道: > > > > > Hi Yang, > > > > > > 你的意思是上游输出没变,

Re: Re: Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒

2020-09-30 文章 Yang Peng
tison 于2020年9月30日周三 下午5:33写道: > > > 那有审计/监控的话看下每个节点的 in/out 记录呗,总能看到是哪一步跌了... > > > > 照你现在提供的信息听起来一切正常那就是业务逻辑本身输出少了,不然总得有哪里不一样。如果只有 sink 跌了,那就是 sink > > 有问题,比如可能依赖了外部环境或者内部积累错误等等。 > > > > Best, > > tison. > > > > > > Yang Peng 于2020年9月

Re: Re: Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒

2020-09-30 文章 Yang Peng
感谢回复,是的,之前确实怀疑是业务逻辑导致的 但是重启任务之后数据输出恢复了,而且让任务从故障点重新消费也没发现问题,我们这个任务已经跑了几个月了第一次遇到这种问题 tison 于2020年9月30日周三 下午2:33写道: > Hi Yang, > > 你的意思是上游输出没变,全链路没有负载升高甚至反而降低,sink 输出变少么? > > 如果全链路没有异常也没有负载升高、流量阻塞,那感觉就是业务逻辑的实际结果,可以看看输入数据的内容有没有变化。 > > Best, > tison. > > > Yang Peng

Re: Re: Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒

2020-09-29 文章 Yang Peng
format 能力的话,可以自己测试下单个线程的QPS多少,然后乘以 Partition个数就是了。 > Best, > Hailong Wang > 在 2020-09-29 20:06:50,"Yang Peng" 写道: > > >感谢回复,我重启完任务之后消费恢复了,我查看了我们的监控(监控kafkamanager上groupid消费速度)发现消费速度并没有下降,目前分区是90 > >flinkkafkaconsumer消费的并行度也是90 应该不是分区的问题,至于2这个source序列化耗时严重这个有什么方式可以

Re: Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒

2020-09-29 文章 Yang Peng
感谢回复,我重启完任务之后消费恢复了,我查看了我们的监控(监控kafkamanager上groupid消费速度)发现消费速度并没有下降,目前分区是90 flinkkafkaconsumer消费的并行度也是90 应该不是分区的问题,至于2这个source序列化耗时严重这个有什么方式可以查看吗? hailongwang <18868816...@163.com> 于2020年9月29日周二 下午8:59写道: > > > > Hi Yang Peng: > 根据你的描述,可以猜测瓶颈是在 Source 上,有可能以下情况: > 1. Kafk

Flink消费kafka,突然间任务输出结果从原来的每秒几十万降低到几万每秒

2020-09-29 文章 Yang Peng
请教大家一个问题,flink实时任务消费kafka写入到kafka,Flink版本是1.9.1 线上kafka集群为1.1.1 kafka集群为容器化集群部署在K8s上,任务运行了很久 今天突然发现任务的数据产出降低了很多,发现cp 没有问题 kafka消费没有积压,也没有反压, 也没有任何异常日志,kafka集群也没有异常,flink集群也没有异常,但是发现监控上tm的cpu负载也降低了 tm上网卡流量也降低了,除此之外没有其他异常信息,大家又遇到这种情况的吗、

Re: Flink任务写入kafka 开启了EOS,statebackend为rocksdb,任务反压source端日志量堆积从cp恢复失败

2020-08-19 文章 Yang Peng
tadata 这个文件吗? > Best, > Congxian > > > Yang Peng 于2020年8月17日周一 下午5:47写道: > > > 找到了 具体日志如下:2020-08-13 19:45:21,932 ERROR > > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error > > occurred in the cluster entrypoint. > > > > org.apache.

Re: Flink任务写入kafka 开启了EOS,statebackend为rocksdb,任务反压source端日志量堆积从cp恢复失败

2020-08-16 文章 Yang Peng
好的 感谢 JasonLee <17610775...@163.com> 于2020年8月14日周五 下午9:22写道: > hi > > 没有日志不太好定位失败的原因 但是没有设置uid的话 是有可能重启失败的 建议还是都设置uid最好 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Flink任务写入kafka 开启了EOS,statebackend为rocksdb,任务反压source端日志量堆积从cp恢复失败

2020-08-14 文章 Yang Peng
我们这个任务的operator没有分配uid,之前也没有分配uid但是从cp恢复过好几次都成功了 就这次没有成功 宇 于2020年8月14日周五 下午1:57写道: > 有没有可能是没分配uid,然后dag发生了变化,导致的恢复不了状态 > > > > ---原始邮件--- > 发件人: "Yang Peng" 发送时间: 2020年8月14日(周五) 中午1:02 > 收件人: "user-zh" 主题: Flink任务写入kafka 开启了EOS,statebackend为rocksdb,任务反压s

Flink任务写入kafka 开启了EOS,statebackend为rocksdb,任务反压source端日志量堆积从cp恢复失败

2020-08-13 文章 Yang Peng
Hi,咨询各位一个问题我们有个任务,statebackend为rocksdb 增量执行cp,flink读取kafka经过处理然后写入到kafka,producer开启了EOS,最近发现任务有反压,source端日志量有积压,然后准备改一下资源分配多加一些资源(没有修改并行度,代码未做修改)从cp恢复任务,任务被cancel之后然后从cp恢复发现起不来了连续两次都不行,由于客户端日志保存时间太短当时没来得及去查看客户端日志,所以没有找到客户端日志,

Re: Flink任务大状态使用filesystem反压

2020-08-11 文章 Yang Peng
感谢唐云老师的回复,问题已经找到了,是我们切换了statebackend为filesystem时使用的state 清除策略配置有误导致的,这样设置清除策略cleanupIncrementally(1, true) 执行cp就会非常慢,后来我们修改成了 .cleanupIncrementally(100, false)这样执行cp速度就马上上来了,感谢各位的帮助

Re: Flink任务大状态使用filesystem反压

2020-08-10 文章 Yang Peng
teBackend有直接关系。 > > 祝好 > 唐云 > ____ > From: Yang Peng > Sent: Monday, August 10, 2020 15:55 > To: user-zh > Subject: Flink任务大状态使用filesystem反压 > > Hi,咨询各位一个问题,我们线上任务使用rocksdb作为statebackend > > 时间久了发现会出现反压,查看服务器监控发现机器io经常是满的,为了保证业务稳定性,现在将statebackend改为filesyste

Flink任务大状态使用filesystem反压

2020-08-10 文章 Yang Peng
Hi,咨询各位一个问题,我们线上任务使用rocksdb作为statebackend 时间久了发现会出现反压,查看服务器监控发现机器io经常是满的,为了保证业务稳定性,现在将statebackend改为filesystem,但是发现已经配置了很大的内存,使用filesystem之后执行cp时间特别长,而且kafka数据源积压很大,大家有遇到这种情况的吗?是使用filesystem的姿势不对吗?

Re: 广播状态是否可以设置ttl过期时间

2019-11-07 文章 Yang Peng
哦哦,我目前的做法是在processElement方法里面注册Timer 然后再onTimer方法里面手动删除state中过期的数据; Dian Fu 于2019年11月8日周五 上午10:42写道: > 是的。不过及时对于keyed state,如果你没有用TTL state这个功能,也是有可能返回过期的state的。 > > > 在 2019年11月8日,上午10:24,Yang Peng 写道: > > > > 嗯嗯,谢谢 付典老师,我理解的是虽然他不会删除但是也不应该返回过期的state值吧,应该是茶干老师说的那样只有ke

Re: 广播状态是否可以设置ttl过期时间

2019-11-07 文章 Yang Peng
的是在ttl到达之前,数据不会清空,但是不保证ttl到达之后,数据一定清空。1.8.0之后提供了更完善的功能,可以看一下这个文章: > https://flink.apache.org/2019/05/19/state-ttl.html < > https://flink.apache.org/2019/05/19/state-ttl.html> > > 在 2019年11月7日,下午3:06,Yang Peng 写道: > > > > > > > > ---

Re: 广播状态是否可以设置ttl过期时间

2019-11-07 文章 Yang Peng
.org/2019/05/19/state-ttl.html> > > 在 2019年11月7日,下午3:06,Yang Peng 写道: > > > > > > > > -- Forwarded message - > > 发件人: yangpengklf007 yangpengklf...@gmail.com>> > > Date: 2019年11月7日周四 下午3:00 > > Subject: 广播状态是否可以设置ttl过期时间 > &

Fwd: 广播状态是否可以设置ttl过期时间

2019-11-06 文章 Yang Peng
-- Forwarded message - 发件人: yangpengklf007 Date: 2019年11月7日周四 下午3:00 Subject: 广播状态是否可以设置ttl过期时间 To: user-zh@flink.apache.org 如下图是我设置得测试代码:在类中定义一个mapstatedesc,然后在processBroadcastElement方法中获取广播数据放入到state中 设置ttl是1s然后让程序sleep10s

Re: 求助flink Buffer pool is destroyed异常

2019-04-09 文章 Yang Peng
一般情况下是内存太小了,导致的问题 应聘程序员 北京邮电大学 <13341000...@163.com> 于2019年4月9日周二 下午1:37写道: > hi, 大家好! > 今天运行flink时抛出了Buffer pool is > destroyed异常,数据源是kafka;消费前kafka队列中堆积了8G左右的数据。详细报错如下: > > > > org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException: > Could not forward element to

Re: 写HBase慢造成消息堆积,有没有异步IO可以用于sink或者outputFormat的方法?

2019-04-08 文章 Yang Peng
有没有参考flink官方源码示例中的这个例子: org.apache.flink.addons.hbase.example.HBaseWriteExample 这个类写的就是flink插入HBase 效率很高 我们实际生产也用到了插入HBase但是效率很高,你可以看一下这个源码; 张作峰 于2019年4月6日周六 下午4:38写道: > 业务场景中,需要将处理后的消息写入到HBase中,由于写入HBase慢,引起消息堆积。 > 通过Stream API 有没有方法可以异步批量发送? > 谢谢!

Re: flink on yarn 模式 日志问题

2019-04-08 文章 Yang Peng
flink的historyserver 貌似只能查看completed jobs 不能查看日志,这个跟spark的historyserver有差别吧 Biao Liu 于2019年4月8日周一 下午3:43写道: > 1. 这个日志确实会存在,如果你觉得5秒打印两行不能接受的话,我能想到的几种解决方法 > 1.1. 加大 checkpoint 间隔 > 1.2. 单独指定该 logger 的 level,修改 > >