大家好:
请问,因为当前flink sql或者flink table
中,不支持自定义的udf中使用有state的逻辑,所以,当我们自己任务中,如果统计需要聚集型指标的情况下,就不能用上flink
sql了,只能自己使用flink datastream去硬编码,请问,flink
sql中,能否有其他方式,可以调用我们自己定义的有state的udf,并且可以不让再解析执行的时候,多次出现呢?还是说,只能一个指标一个flink
job?
你好
可以参考一下这个链接的思路
https://blog.csdn.net/IT_Lee_J_H/article/details/88641894
发自我的iPhone
> 在 2020年4月4日,18:15,chanamper 写道:
>
> Dear All,
>大家好,请教一下。目前针对Java Api的方式,对于Flink keyby情况存在key数据倾斜有啥实现优化思路吗?看官方文档目前在table
> api和sql层面,有Minibatch Aggregation和Local Global
你可以在算子中计算,然后上传到自定义的Flink Metrics中,这样就能看到平均一个算子的时间了.
发件人: 张江
发送时间: 2020年1月2日 19:18
收件人: user-zh
抄送: user-zh
主题: 回复:如何获取算子处理一条数据记录的时间
我其实是想知道算子的数据处理能力,得到一个算子每秒钟最多能处理多少条数据。比如说map算子,我需要知道它一秒钟最多能转换多少数据,之后根据source端的数据量来设置算子的并行度
| |
张江
|
|
邮箱:zjkingdom2...@163.com
|
签名由 网易邮箱大师 定制
在2020年01月02日
大家好:
我在的Flink是在yarn上跑,在yarn上部署了个yarn-session,命令如下:
./yarn-session.sh -jm 5120m -tm 10240m -s 30 -d -st
这里我是设置了一个tm上面跑30个slot,但是我在1.8版本的时候,是在yarn中会看到的是一个tm是使用一个cpu,但是我切换到了1.9.1后,发现是一个slot使用一个cpu,导致我设置的并行超出了yarn中的cpu资源。请问在1.9.1中,如果降低这个cpu使用率?
/12)(55962df9fd694ed1f82b8f3ec2aaf6c4)”
是受害者,是因为其他异常导致整个作业failover,之后导致cancel了当前task,你应该在job
manager日志中找到第一个fail的task,那上面的异常才是root cause。
祝好
唐云
From: 戴嘉诚
Sent: Friday, October 11, 2019 15:17
To: user-zh
Subject: Re: 回复: flink 缓存本地文件被删除疑问
Hi
这是早上发生异常后,我下载的日志,请麻烦查看一下。
taskmanager.
/cda6dc0c44239aa7a36105988328de5744aea125/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java#L54
祝好
唐云
From: 戴嘉诚
Sent: Thursday, October 10, 2019 16:26
To: user-zh@flink.apache.org
Subject: 回复: flink1.9 webui exception日志显示问
cts/flink/flink-docs-release-1.9/ops/state/savepoints.html#assigning-operator-ids
> On Oct 11, 2019, at 11:00, 戴嘉诚 wrote:
>
> 大家好:
> 最近我的程序迁移到了flink1.9 on yarn
> session后,原本有个程序是读取kafka消息,然后key分组,window聚集一秒后,批量异步写入hbase中经常发生异常,其中异常为:
> java.lang.Exception: Exc
你好,我的任务是用RocksDB存储的Checkpoint, 是运行了一段时间后报的这个错误
发件人: pengchenglin
发送时间: 2019年10月11日 11:59
收件人: user-zh@flink.apache.org
主题: Re: flink 缓存本地文件被删除疑问
你好,你的任务是用RocksDB存储的Checkpoint吗?任务是每次启动时就报这个错误,还是运行一段时间报这个错误》
发件人: 戴嘉诚
发送时间: 2019-10-11 11:00
收件人: user-zh@flink.apache.org
主题: flink 缓存本地文件被删除疑问
大家好
+1 这个我也遇到了这个问题,主要原因是异常了,然后region 重启,重启后,会重新加载,就自动清空了异常日志信息..现在不能再webui上排查异常信息了
发件人: 李杰
发送时间: 2019年10月10日 14:41
收件人: user-zh@flink.apache.org
主题: flink1.9 webui exception日志显示问题
log4j.properties为官方默认。
weib ui exception日志一闪而过,ui上看不到历史异常信息
大家好:
我的flink代码打包的jar包是放到了hdfs上面,但是当我在flink中用命令行执行的时候,flink本地是否只能解析本地jar包?不能解析到hdfs上面的jar包?
我把jar包下载到服务器本地后,就可以执行成功了
我的命令是:
./bin/flink run -yid application_1567652112073_0001 -p 6 -yj
hdfs://ysec-storage/flink/runJar/business-security-1.0-SNAPSHOT.jar --appId
act_test
返回的结果是:
在onTimer回调方法中清理状态。
以上是我的思路,希望能帮助到你~
祝好
在 2019-09-05 13:43:00,"Yifei Qi" 写道:
>你的意思是自己去实现滑动窗口的功能么?
>
>戴嘉诚 于2019年9月4日周三 下午10:51写道:
>
>> 这里我建议你是用key process然后在里面用state来管理和聚集数据,这样会节省很大一部分内存
>>
>> Yifei Qi 于2019年9月4日 周三20:07写道:
>>
>> &g
对,你可以自己再state中维持一整天的数据,让后根据时间戳来删除过期数据来替换滑动窗口
发件人: Yifei Qi
发送时间: 2019年9月5日 13:42
收件人: user-zh@flink.apache.org
主题: Re: 如何优化flink内存?
你的意思是自己去实现滑动窗口的功能么?
戴嘉诚 于2019年9月4日周三 下午10:51写道:
> 这里我建议你是用key process然后在里面用state来管理和聚集数据,这样会节省很大一部分内存
>
> Yifei Qi 于2019年9月4日 周三20:07写道:
这里我建议你是用key process然后在里面用state来管理和聚集数据,这样会节省很大一部分内存
Yifei Qi 于2019年9月4日 周三20:07写道:
> 大家好:
>
>
>
> 不知道大家在使用flink时遇到过内存消耗过大的问题么?
>
>
>
> 我们最近在用flink算一些实时的统计数据, 但是内存消耗很大, 不知道有没有人知道如何优化?
>
>
>
> 具体情况是这样的:
>
> 准备的测试数据模拟一天时间内3万个用户的5万条数据. 原始数据一共是100M.
>
> 按照用户进行分组.
>
> 计算两个滑动窗口任务:一个是近1小时, 每5秒滑动一次的窗口.
大家好:
我在看到streamingFileSink
大家好:
我这里用的cdh6.3.0版本进行hadoop管理。所以我根据官网上的显示,对flink的源码根据cdh6.3.0重新编译打包,但是在打包过程中,貌似发现了个问题:
[ERROR] Failed to execute goal
org.apache.maven.plugins:maven-compiler-plugin:3.8.0:testCompile
(default-testCompile) on project flink-yarn_2.11: Compilation failure
[ERROR]
你好,
可以通过flink的restFul去调用保存savepoint
发件人: liu zhongfeng
发送时间: 2019年8月9日 20:28
收件人: user-zh@flink.apache.org
主题: 恢复savepoint,除了命令行,能通过代码获取吗?
如题,restore savepoint,除了run flink -s
savepointpath之外,能通过代码恢复吗,因为公司集群没法输入命令行。如果可以的话,能给个小demo,或者API也可以
谢谢。
Best,
Rio Liu, 刘中锋
你好,
可以用lettuce做异步客户端,排除lettuce的netty依赖,用flink的netty,就可以了集成lettuce了
王佩 于2019年8月6日 周二22:11写道:
> 这种Join场景,用上缓存后,理论上应该更快,但为啥会变慢呢。
>
> 王佩 于2019年8月6日周二 下午10:09写道:
>
> > 需求: 事实表实时Join Kudu中的维度表,用来补全维度。
> >
> > 为加快查询速度,先从Kudu中查询数据,查询到数据后放入Redis缓存,下次查询先从Redis中取,取不到再从Kudu中查。
> >
> > 遇到的问题:
> >
者 yarn resource
manager 的 log
1. https://access.redhat.com/solutions/737033
Thanks,
Biao /'bɪ.aʊ/
On Tue, Aug 6, 2019 at 12:30 PM Wong Victor
wrote:
> Hi,
> 可以查看一下jobmanager所在节点的yarn log,搜索一下对应的container为什么被kill;
>
> Regards
>
> On 2019/8/6, 11:40 AM, "戴嘉诚" wrote:
>
FileSystem 我记得是存储的大小是不能超过tm的内存还是jm的内存,而rocksdb上存储的数据是可以无限的,不过相对来说,
FileSystem的吞吐就会比rocksdb会高
lvwenyuan 于2019年8月6日周二 上午11:39写道:
> 请教各位:
>RocksDBStateBackend
> 中,rocksdb上存储的内如和FileSystem上存储的数据内容是一样的?如果不一样,那么分别是什么呢?感谢回答
>
>
>
>
大家好:
我的flink是部署在yarn上左session,今天早上jobmanager自动退出了,然后yarn把他重新拉起了,导致里面跑的job重新启动了,但是我查看日志,看到jobmanager的日志没有任何异常,同时jobmanager也没有长时间的full
gc和频繁的gc,以下是jobmanager的日志:
就是在06:44分的是偶,日志上标记了收收到停止请求,然后jobmanager直接停止了...请问是由于什么原因导致的呢?
2019-08-06 06:43:58,891 INFO
>
该是AsyncWaitOperator中的operator
> state "_async_wait_operator_state_"相关。最近fix的
> https://issues.apache.org/jira/browse/FLINK-13063
> 问题,正是暂时解决一致性问题,可以考虑cherry pick相关的fix重新部署你们的Flink作业,观察该问题是否还会复现。
>
> 祝好
> 唐云
>
> From: 戴嘉诚
> S
if (lasttime >= next.getValue()) {
iterator.remove();
--stateSize;
++removeState;
}
}
if (stateSize == 0) {
accumulateStateMap.clear();
}
//把这个定时器删除掉
hi
你好,我用的flink是1.8,但是是根据hadoop 2.7.3.2.6.5.0-292 自行打包的, operator state
descriptor是使用MapStateDescriptor,
谢谢!
Yun Tang 于2019年7月25日周四 下午7:10写道:
> Hi all
>
> 你们讨论的已经越来越偏了,出问题的是operator state
> backend,实际上与RocksDB没有关系,与MaxConcurrentCheckpoints 也不应该有关系。
>
> To 戴嘉诚
> 你使用的Fl
在flink源码中org.apache.flink.streaming.examples.windowing.TopSpeedWindowing
>
> --
> athlon...@gmail.com
>
>
> *发件人:* 戴嘉诚
> *发送时间:* 2019-07-25 18:45
> *收件人:* user-zh
> *主题:* Re: Re: Flink checkpoint 并发问题
>
>
> 这个应该不行,因为这个ttl是删除整个map状态,而我是需要最近半小时内的数据量,所以这个map状态,如果一直有数据写入,我就只需要获取最近的半小时,超过的就在
你好,这个参数我没有设置过,对于checkpoint,我只是设置了rockDB增量写入,其他都没有设置了
athlon...@gmail.com 于2019年7月25日周四 下午6:20写道:
> setMaxConcurrentCheckpoints 这个参数你设置过么?
>
>
>
> athlon...@gmail.com
>
> 发件人: 戴嘉诚
> 发送时间: 2019-07-25 18:07
> 收件人: user-zh
> 主题: Flink checkpoint 并发问题
> 大家好:
>
>
好的,谢谢
发件人: Biao Liu
发送时间: 2019年7月5日 10:39
收件人: user-zh
主题: Re: 注册缓存文件的热更新问题
据我所知,没有
自己写代码实现吧
戴嘉诚 于2019年7月5日周五 上午10:36写道:
> 好的,那我想问问,如果要定期更新文件的这个场景,flink有没有其他功能是否支持呢?
> 谢谢!
>
> 发件人: Biao Liu
> 发送时间: 2019年7月5日 10:20
> 收件人: user-zh
> 主题: Re: 注册缓存文件的热更新问题
>
>
好的,那我想问问,如果要定期更新文件的这个场景,flink有没有其他功能是否支持呢?
谢谢!
发件人: Biao Liu
发送时间: 2019年7月5日 10:20
收件人: user-zh
主题: Re: 注册缓存文件的热更新问题
这个接口只会在提交 job 时工作一次,不会检测更新
Xintong Song 于2019年7月4日周四 下午7:39写道:
> 你好,
>
> 这个应该是不可以的。
>
> Thank you~
>
> Xintong Song
>
>
>
> On Thu, Jul 4, 2019
大家好:
我在flink中看到可以注册一个分布式缓存文件StreamExecutionEnvironment.registerCachedFile()然后可以广播到每个tm上给算子使用,那么我想问问,这个文件可以检测到文件更新了,然后会重新广播过去嘛?因为ip会可能会每天都有改变,所以ip库要每天都更新。
085425ac08a8/flink-core/src/main/java/org/apache/flink/util/NetUtils.java#L59
> [3] https://issues.apache.org/jira/browse/FLINK-12520
>
> From: 戴嘉诚
> Sent: Wednesday, May 15, 2019 20:24
> To: user-zh@flink.apache.org
> Subject: flink metrics的 R
/505b54c182867ccac5d1724d72f4085425ac08a8/flink-core/src/main/java/org/apache/flink/util/NetUtils.java#L59
[3] https://issues.apache.org/jira/browse/FLINK-12520
From: 戴嘉诚
Sent: Wednesday, May 15, 2019 20:24
To: user-zh@flink.apache.org
Subject: flink metrics的 Reporter 问题
大家好:
我
大家好:
我按照官网的文档,调试了flink metrics 的 reporter
,加载了Slf4jReporter,这个Reporter运行是正常了,但是发现了个问题,
在taskManager中打印里面的信息的时候,打印出来的是:
ambari.taskmanager.container_e31_1557826320302_0005_01_02.Status.JVM.ClassLoader.ClassesLoaded:
12044
这里的格式范围,我看了源码应该是.taskmanager..:
。
--
From:戴嘉诚
Send Time:2019 May 10 (Fri.) 17:00
To:user-zh@flink.apache.org
Subject:flink集群性能问题
大家好:
我这里遇到了一个问题,我的运行方式是flink session on
yarn上,一共有18个任务在这个session上运行,这个任务运行了几天后,最近开始发现有几个job,不定时报这个错误,(ps:就这几个job报这个异常,其他job没有出现)。一直都提示超时,然后看了gc,发现没有长时间的的fullgc,而且gc也改为了用g1垃圾收集器,但是也是会有这个问题。
状态后端使用的是文件
大家好:
我这里遇到了一个问题,我的运行方式是flink session on
yarn上,一共有18个任务在这个session上运行,这个任务运行了几天后,最近开始发现有几个job,不定时报这个错误,(ps:就这几个job报这个异常,其他job没有出现)。一直都提示超时,然后看了gc,发现没有长时间的的fullgc,而且gc也改为了用g1垃圾收集器,但是也是会有这个问题。
状态后端使用的是文件后端,以前用rocksDB的时候,也是出现过如此异常。
异常一:
java.util.concurrent.TimeoutException: Heartbeat
大家好:
请问,在代码中,如果感知job failed 后的方法调用(除了用restful 实时调用接口)?因为在on
yarn中,如果job晚上failed了…上班的时候,就看不到对应的日志,也不知道他failed的原因了。我这里需要,当感知到job失败了,就调用代码外部通知。来实时知道job的情况。
这样写好复杂。弊端和性能方面具体就不清楚,但肯定是可比MapState弱一点的
写个简单的MapState demo吧,如下:
env
.addSource(flinkKafkaConsumer)
.process(new ProcessFunction() {
private static final long serialVersionUID = -8357959184126038977L;
private MapState accumulateState;
@Override
public void
是什么样的离线数据?要如何累加到实时流?
发件人: 492341344
发送时间: 2019年4月2日 10:06
收件人: user-zh
主题: 批流结合
各位好,项目中有一批历史离线的统计数据,需要累加到实时流的统计中。请问有什么好的方案吗?
源码里面是不支持expire, 你可以自己覆盖源码的接口,自定义方法
发件人: 周美娜
发送时间: 2019年4月1日 20:48
收件人: user-zh@flink.apache.org
主题: flink-connector-redis连接器
请问:flink 的redis connector作为sink时 不支持Expire命令吗?
gt;
> 祝好
> 唐云
> ____
> From: 戴嘉诚
> Sent: Tuesday, March 26, 2019 16:57
> To: user-zh@flink.apache.org
> Subject: RocksDB中指定nameNode 的高可用
>
> 嘿,我想询问一下,flink中的RocksDB位置
> 我指定了hdfs路径,但是,这里是强指定nameNode的地址,但是我的hdfs是有个两个nameNode地址的,这里能否有个功能,当activ
嘿,我想询问一下,flink中的RocksDB位置
我指定了hdfs路径,但是,这里是强指定nameNode的地址,但是我的hdfs是有个两个nameNode地址的,这里能否有个功能,当active
nameNode挂掉了,类似hdfs的HA那样,能无缝切换nameNode地址吗?不然,当nameNode挂掉了, 我指定的flink也会一并挂掉
使用 Split 算子把流根据特定条件拆分成两个或者更多,然后在用select算子从拆分流中选择对应的拆分流做处理即可。
可以看看文档上,有介绍用法
https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/
发件人: baiyg25...@hundsun.com
发送时间: 2019年3月26日 10:10
收件人: user-zh
主题: 回复: flink疑问
一个算子出来两个流好像不能吧。
要想实现你说的,可以先基于A流过滤生成要进行B算子的流,基于A流过滤生成要进行C算子的流。
你可以了解下触发器,默认的触发器是按照你发现的做,如果你要实时输出,可以吧触发器更改为ContinuonsEventTimeTrigger
,然后设置你的时间间隔。
发件人: 刘 文
发送时间: 2019年3月6日 22:55
收件人: user-zh@flink.apache.org
抄送: qcx978132...@gmail.com
主题: Re: Flink 在什么情况下产生乱序问题?
).在验证EventTime 加watermark 处理中,我发现往socket发送的数据,不能及时输出或没有输出
).验证发现,只有当前发送的数据的
当天的,就直接是翻滚窗口就行了吧,不过你要注意你一天量有多大,小心内存不够了
张作峰 于2019年3月5日 周二15:06写道:
> 设置event time 窗口为一天,是滑动窗口吗?具体是指?需要统计的是当天的
>
> --
> 张作峰
> 创维 一体机软件开发部
>
> 深圳市南山区高新南一道创维大厦A座12楼
> 手机: 18320872958 座机: 0755-26974350(分机号 4350)
> Email:m...@zhangzuofeng.cn
> 主页:http://www.zhangzuofeng.cn
> wiki:
42 matches
Mail list logo