Re: 如何正确的利用StateTtlConfig为State设置过期时间

2019-02-09 文章 Yun Tang
Hi 银兵 1. StateTtlConfig目前只能做到state层级的,目前还不支持每个key设置一个超时时间的。如果过期时间比较短的话,其实可以考虑给key附上自定义的时间戳用window来满足你的需求。 2. 就我所知,queryable state的工业级应用的话,有一个拿来做记账系统的[1],他们也在Github上分享了相关源码demo[2]。还有一个是实验性质或者说更像爱好领域的应用,将queryable state用在在线机器学习中[3]。国内似乎没有大规模使用queryable state的,如果您那里有什么进展或者尝试也欢迎分享。 [1] htt

Re: flink 1.7.2集群异常退出

2019-03-12 文章 Yun Tang
Hi 你是不是没有配置checkpoint path,且没有显式的配置FsStateBackend或者RocksDBStateBackend,这应该是一个MemoryStateBackend 在配置HA却没有配置checkpoint path时候的bug,参见我之前创建的JIRA https://issues.apache.org/jira/browse/FLINK-11107 相关PR已经提交了,不过社区认为MemoryStateBackend更多的是debug用 或者 实验性质的toy,不会有生产环境直接使用,加之最近忙于release-1.8的发布,所以暂时还没有revie

Re: flink异常触发savepoint

2019-03-22 文章 Yun Tang
Hi 如果异常退出,你在enable externalized checkpoint时候会配置作业退出时候的checkpoint保留策略,默认是作业failed的时候会保留externalized checkpoint。 你可以在相关目录下找到相关checkpoint的目录(判断条件chk-X目录下是否有_metadata文件,如果存在的话,就可以认为这个checkpoint是有效的),这样你就可以使用-s 参数指定从该externalized checkpoint恢复,Flink会将该checkpoint当作savepoint处理进行作业恢复。 祝好 唐云 ___

Re: RocksDB中指定nameNode 的高可用

2019-03-26 文章 Yun Tang
Hi Flink高可用相关配置的存储目录,当存储路径配置成HDFS时,相关namenode高可用性由HDFS支持,对上层完全透明。 祝好 唐云 From: 戴嘉诚 Sent: Tuesday, March 26, 2019 16:57 To: user-zh@flink.apache.org Subject: RocksDB中指定nameNode 的高可用   嘿,我想询问一下,flink中的RocksDB位置 我指定了hdfs路径,但是,这里是强指定nameNode的地址,但是我的hdfs是有个两个nameNod

Re: flink ha hdfs目录权限问题

2019-04-01 文章 Yun Tang
Hi 孙森, 将提交用户root加到hadoop的hdfs用户组内,或者使用hadoop的hdfs用户提交程序[1],或者修改整个目录HDFS:///flink/ha的权限[2] 放开到任意用户应该可以解决问题,记得加上 -R ,保证对子目录都生效。 [1] https://stackoverflow.com/questions/11371134/how-to-specify-username-when-putting-files-on-hdfs-from-a-remote-machine [2] https://hadoop.apache.org/docs/r2.4.1/had

Re: flink ha hdfs目录权限问题

2019-04-01 文章 Yun Tang
0 2019-04-01 15:12 /flink/ha/dee74bc7-d450-4fb4-a9f2-4983d1f9949f drwxrwxrwx - hdfs hdfs 0 2019-04-01 15:13 /flink/ha/edd59fcf-8413-4ceb-92cf-8dcd637803f8 drwxrwxrwx - hdfs hdfs 0 2019-04-01 15:13 /flink/ha/f6329551-56fb-4c52-a028-51fd838c4af6 > 在 2019年4月1日,下午4:02,

Re: 使用hdfs保存checkpoint一段时间后报错

2019-04-30 文章 Yun Tang
Hi 志鹏 核心原因是HDFS的问题 Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /flink/checkpoints/PaycoreContextHopJob/cbb3a580d0323fbace80e71a25c966d0/chk-11352/fc4b8b08-2c32-467c-a1f4-f384eba246ff could only be replicated to 0 nodes instead of minReplication (=1). There are 3

Re:[State Backend] 请教个问题,checkpoint恢复失败

2019-05-02 文章 Yun Tang
Hi 错误栈是恢复state时候,读取的stream被关闭了,如果HDFS本身没有出问题的话,这个应该不是root cause,日志里面还有其他异常么? 祝好 唐云 发自我的小米手机 在 eric ,2019年4月30日 16:30写道: 大家好: 刚接触flink, 跑了个测试state checkpoint的程序: 1) 数据源是socket模式,用的是keyed state backend; 提交job跑一会 2) 然后关闭数据源的socket,这时job会进入failed状态 3) 停几秒,把数据源socket重新打开 4) 这时flink会重连socket, 对job

Re: Rocksdb作为状态后端启动时报错

2019-05-10 文章 Yun Tang
Hi Root cause其实是最后一行 ”Caused by: java.lang.NoClassDefFoundError: Could not initialize class org.apache.hadoop.hdfs.protocol.HdfsConstants“ 与rocksDB无关,检查一下运行时classpath里面有没有这个类,可以先确认一下flink-shaded-hadoop2-xx.jar 在不在你的classpath里面。 祝好 唐云 From: zhang yue Sent: Frid

Re: flink metrics的 Reporter 问题

2019-05-15 文章 Yun Tang
Hi 嘉诚 不清楚你使用的Flink具体版本,不过这个显示host-name第一部分的逻辑是一直存在的,因为大部分场景下host-name只需要取第一部分即可表征。具体实现代码可以参阅 [1] 和 [2] 。 受到你的启发,我创建了一个JIRA [3] 来追踪这个问题,解法是提供一个metrics options,使得你们场景下可以展示metrics的完整hostname 祝好 唐云 [1] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runt

Re: checkpoint stage size的问题

2019-06-26 文章 Yun Tang
你好 这个问题问得有点稍微宽泛,因为并没有描述你所认为的checkpoint state size越来越大的周期。checkpoint state size变大有几个原因: 1. 上游数据量增大。 2. window设置时间较长,尚未触发,导致window内积攒的数据比较大。 3. window的类型决定了所需要存储的state size较大。 可以参考社区的文档[1] window state的存储空间问题。另外,在上游数据量没有显著变化的时候,若干窗口周期后的checkpoint state size应该是比较稳定的,由于未明确你的观察周期,所以只能给出比较宽

Re: checkpoint stage size的问题

2019-06-26 文章 Yun Tang
你好 从附件的web监控看,其实你的整体checkpoint state其实很小(只有20几MB),所以对于这个问题其实有些过度关注了。 关于checkpoint state的变化,需要观察不同operator的情况,可以点开详细页看每个并发的情况。对比operator state和window所使用的keyed state的变化情况。我估计keyed state部分会有些许波动,主要是因为你使用的是RocksDB state backend,其实上传的是rocksDB的sst文件,当register timer时,window state会进行存储,当onTimer时,相关stat

Re: Re:Flink1.8+Hadoop3.1.2 编译问题

2019-06-28 文章 Yun Tang
你好 因为从Flink-1.8 开始,flink的默认编译选项里面就不再带上hadoop依赖了。可以参考[1] 了解更多信息。实际上从官方的下载链接[2]里面也说明了从Flink-1.8开始shaded-hadoop的相关jar包需要单独下载并放置在lib目录下。 如果需要shaded-hadoop jar包,可以单独去编译好的 flink-shaded-hadoop 子项目目录下找到相关的jar包。 [1] https://issues.apache.org/jira/browse/FLINK-11266 [2] https://flink.apache.org/downloads

Re: Flink 1.9 进度跟踪方法

2019-07-01 文章 Yun Tang
hi Luan 你可以在 Flink的邮件列表里面去追踪 http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Features-for-Apache-Flink-1-9-0-td28701.html,据我所知,在JIRA上似乎没有一个总览的issue去追踪进度。 祝好 唐云 From: Luan Cooper Sent: Monday, July 1, 2019 16:17 To: user-zh@flink.apache.org

Re: Flink中对于state不是很大,但是需要频繁checkpoint的任务,backendstate是选file还是rockdb?

2019-07-02 文章 Yun Tang
hi 首先,就算选择rocksDB statebackend,也是需要写HDFS的,只是在开启了incremental checkpoint方式情况下可以减少每次hdfs数据写入。 我觉得这个问题核心是一个trade off。不做checkpoint的时候,RocksDBStateBackend的读写性能不如纯内存的FsStateBackend。而在checkpoint的同步阶段,RocksDB stateBackend需要全量写本地磁盘,比FsStateBackend的内存操作可能要慢一些,也会影响吞吐。在checkpoint的异步阶段,由于RocksDB stateBack

Re: checkpoint 文件夹Chk-no 下面文件个数是能计算出来的吗?

2019-07-18 文章 Yun Tang
Hi A1: chk-x文件下面的文件个数是跟operator个数并行度是有关系的,主要是operator state的文件。对于checkpoint场景,_metadata只是元数据,真实的operator数据都是在其他文件内。 A2: 不可以将这些文件合并在一起。因为_metadata内主要记录了文件路径,如果合并的话,找不到原始路径会有问题,无法从checkpoint进行restore 祝好 唐云 From: 陈冬林 <874269...@qq.com> Sent: Thursday, July 18, 201

Re: checkpoint 文件夹Chk-no 下面文件个数是能计算出来的吗?

2019-07-18 文章 Yun Tang
com> Sent: Thursday, July 18, 2019 15:34 To: user-zh@flink.apache.org Subject: Fwd: checkpoint 文件夹Chk-no 下面文件个数是能计算出来的吗? 谢谢您的解答, 那些文件的数量是只和operator的并行度相关吗?是不是还有key 的个数等相关?有没有具体的公式呢?我没有在源码里找到这块的逻辑 还有一个最重要的问题,这些文件即然不能合并,state小文件合并指的是那些文件呢? 祝安 Andrew > 下面是被转发的邮件: > > 发件

Re: checkpoint 文件夹Chk-no 下面文件个数是能计算出来的吗?

2019-07-18 文章 Yun Tang
hi 首先先要确定是否是大量创造文件导致你的namenode RPC相应堆积多,RPC请求有很多种,例如每个task创建checkpoint目录也是会向namenode发送大量RPC请求的(参见 [https://issues.apache.org/jira/browse/FLINK-11696]);也有可能是你的checkpoint interval太小,导致文件不断被创建和删除(subsume old checkpoint),先找到NN压力大的root cause吧。 至于使用FsStateBackend能否减少checkpoint文件数量,这是另外一个话题。首先,我需要弄清楚

Re: checkpoint 文件夹Chk-no 下面文件个数是能计算出来的吗?

2019-07-18 文章 Yun Tang
是另外一个话题 请问这个话题有没有什么优化点可以启发一下吗? 在 2019年7月18日,下午9:34,Yun Tang mailto:myas...@live.com>> 写道: hi 首先先要确定是否是大量创造文件导致你的namenode RPC相应堆积多,RPC请求有很多种,例如每个task创建checkpoint目录也是会向namenode发送大量RPC请求的(参见 [https://issues.apache.org/jira/browse/FLINK-11696]);也有可能是你的checkpoint interval太小,导致文件不断被创建和删除

Re: checkpoint 文件夹Chk-no 下面文件个数是能计算出来的吗?

2019-07-19 文章 Yun Tang
nt 小文件合并:都是规模惹的祸,随着整个集群 Flink JOB 越来越多,CP 文件数也水涨船高,最后压的 HDFS NameNode 不堪重负,阿里巴巴通过把若干 CP 小文件合并成一个大文件的组织方式,最终把 NameNode 的压力减少了几十倍 ” > 下面是被转发的邮件: > > 发件人: Yun Tang > 主题: 回复: checkpoint 文件夹Chk-no 下面文件个数是能计算出来的吗? > 日期: 2019年7月19日 GMT+8 上午11:05:53 > 收件人: "user-zh@flink.apache

Re: Re: Flink checkpoint 并发问题

2019-07-25 文章 Yun Tang
Hi all 你们讨论的已经越来越偏了,出问题的是operator state backend,实际上与RocksDB没有关系,与MaxConcurrentCheckpoints 也不应该有关系。 To 戴嘉诚 你使用的Flink版本是什么?这个operator 里面的operator state descriptor是什么? 祝好 唐云 From: 戴嘉诚 Sent: Thursday, July 25, 2019 19:04 To: user-zh@flink.apache.org Subject: Re:

Re: Re: Flink checkpoint 并发问题

2019-07-25 文章 Yun Tang
包的, operator state descriptor是使用MapStateDescriptor, 谢谢! Yun Tang 于2019年7月25日周四 下午7:10写道: > Hi all > > 你们讨论的已经越来越偏了,出问题的是operator state > backend,实际上与RocksDB没有关系,与MaxConcurrentCheckpoints 也不应该有关系。 > > To 戴嘉诚 > 你使用的Flink版本是什么?这个operator 里面的operator state descr

Re: Re: Flink checkpoint 并发问题

2019-07-25 文章 Yun Tang
time >= next.getValue()) { iterator.remove(); --stateSize; ++removeState; } } if (stateSize == 0) { accumulateStateMap.clear(); } //把这个定时器删除掉 ctx.timerService().deleteEventTimeTimer(time

Re: Re: Flink RocksDBStateBackend 问题

2019-08-05 文章 Yun Tang
@lvwenyuan 首先需要明确的一点是,你这里的“FileSystem”指的究竟是checkpoint时存储数据的file system,还是FsStateBackend,建议下次提问前可以把需要咨询的内容表述清楚一些。 * 如果指的是存储checkpoint数据的远程file system,在incremental checkpoint场景下,这些数据与RocksDB的创建checkpoint时刷写到本地的sst文件和meta文件是二进制相同的,只是文件名会重命名。如果是savepoint或者全量checkpoin

Re: Re:Re: Re: Flink RocksDBStateBackend 问题

2019-08-06 文章 Yun Tang
般会将数据保存在HDFS上,那么这个是,我的命令是 flink cancel -s 。做完savepoint就退出,那么这个时候rocksdb还需要去写数据吗?因为做完savepoint,整个任务就结束了。 望解答,谢谢老师! 在 2019-08-06 13:44:18,"Yun Tang" 写道: >@lvwenyuan<mailto:lvwenyuan...@163.com> >首先需要明确的一点是,你这里的“FileSystem”指的究竟是checkpoint时存储数据的file >system,还是FsS

Re: 恢复savepoint,除了命令行,能通过代码获取吗?

2019-08-09 文章 Yun Tang
Hi 中锋 恐怕不能通过代码来回复savepoint,目前一共只有有两个地方可以传入savepoint path,分别是 1. CliFrontendParser#createSavepointRestoreSettings [1] 2. JarRunHandler#getSavepointRestoreSettings [2] 分别对应命令行,网页(REST)提交,没办法在代码里面进行恢复请求,其实我理解REST或者网页提交应该也满足你们的需求。 [1] https://github.com/apache/flink/blob/f400fb

Re: flink启动等待10分钟问题

2019-08-21 文章 Yun Tang
Hi Flink on YARN作业启动时间长,有很多原因,例如资源不够在等待,container申请的时候又退出了。默认的slot request的timeout时间是5min,感觉你的作业应该是可能遇到了一个slot request timeout,然后又重新申请。最好能提供一下jobmanager的日志才好进一步分析。 祝好 唐云 From: 々守护々 <346531...@qq.com> Sent: Thursday, August 22, 2019 11:04 To: user-zh Subject: fl

Re: 回复: flink启动等待10分钟问题

2019-08-21 文章 Yun Tang
tamps/Watermarks -> from: (request, curuserid, timelong, rowtime) -> select: (rowtime, 0 AS $f1, curuserid) -> time attribute: (rowtime) (4/4) (78f520582607e26b365fca483fc98d4c) switched from DEPLOYING to RUNNING. 2019-08-21 20:00:40,006 INFO org.apache.flink.runtime.executiongraph.Execution

Re: 任务内存增长

2019-08-25 文章 Yun Tang
hi 张坤 使用的是RocksDBStateBackend么,一般被YARN的node manager内存超用而kill是native 内存超用导致的。可以在Flink 参数env.java.opts.taskmanager里面加上 -XX:NativeMemoryTracking=detail [1],这样可以观察内存是否增长。另外你使用的内存配置和被kill时候的YARN的日志分别是什么呢,可以考虑增大JVM heap 申请的资源来变相加大向YARN申请的总内存,某种程度上可以缓解被kill的概率。 [1] https://docs.oracle.com/javase/8/

Re: 关于flink中端对端的精确一次性理解问题

2019-08-27 文章 Yun Tang
Hi 可以看一下TwoPhaseCommitSinkFunction的实现,preCommit是在snapshotState时调用,会将当前的currentTransactionHolder存储到pendingCommitTransactions,直到notifyCheckpointComplete时(也就是commit时),将pendingCommitTransactions取出进行事务性操作。所以preCommit时候不需要写是不符合语义的。 如果借助TwoPhaseCommitSinkFunction,确实需要适当减少checkpoint interval,否则可能很久都没有输出

Re: PathIsNotEmptyDirectoryException 异常

2019-08-27 文章 Yun Tang
在Flink看来,删除操作都是同一个线程内顺序执行的,S3AFileSystem.delete 接口看上去也不是异步执行的。出异常的时候,能检查一下chk-50832 目录里面还有什么么? From: Andrew Lin Sent: Tuesday, August 27, 2019 16:53 To: wang...@cmcm.com ; user-zh@flink.apache.org Subject: Fwd: PathIsNotEmptyDirectoryException 异常 https://issu

Re: flink不清理state,从checkpoint恢复任务能重置kafka的offset讨论

2019-08-28 文章 Yun Tang
Hi 蒋涛涛 Flink的kafka consumer一共有三种offset commit模式: 1. OffsetCommitMode.DISABLED 完全disable offset的commit 2. OffsetCommitMode.ON_CHECKPOINTS Flink的默认行为,只有当Flink checkpoint完成时,才会将offset commit到Kafka 3. OffsetCommitMode.KAFKA_PERIODIC 使用Kafka的internal client的默认行为,周期性将

Re: flink不清理state,从checkpoint恢复任务能重置kafka的offset讨论

2019-08-28 文章 Yun Tang
Sent: Thursday, August 29, 2019 11:45 To: user-zh@flink.apache.org Subject: Re: flink不清理state,从checkpoint恢复任务能重置kafka的offset讨论 Hi Yun Tang, 其实默认情况下,我其实是默认想从checkpoint恢复kafka当前消费的进度的,但是遇到特别情况下,从某个时间点开始消费数据,就像您说的想要主要恢复keyed state相关数据,如果把setCommitOffsetsOnCheckpoints(false),kakfa properties里面设置

Re: Re:使用flink 1.8.1 部署yarn集群 , 始终有报错

2019-09-01 文章 Yun Tang
Hi 向0.0.0.0:8030 尝试提交作业是因为提交作业时找不到正确的YARN配置,就会向默认的本地8030端口提交,检查一下HADOOP_CONF_DIR 或者 HADOOP_HOME 这些环境变量有没有设置正确。可以设置一下这些配置文件的目录地址就可以提交作业了。 BTW,这个不是一个Flink的问题,是所有使用YARN管理作业的大数据计算引擎都有可能遇到的问题。 祝好 唐云 From: 周��岗 Sent: Sunday, September 1, 2019 15:31 To: user-zh@flink.

Re: 回复: flink使用StateBackend问题

2019-09-03 文章 Yun Tang
Hi 1. Checkpoint 超时时间设置是多少(是默认的10min么),如果超时时间太短,容易checkpoint failure 2. 所有的subtask都是n/a 么,source task的checkpoint没有rocksDb的参与,与使用默认的MemoryStateBackend其实是一样的,不应该source task也没有完成checkpoint(除非一直都拿不到StreamTask里面的锁,一直都在process element) 3. 作业的反压情况如何,是不是使用RocksDB时候存在严重的反压(back pressure)情况?如果作

Re: 回复: flink1.9 webui exception日志显示问题

2019-10-10 文章 Yun Tang
hi Web上保存的历史异常数目是有限的,只会保存20个 [1],如果更旧的异常被冲掉了,直接去jobmanager日志里面检索异常信息吧。另外,你的问题是彻底看不到任何一条历史异常,还是看不到最老的历史异常? [1] https://github.com/apache/flink/blob/cda6dc0c44239aa7a36105988328de5744aea125/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java#L54 祝好 唐云

Re: 回复: flink 缓存本地文件被删除疑问

2019-10-10 文章 Yun Tang
Hi 戴嘉诚 你的异常发生在failover时,rocksDB恢复的时候,会从hdfs下载文件到临时restore目录,然后再做硬链到DB目录下。原先已经下载到临时restore目录的文件在做硬链的时候,突然报出原始文件找不到了 [1]。这个异常的出现比较奇怪,能否先share一下出问题的taskmanager的所有日志? [1] https://github.com/apache/flink/blob/cda6dc0c44239aa7a36105988328de5744aea125/flink-state-backends/flink-statebackend-rocksdb/s

Re: 回复: flink 缓存本地文件被删除疑问

2019-10-11 文章 Yun Tang
ent: Friday, October 11, 2019 15:17 To: user-zh Subject: Re: 回复: flink 缓存本地文件被删除疑问 Hi 这是早上发生异常后,我下载的日志,请麻烦查看一下。 taskmanager.log <https://drive.google.com/file/d/17nP8yxSpdAnDDgBEbEUrDXYx-rwosi52/view?usp=drive_web> Yun Tang 于2019年10月11日周五 下午2:56写道: > Hi 戴嘉诚 > > 你的异常发生在failove

Re: 关于使用RocksDBStateBackend 启用state.backend.rocksdb.ttl.compaction.filter.enabled 配置的问题

2019-10-11 文章 Yun Tang
Hi 我觉得你的担心是在TTL尚未过期的周期内,数据就已经写满磁盘了,这个肯定不是TTL能涵盖的问题,从作业规模上尝试限制写入量,或者增大并发,降低单个rocksDB需要承担的数据量(前提是你的所有机器的磁盘空间是大于你的数据量的)。另外如果真的很担心的话,换一个压缩率更小的算法 也有一些帮助(代价是更耗时更耗CPU, rocksDB 官方推荐ZTSD或者Zlib)[1],设置compression type可以参考rocksdb ColumnFamilyOptions的setCompressionType 方法 [2] [1] https://github.com/facebook

Re: 如何过滤异常的timestamp?

2019-10-30 文章 Yun Tang
Hi 瑞斌 如果你要使用Flink的IngestionTime的话,其实就不存在与Flink提供的EventTime混用的情况了,而source端的IngestionTime,拿的就是source端的系统时间,可以在source端后面记一个filter operator,将ingestion的time与message中的event time进行比较,超过一定阈值的可以丢弃掉不传到下游去。 祝好 唐云 From: 邢瑞斌 Sent: Wednesday, October 30, 2019 17:57 To: user-z

Re: 如何过滤异常的timestamp?

2019-10-31 文章 Yun Tang
个设为IngestionTime,后续的Operator可以再使用EventTime吗? Yun Tang 于2019年10月31日周四 上午2:26写道: > Hi 瑞斌 > > 如果你要使用Flink的IngestionTime的话,其实就不存在与Flink提供的EventTime混用的情况了,而source端的IngestionTime,拿的就是source端的系统时间,可以在source端后面记一个filter > operator,将ingestion的time与message中的e

Re: Flink State 过期清除 TTL 问题

2019-10-31 文章 Yun Tang
Hi 王磊 从你的配置以及使用Flink-1.7版本看,并没有启用主动清理过期数据的功能 [1],建议对StateTtlConfig 配置 cleanupFullSnapshot,这样你在执行full snapshot(也就是Savepoint)的时候,会把过期的数据不保存在你的Savepoint内容中。如果你不启用主动清理,只有当原先的数据被读取时,才会进行清理,这可能就导致你的Savepoint内容不断增长的原因。 另外,建议升级到Flink-1.8+,对state TTL的特性支持更好,可以参考中文文章 [2] [1] https://ci.apache.org/proje

Re: Checkpoint failed all the time

2019-11-03 文章 Yun Tang
Sure, this feature has been implemented in FLINK-12364 [1], all you need do is set the tolerable checkpoint failure numbers via like env.getCheckpointConfig().setTolerableCheckpointFailureNumber(5); [1] https://issues.apache.org/jira/browse/FLINK-12364 Best Yun Tang From: "

Re: 广播状态是否可以设置ttl过期时间

2019-11-07 文章 Yun Tang
Hi Broadcast State 可以看做一种operator state,只能在DefaultOperatorStateBackend里面创建 [1],TTL state目前仅是对keyed state来说的 [2]。 [1] https://github.com/apache/flink/blob/809533e5b5c686e2d21b64361d22178ccb92ec26/flink-runtime/src/main/java/org/apache/flink/runtime/state/DefaultOperatorStateBackend.java#L149 [2

Re: 回复: 回复: 本地checkpoint 文件190G了

2019-12-01 文章 Yun Tang
Hi 为什么你知道本地checkpoint文件达到190GB了,具体是哪个目录撑到了190GB? 如果没有启用 state.backend.local-recovery: * 使用FsSateBackend/Memory StateBackend, 本地不应该有什么checkpoint文件残留,因为执行checkpoint时,直接写HDFS了 * 使用 RocksDB state backend,无论是否开启incremental checkpoint本地也不应该有任何checkpoint文件残留(因为会被及时清理掉),除非你的DB目录本身就达到了1

Re: Flink State 过期清除 TTL 问题

2019-12-10 文章 Yun Tang
ink run -s ,从已有 savepoint 目录中恢复的数据所有的 updateTime 都变成了当前时间? 谢谢, 王磊 wangl...@geekplus.com.cn Sender: Yun Tang Send Time: 2019-11-01 01:38 Receiver: user-zh@flink.apache.org Subject: Re: Flink State 过期清除 TTL 问题 Hi 王磊 从你的配置以及使用Flink-1.7

Re: flink savepoint checkpoint

2019-12-10 文章 Yun Tang
Hi Checkpoint 是自动的,你可以配置retain checkpoint[1] 然后从checkpoint 恢复[2],可以不需要一定触发Savepoint。 [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#retained-checkpoints [2] https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#resuming-from-a-

Re: yarn per job 模式这个报错原因是什么?随机出现

2019-12-20 文章 Yun Tang
Hi 这个异常是因为无法绑定随机端口,在出问题的JM机器上检查一下 netstat,看是不是有大量的连接占用了很多端口。一般这种问题都是因为大量对外连接未关闭导致的,找到是什么类型的进程占用了大量端口。 祝好 唐云 From: rockey...@163.com Sent: Friday, December 20, 2019 15:04 To: user-zh Subject: yarn per job 模式这个报错原因是什么?随机出现 嗨,大家好,flink per job模式下随机出现如下错误?有知道是什么

Re: 回复:如何获取算子处理一条数据记录的时间

2020-01-02 文章 Yun Tang
Hi,张江 Flink官方支持追踪record的latency,你可以参考[1] 启用这个功能,不过这个功能会极大地降低你的处理性能,只能用作debug使用。 如果想知道真实使用场景下的性能指标,可以参考latency的metrics [2] 来衡量operator的处理性能。 [1] https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#latency-tracking [2] https://ci.apache.org/projects/flink/flink-docs-st

Re: 使用influxdb作为flink metrics reporter

2020-01-03 文章 Yun Tang
Hi 张江 * Retention policy 需要现在InfluxDB端创建,InfluxDBReporter不会自行创建不存在的 retention policy. * kafka的一些metrics在使用influxDB reporter的时候,会出现一些cast exception,可以参考 [1],在Flink-1.9 版本下可以忽略这些异常。 [1] https://issues.apache.org/jira/browse/FLINK-12147 祝好 唐云 From: 张江 Se

Re: 回复:使用influxdb作为flink metrics reporter

2020-01-05 文章 Yun Tang
.163.com/dashi/dlpro.html?from=mail88> 定制 在2020年01月04日 00:56,Yun Tang<mailto:myas...@live.com> 写道: Hi 张江 * Retention policy 需要现在InfluxDB端创建,InfluxDBReporter不会自行创建不存在的 retention policy. * kafka的一些metrics在使用influxDB reporter的时候,会出现一些cast exception,可以参考 [1],在Flink-1.9 版本下可以忽略这些异常。

Flink Weekly | 每周社区动态更新 - 2020/01/07

2020-01-06 文章 Yun Tang
大家好 2020年转眼就来了,先恭喜大家新年快乐,Flink社区也会在新的一年中继续陪伴大家,一起来把Flink做大做好。 本周社区主要新闻是Flink-1.10.0的发布进展,将blink planner设置为SQL client默认planner的讨论,以及如何支持SQL client gateway的FLIP。 Flink开发进展 == * [**Release**] Yu分享了目前Flink-1.10发布的喜人进展,release-1.10分支刚被拉出来时有46个blocker,11个critical级别的issue,目前已经下降到只有12个bloc

Re: FLINK 不同 StateBackend ProcessWindowFunction的差别

2020-01-07 文章 Yun Tang
Hi 使用iterator.remove() 去除state中已经计算过的数据不是一个标准做法,标准的做法应该是 clear掉相应的state [1] 至于为什么使用MemoryStateBackend会去除数据是因为 get 返回的结果是backend中on heap直接存储的对象[2],存在修改的副作用。 而RocksDB state backend get返回的结果是反序列化的list,而不是RocksDB自身存储的数据 [3],也就不存在修改的副作用了。 [1] https://github.com/apache/flink/blob/b195383b6b792ea1363a

Re: flink算子状态查看

2020-01-08 文章 Yun Tang
Hi 没开启Checkpoint但是想知道状态存储的用量的话,对于FsStateBackend来说没有什么好办法;但是对于RocksDBStateBackend来说可以通过开启RocksDB native metrics [1] 的方式来观察memtable 以及 sst文件的 size,来近似估算整体状态存储数据量。 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#rocksdb-native-metrics 祝好 唐云 ___

Re: flink遇到 valueState 自身的 NPE

2020-01-09 文章 Yun Tang
Hi Kevin State TTL 是清理的state中的数据条目entry,不是清理state在map函数中的对象本身。所以无论如何,作为value state对象的uniqMark 是不会因为TTL而变成null的。 我注意到你的作业即使发生failover之后,立刻恢复的时候,仍然遇到了这个NPE问题,我怀疑你实际运行的代码第39行并不是你贴出来的代码,很有可能是对应你的代码的第34行,也就是map方法的输入RDLog是null,这也符合作业成功restore之后,又再次立即遇到failover的场景,也就是处理到了非法“脏数据”,导致作业不断failover。建议你按照这个思

Re: flink遇到 valueState 自身的 NPE

2020-01-09 文章 Yun Tang
/state/ttl/AbstractTtlDecorator.java#L96 祝好 唐云 From: Kevin Liao Sent: Friday, January 10, 2020 1:08 To: Yun Tang Cc: user-zh@flink.apache.org Subject: Re: flink遇到 valueState 自身的 NPE 谢答,首先贴的代码确实是运行的程序 此外刚刚又通过打印 log 确认了 uniqMark == null 是 false 我现在的怀疑点是这个地方 if

Re: 回复:flink checkpoint不生成

2020-01-13 文章 Yun Tang
Hi , 你好 恭喜解决问题,不过关于社区邮件列表的使用有几点小建议: 1. 如果是全中文的邮件,就不要抄送英文社区邮件列表 (u...@flink.apache.org)了,毕竟社区有很多看不懂中文的开发者。中文邮件列表(user-zh)还是很活跃的,相信大家可以一起帮助解决问题。 2. 因为开源社区邮件列表对附件图片支持不友好,建议使用超链接的方式,有助于更快收到回复和解答。 祝好 唐云 From: 起子 Sent: Monday, January 13, 2020 18:01 To: user-zh

Re: 咨询一下 RocksDB 状态后端的调优经验

2020-01-13 文章 Yun Tang
Hi Dong RocksDB无论如何都是要使用native内存的,您的YARN pmem-check相比JVM heap的buffer空间是多少,是否合适呢? FLINK-7289的基本所需task都已经完成在release-1.10 分支中了,您可以直接使用release-1.10 分支打包,最近也要发布1.10的rc版本,欢迎试用该功能。 如果你的所有checkpoint size是50GB,其实不是很大,但是如果单个state backend有50GB的话,对于Flink这种低延迟流式场景是稍大的,建议降低单并发state数据量。 至于目前的问题,也就是您加了相关参数,但

Re: 关于使用Flink RocksDBStateBackend问题

2020-01-14 文章 Yun Tang
Hi 使用自定义options factory的话,我们会认为是高级用户,自然也就完全交由用户进行配置,至于write buffer size如何配置,可以参考PredefinedOptions [1] 的使用方法。 [1] https://github.com/apache/flink/blob/8f67d1d7e6809d528fe957cb4eb78308d87da324/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/P

Re: When I use flink 1.9.1 and produce data to Kafka 1.1.1, The streamTask checkpoint error .

2020-01-15 文章 Yun Tang
leCheckpointFailureNumber(tolerableDeclinedCheckpointNumber); Best Yun Tang From: jose farfan Sent: Wednesday, January 15, 2020 23:21 To: ouywl Cc: user ; user-zh@flink.apache.org Subject: Re: When I use flink 1.9.1 and produce data to Kafka 1.1.1, The streamTask checkpoint err

Re: [ANNOUNCE] Dian Fu becomes a Flink committer

2020-01-16 文章 Yun Tang
Congratulations, Dian! Best Yun Tang From: Benchao Li Sent: Thursday, January 16, 2020 22:27 To: Congxian Qiu Cc: d...@flink.apache.org ; Jingsong Li ; jincheng sun ; Shuo Cheng ; Xingbo Huang ; Wei Zhong ; Hequn Cheng ; Leonard Xu ; Jeff Zhang ; user

Re: 从checkpoint恢复任务失败

2020-01-16 文章 Yun Tang
Hi 确定每次恢复的时候没有其他异常么,之前有用户遇到是因为其他异常,触发cancel task的逻辑,导致清理了本地下载的文件,所以在进行硬链的时候会遇到no such file的异常。 祝好 唐云 From: claylin <1012539...@qq.com> Sent: Thursday, January 16, 2020 22:00 To: user-zh Subject: 从checkpoint恢复任务失败 用的版本1.9.1,我这里只要遇到异常,譬如空指针异常,然后从checkpoint恢复,总是恢

Re: 回复: 怎么不使用checkpoint

2020-01-16 文章 Yun Tang
Hi 默认情况下,不调用env.enableCheckpoint,也就是不会启用checkpoint的。默认情况下的restart strategy就是NoRestart,也就是不会自动failover的。 祝好 唐云 获取 Outlook for Android From: sun <1392427...@qq.com> Sent: Friday, January 17, 2020 1:43:53 PM To: user-zh Subject: 回复: 怎么不使用ch

Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理

2020-01-18 文章 Yun Tang
Hi 如果你从chk-94040 进行checkpoint恢复的话,这个checkpoint是不会被删除清理的,这个行为是by design的。原因是因为从checkpoint resume在行为上被认为从Savepoint resume行为是一致的,也复用了一套代码 [1],Savepoint的生命周期由用户把控,Flink框架自行不会去删除。 因此,加载的checkpoint被赋予了savepoint的property [2]。 这个CheckpointProperties#SAVEPOINT 里面的 discardSubsumed 属性是false,也就是当新的checkpo

Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理

2020-01-19 文章 Yun Tang
,就永久保留。 > 非常感谢 > > > Yun Tang 于2020年1月19日周日 下午2:06写道: > >> Hi >> >> 如果你从chk-94040 进行checkpoint恢复的话,这个checkpoint是不会被删除清理的,这个行为是by >> design的。原因是因为从checkpoint resume在行为上被认为从Savepoint resume行为是一致的,也复用了一套代码 >> [1],Savepoint的生命周期由用户把控,Flink框架自行不会去删除。 &

Re: Flink 增量 Checkpoint ,容错恢复后,随着时间的推移,之前的 Checkpoint 状态没有清理

2020-01-19 文章 Yun Tang
hdfs:xxx/taskowned > 如果有什么理解错误,请指出,非常感谢。 > > 祝好, > 沈磊 > > Yun Tang 于2020年1月19日周日 下午4:11写道: > >> Hi >> >> 目前Flink的checkpoint目录都会有一个job-id 的子目录,所有chk- 和 shared 目录都在该目录下存储。 >> 如果没有开启增量checkpoint::在确保当前作业的checkpoint有最新完成的情况下,直接删除掉其他job-id的子目录即可。 >> 如

Re: Flink 1.6, increment Checkpoint, the shared dir stored the last year checkpoint state

2020-01-19 文章 Yun Tang
removed by checkpoint coordinator but takes too long to complete before job shut down. 3. This file is still useful. This is possible in theory because some specific rocksDB sst file might not be selected during compactions for a long time. Best Yun Tang From

Re: blink(基于flink1.5.1版本)可以使用两个hadoop集群吗?

2020-01-26 文章 Yun Tang
Hi Yong 首先,这封邮件就不要抄送开发者邮件列表了,中文的邮件只需要发中文邮件列表。 Flink当然可以用两个YARN集群,关键在于Flink提交作业到YARN的时候,读取的HADDOP配置是什么,其实官方文档[1] 有相关的介绍,主要是 YARN_CONF_DIR, HADOOP_CONF_DIR or HADOOP_CONF_PATH 这些环境变量的配置是什么,在你提交的终端内配置一个你搭建的集群环境变量即可。 [1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/deployment/yarn_s

Re: [State Processor API] how to convert savepoint back to broadcast state

2020-01-26 文章 Yun Tang
Hi Yi Can the official doc of writing broad cast state [1] satisfies your request? [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html#broadcast-state-1 Best Yun Tang From: Jin Yi Sent: Thursday, January 23, 2020 8:12

Re: [State Processor API] how to convert savepoint back to broadcast state

2020-01-27 文章 Yun Tang
che/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperator.java#L101 Best Yun Tang From: Jin Yi Sent: Monday, January 27, 2020 14:50 To: Yun Tang Cc: user ; user-zh@flink.apache.org Subject: Re: [State Processor API] how to convert savepoint back to broadcast state

Re: 关于在读和写频率都很高的情况下怎么优化rocksDB

2020-02-26 文章 Yun Tang
Hi 你的单节点rocksDB state size多大呢?(可以通过打开相关metrics [1] 或者登录到RocksDB所在机器观察一下RocksDB目录的size) 造成反压是如何确定一定是rocksDB 状态大导致的呢?看你的IO情况绝对值很大,但是百分比倒不是很高。是否用jstack观察过TM的进程,看一下是不是task主线程很容易打在RocksDB的get等读操作上。 RocksDB本质上还是面向磁盘的kv存储,如果是每次读写都更新的话,block cache发挥的作用会很有限。如果达到磁盘瓶颈,需要考虑提高磁盘性能或者想办法降低单个rocksDB实例的state大小

Re: Question about RocksDBStateBackend Compaction Filter state cleanup

2020-03-17 文章 Yun Tang
compaction_filter.h#L140 Best Yun Tang From: LakeShen Sent: Tuesday, March 17, 2020 15:30 To: dev ; user-zh ; user Subject: Question about RocksDBStateBackend Compaction Filter state cleanup Hi community , I see the flink RocksDBStateBackend state

Re: [Third-party Tool] Flink memory calculator

2020-03-29 文章 Yun Tang
Very interesting and convenient tool, just a quick question: could this tool also handle deployment cluster commands like "-tm" mixed with configuration in `flink-conf.yaml` ? Best Yun Tang From: Yangze Guo Sent: Friday, March 27, 2020 18:00 To: user

Re: Re:Re: flink savepoint问题

2020-03-30 文章 Yun Tang
Hi 首先,如果这个问题很容易复现的话,我们需要定位到是什么导致了OOMkilled。 1. 打开block-cache usage [1] 观察metrics中block cache的使用量。 2. 麻烦回答一下几个问题,有助于进一步定位 * 单个TM有几个slot * 单个TM的managed memory配置了多少 * 一共声明了多少个keyed state,(如果使用了window,也相当于会使用一个state),其中有多少个map state,是否经常遍历那个map state * 被kill的contai

Re: ProcessWindowFunction中如何有效清除state呢

2020-03-31 文章 Yun Tang
Hi 从代码看 if(stateDate.equals("") || stateDate.equals(date)) 无法判断究竟是从哪里获取到stateDate变量赋值,不清楚你这里里面的判断逻辑是否能生效。 其次,state.clear() 之后,再次获取时,返回值会是null,代码片段里面也没有看出来哪里有对这个的校验。 祝好 唐云 From: 守护 <346531...@qq.com> Sent: Tuesday, March 31, 2020 12:33 To: user-zh Subject: Proces

Re: 回复: ProcessWindowFunction中如何有效清除state呢

2020-03-31 文章 Yun Tang
) + c_st),是和这个能有关系吗 -- 原始邮件 ------ 发件人: "Yun Tang"

Re: 关于使用RocksDBStateBackend TTL 配置的问题

2020-04-03 文章 Yun Tang
Hi 只是配置state.backend.rocksdb.ttl.compaction.filter.enabled 还需要相关的state descriptor也配置上state ttl config,不确定这里所谓的“不理想”的效果是没有及时删除,还是彻底没有删除? 目前RocksDB的后台清理确实需要依赖于compaction的执行,换言之,如果有部分数据一直没有进入compaction,确实存在理论上的可能性一直不会因为过期而删除,但是这个可能性很低不应该对你的使用体验带来很大的影响。 现在的这两种策略是更新时间戳的策略,只要不再访问,到时间都会因为TTL而自动后台清除的。

Re: 请问有没有什么方法可以把checkpoint打到集群外的hdfs?

2020-04-16 文章 Yun Tang
Hi 如果旧作业开启了incremental checkpoint,并从那边进行恢复的话,需要注意的是旧的checkpoint目录下的文件是不能删除的,这个是incremental checkpoint语义导致的,如果想要切割掉对旧目录的依赖,需要执行一次savepoint,并启动新作业从savepoint进行恢复。 祝好 唐云 From: zhisheng Sent: Thursday, April 16, 2020 16:37 To: user-zh Subject: Re: 请问有没有什么方法可以把check

Re: flink-1.10 checkpoint 偶尔报 NullPointerException

2020-04-20 文章 Yun Tang
Hi 这个NPE有点奇怪,从executeCheckpointing方法[1]里面其实比较难定位究竟是哪一个变量或者变量的取值是null。 一种排查思路是打开 org.apache.flink.streaming.runtime.tasks 的DEBUG level日志,通过debug日志缩小范围,判断哪个变量是null 这个异常出现的时候,相关task上面的日志有什么异常么,触发这个NPE的条件是什么,稳定复现么? [1] https://github.com/apache/flink/blob/aa4eb8f0c9ce74e6b92c3d9be5dc8e8cb536239d/f

Re: flink checkpoint savepoint问题

2020-04-21 文章 Yun Tang
Hi 原因是因为新增字段或者修改字段类型后,新的serializer无法(反)序列化原先存储的数据,对于这种有字段增改需求的场景,目前Flink社区主要借助于Pojo或者avro来实现 [1],建议对相关的state schema做重新规划,以满足这种有后续升级需求的场景。 [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/schema_evolution.html 祝好 唐云 From: xyq Sent: Tu

Re: Flink监控: promethues获取到有的metrics没有包含flink 对应的job_name或者job_id

2020-05-05 文章 Yun Tang
Hi flink_jobmanager_Status 这种metrics属于jobmanager层级的metrics,这种metrics与job level的metrics,从概念上来说是不一样的。因为Flink是支持一个JM里面同时运行多个作业的,但是JM的JVM实际上只有一个,所以如果给JM的metrics增加其从属的job_id 的tag是不符合语义的。当然,如果一个host上有多个JM,现在Flink不太好区分,目前只有TM级别的tm_id可以区分不同的TM。 如果非要加上job_name 或者 job_id 才能识别的话,只能按照你分享的文章中修改reporter的代码。不过

Re: What is the RocksDB local directory in flink checkpointing?

2020-05-08 文章 Yun Tang
/BootstrapTools.java#L478-L489 Best Yun Tang From: Till Rohrmann Sent: Wednesday, May 6, 2020 17:35 To: LakeShen Cc: dev ; user ; user-zh Subject: Re: What is the RocksDB local directory in flink checkpointing? Hi LakeShen, `state.backend.rocksdb.localdir` defin

Re: 对flink源码中watermark对齐逻辑的疑惑

2020-05-11 文章 Yun Tang
Hi 正是因为取各个input channel的最小值,所以如果某一个上游一直没有获取到真实数据,发送下来的watermark一直都是Long.MIN_VALUE,这样会导致无法触发window,社区采用idle source [1]的方式walk around该问题 [1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#idling-sources 祝好 唐云 From: Benchao Li Sent: M

Re: checkpoint失败讨论

2020-06-01 文章 Yun Tang
Hi 这个错误“could only be replicated to 0 nodes instead of minReplication (=1)”是HDFS不稳定导致的,无法将数据进行duplicate与Flink本身并无关系。 祝好 唐云 From: yanggang_it_job Sent: Monday, June 1, 2020 15:30 To: user-zh@flink.apache.org Subject: checkpoint失败讨论 最近多个以rocksdb作为状态后端,hdfs作为远程文

Re: 如何做checkpoint的灾备

2020-06-13 文章 Yun Tang
Hi Xingxing 由于作业仍在运行,所以checkpoint目录下的文件是不断新增以及删除的,其实在使用distcp的时候加上 “-i” [1] 来忽略失败的拷贝(例如FileNotFoundException) 文件即可。因为作业的原始checkpoint目录最终一定可以做到正常restore,所以即使部分文件因为在拷贝时被原作业不需要而删除时,只要最终目录结构一致,是可以做到在另外一个HDFS上实现容灾备份的。 [1] https://hadoop.apache.org/docs/current/hadoop-distcp/DistCp.html#Command_Line

Re: 回复:Flink异常及重启容错处理

2020-06-13 文章 Yun Tang
Hi 我想你的问题是数据源中存在之前代码中没有很好处理的corner case,导致在处理某一条“脏数据”时,作业进入FAILED状态。此时即使从之前的checkpoint恢复,由于作业代码逻辑未变,之前的corner case依然无法处理,作业只能无限进去失败状态。 这种场景可以一开始时候将checkpoint的保留策略设置成RETAIN_ON_CANCELLATION [1],这样cancel作业之后,更改业务代码逻辑,从而可以处理之前的问题,再降作业上线从之前的checkpoint恢复 [2],这样做的话,数据是不会丢失的。 [1] https://ci.apache.o

Re: flink 1.10 on yarn 内存超用,被kill

2020-06-18 文章 Yun Tang
Hi 单个Slot的managed memory是多少(可以通过webUI或者TM的日志观察到),rocksDB的 block cache usage会增长到多少,是一直在增长最终超过单个slot的managed memory么? RocksDB的内存托管在绝大部分场景下是work的,但是RocksDB本身的实现限制了这个功能完美发挥作用。具体涉及到LRUcache和Writebuffer manager之间的对应关系,目前RocksDB的strict cache limit和将write buffer manager的内存申请“托管”到cache的功能是不完整的,即使在cache

Re: flink的state过期设置

2020-07-01 文章 Yun Tang
Hi TTL的时间戳实际是会存储在 state 里面 [1],与每个entry在一起,也就是说从Checkpoint恢复的话,数据里面的时间戳是当时插入时候的时间戳。 [1] https://github.com/apache/flink/blob/ba92b3b8b02e099c8aab4b2b23a37dca4558cabd/flink-runtime/src/main/java/org/apache/flink/runtime/state/ttl/TtlValueState.java#L50 祝好 唐云 Fro

Re: rocksdb的block cache usage应该如何使用

2020-07-02 文章 Yun Tang
Hi 默认Flink启用了rocksDB 的managed memory,这里涉及到这个功能的实现原理,简单来说,一个slot里面的所有rocksDB实例底层“托管”内存的LRU block cache均是一个,这样你可以根据taskmanager和subtask_index 作为tag来区分,你会发现在同一个TM里面的某个subtask对应的不同column_family 的block cache的数值均是完全相同的。所以不需要将这个数值进行求和统计。 祝好 唐云 From: SmileSmile Sent:

Re: 回复:rocksdb的block cache usage应该如何使用

2020-07-03 文章 Yun Tang
Hi 观察block cache usage的显示数值是否超过你的单个slot的managed memory,计算方法是 managed memory / slot数目,得到一个slot的managed memory,将该数值与block cache usage比较,看内存是否超用。重启之后容易被os kill,使用的是从savepoint恢复数据么? 祝好 唐云 From: SmileSmile Sent: Friday, July 3, 2020 14:20 To: Yun Tang Cc: Flink

Re: 回复:rocksdb的block cache usage应该如何使用

2020-07-03 文章 Yun Tang
3, 2020 15:07 To: Yun Tang Cc: user-zh@flink.apache.org Subject: 回复:rocksdb的block cache usage应该如何使用 hi yun tang! 因为网络或者不可抗力导致pod重生,作业会重启,目前作业没有开启checkpoint,恢复等价继续消费最新数据计算,运行一段时间很容易内存超用被os kill,然后重启,再运行一段时间,间隔变短,死的越来越频繁。 从现象上看很像是内存没有释放,这种场景下,上一次作业残留的未到水位线还没有被触发计算的数据是否在作业重启过程中被清除了? <ht

Re: 回复:rocksdb的block cache usage应该如何使用

2020-07-03 文章 Yun Tang
-bug.html 祝好 唐云 From: SmileSmile Sent: Friday, July 3, 2020 15:22 To: Yun Tang Cc: user-zh@flink.apache.org Subject: 回复:rocksdb的block cache usage应该如何使用 Hi 作业只配置了重启策略,作业如果fail了,只会重启,没有恢复历史数据。 【作业一旦发生failover,state backend的数据都需要清空然后再启动的时候进行加载。】 我目前遇到的情况是作业fail重

Re: 回复:rocksdb的block cache usage应该如何使用

2020-07-07 文章 Yun Tang
From: SmileSmile Sent: Monday, July 6, 2020 14:15 To: Yun Tang Cc: user-zh@flink.apache.org Subject: 回复:rocksdb的block cache usage应该如何使用 hi yun tang! 我在容器内加入了libjemalloc.so.2并且在配置中加上了 containerized.master.env.LD_PRELOAD: "/opt/jemalloc/lib/libjemalloc

Re: Re:Re: 如何在窗口关闭的时候清除状态

2020-07-08 文章 Yun Tang
Hi TTL需要state descriptor明确声明enableTimeToLive[1],而一旦使用window,window内使用的timer和window state实际上不暴露给用户 的,没法开启TTL,二者在使用方式上存在一定互斥。从语义上来说TTL可以清理过期数据,而默认的window实现都会清理已经trigger过的window内的state,所以二者在语义上其实也是有一定互斥的。 从性能角度考虑,一天的窗口显得有点大了,往往性能不好,如果能把类似逻辑迁移到TTL上实现会对性能更友好。 [1] https://ci.apache.org/projects/fli

Re: flink 1.11 on kubernetes 构建失败

2020-07-08 文章 Yun Tang
Hi 你是不是对 /opt/flink/conf 目录下的文件进行了sed相关写操作?社区文档中使用的方法是将configmap挂载成本地的flink-conf.yaml 等文件,而这个挂载的目录其实是不可写的。 直接修改configmap里面的内容,这样挂载时候就会自动更新了。 祝好 唐云 From: SmileSmile Sent: Wednesday, July 8, 2020 13:03 To: Flink user-zh mailing list Subject: flink 1.11 on kubern

Re: flink1.9状态及作业迁移

2020-07-13 文章 Yun Tang
Checkpoint meta中存储的是完整路径,所以一般会把hdfs的namespace存储起来,导致没办法直接迁移。 Flink-1.11 支持将savepoint(但是不支持Checkpoint)进行位置迁移 [1],而对于Flink-1.9,二者均不支持。 [1] https://issues.apache.org/jira/browse/FLINK-5763 祝好 唐云 From: Dream-底限 Sent: Tuesday, July 14, 2020 11:07 To: user-zh@flink.

Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-13 文章 Yun Tang
Hi Peihui 你的异常应该是从增量Checkpoint恢复时,文件已经下载到本地了,做硬链时[1],发现源文件不见了,有很大的可能是当时发生了异常,导致restore流程退出了,所以这个问题应该不是root cause。 [1] https://github.com/apache/flink/blob/2a3b642b1efb957f3d4f20502c40398786ab1469/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/st

Re: flink1.9状态及作业迁移

2020-07-13 文章 Yun Tang
Sent: Tuesday, July 14, 2020 11:57 To: user-zh@flink.apache.org Subject: Re: flink1.9状态及作业迁移 hi、 请问对于下面的情况,Checkpoint meta中存储的hdfs namespace可以修改吗 》》Checkpoint meta中存储的是完整路径,所以一般会把hdfs的namespace存储起来,导致没办法直接迁移。 Yun Tang 于2020年7月14日周二 上午11:54写道: > Checkpoint meta中存储的是完整路径,所以一般会把hdfs的namespace存储起来,

Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-15 文章 Yun Tang
Hi Robin 其实你的说法不是很准确,社区是明文保证savepoint的兼容性 [1],但是并不意味着跨大版本时无法从checkpoint恢复,社区不承诺主要还是维护其太耗费精力,但是实际从代码角度来说,在合理使用state schema evolution [2]的前提下,目前跨版本checkpoint恢复基本都是兼容的. 另外 @Peihui 也请麻烦对你的异常描述清晰一些,我的第一次回复已经推测该异常不是root cause,还请在日志中找一下无法恢复的root cause,如果不知道怎么从日志里面找,可以把相关日志分享出来。 [1] https://ci.apac

Re: flink 1.9.2 升级 1.10.0 任务失败不能从checkpoint恢复

2020-07-16 文章 Yun Tang
情况和@chenxyz 类似。 http://apache-flink.147419.n8.nabble.com/rocksdb-Could-not-restore-keyed-state-backend-for-KeyedProcessOperator-td2232.html 换成1.10.1 就可以了 Best wishes. Yun Tang 于2020年7月15日周三 下午4:35写道: > Hi Robin > > 其实你的说法不是很准确,社区是明文保证savepoint的兼容性 > [1],但是并不意味着跨大版本时无法从checkpoint恢复,社区不承诺主要

  1   2   3   >