flink table api 或者 sql 使用 自定义含有的state方法

2020-11-23 文章
大家好: 请问,因为当前flink sql或者flink table 中,不支持自定义的udf中使用有state的逻辑,所以,当我们自己任务中,如果统计需要聚集型指标的情况下,就不能用上flink sql了,只能自己使用flink datastream去硬编码,请问,flink sql中,能否有其他方式,可以调用我们自己定义的有state的udf,并且可以不让再解析执行的时候,多次出现呢?还是说,只能一个指标一个flink job?

Re: Flink keyby数据倾斜问题

2020-04-05 文章
你好 可以参考一下这个链接的思路 https://blog.csdn.net/IT_Lee_J_H/article/details/88641894 发自我的iPhone > 在 2020年4月4日,18:15,chanamper 写道: > > Dear All, >大家好,请教一下。目前针对Java Api的方式,对于Flink keyby情况存在key数据倾斜有啥实现优化思路吗?看官方文档目前在table > api和sql层面,有Minibatch Aggregation和Local Global

回复: 回复:如何获取算子处理一条数据记录的时间

2020-01-06 文章
你可以在算子中计算,然后上传到自定义的Flink Metrics中,这样就能看到平均一个算子的时间了. 发件人: 张江 发送时间: 2020年1月2日 19:18 收件人: user-zh 抄送: user-zh 主题: 回复:如何获取算子处理一条数据记录的时间 我其实是想知道算子的数据处理能力,得到一个算子每秒钟最多能处理多少条数据。比如说map算子,我需要知道它一秒钟最多能转换多少数据,之后根据source端的数据量来设置算子的并行度 | | 张江 | | 邮箱:zjkingdom2...@163.com | 签名由 网易邮箱大师 定制 在2020年01月02日

slot询问

2019-12-27 文章
大家好: 我在的Flink是在yarn上跑,在yarn上部署了个yarn-session,命令如下: ./yarn-session.sh -jm 5120m -tm 10240m -s 30 -d -st 这里我是设置了一个tm上面跑30个slot,但是我在1.8版本的时候,是在yarn中会看到的是一个tm是使用一个cpu,但是我切换到了1.9.1后,发现是一个slot使用一个cpu,导致我设置的并行超出了yarn中的cpu资源。请问在1.9.1中,如果降低这个cpu使用率?

回复: 回复: flink 缓存本地文件被删除疑问

2019-10-11 文章
/12)(55962df9fd694ed1f82b8f3ec2aaf6c4)” 是受害者,是因为其他异常导致整个作业failover,之后导致cancel了当前task,你应该在job manager日志中找到第一个fail的task,那上面的异常才是root cause。 祝好 唐云 From: 戴嘉诚 Sent: Friday, October 11, 2019 15:17 To: user-zh Subject: Re: 回复: flink 缓存本地文件被删除疑问 Hi 这是早上发生异常后,我下载的日志,请麻烦查看一下。 taskmanager.

回复: 回复: flink1.9 webui exception日志显示问题

2019-10-11 文章
/cda6dc0c44239aa7a36105988328de5744aea125/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java#L54 祝好 唐云 From: 戴嘉诚 Sent: Thursday, October 10, 2019 16:26 To: user-zh@flink.apache.org Subject: 回复: flink1.9 webui exception日志显示问

回复: flink 缓存本地文件被删除疑问

2019-10-11 文章
cts/flink/flink-docs-release-1.9/ops/state/savepoints.html#assigning-operator-ids > On Oct 11, 2019, at 11:00, 戴嘉诚 wrote: > > 大家好: > 最近我的程序迁移到了flink1.9 on yarn > session后,原本有个程序是读取kafka消息,然后key分组,window聚集一秒后,批量异步写入hbase中经常发生异常,其中异常为: > java.lang.Exception: Exc

回复: flink 缓存本地文件被删除疑问

2019-10-10 文章
你好,我的任务是用RocksDB存储的Checkpoint, 是运行了一段时间后报的这个错误 发件人: pengchenglin 发送时间: 2019年10月11日 11:59 收件人: user-zh@flink.apache.org 主题: Re: flink 缓存本地文件被删除疑问 你好,你的任务是用RocksDB存储的Checkpoint吗?任务是每次启动时就报这个错误,还是运行一段时间报这个错误》 发件人: 戴嘉诚 发送时间: 2019-10-11 11:00 收件人: user-zh@flink.apache.org 主题: flink 缓存本地文件被删除疑问 大家好

回复: flink1.9 webui exception日志显示问题

2019-10-10 文章
+1 这个我也遇到了这个问题,主要原因是异常了,然后region 重启,重启后,会重新加载,就自动清空了异常日志信息..现在不能再webui上排查异常信息了 发件人: 李杰 发送时间: 2019年10月10日 14:41 收件人: user-zh@flink.apache.org 主题: flink1.9 webui exception日志显示问题 log4j.properties为官方默认。 weib ui exception日志一闪而过,ui上看不到历史异常信息

flink 命令行疑问

2019-09-28 文章
大家好: 我的flink代码打包的jar包是放到了hdfs上面,但是当我在flink中用命令行执行的时候,flink本地是否只能解析本地jar包?不能解析到hdfs上面的jar包? 我把jar包下载到服务器本地后,就可以执行成功了 我的命令是: ./bin/flink run -yid application_1567652112073_0001 -p 6 -yj hdfs://ysec-storage/flink/runJar/business-security-1.0-SNAPSHOT.jar --appId act_test 返回的结果是:

答复: Re: 如何优化flink内存?

2019-09-05 文章
在onTimer回调方法中清理状态。 以上是我的思路,希望能帮助到你~ 祝好 在 2019-09-05 13:43:00,"Yifei Qi" 写道: >你的意思是自己去实现滑动窗口的功能么? > >戴嘉诚 于2019年9月4日周三 下午10:51写道: > >> 这里我建议你是用key process然后在里面用state来管理和聚集数据,这样会节省很大一部分内存 >> >> Yifei Qi 于2019年9月4日 周三20:07写道: >> >> &g

答复: 如何优化flink内存?

2019-09-05 文章
对,你可以自己再state中维持一整天的数据,让后根据时间戳来删除过期数据来替换滑动窗口 发件人: Yifei Qi 发送时间: 2019年9月5日 13:42 收件人: user-zh@flink.apache.org 主题: Re: 如何优化flink内存? 你的意思是自己去实现滑动窗口的功能么? 戴嘉诚 于2019年9月4日周三 下午10:51写道: > 这里我建议你是用key process然后在里面用state来管理和聚集数据,这样会节省很大一部分内存 > > Yifei Qi 于2019年9月4日 周三20:07写道:

Re: 如何优化flink内存?

2019-09-04 文章
这里我建议你是用key process然后在里面用state来管理和聚集数据,这样会节省很大一部分内存 Yifei Qi 于2019年9月4日 周三20:07写道: > 大家好: > > > > 不知道大家在使用flink时遇到过内存消耗过大的问题么? > > > > 我们最近在用flink算一些实时的统计数据, 但是内存消耗很大, 不知道有没有人知道如何优化? > > > > 具体情况是这样的: > > 准备的测试数据模拟一天时间内3万个用户的5万条数据. 原始数据一共是100M. > > 按照用户进行分组. > > 计算两个滑动窗口任务:一个是近1小时, 每5秒滑动一次的窗口.

Streaming File Sink疑问

2019-09-03 文章
大家好: 我在看到streamingFileSink

Flink编译问题

2019-08-20 文章
大家好: 我这里用的cdh6.3.0版本进行hadoop管理。所以我根据官网上的显示,对flink的源码根据cdh6.3.0重新编译打包,但是在打包过程中,貌似发现了个问题: [ERROR] Failed to execute goal org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile (default-testCompile) on project flink-yarn_2.11: Compilation failure [ERROR]

答复: 恢复savepoint,除了命令行,能通过代码获取吗?

2019-08-09 文章
你好, 可以通过flink的restFul去调用保存savepoint 发件人: liu zhongfeng 发送时间: 2019年8月9日 20:28 收件人: user-zh@flink.apache.org 主题: 恢复savepoint,除了命令行,能通过代码获取吗? 如题,restore savepoint,除了run flink -s savepointpath之外,能通过代码恢复吗,因为公司集群没法输入命令行。如果可以的话,能给个小demo,或者API也可以 谢谢。 Best, Rio Liu, 刘中锋

Re: AsyncIO 用Redis做缓存

2019-08-06 文章
你好, 可以用lettuce做异步客户端,排除lettuce的netty依赖,用flink的netty,就可以了集成lettuce了 王佩 于2019年8月6日 周二22:11写道: > 这种Join场景,用上缓存后,理论上应该更快,但为啥会变慢呢。 > > 王佩 于2019年8月6日周二 下午10:09写道: > > > 需求: 事实表实时Join Kudu中的维度表,用来补全维度。 > > > > 为加快查询速度,先从Kudu中查询数据,查询到数据后放入Redis缓存,下次查询先从Redis中取,取不到再从Kudu中查。 > > > > 遇到的问题: > >

答复: jobmanager 日志异常

2019-08-06 文章
者 yarn resource manager 的 log 1. https://access.redhat.com/solutions/737033 Thanks, Biao /'bɪ.aʊ/ On Tue, Aug 6, 2019 at 12:30 PM Wong Victor wrote: > Hi, > 可以查看一下jobmanager所在节点的yarn log,搜索一下对应的container为什么被kill; > > Regards > > On 2019/8/6, 11:40 AM, "戴嘉诚" wrote: >

Re: Flink RocksDBStateBackend 问题

2019-08-05 文章
FileSystem 我记得是存储的大小是不能超过tm的内存还是jm的内存,而rocksdb上存储的数据是可以无限的,不过相对来说, FileSystem的吞吐就会比rocksdb会高 lvwenyuan 于2019年8月6日周二 上午11:39写道: > 请教各位: >RocksDBStateBackend > 中,rocksdb上存储的内如和FileSystem上存储的数据内容是一样的?如果不一样,那么分别是什么呢?感谢回答 > > > >

jobmanager 日志异常

2019-08-05 文章
大家好: 我的flink是部署在yarn上左session,今天早上jobmanager自动退出了,然后yarn把他重新拉起了,导致里面跑的job重新启动了,但是我查看日志,看到jobmanager的日志没有任何异常,同时jobmanager也没有长时间的full gc和频繁的gc,以下是jobmanager的日志: 就是在06:44分的是偶,日志上标记了收收到停止请求,然后jobmanager直接停止了...请问是由于什么原因导致的呢? 2019-08-06 06:43:58,891 INFO >

Re: Re: Flink checkpoint 并发问题

2019-07-25 文章
该是AsyncWaitOperator中的operator > state "_async_wait_operator_state_"相关。最近fix的 > https://issues.apache.org/jira/browse/FLINK-13063 > 问题,正是暂时解决一致性问题,可以考虑cherry pick相关的fix重新部署你们的Flink作业,观察该问题是否还会复现。 > > 祝好 > 唐云 > > From: 戴嘉诚 > S

Re: Re: Flink checkpoint 并发问题

2019-07-25 文章
if (lasttime >= next.getValue()) { iterator.remove(); --stateSize; ++removeState; } } if (stateSize == 0) { accumulateStateMap.clear(); } //把这个定时器删除掉

Re: Re: Flink checkpoint 并发问题

2019-07-25 文章
hi 你好,我用的flink是1.8,但是是根据hadoop 2.7.3.2.6.5.0-292 自行打包的, operator state descriptor是使用MapStateDescriptor, 谢谢! Yun Tang 于2019年7月25日周四 下午7:10写道: > Hi all > > 你们讨论的已经越来越偏了,出问题的是operator state > backend,实际上与RocksDB没有关系,与MaxConcurrentCheckpoints 也不应该有关系。 > > To 戴嘉诚 > 你使用的Fl

Re: Re: Flink checkpoint 并发问题

2019-07-25 文章
在flink源码中org.apache.flink.streaming.examples.windowing.TopSpeedWindowing > > -- > athlon...@gmail.com > > > *发件人:* 戴嘉诚 > *发送时间:* 2019-07-25 18:45 > *收件人:* user-zh > *主题:* Re: Re: Flink checkpoint 并发问题 > > > 这个应该不行,因为这个ttl是删除整个map状态,而我是需要最近半小时内的数据量,所以这个map状态,如果一直有数据写入,我就只需要获取最近的半小时,超过的就在

Re: Flink checkpoint 并发问题

2019-07-25 文章
你好,这个参数我没有设置过,对于checkpoint,我只是设置了rockDB增量写入,其他都没有设置了 athlon...@gmail.com 于2019年7月25日周四 下午6:20写道: > setMaxConcurrentCheckpoints 这个参数你设置过么? > > > > athlon...@gmail.com > > 发件人: 戴嘉诚 > 发送时间: 2019-07-25 18:07 > 收件人: user-zh > 主题: Flink checkpoint 并发问题 > 大家好: > >

答复: 注册缓存文件的热更新问题

2019-07-04 文章
好的,谢谢 发件人: Biao Liu 发送时间: 2019年7月5日 10:39 收件人: user-zh 主题: Re: 注册缓存文件的热更新问题 据我所知,没有 自己写代码实现吧 戴嘉诚 于2019年7月5日周五 上午10:36写道: > 好的,那我想问问,如果要定期更新文件的这个场景,flink有没有其他功能是否支持呢? > 谢谢! > > 发件人: Biao Liu > 发送时间: 2019年7月5日 10:20 > 收件人: user-zh > 主题: Re: 注册缓存文件的热更新问题 > >

答复: 注册缓存文件的热更新问题

2019-07-04 文章
好的,那我想问问,如果要定期更新文件的这个场景,flink有没有其他功能是否支持呢? 谢谢! 发件人: Biao Liu 发送时间: 2019年7月5日 10:20 收件人: user-zh 主题: Re: 注册缓存文件的热更新问题 这个接口只会在提交 job 时工作一次,不会检测更新 Xintong Song 于2019年7月4日周四 下午7:39写道: > 你好, > > 这个应该是不可以的。 > > Thank you~ > > Xintong Song > > > > On Thu, Jul 4, 2019

注册缓存文件的热更新问题

2019-07-04 文章
大家好: 我在flink中看到可以注册一个分布式缓存文件StreamExecutionEnvironment.registerCachedFile()然后可以广播到每个tm上给算子使用,那么我想问问,这个文件可以检测到文件更新了,然后会重新广播过去嘛?因为ip会可能会每天都有改变,所以ip库要每天都更新。

答复: flink metrics的 Reporter 问题

2019-05-15 文章
085425ac08a8/flink-core/src/main/java/org/apache/flink/util/NetUtils.java#L59 > [3] https://issues.apache.org/jira/browse/FLINK-12520 > > From: 戴嘉诚 > Sent: Wednesday, May 15, 2019 20:24 > To: user-zh@flink.apache.org > Subject: flink metrics的 R

答复: flink metrics的 Reporter 问题

2019-05-15 文章
/505b54c182867ccac5d1724d72f4085425ac08a8/flink-core/src/main/java/org/apache/flink/util/NetUtils.java#L59 [3] https://issues.apache.org/jira/browse/FLINK-12520 From: 戴嘉诚 Sent: Wednesday, May 15, 2019 20:24 To: user-zh@flink.apache.org Subject: flink metrics的 Reporter 问题 大家好: 我

flink metrics的 Reporter 问题

2019-05-15 文章
大家好: 我按照官网的文档,调试了flink metrics 的 reporter ,加载了Slf4jReporter,这个Reporter运行是正常了,但是发现了个问题, 在taskManager中打印里面的信息的时候,打印出来的是: ambari.taskmanager.container_e31_1557826320302_0005_01_02.Status.JVM.ClassLoader.ClassesLoaded: 12044 这里的格式范围,我看了源码应该是.taskmanager..:

答复: flink集群性能问题

2019-05-10 文章
。 -- From:戴嘉诚 Send Time:2019 May 10 (Fri.) 17:00 To:user-zh@flink.apache.org Subject:flink集群性能问题 大家好: 我这里遇到了一个问题,我的运行方式是flink session on yarn上,一共有18个任务在这个session上运行,这个任务运行了几天后,最近开始发现有几个job,不定时报这个错误,(ps:就这几个job报这个异常,其他job没有出现)。一直都提示超时,然后看了gc,发现没有长时间的的fullgc,而且gc也改为了用g1垃圾收集器,但是也是会有这个问题。 状态后端使用的是文件

flink集群性能问题

2019-05-10 文章
大家好: 我这里遇到了一个问题,我的运行方式是flink session on yarn上,一共有18个任务在这个session上运行,这个任务运行了几天后,最近开始发现有几个job,不定时报这个错误,(ps:就这几个job报这个异常,其他job没有出现)。一直都提示超时,然后看了gc,发现没有长时间的的fullgc,而且gc也改为了用g1垃圾收集器,但是也是会有这个问题。 状态后端使用的是文件后端,以前用rocksDB的时候,也是出现过如此异常。 异常一: java.util.concurrent.TimeoutException: Heartbeat

job 失败告警

2019-04-19 文章
大家好:   请问,在代码中,如果感知job failed 后的方法调用(除了用restful 实时调用接口)?因为在on yarn中,如果job晚上failed了…上班的时候,就看不到对应的日志,也不知道他failed的原因了。我这里需要,当感知到job失败了,就调用代码外部通知。来实时知道job的情况。

答复: 回复: 方案询问

2019-04-02 文章
这样写好复杂。弊端和性能方面具体就不清楚,但肯定是可比MapState弱一点的 写个简单的MapState demo吧,如下: env .addSource(flinkKafkaConsumer) .process(new ProcessFunction() { private static final long serialVersionUID = -8357959184126038977L; private MapState accumulateState; @Override public void

答复: 批流结合

2019-04-02 文章
是什么样的离线数据?要如何累加到实时流? 发件人: 492341344 发送时间: 2019年4月2日 10:06 收件人: user-zh 主题: 批流结合 各位好,项目中有一批历史离线的统计数据,需要累加到实时流的统计中。请问有什么好的方案吗?

答复: flink-connector-redis连接器

2019-04-01 文章
源码里面是不支持expire, 你可以自己覆盖源码的接口,自定义方法 发件人: 周美娜 发送时间: 2019年4月1日 20:48 收件人: user-zh@flink.apache.org 主题: flink-connector-redis连接器 请问:flink 的redis connector作为sink时 不支持Expire命令吗?

答复: RocksDB中指定nameNode 的高可用

2019-03-29 文章
gt; > 祝好 > 唐云 > ____ > From: 戴嘉诚 > Sent: Tuesday, March 26, 2019 16:57 > To: user-zh@flink.apache.org > Subject: RocksDB中指定nameNode 的高可用 > > 嘿,我想询问一下,flink中的RocksDB位置 > 我指定了hdfs路径,但是,这里是强指定nameNode的地址,但是我的hdfs是有个两个nameNode地址的,这里能否有个功能,当activ

RocksDB中指定nameNode 的高可用

2019-03-26 文章
  嘿,我想询问一下,flink中的RocksDB位置  我指定了hdfs路径,但是,这里是强指定nameNode的地址,但是我的hdfs是有个两个nameNode地址的,这里能否有个功能,当active nameNode挂掉了,类似hdfs的HA那样,能无缝切换nameNode地址吗?不然,当nameNode挂掉了, 我指定的flink也会一并挂掉

答复: flink疑问

2019-03-25 文章
使用 Split 算子把流根据特定条件拆分成两个或者更多,然后在用select算子从拆分流中选择对应的拆分流做处理即可。 可以看看文档上,有介绍用法 https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/ 发件人: baiyg25...@hundsun.com 发送时间: 2019年3月26日 10:10 收件人: user-zh 主题: 回复: flink疑问 一个算子出来两个流好像不能吧。 要想实现你说的,可以先基于A流过滤生成要进行B算子的流,基于A流过滤生成要进行C算子的流。

答复: Flink 在什么情况下产生乱序问题?

2019-03-06 文章
你可以了解下触发器,默认的触发器是按照你发现的做,如果你要实时输出,可以吧触发器更改为ContinuonsEventTimeTrigger ,然后设置你的时间间隔。 发件人: 刘 文 发送时间: 2019年3月6日 22:55 收件人: user-zh@flink.apache.org 抄送: qcx978132...@gmail.com 主题: Re: Flink 在什么情况下产生乱序问题? ).在验证EventTime 加watermark 处理中,我发现往socket发送的数据,不能及时输出或没有输出 ).验证发现,只有当前发送的数据的

Re: 如何每五分钟统计一次当天某个消息的总条数

2019-03-04 文章
当天的,就直接是翻滚窗口就行了吧,不过你要注意你一天量有多大,小心内存不够了 张作峰 于2019年3月5日 周二15:06写道: > 设置event time 窗口为一天,是滑动窗口吗?具体是指?需要统计的是当天的 > > -- > 张作峰 > 创维 一体机软件开发部 > > 深圳市南山区高新南一道创维大厦A座12楼 > 手机: 18320872958 座机: 0755-26974350(分机号 4350) > Email:m...@zhangzuofeng.cn > 主页:http://www.zhangzuofeng.cn > wiki: