Hi,
大家好,咨询一个问题,我们有个实时任务运行在Flink1.11.2版本,使用rocksdbstatebackend,最近报警出现了物理内存超限被kill的异常信息,我们查看了监控taskmanager
heap使用量没有超限,direct内存使用量也维持在一个平稳的范围内没有超限,也没有报oom,这种情况是非堆内存异常是吗?完整报错信息如下:
Dump of the process-tree for container_e06_1603181034156_0137_01_02 :
|- PID PPID PGRPID SESSID CMD_NAME
没人遇到这种问题吗?
Yang Peng 于2020年12月3日周四 下午8:49写道:
> Hi,咨询一个问题 我们生产环境使用flink1.9版本,使用的statebackend为RocksDB,具体代码设置如下:
>
> private static final String EV_STATE_FLAG = "EV_EID_FLAG";
>
> StateTtlConfig ttlConfig = StateTtlConfig
> .newBuilder(Time.minutes(60))
>
Hi,咨询一个问题 我们生产环境使用flink1.9版本,使用的statebackend为RocksDB,具体代码设置如下:
private static final String EV_STATE_FLAG = "EV_EID_FLAG";
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.minutes(60))
.updateTtlOnCreateAndWrite()
.neverReturnExpired()
Hi,咨询一个问题 我们生产环境使用flink1.9版本,使用的statebackend为RocksDB,具体代码设置如下:
private static final String EV_STATE_FLAG = "EV_EID_FLAG";
StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Time.minutes(60))
.updateTtlOnCreateAndWrite()
.neverReturnExpired()
Hi,请教一个问题 我们的一个Flink实时任务中对两个流A和B进行union操作,然后和C流进行connect操作,数据流程大概是这样的:D =
A.union(B)
C.connect(D).keyby(C.key,D.key).process().addsink(kafkaProducer);
A流和B流的数据类型是一样的,但是并发不一样,A流的并发大,B流的并发小,在某一时刻A流出现一波流量尖刺,之后发现输出到kafka的的数据中没有A流的数据都是B流的
> Best,
> tison.
>
>
> Yang Peng 于2020年9月30日周三 下午5:26写道:
>
> > 感谢回复,是的,之前确实怀疑是业务逻辑导致的
> > 但是重启任务之后数据输出恢复了,而且让任务从故障点重新消费也没发现问题,我们这个任务已经跑了几个月了第一次遇到这种问题
> >
> > tison 于2020年9月30日周三 下午2:33写道:
> >
> > > Hi Yang,
> > >
> > > 你的意思是上游输出没变,
tison 于2020年9月30日周三 下午5:33写道:
>
> > 那有审计/监控的话看下每个节点的 in/out 记录呗,总能看到是哪一步跌了...
> >
> > 照你现在提供的信息听起来一切正常那就是业务逻辑本身输出少了,不然总得有哪里不一样。如果只有 sink 跌了,那就是 sink
> > 有问题,比如可能依赖了外部环境或者内部积累错误等等。
> >
> > Best,
> > tison.
> >
> >
> > Yang Peng 于2020年9月
感谢回复,是的,之前确实怀疑是业务逻辑导致的
但是重启任务之后数据输出恢复了,而且让任务从故障点重新消费也没发现问题,我们这个任务已经跑了几个月了第一次遇到这种问题
tison 于2020年9月30日周三 下午2:33写道:
> Hi Yang,
>
> 你的意思是上游输出没变,全链路没有负载升高甚至反而降低,sink 输出变少么?
>
> 如果全链路没有异常也没有负载升高、流量阻塞,那感觉就是业务逻辑的实际结果,可以看看输入数据的内容有没有变化。
>
> Best,
> tison.
>
>
> Yang Peng
format 能力的话,可以自己测试下单个线程的QPS多少,然后乘以 Partition个数就是了。
> Best,
> Hailong Wang
> 在 2020-09-29 20:06:50,"Yang Peng" 写道:
>
> >感谢回复,我重启完任务之后消费恢复了,我查看了我们的监控(监控kafkamanager上groupid消费速度)发现消费速度并没有下降,目前分区是90
> >flinkkafkaconsumer消费的并行度也是90 应该不是分区的问题,至于2这个source序列化耗时严重这个有什么方式可以
感谢回复,我重启完任务之后消费恢复了,我查看了我们的监控(监控kafkamanager上groupid消费速度)发现消费速度并没有下降,目前分区是90
flinkkafkaconsumer消费的并行度也是90 应该不是分区的问题,至于2这个source序列化耗时严重这个有什么方式可以查看吗?
hailongwang <18868816...@163.com> 于2020年9月29日周二 下午8:59写道:
>
>
>
> Hi Yang Peng:
> 根据你的描述,可以猜测瓶颈是在 Source 上,有可能以下情况:
> 1. Kafk
请教大家一个问题,flink实时任务消费kafka写入到kafka,Flink版本是1.9.1 线上kafka集群为1.1.1
kafka集群为容器化集群部署在K8s上,任务运行了很久 今天突然发现任务的数据产出降低了很多,发现cp 没有问题
kafka消费没有积压,也没有反压, 也没有任何异常日志,kafka集群也没有异常,flink集群也没有异常,但是发现监控上tm的cpu负载也降低了
tm上网卡流量也降低了,除此之外没有其他异常信息,大家又遇到这种情况的吗、
tadata 这个文件吗?
> Best,
> Congxian
>
>
> Yang Peng 于2020年8月17日周一 下午5:47写道:
>
> > 找到了 具体日志如下:2020-08-13 19:45:21,932 ERROR
> > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error
> > occurred in the cluster entrypoint.
> >
> > org.apache.
好的 感谢
JasonLee <17610775...@163.com> 于2020年8月14日周五 下午9:22写道:
> hi
>
> 没有日志不太好定位失败的原因 但是没有设置uid的话 是有可能重启失败的 建议还是都设置uid最好
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
我们这个任务的operator没有分配uid,之前也没有分配uid但是从cp恢复过好几次都成功了 就这次没有成功
宇 于2020年8月14日周五 下午1:57写道:
> 有没有可能是没分配uid,然后dag发生了变化,导致的恢复不了状态
>
>
>
> ---原始邮件---
> 发件人: "Yang Peng" 发送时间: 2020年8月14日(周五) 中午1:02
> 收件人: "user-zh" 主题: Flink任务写入kafka 开启了EOS,statebackend为rocksdb,任务反压s
Hi,咨询各位一个问题我们有个任务,statebackend为rocksdb
增量执行cp,flink读取kafka经过处理然后写入到kafka,producer开启了EOS,最近发现任务有反压,source端日志量有积压,然后准备改一下资源分配多加一些资源(没有修改并行度,代码未做修改)从cp恢复任务,任务被cancel之后然后从cp恢复发现起不来了连续两次都不行,由于客户端日志保存时间太短当时没来得及去查看客户端日志,所以没有找到客户端日志,
感谢唐云老师的回复,问题已经找到了,是我们切换了statebackend为filesystem时使用的state
清除策略配置有误导致的,这样设置清除策略cleanupIncrementally(1, true) 执行cp就会非常慢,后来我们修改成了
.cleanupIncrementally(100, false)这样执行cp速度就马上上来了,感谢各位的帮助
teBackend有直接关系。
>
> 祝好
> 唐云
> ____
> From: Yang Peng
> Sent: Monday, August 10, 2020 15:55
> To: user-zh
> Subject: Flink任务大状态使用filesystem反压
>
> Hi,咨询各位一个问题,我们线上任务使用rocksdb作为statebackend
>
> 时间久了发现会出现反压,查看服务器监控发现机器io经常是满的,为了保证业务稳定性,现在将statebackend改为filesyste
Hi,咨询各位一个问题,我们线上任务使用rocksdb作为statebackend
时间久了发现会出现反压,查看服务器监控发现机器io经常是满的,为了保证业务稳定性,现在将statebackend改为filesystem,但是发现已经配置了很大的内存,使用filesystem之后执行cp时间特别长,而且kafka数据源积压很大,大家有遇到这种情况的吗?是使用filesystem的姿势不对吗?
哦哦,我目前的做法是在processElement方法里面注册Timer 然后再onTimer方法里面手动删除state中过期的数据;
Dian Fu 于2019年11月8日周五 上午10:42写道:
> 是的。不过及时对于keyed state,如果你没有用TTL state这个功能,也是有可能返回过期的state的。
>
> > 在 2019年11月8日,上午10:24,Yang Peng 写道:
> >
> > 嗯嗯,谢谢 付典老师,我理解的是虽然他不会删除但是也不应该返回过期的state值吧,应该是茶干老师说的那样只有ke
的是在ttl到达之前,数据不会清空,但是不保证ttl到达之后,数据一定清空。1.8.0之后提供了更完善的功能,可以看一下这个文章:
> https://flink.apache.org/2019/05/19/state-ttl.html <
> https://flink.apache.org/2019/05/19/state-ttl.html>
> > 在 2019年11月7日,下午3:06,Yang Peng 写道:
> >
> >
> >
> > ---
.org/2019/05/19/state-ttl.html>
> > 在 2019年11月7日,下午3:06,Yang Peng 写道:
> >
> >
> >
> > -- Forwarded message -
> > 发件人: yangpengklf007 yangpengklf...@gmail.com>>
> > Date: 2019年11月7日周四 下午3:00
> > Subject: 广播状态是否可以设置ttl过期时间
> &
-- Forwarded message -
发件人: yangpengklf007
Date: 2019年11月7日周四 下午3:00
Subject: 广播状态是否可以设置ttl过期时间
To: user-zh@flink.apache.org
如下图是我设置得测试代码:在类中定义一个mapstatedesc,然后在processBroadcastElement方法中获取广播数据放入到state中
设置ttl是1s然后让程序sleep10s
一般情况下是内存太小了,导致的问题
应聘程序员 北京邮电大学 <13341000...@163.com> 于2019年4月9日周二 下午1:37写道:
> hi, 大家好!
> 今天运行flink时抛出了Buffer pool is
> destroyed异常,数据源是kafka;消费前kafka队列中堆积了8G左右的数据。详细报错如下:
>
>
>
> org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException:
> Could not forward element to
有没有参考flink官方源码示例中的这个例子:
org.apache.flink.addons.hbase.example.HBaseWriteExample
这个类写的就是flink插入HBase 效率很高 我们实际生产也用到了插入HBase但是效率很高,你可以看一下这个源码;
张作峰 于2019年4月6日周六 下午4:38写道:
> 业务场景中,需要将处理后的消息写入到HBase中,由于写入HBase慢,引起消息堆积。
> 通过Stream API 有没有方法可以异步批量发送?
> 谢谢!
flink的historyserver 貌似只能查看completed jobs 不能查看日志,这个跟spark的historyserver有差别吧
Biao Liu 于2019年4月8日周一 下午3:43写道:
> 1. 这个日志确实会存在,如果你觉得5秒打印两行不能接受的话,我能想到的几种解决方法
> 1.1. 加大 checkpoint 间隔
> 1.2. 单独指定该 logger 的 level,修改
>
>
25 matches
Mail list logo