可以 savepoint 到 HDFS,然后配置 checkpoint 的地址为 对象存储。
我们就是 flink 支持对象存储和 HDFS。
Hangxiang Yu 于2023年8月2日周三 14:03写道:
> Hi, 我理解可以有两种方式:
> 1. 设定从某个存储集群上恢复并向另一个存储集群上快照,即设置[1]为 HDFS地址,[2] 为后面的对象存储地址
> 2. 还是在HDFS集群上启停作业,设置 savepoint 目录[3]到对象存储
>
> 关于 state processor api,目前 sql 作业确实操作起来比较困难,只能从日志里获取 uid
有大佬看看吗?
Tianwang Li 于2021年10月20日周三 上午11:03写道:
> flink1.13 和 flink 1.14
>
> Caizhi Weng 于2021年10月20日周三 上午10:17写道:
>
>> Hi!
>>
>> 会将 input 的记录存储在 state 里面。
>>
>> 如果 input 的字段比较多,但是参与聚合运算的字段比较少。
>>
>> 这样会导致 state 非常的大。
>> >
>>
&
flink1.13 和 flink 1.14
Caizhi Weng 于2021年10月20日周三 上午10:17写道:
> Hi!
>
> 会将 input 的记录存储在 state 里面。
>
> 如果 input 的字段比较多,但是参与聚合运算的字段比较少。
>
> 这样会导致 state 非常的大。
> >
>
> 你使用的是哪个 Flink 版本以及哪个 planner?这个观察是如何得出的呢?就我所知,state 里应该只存储了参与 agg 运算的字段。
>
> Tianwang Li
Flink 的 Over 窗口
例如在 range over window 场合,会将 input 的记录存储在 state 里面。
如果 input 的字段比较多,但是参与聚合运算的字段比较少。
这样会导致 state 非常的大。
从 RowTimeRangeBoundedPrecedingFunction 里面逻辑看,
不参与agg运算的字段,在 onTimer 时期输出之后,是可以清理了的。
这样能提升Over 窗口的处理性能吗?
SQL例子:
SELECT
col_1,
col_2,
col_3,
col_4,
col_5,
col_6, -- 字段内容比较长
col_7,
(小的)tumbling window + (大的)over window
这样会不会好一些。
Wanghui (HiCampus) 于2021年7月30日周五 下午3:17写道:
> Hi all:
>我在测试Over窗口时,当窗口是5秒~15s级别时,处理速度能够达到2000/s。
> 但是当我把窗口调整为10分钟以上时,处理速度从2000开始急速下降,几分钟后下降至230/s。
> 请问下:
>Over窗口的性能该如何优化,因为我后续会将窗口调整为24小时,按照目前的情况来看,性能会下降很快。
>我的测试节点配置:8C +
问题:
flink 计算任务配置的TM内存是 6G,但是,TM进程占用的内存实际达到了 8G。
是什么超用那么多内存?
flink rocksdb内存超用那么多吗?
还是我的配置有什么问题 ?
内存使用:
> top - 19:05:36 up 304 days, 9:12, 0 users, load average: 7.24, 5.99,
> 5.25
> Tasks: 5 total, 1 running, 4 sleeping, 0 stopped, 0 zombie
> %Cpu(s): 5.8 us, 0.9 sy, 0.0 ni,
er(id);
}
Caizhi Weng 于2021年7月15日周四 上午11:15写道:
> Hi!
>
> Flink 1.11 以来对自动类型推导进行了一些修改。可能需要添加一些 annotations 才能推导类型。详见文档
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/functions/udfs/#%e7%b1%bb%e5%9e%8b%e6%8e%a8%e5%af%bc
>
> Tianwang Li
Flink-1.13 注册 UDAF accumulator 类型识别失败,
在Flink-1.10的时候是可以的。
请问在新的版本要如何注册写UDAF??
错误信息:
Caused by: org.apache.flink.table.api.ValidationException: SQL
validation failed. An error occurred in the type inference logic of
function 'default_catalog.default_database.hlp_count'.
at
在 docker-entrypoint.sh 指定了是 “start-foreground” 模式。
所以没有 .out 文件
Tianwang Li 于2021年1月27日周三 下午5:38写道:
> 我参考:
> https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html
>
> 部署了一个session集群,运行任务。
> 在UI查看Stdout是空的,没有生成 .out
我参考:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html
部署了一个session集群,运行任务。
在UI查看Stdout是空的,没有生成 .out 文件
请问是哪里配置的问题吗?
--
**
tivanli
**
主要原因是Flink SQL 上报对数据的task_name 太长了,比较吃资源,导致prometheus采集超时。
当前做法是修改org.apache.flink.metrics.prometheus.AbstractPrometheusReporter,对上报对task_name
进行截断。
Tianwang Li 于2021年1月19日周二 下午5:36写道:
> 我在Flink 集成pushgateway到时候,
> 运行几十个任务,pushgateway到内容就达到了几百万行,太恐怖了。
> 请问一下,大
我在Flink 集成pushgateway到时候,
运行几十个任务,pushgateway到内容就达到了几百万行,太恐怖了。
请问一下,大家有什么优化方案么?
--
**
tivanli
**
Flink版本:1.10.2
使用RichAsyncFunction 异步IO 操作,结果DataStream 不能注册为table。
本地测试的结果是一直重复输出数据。
请问一下DataStream 处理之后,怎么才能注册为 Table。
---
代码如下:
// 异步redis处理
RedisAsyncFunction asyncFunction = new RedisAsyncFunction(node,
aggProcessorArgs);
// 获取异步处理流
DataStream result =
有没有文章,介绍过期时间清理的?需不需要用户自己设置TTL。
例如:我有一个TOPN计算,怎么做过期数据清理?(还是会自动处理)
SELECT cnt, word, time_hour
FROM (
SELECT *,
ROW_NUMBER() OVER (PARTITION BY time_hour ORDER BY cnt desc) AS rownum
FROM test_word_count)
WHERE rownum <= 100;
Benchao Li 于2020年9月14日周一 下午1:03写道:
> Hi,
>
> 看起来你并没有实现`retract`
这种有窗口统计没有影响吧?
刘建刚 于2020年9月30日周三 下午2:25写道:
> 修复方案参考https://github.com/apache/flink/pull/11830
>
> kandy.wang 于2020年9月30日周三 下午2:19写道:
>
> > group agg 开启了mini batch之后,state ttl不生效的问题:
> >
> >
> > 现在我们发现好像 计算订单指标,写hbase,开启mini batch确实是需要的。这样可以大大降低sink
> >
FROM_UNIXTIME 使用的是本地时区, (可以看,org.apache.flink.table.runtime.functions.
SqlDateTimeUtils#fromUnixtime的实现)
可以指定时区,StreamTableEnvironment.getConfig().setLocalTimeZone()
Joker 于2020年9月24日周四 下午1:54写道:
> 不好意思,插入个问题。ts AS TO_TIMESTAMP(FROM_UNIXTIME(create_time / 1000,
> '-MM-dd HH:mm:ss'))
目前,观察到另外一个现象,
如果任务出现了异常,例如写Kafka失败,任务自动重启,这个时候就会突然飙升。
应该是任务失败之后,关闭重启,rocksdb占用到内存没有回收。
通过pmap查看,占用比较多内存多是很多个(128MB 和 64MB 内存块)。
另外,失败重启和如下多jira 描述重启任务多时候比较类似。
https://issues.apache.org/jira/browse/FLINK-7289
pmap图:
[image: image.png]
[image: image.png]
Tianwang Li 于2020年9月23日周三 下午9:11写道
被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到
> > 数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是
> > `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) /
> > 2;`,在你的SQL来讲,就是3h,也就是说
> > 状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1]
> >
> > 希望这个可以解
希望这个可以解答你的疑惑~
>
> [1] https://issues.apache.org/jira/browse/FLINK-18996
>
> Tianwang Li 于2020年9月22日周二 下午8:26写道:
>
> > 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。
> >
> >
> > 【join】
> >
> > > SELECT `b`.`rowtime`,
> > > `a`.`c
使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。
【join】
> SELECT `b`.`rowtime`,
> `a`.`c_id`,
> `b`.`openid`
> FROM `test_table_a` AS `a`
> INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID`
> AND `a`.`openid` = `b`.`openid`
> AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime`
Flink1.10.0 的checkpoint越来越大,
但是,我到TM机器上看,flink-io-xxx 目录也没有那么大,是统计指标大问题吗?
307
> COMPLETED
> 30/30 20:55:40 20:55:54 14s 8.62 GB 0 B
> 306
> COMPLETED
> 30/30 20:50:40 20:50:55 15s 8.59 GB 0 B
> 305
> COMPLETED
> 30/30 20:45:40 20:45:54 13s 8.56 GB 0 B
> 304
> COMPLETED
> 30/30 20:40:40 20:40:55
我们有一些场景,对实时性要求高,同时对数据重复会有比较大大影响。
我想关闭checkpoint,这样是不是能不能保证“至多一次” (At Most Once) ?
这里会不会有什么坑?
另外:我们允许丢失数据。
--
**
tivanli
**
thx
期待这个功能。
Danny Chan 于2020年8月27日周四 下午4:12写道:
> Hi ~
>
> 要开启大小写不敏感涉及的东西比较多,例如词法解析,catalog 以及部分访问表达式 (a.b.c 或者 a[‘f0’]),社区已经有 issue
> 跟进了 [1],预期在 1.12 版本可以解决。
>
> [1] https://issues.apache.org/jira/browse/FLINK-16175
>
> Best,
> Danny Chan
> 在 2020年8月27日 +0800 PM3:
我们对用户在使用习惯了Hive之后,在写一些flink sql对时候经常碰到大小写对困扰。
使用对是默认对catalog。
```
Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column
'uid' not found in any table; did you mean 'Uid'?
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at
展开讨论一些特点从场景。
1、inner join场景。有什么办法取两条流的的rowtime 的max吗?
使用SQL语句的场合,怎么实现?
例如:
SELECT if(left.rowtime > right.rowtime, left.rowtime, right.rowtime) as
rowtime, ...
如果支持了,那么这种场景我们还是可以在下游进行窗口计算和CEP之类的计算。
Tianwang Li 于2020年8月16日周日 上午10:40写道:
> 展开讨论一些特点场景。
>
> Benchao Li 于2020年7月6日
展开讨论一些特点场景。
Benchao Li 于2020年7月6日周一 下午11:08写道:
> 我们最开始发现这个现象的时候也有些惊讶,不过后来想了一下感觉也是合理的。
>
> 因为双流Join的时间范围有可能会比较大,比如 A流 在 B流的[-10min, +10min],那这样的话,
> A流来一条数据,可能会join到几分钟之前的数据,而此时的watermark其实已经大于了那条数据的事件时间。
>
> 我个人感觉,这应该就是在更实时的产生Join结果和导致数据时间晚于watermark之间,需要有一个balance。
>
知道了,使用TUMBLE_ROWTIME HOP_ROWTIME SESSION_ROWTIME
可以传递了。
Tianwang Li 于2020年8月6日周四 下午9:12写道:
>
> 我向做两次的窗口计算。
> 1、第一次5分钟窗口。
> 2、第二次10分钟窗口,使用的上一次窗口的结果。
>
>
> 我尝试了发生了异常, Window aggregate can only be defined over a time attribute
> column, but TIMESTAMP(3) encountered.
>
我向做两次的窗口计算。
1、第一次5分钟窗口。
2、第二次10分钟窗口,使用的上一次窗口的结果。
我尝试了发生了异常, Window aggregate can only be defined over a time attribute
column, but TIMESTAMP(3) encountered.
请问有什么办法可以解决吗?
我希望是一个窗口计算后面可以再接一个窗口计算。
第一次计算:
CREATE VIEW tmp_5min AS
SELECT
max(rowtime) as rowtime,
TUMBLE_START(`rowtime`,
| 为什么使用 datastream api 的话,滑动窗口对于每条数据都会在 state 中存 size / slide
可以使用blink 的SQL,是通过pane 实现的,输出的时候才合并每个pane。参考`PanedWindowAssigner`
张浩 <13669299...@163.com> 于2020年7月6日周一 下午12:49写道:
> Hi,all!
> 由于第一次咨询,我不确定上一份邮件大家是否收到。
> 想咨询下大家,为什么使用 datastream api 的话,滑动窗口对于每条数据都会在 state 中存 size / slide
> 份?
>
>
> |
:58写道:
> Hi
>
>
>
> 如果任务会经常出现反压的话,可以先解决反压问题,因为任务反压也会影响checkpoint,其次就是可以关注一下你的作业的物理资源指标,比如cpu使用率,内存使用率,gc频率是否特别高,尝试保障物理资源使用率在正常水平。
>
>
> Best,
> Yichao Yang
>
>
>
>
> ------原始邮件--
> 发件人:"Tianwang Li" 发送时间:2020年6月28日(星期
>
> 用 Row 啊,支持 Int.MAX 个元素,还能支持 null 值,不香么?
>
用Row 和 Tuple 性能上会有差别吗?
Jark Wu 于2020年6月19日周五 下午3:47写道:
> 用 Row 啊,支持 Int.MAX 个元素,还能支持 null 值,不香么?
>
>
> On Fri, 19 Jun 2020 at 15:42, Weixubin <18925434...@163.com> wrote:
>
> > 感谢你的回答,请问可否举一个参照例子?
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
>
> 偶尔一两天出现 Checkpoint 超时,看下你的任务中,是否可能存在某类 key 在这一两天会突然增多的情况。
>
我增加每个task处理窗口数据的时间在观察一下,
我这个是测试任务,没有sink输出。
source -> window -> window(统计上一个窗口的输出的记录数,pint 10记录左右)
LakeShen 于2020年6月28日周日 上午10:35写道:
> Hi Tianwang Li,
>
> 偶尔一两天出现 Checkpoint 超时,看下你的任务中,是否可能存在某类 key
我补充一下,checkpoint的UI截图如下:
https://imgchr.com/i/NgCUgS
https://imgchr.com/i/NgChDJ
https://imgchr.com/i/NgCT4x
>
--
**
tivanli
**
关于Flink checkpoint偶尔会比较长时间的问题。
*环境与背景:*
版本:flink1.10.0
数据量:每秒约10万左右的记录,数据源是kafka
计算逻辑:滑动窗口统计,每个窗口输出的规模大概1~2千万记录。
是否有反压:任务经常会出现反压(特别是在窗口输出的时候)。
*问题:*
大部分时候checkpoint都是在1分钟内完成,偶尔会出现checkpint需要超过30分钟的(出现的频率不高,1~2天1次)。
source的checkpoint消耗的时间比较长。Trigger checkpoint 到 Starting checkpoint消耗时间比较长。
第一个场景: 从SQL的角度,增加时间字段精确到分钟为key,格式如-MM-dd HH:mm。这样是不是就可以实现你要到效果了。
flink小猪 <18579099...@163.com> 于2020年3月27日周五 上午11:29写道:
> 我有两个需求
> 1.如果我需要开一个一分钟的窗口,但是我希望能够每5分钟输出一次结果(输出每分钟的聚合结果,即输出5行聚合结果),该怎么办?
> 2.如果我需要开一个一个小时的窗口,但是我希望能够每5分钟输出一次结果(窗口数据还没全部到达,先输出当前已经聚合的结果),该怎么办?
--
经处理过了,处理成了 10:00, 10:10, 10:20... 这种10分钟间隔的点,所以按照 time_str
> 分组的话,一天下来也就 24*6 个点。
> 在 Flink SQL 中,并不一定要 GROUP BY TUMLBE 才能做类似窗口聚合的操作,直接 GROUP BY hour/min/ts
> 也能达到类似的效果。
> 只不过前者不会输出更新,且能自动清理 state,后者会输出更新且不会自动清理 state。
>
> Best,
> Jark
>
> On Sat, 21 Mar 2020 at 11:24, Tia
SELECT HOP_START (ts, INTERVAL '3' MINUTE, INTERVAL '24' HOUR),
HOP_END (ts, INTERVAL '3' MINUTE, INTERVAL '24' HOUR),
dt,
count(1) as pv,
count(distinct userid) as uv
FROM t_user_log
GROUP BY HOP (ts, INTERVAL '5' MINUTE, INTERVAL '24' HOUR),
dt
37 matches
Mail list logo