[ANNOUNCE] Apache Flink 1.17.2 released

2023-11-28 文章 Yun Tang
The Apache Flink community is very happy to announce the release of Apache 
Flink 1.17.2, which is the second bugfix release for the Apache Flink 1.17 
series.

Apache Flink® Is a framework and distributed processing engine for stateful 
computations over unbounded and bounded data streams. Flink has been designed 
to run in all common cluster environments, perform computations at in-memory 
speed and at any scale.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements for 
this bugfix release:
https://flink.apache.org/2023/11/29/apache-flink-1.17.2-release-announcement/

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12353260

We would like to thank all contributors of the Apache Flink community who made 
this release possible!


Feel free to reach out to the release managers (or respond to this thread) with 
feedback on the release process. Our goal is to constantly improve the release 
process. Feedback on what could be improved or things that didn't go so well 
are appreciated.


Regards,
Release Manager


Re: [ANNOUNCE] Flink Table Store Joins Apache Incubator as Apache Paimon(incubating)

2023-03-27 文章 Yun Tang
Congratulations!
Unlike other data-lakes, Paimon might be the first one to act as a stream-first 
(not batch-first) data-lake.

Best
Yun Tang

From: Xianxun Ye 
Sent: Tuesday, March 28, 2023 10:52
To: d...@flink.apache.org 
Cc: Yu Li ; user ; user-zh 
; d...@flink.apache.org 
Subject: Re: [ANNOUNCE] Flink Table Store Joins Apache Incubator as Apache 
Paimon(incubating)

Congratulations!

Best regards,

Xianxun

On 03/27/2023 22:51,Samrat 
Deb<mailto:decordea...@gmail.com> wrote:
congratulations

Bests,
Samrat

On Mon, Mar 27, 2023 at 7:19 PM Yanfei Lei  wrote:

Congratulations!

Best Regards,
Yanfei

ramkrishna vasudevan  于2023年3月27日周一 21:46写道:

Congratulations !!!

On Mon, Mar 27, 2023 at 2:54 PM Yu Li  wrote:

Dear Flinkers,


As you may have noticed, we are pleased to announce that Flink Table
Store has joined the Apache Incubator as a separate project called Apache
Paimon(incubating) [1] [2] [3]. The new project still aims at building a
streaming data lake platform for high-speed data ingestion, change data
tracking and efficient real-time analytics, with the vision of supporting a
larger ecosystem and establishing a vibrant and neutral open source
community.


We would like to thank everyone for their great support and efforts for
the Flink Table Store project, and warmly welcome everyone to join the
development and activities of the new project. Apache Flink will continue
to be one of the first-class citizens supported by Paimon, and we believe
that the Flink and Paimon communities will maintain close cooperation.


亲爱的Flinkers,


正如您可能已经注意到的,我们很高兴地宣布,Flink Table Store 已经正式加入 Apache 孵化器独立孵化 [1] [2]
[3]。新项目的名字是 Apache
Paimon(incubating),仍致力于打造一个支持高速数据摄入、流式数据订阅和高效实时分析的新一代流式湖仓平台。此外,新项目将支持更加丰富的生态,并建立一个充满活力和中立的开源社区。


在这里我们要感谢大家对 Flink Table Store 项目的大力支持和投入,并热烈欢迎大家加入新项目的开发和社区活动。Apache
Flink 将继续作为 Paimon 支持的主力计算引擎之一,我们也相信 Flink 和 Paimon 社区将继续保持密切合作。


Best Regards,

Yu (on behalf of the Apache Flink PMC and Apache Paimon PPMC)


致礼,

李钰(谨代表 Apache Flink PMC 和 Apache Paimon PPMC)


[1] https://paimon.apache.org/

[2] https://github.com/apache/incubator-paimon

[3]
https://cwiki.apache.org/confluence/display/INCUBATOR/PaimonProposal



Re: [ANNOUNCE] FRocksDB 6.20.3-ververica-2.0 released

2023-01-30 文章 Yun Tang
Thanks Yuanfei for driving the frocksdb release!

Best
Yun Tang

From: Yuan Mei 
Sent: Tuesday, January 31, 2023 15:09
To: Jing Ge 
Cc: Yanfei Lei ; d...@flink.apache.org 
; user ; user-zh@flink.apache.org 

Subject: Re: [ANNOUNCE] FRocksDB 6.20.3-ververica-2.0 released

Thanks Yanfei for driving the release!

Best
Yuan

On Mon, Jan 30, 2023 at 8:46 PM Jing Ge via user 
mailto:u...@flink.apache.org>> wrote:
Hi Yanfei,

Thanks for your effort. Looking forward to checking it.

Best regards,
Jing

On Mon, Jan 30, 2023 at 1:42 PM Yanfei Lei 
mailto:fredia...@gmail.com>> wrote:
It is very happy to announce the release of FRocksDB 6.20.3-ververica-2.0.

Compiled files for Linux x86, Linux arm, Linux ppc64le, MacOS x86,
MacOS arm, and Windows are included in FRocksDB 6.20.3-ververica-2.0
jar, and the FRocksDB in Flink 1.17 would be updated to
6.20.3-ververica-2.0.

Release highlights:
- [FLINK-30457] Add periodic_compaction_seconds option to RocksJava[1].
- [FLINK-30321] Upgrade ZLIB of FRocksDB to 1.2.13[2].
- Avoid expensive ToString() call when not in debug[3].
- [FLINK-24932] Support build FRocksDB Java on Apple silicon[4].

Maven artifacts for FRocksDB can be found at:
https://mvnrepository.com/artifact/com.ververica/frocksdbjni

We would like to thank all efforts from the Apache Flink community
that made this release possible!

[1] https://issues.apache.org/jira/browse/FLINK-30457
[2] https://issues.apache.org/jira/browse/FLINK-30321
[3] https://github.com/ververica/frocksdb/pull/55
[4] https://issues.apache.org/jira/browse/FLINK-24932

Best regards,
Yanfei
Ververica(Alibaba)


Re: 怀疑源码中的一个方法是never reached code

2022-06-14 文章 Yun Tang
Hi,育锋

我觉得你的分析应该是没问题的。可以创建一个ticket来修复该问题。另外,关于代码实现的具体讨论,建议在dev邮件列表讨论。

祝好
唐云

From: 朱育锋 
Sent: Tuesday, June 14, 2022 19:33
To: user-zh@flink.apache.org 
Subject: 怀疑源码中的一个方法是never reached code

Hello Everyone

在阅读ProcessMemoryUtils类的代码时,我怀疑sanityCheckTotalProcessMemory方法[1]中的主体逻辑永远都不会执行:

1. 
在deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory方法中,判断了是否显式配置了TotalProcessMemory[2]
2. 
false分支(也就是没有显式配置TotalProcessMemory)的逻辑中调用了sanityCheckTotalProcessMemory方法,而sanityCheckTotalProcessMemory方法的主体逻辑
只有在显式配置了TotalProcessMemory时[3]才会执行,所以sanityCheckTotalProcessMemory方法的主体逻辑应该永远不会执行

参照TaskExecutorFlinkMemoryUtils类中的sanityCheckTotalFlinkMemory方法(该方法与sanityCheckTotalProcessMemory方法逻辑类似,都是比较衍生的内存大小与显式配置的内存大小是否一致)的调用位置[4][5],
我猜测sanityCheckTotalProcessMemory方法是不是应该放在deriveJvmMetaspaceAndOverheadFromTotalFlinkMemory方法中if分支的后面,而不是在分支里面

也有可能是对这段代码的理解不够,没有揣测到这么写的意图,希望大佬们帮忙确认下

[1] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java#L239
 

[2] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java#L170
 

[3] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/ProcessMemoryUtils.java#L247
 

[4] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/taskmanager/TaskExecutorFlinkMemoryUtils.java#L101
 

[5] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/taskmanager/TaskExecutorFlinkMemoryUtils.java#L192
 


Best regards
YuFeng


Re: 1.13.5版本sql大小64k限制bug

2022-05-25 文章 Yun Tang
Hi

请使用英文在dev社区发送邮件。另外关于使用方面的问题,建议向user-zh 频道发送,已经帮你转发到相关邮件列表了。


祝好
唐云

From: Lose control ./ <286296...@qq.com.INVALID>
Sent: Tuesday, May 24, 2022 9:15
To: dev 
Subject: 1.13.5版本sql大小64k限制bug

请问各位大神,1.13.5版本sql大小64k限制如何修改啊?谢谢


Re: RocksDB 读 cpu 100% 如何调优

2022-03-21 文章 Yun Tang
Hi,

RocksDB 的CPU栈能卡在100%,很有可能是大量解压缩 index/filter block导致的,可以enable partition 
index/filter [1] 看看问题是否解决。
相关内容也可以参考我之前线下做过的分享 [2]


[1] 
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/#state-backend-rocksdb-memory-partitioned-index-filters

[2] https://developer.aliyun.com/article/784995 《Flink 1.13,State Backend 
优化及生产实践分享》

祝好
唐云


From: Peihui He 
Sent: Friday, March 18, 2022 20:16
To: user-zh@flink.apache.org 
Subject: Re: RocksDB 读 cpu 100% 如何调优

OK,我这边加个metric,先观察下

yue ma  于2022年3月18日周五 12:23写道:

> hi
> 我觉得这里可以注意两地方
> 首先 你可以观察一下这个时候 task 的吞吐量是多少 ,如果 qps 特别高 ,比如作业重最旧的offset 消费,我觉得这个时候 cpu 100%
> 是符合预期的。
> 其次 你可以在代码中加一些内存缓存的逻辑 类似于 mini-batch, 来减少和 state 交互的频率,也许这样能缓解一部分问题。
>
> deng xuezhao  于2022年3月18日周五 11:19写道:
>
> > 退订
> >
> >
> >
> > 在 Peihui He ,2022年3月18日 上午11:18写道:
> >
> > Hi, all
> >
> > 如题,flink 任务使用rocksdb 做为状态后端,任务逻辑大概意思是:
> > 来一条数据先判断该数据的key 是否再mapstat 中, 然后再将该key 写入mapstat中。
> >
> > 产生问题是当数据跑一段时间后,判断是否存在线程cpu总是100%,堆栈如下:
> >
> > "process (6/18)#0" Id=80 RUNNABLE (in native)
> > at org.rocksdb.RocksDB.get(Native Method)
> > at org.rocksdb.RocksDB.get(RocksDB.java:2084)
> > at
> >
> >
> org.apache.flink.contrib.streaming.state.RocksDBMapState.contains(RocksDBMapState.java:173)
> > at
> >
> >
> org.apache.flink.runtime.state.UserFacingMapState.contains(UserFacingMapState.java:72)
> > at
> >
> >
> com.huanju.security.soc.internal.hs.bigdata.FileScanToTiDB$$anon$12.processElement(FileScanToTiDB.scala:156)
> > at
> >
> >
> com.huanju.security.soc.internal.hs.bigdata.FileScanToTiDB$$anon$12.processElement(FileScanToTiDB.scala:145)
> > at
> >
> >
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83)
> > at
> >
> >
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:233)
> > at
> > org.apache.flink.streaming.runtime.io
> >
> .AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
> > at
> > org.apache.flink.streaming.runtime.io
> >
> .AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
> > at
> > org.apache.flink.streaming.runtime.io
> > .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> > at
> >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
> > at
> >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$$Lambda$624/715942770.runDefaultAction(Unknown
> > Source)
> > at
> >
> >
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
> > at
> >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
> > at
> >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
> > at
> >
> org.apache.flink.runtime.taskmanager.Task$$Lambda$773/520411616.run(Unknown
> > Source)
> > at
> >
> >
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
> > at
> > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
> > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
> > at java.lang.Thread.run(Thread.java:748)
> >
> > 但是看checkpoint数据,才100m左右
> >
> > 请问大家 rocksdb 是出现什么性能瓶颈了呢? 改怎么调优呢?
> >
>


Re: Re:flink s3 checkpoint 一直IN_PROGRESS(100%)直到失败

2022-03-08 文章 Yun Tang
Hi

一般是卡在最后一步从JM写checkpoint meta上面了,建议使用jstack等工具检查一下JM的cpu栈,看问题出在哪里。


祝好
唐云

From: Sun.Zhu <17626017...@163.com>
Sent: Tuesday, March 8, 2022 14:12
To: user-zh@flink.apache.org 
Subject: Re:flink s3 checkpoint 一直IN_PROGRESS(100%)直到失败

图挂了

https://postimg.cc/Z9XdxwSk













在 2022-03-08 14:05:39,"Sun.Zhu" <17626017...@163.com> 写道:

hi all,
flink 1.13.2,将checkpoint 写到S3但是一直成功不了,一直显示IN_PROGRESS,直到超时失败,有大佬遇到过吗?








Re: 状态初始化

2022-02-27 文章 Yun Tang
Hi,

这个需求在社区里面称之为 state bootstrapping, 以前在state processor API没有引入时,还有第三方的工具 bravo 
[1]。
我理解你的需求完全可以有state processor API完成,生成一个savepoint,由新作业消费。目前社区也在考虑支持生成native 
savepoint,用以加快生成速度 [2]


[1] https://github.com/king/bravo
[2] https://issues.apache.org/jira/browse/FLINK-25528


Best
Yun Tang


From: Jiangang Liu 
Sent: Thursday, February 24, 2022 10:00
To: user-zh 
Subject: Re: 状态初始化

作业在启动时可以使用 Processor API加载状态,可以参考
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/libs/state_processor_api/

huangzhi...@iwgame.com  于2022年2月23日周三 20:28写道:

>
> flink是否能够做到程序第一次启动还没有checkpoint的情况下,对状态进行初始化?
>
>
> huangzhi...@iwgame.com
>


Re: Re: flink sql支持细粒度的状态配置

2021-12-09 文章 Yun Tang
Hi,

如果你们可以自己实现一套SQL语句到jobgraph的预编译转换IDE,然后在IDE中可以手动配置jobgraph每个算子的配置,应该是可以达到你们的目的 
(可能还需要结合细粒度调度模式)。

祝好
唐云

From: gygz...@163.com 
Sent: Thursday, December 9, 2021 16:14
To: user-zh 
Subject: 回复: Re: flink sql支持细粒度的状态配置

Hi Yun Tang

感谢你的回复,我们在调研的过程中也发现,正如你所说的生成的plan可能差异很大

但是每个operator的TTL生效时间是在execNode转换成对应的Transformation时,通过传入的StreamPlanner带进去的,TableConfig属性中包含了全局的TTL时间

在每个ExecNode转换的过程translateToPlanInternal((PlannerBase) 
planner)中使用这个TTL时间生成对应的operator

所以我们在考虑是否可以在,每个Transformation生成阶段,先去修改一下TableConfig中TTL的配置再调用每个execNode转换成operator的方法,来做到Transformation级别的TTL控制,这个配置开放给平台的用户,通过Transformation的id做识别,是否能给一些建议




gygz...@163.com

发件人: Yun Tang
发送时间: 2021-12-09 10:57
收件人: user-zh
主题: Re: flink sql支持细粒度的状态配置
Hi 你好,

我认为这是一个很好的需求,对于data stream以及python API来说,state 
TTL都是通过API逐个配置的,你的需求就可以直接满足。但是对于SQL来说,由于相同的SQL语句,不同优化器其生成的执行plan可能会差异很大,很难对某个operator内的state进行TTL进行配置,可能一种方式是增加一些SQL的优化hint,对于你示例中的join语句和groupBy
 的count语句配以不同的TTL,但是目前Flink SQL尚未支持该功能。


祝好
唐云


From: gygz...@163.com 
Sent: Tuesday, December 7, 2021 18:38
To: user-zh 
Subject: flink sql支持细粒度的状态配置

Hi all

在我们生产中发现,如果在sql中配置状态的TTL会导致这个 ttl时间全局生效

如果我存在一个如下sql

select count(1),region from (select * from A join B on a.uid = b.uid)  group by 
region

如果我配置一个全局的TTL会导致count这个GroupAggFunction的状态被淘汰掉,比如说一天以后累计就被清零

如果不配置,又会导致Regular join的状态增大

这是其中一个场景,这里只是举一个例子

主要是想询问针对 Sql中需要配置局部State的ttl时间,或者同一个任务每个sql配置不同的TTL时间,这种场景应该如何去做 ?



gygz...@163.com


Re: flink sql支持细粒度的状态配置

2021-12-08 文章 Yun Tang
Hi 你好,

我认为这是一个很好的需求,对于data stream以及python API来说,state 
TTL都是通过API逐个配置的,你的需求就可以直接满足。但是对于SQL来说,由于相同的SQL语句,不同优化器其生成的执行plan可能会差异很大,很难对某个operator内的state进行TTL进行配置,可能一种方式是增加一些SQL的优化hint,对于你示例中的join语句和groupBy
 的count语句配以不同的TTL,但是目前Flink SQL尚未支持该功能。


祝好
唐云


From: gygz...@163.com 
Sent: Tuesday, December 7, 2021 18:38
To: user-zh 
Subject: flink sql支持细粒度的状态配置

Hi all

在我们生产中发现,如果在sql中配置状态的TTL会导致这个 ttl时间全局生效

如果我存在一个如下sql

select count(1),region from (select * from A join B on a.uid = b.uid)  group by 
region

如果我配置一个全局的TTL会导致count这个GroupAggFunction的状态被淘汰掉,比如说一天以后累计就被清零

如果不配置,又会导致Regular join的状态增大

这是其中一个场景,这里只是举一个例子

主要是想询问针对 Sql中需要配置局部State的ttl时间,或者同一个任务每个sql配置不同的TTL时间,这种场景应该如何去做 ?



gygz...@163.com


Re: 检查点和保存点

2021-11-12 文章 Yun Tang
Hi

checkpoint 以及 savepoint是否可以生效取决于相关source的实现,Kafka这种是支持replay非常好的source,至于file 
reader,目前 split file reader [1] 相关的实现是支持 容错的

[1] 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/sources/#the-split-reader-api


祝好
唐云

From: lei-tian 
Sent: Friday, November 12, 2021 16:13
To: user-zh 
Subject: 检查点和保存点

您好:

flink的如果读hbase或者读文件,可以设置检查点和保存点么,我设置的手动保存点停止显示成功,但是去hdfs上看的savepoint的文件大小为0.从保存点启动的话文件也会从头开始执行而不是接着上次的处理进度进行,只有kafka才能设置保存点和检查点么。



| |
lei-tian
|
|
totorobabyf...@163.com
|
签名由网易邮箱大师定制


Re: MongoDB sink

2021-11-10 文章 Yun Tang
Hi,

具体问题建议直接在相关ticket上进行讨论,邮件列表上可能相关人士没有注意到。


祝好
唐云

From: 不许人间见白头 
Sent: Wednesday, November 10, 2021 22:28
To: user-zh 
Subject: MongoDB sink

你好,



请问一下,关于New feature: FLINK-24477 预计什么时候创建PR呢?


Re: flink 以阿里云 oss 作为 checkpoint cpu 过高

2021-10-29 文章 Yun Tang
Hi

可以使用jstack,async profiler [1] 
等工具勘察一下checkpoint期间的CPU栈。oss需要先写本地再上传,确实可能CPU消耗多一些,但是明显高很多有一些超出预期。


[1] https://github.com/jvm-profiling-tools/async-profiler

祝好
唐云

From: Lei Wang 
Sent: Tuesday, October 19, 2021 14:01
To: user-zh@flink.apache.org 
Subject: Re: flink 以阿里云 oss 作为 checkpoint cpu 过高

确实是跟 OSS 有关,我换成 HDFS 作为 checkpoint 后端就没有这种现象了,但我也不明白为什么会这样。
程序中设置了增量 checkpoit,但 flink web UI 中显示的 checkpoint data size 一直不断变高,三天就到了 1G


On Mon, Oct 18, 2021 at 10:44 AM Michael Ran  wrote:

> 应该和OSS没关系吧,毕竟只是个存储。
> 我们CPU 你先看看消耗在哪个线程或者方法类呗
>
>
>
> 在 2021-10-08 16:34:47,"Lei Wang"  写道:
>
>
>
> flink 程序以 RocksDB 作为 stateBackend,  aliyun OSS 作为 checkpoint 数据最终的物理位置。
> 我们的监控发现节点 cpu 间隔性地变高,这个间隔时间恰好就是程序的 checkpoint 时间间隔。
>
>
>
>
>
>
> 这个可能的原因是什么?会跟 OSS 有关吗?
>
>
> 谢谢,
> 王磊


Re: 关于作业失败从checkpoint重启,触发了过期的窗口计算

2021-10-29 文章 Yun Tang
Hi,

先问个版本问题,你的Flink版本是1.3 而不是1.13?

Checkpoint里面会存储timer,所以重启之后会触发窗口的计算,但确实这种一天的窗口累计有点太多了,除非你的作业存在比较严重的反压,导致checkpoint内积攒了大量没有触发的timer。

祝好
唐云


From: claylin <1012539...@qq.com.INVALID>
Sent: Friday, October 29, 2021 11:33
To: user-zh 
Subject: 关于作业失败从checkpoint重启,触发了过期的窗口计算

作业失败后从checkpoint重启,重启后会触发之前已经过期的时间窗口计算,求问这个该怎么解决。我的运行环境是flink1.3/1.4+sql举例:作业每小时做一次checkpoint,状态设置了1小时过期,状态后端使用rocksdb,同时使用了一天的滚动时间窗口,然后今天15点30分重启,但是重启后会有昨天的窗口计算结果触发。
按理说不应该会触发已经过期的窗口计算,而且flink 1.3/1.4 
下state.backend.rocksdb.timer-service.factory 这个配置默认是rocksdb, 
也就是说rocksdb里面存储了状态的有效时间,
不管怎么样也不应该触发已经过期的窗口计算,
请问大家有没有遇到过这种问题,怎么解决。


Re: Re:re:Re: 回复:Flink SQL官方何时能支持redis和mongodb连接器?

2021-09-23 文章 Yun Tang
Hi

用github账号登陆之后,可以使用添加package的方式[1]自行上传共享。

[1] https://flink-packages.org/new-package

祝好
唐云

From: casel.chen 
Sent: Thursday, September 23, 2021 12:40
To: user-zh@flink.apache.org 
Cc: myasuka 
Subject: Re:re:Re: 回复:Flink SQL官方何时能支持redis和mongodb连接器?

>我司基于最新提供流批一体的接口,实现了mongodb的连接器,支持source和sink,实现了控制批量插入频率、控制缓存批的数据量和mongo文档对象和java对象转换,同时还可选择批量更新,并且使用的mongo最新异步驱动,后期我还会不断优化性能,看大佬能否推动一下,把这个连接器贡献给社区

问一下自己开发的连接器要怎么添加到 https://flink-packages.org/ 网站给大家搜索到?这位朋友能够将你们的连接器贡献上去呢?






在 2021-09-23 09:32:39,"2572805166" <2572805...@qq.com.INVALID> 写道:
>我司基于最新提供流批一体的接口,实现了mongodb的连接器,支持source和sink,实现了控制批量插入频率、控制缓存批的数据量和mongo文档对象和java对象转换,同时还可选择批量更新,并且使用的mongo最新异步驱动,后期我还会不断优化性能,看大佬能否推动一下,把这个连接器贡献给社区
>
>
>-- 原始邮件 ------
>发件人: "Yun Tang";
>发件时间: 2021-09-22 10:55
>收件人: "user-zh@flink.apache.org";
>主题: Re: 回复:Flink SQL官方何时能支持redis和mongodb连接器?
>
>
>
>Hi,
>
>其实目前Flink社区并不是那么欢迎新增官方支持的connector,主要原因就是社区的开发人员有限,没有精力维护太多的connector,尤其是一些connector的实现需要一定的相关背景,但很难保证review代码的开发人员具有相关背景,毕竟大家都需要为自己approve的代码负责。
>你可以在 flink-packages [1] 里面找一下,或者考虑自己实现并维护(基础实现应该是复杂度不高的)。
>
>
>[1] https://flink-packages.org/
>
>
>祝好
>唐云
>
>
>From: 黑色
>Sent: Saturday, September 18, 2021 17:17
>To: user-zh@flink.apache.org
>Subject: 回复:Flink SQL官方何时能支持redis和mongodb连接器?
>
>这个可以自已定义一个,参考源码写一个,自己写出来的才是自己的,直接用别人的还是别人的
>
>
>
>
>--原始邮件--
>发件人:   
> "user-zh" 
>   
>发送时间:2021年9月17日(星期五) 下午4:39
>收件人:"user-zh@flink.apache.org"
>主题:Flink SQL官方何时能支持redis和mongodb连接器?
>
>
>
>redis和mongodb经常在工作中用到,但Flink官方一直没有提供这两个标准连接器,想问一下什么时候能正式release方便大家使用呢?
>ps: behair库已经很久没更新了,对应的flink版本太低。







Re: 回复:Flink SQL官方何时能支持redis和mongodb连接器?

2021-09-21 文章 Yun Tang
Hi,

其实目前Flink社区并不是那么欢迎新增官方支持的connector,主要原因就是社区的开发人员有限,没有精力维护太多的connector,尤其是一些connector的实现需要一定的相关背景,但很难保证review代码的开发人员具有相关背景,毕竟大家都需要为自己approve的代码负责。
你可以在 flink-packages [1] 里面找一下,或者考虑自己实现并维护(基础实现应该是复杂度不高的)。


[1] https://flink-packages.org/


祝好
唐云


From: 黑色 
Sent: Saturday, September 18, 2021 17:17
To: user-zh@flink.apache.org 
Subject: 回复:Flink SQL官方何时能支持redis和mongodb连接器?

这个可以自已定义一个,参考源码写一个,自己写出来的才是自己的,直接用别人的还是别人的




--原始邮件--
发件人:
"user-zh"   
 


Re: flink oss ha

2021-08-30 文章 Yun Tang
Hi

这个看上去更像是oss配置的问题,你能使用目前配置的 oss.endpoint,accessKeyId以及accessKeySecret 
结合ossutil等工具访问对应的 oss://bucket-logcenter/flink-state/flink-session-recovery 么?

祝好
唐云

From: dker eandei 
Sent: Monday, August 30, 2021 12:36
To: user-zh@flink.apache.org 
Subject: 回复: flink oss ha

您好:
 附件是使用oss作高可用时的报错,以下是启动flink时的脚本:

../bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=flink-session-1 \
-Dkubernetes.container.image=test/flink:1.13.2-scala_2.12-oss \
-Dkubernetes.container.image.pull-policy=Always \
-Dkubernetes.namespace=flink-session \
-Dkubernetes.service-account=flink-session-sa \
-Dkubernetes.rest-service.exposed.type=ClusterIP \
-Dtaskmanager.numberOfTaskSlots=6 \
-Djobmanager.memory.process.size=1024m \
-Dtaskmanager.memory.process.size=2048m \
-Dkubernetes.jobmanager.cpu=1 \
-Dkubernetes.taskmanager.cpu=2 \
-Dfs.oss.endpoint="http://oss-.local; \
-Dfs.oss.accessKeyId="j0BAJ" \
-Dfs.oss.accessKeySecret="7mzTPiC4w" \

-Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
 \

-Dhigh-availability.storageDir=oss://bucket-logcenter/flink-state/flink-session-recovery
 \

-Dcontainerized.master.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.13.2.jar
 \

-Dcontainerized.taskmanager.env.ENABLE_BUILT_IN_PLUGINS=flink-oss-fs-hadoop-1.13.2.jar

-----邮件原件-
发件人: Yun Tang 
发送时间: 2021年8月30日 11:36
收件人: user-zh@flink.apache.org
主题: Re: flink oss ha

Hi,
你好,图片无法加载,可以直接粘贴文字出来

祝好
唐云

From: dker eandei 
Sent: Friday, August 27, 2021 14:58
To: user-zh@flink.apache.org 
Subject: flink oss ha


您好:

看文档OSS可以用作 FsStatebackend,那么Flink on k8s 
做高可用时,high-availability.storageDir可以配置成oss吗,我试了下,报以下错误:

[cid:image002.png@01D79B53.F4C71E80]



从 Windows 
版邮件<https://apac01.safelinks.protection.outlook.com/?url=https%3A%2F%2Fgo.microsoft.com%2Ffwlink%2F%3FLinkId%3D550986data=04%7C01%7C%7Cd552b12a5a1f4a92aaee08d96b674cd0%7C84df9e7fe9f640afb435%7C1%7C0%7C637658913686219405%7CUnknown%7CTWFpbGZsb3d8eyJWIjoiMC4wLjAwMDAiLCJQIjoiV2luMzIiLCJBTiI6Ik1haWwiLCJXVCI6Mn0%3D%7C1000sdata=9Z9mxYxfkKqWfTCHYPThD3I97KFAFRMKINYExBuge80%3Dreserved=0>发送




Re: flink oss ha

2021-08-29 文章 Yun Tang
Hi,
你好,图片无法加载,可以直接粘贴文字出来

祝好
唐云

From: dker eandei 
Sent: Friday, August 27, 2021 14:58
To: user-zh@flink.apache.org 
Subject: flink oss ha


您好:

看文档OSS可以用作 FsStatebackend,那么Flink on k8s 
做高可用时,high-availability.storageDir可以配置成oss吗,我试了下,报以下错误:

[cid:image002.png@01D79B53.F4C71E80]



从 Windows 版邮件发送




Re: table.exec.state.ttl

2021-08-29 文章 Yun Tang
Hi 航飞

可以参照[1] 看是不是类似的问题


[1] https://issues.apache.org/jira/browse/FLINK-23721

祝好
唐云

From: 李航飞 
Sent: Thursday, August 26, 2021 19:02
To: user-zh 
Subject: table.exec.state.ttl

Configuration conf = new Configuration();
conf.setString("table.exec.mini-batch.enabled","true");
conf.setString("table.exec.mini-batch.allow-latency","15s");
conf.setString("table.exec.mini-batch.size","50");
conf.setString("table.exec.state.ttl","15 s");StreamExecutionEnvironment 
execEnv = StreamExecutionEnvironment.getExecutionEnvironment(); 
execEnv.configure(conf,this.getClass().getClassLoader()); EnvironmentSetting 
setting = ... StreamTableEnvironment.create(execEnv,setting); 基于flink1.13.2
微批配置信息如上,kafka流数据,我感觉我这个设置根本没有产生效果
上次那个 allow 也就算了,这次这个 table.exec.state.ttl 
设置了15秒,我等了1分钟,再次输入数据,计算结果还是累加了,没有重0开始计算。
程序是通过StatementSet .execute()执行的


[ANNOUNCE] Apache Flink 1.13.2 released

2021-08-05 文章 Yun Tang

The Apache Flink community is very happy to announce the release of Apache 
Flink 1.13.2, which is the second bugfix release for the Apache Flink 1.13 
series.

Apache Flink® is an open-source stream processing framework for distributed, 
high-performing, always-available, and accurate data streaming applications.

The release is available for download at:
https://flink.apache.org/downloads.html

Please check out the release blog post for an overview of the improvements for 
this bugfix release:
https://flink.apache.org/news/2021/08/06/release-1.13.2.html

The full release notes are available in Jira:
https://issues.apache.org/jira/secure/ReleaseNote.jspa?version=12350218==12315522

We would like to thank all contributors of the Apache Flink community who made 
this release possible!

Regards,
Yun Tang


Re: 1.14啥时候出呀

2021-08-05 文章 Yun Tang
Hi

Flink-1.13.2 的jar包正在同步到给个maven仓库,顺利的话,明天就可以正式announce了。


祝好
唐云

From: Jingsong Li 
Sent: Wednesday, August 4, 2021 16:56
To: user-zh 
Subject: Re: 1.14啥时候出呀

1.14还有1-2个月
1.13.2马上就出了,估计明天或后天或周一

On Wed, Aug 4, 2021 at 4:48 PM yidan zhao  wrote:

> 如题,1.14或1.13.2啥时候出呀,有人知道吗。
>


--
Best, Jingsong Lee


Re: flink大窗口性能问题

2021-07-16 文章 Yun Tang
目前Flink社区版RocksDB尚不支持ARM架构机器。使用RocksDB的话,内存均是堆外管理,与JVM的堆上内存无关。

另外,有个题外话,你们是云上产品还是自建了ARM集群?有点好奇目前国内的ARM集群使用率情况。

祝好
唐云

From: Wanghui (HiCampus) 
Sent: Thursday, July 15, 2021 11:33
To: user-zh@flink.apache.org 
Subject: Re: flink大窗口性能问题

我在aarch64 + jre 8的环境下,使用rocksdb state backend时,碰到如下错误:

另外,使用rocksdb可以解决大窗口的oom问题吗,原理是什么?



Caused by: java.lang.Exception: Exception while creating 
StreamOperatorStateContext.

 at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:222)

 at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:248)

 at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:290)

 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)

 at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)

 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)

 at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)

 at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)

 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)

 at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for KeyedProcessOperator_ae33e81d863e4093619373d1e1f77012_(1/1) from 
any of the 1 provided restore options.

 at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)

 at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:335)

 at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:148)

 ... 9 more

Caused by: java.io.IOException: Could not load the native RocksDB library

 at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:948)

 at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:489)

 at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:319)

 at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)

 at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)

 ... 11 more

Caused by: java.lang.UnsatisfiedLinkError: 
/tmp/rocksdb-lib-bd8659305e92a27fac27481baf57897b/librocksdbjni-linux64.so: 
/tmp/rocksdb-lib-bd8659305e92a27fac27481baf57897b/librocksdbjni-linux64.so: 
cannot open shared object file: No such file or directory (Possible cause: 
can't load AMD 64-bit .so on a AARCH64-bit platform)

 at java.lang.ClassLoader$NativeLibrary.load(Native Method)

 at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1934)

 at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1817)

 at java.lang.Runtime.load0(Runtime.java:810)

 at java.lang.System.load(System.java:1088)

 at 
org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)

 at 
org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)

 at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.ensureRocksDBIsLoaded(RocksDBStateBackend.java:923)

 ... 15 more



On 2021/07/15 02:47:23, Jingsong Li mailto:j...@gmail.com>> 
wrote:

> 没用rocksdb吗?>

>

>

>

> On Thu, Jul 15, 2021 at 10:46 AM Michael Ran 
> mailto:gr...@163.com>> wrote:>

>

>

>

> > 要么内存增大,或者并行增大,要么窗口改小,同时保留数据时间减少>

>

> > 在 2021-07-15 10:23:25,"Hui Wang" 
> > <46...@qq.com.INVALID> 写道:>

>

> > >flink大窗口缓存数据量过大导致jvm频烦full gc,并且处理速度极低,最终OOM,该如何调优>

>

> >>

>

>

>

>

>

> -- >

>

> Best, Jingsong Lee>

>

>


Re: flink 触发保存点失败

2021-07-13 文章 Yun Tang
Hi,

这个看上去是client触发savepoint失败,而不是savepoint本身end-to-end执行超时。建议对照一下JobManager的日志,观察在触发的时刻,JM日志里是否有触发savepoint的相关日志,也可以在flink
 web UI上观察相应的savepoint是否出现在checkpoint tab的历史里面。

祝好
唐云

From: 仙剑……情动人间 <1510603...@qq.com.INVALID>
Sent: Tuesday, July 13, 2021 17:31
To: flink邮件列表 
Subject: flink 触发保存点失败

Hi All,


  我触发Flink 
保存点总是失败,报错如下,一直说是超时,但是没有进一步的信息可以查看,我查资料说可以设置checkpoint超时时间,我设置了2min,但是触发
保存点时在2min之前就会报错,另外我的 状态 并不大
 



The program finished with the following exception:


org.apache.flink.util.FlinkException: Triggering a savepoint for the job 
 failed.
at 
org.apache.flink.client.cli.CliFrontend.triggerSavepoint(CliFrontend.java:777)
at 
org.apache.flink.client.cli.CliFrontend.lambda$savepoint$9(CliFrontend.java:754)
at 
org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:1002)
at 
org.apache.flink.client.cli.CliFrontend.savepoint(CliFrontend.java:751)
at 
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1072)
at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at 
org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: java.util.concurrent.TimeoutException
at 
org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:1255)
at 
org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:217)
at 
org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$15(FutureUtils.java:582)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)


Re: local运行模式下不会生成checkpoint吗?

2021-07-09 文章 Yun Tang
Hi

只要enable了checkpoint,一定会生成checkpoint的,这与你的运行模式无关。可以检查一下日志,看看JM端是否正常触发了checkpoint

祝好
唐云

From: casel.chen 
Sent: Tuesday, June 29, 2021 9:55
To: user-zh@flink.apache.org 
Subject: local运行模式下不会生成checkpoint吗?

我在本地使用local运行模式运行flink sql,将数据从kafka写到mongodb,mongodb 
connector是自己开发的,实现了CheckpointedFunction接口,debug的时候发现数据进来的时候有调用invoke方法,但没有调用initialState和snapshotState方法,我有设置enableCheckpoint,同样的程序使用kubernetes部署发现是会调用snapshotState方法。我的问题是:local运行模式下不会生成checkpoint吗?


Re: Flink 1.10 内存问题

2021-07-06 文章 Yun Tang
Hi,

有可能的,如果下游发生了死锁,无法消费任何数据的话,整个作业就假死了。要定位root cause还是需要查一下下游的task究竟怎么了

祝好
唐云

From: Ada Luna 
Sent: Tuesday, July 6, 2021 12:04
To: user-zh@flink.apache.org 
Subject: Re: Flink 1.10 内存问题

反压会导致整个Flink任务假死吗?一条Kafka数据都不消费了。持续几天,不重启不恢复的

Yun Tang  于2021年7月6日周二 上午11:12写道:
>
> Hi,
>
> LocalBufferPool.requestMemorySegment 
> 这个方法并不是在申请内存,而是因为作业存在反压,因为下游没有及时消费,相关buffer被占用,所以上游会卡在requestMemorySegment上面。
>
> 想要解决还是查一下为什么下游会反压。
>
>
> 祝好
> 唐云
> 
> From: Ada Luna 
> Sent: Tuesday, July 6, 2021 10:43
> To: user-zh@flink.apache.org 
> Subject: Re: Flink 1.10 内存问题
>
> "Source: test_records (2/3)" #78 prio=5 os_prio=0
> tid=0x7fd4c4a24800 nid=0x21bf in Object.wait()
> [0x7fd4d581a000]
> java.lang.Thread.State: TIMED_WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:251)
> - locked <0x00074d8b0df0> (a java.util.ArrayDeque)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218)
> at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:264)
> at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:192)
> at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:162)
> at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:128)
> at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
> at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
> at 
> org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
> at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
> - locked <0x00074cbd3be0> (a java.lang.Object)
> at 
> org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
> at 
> org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
> - locked <0x00074cbd3be0> (a java.lang.Object)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91)
> at 
> org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:156)
> at 
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:711)
> at 
> com.dtstack.flink.sql.source.kafka.KafkaConsumer011.run(KafkaConsumer011.java:66)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
> at 
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
> at 
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
>
> Ada Luna  于2021年7月6日周二 上午10:13写道:
> >
> > 下面报错调大TaskManager内存即可解决,但是我不知道为什么Flink内存不够大会出现如下假死情况。申请内存卡住。整个任务状态为RUNNING但是不再消费数据。
> >
> >
> >
> > "Map -> to: Tuple2 -> Map -> (from: (id, sid, item, val, unit, dt,
> > after_index, tablename, PROCTIME) -> where: (AND(=(tablename,
> > CONCAT(_UTF-16LE't_real', currtime2(dt, _UTF-16LE'MMdd'))),
> > OR(=(after_index, _UTF-16LE'2.6'), AND(=(sid, _UTF-16LE'7296'),
> > =(after_index, _UTF-16LE'2.10'))), LIKE(item, _UTF-16LE'??5%'),
> > =(MOD(getminutes(dt), 5), 0))), select: (id AS ID, sid AS STID, item
> > AS ITEM, val AS VAL, unit AS UNIT, dt AS DATATIME), from: (id, sid,
> > item, val, unit, dt, after_index, tablename, PROCTIME) -> where:
> > (AND(=(tablename, CONCAT(_UTF-16LE't_real', currtime2(dt,
> > _UTF-16LE'MMdd'))), OR(=(after_index, _UTF-16LE'2.6'), AND(=(sid,
> > _UTF-16LE'7296'), =(after_index, _UTF-16LE'2.10'))), LIKE(item,
> > _UTF-16LE'??5%'), =(MOD(getminutes(dt), 5), 0))), select: (sid AS
> > STID, val AS VAL, dt AS DATATIME)) (1/1)" #7

Re: Flink 1.10 内存问题

2021-07-05 文章 Yun Tang
Hi,

LocalBufferPool.requestMemorySegment 
这个方法并不是在申请内存,而是因为作业存在反压,因为下游没有及时消费,相关buffer被占用,所以上游会卡在requestMemorySegment上面。

想要解决还是查一下为什么下游会反压。


祝好
唐云

From: Ada Luna 
Sent: Tuesday, July 6, 2021 10:43
To: user-zh@flink.apache.org 
Subject: Re: Flink 1.10 内存问题

"Source: test_records (2/3)" #78 prio=5 os_prio=0
tid=0x7fd4c4a24800 nid=0x21bf in Object.wait()
[0x7fd4d581a000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait(Native Method)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:251)
- locked <0x00074d8b0df0> (a java.util.ArrayDeque)
at 
org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:264)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:192)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:162)
at 
org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:128)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:107)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:89)
at 
org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
- locked <0x00074cbd3be0> (a java.lang.Object)
at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at 
org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
- locked <0x00074cbd3be0> (a java.lang.Object)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:156)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:711)
at 
com.dtstack.flink.sql.source.kafka.KafkaConsumer011.run(KafkaConsumer011.java:66)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)

Ada Luna  于2021年7月6日周二 上午10:13写道:
>
> 下面报错调大TaskManager内存即可解决,但是我不知道为什么Flink内存不够大会出现如下假死情况。申请内存卡住。整个任务状态为RUNNING但是不再消费数据。
>
>
>
> "Map -> to: Tuple2 -> Map -> (from: (id, sid, item, val, unit, dt,
> after_index, tablename, PROCTIME) -> where: (AND(=(tablename,
> CONCAT(_UTF-16LE't_real', currtime2(dt, _UTF-16LE'MMdd'))),
> OR(=(after_index, _UTF-16LE'2.6'), AND(=(sid, _UTF-16LE'7296'),
> =(after_index, _UTF-16LE'2.10'))), LIKE(item, _UTF-16LE'??5%'),
> =(MOD(getminutes(dt), 5), 0))), select: (id AS ID, sid AS STID, item
> AS ITEM, val AS VAL, unit AS UNIT, dt AS DATATIME), from: (id, sid,
> item, val, unit, dt, after_index, tablename, PROCTIME) -> where:
> (AND(=(tablename, CONCAT(_UTF-16LE't_real', currtime2(dt,
> _UTF-16LE'MMdd'))), OR(=(after_index, _UTF-16LE'2.6'), AND(=(sid,
> _UTF-16LE'7296'), =(after_index, _UTF-16LE'2.10'))), LIKE(item,
> _UTF-16LE'??5%'), =(MOD(getminutes(dt), 5), 0))), select: (sid AS
> STID, val AS VAL, dt AS DATATIME)) (1/1)" #79 prio=5 os_prio=0
> tid=0x7fd4c4a94000 nid=0x21c0 in Object.wait()
> [0x7fd4d5719000]
> java.lang.Thread.State: TIMED_WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:251)
> - locked <0x00074e6c8b98> (a java.util.ArrayDeque)
> at 
> org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:218)
> at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:264)
> at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.copyFromSerializerToTargetChannel(RecordWriter.java:192)
> at 
> org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:162)
> at 
> 

Re: 中文教程更新不及时问题

2021-06-22 文章 Yun Tang
Hi Kevin,

欢迎来到Apache Flink开源社区!

因为开源社区的工作,一些参与者很多时候都是工作时间之外参与的,可能难免遇到进度更新不及时,或者长时间不再活跃的问题。

非常欢迎您在相关JIRA 
ticket下面评论和申请权限创建PR,社区一直都欢迎每一位贡献者,对于文档的维护尤其是中文文档的翻译也是非常需要的,如果有任何想要贡献的部分,欢迎直接去JIRA
 ticket下面、github PR下面评论,或者直接创建相关ticket。

祝好
唐云

From: pang fan 
Sent: Monday, June 21, 2021 21:35
To: user-zh@flink.apache.org 
Subject: 中文教程更新不及时问题

大家好,

我是Flink的初学者,在跟着
https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/try-flink/table_api/
官方教程(中文版)学习时发现很多中文教程都没有翻译掉,但是去看PR记录又发现很多都已经提了PR但是一直没有合并到主分支,里面很多PR都是几个月前的提的,后来好久都没有更新。

请问现在还有人在跟这些问题吗?如果有,可以更新下JIRA上的工单状态和代码PR状态,这样有需要我们也能申领工单给社区做一些贡献。


谢谢!
Kevin Fan


Re: rocksdb对比filestatebackend

2021-06-22 文章 Yun Tang
Hi Yidan,


  1.  是否我从FileStateBackend切换到RocksDB其实性能也不会降低很多呢?
  2.  这个涉及到RocksDB这种LSM架构的DB读写路径了,即使逻辑数据量可以全部存储在内存中,由于RocksDB的write 
buffer默认会存储相同key的不同value,而且checkpoint时候仍然会触发flush,很难避免数据落盘,数据落盘之后的读路径肯定没有Flink的内存state
 backend性能好,二者性能还是有些差异的,不过实际生产中可能不需要 FsStateBackend 那么高的性能。

  1.  RocksDB本地的checkpoint其实是全量的,只是上传远程存储的时候是增量的,ckpt-3有可能会依赖ckpt-2中的部分文件。

祝好
唐云

From: yidan zhao 
Sent: Tuesday, June 22, 2021 11:55
To: user-zh 
Subject: rocksdb对比filestatebackend

如题,我生产中目前一直都是使用的FileStateBackend,然后使用一个对象存储服务作为后端。

按照我的理解,这种方式下,状态的操作性能很高,都是在内存内部,只有检查点时候才会输出到对象存储中。但是,不支持增量检查点。

RocksDB支持增量检查点,但是缺点是每个状态的操作都是需要序列化/反序列化,至于是文件还是内存操作可能还和rocksdb的块大小,多久刷新等有关。
不过我现在在想,既然我的任务的状态当前使用内存存储,也就是内存存储是能够容纳我的全状态的。
那么是否我从FileStateBackend切换到RocksDB其实性能也不会降低很多呢? 就是牺牲很少性能,换来增量检查点。

此外,还有个点。RocksDB的增量检查点在每次检查点时候,输出到对象存储的部分也是增量?还是全量。
只是rocksdb自身使用增量状态,然后检查点存储的时候是全量到对象存储吗?
还是说,对象存储中存储的ckpt-1,ckpt-2,ckpt-3等也是增量的,即ckpt-3可能依赖ckpt-2等这样。


Re: Flink state evolution with avro

2021-06-17 文章 Yun Tang
Hi,

你可以参照社区的 state-evolution的 E2E 测试代码 [1], 整个程序就是使用的avro作为相关类的声明工具。


[1] 
https://github.com/apache/flink/tree/master/flink-end-to-end-tests/flink-state-evolution-test/src/main

祝好
唐云

From: casel.chen 
Sent: Friday, June 11, 2021 8:13
To: user-zh@flink.apache.org 
Subject: Flink state evolution with avro

Is there any live code example about flink state evolution with avro? Thanks!


Re: 关于flink sql的kafka source的开始消费offset相关问题。

2021-06-06 文章 Yun Tang
hi,

本质上来说,你的做法有点hack其实不推荐,如果非要这么做的话,你还可以通过 numRestarts [1] 的指标来看重启了多少次。


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/metrics/#availability

祝好
唐云

From: yidan zhao 
Sent: Friday, June 4, 2021 11:52
To: user-zh 
Subject: Re: 关于flink sql的kafka source的开始消费offset相关问题。

本质需求是我一个转发任务,本身检查点失败以及任务失败一般都是压力高,所以我希望重启是忽略堆积的数据从最新数据开始消费。我希望任务失败了就自动重启从最新开始继续转发。

yidan zhao  于2021年6月4日周五 上午11:51写道:
>
> 那就挺难受的,我之前还想过一个办法是,禁止sql 任务的检查点功能就可以了。但是呢,flink禁止检查点后web
> UI就无法显示重启次数了,任务的重启的监控现在都只能通过开启检查点才能反映出来(ckpt失败数量、ckpt restored数量)。
>
> JasonLee <17610775...@163.com> 于2021年6月4日周五 上午11:49写道:
> >
> > hi
> >
> > sql 也是会从上一次成功的 checkpoint 中保存的 offset 位置开始恢复数据的.
> >
> >
> >
> > -
> > Best Wishes
> > JasonLee
> > --
> > Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-06-02 文章 Yun Tang
Hi,

没有被引用的文件可能也不是完全无用的,可能是当前pending checkpoint正在上传的,所以还需要比较一下那些不在checkpoint 
meta内的文件的修改时间戳,可能比你分析的complete checkpoint的时间戳要大。

总体上来说,我不认为这个问题是一个bug,这个是LSM架构的DB的空间放大问题。如果你对空间放大非常担心的话,可以启用 dynamic level [1] 
来严格控制空间放大,不过这个可能会影响写放大和读放大,导致性能受到一定影响。


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/deployment/config/#state-backend-rocksdb-compaction-level-use-dynamic-size

祝好
唐云


From: yujianbo <15205029...@163.com>
Sent: Wednesday, June 2, 2021 15:29
To: user-zh@flink.apache.org 
Subject: Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

Hi,

确认的情况:

大佬,我根据loadCheckpointMeta能够去分析_metadata引用到的/shared目录下的sst文件,然后发现/shared里没有被引用的sst文件即未删除的旧文件,这些旧文件占比很少,只有5%左右。

配置:
   idleStateRetention确实是设置3600秒,保留的ck目录是3个。
目前情况:
 每次checkpoint size增量差不多1-2G,所以size不小。5分钟checkpoint一次。
 最近单次checkpoint的sst文件数是23个,文件大小大约在65M。
 总共目前/checkpoint/shared目录大小是49.4G,然后savepoint结束大小在6.3G。

那我想问一下,那中间是不是有大量的冗余数据存在这还在被应用的sst文件里,有没有什么参数可以调整?还是JM单点删除跟不上相关速度?
JM还能扩展吗??HA不是也是一个JM在工作吗(这个地方不是很明白)





Hi,

先确定一下,你的 idleStateRetention 是 3600秒?其次,要想看是否所有数据均有用,可以利用
Checkpoints.loadCheckpointMeta [1] 去加载你所保留的checkpoint目录下的 _metadata
文件,然后与当前checkpoint目录下的文件作对比,看是否存在大量的未删除旧文件。

目前仅凭你的描述和一段SQL代码其实很难判断。
可能存在的原因有:

  1.  单次checkpoint文件数目过多,JM单点删除跟不上相关速度
  2.  整体checkpoint
size不大,RocksDB的compaction频率较低,导致相关包含过期数据的旧文件并没有被及时删除,仍然存在checkpoint目录下(保留多个checkpoint同样会放大该问题)

所以还是最好先分析已有checkpoint的meta文件,再与目前的实际情况做对比。

另外,Flink
JIRA其实不是一个很好的直接问问题的地方,尤其是这种并不明确的问题,更妥善的方式还是先在邮件列表讨论,具体定位到原因后,再创建相关详细的ticket。

[1]
https://github.com/apache/flink/blob/b582991b8b2b8dadb89e71d5002c4a9cc2055e34/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L99

祝好
唐云

From: yujianbo <[hidden email]>
Sent: Tuesday, June 1, 2021 10:51
To: [hidden email] <[hidden email]>
Subject: Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

没有更好的方式吗



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-06-01 文章 Yun Tang
Hi,

增量checkpoint上传的是sst文件本身,里面可能有一部分空间是被无用数据占据的,你可以理解成增量checkpoint上传的是受到空间放大影响的RocksDB的数据,如果因为单机的数据量较小,没有及时触发compaction的话,确实存在整个远程checkpoint目录数据大于当前实际空间的情况。而关闭增量checkpoint,上传的其实是与savepoint格式一样的kv数据对,Flink会遍历整个DB,将目前有效的数据写出到远程。所以你关闭增量checkpoint,而发现checkpoint目录保持恒定大小的话,说明真实有效数据的空间是稳定的。

另外,其实不建议在日常生产中关闭增量checkpoint,主要原因是对于大规模作业来说,全量checkpoint一方面会对底层DFS来说每次需要上传的数据量变大,另一方面,也会增长单次checkpoint的
 e2e duration,有checkpoint超时失败的风险。

祝好
唐云

From: HunterXHunter <1356469...@qq.com>
Sent: Tuesday, June 1, 2021 11:44
To: user-zh@flink.apache.org 
Subject: Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

我遇到过的问题就是开了 增量checkpoint后,checkpoint会越来越大,关闭之后checkpoint就正常了



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

2021-05-31 文章 Yun Tang
Hi,

先确定一下,你的 idleStateRetention 是 3600秒?其次,要想看是否所有数据均有用,可以利用 
Checkpoints.loadCheckpointMeta [1] 去加载你所保留的checkpoint目录下的 _metadata 
文件,然后与当前checkpoint目录下的文件作对比,看是否存在大量的未删除旧文件。

目前仅凭你的描述和一段SQL代码其实很难判断。
可能存在的原因有:

  1.  单次checkpoint文件数目过多,JM单点删除跟不上相关速度
  2.  整体checkpoint 
size不大,RocksDB的compaction频率较低,导致相关包含过期数据的旧文件并没有被及时删除,仍然存在checkpoint目录下(保留多个checkpoint同样会放大该问题)

所以还是最好先分析已有checkpoint的meta文件,再与目前的实际情况做对比。

另外,Flink 
JIRA其实不是一个很好的直接问问题的地方,尤其是这种并不明确的问题,更妥善的方式还是先在邮件列表讨论,具体定位到原因后,再创建相关详细的ticket。

[1] 
https://github.com/apache/flink/blob/b582991b8b2b8dadb89e71d5002c4a9cc2055e34/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/Checkpoints.java#L99

祝好
唐云

From: yujianbo <15205029...@163.com>
Sent: Tuesday, June 1, 2021 10:51
To: user-zh@flink.apache.org 
Subject: Re: Flink Sql 的/checkpoint/shared/文件夹大小不断增长,源数据没有数据激增,应该如何控制?

没有更好的方式吗



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink upgraded to version 1.12.0 and started from SavePoint to report an error

2021-05-19 文章 Yun Tang
Hi

BaseRowSerializer 已经在Flink-1.11 时候改名成 RowDataSerializer了,即使用 
state-processor-API 也没办法处理当前不存在的类,可能有种比较复杂的办法是自己把 BaseRowSerializer 
的类不改变package的情况下拷贝出来,然后用 state-processor-API 将相关类强制转换成 
RowDataSerializer,不过考虑到你的job graph都是SQL生成的,state-processor-API面向地更多的是data 
stream API,这一块估计还挺难弄的,确实没有想到特别好的办法。

祝好
唐云

From: 王炳焱 <15307491...@163.com>
Sent: Tuesday, May 18, 2021 20:02
To: user-zh@flink.apache.org 
Subject: Flink upgraded to version 1.12.0 and started from SavePoint to report 
an error

我从flink1.10升级到flink1.12时,flink1.10的SQLapi的任务无法从savePoint中进行恢复,报错如下:


2021-05-14 22:02:44,716 WARN  org.apache.flink.metrics.MetricGroup  
   [] - The operator name Calc(select=[((CAST((log_info 
get_json_object2 _UTF-16LE'eventTime')) / 1000) FROM_UNIXTIME 
_UTF-16LE'-MM-dd') AS STAT_DATE, CAST((log_info get_json_object2 
_UTF-16LE'eventTime')) AS EVETN_TIME, CAST((log_info get_json_object2 
_UTF-16LE'data.appId')) AS APP_ID, CAST((log_info get_json_object2 
_UTF-16LE'data.storeId')) AS STORE_ID, CAST((log_info get_json_object2 
_UTF-16LE'data.userId')) AS USER_ID, CAST((log_info get_json_object2 
_UTF-16LE'data.employeeId')) AS EMPLOYEE_ID], where=[(((log_info 
get_json_object2 _UTF-16LE'status') SEARCH 
Sarg[_UTF-16LE'pay':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND ((log_info 
get_json_object2 _UTF-16LE'data.itemType') SEARCH 
Sarg[(-∞.._UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
(_UTF-16LE'4':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE".._UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET "UTF-16LE"), 
(_UTF-16LE'5':VARCHAR(2147483647) CHARACTER SET 
"UTF-16LE"..+∞)]:VARCHAR(2147483647) CHARACTER SET "UTF-16LE") AND (log_info 
get_json_object2 _UTF-16LE'eventTime') IS NOT NULL)]) exceeded the 80 
characters length limit and was truncated.
2021-05-14 22:02:44,752 WARN  org.apache.flink.metrics.MetricGroup  
   [] - The operator name 
SourceConversion(table=[default_catalog.default_database.wkb_crm_order], 
fields=[log_info, proctime]) exceeded the 80 characters length limit and was 
truncated.
2021-05-14 22:02:44,879 ERROR 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - 
Caught unexpected exception.
java.io.IOException: Could not find class 
'org.apache.flink.table.runtime.typeutils.BaseRowSerializer$BaseRowSerializerSnapshot'
 in classpath.
at 
org.apache.flink.util.InstantiationUtil.resolveClassByName(InstantiationUtil.java:722)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readAndInstantiateSnapshotClass(TypeSerializerSnapshotSerializationUtil.java:84)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshot.readVersionedSnapshot(TypeSerializerSnapshot.java:163)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.deserializeV2(TypeSerializerSnapshotSerializationUtil.java:179)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil$TypeSerializerSnapshotSerializationProxy.read(TypeSerializerSnapshotSerializationUtil.java:150)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.api.common.typeutils.TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(TypeSerializerSnapshotSerializationUtil.java:76)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.runtime.state.KeyedBackendSerializationProxy.read(KeyedBackendSerializationProxy.java:145)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation.readMetaData(AbstractRocksDBRestoreOperation.java:191)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateMetaData(RocksDBFullRestoreOperation.java:181)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKeyGroupsInStateHandle(RocksDBFullRestoreOperation.java:168)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:152)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:269)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:565)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:94)
 ~[flink-dist_2.12-1.12.0.jar:1.12.0]
at 

Re: 流跑着跑着,报出了rocksdb层面的错误 Error while retrieving data from RocksDB

2021-05-06 文章 Yun Tang
Hi,

你可以参阅文档 [1] :
由于 RocksDB 的 JNI API 构建在 byte[] 数据结构之上, 所以每个 key 和 value 最大支持 2^31 字节。 重要信息: 
RocksDB 合并操作的状态(例如:ListState)累积数据量大小可以超过 2^31 字节,但是会在下一次获取数据时失败。这是当前 RocksDB 
JNI 的限制。

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/ops/state/state_backends.html#rocksdbstatebackend

祝好
唐云

From: a593700624 <593700...@qq.com>
Sent: Wednesday, April 28, 2021 15:19
To: user-zh@flink.apache.org 
Subject: 流跑着跑着,报出了rocksdb层面的错误 Error while retrieving data from RocksDB

org.apache.flink.util.FlinkRuntimeException: Error while retrieving data from
RocksDB
at
org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:121)
at
org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:111)
at
org.apache.flink.contrib.streaming.state.RocksDBListState.get(RocksDBListState.java:60)
at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:455)
at
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276)
at
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:154)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:568)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:167)
at
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:179)
at
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:101)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:180)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.rocksdb.RocksDBException: Requested array size exceeds VM
limit
at org.rocksdb.RocksDB.get(Native Method)
at org.rocksdb.RocksDB.get(RocksDB.java:810)
at
org.apache.flink.contrib.streaming.state.RocksDBListState.getInternal(RocksDBListState.java:118)
... 20 more


能跑几个小时,总会因为这个问题,一直陷入重启



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 回复:Re: CheckpointedFunction#snapshotState访问键控状态报错

2021-04-13 文章 Yun Tang
Hi,
如果想要可以被外部查询,更合适的方式是 queryable 
state,如果想要把数据同时存储在外部,更合适的方式其实是直接使用外部store,例如HBase,这样子数据的查询是实时的。
checkpoint时候再向外存储的话,一来更新不是及时的,而且数据有冗余存储(分别在Flink的state和外部存储中)。

如果非要想获取实际的kv数据,可以参照我前面说的 KeyedStateBackend#getKeys 或者 
KeyedStateBackend#applyToAllKeys 这两个接口,你可以自己实现一个stream 
operator而不是function,这样就能看到里面的keyed statebackend了。

祝好
唐云

From: cs <58683...@qq.com>
Sent: Monday, April 12, 2021 11:29
To: user-zh 
Subject: 回复:Re: CheckpointedFunction#snapshotState访问键控状态报错

我们需求这个状态需要可以被外部查询,quable 
state我们生产端口会屏蔽掉。所以只能在checkpoint的时候自己将状态输出到外部存储。但是感觉没有很好的api获取statetable里面实际kv状态

---原始邮件---
发件人:Yun Tang

Re: CheckpointedFunction#snapshotState访问键控状态报错

2021-04-10 文章 Yun Tang
Hi

  snapshotState主要是给operator state用的,异常原因是keyed state 
访问时需要设置currentKey的,但是currentKey是当前正在处理的record的key,与snapshotState的执行时候的语义不一样,执行snapshotState方法的时候,是可以没有当前record的。

  如果想要访问整个keyed state,可以通过 KeyedStateBackend#getKeys(String state, N namespace) 
来访问,但还是不建议将keyed state写入到HBase,因为Flink更希望你是按照per record的访问,而不是全局访问,后者效率和性能都不好。


祝好

唐云

From: cs <58683...@qq.com>
Sent: Tuesday, April 6, 2021 21:52
To: user-zh 
Subject: CheckpointedFunction#snapshotState访问键控状态报错

class A extends KeyedProcessFunction

Re: 【疑问】RocksDBStateBackend为什么使用单独封装的frocksdbjni,而不用RocksDB官方提供的RocksJava

2021-03-29 文章 Yun Tang
Hi Feifan,

主要原因是为了支持TTL state的compaction filter 
[1],因为属于定制化内容,RocksDB社区无法纳入主分支,我们后续也在考虑采用插件化形式,复用RocksDB社区的原生RocksJava。当然,现在还有个棘手的问题是RocksJava在5.18
 之后存在性能回退,导致目前无法直接升级。你可以关注 FLINK-14482 跟踪后续的发展


[1] 
https://github.com/ververica/frocksdb/commit/01dca02244522e405c9258000903fee81496f72c
[2] https://issues.apache.org/jira/browse/FLINK-14482

Best
Yun Tang

From: zoltar9264 
Sent: Wednesday, March 24, 2021 13:48
To: user-zh 
Subject: 【疑问】RocksDBStateBackend为什么使用单独封装的frocksdbjni,而不用RocksDB官方提供的RocksJava

大家好,
在RocksDBStateBackend的pom中看到是使用了 
frocksdbjni,看了下这个包是dataArtisans自己的。而RocksDBStateBackend是有提供Java 
sdk的,叫RocksJava。RocksDBStateBackend为什么不直接用 RocksJava呢?


| |
Feifan Wang
|
|
zoltar9...@163.com
|
签名由网易邮箱大师定制



Re: procesElement中每天数据都put 进入map,但下一条数据来时map都是空的咨询

2021-02-07 文章 Yun Tang
Hi 祖安,

state抽象的数据结构,无论是value state,list state还是map state,其都是对应流计算处理中的当前key对应的数据结构。以map 
state具体来说对于每个正在处理的current key (由key selector选择出来 [1]),都有一个对应 
的map存储相关的数据,如果你每次都发现对应的map为空,很有可能是因为你的key 
selector选择出来的key每次都不相同,很大概率是当前处理的record不同导致。

另外,map.isEmpty() 的调用是需要额外开销的(尤其对于RocksDB state backend),如果只是需要处理,仅需要根据 
map.get(value.getTrainNumber()) 是否为null即可。


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#keyed-datastream

祝好
唐云

From: 赵一旦 
Sent: Monday, February 8, 2021 10:00
To: user-zh@flink.apache.org 
Subject: Re: procesElement中每天数据都put 进入map,但下一条数据来时map都是空的咨询

keyedStream? key不同可能是。

谌祖安  于2021年2月7日周日 下午6:00写道:

> 您好!
>
>
> 重载procesElement方法,每条stream数据处理时,put数据进入map,后面每条数据处理时先判断是否在map中有相同的key值,无则新增后put进map,有则加工后put进map。
>  在实际代码中发现,每次写入put后,map有数据,但处理下一条数据时,先从map读取数据,发现map已经为空。
> 请问是哪里写错了吗?   和   flink官网中 state.update(current);有什么不同吗?
>
> 以下为代码:
>  private MapState map;  //定义map
>   @Override
> public void processElement(MicsTrainPracticalDetail value, Context ctx,
> Collector out) throws Exception {
>
> MicsTrainPractical current = map.get(value.getTrainNumber());
>
>  System.out.println(map.isEmpty());  //
> 每次数据进来都发现map已经为空,不能保存前面数据进来时put的数据
> Long departTime =
> DateTimeUtils.convertToTimestampSecond(value.getDepartTime());
> Long arrivalTime =
> DateTimeUtils.convertToTimestampSecond(value.getArrivalTime());
> if (current == null) {   //如果map中没有获取到string相同的数据,则新建一条数据put进去
> MicsTrainPractical actual = new MicsTrainPractical();
> actual.setTrainId(value.getTrainNumber());
> actual.setCarId(value.getCarId());
> actual.setStartStationId(value.getStationId());
> actual.setStartPracticalArrivalTime(arrivalTime);
> actual.setEndPracticalArrivalTime(arrivalTime);
> actual.setStartPracticalDepartTime(departTime);
> actual.setEndPracticalDepartTime(departTime);
> actual.setEndStationId(value.getStationId());
> actual.setStopTime(0L);
> actual.setDs(value.getDs());
> actual.setIsInsert(true);
> actual.setTargetStationId(value.getTargetStationId());
> out.collect(actual);
> map.put(value.getTrainNumber(), actual);//向map中写入数据
> return;
> } else {   //如果map有获取到string相同的数据,则转换数据后写入map
>
> MicsTrainPractical actual = new MicsTrainPractical();
> actual.setTrainId(value.getTrainNumber());
> actual.setCarId(value.getCarId());
> actual.setStartStationId(current.getStartStationId());
> actual.setEndStationId(value.getStationId());
>
> actual.setStartPracticalArrivalTime(current.getStartPracticalArrivalTime());
>
> actual.setStartPracticalDepartTime(current.getStartPracticalDepartTime());
> actual.setEndPracticalArrivalTime(arrivalTime);
> actual.setEndPracticalDepartTime(departTime);
> actual.setStopTime(current.getStopTime() + Math.abs(departTime
> - arrivalTime));
> actual.setDs(value.getDs());
> actual.setIsInsert(false);
> actual.setTargetStationId(value.getTargetStationId());
> current.setEndStationId(actual.getEndStationId());
>
> current.setEndPracticalDepartTime(actual.getEndPracticalDepartTime());
>
> current.setEndPracticalArrivalTime(actual.getEndPracticalArrivalTime());
> current.setStopTime(actual.getStopTime());
> current.setIsInsert(actual.getIsInsert());
>
> MicsTrainSectionInfo trainSectionInfo = new
> MicsTrainSectionInfo();
> trainSectionInfo.setTrainId(actual.getTrainId());
> trainSectionInfo.setCarId(actual.getCarId());
> trainSectionInfo.setStartStationId(current.getEndStationId());
> trainSectionInfo.setEndStationId(actual.getEndStationId());
>
> trainSectionInfo.setDepartTime(current.getEndPracticalDepartTime());
>
> trainSectionInfo.setArrivalTime(actual.getEndPracticalArrivalTime());
>
> map.put(value.getTrainNumber(), actual);
> out.collect(actual);
> ctx.output(outputTagSectionFlow, trainSectionInfo);
> }
> }
>
>
>
>
> 谌祖安
> 智能轨道交通业务群 / 产品事业部 / 开发经理
> Intelligent RailTransportation BG /Development Manager
> 广东省广州市天河区新岑四路2号佳都智慧大厦
> PCI Intelligence Building, No. 2 Xincen Fourth Road, Tianhe District,
> Guangzhou, Guangdong
> E shenz...@pcitech.com
> M 86-18680458868
> www.pcitech.com
>


Re: flink升级hadoop3

2021-02-07 文章 Yun Tang
Hi,

Flink自1.11 版本之后就已经支持了hadoop3 [1][2],具体来讲就是将 HADOOP_CLASSPATH 配置成运行机器上的hadoop3 
相关jar包即可。
你也可以参照 [3] 的测试步骤


[1] https://issues.apache.org/jira/browse/FLINK-11086
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/resource-providers/yarn.html#supported-hadoop-versions
[3] https://issues.apache.org/jira/browse/FLINK-17978

祝好
唐云


From: kandy.wang 
Sent: Sunday, February 7, 2021 13:42
To: user-zh@flink.apache.org 
Subject: flink升级hadoop3

flink 如何升级hadoop3 ?








Re: 怎么理解 tolerableCheckpointFailureNumber

2021-01-28 文章 Yun Tang
Hi,

tolerableCheckpointFailureNumber 限制的是最大可容忍的连续失败checkpoint计数 
continuousFailureCounter [1],例如将tolerableCheckpointFailureNumber 
设置成3,连续失败3次,continuousFailureCounter 会累计到3,作业就会尝试重启。
如果中间有一个checkpoint成功了,continuousFailureCounter 就会重置为零 [2]。

checkpoint失败后,如果作业没有发生failover,下一次checkpoint还是周期性的触发,并受 
execution.checkpointing.min-pause [3] 等参数的影响。


[1] 
https://github.com/apache/flink/blob/4f5747fa0f7226c780742a4549408a38bc95d052/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java#L51
[2] 
https://github.com/apache/flink/blob/4f5747fa0f7226c780742a4549408a38bc95d052/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointFailureManager.java#L161-L171
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#execution-checkpointing-min-pause

祝好
唐云


From: jiangjiguang719 
Sent: Friday, January 29, 2021 9:35
To: user-zh@flink.apache.org 
Subject: 怎么理解 tolerableCheckpointFailureNumber

tolerableCheckpointFailureNumber 是设置可容忍的checkpoint失败次数,具体怎么理解呢?比如 设置为3
1. 当checkpoint 失败时,该值+1,直到 大于 3,实时作业就发生失败或重启?
2. 当checkpoint 失败时,是立即进行下个checkpoint?还是根据周期设置自动触发?
3. 该值是累加值吗


Re: key group from xx to yy does not contain zz异常

2021-01-28 文章 Yun Tang
Hi,

原因是你的key selector引入了随机变量 (也就是下面的方法keyBy),导致其select出来的key不是固定的

public KeySelector keyBy(int parallelism) {
return value -> 
Joiner.on(SeparatorConstant.BAR).join(value.getMetricsId(), 
ThreadLocalRandom.current().nextInt(parallelism));
}

例如原先的key selector选出的key是 key-A,经过取模得到的key group是44,理应将该record发送给下游key 
group包含44的task,但是相关record进入到对应group的task之后,在加入到timer队列的时候,还会再次进行group的计算,由于你的key
 selector有随机性,导致这次选出的key可能是key-B,而针对key-B的取模运算得到的key group是4,也就有可能不在你的task (key 
group 44-45) 中了,导致了最终的异常。

祝好
唐云

From: restart 
Sent: Thursday, January 28, 2021 17:54
To: user-zh@flink.apache.org 
Subject: key group from xx to yy does not contain zz异常

线上部署flink项目时,启动爆如下错误,在测试环境可正常启动,job依赖的flink版本是1.10,flink
集群版本是1.12-SNAPSHOT,与线上一致。有点摸不着头脑,麻烦各位老师帮分析分析
堆栈信息:
java.lang.IllegalArgumentException: key group from 44 to 45 does not contain
4
at
org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:161)
at
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.globalKeyGroupToLocalIndex(KeyGroupPartitionedPriorityQueue.java:187)
at
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.computeKeyGroupIndex(KeyGroupPartitionedPriorityQueue.java:182)
at
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.getKeyGroupSubHeapForElement(KeyGroupPartitionedPriorityQueue.java:176)
at
org.apache.flink.runtime.state.heap.KeyGroupPartitionedPriorityQueue.add(KeyGroupPartitionedPriorityQueue.java:112)
at
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerEventTimeTimer(InternalTimerServiceImpl.java:217)
at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerEventTimeTimer(WindowOperator.java:884)
at
org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger.onElement(EventTimeTrigger.java:42)
at
org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger.onElement(EventTimeTrigger.java:30)
at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onElement(WindowOperator.java:898)
at
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:399)
at
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:161)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:178)
at
org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:153)
at
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:351)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
at
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:567)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:536)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)

代码逻辑大致:
DataStream stream = dataStream
.keyBy(keyBy(globalParallelism))
.window(window(downsampling))
.reduce(reduce(trackerType), processWindow(trackerType),
TypeInformation.of(Metrics.class))
.keyBy(secondKeyBy())

.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
.reduce(reduce(trackerType),
processSecondWindow(trackerType), TypeInformation.of(Metrics.class))
.rebalance()
.addSink(sink())
.setParallelism(globalParallelism/2);

public KeySelector keyBy(int parallelism) {
return value ->
Joiner.on(SeparatorConstant.BAR).join(value.getMetricsId(),ThreadLocalRandom.current().nextInt(parallelism));
}

public KeySelector secondKeyBy() {
return value ->
Joiner.on(SeparatorConstant.BAR).join(value.getMetricsId(),
value.getWindowEnd());
}
备注:二次keyby的原因是为了解决数据倾斜问题,第一个keyby用来基于EventTime的翻滚窗口,第二个keyby使用了基于processTime的session窗口



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink on yarn JDK 版本支持问题

2021-01-24 文章 Yun Tang
Hi,

MaxMetaspaceSize 是在JDK8中新增的,用以取代以前的PermGen[1],JDK7中自然不支持。可以在hadoop集群中再安装JDK8,将 
env.java.home 指向新的JDK


[1] https://www.baeldung.com/java-permgen-metaspace#metaspace

祝好
唐云

From: Jacob <17691150...@163.com>
Sent: Saturday, January 23, 2021 16:17
To: user-zh@flink.apache.org 
Subject: Flink on yarn JDK 版本支持问题

使用Flink1.11.2客户端 往hadoop集群提交job,报错如下:

LogType:jobmanager.err
Log Upload Time:Sat Jan 23 00:06:47 -0800 2021
LogLength:160
Log Contents:
Unrecognized VM option 'MaxMetaspaceSize=268435456'
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.

请问是因为hadoop集群jdk版本低的问题吗?


现在已知的是hadoop集群jdk版本为1.7


之前一直以为在flink配置文件中配置的*env.java.home*就应该是hadoop集群的java home,通过测试,发现并不是,这个java
home就是客户端(本地)所在机器的java home。这个java版本已经是1.8+,但提交job时,仍然报错,如上。



是因为hadoop集群的jdk低吗?如果升级了hadoop集群的jdk,那么在flink配置文件中的env.java.home
需要改成hadoop集群的java home吗?还是不用改变,依旧使用本地的java home路径?

这两个jdk对于启动一个flink job的作用是什么呀?( 配置的env.java.home和hadoop集群的java home)







-
Thanks!
Jacob
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 不同程序间checkpoint迁移

2021-01-21 文章 Yun Tang
Hi,

Flink state processor[1] 应该可以满足你的需求。

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/libs/state_processor_api.html

祝好
唐云

From: gimlee 
Sent: Friday, January 22, 2021 12:07
To: user-zh@flink.apache.org 
Subject: 不同程序间checkpoint迁移

程序A:jar  source: kafka
程序B:Flink SQL  source: kafka
使用同一个topic, group id
如果需要把A停掉,使用B替换A,需要把A的checkpoint中的数据、kafka的分区和offset信息改成程序B的checkpoint,请问有办法或者有工具实现嘛?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink sql 状态过期后,checkpoint 大小没变化

2021-01-14 文章 Yun Tang
Hi

  1.  目前没有全局的配置
  2.  开启cleanFullSnapshot 并不会物理清除数据,只是确保checkpoint数据中没有相关过期数据

祝好
唐云

From: 孙啸龙 
Sent: Thursday, January 14, 2021 20:43
To: user-zh@flink.apache.org 
Subject: Re: Flink sql 状态过期后,checkpoint 大小没变化

你好:
非常谢谢,
本地的数据是过期了。
不好意思,还有几个疑问想请教下。
1.看文档,开启cleanFullSnapshot是只能对单个状态设置吗,没查到flink sql 
开启cleanFullSnapshot的配置的地方?因为只看到StateTtlConfig是对于单个状态的设置,没有对job或者对table的config设置。
2.cleanFullSnapshot 开启后,从checkpoint恢复才会触发清理,不是在checkpoint过程中触发清理掉过期数据?


> 在 2021年1月14日,下午4:48,Yun Tang  写道:
>
> Hi,
>
> 你本地的数据肯定是过期了,checkpoint 
> size没有变化是因为你的数据总量83MB,且之后没有插入新数据,导致没有触发RocksDB的compaction,所以本地的数据没有物理上清理,而在full
>  snapshot时候,估计你并没有开启cleanFullSnapshot [1],所以导致full snapshot时候并没有删除掉过期数据。
>
> 其实你可以查询一下状态,默认情况下,已经过期的数据是无法再查询到了。
>
> 建议开启增量checkpoint即可,过期数据即使物理不删除,也因为过期而无法再读取到了,没必要过分关注UI上的checkpoint size。
>
>
> [1] 
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#cleanup-in-full-snapshot
>
> 祝好
> 唐云
> 
> From: 孙啸龙 
> Sent: Thursday, January 14, 2021 16:11
> To: user-zh@flink.apache.org 
> Subject: Re: Flink sql 状态过期后,checkpoint 大小没变化
>
> 你好:
> 使用的state backend是rocksdb,没有开启增量,后续没有再插入过数据。
>
>> 在 2021年1月14日,下午4:07,Yun Tang  写道:
>>
>> 使用的state backend,以及对应的checkpoint 类型是什么(是否开启incremental checkpoint)?
>>
>> 在一开始插入数据后,直到state TTL超过,期间均没有再插入数据过么?还是说一直在以一定的数据量在插入数据?
>>
>>
>> 祝好
>> 唐云
>> 
>> From: 孙啸龙 
>> Sent: Thursday, January 14, 2021 15:52
>> To: user-zh@flink.apache.org 
>> Subject: Flink sql 状态过期后,checkpoint 大小没变化
>>
>> 大家好:
>>   版本:1.12.0
>>   方式:flink sql
>>   测试sql:
>>   select a.id,b.money,b.createTime from test_state_from a
>>   full join test_state_from1 b on a.id=b.id;
>>   问题:
>>  test_state_from和test_state_from1 分别插入50万条数据,查看 checkpoint 大小为83m,state 
>> ttl 设  置为16分钟,30分钟后查看checkpoint的值还是83m,状态过期清理后的checkpoint为什么没变小?
>



Re: 回复: 请教个Flink checkpoint的问题

2021-01-14 文章 Yun Tang
Hi

这是因为在Flink-1.7.0 之后,savepoint也被当做是retained checkpoint了 [1],当你stop with 
savepoint 成功时,新的savepoint创建之后,旧的checkpoint因为默认retain 
checkpoint的数量为1而被subsume掉了,也就是被删掉了。

如果你还想保留之前的一个旧的checkpoint,可以将默认retain的checkpoint数目设置为2 [2]。

另外说一句,即使是已经deprecated的cancel with 
savepoint的用法,当新的savepoint创建成功后,旧的checkpoint在默认情况下也应该会被删除,除非增大retain的checkpoint数量。


[1] https://issues.apache.org/jira/browse/FLINK-10354
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.html#state-checkpoints-num-retained

祝好
唐云

From: yinghua...@163.com 
Sent: Thursday, January 14, 2021 19:00
To: user-zh 
Subject: 回复: 回复: 请教个Flink checkpoint的问题

好的,感谢您的回复!



yinghua...@163.com

发件人: Evan
发送时间: 2021-01-14 18:48
收件人: user-zh
主题: 回复: 回复: 请教个Flink checkpoint的问题
是的,应该是机制问题,链接[1]打开有这样一句解释:

If you choose to retain externalized checkpoints on cancellation you have to 
handle checkpoint clean up manually when you cancel the job as well 
(terminating with job status JobStatus#CANCELED).

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/config.html#execution-checkpointing-externalized-checkpoint-retention

如回答有误,请指正。





发件人: yinghua...@163.com
发送时间: 2021-01-14 18:02
收件人: user-zh
主题: 回复: 回复: 请教个Flink checkpoint的问题
代码如下:
streamEnv.enableCheckpointing(5 * 60 * 1000);
CheckpointConfig checkPointConfig = streamEnv.getCheckpointConfig();
checkPointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
checkPointConfig.setCheckpointTimeout(1 * 60 * 1000);
checkPointConfig.setMinPauseBetweenCheckpoints((1 * 30 * 1000));
checkPointConfig.setMaxConcurrentCheckpoints(1);
checkPointConfig.setTolerableCheckpointFailureNumber(3);
checkPointConfig

.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

String checkpointPath = this.execOptions.get(CHECKPOINT_PATH);
try {
  StateBackend rocksDBStateBackend = new 
RocksDBStateBackend(checkpointPath);
  streamEnv.setStateBackend(rocksDBStateBackend);



yinghua...@163.com
发件人: Evan
发送时间: 2021-01-14 17:55
收件人: user-zh
主题: 回复: 请教个Flink checkpoint的问题
代码图挂掉了,看不到代码
发件人: yinghua...@163.com
发送时间: 2021-01-14 17:26
收件人: user-zh
主题: 请教个Flink checkpoint的问题
我在yarn上提交任务时,设置flink的checkpoint是5分钟一次,checkpoint使用RocksDBStateBackend保存在HDFS上且任务取消后不删除checkpoint,代码如下
现在我发现,当我停止任务时使用stopWithSavepoint发现任务停止后把最近一次的checkpoint信息给删除了?目前机制是这样的吗还是我使用有问题?是不是调用cancelWithSavepoint停止任务时就不会删除最近一次的checkpoint信息?
yinghua...@163.com


Re: Flink sql 状态过期后,checkpoint 大小没变化

2021-01-14 文章 Yun Tang
Hi,

你本地的数据肯定是过期了,checkpoint 
size没有变化是因为你的数据总量83MB,且之后没有插入新数据,导致没有触发RocksDB的compaction,所以本地的数据没有物理上清理,而在full 
snapshot时候,估计你并没有开启cleanFullSnapshot [1],所以导致full snapshot时候并没有删除掉过期数据。

其实你可以查询一下状态,默认情况下,已经过期的数据是无法再查询到了。

建议开启增量checkpoint即可,过期数据即使物理不删除,也因为过期而无法再读取到了,没必要过分关注UI上的checkpoint size。


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#cleanup-in-full-snapshot

祝好
唐云

From: 孙啸龙 
Sent: Thursday, January 14, 2021 16:11
To: user-zh@flink.apache.org 
Subject: Re: Flink sql 状态过期后,checkpoint 大小没变化

你好:
 使用的state backend是rocksdb,没有开启增量,后续没有再插入过数据。

> 在 2021年1月14日,下午4:07,Yun Tang  写道:
>
> 使用的state backend,以及对应的checkpoint 类型是什么(是否开启incremental checkpoint)?
>
> 在一开始插入数据后,直到state TTL超过,期间均没有再插入数据过么?还是说一直在以一定的数据量在插入数据?
>
>
> 祝好
> 唐云
> 
> From: 孙啸龙 
> Sent: Thursday, January 14, 2021 15:52
> To: user-zh@flink.apache.org 
> Subject: Flink sql 状态过期后,checkpoint 大小没变化
>
> 大家好:
>版本:1.12.0
>方式:flink sql
>测试sql:
>select a.id,b.money,b.createTime from test_state_from a
>full join test_state_from1 b on a.id=b.id;
>问题:
>   test_state_from和test_state_from1 分别插入50万条数据,查看 checkpoint 大小为83m,state 
> ttl 设  置为16分钟,30分钟后查看checkpoint的值还是83m,状态过期清理后的checkpoint为什么没变小?



Re: Flink sql 状态过期后,checkpoint 大小没变化

2021-01-14 文章 Yun Tang
使用的state backend,以及对应的checkpoint 类型是什么(是否开启incremental checkpoint)?

在一开始插入数据后,直到state TTL超过,期间均没有再插入数据过么?还是说一直在以一定的数据量在插入数据?


祝好
唐云

From: 孙啸龙 
Sent: Thursday, January 14, 2021 15:52
To: user-zh@flink.apache.org 
Subject: Flink sql 状态过期后,checkpoint 大小没变化

大家好:
版本:1.12.0
方式:flink sql
测试sql:
select a.id,b.money,b.createTime from test_state_from a
full join test_state_from1 b on a.id=b.id;
问题:
   test_state_from和test_state_from1 分别插入50万条数据,查看 checkpoint 大小为83m,state 
ttl 设  置为16分钟,30分钟后查看checkpoint的值还是83m,状态过期清理后的checkpoint为什么没变小?


Re: rocksdb作为statebackend时,TM节点挂掉了,为何任务不能恢复呢?

2021-01-12 文章 Yun Tang
,实际上意味着任务要被调度至JM所在节点才可以。然后,任务调度时,flink这边能人为指定任务的分配吗?我看前边的讨论,好像flink的任务调度基本是自动完成的,人为干预难度较大是吧?

祝好



Yun Tang  于2021年1月12日周二 下午7:11写道:

> Hi,
>
> 从异常日志看,应该是因为你的state.checkpoints.dir 或者说
> statebackend的checkpoint目录配置成了本地目录,checkpoint保存到了本地机器上,所以在failover
> restore的时候,必须得让原task部署回原来的机器才能正常运行。将state backend的checkpoint目录更换为一个DFS目录即可。
>
>
> 祝好
> 唐云
> 
> From: Carmen Free 
> Sent: Tuesday, January 12, 2021 18:14
> To: user-zh@flink.apache.org 
> Subject: Re: rocksdb作为statebackend时,TM节点挂掉了,为何任务不能恢复呢?
>
> 你好,唐老师,谢谢解答。
>
> 不好意思,下面补充一下报错信息,刚才忘记说了。
>
> 主要报错信息如下,重新模拟了下:
> 2021-01-12 18:09:34,236 INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
> Custom Source -> Flat Map -> Timestamps/Watermarks (2/2)
> (3b50a7ce56b408c2978260846b76a28a) switched from RUNNING to FAILED on
> org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@4c107a3a.
>
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at
>
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
> at
>
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
> at
>
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
> at java.lang.Thread.run(Thread.java:748)
>
> Caused by: org.apache.flink.util.FlinkException: Could not restore operator
> state backend for StreamSource_cbc357ccb763df2852fee8c4fc7d55f2_(2/2) from
> any of the 1 provided restore options.
> at
>
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> at
>
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:252)
> at
>
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:139)
> ... 9 more
>
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed
> when trying to restore operator state backend
> at
>
> org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
> at
>
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:565)
> at
>
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:243)
> at
>
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> at
>
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> ... 11 more
>
> Caused by: java.io.FileNotFoundException:
>
> /data/flink/checkpoints/b261de0447d59bb2f1db9c084f5b1a0b/chk-5/4a160901-8ddd-468e-a82d-6efcb8a9dff9
> (No such file or directory)
> at java.io.FileInputStream.open0(Native Method)
> at java.io.FileInputStream.open(FileInputStream.java:195)
> at java.io.FileInputStream.(FileInputStream.java:138)
> at
>
> org.apache.flink.core.fs.local.LocalDataInputStream.(LocalDataInputStream.java:50)
> at
>
> org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)
> at
>
> org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
> at
>
> org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
> at
>
> org.apache.flink.runtime.state.OperatorStreamStateHandle.openInputStream(OperatorStreamStateHandle.java:66)
> at
>
> org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:73)
> at
>
> org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
> ... 15 more
>
>
> 这个文件夹在A节点(JM)上是有的,难道是访问权限问题吗?B节点无法访问A节点吗,有点奇怪啊,配置了ssh免密的啊,文件夹/data/flink/checkpoints访问权限也设置成了777
>
> Yu

Re: rocksdb作为statebackend时,TM节点挂掉了,为何任务不能恢复呢?

2021-01-12 文章 Yun Tang
Hi,

从异常日志看,应该是因为你的state.checkpoints.dir 或者说 
statebackend的checkpoint目录配置成了本地目录,checkpoint保存到了本地机器上,所以在failover 
restore的时候,必须得让原task部署回原来的机器才能正常运行。将state backend的checkpoint目录更换为一个DFS目录即可。


祝好
唐云

From: Carmen Free 
Sent: Tuesday, January 12, 2021 18:14
To: user-zh@flink.apache.org 
Subject: Re: rocksdb作为statebackend时,TM节点挂掉了,为何任务不能恢复呢?

你好,唐老师,谢谢解答。

不好意思,下面补充一下报错信息,刚才忘记说了。

主要报错信息如下,重新模拟了下:
2021-01-12 18:09:34,236 INFO
 org.apache.flink.runtime.executiongraph.ExecutionGraph- Source:
Custom Source -> Flat Map -> Timestamps/Watermarks (2/2)
(3b50a7ce56b408c2978260846b76a28a) switched from RUNNING to FAILED on
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@4c107a3a.

java.lang.Exception: Exception while creating StreamOperatorStateContext.
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:191)
at
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:255)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:989)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:453)
at
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:448)
at
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:460)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:708)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:533)
at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.flink.util.FlinkException: Could not restore operator
state backend for StreamSource_cbc357ccb763df2852fee8c4fc7d55f2_(2/2) from
any of the 1 provided restore options.
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.operatorStateBackend(StreamTaskStateInitializerImpl.java:252)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:139)
... 9 more

Caused by: org.apache.flink.runtime.state.BackendBuildingException: Failed
when trying to restore operator state backend
at
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:86)
at
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createOperatorStateBackend(RocksDBStateBackend.java:565)
at
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$operatorStateBackend$0(StreamTaskStateInitializerImpl.java:243)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 11 more

Caused by: java.io.FileNotFoundException:
/data/flink/checkpoints/b261de0447d59bb2f1db9c084f5b1a0b/chk-5/4a160901-8ddd-468e-a82d-6efcb8a9dff9
(No such file or directory)
at java.io.FileInputStream.open0(Native Method)
at java.io.FileInputStream.open(FileInputStream.java:195)
at java.io.FileInputStream.(FileInputStream.java:138)
at
org.apache.flink.core.fs.local.LocalDataInputStream.(LocalDataInputStream.java:50)
at
org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:142)
at
org.apache.flink.core.fs.SafetyNetWrapperFileSystem.open(SafetyNetWrapperFileSystem.java:85)
at
org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
at
org.apache.flink.runtime.state.OperatorStreamStateHandle.openInputStream(OperatorStreamStateHandle.java:66)
at
org.apache.flink.runtime.state.OperatorStateRestoreOperation.restore(OperatorStateRestoreOperation.java:73)
at
org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder.build(DefaultOperatorStateBackendBuilder.java:83)
... 15 more

这个文件夹在A节点(JM)上是有的,难道是访问权限问题吗?B节点无法访问A节点吗,有点奇怪啊,配置了ssh免密的啊,文件夹/data/flink/checkpoints访问权限也设置成了777

Yun Tang  于2021年1月12日周二 下午5:46写道:

> Hi
>
> Flink的容错机制是可以保证TM lost时候会尝试重启作业,“为何任务不能恢复”是需要看完整异常栈的,简单描述是无法帮助排查问题的。
>
> 祝好
> 唐云
> 
> From: Carmen Free 
> Sent: Tuesday, January 12, 2021 15:52
> To: user-zh@flink.apache.org 
> Subject: rocksdb作为statebackend时,TM节点挂掉了,为何任务不能恢复呢?
>
> hi,
>
> rocksdb作为statebackend时,TM节点挂掉了,为何任务不能恢复呢?
>
> 1、环境说明
>
> flink版本:1.10.2
> 操作系统:centos 7
>
> 2、集群说明(当前模拟了2节点)
>
> 节点A   |  节点B
> 角色|   JM、TM|TM
> taskslot   |   4   | 4
>
> 3、statebackend配置
>
> # rocksdb作为状态后备
> state.backend:

Re: rocksdb作为statebackend时,TM节点挂掉了,为何任务不能恢复呢?

2021-01-12 文章 Yun Tang
Hi

Flink的容错机制是可以保证TM lost时候会尝试重启作业,“为何任务不能恢复”是需要看完整异常栈的,简单描述是无法帮助排查问题的。

祝好
唐云

From: Carmen Free 
Sent: Tuesday, January 12, 2021 15:52
To: user-zh@flink.apache.org 
Subject: rocksdb作为statebackend时,TM节点挂掉了,为何任务不能恢复呢?

hi,

rocksdb作为statebackend时,TM节点挂掉了,为何任务不能恢复呢?

1、环境说明

flink版本:1.10.2
操作系统:centos 7

2、集群说明(当前模拟了2节点)

节点A   |  节点B
角色|   JM、TM|TM
taskslot   |   4   | 4

3、statebackend配置

# rocksdb作为状态后备
state.backend: rocksdb


# 存储快照的目录(暂时使用的本地目录)
state.checkpoints.dir: file:///data/flink/checkpoints

4、启动任务后,任务自动分配在A节点的TM上,运行一段时间后,检查点快照正常。接着,仅停掉A节点TM(JM仍正常运行),任务被自动调度至B节点的TM上,但是此时任务一直重启,无法恢复,这是为什么呢?

5、如果我启动A节点,任务依旧无法恢复(此时任务仍在B节点运行),直到我停掉B节点TM,此时任务调度至A节点,任务可以正常恢复。所以有点疑问,4中的场景为何不能恢复任务呢?为什么只有在A节点上才可以进行任务恢复呢?最初以为是访问路径的问题,但是仔细想了想,检查点相关的操作一直都是JM进行的,我觉得只要JM没有挂掉,应该就可以将任务进行恢复啊,是我的理解有偏差吗?


Re: Flink代码一直报 java.lang.NullPointerException错误,但变量明明初始化了,却仍然有该错误,找不到原因,请诸位指教。谢谢

2021-01-12 文章 Yun Tang
Hi,

这个错误其实是kryo初始化时候扔出来的。你自定义的类 SockRowV2,WashDetectionSockValue 
等,不符合Flink关于pojo的定义,所以会回退到使用kryo进行序列化/反序列化。建议将相关类在kryo上进行注册 
[1]。特别地,如果是thrift或者protobuf的类,需要单独注册[2],更好的方法其实是建议将你们的自定义类修改为满足Flink的POJO类 [3]


[1] 
https://flink.apache.org/news/2020/04/15/flink-serialization-tuning-vol-1.html#kryo
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/custom_serializers.html
[3] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/types_serialization.html#rules-for-pojo-types

祝好
唐云

From: JackJia 
Sent: Tuesday, January 12, 2021 14:16
To: user-zh@flink.apache.org 
Subject: Flink代码一直报 
java.lang.NullPointerException错误,但变量明明初始化了,却仍然有该错误,找不到原因,请诸位指教。谢谢

请教个问题,代码一直报 java.lang.NullPointerException错误,但变量明明初始化了,却仍然有该错误,找不到原因,请诸位指教。谢谢。
错误如下:
2021-01-12 04:36:09,950 INFO  org.apache.flink.runtime.taskmanager.Task 
- Window(TumblingEventTimeWindows(6), EventTimeTrigger, 
WashDataDetectionFunction) -> Map -> Map -> Sink: Unnamed (1/1) 
(b015c7cebf71e744f6b50136cdc32e20) switched from RUNNING to FAILED.
java.lang.RuntimeException: Exception occurred while processing valve output 
watermark:
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:216)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:189)
at 
org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:111)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processElement(StreamOneInputProcessor.java:169)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:143)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
at java.util.ArrayList$SubList.checkForComodification(ArrayList.java:1238)
at java.util.ArrayList$SubList.size(ArrayList.java:1048)
at java.util.AbstractList.add(AbstractList.java:108)
at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:116)
at 
com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:22)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:761)
at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
at 
org.apache.flink.contrib.streaming.state.RocksDBValueState.value(RocksDBValueState.java:90)
at 
com.airen.lasttime.keyedfunction.WashDataDetectionFunction.temperatureFall(WashDataDetectionFunction.java:264)
at 
com.airen.lasttime.keyedfunction.WashDataDetectionFunction.washDetection(WashDataDetectionFunction.java:206)
at 
com.airen.lasttime.keyedfunction.WashDataDetectionFunction.process(WashDataDetectionFunction.java:94)
at 
com.airen.lasttime.keyedfunction.WashDataDetectionFunction.process(WashDataDetectionFunction.java:33)
at 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:50)
at 
org.apache.flink.streaming.runtime.operators.windowing.functions.InternalIterableProcessWindowFunction.process(InternalIterableProcessWindowFunction.java:32)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.emitWindowContents(WindowOperator.java:549)
at 
org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.onEventTime(WindowOperator.java:457)
at 
org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276)
at 
org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:128)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:784)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamOneInputProcessor.java:213)
... 10 more


涉及的代码如下:
private SingleOutputStreamOperator 
dataClean(DataStream source, ParameterTool pt) {
WindowedStream windowStream = 
source.keyBy(x -> x.mac).window(TumblingEventTimeWindows.of(Time.seconds(60)));

SingleOutputStreamOperator afterWashDetection = 
windowStream.process(new WashDataDetectionFunction());

SingleOutputStreamOperator afterIdleAndShortLiveClean = 
afterWashDetection.keyBy(x -> x.mac)
.window(TumblingEventTimeWindows.of(Time.seconds(60))).process(new 
IdleAndShortLiveDataCleanFunction());



Re: flink编译报错

2021-01-12 文章 Yun Tang
Hi,

国内网络环境不太好,其实问题是node.js 安装有问题,可以考虑单独安装一下node 
js和npm,如果还是不行,在不需要webui的前提下,可以编译时候加上profile “-Pskip-webui-build” 来skip掉该部分的编译。

祝好
唐云

From: Ruguo Yu 
Sent: Tuesday, January 12, 2021 14:00
To: user-zh@flink.apache.org 
Subject: Re: flink编译报错

试下这个命令
mvn clean install -DskipTests -Dfast -Drat.skip=true -Dhaoop.version=2.7.6
-Dinclude-hadoop -Dscala-2.11 -T2C
其中,-Dhaoop.version 为 hadoop 版本号



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 请教个Flink savepoint的问题

2021-01-11 文章 Yun Tang
Hi,

没有暴露现成的API,但是可以参照测试用例的写法,给jobGraph设置 savepointRestoreSettings [1]。

[1] 
https://github.com/apache/flink/blob/ab87cf4ec0260c30194f3d8e75be1f43318aa32a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/ResumeCheckpointManuallyITCase.java#L383

祝好
唐云

From: yinghua...@163.com 
Sent: Monday, January 11, 2021 19:07
To: user-zh 
Subject: 请教个Flink savepoint的问题

Flink APi 中我们在java代码中使用客户端调用类ClusterClient如下的方法停止了任务:
CompletableFuture stopWithSavepoint(JobID var1, boolean var2, @Nullable 
String var3);然后我们保存了该任务的savepoint信息,那恢复时没看到对应的恢复方法,找了些资料都是通过命令来恢复,有没有直接调用Java 
API 层次的方法来恢复任务?



yinghua...@163.com


Re: Flink 1.11.2版本 实时任务运行 报错 is running beyond physical memory limits. Current usage: 25.0 GB of 25 GB physical memory used; 28.3 GB of 52.5 GB virtual memory used. Killing container

2021-01-07 文章 Yun Tang
Hi,

有可能是堆外内存超用,可以参考最近中文社区的一篇投稿 《详解 Flink 容器化环境下的 OOM Killed》进行修改,建议先增大 jvm-overhead 
相关配置

[1] 
https://mp.weixin.qq.com/s?__biz=MzU3Mzg4OTMyNQ===2247490197=1=b0893a9bf12fbcae76852a156302de95

祝好
唐云

From: Yang Peng 
Sent: Thursday, January 7, 2021 12:24
To: user-zh 
Subject: Flink 1.11.2版本 实时任务运行 报错 is running beyond physical memory limits. 
Current usage: 25.0 GB of 25 GB physical memory used; 28.3 GB of 52.5 GB 
virtual memory used. Killing container

Hi,

 
大家好,咨询一个问题,我们有个实时任务运行在Flink1.11.2版本,使用rocksdbstatebackend,最近报警出现了物理内存超限被kill的异常信息,我们查看了监控taskmanager
heap使用量没有超限,direct内存使用量也维持在一个平稳的范围内没有超限,也没有报oom,这种情况是非堆内存异常是吗?完整报错信息如下:

Dump of the process-tree for container_e06_1603181034156_0137_01_02 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES)
FULL_CMD_LINE
|- 180421 180362 180362 180362 (java) 258262921 59979106 30306209792
6553277 /usr/jdk64/jdk1.8.0_152/bin/java -XX:+UseSerialGC
-Xmx11542724608 -Xms11542724608 -XX:MaxDirectMemorySize=1207959552
-XX:MaxMetaspaceSize=268435456
-Dlog.file=/mnt/ssd/8/yarn/log/application_1603181034156_0137/container_e06_1603181034156_0137_01_02/taskmanager.log
-Dlog4j.configuration=file:./log4j.properties
-Dlog4j.configurationFile=file:./log4j.properties
org.apache.flink.yarn.YarnTaskExecutorRunner -D
taskmanager.memory.framework.off-heap.size=134217728b -D
taskmanager.memory.network.max=1073741824b -D
taskmanager.memory.network.min=1073741824b -D
taskmanager.memory.framework.heap.size=134217728b -D
taskmanager.memory.managed.size=12750684160b -D
taskmanager.cpu.cores=1.0 -D
taskmanager.memory.task.heap.size=11408506880b -D
taskmanager.memory.task.off-heap.size=0b --configDir .
-Djobmanager.rpc.address=flink-cm8.jd.163.org -Dweb.port=0
-Dweb.tmpdir=/tmp/flink-web-9197a884-03b9-4865-a0a0-0b6a1c295f2c
-Djobmanager.rpc.port=33656 -Drest.address=flink-cm8.jd.163.org
-Dsecurity.kerberos.login.keytab=/mnt/ssd/3/yarn/local/usercache/portal/appcache/application_1603181034156_0137/container_e06_1603181034156_0137_01_01/krb5.keytab
|- 180362 180360 180362 180362 (bash) 0 2 116011008 353 /bin/bash -c
/usr/jdk64/jdk1.8.0_152/bin/java -XX:+UseSerialGC -Xmx11542724608
-Xms11542724608 -XX:MaxDirectMemorySize=1207959552
-XX:MaxMetaspaceSize=268435456
-Dlog.file=/mnt/ssd/8/yarn/log/application_1603181034156_0137/container_e06_1603181034156_0137_01_02/taskmanager.log
-Dlog4j.configuration=file:./log4j.properties
-Dlog4j.configurationFile=file:./log4j.properties
org.apache.flink.yarn.YarnTaskExecutorRunner -D
taskmanager.memory.framework.off-heap.size=134217728b -D
taskmanager.memory.network.max=1073741824b -D
taskmanager.memory.network.min=1073741824b -D
taskmanager.memory.framework.heap.size=134217728b -D
taskmanager.memory.managed.size=12750684160b -D
taskmanager.cpu.cores=1.0 -D
taskmanager.memory.task.heap.size=11408506880b -D
taskmanager.memory.task.off-heap.size=0b --configDir .
-Djobmanager.rpc.address='flink-cm8.jd.163.org' -Dweb.port='0'
-Dweb.tmpdir='/tmp/flink-web-9197a884-03b9-4865-a0a0-0b6a1c295f2c'
-Djobmanager.rpc.port='33656' -Drest.address='flink-cm8.jd.163.org'
-Dsecurity.kerberos.login.keytab='/mnt/ssd/3/yarn/local/usercache/portal/appcache/application_1603181034156_0137/container_e06_1603181034156_0137_01_01/krb5.keytab'
1> 
/mnt/ssd/8/yarn/log/application_1603181034156_0137/container_e06_1603181034156_0137_01_02/taskmanager.out
2> 
/mnt/ssd/8/yarn/log/application_1603181034156_0137/container_e06_1603181034156_0137_01_02/taskmanager.err

Container killed on request. Exit code is 143
Container exited with a non-zero exit code 143

2021-01-07 11:51:00,781 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph   [] -
Source: 银河SDK原始日志 (18/90) (51ac2f29df472d001ce9b4307636ac1c) switched
from RUNNING to FAILED on
org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot@1aad00fa.
java.lang.Exception: Container
[pid=180362,containerID=container_e06_1603181034156_0137_01_02] is
running beyond physical memory limits. Current usage: 25.0 GB of 25 GB
physical memory used; 28.3 GB of 52.5 GB virtual memory used. Killing
container.
Dump of the process-tree for container_e06_1603181034156_0137_01_02 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS)
SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES)
FULL_CMD_LINE
|- 180421 180362 180362 180362 (java) 258262921 59979106 30306209792
6553277 /usr/jdk64/jdk1.8.0_152/bin/java -XX:+UseSerialGC
-Xmx11542724608 -Xms11542724608 -XX:MaxDirectMemorySize=1207959552
-XX:MaxMetaspaceSize=268435456
-Dlog.file=/mnt/ssd/8/yarn/log/application_1603181034156_0137/container_e06_1603181034156_0137_01_02/taskmanager.log
-Dlog4j.configuration=file:./log4j.properties
-Dlog4j.configurationFile=file:./log4j.properties
org.apache.flink.yarn.YarnTaskExecutorRunner -D
taskmanager.memory.framework.off-heap.size=134217728b -D

Re: Re:Re: Re:flink作业通过grafana监控,若想发出报警该如何选择指标的问题

2021-01-07 文章 Yun Tang
因为numRestarts 是一个累计值,所以你得区分当前值和之前的数值是否发生了增加,来区分是否发生了failover。

另外,不建议使用YARN的application状态来判断Flink作业状态,因为如果Flink作业配置了重试策略,即使作业不断进行failover,整个YARN的application状态仍然是RUNNING,并不能发现问题。

祝好
唐云

From: bradyMk 
Sent: Thursday, January 7, 2021 16:38
To: user-zh@flink.apache.org 
Subject: Re: Re:Re: Re:flink作业通过grafana监控,若想发出报警该如何选择指标的问题

好的,我研究一下,谢谢指导~



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink作业通过grafana监控,若想发出报警该如何选择指标的问题

2021-01-06 文章 Yun Tang
Hi

可以使用 numRestarts [1] 
指标进行报警,不过需要维护状态,也就是该值增大时报警。对于旧版本Flink,可以使用以及废弃的fullRestarts 指标。


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/metrics.html#availability

祝好
唐云

From: bradyMk 
Sent: Wednesday, January 6, 2021 18:57
To: user-zh@flink.apache.org 
Subject: Re: flink作业通过grafana监控,若想发出报警该如何选择指标的问题

Hi~

我现在也有在用这个办法,可我任务特别多的话,还要求及时报警并发送消息到钉钉群到邮件,这种方法就不太好了



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 回复: flink 1.12 Cancel Job内存未释放(问)

2021-01-04 文章 Yun Tang
Hi 徐州州

请查看一下checkpoint UI部分的overview,观察restored部分的是否为空,也就是没有从checkpoint恢复,同样可以观察job 
manager 部分日志,看是否从checkpoint resume。
如果没有从checkpoint/savepoint恢复,作业其实相当于是从头重新跑,除非作业有其他的外部访问,否则不应该有任何历史数据能看到。

祝好
唐云

From: 徐州州 <25977...@qq.com>
Sent: Tuesday, January 5, 2021 10:34
To: user-zh@flink.apache.org 
Subject: 回复: flink 1.12 Cancel Job内存未释放(问)

这是我完整的配置文件,并没有设置任何状态后端,和保存点,任务kill执行的命令是/opt/module/hadoop3.2.1/bin/yarn 
application -kill  jobid,启动命令执行的是,/opt/module/flink1.12/bin/flink run -d -m 
yarn-cluster -yjm 660 -ytm 2500 -ys 3 -yqu xjia_queue -ynm 
App_Bs_Drainage_Launch_200105,我猜想会不会是因为队列的问题,我集群中只有一个queue队列。

-- 原始邮件 --
发件人: "user-zh" ;
发送时间: 2021年1月5日(星期二) 上午10:03
收件人: "user-zh@flink.apache.org";
主题: 回复: flink 1.12 Cancel Job内存未释放(问)

这种情况貌似和检查点、保存点还有状态后端有关,可以排查排查,重新启动任务在昨天的基础上累加这个逻辑是正确的(如果配置了检查点、保存点还有状态后端),只是现在昨天你杀死正在执行的job的时候最后保存的状态结果和你实际的结果不一致


| |
刘海
|
|
liuha...@163.com
|
签名由网易邮箱大师定制
在2021年1月5日 09:04,徐州州<25977...@qq.com> 写道:
我一个flink-sql任务,每次隔天计算都会在昨天的计算结果上累加,我使用代码jar的方式提交,代码中设置了MemoryStateBackend,我定时任务是23:57-kill掉正在执行的job,隔天的00:30通过azkaban的启动脚本,重新提交任务执行,可是我发现00:30当天计算的结果,是在昨天的基础上累加的,我查看kill掉任务的那一段时间NodeManager的内存也得到了释放,可是为什么我在00:30的时间点启动,还是会在昨天的基础上累计,而且计算的结果(并没有完全在昨天的基础上累计),比如昨天计算结果1000,今天它可能在900的结果上进行累加。请问这种情况是为什么。试了好多,都没有解决。|insert
 into app_bs_drainage_place
|SELECT
| do.GrouporgName,
| du.Name,
| COUNT(DISTINCT dooi.Code) AS TotalSingular,
|md5(concat(do.GrouporgName,du.Name,cast(current_date as String))) as uuids,
|current_date as As_Of_Date
|FROM dw_od_order_info dooi
|  INNER JOIN dw_worktask_info dwi ON dwi.CustomerId = dooi.CustomerId AND 
dwi.HandlerPersonId = dooi.UserId and dwi.As_Of_Date=current_date
|  INNER JOIN dim_cc_media_placement_label_relation dmplr ON 
dmplr.MediaPlacementId = dwi.PlacementId
|  INNER JOIN dim_cc_media_label dcml ON dmplr.LabelId = dcml.Id AND dcml.Name 
IN ('金装驼奶', '血糖仪')
|  INNER JOIN dim_user du ON dooi.UserId = du.Id
| INNER JOIN dim_org do ON dooi.UserOrgId = do.Grouporgid AND 
left(do.GrouporgName, 2) = '引流'
| WHERE dooi.As_Of_Date=current_date and dooi.Status < 60 AND dooi.Status 
< 120 AND dooi.OrgType = 1
| GROUP BY do.GrouporgName,du.Name




--原始邮件--
发件人: "赵一旦"

Re: flink如何使用oss作为checkpoint/savepoint/statebackend?

2020-12-30 文章 Yun Tang
Hi

其实社区文档 [1] 已经给了很详细的步骤:

  1.  将flink-oss-fs-hadoop jar包放在plugins目录下
  2.  配置oss的endpoint,id和secret
  3.  在需要使用oss的地方,声明oss:// 开头的schema,例如state backend创建的时候

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/filesystems/oss.html

祝好
唐云

From: 陈帅 
Sent: Wednesday, December 30, 2020 20:53
To: user-zh@flink.apache.org 
Subject: flink如何使用oss作为checkpoint/savepoint/statebackend?

请问flink如何使用oss作为checkpoint/savepoint/statebackend? 需要依赖Hadoop并配置Hadoop on OSS吗?


Re: Flink RocksDBStateBackend 可以设置阿里云 OSS 存储吗?

2020-12-28 文章 Yun Tang
Hi 王磊,

当然是可以的,state backend的checkpoint地址其实依赖于Flink的file 
system实现,只要参照文档[1]的描述,对oss进行相关配置即可。

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/deployment/filesystems/oss.html#shaded-hadoop-oss-file-system


祝好
唐云

From: Lei Wang 
Sent: Monday, December 28, 2020 16:21
To: user-zh@flink.apache.org 
Subject: Flink RocksDBStateBackend 可以设置阿里云 OSS 存储吗?

如题, 可以直接这样写吗?

env.setStateBackend(new RocksDBStateBackend(“oss://”, true));

谢谢,
王磊


Re: checkpoint持久化问题

2020-12-27 文章 Yun Tang
Hi

既然UI上已经显示成功了,一定是成功且成功保存到HDFS上了,可以看下父目录的情况,chk-x 目录可能随着新的checkpoint完成而被删除

祝好
唐云

From: chen310 <1...@163.com>
Sent: Friday, December 25, 2020 16:01
To: user-zh@flink.apache.org 
Subject: checkpoint持久化问题

问题:
flink sql中设置了job挂掉后checkpoint保留

execution.checkpointing.externalized-checkpoint-retention
RETAIN_ON_CANCELLATION

并且配置了checkpoint保存到hdfs上

state.backend rocksdb

#增量checkpoint
#state.backend.incremental true
state.checkpoints.dir hdfs:///tmp/flink/checkpoint

flink实际也做了checkpoint,但是用这个路径去hdfs上查询,并不存在对应的路径的目录,好像并不是每次做checkpoint都会持久化到hdfs上,这个是要做啥配置么?让每次checkpoint都保存到磁盘





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

2020-12-23 文章 Yun Tang
Hi @Storm

checkpoint的增量模式目前仅对RocksDB生效,这里的增量是指上传新产生的DB 
sst文件。而RocksDB的全量模式是将DB的有效kv进行序列化写出,除非有大量的数据没有compaction清理掉,否则不可能出现增量checkpoint 
size无限膨胀,而全量checkpoint正常的问题,你这里的无限膨胀的size范围是多大呢?

祝好
唐云

From: Storm☀️ 
Sent: Tuesday, December 22, 2020 19:52
To: user-zh@flink.apache.org 
Subject: Re: flink1.10.1/1.11.1 使用sql 进行group 和 时间窗口 操作后 状态越来越大

唐云大佬好,
我关闭了chk的增量模式之后,chkstate确实不会再无线膨胀了。这个是我配置的不准确,还是一个已知问题呢



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.9.1 如何配置RocksDB的block-cache-usage参数

2020-12-17 文章 Yun Tang
Hi

这些metrics启用的配置是放到flink conf里面的,不是让你直接在代码里面调用的。

祝好
唐云

From: bradyMk 
Sent: Thursday, December 17, 2020 20:56
To: user-zh@flink.apache.org 
Subject: Re: flink1.9.1 如何配置RocksDB的block-cache-usage参数

谢谢 Yun Tang 大佬的解答~

另外,还想请教一下:我在代码中设置开启了cur-size-all-mem-tables的监控,代码如下:
//设置RocksDB状态后端,且开启增量ck
val backend = new RocksDBStateBackend(path, true)

//监控配置项
val metricOptions = new RocksDBNativeMetricOptions
metricOptions.enableSizeAllMemTables()

//设置预选项
backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED)

//开启RocksDB
env.setStateBackend(backend.asInstanceOf[StateBackend])

但是发现这个监控指标并没有成功发送,请问是我在代码里开启的方式不对么?



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink1.9.1 如何配置RocksDB的block-cache-usage参数

2020-12-16 文章 Yun Tang
Hi

write buffer的指标可以看 cur-size-all-mem-tables,由于1.9没有block 
cache的指标,如果不自行将代码[1]pick回去的话,暂时没办法通过内置的方式监控了。


[1] https://issues.apache.org/jira/browse/FLINK-15387

祝好
唐云

From: bradyMk 
Sent: Wednesday, December 16, 2020 12:03
To: user-zh@flink.apache.org 
Subject: flink1.9.1 如何配置RocksDB的block-cache-usage参数

Hi~想请教一下大家:

最近使用flink1.9版本用RocksDB做增量ck,我想配置如下两个内容的指标来监控任务的内存情况:
  ①block-cache-usage
  ②write buffer

但是在官网[1]并没有找到相关指标,通过查阅资料得知:
  write buffer对应的指标为:state.backend.rocksdb.metrics.cur-size-all-mem-tables
  而block-cache-usage的指标是1.10版本之后才有的,1.9版本没有这个指标;

问:
①write buffer是否对应这个指标 ->
state.backend.rocksdb.metrics.cur-size-all-mem-tables
②如果1.9没有监控block-cache-usage的直接指标,那么该如何监控block-cache-usage呢?

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/config.html#rocksdb-native-metrics







-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 关于flink cdc的N流join状态后端的选择问题: ‎FsStateBackend和‎RocksDBStateBackend

2020-12-10 文章 Yun Tang
Hi

在前面的邮件里面,已经提示可以使用 async-profiler [1] 
来观察RocksDB的内部相关调用栈,这样能看到cpu是否在等待IO。另外iostat等工具可以看磁盘压力如何。至于数据倾斜,可以去看rocksDB的db目录大小,或者看各个subtask的input
 bytes大小,看看是否task间存在数据倾斜。


[1] https://github.com/jvm-profiling-tools/async-profiler

祝好
唐云

From: jindy_liu <286729...@qq.com>
Sent: Friday, December 11, 2020 11:24
To: user-zh@flink.apache.org 
Subject: Re: 关于flink cdc的N流join状态后端的选择问题: ‎FsStateBackend和‎RocksDBStateBackend

感谢指正!这里我验证了下你的说法,确实我理解有误了,我以为这个参数write buffer count以及max write
buffer是taskmanager所有的slots的。从web
ui来看,确实是很多算子都出现了is_stop_write。你的推断是正确的,老的配置参数下,看了下,确实经常出现is_stop_write=1的情况,然后线程就阻塞sleep了。

昨天调整了一版参数:改了下Slot为2,还是6台机器,job并发度设置成12。结果是之前的阻写没有了。跑一晚上10个小时左右,能跑21000W每张表的速度了,并且现在看也没有阻写的情况,硬盘的读写iops与util都很低,基本没有。但这个距离上线还是有差据,也就是6台机器只能处理5000/s的数据性能,有点低。

taskmanager.numberOfTaskSlots: 2
taskmanager.memory.process.size: 20480m
taskmanager.memory.managed.fraction: 0.75
taskmanager.memory.network.fraction: 0.2
taskmanager.memory.network.max: 4gb  #算子大概需要3G左右的network buf
taskmanager.memory.network.min: 128mb

#7G per slot writer + read
state.backend.rocksdb.thread.num: 4
state.backend.rocksdb.writebuffer.count: 20
state.backend.rocksdb.writebuffer.size: 256M   #5G
state.backend.rocksdb.writebuffer.number-to-merge: 1
state.backend.rocksdb.block.cache-size: 2000M  #2112M
state.backend.rocksdb.block.blocksize: 64K
state.backend.rocksdb.localdir: /opt/flink/rocksdb
state.backend.rocksdb.files.open: 9

查看些cpu很忙的机器,jstack发现性能开销都在读上了(跑21000W后),花在rocksdb.get读上较多,怎么看是读的内存还是磁盘来的?我看cpu比较忙的机器上,磁盘io读,基本没有了。看rocksdb本地dir所挂的ssd磁盘上的状态文件大小3台在7GB左右,别外3台在3GB左右(这里没法在web
ui上看checkpointed datasize大小,目前由于没有成功过 ,mysql-cdc-connector会一直超时失败)

@yuntang 这里看看rocksdb上还有提升空间和任务总体性能上还能有提升?
(
但出现1,2机器的cpu负载明显比其它低的情况,这个感觉可能还有另外一个问题,存在些倾斜???!!!
因为有些AGG算子,我开了代码调整了些,开了minibatch
configuration.setString("table.exec.mini-batch.enabled", "true");
configuration.setString("table.exec.mini-batch.allow-latency", "10 s");
configuration.setString("table.exec.mini-batch.size", "1");
)







--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink-1.11.2 rocksdb when trigger savepoint job fail and restart

2020-12-10 文章 Yun Tang
Hi

请问你的TM是单slot吧,managed memory是多少? RocksDB state 
backend在执行savepoint的时候,会使用一个iterator来遍历数据,所以会存在额外的内存开销(并且该部分开销并不属于write 
buffer与block 
cache管理的部分),当然RocksDB的iterator是一个多层的最小堆iterator,理论上来说占用的临时内存并不会太多。不知你们能否将程序抽象成一个必现的demo来给我们做debug呢?

至于如何解决该问题,可以考虑增大JVM overhead [1] 来增大该部分的buffer空间。

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12//deployment/memory/mem_setup.html

祝好
唐云

From: smailxie 
Sent: Thursday, December 10, 2020 17:42
To: user-zh@flink.apache.org 
Subject: flink-1.11.2 rocksdb when trigger savepoint job fail and restart







我有一个sql job,跑的任务是双流jion,状态保留12 �C 
24小时,checkpoint是正常的,状态大小在300M到4G之间,当手动触发savepoint时,容器会被杀死,原因是超出内存限制(申请的内存是单slot 
5G)。

我想问的是,rocksdb,在savepiont时,是把所有的磁盘状态读入内存,然后再全量快照?

如果是这样,后续版本有没有优化?不然每次磁盘状态超过托管内存,一手动savepoint,job就会被杀死。

下面是报错信息。



2020-12-10 09:18:50

java.lang.Exception: Container 
[pid=33290,containerID=container_e47_1594105654926_6890682_01_02] is 
running beyond physical memory limits. Current usage: 5.1 GB of 5 GB physical 
memory used; 7.4 GB of 10.5 GB virtual memory used. Killing container.

Dump of the process-tree for container_e47_1594105654926_6890682_01_02 :

  |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) 
SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE

  |- 4 33290 33290 33290 (java) 787940 76501 7842340864 1337121 
/usr/java/default/bin/java -Xmx1234802980 -Xms1234802980 
-XX:MaxDirectMemorySize=590558009 -XX:MaxMetaspaceSize=268435456 
-Dlog.file=/data6/yarn/container-logs/application_1594105654926_6890682/container_e47_1594105654926_6890682_01_02/taskmanager.log
 -Dlog4j.configuration=file:./log4j.properties 
-Dlog4j.configurationFile=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=456340281b -D 
taskmanager.memory.network.min=456340281b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=2738041755b -D taskmanager.cpu.cores=5.0 -D 
taskmanager.memory.task.heap.size=1100585252b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . 
-Djobmanager.rpc.address=shd177.yonghui.cn -Dpipeline.classpaths= -Dweb.port=0 
-Dexecution.target=embedded 
-Dweb.tmpdir=/tmp/flink-web-c415ad8e-c019-4398-869d-7c9e540c2479 
-Djobmanager.rpc.port=44058 
-Dpipeline.jars=file:/data1/yarn/nm/usercache/xiebo/appcache/application_1594105654926_6890682/container_e47_1594105654926_6890682_01_01/yh-datacenter-platform-flink-1.0.0.jar
 -Drest.address=shd177.yonghui.cn 
-Dsecurity.kerberos.login.keytab=/data1/yarn/nm/usercache/xiebo/appcache/application_1594105654926_6890682/container_e47_1594105654926_6890682_01_01/krb5.keytab

  |- 33290 33288 33290 33290 (bash) 0 0 108679168 318 /bin/bash -c 
/usr/java/default/bin/java -Xmx1234802980 -Xms1234802980 
-XX:MaxDirectMemorySize=590558009 -XX:MaxMetaspaceSize=268435456 
-Dlog.file=/data6/yarn/container-logs/application_1594105654926_6890682/container_e47_1594105654926_6890682_01_02/taskmanager.log
 -Dlog4j.configuration=file:./log4j.properties 
-Dlog4j.configurationFile=file:./log4j.properties 
org.apache.flink.yarn.YarnTaskExecutorRunner -D 
taskmanager.memory.framework.off-heap.size=134217728b -D 
taskmanager.memory.network.max=456340281b -D 
taskmanager.memory.network.min=456340281b -D 
taskmanager.memory.framework.heap.size=134217728b -D 
taskmanager.memory.managed.size=2738041755b -D taskmanager.cpu.cores=5.0 -D 
taskmanager.memory.task.heap.size=1100585252b -D 
taskmanager.memory.task.off-heap.size=0b --configDir . 
-Djobmanager.rpc.address='shd177.yonghui.cn' -Dpipeline.classpaths='' 
-Dweb.port='0' -Dexecution.target='embedded' 
-Dweb.tmpdir='/tmp/flink-web-c415ad8e-c019-4398-869d-7c9e540c2479' 
-Djobmanager.rpc.port='44058' 
-Dpipeline.jars='file:/data1/yarn/nm/usercache/xiebo/appcache/application_1594105654926_6890682/container_e47_1594105654926_6890682_01_01/yh-datacenter-platform-flink-1.0.0.jar'
 -Drest.address='shd177.yonghui.cn' 
-Dsecurity.kerberos.login.keytab='/data1/yarn/nm/usercache/xiebo/appcache/application_1594105654926_6890682/container_e47_1594105654926_6890682_01_01/krb5.keytab'
 1> 
/data6/yarn/container-logs/application_1594105654926_6890682/container_e47_1594105654926_6890682_01_02/taskmanager.out
 2> 
/data6/yarn/container-logs/application_1594105654926_6890682/container_e47_1594105654926_6890682_01_02/taskmanager.err



Container killed on request. Exit code is 143

Container exited with a non-zero exit code 143



发送自 Windows 10 版邮件应用









--

Name:谢波
Mobile:13764228893


Re: 关于flink cdc的N流join状态后端的选择问题: ‎FsStateBackend和‎RocksDBStateBackend

2020-12-10 文章 Yun Tang
Hi

首先需要纠正一点的是,RocksDB的实际可用内存并不是你以为的13GB,因为从Flink-1.10 开始引入的 managed memory 
[1][2],会将slot上的RocksDB的实际可用内存限制在 managed memory / number of 
slots,也就是说对于你配置的10个slot,20GB的process memory,0.75的managed fraction,真实的per slot 
managed memory其实只有不到1.5GB,也就是说你配置的write buffer count以及max write 
buffer啥的并没有真正“生效”。RocksDB的write buffer manager会提前将write buffer 
置为immutable并flush出去。应该增大 managed memory / number of slots 
来增大单个slot内多个RocksDB的共享可用内存,来确保RocksDB的可用实际内存真的有效。
从你的栈看,很多时候卡在了数据put上,我怀疑是遇到了写阻塞 (write stall) [3],可以用async-profiler [4] 
来观察RocksDB的内部相关调用栈。
另外,可以开启rocksDB的native metrics [5][6],观察RocksDB的写是不是经常被阻塞


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/memory/mem_tuning.html#rocksdb-state-backend
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/ops/state/large_state_tuning.html#tuning-rocksdb-memory
[3] https://github.com/facebook/rocksdb/wiki/Write-Stalls
[4] https://github.com/jvm-profiling-tools/async-profiler
[5] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#state-backend-rocksdb-metrics-actual-delayed-write-rate
[6] 
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html#state-backend-rocksdb-metrics-is-write-stopped

祝好
唐云

From: jindy_liu <286729...@qq.com>
Sent: Thursday, December 10, 2020 16:22
To: user-zh@flink.apache.org 
Subject: Re: 关于flink cdc的N流join状态后端的选择问题: ‎FsStateBackend和‎RocksDBStateBackend

补充一个,当我把state.backend.rocksdb.writebuffer.count: 48调小到10的话,

jstack来看,从https://spotify.github.io/threaddump-analyzer/分析来看

top类的方法基本都在rocksdb的io上了。并且很多线程都在等待








--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 回复:flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

2020-12-09 文章 Yun Tang
Hi

Operator state 
本身也并不是线程安全的,只是往常的读写都是持有checkpoint锁的task主线程或者checkpoint异步线程,所以才能做到数据安全,SourceFunction文档里面也强调需要在获得checkpointLock的前提下更新state。

至于如何开启Flink中的RocksDB的native metrics,之前给你的文档链接里面有描述,相关的配置项设为true即可。

祝好
唐云

From: bradyMk 
Sent: Thursday, December 10, 2020 11:44
To: user-zh@flink.apache.org 
Subject: Re: 回复:flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

谢谢大佬解答~最近一直在看相关的知识,我还有两个问题在网上没有找到解答,想咨询一下:

1、如果我不用keyed State,而改用Operator State,Operator
State是所有线程操作一个state么?如果这样,那Operator State是线程安全的么?

2、您之前说的配置 RocksDB 的native
metrics,我在官网看到这些指标都是禁用的,那该如何开启呢?我在代码里貌似没有找到相关方法开启各类RocksDB 的native metrics;




-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 关于flink cdc的N流join状态后端的选择问题: ‎FsStateBackend和‎RocksDBStateBackend

2020-12-09 文章 Yun Tang
Hi

FsStateBackend 在性能上是比 RocksDBStateBackend 
好,这个是符合预期的。不过想要获得高性能的话,需要更多的jvm堆上内存,但是大内存场景下的GC会很痛苦,所以并不是说加内存之后,性能可以线性增长。

现在还有一个问题是你的状态有多大呢,可以去有状态的节点上看DB的大小(通过增量checkpoint的checkpointed data 
size也可以间接推出),看CPU使用情况,看磁盘的iostat,来找到具体的瓶颈在哪里。

祝好
唐云


From: Jark Wu 
Sent: Thursday, December 10, 2020 11:04
To: user-zh 
Cc: Yun Tang 
Subject: Re: 关于flink cdc的N流join状态后端的选择问题: ‎FsStateBackend和‎RocksDBStateBackend

关于 rocksdb 的性能调优, @Yun Tang<mailto:myas...@live.com> 会更清楚。

On Thu, 10 Dec 2020 at 11:04, Jark Wu 
mailto:imj...@gmail.com>> wrote:
建议大状态还是用 rocksdb,生产上会稳定很多。你说的这个量级感觉相差比较大,可能还没有对 rocksdb 调优导致的。

你可以参考下这几篇文章尝试调优下 rocksdb:

https://mp.weixin.qq.com/s/YpDi3BV8Me3Ay4hzc0nPQA
https://mp.weixin.qq.com/s/mjWGWJVQ_zSVOgZqtjjoLw
https://mp.weixin.qq.com/s/ylqK9_SuPKBKoaKmcdMYPA
https://mp.weixin.qq.com/s/r0iPPGWceWkT1OeBJjvJGg


Best,
Jark

On Wed, 9 Dec 2020 at 12:19, jindy_liu 
<286729...@qq.com<mailto:286729...@qq.com>> wrote:
场景上:

目前都是mysql里的带主键的表(亿级别)的join操作后,得到的实时宽表(视图)上做一些规则筛选或计数等,并且场景上状态(join算子)都基上上不设置TTL。

目前mysql中的都是些有主键数据,且量级不会有太大的变化,并且可以预见,可能一年也就增加个200w左右,但表里的数据变更较频繁。所以处理上需要吞吐量较大,延时低。
目前测试了一版本flink
sql算子使用Rocksdb做后端发现吞吐与延时都比较大,一条数据变化,可能要10秒中才能生效,但换FsStateBackend时,速度就很快了,性能较好;两者相差10倍多。

 所以产生以下想法,不知道可不可行?

1、用FsStateBackend:想堆机器,把任务里的用到的状态都用FsStateBackend放内存来搞(基于表的主键数有限,状态应该也是有限的)。
2、还用rocksdb:尽量把flink的manager内存(rocksdb用的)搞得较大,把读写rocksdb都尽量在内存中进行。
目前对flink sql生成的算子的状态情况不太熟悉,想问题下这两种方法是否可行呢?





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 回复:flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

2020-12-09 文章 Yun Tang
State本身不是线程安全的 
[1],但是目前对于state的更新都是在task主线程内,而task主线程是线程安全的。除非通过一些特别的方式,例如异步的metrics线程用户逻辑下访问state导致的state写更新副作用,一般是不会出现写错的问题。

[1] https://issues.apache.org/jira/browse/FLINK-13072

祝好
唐云

From: bradyMk 
Sent: Tuesday, December 8, 2020 17:59
To: user-zh@flink.apache.org 
Subject: Re: 回复:flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

好的,谢谢大佬解答~



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 答复: flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

2020-12-07 文章 Yun Tang
配置一下 RocksDB 的native metrics,看下block cache以及 write buffer的实际使用内存。
另外,Flink中一个state会使用一个RocksDB的column family,而write buffer和block cache是一套column 
family 一套,所以你的operator 内的state数目多,slot内的keyed operator多,都会导致内存成倍增长


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#rocksdb-native-metrics

祝好
唐云

From: bradyMk 
Sent: Monday, December 7, 2020 17:05
To: user-zh@flink.apache.org 
Subject: Re: 答复: flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

Hi~

可是我这边write buffer以及block cache等参数设置的都不大,都远远小于我分给tm的内存,可为什么还会报超出内存的错误呢?



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 答复: flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

2020-12-07 文章 Yun Tang
RocksDB只是将数据可以存储在磁盘上,Flink再周期性将磁盘上数据上传到HDFS,内存中还是有LSM的write buffer以及block 
cache,也还是需要使用内存的

建议升级Flink版本到1.10+,引入了managed memory功能,理论上对于内存控制是要好很多的。


祝好
唐云

From: bradyMk 
Sent: Monday, December 7, 2020 11:27
To: user-zh@flink.apache.org 
Subject: Re: 答复: flink使用RocksDB增量checkpoints,程序运行一段时间报出:超出物理内存

hi~谢谢解答;

但我的状态用的是RocksDB,实质上不应该是存的磁盘么?为什么会一直占用tm的内存呢?



-
Best Wishes
--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: Flink任务启动偶尔报错PartitionNotFoundException,会自动恢复。

2020-11-23 文章 Yun Tang
Hi

集群负载比较大的时候,下游一直收不到request的partition,就会导致PartitionNotFoundException,建议增大 
taskmanager.network.request-backoff.max [1][2] 以增大重试次数

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-network-request-backoff-max
[2] https://juejin.cn/post/6844904185347964942#heading-8


祝好
唐云

From: 赵一旦 
Sent: Monday, November 23, 2020 13:08
To: user-zh@flink.apache.org 
Subject: Re: Flink任务启动偶尔报错PartitionNotFoundException,会自动恢复。

这个报错和kafka没有关系的哈,我大概理解是提交任务的瞬间,jobManager/taskManager机器压力较大,存在机器之间心跳超时什么的?
这个partition应该是指flink运行图中的数据partition,我感觉。没有具体细看,每次提交的瞬间可能遇到这个问题,然后会自动重试成功。

zhisheng  于2020年11月18日周三 下午10:51写道:

> 是不是有 kafka 机器挂了?
>
> Best
> zhisheng
>
> hailongwang <18868816...@163.com> 于2020年11月18日周三 下午5:56写道:
>
> > 感觉还有其它 root cause,可以看下还有其它日志不?
> >
> >
> > Best,
> > Hailong
> >
> > At 2020-11-18 15:52:57, "赵一旦"  wrote:
> > >2020-11-18 16:51:37
> > >org.apache.flink.runtime.io
> .network.partition.PartitionNotFoundException:
> > >Partition
> > b225fa9143dfa179d3a3bd223165d5c5#3@3fee4d51f5a43001ef743f3f15e4cfb2
> > >not found.
> > >at org.apache.flink.runtime.io.network.partition.consumer.
> > >RemoteInputChannel.failPartitionRequest(RemoteInputChannel.java:267)
> > >at org.apache.flink.runtime.io.network.partition.consumer.
> >
> >
> >RemoteInputChannel.retriggerSubpartitionRequest(RemoteInputChannel.java:166)
> > >at org.apache.flink.runtime.io.network.partition.consumer.
> > >SingleInputGate.retriggerPartitionRequest(SingleInputGate.java:521)
> > >at org.apache.flink.runtime.io.network.partition.consumer.
> >
> >
> >SingleInputGate.lambda$triggerPartitionStateCheck$1(SingleInputGate.java:765
> > >)
> > >at
> java.util.concurrent.CompletableFuture.uniAccept(CompletableFuture
> > >.java:670)
> > >at java.util.concurrent.CompletableFuture$UniAccept.tryFire(
> > >CompletableFuture.java:646)
> > >at java.util.concurrent.CompletableFuture$Completion.run(
> > >CompletableFuture.java:456)
> > >at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> > >at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(
> > >ForkJoinExecutorConfigurator.scala:44)
> > >at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> > >at
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool
> > >.java:1339)
> > >at
> > akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> > >at
> > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread
> > >.java:107)
> > >
> > >
> > >请问这是什么问题呢?
> >
>


Re: Re:flink内存超用问题

2020-11-09 文章 Yun Tang
Hi

可以通过增大 "taskmanager.memory.jvm-overhead.max" [1] 以及  
"taskmanager.memory.process.size" [2] 来增大可以超用的内存空间。可以通过观察 
"state.backend.rocksdb.metrics.block-cache-pinned-usage" [3] 
的数值看rocksDB使用的native memory是否超过managed memory。


[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-memory-jvm-overhead-max
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#taskmanager-memory-process-size
[3]https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#state-backend-rocksdb-metrics-block-cache-pinned-usage
祝好
唐云


From: hailongwang <18868816...@163.com>
Sent: Sunday, November 8, 2020 20:03
To: user-zh@flink.apache.org 
Subject: Re:flink内存超用问题

Hi Bob,
 可以设置下参数 'state.backend.rocksdb.memory.fixed-per-slot' [1] 看下有没有效果。
[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#state-backend-rocksdb-memory-fixed-per-slot


Best,
Hailong Wang




在 2020-11-08 10:50:29,"元始(Bob Hu)" <657390...@qq.com> 写道:
>请教下,我有个flink任务经常因为内存超用被yarn 集群kill,不知道该怎么排查问题,flink版本1.11.0,启动命令为:
>bin/flink run -m yarn-cluster -yjm 2048m -ytm 8192m -ys 2 
>xxx.jar,使用rocksdb状态后端,设置的参数有taskmanager.memory.managed.fraction=0.6;taskmanager.memory.jvm-overhead.fraction=0.2。下面是某个时刻flink页面的taskmanage统计。请问内存超用可能是来自什么地方呢,感觉程序中并没用用到第三方jar使用大量native,自己程序里也没有用native内存的地方
>
>
>Free Slots / All Slots:0 / 2
>CPU Cores:24
>Physical Memory:251 GB
>JVM Heap Size:1.82 GB
>Flink Managed Memory:4.05 GB
>
>Memory
>
>
>JVM (Heap/Non-Heap)
>
>
>Type
>Committed
>Used
>Maximum
>
>Heap1.81 GB1.13 GB1.81 GB
>Non-Heap169 MB160 MB1.48 GB
>Total1.98 GB1.29 GB3.30 GB
>
>
>
>
>
>Outside JVM
>
>
>Type
>Count
>Used
>Capacity
>
>Direct24,493718 MB718 MB
>Mapped00 B0 B
>
>
>
>
>
>
>Network
>
>
>Memory Segments
>
>
>Type
>Count
>
>Available21,715
>Total22,118
>
>
>
>
>
>Garbage Collection
>
>
>Collector
>Count
>Time
>
>PS_Scavenge19917,433
>PS_MarkSweep44,173


Re: Checkpoint size的问题

2020-10-29 文章 Yun Tang
Hi

web UI显示的是增量上传数据量,包括各个task上传的数据,而_metadata 
只是一个元数据,是由JM上传的,所以不能将_metadata与checkpoint UI显示的数据量划等号。

祝好
唐云

From: gsralex 
Sent: Wednesday, October 28, 2020 19:17
To: user-zh@flink.apache.org 
Subject: Checkpoint size的问题

Hi, All
Checkpoint 一般Web UI显示的是400MB左右,但是查看HDFS实际的大小,不到1MB(_metadata) 
,想问下这之间size的偏差为什么这么大?


Re: 回复:请问现在Flink支持动态扩缩容吗?

2020-10-20 文章 Yun Tang
Hi

Flink-1.8 之前支持通过rest命令进行扩缩容 
[1],不过后来在重构时该功能被disable了[2]。当然这个功能距离动态扩缩容还是有差距的,可以理解成是从外部进行扩缩容的基础。
目前在阿里巴巴的企业版中,有名为libra的动态扩缩容插件 [3] 提供相关功能。


[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/rest_api.html#jobs-jobid-rescaling
[2] https://issues.apache.org/jira/browse/FLINK-12312
[3] https://developer.aliyun.com/article/727376

祝好
唐云


From: 熊云昆 
Sent: Tuesday, October 20, 2020 7:15
To: 林影 
Cc: user-zh@flink.apache.org 
Subject: 回复:请问现在Flink支持动态扩缩容吗?

目前还不支持吧


| |
熊云昆
|
|
邮箱:xiongyun...@163.com
|

签名由 网易邮箱大师 定制

在2020年10月19日 18:22,林影 写道:
请问现在Flink支持动态扩缩容吗,或者说社区在这方面有什么计划吗?


Re: flink-windows-state

2020-10-13 文章 Yun Tang
Hi

这里涉及到的问题比较多。

  1.  为什么心跳会超时?是因为Full 
GC么,如果是使用的FsStateBackend/MemoryStateBackend,这是比较好解释的,因为数据在JVM堆上。如果使用的是RocksDB,这里是解释不通的。
  2.  window确实是使用state来存储数据,如果认为自己的state太大的话,是不是因为使用不当呢?可以参考文档 [1] 进行调优
  3.  仍在运行的TM里面在做什么呢,为什么没有被JM释放,需要检查相关孤儿TM的日志以及jstack查看进程操作判断。

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#useful-state-size-considerations

祝好
唐云


From: 孟小鹏 <602012...@qq.com>
Sent: Tuesday, October 13, 2020 18:51
To: user-zh 
Subject: Re: flink-windows-state

贴代码看看?



发自我的iPhone


-- Original --
From: 熊云昆 

Re: 回复:rocksdb增量ckeckpoint问题

2020-10-10 文章 Yun Tang
Hi,云昆

首先,如果Congxian回答的,sst文件是不变的,所以就有了“增量”的前提,如果多个checkpoint均包含某个sst文件,那么该文件可以在多个checkpoint间共享,这些文件表示是属于shared的文件,存储在shared目录下,可以参考文档[1]。

这并不与保留一个checkpoint不一致,增量checkpoint的含义是上传的数据是增量的,但是每一个checkpoint均是完整的,没有上传的重复数据依赖于之前checkpoint上传过,所以这些文件才会存储在“shared”目录下,意指可以被多个checkpoint共享。而state.checkpoints.num-retained
 的含义是保留的完整checkpoint个数,当一个旧的checkpoint不再需要时,我们会把不共享的文件清理掉。

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#directory-structure

祝好
唐云


From: 熊云昆 
Sent: Saturday, October 10, 2020 15:52
To: Congxian Qiu 
Cc: user-zh ; 宁吉浩 
Subject: 回复:rocksdb增量ckeckpoint问题

原来的sst文件存在原来的chk-***下面,意味着原来的chk-***不会被删除,这与stat.checkpoints.num-retained保留1个checkpoint不一致吧?




| |
熊云昆
|
|
邮箱:xiongyun...@163.com
|

签名由 网易邮箱大师 定制

在2020年10月09日 10:47,Congxian Qiu 写道:
Hi
  增量 checkpoint 是指,每次只上传的 *必须的* sst 文件。因为 RocksDB 生成的 sst 文件是不可变的,所以之前上传过的
sst 文件直接引用即可,这样减少了很多 sst 文件的上传(也减少了 HDFS 的存储和删除等操作)

Best,
Congxian


宁吉浩  于2020年10月9日周五 上午10:20写道:

> 没看过源码,看过一些文档,结论还需验证(应该不用了)。
> 增量checkpoint指的是
> 把内存中的state写入hdfs的时不全量写入,而是写入和上次checkpoint不一致的地方,hdfs底层文件的话会有依赖关系。也就是说本次的依赖上一次,上一次的依赖上上一次。
>
> 底部还有一个逻辑是定期合并checkpoint,这个是操作hdfs文件的,checkpoint保留个数可以配置,实际上hdfs上也不会存储太多checkpoint,就是合并这些state。
> 如下是官网连接:
> https://flink.apache.org/features/2018/01/30/incremental-checkpointing.html
>
> 大胆猜测:
> 先写入增量state,然后等待时机和之前的state合并,由于只保留一个checkpoint,所以每次都是触发合并逻辑。
> checkpoint-state是增量
> 但每次都要把之前的state进行合并
>
>
> --
> 发件人:熊云昆 
> 发送时间:2020年10月6日(星期二) 16:53
> 收件人:user-zh@flink.apache.org 
> 主 题:rocksdb增量ckeckpoint问题
>
> Hi,
>
> 有个rocksdb增量checkpoint的问题不明白,如果state.checkpoints.num-retained默认设置为1,意味着checkpoint默认只保留1个,那么在增量checkpoint的时候,它是无法引用上一个checkpoint的备份的sst文件的,其实还是相当于全量备份了,对不对?
>
>
> | |
> 熊云昆
> |
> |
> 邮箱:xiongyun...@163.com
> |
>
> 签名由 网易邮箱大师 定制


Re: flink1.11.2 在k8s上部署,如何启动history server

2020-10-10 文章 Yun Tang
Hi

可以在yaml文件中覆盖原始的 ENTRYPOINT 启动命令 [1]
或者可以参考 FLINK-17167 [2] 中的修改更改原始Dockerfile中的docker-entrypoint.sh


[1] 
https://kubernetes.io/zh/docs/tasks/inject-data-application/define-command-argument-container/
[2] https://issues.apache.org/jira/browse/FLINK-17167

祝好
唐云

From: chenxuying 
Sent: Saturday, October 10, 2020 15:56
To: user-zh@flink.apache.org 
Subject: flink1.11.2 在k8s上部署,如何启动history server

flink1.11.2 在k8s上部署,如何启动history server
之前1.10的yaml里面可以加命令,但是1.11的yaml是通过docker-entrypoint.sh
好像没发现这个入口脚本没有对应的history server参数



Re: flink savepoint和checkpoint相关事项

2020-10-09 文章 Yun Tang
Hi

在修改作业的情况下,从checkpoint恢复的主要措施是确保相关operator设置了uid[1],恢复state的时候添加了允许non-restored 
state [2] 恢复

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#allowing-non-restored-state

祝好
唐云

From: zjfpla...@hotmail.com 
Sent: Friday, October 9, 2020 8:59
To: user-zh 
Subject: flink savepoint和checkpoint相关事项

Hi,
flink的savepoint,checkpoint在任务如何修改的情况下还能沿用,如何修改下就只能重新消费了



zjfpla...@hotmail.com


Re: checkpoint问题

2020-09-16 文章 Yun Tang
Hi

checkpoint使用operator id进行一一映射进行恢复,请参照 
设置id[1],以及如果checkpoint中存在某个operator但是修改后的作业并不存在该operator时的处理逻辑[2]

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#assigning-operator-ids
[2] 
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/state/savepoints.html#allowing-non-restored-state

祝好
唐云

From: smq <374060...@qq.com>
Sent: Thursday, September 17, 2020 7:02
To: user-zh 
Subject: checkpoint问题

如果我的程序逻辑修改,还能用之前的checkpoint吗


Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

2020-09-06 文章 Yun Tang
Hi

首先,transient ListState 这种用法绝对是没有问题的,你可以在Flink源码中找到大量的使用例子 [1]

可以排查的思路

  1.  你的state是否开启了TTL呢
  2.  能否在写入的时候,进行打印,然后在get会空iterator时进行对比,确认之前是写入成功的。
  3.  目前使用的state backend是什么,更换一种state backend,问题还能复现么

[1] 
https://github.com/apache/flink/blob/25b54a5e261d8dbc8133d0fdc9cae2e653af1ea7/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/keyed/KeyedJob.java#L158

祝好
唐云

From: Liu Rising 
Sent: Sunday, September 6, 2020 17:45
To: user-zh@flink.apache.org 
Subject: Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

Hi 唐云

以下是state定义以及初始化的code

public class FlinkKeyedProcessFunction extends KeyedProcessFunction, Tuple2> {

private static final Logger LOG =
LoggerFactory.getLogger(FlinkKeyedProcessFunction.class);

  ...

private final ParameterTool params;
private transient ListState unmatchedProbesState;

...

FlinkKeyedProcessFunction(ParameterTool params) {
this.params = params;
}

@Override
public void open(Configuration parameters) {

ListStateDescriptor descriptor = new
ListStateDescriptor<>(
"unmatchedProbes", TypeInformation.of(ObjectNode.class)
);
unmatchedProbesState = getRuntimeContext().getListState(descriptor);

以下是往state里add内容的部分
   ...

List unmatchedProbes =
mapMatching.getUnMatchedProbes(id);
unmatchedProbesState.clear();

if (unmatchedProbes.size() > 0) {
try {
unmatchedProbesState.addAll(unmatchedProbes);
} catch (Exception e) {
LOG.warn("Continue processing although failed to add
unmatchedProbes to ListState. ID: " + id, e);
}
}

   ...

以下是从state读取的code

for (ObjectNode unmatchedProbe :
unmatchedProbesState.get()) {
LOG.info("Processing unmatched probe: " +
unmatchedProbe);
matchedValues.addAll(mapMatching.matchLocation(id,
unmatchedProbe));
}


之前的问题是,从state读取时对于一部分key,unmatchedProbesState.get()返回空值。(实际有值)
去掉定义state那里的transient之后,上述问题不再出现。

谢谢。
Rising




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink on k8s 如果jobmanager 所在pod重启后job失败如何处理

2020-09-03 文章 Yun Tang
Hi

Please use English to ask questions in user mailing list. I have added this 
thread to user-zh mailing list, if you would like to reply this thread again, 
please remove user mailing list in senders.

When talking about the question how to handle job manager failure in k8s, you 
could consider jobmanager high availability[1] and you could refer to [2] for 
plans of HighAvailabilityService based on native k8s APIs.

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/jobmanager_high_availability.html
[2] https://issues.apache.org/jira/browse/FLINK-12884

Best
Yun Tang





From: dty...@163.com 
Sent: Friday, September 4, 2020 11:04
To: user 
Subject: flink on k8s 如果jobmanager 所在pod重启后job失败如何处理

请教一个问题。在使用k8s 部署的flink 集群,如果jobmanger 重启后,1)job所在的jar包会清除,jobmanager 
找不到这个job的jar 包,2)正在运行的job也会取消,重启后的jobmanager 如何找到之前运行的job


dty...@163.com


Re: Flink如何实现至多一次(At Most Once)

2020-09-03 文章 Yun Tang
Hi

如果是完全依赖source的offset管理,可以达到类似 at most once 的语义。

社区其实也有更完备的checkpoint at most once 的实现讨论,已经抄送了相关的开发人员 
@yuanmei.w...@gmail.com

祝好
唐云

From: Paul Lam 
Sent: Thursday, September 3, 2020 17:28
To: user-zh 
Subject: Re: Flink如何实现至多一次(At Most Once)

如果每次都从最新的数据开始读的话,关掉 checkpoint 是可以达到 At Most Once。
另外建议还要看看 sink 有没有自动重试机制,可能造成数据重复。

Best,
Paul Lam

> 2020年9月2日 19:16,Tianwang Li  写道:
>
> 我们有一些场景,对实时性要求高,同时对数据重复会有比较大大影响。
> 我想关闭checkpoint,这样是不是能不能保证“至多一次” (At Most Once) ?
> 这里会不会有什么坑?
> 另外:我们允许丢失数据。
>
>
> --
> **
> tivanli
> **



Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

2020-09-03 文章 Yun Tang
Hi

我觉得这个不是root cause,实际上 transient ListState 
是一种正确的用法,因为state应该是在函数open方法里面进行初始化,所以transient 修饰即可。

麻烦把这个list state的初始化以及使用方法的代码都贴出来吧。

祝好
唐云

From: Liu Rising 
Sent: Thursday, September 3, 2020 12:26
To: user-zh@flink.apache.org 
Subject: Re: 从Savepoint/Checkpoint恢复时 keyedstate中部分数据无法被读取

Hi

找到原因了。

问题在于在定义ListState时使用了transient关键字,如下。
 private transient ListState state;

去掉了transient之后,问题解决。
虽然不太清粗为何transient会造成这种情况。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink文档

2020-08-28 文章 Yun Tang
Hi

SQL解析不通过的可以在 https://issues.apache.org/jira/projects/FLINK/issues 
里面创建相关ticket指明出来,很快会有相关开发来帮助的。
不过需要注意的是,需要用英文进行阐述。

祝好
唐云


From: Dream-底限 
Sent: Friday, August 28, 2020 16:42
To: user-zh@flink.apache.org 
Subject: flink文档

hi、
哪位大佬可以把flink官方文档中的函数部分完善一下啊,函数下面配个应用方式可好,看文档我都不知道下面函数是怎么用的,有的可以直接用有的sql解析不通过,还得一个一个测。。。
Temporal functionsDescription

DATE string

Returns a SQL date parsed from *string* in form of "-MM-dd".

TIME string

Returns a SQL time parsed from *string* in form of "HH:mm:ss".

TIMESTAMP string

Returns a SQL timestamp parsed from *string* in form of "-MM-dd
HH:mm:ss[.SSS]".

INTERVAL string range

Parses an interval *string* in the form "dd hh:mm:ss.fff" for SQL intervals
of milliseconds or "-mm" for SQL intervals of months. An interval range
might be DAY, MINUTE, DAY TO HOUR, or DAY TO SECOND for intervals of
milliseconds; YEAR or YEAR TO MONTH for intervals of months.

E.g., INTERVAL '10 00:00:00.004' DAY TO SECOND, INTERVAL '10' DAY, or INTERVAL
'2-10' YEAR TO MONTH return intervals.

CURRENT_DATE

Returns the current SQL date in the UTC time zone.

CURRENT_TIME

Returns the current SQL time in the UTC time zone.

CURRENT_TIMESTAMP

Returns the current SQL timestamp in the UTC time zone.

LOCALTIME

Returns the current SQL time in local time zone.

LOCALTIMESTAMP

Returns the current SQL timestamp in local time zone.


Re: flink prometheus 无法上报accumulator类型监控吗

2020-08-28 文章 Yun Tang
Hi

没有名为 accumulator 的metrics类型数据,目前只有Counters, Gauges, Histograms 和 Meters [1] 
这四种,如果你想要用累积型metrics,可以考虑counters

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/monitoring/metrics.html#metric-types

祝好
唐云


From: 赵一旦 
Sent: Friday, August 28, 2020 10:53
To: user-zh@flink.apache.org 
Subject: Re: flink prometheus 无法上报accumulator类型监控吗

hi,有人回答下这个问题吗。

赵一旦  于2020年8月21日周五 下午4:20写道:

> 如题,没找到accumulator类型数据,metric之类找到了,但是accumulator类没找到。
>


Re: 从savepoint 启动以后,无法在checkpoint页面看到last restore的相关信息

2020-08-27 文章 Yun Tang
Hi 范超

虽然看不到你的图,但是你的启动命令错误了,所有的options应该放在jar包文件地址前面[1]

  1.  class name 应该在 jar包地址前面 [2]
  2.  savepoint/checkpoint 地址应该在jar包地址前面 [3]

没有正确从checkpoint恢复的原因应该是这个原因

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#usage
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/cli.html#job-submission-examples
[3] 
https://ci.apache.org/projects/flink/flink-docs-stable/ops/state/checkpoints.html#resuming-from-a-retained-checkpoint

祝好
唐云


From: zilong xiao 
Sent: Friday, August 28, 2020 11:45
To: user-zh 
Subject: Re: 从savepoint 启动以后,无法在checkpoint页面看到last restore的相关信息

图挂了,用图床工具贴链接吧

范超  于2020年8月28日周五 上午11:37写道:

> Hi, 大家好
>
> Flink版本 1.10.0
>
>
>
> 目前程序的checkpoint使用rocksdb的方式存储在hdfs上,在sink失败的时候能够正常从上一个checkpoint点恢复。
>
> 问题是由于升级程序,我使用了命令行
>
> *bin/flink stop -p ${hdfsSavepointDir} -d $runningJobId -yid $yarnAppId*
>
>
>
> 将savepoint文件保存,然后再使用保存的savepoint来启动程序
>
> */bin/flink run -d -m yarn-cluster -p ${parallelism} -yjm ${jm} -ytm ${tm}
> $fullJarPath -s $savePointFullPath �Cc xxx*
>
>
>
> 比较无法理解的是,jm和tm日志都显示成功启动,但是无法看到从checkpoint恢复的记录如下图所示:
>
>
>
> 有知道的大佬知道是不是我哪里处理不正常么?
>


Re: [ANNOUNCE] New PMC member: Dian Fu

2020-08-27 文章 Yun Tang
Congratulations , Dian!

Best
Yun Tang

From: Yang Wang 
Sent: Friday, August 28, 2020 10:28
To: Arvid Heise 
Cc: Benchao Li ; dev ; user-zh 
; Dian Fu ; user 

Subject: Re: [ANNOUNCE] New PMC member: Dian Fu

Congratulations Dian !


Best,
Yang

Arvid Heise mailto:ar...@ververica.com>> 于2020年8月28日周五 
上午1:39写道:
Congrats Dian :)

On Thu, Aug 27, 2020 at 5:01 PM Benchao Li 
mailto:libenc...@apache.org>> wrote:
Congratulations Dian!

Cranmer, Danny  于2020年8月27日周四 下午10:55写道:
Congratulations Dian! :D

On 27/08/2020, 15:25, "Robert Metzger" 
mailto:rmetz...@apache.org>> wrote:

CAUTION: This email originated from outside of the organization. Do not 
click links or open attachments unless you can confirm the sender and know the 
content is safe.



Congratulations Dian!

On Thu, Aug 27, 2020 at 3:09 PM Congxian Qiu 
mailto:qcx978132...@gmail.com>> wrote:

> Congratulations Dian
> Best,
> Congxian
>
>
> Xintong Song mailto:tonysong...@gmail.com>> 
于2020年8月27日周四 下午7:50写道:
>
> > Congratulations Dian~!
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Thu, Aug 27, 2020 at 7:42 PM Jark Wu 
mailto:imj...@gmail.com>> wrote:
> >
> > > Congratulations Dian!
> > >
> > > Best,
> > > Jark
> > >
> > > On Thu, 27 Aug 2020 at 19:37, Leonard Xu 
mailto:xbjt...@gmail.com>> wrote:
> > >
> > > > Congrats, Dian!  Well deserved.
> > > >
> > > > Best
> > > > Leonard
> > > >
> > > > > 在 2020年8月27日,19:34,Kurt Young 
mailto:ykt...@gmail.com>> 写道:
> > > > >
> > > > > Congratulations Dian!
> > > > >
> > > > > Best,
> > > > > Kurt
> > > > >
> > > > >
> > > > > On Thu, Aug 27, 2020 at 7:28 PM Rui Li 
mailto:lirui.fu...@gmail.com>>
> > wrote:
> > > > >
> > > > >> Congratulations Dian!
> > > > >>
> > > > >> On Thu, Aug 27, 2020 at 5:39 PM Yuan Mei 
mailto:yuanmei.w...@gmail.com>>
> > > > wrote:
> > > > >>
> > > > >>> Congrats!
> > > > >>>
> > > > >>> On Thu, Aug 27, 2020 at 5:38 PM Xingbo Huang 
mailto:hxbks...@gmail.com>
> >
> > > > wrote:
> > > > >>>
> > > > >>>> Congratulations Dian!
> > > > >>>>
> > > > >>>> Best,
> > > > >>>> Xingbo
> > > > >>>>
> > > > >>>> jincheng sun 
mailto:sunjincheng...@gmail.com>> 于2020年8月27日周四 
下午5:24写道:
> > > > >>>>
> > > > >>>>> Hi all,
> > > > >>>>>
> > > > >>>>> On behalf of the Flink PMC, I'm happy to announce that Dian Fu
> is
> > > now
> > > > >>>>> part of the Apache Flink Project Management Committee (PMC).
> > > > >>>>>
> > > > >>>>> Dian Fu has been very active on PyFlink component, working on
> > > various
> > > > >>>>> important features, such as the Python UDF and Pandas
> > integration,
> > > > and
> > > > >>>>> keeps checking and voting for our releases, and also has
> > > successfully
> > > > >>>>> produced two releases(1.9.3&1.11.1) as RM, currently working 
as
> > RM
> > > > to push
> > > > >>>>> forward the release of Flink 1.12.
> > > > >>>>>
> > > > >>>>> Please join me in congratulating Dian Fu for becoming a Flink
> PMC
> > > > >>>>> Member!
> > > > >>>>>
> > > > >>>>> Best,
> > > > >>>>> Jincheng(on behalf of the Flink PMC)
> > > > >>>>>
> > > > >>>>
> > > > >>
> > > > >> --
> > > > >> Best regards!
> > > > >> Rui Li
> > > > >>
> > > >
> > > >
> > >
> >
>



--

Best,
Benchao Li


--

Arvid Heise | Senior Java Developer

[https://lh5.googleusercontent.com/ODbO0aq1IqKMfuoy_pw2YH8r6dqDRTq37rg3ytg11FCGJx12jJ1ff_SANPBxTHzSJTUQY9JLuoXq4NB7Om7j6Vq1lg6jIOKz8S5g2VKDGwicbj5fbY09PVb6mD5TdRuWEUvEMZTG]<https://www.ververica.com/>


Follow us @VervericaData

--

Join Flink Forward<https://flink-forward.org/> - The Apache Flink Conference

Stream Processing | Event Driven | Real Time

--

Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany

--

Ververica GmbH
Registered at Amtsgericht Charlottenburg: HRB 158244 B
Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji (Toni) 
Cheng


Re: 回复: 流处理任务中checkpoint失败

2020-08-27 文章 Yun Tang
Hi Robert

你的两个source 
firstSource和secondSource是自己实现的么,一种怀疑是source在没有数据时持锁[1][2]导致checkpoint 
barrier并没有下发。
建议使用jstack查看在没有数据下发时,source相关task的java调用栈,观察是否存在等待锁释放

[1] 
https://github.com/apache/flink/blob/162e3ead63e5766cf2116af09922a88537889e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java#L916
[2] 
https://github.com/apache/flink/blob/162e3ead63e5766cf2116af09922a88537889e53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTaskActionExecutor.java#L92

祝好
唐云

From: Robert.Zhang <173603...@qq.com>
Sent: Wednesday, August 26, 2020 22:17
To: user-zh 
Subject: 回复: 流处理任务中checkpoint失败

Hi


代码大致如下:
DataStream broad=env.readFrom(...).broad;
DataStream firstSource=env.readFrom(...);
DataStream secondSource=env.readFrom(...);


DataStream union=firstSource.union(secondSource);
IterativeStream iterativeStream=union.keyby(...).process(...).iterate();

DataStream result=iterativeStream.closeWith(
   
   
   
iterativeStream
   
   
   
.keyby(...)
   
   
   
.connect(broad)
   
   
   
.process(...));
result.addSink(...);


是否是代码的书写上有问题呢?不胜感激,Thanks all




--原始邮件--
发件人:
"user-zh"   
 
https://zhuanlan.zhihu.com/p/87131964
  Best,
  Congxian
 
 
  Robert.Zhang <173603...@qq.comgt; 于2020年8月21日周五 下午6:31写道:
 
  gt; Hello all,
  gt; 目前遇到一个问题,在iterative stream job
  gt; 使用checkpoint,按照文档进行了相应的配置,测试过程中checkpoint几乎无法成功
  gt; 测试state
 很小,只有几k,依然无法成功。会出现org.apache.flink.util.FlinkRuntimeException:
  gt; Exceeded checkpoint tolerable failure threshold.的报错
  gt;
  gt;
  gt; 配置如下:
  gt; env.enableCheckpointing(1, 
CheckpointingMode.EXACTLY_ONCE,
 true);
  gt; CheckpointConfig checkpointConfig = 
env.getCheckpointConfig();
  gt; checkpointConfig.setCheckpointTimeout(60);
  gt; checkpointConfig.setMinPauseBetweenCheckpoints(6);
  gt; checkpointConfig.setMaxConcurrentCheckpoints(4);
  gt;
  gt;
 
 
checkpointConfig.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  gt; checkpointConfig.setPreferCheckpointForRecovery(true);
  gt; checkpointConfig.setTolerableCheckpointFailureNumber(2);
  gt; checkpointConfig.enableUnalignedCheckpoints();
  gt;
  gt;
  gt; 任务只处理几条数据,未存在反压的情况。有遇到类似问题的老哥吗?



Re: 1.11.2大概什么时候发布

2020-08-26 文章 Yun Tang
可以参照 https://flink.apache.org/downloads.html#all-stable-releases 
的历史发布记录,一般是3个月左右,也就是大约10月底。

1.11.2 有什么特别期待的bug fix么?

祝好
唐云




From: abc15...@163.com 
Sent: Wednesday, August 26, 2020 15:41
To: user-zh@flink.apache.org 
Subject: 1.11.2大概什么时候发布

1.11.2大概什么时候发布?


Re: flink checkpoint导致反压严重

2020-08-25 文章 Yun Tang
Hi

对于已经改为at least once的checkpoint,其在checkpoint时对于作业吞吐的影响只有task 
同步阶段的snapshot,这个时间段的snapshot由于与task的主线程的数据访问持有同一把锁,会影响主线程的数据处理。但是就算这样,我也很怀疑checkpoint本身并不是导致早上10点高峰期无法运行的罪魁祸首。
使用异步的,支持增量的state backend (例如RocksDBStateBackend)会大大缓解该问题。
建议排查思路:

  1.  检查使用的state backend类型
  2.  检查是否存在sync阶段checkpoint用时久的问题(可以观察WEB UI上的checkpoint部分,看sync阶段的耗时)
  3.  借助jstack等工具,检查执行checkpoint时,TM上的task执行逻辑,判断是哪里消耗CPU

祝好
唐云

From: LakeShen 
Sent: Wednesday, August 26, 2020 10:00
To: user-zh 
Subject: Re: flink checkpoint导致反压严重

Hi zhanglachun,

你们使用 checkpoint 的方式是什么?是否有 full gc 的情况呢

Best,
LakeShen

徐骁  于2020年8月26日周三 上午2:10写道:

> input
>   .keyBy()
>   .timeWindow()
>   .reduce(new MyReduceFunction(), new MyProcessWindowFunction());
>
> 可以看一下官网的 reduce 用法, 可以在数据进入窗口的时候进行处理(排序), 而没必要等到 window 结束后再进行 这样可以把时间分散到
> window 里面
>


Re: [ANNOUNCE] Apache Flink 1.10.2 released

2020-08-25 文章 Yun Tang
Thanks for Zhu's work to manage this release and everyone who contributed to 
this!

Best,
Yun Tang

From: Yangze Guo 
Sent: Tuesday, August 25, 2020 14:47
To: Dian Fu 
Cc: Zhu Zhu ; dev ; user 
; user-zh 
Subject: Re: [ANNOUNCE] Apache Flink 1.10.2 released

Thanks a lot for being the release manager Zhu Zhu!
Congrats to all others who have contributed to the release!

Best,
Yangze Guo

On Tue, Aug 25, 2020 at 2:42 PM Dian Fu  wrote:
>
> Thanks ZhuZhu for managing this release and everyone else who contributed to 
> this release!
>
> Regards,
> Dian
>
> 在 2020年8月25日,下午2:22,Till Rohrmann  写道:
>
> Great news. Thanks a lot for being our release manager Zhu Zhu and to all 
> others who have contributed to the release!
>
> Cheers,
> Till
>
> On Tue, Aug 25, 2020 at 5:37 AM Zhu Zhu  wrote:
>>
>> The Apache Flink community is very happy to announce the release of Apache 
>> Flink 1.10.2, which is the first bugfix release for the Apache Flink 1.10 
>> series.
>>
>> Apache Flink® is an open-source stream processing framework for distributed, 
>> high-performing, always-available, and accurate data streaming applications.
>>
>> The release is available for download at:
>> https://flink.apache.org/downloads.html
>>
>> Please check out the release blog post for an overview of the improvements 
>> for this bugfix release:
>> https://flink.apache.org/news/2020/08/25/release-1.10.2.html
>>
>> The full release notes are available in Jira:
>> https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12347791
>>
>> We would like to thank all contributors of the Apache Flink community who 
>> made this release possible!
>>
>> Thanks,
>> Zhu
>
>


Re: 有没有可能使用tikv作为flink 分布式的backend

2020-08-25 文章 Yun Tang
Hi

这种思路我觉得是可以尝试的,不过目前看需要改动的地方很多:

  1.  需要更改RocksDB 创建checkpoint 到TiKV的代码逻辑
  2.  需要改动RocksDB 从checkpoint resume的代码逻辑
  3.  
如果想要数据可以TiKV可以读取,那么TiKV中存储的格式要么与RocksDB内存储的一样,那这样子的话,lookup时候,需要能够反序列化Flink在RocksDB中的存储格式;要么是重新的格式,但这样子会导致RocksDB的checkpoint流程和时间都会增长。
  4.  TiKV中的数据的更新依赖于checkpoint interval,不能做到实时更新

其实queryable state 也是一个可以实现你们类似目的的方式,不确定你们是否可以尝试。

祝好
唐云

From: wxpcc 
Sent: Tuesday, August 25, 2020 17:05
To: user-zh@flink.apache.org 
Subject: Re: 有没有可能使用tikv作为flink 分布式的backend

感谢解答

就像你说的,的确可以 用lookup方式实现一部分公用kv的共享

我的理解现有的 rocksdb backend 为:rocksdb+hdfs , 如果是变成:rocksdb+tikv ,这样在一些应用过程中产生的
kv指标数据最终会存储到 tikv之中,外部也有可能访问到,通过 lookup的方式,不知道这样是否可行





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 有没有可能使用tikv作为flink 分布式的backend

2020-08-24 文章 Yun Tang
Hi

TiKV 本身就是分布式的,多副本的,可以类比HBase,所以不是将其向Flink内置的state 
backend靠拢,而是向Flink读写HBase靠拢,这样若干写TiKV的Flink作业就做到了数据共享。

如果想将TiKV向Flink 
state-backend靠拢,TiKV本身的分布式架构,多副本机制,网络传输(而不是本地磁盘访问)都是缺点或者说不再必要存在的特性。
最后就会演化成现在Flink + RocksDB state-backend的架构,更何况TiKV就是基于RocksDB的,整体意义不是很大。

祝好
唐云

From: Congxian Qiu 
Sent: Monday, August 24, 2020 20:17
To: user-zh 
Subject: Re: 有没有可能使用tikv作为flink 分布式的backend

Hi
   StateBackend 可以理解为 一个 KV 存储加上一个 snapshot 过程,其中 snapshot 过程负责将当前 KV
存储的数据进行备份。理论上任何的 KV 存储都是有可能作为 StateBackend 的,不过增加一种 StateBackend 的话,需要实现相应的
snapshot/restore 逻辑。

   但是在多个 Flink 作业中实现共享的 state 这个在 Flink 中是不支持的。
Best,
Congxian


wxpcc  于2020年8月21日周五 下午6:33写道:

> 项目里有部分需要进行状态共享的需求,多个flink 任务之间
>
> 如题,tikv本身基于rocksdb 是否有可能扩展成为分布式 backend
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: state序列化问题

2020-08-21 文章 Yun Tang
Hi

其实你的问题就是MapState中的value本身是java的map结构,也就是对应MapStateDescriptor里面的valueSerializer是否需要区分显示声明成HashMap类型,这个取决于你的value
 serializer实现,如果你用的是Flink内置的MapSerializer[1],没必要声明成HashMap类型。


[1] 
https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/MapSerializer.java


祝好
唐云


From: shizk233 
Sent: Friday, August 21, 2020 10:51
To: user-zh@flink.apache.org 
Subject: Re: state序列化问题

抱歉,是我表述不清楚,ListState>只是举个例子,并不是我的应用场景实际的状态。
从实际考虑,我想利用MapState保存一系列特殊的计数器Map,也就是MapState>,主要用来做一个伪窗口,key是窗口的开始时间。

主要想知道,在MapStateDescriptor声明类型信息时,我是否应该把内部Map声明成明确的HashMap类型,而不是Map类型?

Yun Tang  于2020年8月21日周五 上午12:13写道:

> Hi
>
> 如果想要在list中保存String,也就是list中的每个元素格式是String,ListState的格式应该是
> ListState, 而不是
> ListState>,后者表示有一个list,list中的每一个元素均是一个list
>
> ListState 本身并不属于java的collection,所以不存在ArrayList 与 LinkedList的区别。
>
> 祝好
> 唐云
> 
> From: shizk233 
> Sent: Thursday, August 20, 2020 18:00
> To: user-zh@flink.apache.org 
> Subject: state序列化问题
>
> Hi all,
>
> 请教一下,State应该是通过StateDescriptor提取的类型信息来序列化/反序列化,
> 那么如果声明为接口类型,如ListState>,但实际存入的是ArrayList/LinkedList,
> 会对类型信息提取产生不良影响吗?
>
> 按我的理解,ArrayList和LinkedList在序列化时的bytes组成结构应该是不太一样的。
> 但是都可以作为List>来声明。
>
> 请求野生的大佬支援一下!
>


Re: state序列化问题

2020-08-20 文章 Yun Tang
Hi

如果想要在list中保存String,也就是list中的每个元素格式是String,ListState的格式应该是 ListState, 
而不是 ListState>,后者表示有一个list,list中的每一个元素均是一个list

ListState 本身并不属于java的collection,所以不存在ArrayList 与 LinkedList的区别。

祝好
唐云

From: shizk233 
Sent: Thursday, August 20, 2020 18:00
To: user-zh@flink.apache.org 
Subject: state序列化问题

Hi all,

请教一下,State应该是通过StateDescriptor提取的类型信息来序列化/反序列化,
那么如果声明为接口类型,如ListState>,但实际存入的是ArrayList/LinkedList,
会对类型信息提取产生不良影响吗?

按我的理解,ArrayList和LinkedList在序列化时的bytes组成结构应该是不太一样的。
但是都可以作为List>来声明。

请求野生的大佬支援一下!


Re: 增量che ckpoint

2020-08-20 文章 Yun Tang
Hi

增量checkpoint与web界面的信息其实没有直接联系,增量checkpoint的信息记录由CheckpointCoordinator中的SharedStateRegistry[1]
 进行计数管理,而保留多少checkpoint则由 CheckpointStore管理 [2].
保留2个checkpoint的执行过程如下:
chk-1 completed --> register chk-1 in state registry --> add to checkpoint store
chk-2 completed --> register chk-2 in state registry --> add to checkpoint store
chk-3 completed --> register chk-3 in state registry --> add to checkpoint 
store --> chk-1 subsumed --> unregister chk-1 in state registry --> discard 
state with reference=0
chk-4 completed --> register chk-4 in state registry --> add to checkpoint 
store --> chk-2 subsumed --> unregister chk-2 in state registry --> discard 
state with reference=0

从上面可以看懂整个执行流程,所以当chk-3 
仍然有部分数据依赖chk-1时,那些state数据在unregister时,其计数统计并不会降为0,也就不会删掉,也不需要copy到本次中。


[1] 
https://github.com/apache/flink/blob/f8ce30a50b8dd803d4476ea5d83e7d48708d54fa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L192
[2] 
https://github.com/apache/flink/blob/f8ce30a50b8dd803d4476ea5d83e7d48708d54fa/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java#L41

祝好
唐云



From: 赵一旦 
Sent: Thursday, August 20, 2020 10:50
To: user-zh@flink.apache.org 
Subject: Re: 增量che ckpoint

等其他人正解。下面是我的猜测:
保留2个检查点是web界面保留2个检查点,增量情况下,这2个检查点所有引用到的所有历史检查点肯定都不会被删除。
因此第3个检查点的时候,只有2,3检查点仍然引用了1,则1就不会被删除。

superainbower  于2020年8月20日周四 上午10:46写道:

> hi,请教大家一个问题,开启了增量checkpoint,同时checkpoint的个数设置为只保留2个,那么如果当前是第三次checkpoint
> 仍然依赖第一次的checkpoint会出现什么情况,会把第一次的copy过来到本次中吗?如过第一次不删除,不是会不满足保留2个的限制吗


Re: 能否考虑针对检查点和保存点设置不同的超时时间

2020-08-19 文章 Yun Tang
Hi

你的这个需求其实社区早已经有相关ticket [1]了,不过这个需求一直不是很强烈,毕竟大多数时候可以通过增大checkpoint 
timeout即可,增大checkpoint timeout不代表着也会增大checkpoint占据的资源。

[1] https://issues.apache.org/jira/browse/FLINK-9465

祝好
唐云

From: 赵一旦 
Sent: Tuesday, August 18, 2020 14:38
To: user-zh@flink.apache.org 
Subject: 能否考虑针对检查点和保存点设置不同的超时时间

出发点是,检查点超时失败啥的其实并不是很重要,高峰时间有时候就是会超时失败,但并不会明显反压,因此没关系。但是,有时候需要重启任务,用保存点,那么高峰时期就是无法生成保存点,然后任务失败自动从上一次检查点恢复。这导致本身高峰时期,重启在停的过程失败导致回滚了近10分(检查点周期)。

有一种思路是直接将超时设置更长时间,但这也不行,因为检查点本身是占据资源的,设置短超时就是不希望检查点占据过多资源,相当于超时完成不了就不要继续了。

但是保存点却是人工介入,需要去重启任务,可能是bug或者什么原因必须重启任务。但高峰时间按照正常设置的超时可能就是无法完成保存点。


Re: flink 1.11 web ui请教

2020-08-19 文章 Yun Tang
Hi


  1.  框图的数量是因为默认启用了operator chain导致的,至于连接线上的文字(例如hash)则是由网络连接方式决定了[2]
  2.  record received 为0 是因为这个指标表征了数据在Flink 
的channel内收到的record数量,由于source节点并没有从Flink channel获取数据(往往是从外部系统获取),所以自然record 
received为0

[1] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#task-chaining-and-resource-groups
[2] 
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/#physical-partitioning

祝好
唐云


From: wxpcc 
Sent: Wednesday, August 19, 2020 16:10
To: user-zh@flink.apache.org 
Subject: Re: flink 1.11 web ui请教

environment.disableOperatorChaining()



--
Sent from: http://apache-flink.147419.n8.nabble.com/


  1   2   >