flink table api 或者 sql 使用 自定义含有的state方法

2020-11-23 文章
大家好:
请问,因为当前flink sql或者flink table
中,不支持自定义的udf中使用有state的逻辑,所以,当我们自己任务中,如果统计需要聚集型指标的情况下,就不能用上flink
sql了,只能自己使用flink datastream去硬编码,请问,flink
sql中,能否有其他方式,可以调用我们自己定义的有state的udf,并且可以不让再解析执行的时候,多次出现呢?还是说,只能一个指标一个flink
job?


Re: Flink keyby数据倾斜问题

2020-04-05 文章
你好 
   可以参考一下这个链接的思路 

https://blog.csdn.net/IT_Lee_J_H/article/details/88641894

发自我的iPhone

> 在 2020年4月4日,18:15,chanamper  写道:
> 
> Dear All,
>大家好,请教一下。目前针对Java Api的方式,对于Flink keyby情况存在key数据倾斜有啥实现优化思路吗?看官方文档目前在table 
> api和sql层面,有Minibatch Aggregation和Local Global Aggregation方式的实现,针对Java 
> Api的方式有啥办法可以达到local global aggregation的效果吗?
>  多谢!


回复: 回复:如何获取算子处理一条数据记录的时间

2020-01-06 文章
你可以在算子中计算,然后上传到自定义的Flink Metrics中,这样就能看到平均一个算子的时间了.

发件人: 张江
发送时间: 2020年1月2日 19:18
收件人: user-zh
抄送: user-zh
主题: 回复:如何获取算子处理一条数据记录的时间

我其实是想知道算子的数据处理能力,得到一个算子每秒钟最多能处理多少条数据。比如说map算子,我需要知道它一秒钟最多能转换多少数据,之后根据source端的数据量来设置算子的并行度




| |
张江
|
|
邮箱:zjkingdom2...@163.com
|

签名由 网易邮箱大师 定制

在2020年01月02日 10:28,猫猫 写道:
只有如下算子支持测流输出。

ProcessFunction

CoProcessFunction

ProcessWindowFunction

ProcessAllWindowFunction

如果要计时的话,需要将你的逻辑全部放到相关函数中。在逐条处理的时候,记录数据ID和时间,写成一个新的数据流并行输出出去。
但实际上我们很少这样做,因为很难将所有逻辑全部放到一个算子中。


比较常见的方式是,在整体上数据有流入和流出,所以增加流入和流出算子,在数据经过的时候,记录数据ID,并形成新的数据流,然后写入到数据库进行合并就行了。
这种可以考虑采样的方式,例如根据ID取模获取数据,但缺点是监控嵌入了执行逻辑,并且必须有并行度为1的统计算子,可能会影响性能。

更为合理的看法是,当数据量不堆积的时候,时间也不是问题。
我们只要看kafka-offset的消费速度就行了。一定时间消费多少条,平均下来就是速率的。适用于压满性能的时候用。

可能还有更好的处理方式,我还没有了解,flink好像自身也有一定的监控能力。
但你的需求到底是什么?你为什么要知道一条数据的处理时间?而不是一批数据的处理时间?
你关心每个算子的时间,还是关心数据整体的处理时间?还是关心某个业务的执行时间?

提供一下场景为佳。






--原始邮件--
发件人:"张江"

slot询问

2019-12-27 文章
大家好:
   我在的Flink是在yarn上跑,在yarn上部署了个yarn-session,命令如下:
./yarn-session.sh -jm 5120m -tm 10240m   -s 30 -d -st

  
这里我是设置了一个tm上面跑30个slot,但是我在1.8版本的时候,是在yarn中会看到的是一个tm是使用一个cpu,但是我切换到了1.9.1后,发现是一个slot使用一个cpu,导致我设置的并行超出了yarn中的cpu资源。请问在1.9.1中,如果降低这个cpu使用率?


回复: 回复: flink 缓存本地文件被删除疑问

2019-10-11 文章
Hi 唐云
好的,我重新启动一个yarn 
session来查找原因,不过我早上把RocksDBState后端改成了FSState后端后,就没有出现过任何异常了。

发件人: Yun Tang
发送时间: 2019年10月11日 15:27
收件人: user-zh
主题: Re: 回复: flink 缓存本地文件被删除疑问

hi  嘉诚

这个异常是因为你的task 被cancel了,所以清理了相关目录,结果导致硬链时候原始目录下文件找不到了。
换言之,这个task ”累积数据 -> Sink: 写入到HBase (12/12)(55962df9fd694ed1f82b8f3ec2aaf6c4)” 
是受害者,是因为其他异常导致整个作业failover,之后导致cancel了当前task,你应该在job 
manager日志中找到第一个fail的task,那上面的异常才是root cause。

祝好
唐云

From: 戴嘉诚 
Sent: Friday, October 11, 2019 15:17
To: user-zh 
Subject: Re: 回复: flink 缓存本地文件被删除疑问

Hi
   这是早上发生异常后,我下载的日志,请麻烦查看一下。
 taskmanager.log
<https://drive.google.com/file/d/17nP8yxSpdAnDDgBEbEUrDXYx-rwosi52/view?usp=drive_web>

Yun Tang  于2019年10月11日周五 下午2:56写道:

> Hi 戴嘉诚
>
> 你的异常发生在failover时,rocksDB恢复的时候,会从hdfs下载文件到临时restore目录,然后再做硬链到DB目录下。原先已经下载到临时restore目录的文件在做硬链的时候,突然报出原始文件找不到了
> [1]。这个异常的出现比较奇怪,能否先share一下出问题的taskmanager的所有日志?
>
>
> [1]
> https://github.com/apache/flink/blob/cda6dc0c44239aa7a36105988328de5744aea125/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/restore/RocksDBIncrementalRestoreOperation.java#L473
>
> 祝好
> 唐云
> 
> From: 戴嘉诚 
> Sent: Friday, October 11, 2019 14:54
> To: user-zh@flink.apache.org 
> Subject: 回复: flink 缓存本地文件被删除疑问
>
> Hi,
> 我在代码中已经是显式的给每个算子都设置了uid了
>
> 发件人: Qi Kang
> 发送时间: 2019年10月11日 14:48
> 收件人: user-zh@flink.apache.org
> 主题: Re: flink 缓存本地文件被删除疑问
>
> Hi,
>
> 从报错看,可能是因为自动生成的算子ID改变,无法取得state backend里存储的状态。检查点维护的是[算子ID ->
> 状态]的映射关系,所以Flink推荐用uid()方法给算子显式设置ID,才能保证正常恢复现场。可以参考文档[1]。
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/savepoints.html#assigning-operator-ids
>
>
> > On Oct 11, 2019, at 11:00, 戴嘉诚  wrote:
> >
> > 大家好:
> >最近我的程序迁移到了flink1.9 on yarn
> session后,原本有个程序是读取kafka消息,然后key分组,window聚集一秒后,批量异步写入hbase中经常发生异常,其中异常为:
> > java.lang.Exception: Exception while creating StreamOperatorStateContext.
> > at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
> > at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
> > at
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
> > at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
> > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> > at java.lang.Thread.run(Thread.java:748)
> > Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
> state backend for WindowOperator_3572e2ca02acab3056013a7c1c4fa645_(6/6)
> from any of the 1 provided restore options.
> > at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> > at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
> > at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
> > ... 6 more
> > Caused by: org.apache.flink.runtime.state.BackendBuildingException:
> Caught unexpected exception.
> > at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:326)
> > at
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:520)
> > at
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
> > at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> > at
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> > ... 8 more
> > Caused by: java.nio.file.NoSuchFileException:
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1569065892199_0031/flink-io-6bcd299a-00e7-4f6a-8466-09d8d13d831d/job_add9718b9fe3e5054577d8b9e5799829_op_WindowOperator_3572e2ca02acab3056013a7c1c4fa645__6_6__uuid_53528904-c829-431a-ba0a-b76041cd2d2b/db/000835.sst
> ->
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1569065892199_0031/flink-io-6bcd299a-00e7-4f6a-8466-09d8d13d831d/job_add9718b9fe3e5054577d8b9e5799829_op_WindowOperator_3572e2ca02acab3056013a7c1c4fa645__6_6__uuid_53528904-c829-431

回复: 回复: flink1.9 webui exception日志显示问题

2019-10-11 文章
Hi

是彻底的看不到任何一条历史异常。

发件人: Yun Tang
发送时间: 2019年10月11日 14:43
收件人: user-zh@flink.apache.org
主题: Re: 回复: flink1.9 webui exception日志显示问题

hi

Web上保存的历史异常数目是有限的,只会保存20个 
[1],如果更旧的异常被冲掉了,直接去jobmanager日志里面检索异常信息吧。另外,你的问题是彻底看不到任何一条历史异常,还是看不到最老的历史异常?


[1] 
https://github.com/apache/flink/blob/cda6dc0c44239aa7a36105988328de5744aea125/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java#L54

祝好
唐云


From: 戴嘉诚 
Sent: Thursday, October 10, 2019 16:26
To: user-zh@flink.apache.org 
Subject: 回复: flink1.9 webui exception日志显示问题

+1 这个我也遇到了这个问题,主要原因是异常了,然后region 重启,重启后,会重新加载,就自动清空了异常日志信息..现在不能再webui上排查异常信息了

发件人: 李杰
发送时间: 2019年10月10日 14:41
收件人: user-zh@flink.apache.org
主题: flink1.9 webui exception日志显示问题

log4j.properties为官方默认。
weib ui exception日志一闪而过,ui上看不到历史异常信息






回复: flink 缓存本地文件被删除疑问

2019-10-11 文章
Hi,
我在代码中已经是显式的给每个算子都设置了uid了

发件人: Qi Kang
发送时间: 2019年10月11日 14:48
收件人: user-zh@flink.apache.org
主题: Re: flink 缓存本地文件被删除疑问

Hi,

从报错看,可能是因为自动生成的算子ID改变,无法取得state backend里存储的状态。检查点维护的是[算子ID -> 
状态]的映射关系,所以Flink推荐用uid()方法给算子显式设置ID,才能保证正常恢复现场。可以参考文档[1]。

[1] 
https://ci.apache.org/projects/flink/flink-docs-release-1.9/ops/state/savepoints.html#assigning-operator-ids


> On Oct 11, 2019, at 11:00, 戴嘉诚  wrote:
> 
> 大家好:
>   最近我的程序迁移到了flink1.9 on yarn 
> session后,原本有个程序是读取kafka消息,然后key分组,window聚集一秒后,批量异步写入hbase中经常发生异常,其中异常为:
> java.lang.Exception: Exception while creating StreamOperatorStateContext.
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
> at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
> at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed 
> state backend for WindowOperator_3572e2ca02acab3056013a7c1c4fa645_(6/6) from 
> any of the 1 provided restore options.
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
> ... 6 more
> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught 
> unexpected exception.
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:326)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:520)
> at 
> org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
> at 
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
> ... 8 more
> Caused by: java.nio.file.NoSuchFileException: 
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1569065892199_0031/flink-io-6bcd299a-00e7-4f6a-8466-09d8d13d831d/job_add9718b9fe3e5054577d8b9e5799829_op_WindowOperator_3572e2ca02acab3056013a7c1c4fa645__6_6__uuid_53528904-c829-431a-ba0a-b76041cd2d2b/db/000835.sst
>  -> 
> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1569065892199_0031/flink-io-6bcd299a-00e7-4f6a-8466-09d8d13d831d/job_add9718b9fe3e5054577d8b9e5799829_op_WindowOperator_3572e2ca02acab3056013a7c1c4fa645__6_6__uuid_53528904-c829-431a-ba0a-b76041cd2d2b/14634105-83be-429e-9cdc-36dbff14a7fa/000835.sst
> at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
> at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
> at 
> sun.nio.fs.UnixFileSystemProvider.createLink(UnixFileSystemProvider.java:476)
> at java.nio.file.Files.createLink(Files.java:1086)
> at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:473)
> at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:212)
> at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:188)
> at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:162)
> at 
> org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:148)
> at 
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
> ... 12 more
> 
> 
> 我这里的session中是有多个job在session上运行,但是这里job都是写入hbase中的。
> 这里看上去是本地机器上的目录文件被删除了?但是这个文件我是没有删除过的…
> 
> 




回复: flink 缓存本地文件被删除疑问

2019-10-10 文章
你好,我的任务是用RocksDB存储的Checkpoint, 是运行了一段时间后报的这个错误

发件人: pengchenglin
发送时间: 2019年10月11日 11:59
收件人: user-zh@flink.apache.org
主题: Re: flink 缓存本地文件被删除疑问

你好,你的任务是用RocksDB存储的Checkpoint吗?任务是每次启动时就报这个错误,还是运行一段时间报这个错误》
 
发件人: 戴嘉诚
发送时间: 2019-10-11 11:00
收件人: user-zh@flink.apache.org
主题: flink 缓存本地文件被删除疑问
大家好:
最近我的程序迁移到了flink1.9 on yarn 
session后,原本有个程序是读取kafka消息,然后key分组,window聚集一秒后,批量异步写入hbase中经常发生异常,其中异常为:
java.lang.Exception: Exception while creating StreamOperatorStateContext.
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:195)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:253)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:878)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:392)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state 
backend for WindowOperator_3572e2ca02acab3056013a7c1c4fa645_(6/6) from any of 
the 1 provided restore options.
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:307)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:135)
... 6 more
Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught 
unexpected exception.
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:326)
at 
org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:520)
at 
org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:291)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
at 
org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
... 8 more
Caused by: java.nio.file.NoSuchFileException: 
/data/hadoop/yarn/local/usercache/hdfs/appcache/application_1569065892199_0031/flink-io-6bcd299a-00e7-4f6a-8466-09d8d13d831d/job_add9718b9fe3e5054577d8b9e5799829_op_WindowOperator_3572e2ca02acab3056013a7c1c4fa645__6_6__uuid_53528904-c829-431a-ba0a-b76041cd2d2b/db/000835.sst
 -> 
/data/hadoop/yarn/local/usercache/hdfs/appcache/application_1569065892199_0031/flink-io-6bcd299a-00e7-4f6a-8466-09d8d13d831d/job_add9718b9fe3e5054577d8b9e5799829_op_WindowOperator_3572e2ca02acab3056013a7c1c4fa645__6_6__uuid_53528904-c829-431a-ba0a-b76041cd2d2b/14634105-83be-429e-9cdc-36dbff14a7fa/000835.sst
at sun.nio.fs.UnixException.translateToIOException(UnixException.java:86)
at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
at 
sun.nio.fs.UnixFileSystemProvider.createLink(UnixFileSystemProvider.java:476)
at java.nio.file.Files.createLink(Files.java:1086)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreInstanceDirectoryFromPath(RocksDBIncrementalRestoreOperation.java:473)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOperation.java:212)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreFromRemoteState(RocksDBIncrementalRestoreOperation.java:188)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restoreWithoutRescaling(RocksDBIncrementalRestoreOperation.java:162)
at 
org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation.restore(RocksDBIncrementalRestoreOperation.java:148)
at 
org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:270)
... 12 more
 
 
我这里的session中是有多个job在session上运行,但是这里job都是写入hbase中的。
这里看上去是本地机器上的目录文件被删除了?但是这个文件我是没有删除过的…
 
 



回复: flink1.9 webui exception日志显示问题

2019-10-10 文章
+1 这个我也遇到了这个问题,主要原因是异常了,然后region 重启,重启后,会重新加载,就自动清空了异常日志信息..现在不能再webui上排查异常信息了

发件人: 李杰
发送时间: 2019年10月10日 14:41
收件人: user-zh@flink.apache.org
主题: flink1.9 webui exception日志显示问题

log4j.properties为官方默认。
weib ui exception日志一闪而过,ui上看不到历史异常信息





flink 命令行疑问

2019-09-28 文章

大家好:

我的flink代码打包的jar包是放到了hdfs上面,但是当我在flink中用命令行执行的时候,flink本地是否只能解析本地jar包?不能解析到hdfs上面的jar包?

我把jar包下载到服务器本地后,就可以执行成功了


我的命令是:
./bin/flink run  -yid application_1567652112073_0001   -p 6 -yj 
hdfs://ysec-storage/flink/runJar/business-security-1.0-SNAPSHOT.jar --appId 
act_test



返回的结果是:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in 
[jar:file:/data/flink/flink-1.9.0/lib/slf4j-log4j12-1.7.15.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in 
[jar:file:/usr/hdp/2.6.5.0-292/hadoop/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2019-09-29 11:48:15,686 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
- Found Yarn properties file under /tmp/.yarn-properties-hdfs.
2019-09-29 11:48:15,686 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli 
- Found Yarn properties file under /tmp/.yarn-properties-hdfs.
Could not build the program from JAR file.


答复: Re: 如何优化flink内存?

2019-09-05 文章
对,我这边使用的也是相同的操作

发件人: 陈赋赟
发送时间: 2019年9月5日 16:08
收件人: user-zh@flink.apache.org
主题: Re:Re: 如何优化flink内存?

HI
  我在项目中有遇到过类似的情况,我说下我的想法和思路。
  伊始是需要统计90天事件窗口中用户浏览事件总数,如果是在近30天内有浏览事件则累加1次,在30天内没有浏览事件但在 30天 ~ 
90天内有其他浏览事件则记0次(需求比较奇葩),我们使用了滑动窗口(长度90天 步长1天 
数据进来实时trigger触发计算)因为需要拿到窗口的结束时间所以一开始是用windowProcessFunction去做的聚合统计,这意味着90个窗口每个窗口里都需要缓存着全部的数据而不是一个聚合汇总数据,在线上跑了两天后发现checkpoint
 size已经陡增到20个G并且不久就OOM了。后面想了一下,Flink 
提供的SlideWindow的算法不是闭包可以直接复用,用flatmap对每条数据使用slideWindow得出这条数据对应的90天的窗口结束时间,然后在keyby后使用ProcessFunction,在里面自定义valueState对数据进行聚合汇总,并且在processFunction内部还可以访问TimeService,可以注册清理过期state数据的Timer,并在onTimer回调方法中清理状态。
 以上是我的思路,希望能帮助到你~




祝好





在 2019-09-05 13:43:00,"Yifei Qi"  写道:
>你的意思是自己去实现滑动窗口的功能么?
>
>戴嘉诚  于2019年9月4日周三 下午10:51写道:
>
>> 这里我建议你是用key process然后在里面用state来管理和聚集数据,这样会节省很大一部分内存
>>
>> Yifei Qi 于2019年9月4日 周三20:07写道:
>>
>> > 大家好:
>> >
>> >
>> >
>> > 不知道大家在使用flink时遇到过内存消耗过大的问题么?
>> >
>> >
>> >
>> > 我们最近在用flink算一些实时的统计数据, 但是内存消耗很大, 不知道有没有人知道如何优化?
>> >
>> >
>> >
>> > 具体情况是这样的:
>> >
>> > 准备的测试数据模拟一天时间内3万个用户的5万条数据. 原始数据一共是100M.
>> >
>> > 按照用户进行分组.
>> >
>> > 计算两个滑动窗口任务:一个是近1小时, 每5秒滑动一次的窗口. 一个是近24小时, 每1分钟滑动一次的窗口.
>> >
>> >
>> >
>> >
>> >
>> > flink运行在3个节点后, 内存合计就用了5G.
>> >
>> >
>> >
>> >
>> >
>> > flink如此消耗内存, 不知道是它本来就这么吃内存, 还是我使用的有问题.
>> >
>> >
>> >
>> >
>> >
>> > 顺祝商祺
>> >
>> >
>> > --
>> >
>> >
>> > Qi Yifei
>> > [image: https://]about.me/qyf404
>> > <
>> >
>> https://about.me/qyf404?promo=email_sig_source=product_medium=email_sig_campaign=gmail_api
>> > >
>> >
>>
>
>
>-- 
>
>
>Qi Yifei
>[image: https://]about.me/qyf404
><https://about.me/qyf404?promo=email_sig_source=product_medium=email_sig_campaign=gmail_api>



答复: 如何优化flink内存?

2019-09-05 文章
对,你可以自己再state中维持一整天的数据,让后根据时间戳来删除过期数据来替换滑动窗口


发件人: Yifei Qi
发送时间: 2019年9月5日 13:42
收件人: user-zh@flink.apache.org
主题: Re: 如何优化flink内存?

你的意思是自己去实现滑动窗口的功能么?

戴嘉诚  于2019年9月4日周三 下午10:51写道:

> 这里我建议你是用key process然后在里面用state来管理和聚集数据,这样会节省很大一部分内存
>
> Yifei Qi 于2019年9月4日 周三20:07写道:
>
> > 大家好:
> >
> >
> >
> > 不知道大家在使用flink时遇到过内存消耗过大的问题么?
> >
> >
> >
> > 我们最近在用flink算一些实时的统计数据, 但是内存消耗很大, 不知道有没有人知道如何优化?
> >
> >
> >
> > 具体情况是这样的:
> >
> > 准备的测试数据模拟一天时间内3万个用户的5万条数据. 原始数据一共是100M.
> >
> > 按照用户进行分组.
> >
> > 计算两个滑动窗口任务:一个是近1小时, 每5秒滑动一次的窗口. 一个是近24小时, 每1分钟滑动一次的窗口.
> >
> >
> >
> >
> >
> > flink运行在3个节点后, 内存合计就用了5G.
> >
> >
> >
> >
> >
> > flink如此消耗内存, 不知道是它本来就这么吃内存, 还是我使用的有问题.
> >
> >
> >
> >
> >
> > 顺祝商祺
> >
> >
> > --
> >
> >
> > Qi Yifei
> > [image: https://]about.me/qyf404
> > <
> >
> https://about.me/qyf404?promo=email_sig_source=product_medium=email_sig_campaign=gmail_api
> > >
> >
>


-- 


Qi Yifei
[image: https://]about.me/qyf404
<https://about.me/qyf404?promo=email_sig_source=product_medium=email_sig_campaign=gmail_api>



Re: 如何优化flink内存?

2019-09-04 文章
这里我建议你是用key process然后在里面用state来管理和聚集数据,这样会节省很大一部分内存

Yifei Qi 于2019年9月4日 周三20:07写道:

> 大家好:
>
>
>
> 不知道大家在使用flink时遇到过内存消耗过大的问题么?
>
>
>
> 我们最近在用flink算一些实时的统计数据, 但是内存消耗很大, 不知道有没有人知道如何优化?
>
>
>
> 具体情况是这样的:
>
> 准备的测试数据模拟一天时间内3万个用户的5万条数据. 原始数据一共是100M.
>
> 按照用户进行分组.
>
> 计算两个滑动窗口任务:一个是近1小时, 每5秒滑动一次的窗口. 一个是近24小时, 每1分钟滑动一次的窗口.
>
>
>
>
>
> flink运行在3个节点后, 内存合计就用了5G.
>
>
>
>
>
> flink如此消耗内存, 不知道是它本来就这么吃内存, 还是我使用的有问题.
>
>
>
>
>
> 顺祝商祺
>
>
> --
>
>
> Qi Yifei
> [image: https://]about.me/qyf404
> <
> https://about.me/qyf404?promo=email_sig_source=product_medium=email_sig_campaign=gmail_api
> >
>


Streaming File Sink疑问

2019-09-03 文章
大家好:
我在看到streamingFileSink 
中看到可以把数据转成文件写入到Flink的fileSystem中,但是这里有个问题就是,我写入的hdfs不是在flink的集群(集群A)上,是在另一个hdfs集群(集群B)上,然后那个集群(集群B)上面配置了namenode的HA,如果只是直接指定namenode感觉不怎么可靠,但是flink默认的flinkSystem中,是指定了flink默认的hdfs集群(集群A),请问这个连接器中,能单独指定fs的配置吗?因为flink开启了checkpoint,而checkpoint的默认路径是在fink自身的集群上(集群A),所以不能粗暴的直接把默认的fileSystem直接指向集群B。谢谢



Flink编译问题

2019-08-20 文章

大家好:

我这里用的cdh6.3.0版本进行hadoop管理。所以我根据官网上的显示,对flink的源码根据cdh6.3.0重新编译打包,但是在打包过程中,貌似发现了个问题:

[ERROR] Failed to execute goal 
org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile 
(default-testCompile) on project flink-yarn_2.11: Compilation failure
[ERROR] 
/data1/flink/flink/flink-yarn/src/test/java/org/apache/flink/yarn/AbstractYarnClusterTest.java:[89,41]
 no suitable method found for 
newInstance(org.apache.hadoop.yarn.api.records.ApplicationId,org.apache.hadoop.yarn.api.records.ApplicationAttemptId,java.lang.String,java.lang.String,java.lang.String,java.lang.String,int,,org.apache.hadoop.yarn.api.records.YarnApplicationState,,,long,long,org.apache.hadoop.yarn.api.records.FinalApplicationStatus,,,float,,)
[ERROR] method 
org.apache.hadoop.yarn.api.records.ApplicationReport.newInstance(org.apache.hadoop.yarn.api.records.ApplicationId,org.apache.hadoop.yarn.api.records.ApplicationAttemptId,java.lang.String,java.lang.String,java.lang.String,java.lang.String,int,org.apache.hadoop.yarn.api.records.Token,org.apache.hadoop.yarn.api.records.YarnApplicationState,java.lang.String,java.lang.String,long,long,long,org.apache.hadoop.yarn.api.records.FinalApplicationStatus,org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport,java.lang.String,float,java.lang.String,org.apache.hadoop.yarn.api.records.Token)
 is not applicable
[ERROR]   (actual and formal argument lists differ in length)
[ERROR] method 
org.apache.hadoop.yarn.api.records.ApplicationReport.newInstance(org.apache.hadoop.yarn.api.records.ApplicationId,org.apache.hadoop.yarn.api.records.ApplicationAttemptId,java.lang.String,java.lang.String,java.lang.String,java.lang.String,int,org.apache.hadoop.yarn.api.records.Token,org.apache.hadoop.yarn.api.records.YarnApplicationState,java.lang.String,java.lang.String,long,long,org.apache.hadoop.yarn.api.records.FinalApplicationStatus,org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport,java.lang.String,float,java.lang.String,org.apache.hadoop.yarn.api.records.Token,java.util.Set,boolean,org.apache.hadoop.yarn.api.records.Priority,java.lang.String,java.lang.String)
 is not applicable
[ERROR]   (actual and formal argument lists differ in length)
[ERROR] method 
org.apache.hadoop.yarn.api.records.ApplicationReport.newInstance(org.apache.hadoop.yarn.api.records.ApplicationId,org.apache.hadoop.yarn.api.records.ApplicationAttemptId,java.lang.String,java.lang.String,java.lang.String,java.lang.String,int,org.apache.hadoop.yarn.api.records.Token,org.apache.hadoop.yarn.api.records.YarnApplicationState,java.lang.String,java.lang.String,long,long,long,org.apache.hadoop.yarn.api.records.FinalApplicationStatus,org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport,java.lang.String,float,java.lang.String,org.apache.hadoop.yarn.api.records.Token,java.util.Set,boolean,org.apache.hadoop.yarn.api.records.Priority,java.lang.String,java.lang.String)
 is not applicable
[ERROR]   (actual and formal argument lists differ in length)

Ps : 我执行的命令是:mvn clean install -DskipTests -Pvendor-repos 
-Dhadoop.version=3.0.0-cdh6.3.0
这个hadoop是3.0版本的



答复: 恢复savepoint,除了命令行,能通过代码获取吗?

2019-08-09 文章
你好,
可以通过flink的restFul去调用保存savepoint

发件人: liu zhongfeng
发送时间: 2019年8月9日 20:28
收件人: user-zh@flink.apache.org
主题: 恢复savepoint,除了命令行,能通过代码获取吗?

如题,restore savepoint,除了run flink -s 
savepointpath之外,能通过代码恢复吗,因为公司集群没法输入命令行。如果可以的话,能给个小demo,或者API也可以
谢谢。

Best,
Rio Liu, 刘中锋




Re: AsyncIO 用Redis做缓存

2019-08-06 文章
你好,
可以用lettuce做异步客户端,排除lettuce的netty依赖,用flink的netty,就可以了集成lettuce了

王佩 于2019年8月6日 周二22:11写道:

> 这种Join场景,用上缓存后,理论上应该更快,但为啥会变慢呢。
>
> 王佩  于2019年8月6日周二 下午10:09写道:
>
> > 需求: 事实表实时Join Kudu中的维度表,用来补全维度。
> >
> > 为加快查询速度,先从Kudu中查询数据,查询到数据后放入Redis缓存,下次查询先从Redis中取,取不到再从Kudu中查。
> >
> > 遇到的问题:
> > 1、不用Redis缓存,checkpoint很快,效率很高。
> > 2、用Redis缓存,用Jedis,但不用连接池,效率很低。
> > 3、用Redis缓存,用Redis连接池,效率更低。
> >
> > 请教下:
> > 1、从Kudu中取数据,不用缓存可以吗。
> > 2、在AsyncIO中,用lettuce异步客户端,和flink netty不兼容。
> > 3、在AsyncIO中,用Jedis连接池,flink checkpoint很慢的原因。
> > 3、像我这种场景: 流(实时表) Join Kudu中的维度表,怎么才能更好的提高性能。
> >
> > 烦请解答下,辛苦,感谢。
> >
> >
> >
>


答复: jobmanager 日志异常

2019-08-06 文章
你好,
谢谢!已经找到原因了 

发件人: Biao Liu
发送时间: 2019年8月6日 13:55
收件人: user-zh
主题: Re: jobmanager 日志异常

你好,

> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED
> SIGNAL 15: SIGTERM. Shutting down as requested.

这是收到了 signal 15 了 [1],Wong 说得对,搜一下 yarn node manager 或者 yarn resource
manager 的 log

1. https://access.redhat.com/solutions/737033

Thanks,
Biao /'bɪ.aʊ/



On Tue, Aug 6, 2019 at 12:30 PM Wong Victor 
wrote:

> Hi,
>   可以查看一下jobmanager所在节点的yarn log,搜索一下对应的container为什么被kill;
>
> Regards
>
> On 2019/8/6, 11:40 AM, "戴嘉诚"  wrote:
>
> 大家好:
>
>
>
> 我的flink是部署在yarn上左session,今天早上jobmanager自动退出了,然后yarn把他重新拉起了,导致里面跑的job重新启动了,但是我查看日志,看到jobmanager的日志没有任何异常,同时jobmanager也没有长时间的full
> gc和频繁的gc,以下是jobmanager的日志:
> 就是在06:44分的是偶,日志上标记了收收到停止请求,然后jobmanager直接停止了...请问是由于什么原因导致的呢?
>
> 2019-08-06 06:43:58,891 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Completed
> > checkpoint 7843 for job e49624208fe771c4c9527799fd46f2a3 (5645215
> bytes in
> > 801 ms).
> > 2019-08-06 06:43:59,336 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Triggering
> > checkpoint 7852 @ 1565045039321 for job
> a9a7464ead55474bea6f42ed8e5de60f.
> > 2019-08-06 06:44:00,971 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Triggering
> > checkpoint 7852 @ 1565045040957 for job
> 79788b218e684cb31c1ca0fcc641e89f.
> > 2019-08-06 06:44:01,357 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Completed
> > checkpoint 7852 for job a9a7464ead55474bea6f42ed8e5de60f (25870658
> bytes in
> > 1806 ms).
> > 2019-08-06 06:44:02,887 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Completed
> > checkpoint 7852 for job 79788b218e684cb31c1ca0fcc641e89f (29798945
> bytes in
> > 1849 ms).
> > 2019-08-06 06:44:05,101 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Triggering
> > checkpoint 7852 @ 1565045045092 for job
> 03f3a0bd53c21f90f70ea01916dc9f78.
> > 2019-08-06 06:44:06,547 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Triggering
> > checkpoint 7844 @ 1565045046522 for job
> 486a1949d75863f823013d87b509d228.
> > 2019-08-06 06:44:07,311 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Completed
> > checkpoint 7844 for job 486a1949d75863f823013d87b509d228 (62458942
> bytes in
> > 736 ms).
> > 2019-08-06 06:44:07,506 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Completed
> > checkpoint 7852 for job 03f3a0bd53c21f90f70ea01916dc9f78 (105565032
> bytes
> > in 2366 ms).
> > 2019-08-06 06:44:08,087 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Triggering
> > checkpoint 7853 @ 1565045048055 for job
> 32783d371464265ef536454055ae6182.
> > 2019-08-06 06:44:09,626 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Checkpoint
> > 7050 of job 4b542195824ff7b7cdf749543fd368cb expired before
> completing.
> > 2019-08-06 06:44:09,647 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Triggering
> > checkpoint 7051 @ 1565045049626 for job
> 4b542195824ff7b7cdf749543fd368cb.
> > 2019-08-06 06:44:12,006 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Completed
> > checkpoint 7853 for job 32783d371464265ef536454055ae6182 (299599482
> bytes
> > in 3912 ms).
> > 2019-08-06 06:44:12,972 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Triggering
> > checkpoint 7853 @ 1565045052962 for job
> 16db5afe9a8cd7c6278030d5dec4c80c.
> > 2019-08-06 06:44:13,109 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Triggering
> > checkpoint 7853 @ 1565045053080 for job
> 9c1394a2d2ff47c7852eff9f1f932535.
> > 2019-08-06 06:44:16,779 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Completed
> > checkpoint 7853 for job 16db5afe9a8cd7c6278030d5dec4c80c (152643149
> bytes
> > in 3666 ms).
> > 2019-08-06 06:44:18,598 INFO
> >  org.apache.flink.runtime.checkpoint.CheckpointCoordinator -
> Completed
> > checkpoint 7828 for job 8df2b47f2a4c1ba0f7019ee5989f6e71 (837558245
> bytes
>

Re: Flink RocksDBStateBackend 问题

2019-08-05 文章
FileSystem 我记得是存储的大小是不能超过tm的内存还是jm的内存,而rocksdb上存储的数据是可以无限的,不过相对来说,
FileSystem的吞吐就会比rocksdb会高

lvwenyuan  于2019年8月6日周二 上午11:39写道:

> 请教各位:
>RocksDBStateBackend
> 中,rocksdb上存储的内如和FileSystem上存储的数据内容是一样的?如果不一样,那么分别是什么呢?感谢回答
>
>
>
>


jobmanager 日志异常

2019-08-05 文章
大家好:


我的flink是部署在yarn上左session,今天早上jobmanager自动退出了,然后yarn把他重新拉起了,导致里面跑的job重新启动了,但是我查看日志,看到jobmanager的日志没有任何异常,同时jobmanager也没有长时间的full
gc和频繁的gc,以下是jobmanager的日志:
就是在06:44分的是偶,日志上标记了收收到停止请求,然后jobmanager直接停止了...请问是由于什么原因导致的呢?

2019-08-06 06:43:58,891 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 7843 for job e49624208fe771c4c9527799fd46f2a3 (5645215 bytes in
> 801 ms).
> 2019-08-06 06:43:59,336 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 7852 @ 1565045039321 for job a9a7464ead55474bea6f42ed8e5de60f.
> 2019-08-06 06:44:00,971 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 7852 @ 1565045040957 for job 79788b218e684cb31c1ca0fcc641e89f.
> 2019-08-06 06:44:01,357 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 7852 for job a9a7464ead55474bea6f42ed8e5de60f (25870658 bytes in
> 1806 ms).
> 2019-08-06 06:44:02,887 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 7852 for job 79788b218e684cb31c1ca0fcc641e89f (29798945 bytes in
> 1849 ms).
> 2019-08-06 06:44:05,101 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 7852 @ 1565045045092 for job 03f3a0bd53c21f90f70ea01916dc9f78.
> 2019-08-06 06:44:06,547 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 7844 @ 1565045046522 for job 486a1949d75863f823013d87b509d228.
> 2019-08-06 06:44:07,311 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 7844 for job 486a1949d75863f823013d87b509d228 (62458942 bytes in
> 736 ms).
> 2019-08-06 06:44:07,506 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 7852 for job 03f3a0bd53c21f90f70ea01916dc9f78 (105565032 bytes
> in 2366 ms).
> 2019-08-06 06:44:08,087 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 7853 @ 1565045048055 for job 32783d371464265ef536454055ae6182.
> 2019-08-06 06:44:09,626 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint
> 7050 of job 4b542195824ff7b7cdf749543fd368cb expired before completing.
> 2019-08-06 06:44:09,647 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 7051 @ 1565045049626 for job 4b542195824ff7b7cdf749543fd368cb.
> 2019-08-06 06:44:12,006 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 7853 for job 32783d371464265ef536454055ae6182 (299599482 bytes
> in 3912 ms).
> 2019-08-06 06:44:12,972 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 7853 @ 1565045052962 for job 16db5afe9a8cd7c6278030d5dec4c80c.
> 2019-08-06 06:44:13,109 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 7853 @ 1565045053080 for job 9c1394a2d2ff47c7852eff9f1f932535.
> 2019-08-06 06:44:16,779 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 7853 for job 16db5afe9a8cd7c6278030d5dec4c80c (152643149 bytes
> in 3666 ms).
> 2019-08-06 06:44:18,598 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 7828 for job 8df2b47f2a4c1ba0f7019ee5989f6e71 (837558245 bytes
> in 23472 ms).
> 2019-08-06 06:44:19,193 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 7853 for job 9c1394a2d2ff47c7852eff9f1f932535 (594628825 bytes
> in 6067 ms).
> 2019-08-06 06:44:19,238 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 5855 for job 108ce7f6f5f3e76b12fad9dbdbc8feba (45917615 bytes in
> 61819 ms).
> 2019-08-06 06:44:19,248 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 5856 @ 1565045059238 for job 108ce7f6f5f3e76b12fad9dbdbc8feba.
> 2019-08-06 06:44:22,092 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 7802 @ 1565045062084 for job 430689e0f202fcb29ce9d6403e6825f9.
> 2019-08-06 06:44:22,838 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 2940 for job fea51fd74006de69e265adc13e802229 (122562953 bytes
> in 174336 ms).
> 2019-08-06 06:44:22,888 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 2941 @ 1565045062838 for job fea51fd74006de69e265adc13e802229.
> 2019-08-06 06:44:24,348 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering
> checkpoint 613 @ 1565045064328 for job 5a75d77312f29c714af0a2994f0e8b1a.
> 2019-08-06 06:44:25,327 INFO
>  org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed
> checkpoint 7802 for job 430689e0f202fcb29ce9d6403e6825f9 (358649788 bytes
> in 

Re: Re: Flink checkpoint 并发问题

2019-07-25 文章
嗯嗯!感谢

Yun Tang  于2019年7月25日周四 下午9:37写道:

> Hi
>
> 你给的代码跟你的异常栈其实还是对不上,前文已经说了,出问题的是operator state,但是你的代码都是keyed
> state相关的代码。不过从你出问题的operator name "KeyedProcess -> async wait operator ->
> Flat Map -> Sink",
> 以及异常栈中的StreamElementSerializer使用和一致性问题的表象,我推测应该是应该是AsyncWaitOperator中的operator
> state "_async_wait_operator_state_"相关。最近fix的
> https://issues.apache.org/jira/browse/FLINK-13063
> 问题,正是暂时解决一致性问题,可以考虑cherry pick相关的fix重新部署你们的Flink作业,观察该问题是否还会复现。
>
> 祝好
> 唐云
> 
> From: 戴嘉诚 
> Sent: Thursday, July 25, 2019 21:07
> To: user-zh 
> Subject: Re: Re: Flink checkpoint 并发问题
>
> Hi 唐云
>
>
> 这个问题在这个job上是必现的,即使是failvoer后自动重启,也会出现,但是在同一个处理逻辑,在其他的场景下是没有出现过,而这其中的区别是,这个job是数据量比较大,将近一分钟80万左右。
>
> 这个其中的ProcessElement大致的代码是这样的:
> .process(new KeyedProcessFunction, Map Object>>() {
>   private static final long serialVersionUID =
> 5245290869789704294L;
>
>   private MapState accumulateStateMap;
>   Map resultMap = new HashMap<>();
>   private transient Long hourClear = 24L;
>
>
>   @Override
>   public void open(Configuration parameters) throws Exception {
> MapStateDescriptor accumulateState = new
> MapStateDescriptor<>("accumulateState", StringSerializer.INSTANCE,
> LongSerializer.INSTANCE);
> accumulateStateMap =
> getRuntimeContext().getMapState(accumulateState);
>   }
>
>   @Override
>   public void processElement(Map value, Context
> ctx,
>   Collector> out) throws Exception {
> logger.info("来数据了:{}", value);
> realData.increment();
> resultMap.clear();
> String valueFieldValue =
> String.valueOf(value.get(getLastName(valueFieldName)));
> Long timeFieldValue =
> Long.parseLong(String.valueOf(value.get(timeFieldName)));
> //写到state中
> //判断,state是否存在fieldValue,  如果fieldValue
> 存在,再判断state的时间是否小于time(用于判断乱序时间
> if (!accumulateStateMap.contains(valueFieldValue) ||
> accumulateStateMap.get(valueFieldValue) < timeFieldValue) {
>   accumulateStateMap.put(valueFieldValue, timeFieldValue);
> }
> //判断配置是否已经刷进来了
> if (value.containsKey("config")) {
>   Map config = (Map)
> value.get("config");
>   Integer configCount =
> Integer.parseInt(config.get(countFieldName));
>   Long configTime =
> Long.parseLong(config.get(timeRangeFieldName)) * 1000;
>   //在配置时间范围前
>   long lastTimeStamp = timeFieldValue - configTime;
>   //状态里面有多少个值
>   int stateSize = 0;
>   //遍历state, 删除过时的时间
>   Iterator> iterator =
> accumulateStateMap.iterator();
>   while (iterator.hasNext()) {
> ++stateSize;
> Entry next = iterator.next();
> if (lastTimeStamp >= next.getValue()) {
>   iterator.remove();
>   --stateSize;
> }
>   }
>   //state的值的数量大于阈值
>   if (stateSize >= configCount) {
> resultMap.put("id", config.get("id"));
> resultMap.put("config_id", config.get("_id"));
> resultMap.put("config_version", config.get("_version"));
> resultMap.put("config_score", config.get("score"));
> resultMap.put("config_ttl", config.get("ttl"));
> resultMap.put("startTime", lastTimeStamp);
> resultMap.put("endTime", timeFieldValue);
> resultMap.put("key", ctx.getCurrentKey());
> resultMap.put("value", valueFieldValue);
> resultMap.put("count", stateSize);
> out.collect(resultMap);
>   }
>   logger.info("当前key为:{}, 聚集数量为:{}", ctx.getCurrentKey(),
> stateSize);
>   //根据配置时间乘以2,错信息范围加上来注册指定清理state的时间
>   hourClear = (configTime * 2  + EXPIRATION_TIME) / 360;
>   LocalDateTime localDateTime =
>
> Instant.ofEpochMilli(timeFieldValue).atZone(ZoneId.systemDefault()).toLocalDateTime();
>   localDateTime =
> localDateTime.withMinute(0).withSecond(0).withNano(0).plusHours(hourClear);
>   long timeClean =
> localDateTime.toInstant(OffsetDateTime.now().getOffset()).toEpochMilli();
>   timeTimerGauge = S

Re: Re: Flink checkpoint 并发问题

2019-07-25 文章
Hi 唐云

这个问题在这个job上是必现的,即使是failvoer后自动重启,也会出现,但是在同一个处理逻辑,在其他的场景下是没有出现过,而这其中的区别是,这个job是数据量比较大,将近一分钟80万左右。

这个其中的ProcessElement大致的代码是这样的:
.process(new KeyedProcessFunction, Map>() {
  private static final long serialVersionUID = 5245290869789704294L;

  private MapState accumulateStateMap;
  Map resultMap = new HashMap<>();
  private transient Long hourClear = 24L;


  @Override
  public void open(Configuration parameters) throws Exception {
MapStateDescriptor accumulateState = new
MapStateDescriptor<>("accumulateState", StringSerializer.INSTANCE,
LongSerializer.INSTANCE);
accumulateStateMap =
getRuntimeContext().getMapState(accumulateState);
  }

  @Override
  public void processElement(Map value, Context ctx,
  Collector> out) throws Exception {
logger.info("来数据了:{}", value);
realData.increment();
resultMap.clear();
String valueFieldValue =
String.valueOf(value.get(getLastName(valueFieldName)));
Long timeFieldValue =
Long.parseLong(String.valueOf(value.get(timeFieldName)));
//写到state中
//判断,state是否存在fieldValue,  如果fieldValue
存在,再判断state的时间是否小于time(用于判断乱序时间
if (!accumulateStateMap.contains(valueFieldValue) ||
accumulateStateMap.get(valueFieldValue) < timeFieldValue) {
  accumulateStateMap.put(valueFieldValue, timeFieldValue);
}
//判断配置是否已经刷进来了
if (value.containsKey("config")) {
  Map config = (Map)
value.get("config");
  Integer configCount =
Integer.parseInt(config.get(countFieldName));
  Long configTime =
Long.parseLong(config.get(timeRangeFieldName)) * 1000;
  //在配置时间范围前
  long lastTimeStamp = timeFieldValue - configTime;
  //状态里面有多少个值
  int stateSize = 0;
  //遍历state, 删除过时的时间
  Iterator> iterator =
accumulateStateMap.iterator();
  while (iterator.hasNext()) {
++stateSize;
Entry next = iterator.next();
if (lastTimeStamp >= next.getValue()) {
  iterator.remove();
  --stateSize;
}
  }
  //state的值的数量大于阈值
  if (stateSize >= configCount) {
resultMap.put("id", config.get("id"));
resultMap.put("config_id", config.get("_id"));
resultMap.put("config_version", config.get("_version"));
resultMap.put("config_score", config.get("score"));
resultMap.put("config_ttl", config.get("ttl"));
resultMap.put("startTime", lastTimeStamp);
resultMap.put("endTime", timeFieldValue);
resultMap.put("key", ctx.getCurrentKey());
resultMap.put("value", valueFieldValue);
resultMap.put("count", stateSize);
out.collect(resultMap);
  }
  logger.info("当前key为:{}, 聚集数量为:{}", ctx.getCurrentKey(),
stateSize);
  //根据配置时间乘以2,错信息范围加上来注册指定清理state的时间
  hourClear = (configTime * 2  + EXPIRATION_TIME) / 360;
  LocalDateTime localDateTime =
Instant.ofEpochMilli(timeFieldValue).atZone(ZoneId.systemDefault()).toLocalDateTime();
  localDateTime =
localDateTime.withMinute(0).withSecond(0).withNano(0).plusHours(hourClear);
  long timeClean =
localDateTime.toInstant(OffsetDateTime.now().getOffset()).toEpochMilli();
  timeTimerGauge = String.valueOf(timeClean);
  //注册过期时间
  ctx.timerService().registerEventTimeTimer(timeClean);
}
  }

  @Override
  public void onTimer(long timestamp, OnTimerContext ctx,
  Collector> out) throws Exception {
//减去时间,防止删除中间要累积的数据
Long lasttime = timestamp - (hourClear * 360);
//删除过期时间
int stateSize = 0;
int removeState = 0;
Iterator> iterator =
accumulateStateMap.iterator();
while (iterator.hasNext()) {
  ++stateSize;
  Entry next = iterator.next();
  if (lasttime >= next.getValue()) {
iterator.remove();
--stateSize;
++removeState;
  }
}
        if (stateSize == 0) {
  accumulateStateMap.clear();
}
//把这个定时器删除掉
ctx.timerService().deleteEventTimeTimer(timestamp);
  }
})

Yun Tang  于2019年7月25日周四 下午8:39写道:

> Hi 戴嘉诚
>
> 从异常栈上看,DefaultOperatorStateBackendSnapshotStrategy 88行左右的代码[1]是对list
>

Re: Re: Flink checkpoint 并发问题

2019-07-25 文章
hi
你好,我用的flink是1.8,但是是根据hadoop 2.7.3.2.6.5.0-292  自行打包的,  operator state
descriptor是使用MapStateDescriptor,
谢谢!

Yun Tang  于2019年7月25日周四 下午7:10写道:

> Hi  all
>
> 你们讨论的已经越来越偏了,出问题的是operator state
> backend,实际上与RocksDB没有关系,与MaxConcurrentCheckpoints 也不应该有关系。
>
> To 戴嘉诚
> 你使用的Flink版本是什么?这个operator 里面的operator state descriptor是什么?
>
> 祝好
> 唐云
> ____
> From: 戴嘉诚 
> Sent: Thursday, July 25, 2019 19:04
> To: user-zh@flink.apache.org 
> Subject: Re: Re: Flink checkpoint 并发问题
>
> 这个用window不符合这个场景,因为是取每时每刻的最近半小时,而window只能是固定时间区间,例如window只能统计6点半至7点,
> 7点之7点半,而我的场景是需要6点到6点半,6点02分到6点32分,是根据数据的时间来确定区间,达到统计区间内的总数
>
> athlon...@gmail.com 于2019年7月25日 周四18:50写道:
>
> > 那你用window和evictor 不可以吗?
> > 类似这样,因为我理解你的业务需求可以用这个来实现
> > 在flink源码中org.apache.flink.streaming.examples.windowing.TopSpeedWindowing
> >
> > --
> > athlon...@gmail.com
> >
> >
> > *发件人:* 戴嘉诚 
> > *发送时间:* 2019-07-25 18:45
> > *收件人:* user-zh 
> > *主题:* Re: Re: Flink checkpoint 并发问题
> >
> >
> >
> 这个应该不行,因为这个ttl是删除整个map状态,而我是需要最近半小时内的数据量,所以这个map状态,如果一直有数据写入,我就只需要获取最近的半小时,超过的就在map里面删除,这个ttl不能精确到map里面的具体的k-v
> > 对
> >
> > athlon...@gmail.com  于2019年7月25日周四 下午6:40写道:
> >
> > > 其实你可以不用自己删除.使用TTL设置短一些时间,试试
> > >
> > >
> > >
> > > athlon...@gmail.com
> > >
> > > 发件人: 戴嘉诚
> > > 发送时间: 2019-07-25 18:24
> > > 收件人: user-zh
> > > 主题: Re: Flink checkpoint 并发问题
> > > 你好,这个参数我没有设置过,对于checkpoint,我只是设置了rockDB增量写入,其他都没有设置了
> > >
> > > athlon...@gmail.com  于2019年7月25日周四 下午6:20写道:
> > >
> > > > setMaxConcurrentCheckpoints 这个参数你设置过么?
> > > >
> > > >
> > > >
> > > > athlon...@gmail.com
> > > >
> > > > 发件人: 戴嘉诚
> > > > 发送时间: 2019-07-25 18:07
> > > > 收件人: user-zh
> > > > 主题: Flink checkpoint 并发问题
> > > > 大家好:
> > > >
> > > > 我这里有个Job,会经常不定时的挂掉重启,具体的异常日志为一下,大致是checkpoint的同步失败的
> > > >
> > > >
> > > >
> > >
> >
> 在这个job中,具体是查询最近半小时的数据量是否达到配置的阈值,我使用了rockDB作为后端,然后中间会有个processElement方法会把Key出来的消息,在内部中的MapState堆积,同时也会每条消息过来,就遍历一遍MapState,来删除其中过时的数据。
> > > >
> > > >
> > > >
> > > >
> > >
> >
> 这个异常是不是我在ProcessElement方法内对MapState进行删除的过程中,checkpoint异步复制这个state到rockDB导致的?
> > > >
> > > >
> > > > java.lang.Exception: Could not perform checkpoint 550 for operator
> > > > KeyedProcess -> async wait operator -> Flat Map -> Sink: 写入redis库存
> > > > (16/20).
> > > >
> > > >  at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)
> > > >
> > > >  at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)
> > > >
> > > >  at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)
> > > >
> > > >  at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)
> > > >
> > > >  at org.apache.flink.streaming.runtime.io
> > > > .StreamInputProcessor.processInput(StreamInputProcessor.java:209)
> > > >
> > > >  at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> > > >
> > > >  at
> > > >
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> > > >
> > > >  at
> > org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> > > >
> > > >  at java.lang.Thread.run(Thread.java:748)
> > > >
> > > > Caused by: java.lang.Exception: Could not complete snapshot 550 for
> > > > operator KeyedProcess -> async wait operator -> Flat Map -> Sink:
> > > > 写入redis库存 (16/20).
> > > >
> > > >  at
> > > >
> > >
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
> > > >
> > > >  at
> > > >
&

Re: Re: Flink checkpoint 并发问题

2019-07-25 文章
这个用window不符合这个场景,因为是取每时每刻的最近半小时,而window只能是固定时间区间,例如window只能统计6点半至7点,
7点之7点半,而我的场景是需要6点到6点半,6点02分到6点32分,是根据数据的时间来确定区间,达到统计区间内的总数

athlon...@gmail.com 于2019年7月25日 周四18:50写道:

> 那你用window和evictor 不可以吗?
> 类似这样,因为我理解你的业务需求可以用这个来实现
> 在flink源码中org.apache.flink.streaming.examples.windowing.TopSpeedWindowing
>
> --
> athlon...@gmail.com
>
>
> *发件人:* 戴嘉诚 
> *发送时间:* 2019-07-25 18:45
> *收件人:* user-zh 
> *主题:* Re: Re: Flink checkpoint 并发问题
>
>
> 这个应该不行,因为这个ttl是删除整个map状态,而我是需要最近半小时内的数据量,所以这个map状态,如果一直有数据写入,我就只需要获取最近的半小时,超过的就在map里面删除,这个ttl不能精确到map里面的具体的k-v
> 对
>
> athlon...@gmail.com  于2019年7月25日周四 下午6:40写道:
>
> > 其实你可以不用自己删除.使用TTL设置短一些时间,试试
> >
> >
> >
> > athlon...@gmail.com
> >
> > 发件人: 戴嘉诚
> > 发送时间: 2019-07-25 18:24
> > 收件人: user-zh
> > 主题: Re: Flink checkpoint 并发问题
> > 你好,这个参数我没有设置过,对于checkpoint,我只是设置了rockDB增量写入,其他都没有设置了
> >
> > athlon...@gmail.com  于2019年7月25日周四 下午6:20写道:
> >
> > > setMaxConcurrentCheckpoints 这个参数你设置过么?
> > >
> > >
> > >
> > > athlon...@gmail.com
> > >
> > > 发件人: 戴嘉诚
> > > 发送时间: 2019-07-25 18:07
> > > 收件人: user-zh
> > > 主题: Flink checkpoint 并发问题
> > > 大家好:
> > >
> > > 我这里有个Job,会经常不定时的挂掉重启,具体的异常日志为一下,大致是checkpoint的同步失败的
> > >
> > >
> > >
> >
> 在这个job中,具体是查询最近半小时的数据量是否达到配置的阈值,我使用了rockDB作为后端,然后中间会有个processElement方法会把Key出来的消息,在内部中的MapState堆积,同时也会每条消息过来,就遍历一遍MapState,来删除其中过时的数据。
> > >
> > >
> > >
> > >
> >
> 这个异常是不是我在ProcessElement方法内对MapState进行删除的过程中,checkpoint异步复制这个state到rockDB导致的?
> > >
> > >
> > > java.lang.Exception: Could not perform checkpoint 550 for operator
> > > KeyedProcess -> async wait operator -> Flat Map -> Sink: 写入redis库存
> > > (16/20).
> > >
> > >  at
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)
> > >
> > >  at
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)
> > >
> > >  at
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)
> > >
> > >  at
> > >
> >
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)
> > >
> > >  at org.apache.flink.streaming.runtime.io
> > > .StreamInputProcessor.processInput(StreamInputProcessor.java:209)
> > >
> > >  at
> > >
> >
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
> > >
> > >  at
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
> > >
> > >  at
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> > >
> > >  at java.lang.Thread.run(Thread.java:748)
> > >
> > > Caused by: java.lang.Exception: Could not complete snapshot 550 for
> > > operator KeyedProcess -> async wait operator -> Flat Map -> Sink:
> > > 写入redis库存 (16/20).
> > >
> > >  at
> > >
> >
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
> > >
> > >  at
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)
> > >
> > >  at
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)
> > >
> > >  at
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)
> > >
> > >  at
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)
> > >
> > >  at
> > >
> >
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)
> > >
> > >  ... 8 more
> > >
> > > Caused by: java.util.ConcurrentModificationException
> > >
> > >  at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
> > >
> > >  at java.util.HashMap$EntryIterator.next(HashMap.java:1476)
> > >

Re: Flink checkpoint 并发问题

2019-07-25 文章
你好,这个参数我没有设置过,对于checkpoint,我只是设置了rockDB增量写入,其他都没有设置了

athlon...@gmail.com  于2019年7月25日周四 下午6:20写道:

> setMaxConcurrentCheckpoints 这个参数你设置过么?
>
>
>
> athlon...@gmail.com
>
> 发件人: 戴嘉诚
> 发送时间: 2019-07-25 18:07
> 收件人: user-zh
> 主题: Flink checkpoint 并发问题
> 大家好:
>
> 我这里有个Job,会经常不定时的挂掉重启,具体的异常日志为一下,大致是checkpoint的同步失败的
>
>
> 在这个job中,具体是查询最近半小时的数据量是否达到配置的阈值,我使用了rockDB作为后端,然后中间会有个processElement方法会把Key出来的消息,在内部中的MapState堆积,同时也会每条消息过来,就遍历一遍MapState,来删除其中过时的数据。
>
>
>
> 这个异常是不是我在ProcessElement方法内对MapState进行删除的过程中,checkpoint异步复制这个state到rockDB导致的?
>
>
> java.lang.Exception: Could not perform checkpoint 550 for operator
> KeyedProcess -> async wait operator -> Flat Map -> Sink: 写入redis库存
> (16/20).
>
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:595)
>
>  at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.notifyCheckpoint(BarrierBuffer.java:396)
>
>  at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.processBarrier(BarrierBuffer.java:292)
>
>  at
> org.apache.flink.streaming.runtime.io.BarrierBuffer.getNextNonBlocked(BarrierBuffer.java:200)
>
>  at org.apache.flink.streaming.runtime.io
> .StreamInputProcessor.processInput(StreamInputProcessor.java:209)
>
>  at
> org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
>
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
>
>  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
>
>  at java.lang.Thread.run(Thread.java:748)
>
> Caused by: java.lang.Exception: Could not complete snapshot 550 for
> operator KeyedProcess -> async wait operator -> Flat Map -> Sink:
> 写入redis库存 (16/20).
>
>  at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:422)
>
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.checkpointStreamOperator(StreamTask.java:1113)
>
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask$CheckpointingOperation.executeCheckpointing(StreamTask.java:1055)
>
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.checkpointState(StreamTask.java:729)
>
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:641)
>
>  at
> org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:586)
>
>  ... 8 more
>
> Caused by: java.util.ConcurrentModificationException
>
>  at java.util.HashMap$HashIterator.nextNode(HashMap.java:1442)
>
>  at java.util.HashMap$EntryIterator.next(HashMap.java:1476)
>
>  at java.util.HashMap$EntryIterator.next(HashMap.java:1474)
>
>  at
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:156)
>
>  at
> com.esotericsoftware.kryo.serializers.MapSerializer.copy(MapSerializer.java:21)
>
>  at com.esotericsoftware.kryo.Kryo.copy(Kryo.java:862)
>
>  at
> org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.copy(KryoSerializer.java:248)
>
>  at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:105)
>
>  at
> org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.copy(StreamElementSerializer.java:46)
>
>  at
> org.apache.flink.runtime.state.ArrayListSerializer.copy(ArrayListSerializer.java:73)
>
>  at
> org.apache.flink.runtime.state.PartitionableListState.(PartitionableListState.java:68)
>
>  at
> org.apache.flink.runtime.state.PartitionableListState.deepCopy(PartitionableListState.java:80)
>
>  at
> org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy.snapshot(DefaultOperatorStateBackendSnapshotStrategy.java:88)
>
>  at
> org.apache.flink.runtime.state.DefaultOperatorStateBackend.snapshot(DefaultOperatorStateBackend.java:261)
>
>  at
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:402)
>
>  ... 13 more
>


答复: 注册缓存文件的热更新问题

2019-07-04 文章
好的,谢谢

发件人: Biao Liu
发送时间: 2019年7月5日 10:39
收件人: user-zh
主题: Re: 注册缓存文件的热更新问题

据我所知,没有
自己写代码实现吧

戴嘉诚  于2019年7月5日周五 上午10:36写道:

> 好的,那我想问问,如果要定期更新文件的这个场景,flink有没有其他功能是否支持呢?
> 谢谢!
>
> 发件人: Biao Liu
> 发送时间: 2019年7月5日 10:20
> 收件人: user-zh
> 主题: Re: 注册缓存文件的热更新问题
>
> 这个接口只会在提交 job 时工作一次,不会检测更新
>
> Xintong Song  于2019年7月4日周四 下午7:39写道:
>
> > 你好,
> >
> > 这个应该是不可以的。
> >
> > Thank you~
> >
> > Xintong Song
> >
> >
> >
> > On Thu, Jul 4, 2019 at 4:29 PM 戴嘉诚  wrote:
> >
> > > 大家好:
> > >
> > >
> >
> 我在flink中看到可以注册一个分布式缓存文件StreamExecutionEnvironment.registerCachedFile()然后可以广播到每个tm上给算子使用,那么我想问问,这个文件可以检测到文件更新了,然后会重新广播过去嘛?因为ip会可能会每天都有改变,所以ip库要每天都更新。
> > >
> > >
> >
>
>



答复: 注册缓存文件的热更新问题

2019-07-04 文章
好的,那我想问问,如果要定期更新文件的这个场景,flink有没有其他功能是否支持呢?
谢谢!

发件人: Biao Liu
发送时间: 2019年7月5日 10:20
收件人: user-zh
主题: Re: 注册缓存文件的热更新问题

这个接口只会在提交 job 时工作一次,不会检测更新

Xintong Song  于2019年7月4日周四 下午7:39写道:

> 你好,
>
> 这个应该是不可以的。
>
> Thank you~
>
> Xintong Song
>
>
>
> On Thu, Jul 4, 2019 at 4:29 PM 戴嘉诚  wrote:
>
> > 大家好:
> >
> >
> 我在flink中看到可以注册一个分布式缓存文件StreamExecutionEnvironment.registerCachedFile()然后可以广播到每个tm上给算子使用,那么我想问问,这个文件可以检测到文件更新了,然后会重新广播过去嘛?因为ip会可能会每天都有改变,所以ip库要每天都更新。
> >
> >
>



注册缓存文件的热更新问题

2019-07-04 文章
大家好:

我在flink中看到可以注册一个分布式缓存文件StreamExecutionEnvironment.registerCachedFile()然后可以广播到每个tm上给算子使用,那么我想问问,这个文件可以检测到文件更新了,然后会重新广播过去嘛?因为ip会可能会每天都有改变,所以ip库要每天都更新。



答复: flink metrics的 Reporter 问题

2019-05-15 文章
好的,感谢

发件人: Xintong Song
发送时间: 2019年5月15日 21:17
收件人: user-zh@flink.apache.org
主题: Re: flink metrics的 Reporter 问题

取hostname的第一部分是为了和hdfs的用法保持一致,可以参考一下当时的issue,作者专门提到了为什么要这么做。
https://issues.apache.org/jira/browse/FLINK-1170?focusedCommentId=14175285=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-14175285

Thank you~

Xintong Song



On Wed, May 15, 2019 at 9:11 PM Yun Tang  wrote:

> Hi 嘉诚
>
> 不清楚你使用的Flink具体版本,不过这个显示host-name第一部分的逻辑是一直存在的,因为大部分场景下host-name只需要取第一部分即可表征。具体实现代码可以参阅
> [1] 和 [2] 。
>
> 受到你的启发,我创建了一个JIRA [3] 来追踪这个问题,解法是提供一个metrics
> options,使得你们场景下可以展示metrics的完整hostname
>
> 祝好
> 唐云
>
>
> [1]
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java#L365
> [2]
> https://github.com/apache/flink/blob/505b54c182867ccac5d1724d72f4085425ac08a8/flink-core/src/main/java/org/apache/flink/util/NetUtils.java#L59
> [3] https://issues.apache.org/jira/browse/FLINK-12520
> 
> From: 戴嘉诚 
> Sent: Wednesday, May 15, 2019 20:24
> To: user-zh@flink.apache.org
> Subject: flink metrics的 Reporter 问题
>
> 大家好:
> 我按照官网的文档,调试了flink metrics 的 reporter
> ,加载了Slf4jReporter,这个Reporter运行是正常了,但是发现了个问题,
> 在taskManager中打印里面的信息的时候,打印出来的是:
> ambari.taskmanager.container_e31_1557826320302_0005_01_02.Status.JVM.ClassLoader.ClassesLoaded:
> 12044
> 这里的格式范围,我看了源码应该是.taskmanager..:
>
>
> 但是这里就存在了个问题了,这里的host,显示的是ambari,我服务器上配置的计算机名称应该是全量的ambari.host12.yy,这里的host把后面的给全部省略掉了。这样,我就无法判断这条记录是来自哪个机器了。
>
> 同时,我在jobManager中看到jobmanager打印出来的日志中,是一个全量的机器名称,如下:
> ambari.host02.yy.jobmanager.Status.JVM.Memory.NonHeap.Max: -1
>
> 请问如果我要在taskmanager的reporter中获取到全量的机器名称,我这里需要如何处理?这里是一个bug吗?还是我的使用有误
>



答复: flink metrics的 Reporter 问题

2019-05-15 文章
Hi 唐云

我用的是flink1.8

感谢你的解答,我刚刚也找到了源码里面的[2]方式截取方式。目前来说,应该只能自己在report中,调用系统变量来获取主机名称了。



发件人: Yun Tang
发送时间: 2019年5月15日 21:11
收件人: user-zh@flink.apache.org
主题: Re: flink metrics的 Reporter 问题

Hi 嘉诚

不清楚你使用的Flink具体版本,不过这个显示host-name第一部分的逻辑是一直存在的,因为大部分场景下host-name只需要取第一部分即可表征。具体实现代码可以参阅
 [1] 和 [2] 。

受到你的启发,我创建了一个JIRA [3] 来追踪这个问题,解法是提供一个metrics 
options,使得你们场景下可以展示metrics的完整hostname

祝好
唐云


[1] 
https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java#L365
[2] 
https://github.com/apache/flink/blob/505b54c182867ccac5d1724d72f4085425ac08a8/flink-core/src/main/java/org/apache/flink/util/NetUtils.java#L59
[3] https://issues.apache.org/jira/browse/FLINK-12520

From: 戴嘉诚 
Sent: Wednesday, May 15, 2019 20:24
To: user-zh@flink.apache.org
Subject: flink metrics的 Reporter 问题

大家好:
我按照官网的文档,调试了flink metrics 的 reporter 
,加载了Slf4jReporter,这个Reporter运行是正常了,但是发现了个问题,
在taskManager中打印里面的信息的时候,打印出来的是:
ambari.taskmanager.container_e31_1557826320302_0005_01_02.Status.JVM.ClassLoader.ClassesLoaded:
 12044
这里的格式范围,我看了源码应该是.taskmanager..:

但是这里就存在了个问题了,这里的host,显示的是ambari,我服务器上配置的计算机名称应该是全量的ambari.host12.yy,这里的host把后面的给全部省略掉了。这样,我就无法判断这条记录是来自哪个机器了。

同时,我在jobManager中看到jobmanager打印出来的日志中,是一个全量的机器名称,如下:
ambari.host02.yy.jobmanager.Status.JVM.Memory.NonHeap.Max: -1

请问如果我要在taskmanager的reporter中获取到全量的机器名称,我这里需要如何处理?这里是一个bug吗?还是我的使用有误



flink metrics的 Reporter 问题

2019-05-15 文章
大家好:
我按照官网的文档,调试了flink metrics 的 reporter 
,加载了Slf4jReporter,这个Reporter运行是正常了,但是发现了个问题,
在taskManager中打印里面的信息的时候,打印出来的是:
ambari.taskmanager.container_e31_1557826320302_0005_01_02.Status.JVM.ClassLoader.ClassesLoaded:
 12044
这里的格式范围,我看了源码应该是.taskmanager..:

但是这里就存在了个问题了,这里的host,显示的是ambari,我服务器上配置的计算机名称应该是全量的ambari.host12.yy,这里的host把后面的给全部省略掉了。这样,我就无法判断这条记录是来自哪个机器了。

同时,我在jobManager中看到jobmanager打印出来的日志中,是一个全量的机器名称,如下:
ambari.host02.yy.jobmanager.Status.JVM.Memory.NonHeap.Max: -1

请问如果我要在taskmanager的reporter中获取到全量的机器名称,我这里需要如何处理?这里是一个bug吗?还是我的使用有误


答复: flink集群性能问题

2019-05-10 文章
好的,我尝试吧18个flink job拆分出来,在yarn中单独运行,观察里面的jm/tm的信息,谢谢.

发件人: Yun Gao
发送时间: 2019年5月10日 17:52
收件人: user-zh
主题: Re: flink集群性能问题

异常一和异常三应该是相关的,因为Heartbeat也是通过akka来发送的;如果确定不是GC的问题的话,那么超时一般是由于JM/TM在做一些耗时的操作导致akka线程阻塞,但是具体在做什么操作应该还需要进一步分析。


--
From:戴嘉诚 
Send Time:2019 May 10 (Fri.) 17:00
To:user-zh@flink.apache.org 
Subject:flink集群性能问题

大家好:
 我这里遇到了一个问题,我的运行方式是flink session on 
yarn上,一共有18个任务在这个session上运行,这个任务运行了几天后,最近开始发现有几个job,不定时报这个错误,(ps:就这几个job报这个异常,其他job没有出现)。一直都提示超时,然后看了gc,发现没有长时间的的fullgc,而且gc也改为了用g1垃圾收集器,但是也是会有这个问题。
 状态后端使用的是文件后端,以前用rocksDB的时候,也是出现过如此异常。

异常一:
java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id 
container_e30_1556059873942_0007_01_003306 timed out.
 at 
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1656)
 at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl$HeartbeatMonitor.run(HeartbeatManagerImpl.java:339)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at 
org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
 at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
 at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


异常二:
Caused by: 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
 No lease on 
/flink/checkpoint/public_rule_persona_guanzhu_count/225a05c3ac6192017f38017417ad8f71/chk-6015/1ed16855-3261-4eda-bceb-1519f401eae5
 (inode 1967760389): File does not exist. [Lease.  Holder: 
DFSClient_NONMAPREDUCE_2064130997_92, pendingcreates: 11]
 at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3697)
 at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3498)
 at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3336)
 at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3296)
 at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:850)
 at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:504)
 at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
 at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
 at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
 at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
 at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
 at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)

 at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1554)
 at org.apache.hadoop.ipc.Client.call(Client.java:1498)
 at org.apache.hadoop.ipc.Client.call(Client.java:1398)
 at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
 at com.sun.proxy.$Proxy17.addBlock(Unknown Source)
 at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:459)
 at sun.reflect.GeneratedMethodAccessor51.invoke(Unknown Source)
 at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:498)
 at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:290)
 at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:202)
 at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:184)
 at com.sun.proxy.$Proxy18.addBlock(Unknown Source)
 at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1580)
 at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1375)
 at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:564)

异常三:
java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask 
timed out on 
[Actor[akka.tcp://fl...@ambari.host08.yy:37375/user/taskmanager_0#-433497337]] 
after [1 ms

flink集群性能问题

2019-05-10 文章
大家好:
我这里遇到了一个问题,我的运行方式是flink session on 
yarn上,一共有18个任务在这个session上运行,这个任务运行了几天后,最近开始发现有几个job,不定时报这个错误,(ps:就这几个job报这个异常,其他job没有出现)。一直都提示超时,然后看了gc,发现没有长时间的的fullgc,而且gc也改为了用g1垃圾收集器,但是也是会有这个问题。
状态后端使用的是文件后端,以前用rocksDB的时候,也是出现过如此异常。

异常一:
java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id 
container_e30_1556059873942_0007_01_003306 timed out.
at 
org.apache.flink.runtime.jobmaster.JobMaster$TaskManagerHeartbeatListener.notifyHeartbeatTimeout(JobMaster.java:1656)
at 
org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl$HeartbeatMonitor.run(HeartbeatManagerImpl.java:339)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.runtime.concurrent.akka.ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.run(ActorSystemScheduledExecutorAdapter.java:154)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


异常二:
Caused by: 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
 No lease on 
/flink/checkpoint/public_rule_persona_guanzhu_count/225a05c3ac6192017f38017417ad8f71/chk-6015/1ed16855-3261-4eda-bceb-1519f401eae5
 (inode 1967760389): File does not exist. [Lease.  Holder: 
DFSClient_NONMAPREDUCE_2064130997_92, pendingcreates: 11]
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3697)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.analyzeFileState(FSNamesystem.java:3498)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getNewBlockTargets(FSNamesystem.java:3336)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getAdditionalBlock(FSNamesystem.java:3296)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.addBlock(NameNodeRpcServer.java:850)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.addBlock(ClientNamenodeProtocolServerSideTranslatorPB.java:504)
at 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:640)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2351)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2347)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1869)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2347)

at org.apache.hadoop.ipc.Client.getRpcResponse(Client.java:1554)
at org.apache.hadoop.ipc.Client.call(Client.java:1498)
at org.apache.hadoop.ipc.Client.call(Client.java:1398)
at 
org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:233)
at com.sun.proxy.$Proxy17.addBlock(Unknown Source)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.addBlock(ClientNamenodeProtocolTranslatorPB.java:459)
at sun.reflect.GeneratedMethodAccessor51.invoke(Unknown Source)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:290)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:202)
at 
org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:184)
at com.sun.proxy.$Proxy18.addBlock(Unknown Source)
at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.locateFollowingBlock(DFSOutputStream.java:1580)
at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1375)
at 
org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:564)

异常三:
java.util.concurrent.CompletionException: akka.pattern.AskTimeoutException: Ask 
timed out on 
[Actor[akka.tcp://fl...@ambari.host08.yy:37375/user/taskmanager_0#-433497337]] 
after [1 ms]. Sender[null] sent message of type 

job 失败告警

2019-04-19 文章
大家好:
  请问,在代码中,如果感知job failed 后的方法调用(除了用restful 实时调用接口)?因为在on 
yarn中,如果job晚上failed了…上班的时候,就看不到对应的日志,也不知道他failed的原因了。我这里需要,当感知到job失败了,就调用代码外部通知。来实时知道job的情况。


答复: 回复: 方案询问

2019-04-02 文章
这样写好复杂。弊端和性能方面具体就不清楚,但肯定是可比MapState弱一点的

写个简单的MapState demo吧,如下:
env
.addSource(flinkKafkaConsumer)
.process(new ProcessFunction() {
  private static final long serialVersionUID = -8357959184126038977L;

  private MapState accumulateState;

  @Override
  public void open(Configuration parameters) throws Exception {
MapStateDescriptor accumulateStateDescriptor =
new MapStateDescriptor<>(
"map_state",
StringSerializer.INSTANCE,
StringSerializer.INSTANCE);
accumulateState = 
getRuntimeContext().getMapState(accumulateStateDescriptor);
  }

  @Override
  public void processElement(String value, Context ctx, Collector 
out)
  throws Exception {
  String key = null;
if (accumulateState.contains(key)) {
  String  存在的订单号 = accumulateState.get(key);
  存在订单号 和 value 合并;
out.collect(合并的订单);
} else {
  accumulateState.put(key, value);
}
  }
})
;



发件人: 1900
发送时间: 2019年4月2日 20:59
收件人: paullin3280
主题: 回复: 方案询问

MapState 暂时还不知道怎么做,后面继续研究,我现在做了个版本


1.将收到的流分成两份流,一份初始状态的流,一份终态的流
2.watermark用订单的eventtime,采用滑动窗口进行流的切分
3.根据订单号进行合并,采用CoGroupFunction进行流的处理
4.在CoGroupFunction中合并两个流,流1跟流2进行过滤合并,同一个订单号最终只有一条数据,最终变成一个流


不知道现在这样写怎么样?有没有什么弊端?性能怎么样?会不会造成数据丢失什么的?


-- 原始邮件 --
发件人: "paullin3280";
发送时间: 2019年4月2日(星期二) 下午2:10
收件人: "user-zh";

主题: Re: 方案询问



Hi,

推荐可以维护两个 MapState 分别缓存尚未匹配的两种订单。一条订单数据进来首先查找另一种订单的 MapState,若找到则输出合并的数据并删除对应的 
entry,否则放入所属订单类型的 MapState。

Best,
Paul Lam

> 在 2019年4月2日,13:46,1900 <575209...@qq.com> 写道:
> 
> 现在有个需求,从kafka接收订单信息,每条订单信息有1-2条数据(一般第一条是订单初始状态数据,第二条是订单终态数据);时间间隔不等(一般5秒以内),
> 如何能将数据进行合并,最终合并成一条数据?
> 
> 
> 现在有一个考虑,根据订单号keyby分组后处理,这样的话是不是开启的窗口太多了?



答复: 批流结合

2019-04-02 文章
是什么样的离线数据?要如何累加到实时流?

发件人: 492341344
发送时间: 2019年4月2日 10:06
收件人: user-zh
主题: 批流结合

各位好,项目中有一批历史离线的统计数据,需要累加到实时流的统计中。请问有什么好的方案吗?



答复: flink-connector-redis连接器

2019-04-01 文章
源码里面是不支持expire, 你可以自己覆盖源码的接口,自定义方法

发件人: 周美娜
发送时间: 2019年4月1日 20:48
收件人: user-zh@flink.apache.org
主题: flink-connector-redis连接器

请问:flink 的redis connector作为sink时 不支持Expire命令吗?



答复: RocksDB中指定nameNode 的高可用

2019-03-29 文章
可以了,感谢指教。

发件人: Biao Liu
发送时间: 2019年3月27日 19:13
收件人: user-zh@flink.apache.org
主题: Re: RocksDB中指定nameNode 的高可用

Hi,
  HDFS 本身可以解决该问题,可以搜一下 “HDFS HA nameservice”,可以避免写死 name node 地址

Yun Tang  于2019年3月26日周二 下午5:29写道:

> Hi
>
> Flink高可用相关配置的存储目录,当存储路径配置成HDFS时,相关namenode高可用性由HDFS支持,对上层完全透明。
>
> 祝好
> 唐云
> ____
> From: 戴嘉诚 
> Sent: Tuesday, March 26, 2019 16:57
> To: user-zh@flink.apache.org
> Subject: RocksDB中指定nameNode 的高可用
>
> 嘿,我想询问一下,flink中的RocksDB位置
> 我指定了hdfs路径,但是,这里是强指定nameNode的地址,但是我的hdfs是有个两个nameNode地址的,这里能否有个功能,当active
> nameNode挂掉了,类似hdfs的HA那样,能无缝切换nameNode地址吗?不然,当nameNode挂掉了, 我指定的flink也会一并挂掉
>
>



RocksDB中指定nameNode 的高可用

2019-03-26 文章
  嘿,我想询问一下,flink中的RocksDB位置  
我指定了hdfs路径,但是,这里是强指定nameNode的地址,但是我的hdfs是有个两个nameNode地址的,这里能否有个功能,当active 
nameNode挂掉了,类似hdfs的HA那样,能无缝切换nameNode地址吗?不然,当nameNode挂掉了, 我指定的flink也会一并挂掉



答复: flink疑问

2019-03-25 文章
使用 Split 算子把流根据特定条件拆分成两个或者更多,然后在用select算子从拆分流中选择对应的拆分流做处理即可。
可以看看文档上,有介绍用法
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/

发件人: baiyg25...@hundsun.com
发送时间: 2019年3月26日 10:10
收件人: user-zh
主题: 回复: flink疑问

一个算子出来两个流好像不能吧。
要想实现你说的,可以先基于A流过滤生成要进行B算子的流,基于A流过滤生成要进行C算子的流。



baiyg25...@hundsun.com
 
发件人: IORI
发送时间: 2019-03-26 09:46
收件人: user-zh
主题: flink疑问
请问:数据流通过算子A时,我想分裂成两个数据流,一个数据流进行算子B操作然后sink,另外一个数据流需要先进行算子C操作,再reduce然后sink,请问这种情况应该如何处理?一个operator中能出来两个不同的数据流吗?
 



答复: Flink 在什么情况下产生乱序问题?

2019-03-06 文章
你可以了解下触发器,默认的触发器是按照你发现的做,如果你要实时输出,可以吧触发器更改为ContinuonsEventTimeTrigger 
,然后设置你的时间间隔。

发件人: 刘 文
发送时间: 2019年3月6日 22:55
收件人: user-zh@flink.apache.org
抄送: qcx978132...@gmail.com
主题: Re: Flink 在什么情况下产生乱序问题?

).在验证EventTime 加watermark 处理中,我发现往socket发送的数据,不能及时输出或没有输出
).验证发现,只有当前发送的数据的 getCurrentWatermark()的时间戳 > TimeWindow + maxOutOfOrderness 
时,才会触发结束上一次window
).可是最新的记录是不能及时被处理,或者是不能被处理
).请问这个问题怎么处理?









---

> 在 2019年3月6日,下午10:29,刘 文  写道:
> 
> 该问题,明白一点了,整理成文档供大家参考
> ———
> 
> Flink 1.7.2 业务时间戳分析流式数据源码分析: 
> https://github.com/opensourceteams/flink-maven-scala/blob/master/md/miniCluster/Flink-EventTime-watermark.md
>  
> 
> 
> 
> ———
> 
> 
> 
> Flink 1.7.2 业务时间戳分析流式数据源码分析
> 
>  
> 源码
> 
> https://github.com/opensourceteams/flink-maven-scala 
> 
>  
> 概述
> 
> 由于Flink默认的ProcessTime是按Window收到Source发射过来的数据的时间,来算了,也就是按Flink程序接收的时间来进行计算,但实际业务,处理周期性的数据时,每5分钟内的数据,每1个小时内的数据进行分析,实际是业务源发生的时间来做为实际时间,所以用Flink的EventTime和Watermark来处理这个问题
> 指定Env为EventTime
> 调置数据流assignTimestampsAndWatermarks函数,由AssignerWithPeriodicWatermarks中的extractTimestamp()函数提取实际业务时间,getCurrentWatermark得到最新的时间,这个会对每个元素算一次,拿最大的当做计算时间,如果当前时间,大于上一次的时间间隔
>  + 这里设置的延时时间,就会结束上一个Window,也就是对这一段时间的Window进行操作
> 本程序以指定业务时间,来做为统计时间
>  
> 程序
> 
> package 
> com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.eventtime
> 
> import java.util.{Date, Properties}
> 
> import com.alibaba.fastjson.JSON
> import com.opensourceteams.module.bigdata.flink.common.ConfigurationUtil
> import org.apache.flink.api.common.serialization.SimpleStringSchema
> import org.apache.flink.configuration.Configuration
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
> import org.apache.flink.streaming.api.watermark.Watermark
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.streaming.api.windowing.windows.TimeWindow
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
> import org.apache.flink.util.Collector
> 
> 
> object SockWordCountRun {
> 
> 
> 
>   def main(args: Array[String]): Unit = {
> 
> 
> // get the execution environment
>// val env: StreamExecutionEnvironment = 
> StreamExecutionEnvironment.getExecutionEnvironment
> 
> 
> val configuration : Configuration = 
> ConfigurationUtil.getConfiguration(true)
> 
> val env:StreamExecutionEnvironment = 
> StreamExecutionEnvironment.createLocalEnvironment(1,configuration)
> 
> 
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> 
> 
> 
> import org.apache.flink.streaming.api.scala._
> val dataStream = env.socketTextStream("localhost", 1234, '\n')
> 
>  // .setParallelism(3)
> 
> 
> dataStream.assignTimestampsAndWatermarks(new 
> AssignerWithPeriodicWatermarks[String] {
> 
> val maxOutOfOrderness =  2 * 1000L // 3.5 seconds
> var currentMaxTimestamp: Long = _
> var currentTimestamp: Long = _
> 
> override def getCurrentWatermark: Watermark =  new 
> Watermark(currentMaxTimestamp - maxOutOfOrderness)
> 
> override def extractTimestamp(element: String, 
> previousElementTimestamp: Long): Long = {
>   val jsonObject = JSON.parseObject(element)
> 
>   val timestamp = jsonObject.getLongValue("extract_data_time")
>   currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
>   currentTimestamp = timestamp
> 
> /*  println("===watermark begin===")
>   println()
>   println(new Date(currentMaxTimestamp - 20 * 1000))
>   println(jsonObject)
>   println("===watermark end===")
>   println()*/
>   timestamp
> }
> 
>   })
>   .timeWindowAll(Time.seconds(3))
> 
>   .process(new ProcessAllWindowFunction[String,String,TimeWindow]() {
>   override def process(context: Context, elements: Iterable[String], out: 
> Collector[String]): Unit = {
> 
> 
> println()
> println("开始提交window")
> println(new Date())
> for(e <- elements) out.collect(e)
> 

Re: 如何每五分钟统计一次当天某个消息的总条数

2019-03-04 文章
当天的,就直接是翻滚窗口就行了吧,不过你要注意你一天量有多大,小心内存不够了

张作峰 于2019年3月5日 周二15:06写道:

> 设置event time 窗口为一天,是滑动窗口吗?具体是指?需要统计的是当天的
>
> --
> 张作峰
> 创维 一体机软件开发部
>
> 深圳市南山区高新南一道创维大厦A座12楼
> 手机: 18320872958 座机: 0755-26974350(分机号 4350)
> Email:m...@zhangzuofeng.cn
> 主页:http://www.zhangzuofeng.cn
> wiki: http://wiki.qiannuo.me
>
>
>
>
>
>
>
>
>
> -- 原始邮件 --
> 发件人: "Paul Lam";
> 发送时间: 2019年3月5日(星期二) 下午2:46
> 收件人: "user-zh";
> 主题: Re: 如何每五分钟统计一次当天某个消息的总条数
>
>
>
> Hi,
>
> 你可以试下设置 event time 窗口为一天,然后设置 processing time timer 来定时每 5 分钟触发输出当天最新的结果。
>
> Best,
> Paul Lam
>
> > 在 2019年3月5日,13:16,张作峰  写道:
> >
> > 大家好!
> > 请教下诸位大牛,如何使用stream api每五分钟统计一次当天某个消息的总条数?
> > 谢谢!