Re: flink sql作业状态跨存储系统迁移问题

2023-08-18 文章 Tianwang Li
可以 savepoint 到 HDFS,然后配置 checkpoint 的地址为 对象存储。 我们就是 flink 支持对象存储和 HDFS。 Hangxiang Yu 于2023年8月2日周三 14:03写道: > Hi, 我理解可以有两种方式: > 1. 设定从某个存储集群上恢复并向另一个存储集群上快照,即设置[1]为 HDFS地址,[2] 为后面的对象存储地址 > 2. 还是在HDFS集群上启停作业,设置 savepoint 目录[3]到对象存储 > > 关于 state processor api,目前 sql 作业确实操作起来比较困难,只能从日志里获取 uid

Re: Flink Over Window功能存储状态优化能有性能提升吗?

2021-10-21 文章 Tianwang Li
有大佬看看吗? Tianwang Li 于2021年10月20日周三 上午11:03写道: > flink1.13 和 flink 1.14 > > Caizhi Weng 于2021年10月20日周三 上午10:17写道: > >> Hi! >> >> 会将 input 的记录存储在 state 里面。 >> >> 如果 input 的字段比较多,但是参与聚合运算的字段比较少。 >> >> 这样会导致 state 非常的大。 >> > >> &

Re: Flink Over Window功能存储状态优化能有性能提升吗?

2021-10-19 文章 Tianwang Li
flink1.13 和 flink 1.14 Caizhi Weng 于2021年10月20日周三 上午10:17写道: > Hi! > > 会将 input 的记录存储在 state 里面。 > > 如果 input 的字段比较多,但是参与聚合运算的字段比较少。 > > 这样会导致 state 非常的大。 > > > > 你使用的是哪个 Flink 版本以及哪个 planner?这个观察是如何得出的呢?就我所知,state 里应该只存储了参与 agg 运算的字段。 > > Tianwang Li

Flink Over Window功能存储状态优化能有性能提升吗?

2021-10-19 文章 Tianwang Li
Flink 的 Over 窗口 例如在 range over window 场合,会将 input 的记录存储在 state 里面。 如果 input 的字段比较多,但是参与聚合运算的字段比较少。 这样会导致 state 非常的大。 从 RowTimeRangeBoundedPrecedingFunction 里面逻辑看, 不参与agg运算的字段,在 onTimer 时期输出之后,是可以清理了的。 这样能提升Over 窗口的处理性能吗? SQL例子: SELECT col_1, col_2, col_3, col_4, col_5, col_6, -- 字段内容比较长 col_7,

Re: Over窗口聚合性能调优问题

2021-07-30 文章 Tianwang Li
(小的)tumbling window + (大的)over window 这样会不会好一些。 Wanghui (HiCampus) 于2021年7月30日周五 下午3:17写道: > Hi all: >我在测试Over窗口时,当窗口是5秒~15s级别时,处理速度能够达到2000/s。 > 但是当我把窗口调整为10分钟以上时,处理速度从2000开始急速下降,几分钟后下降至230/s。 > 请问下: >Over窗口的性能该如何优化,因为我后续会将窗口调整为24小时,按照目前的情况来看,性能会下降很快。 >我的测试节点配置:8C +

flink 1.13.1 TM内存超用严重问题

2021-07-30 文章 Tianwang Li
问题: flink 计算任务配置的TM内存是 6G,但是,TM进程占用的内存实际达到了 8G。 是什么超用那么多内存? flink rocksdb内存超用那么多吗? 还是我的配置有什么问题 ? 内存使用: > top - 19:05:36 up 304 days, 9:12, 0 users, load average: 7.24, 5.99, > 5.25 > Tasks: 5 total, 1 running, 4 sleeping, 0 stopped, 0 zombie > %Cpu(s): 5.8 us, 0.9 sy, 0.0 ni,

Re: Flink-1.13 注册 UDAF accumulator 类型识别失败

2021-07-14 文章 Tianwang Li
er(id); } Caizhi Weng 于2021年7月15日周四 上午11:15写道: > Hi! > > Flink 1.11 以来对自动类型推导进行了一些修改。可能需要添加一些 annotations 才能推导类型。详见文档 > > https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/functions/udfs/#%e7%b1%bb%e5%9e%8b%e6%8e%a8%e5%af%bc > > Tianwang Li

Flink-1.13 注册 UDAF accumulator 类型识别失败

2021-07-14 文章 Tianwang Li
Flink-1.13 注册 UDAF accumulator 类型识别失败, 在Flink-1.10的时候是可以的。 请问在新的版本要如何注册写UDAF?? 错误信息: Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. An error occurred in the type inference logic of function 'default_catalog.default_database.hlp_count'. at

Re: flink-1.12.1 k8s Session集群模式没有生成Stdout输出文件

2021-01-27 文章 Tianwang Li
在 docker-entrypoint.sh 指定了是 “start-foreground” 模式。 所以没有 .out 文件 Tianwang Li 于2021年1月27日周三 下午5:38写道: > 我参考: > https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html > > 部署了一个session集群,运行任务。 > 在UI查看Stdout是空的,没有生成 .out

flink-1.12.1 k8s Session集群模式没有生成Stdout输出文件

2021-01-27 文章 Tianwang Li
我参考: https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html 部署了一个session集群,运行任务。 在UI查看Stdout是空的,没有生成 .out 文件 请问是哪里配置的问题吗? -- ** tivanli **

Re: Flink 上报到pushgateway到数据量太大了

2021-01-19 文章 Tianwang Li
主要原因是Flink SQL 上报对数据的task_name 太长了,比较吃资源,导致prometheus采集超时。 当前做法是修改org.apache.flink.metrics.prometheus.AbstractPrometheusReporter,对上报对task_name 进行截断。 Tianwang Li 于2021年1月19日周二 下午5:36写道: > 我在Flink 集成pushgateway到时候, > 运行几十个任务,pushgateway到内容就达到了几百万行,太恐怖了。 > 请问一下,大

Flink 上报到pushgateway到数据量太大了

2021-01-19 文章 Tianwang Li
我在Flink 集成pushgateway到时候, 运行几十个任务,pushgateway到内容就达到了几百万行,太恐怖了。 请问一下,大家有什么优化方案么? -- ** tivanli **

[flink-1.10.2] 异步IO结果DataStream 该如何注册为table??

2020-12-07 文章 Tianwang Li
Flink版本:1.10.2 使用RichAsyncFunction 异步IO 操作,结果DataStream 不能注册为table。 本地测试的结果是一直重复输出数据。 请问一下DataStream 处理之后,怎么才能注册为 Table。 --- 代码如下: // 异步redis处理 RedisAsyncFunction asyncFunction = new RedisAsyncFunction(node, aggProcessorArgs); // 获取异步处理流 DataStream result =

Re: UDAF函数在over窗口使用问题

2020-11-04 文章 Tianwang Li
有没有文章,介绍过期时间清理的?需不需要用户自己设置TTL。 例如:我有一个TOPN计算,怎么做过期数据清理?(还是会自动处理) SELECT cnt, word, time_hour FROM ( SELECT *, ROW_NUMBER() OVER (PARTITION BY time_hour ORDER BY cnt desc) AS rownum FROM test_word_count) WHERE rownum <= 100; Benchao Li 于2020年9月14日周一 下午1:03写道: > Hi, > > 看起来你并没有实现`retract`

Re: group agg 开启了mini batch之后,state ttl不生效的问题

2020-09-30 文章 Tianwang Li
这种有窗口统计没有影响吧? 刘建刚 于2020年9月30日周三 下午2:25写道: > 修复方案参考https://github.com/apache/flink/pull/11830 > > kandy.wang 于2020年9月30日周三 下午2:19写道: > > > group agg 开启了mini batch之后,state ttl不生效的问题: > > > > > > 现在我们发现好像 计算订单指标,写hbase,开启mini batch确实是需要的。这样可以大大降低sink > >

Re: flinksql接收到字段值格式为2020-09-23T20:58:24+08:00,如何转成TIMESTAMP

2020-09-26 文章 Tianwang Li
FROM_UNIXTIME 使用的是本地时区, (可以看,org.apache.flink.table.runtime.functions. SqlDateTimeUtils#fromUnixtime的实现) 可以指定时区,StreamTableEnvironment.getConfig().setLocalTimeZone() Joker 于2020年9月24日周四 下午1:54写道: > 不好意思,插入个问题。ts AS TO_TIMESTAMP(FROM_UNIXTIME(create_time / 1000, > '-MM-dd HH:mm:ss'))

Re: [flink-1.10.2] Blink SQL 超用内存严重

2020-09-26 文章 Tianwang Li
目前,观察到另外一个现象, 如果任务出现了异常,例如写Kafka失败,任务自动重启,这个时候就会突然飙升。 应该是任务失败之后,关闭重启,rocksdb占用到内存没有回收。 通过pmap查看,占用比较多内存多是很多个(128MB 和 64MB 内存块)。 另外,失败重启和如下多jira 描述重启任务多时候比较类似。 https://issues.apache.org/jira/browse/FLINK-7289 pmap图: [image: image.png] [image: image.png] Tianwang Li 于2020年9月23日周三 下午9:11写道

Re: [flink-1.10.2] Blink SQL 超用内存严重

2020-09-23 文章 Tianwang Li
被延迟,也就是说状态清理的比正常预期要慢一点。如果用的是left outer join,甚至于会观察到 > > 数据晚于watermark的情况,也就是下游window会有观察到丢数据的情况。这个时间目前是 > > `minCleanUpInterval = (leftRelativeSize + rightRelativeSize) / > > 2;`,在你的SQL来讲,就是3h,也就是说 > > 状态会*多保存*这么久。这个我曾经建过一个issue来优化这个点[1] > > > > 希望这个可以解

Re: [flink-1.10.2] Blink SQL 超用内存严重

2020-09-22 文章 Tianwang Li
希望这个可以解答你的疑惑~ > > [1] https://issues.apache.org/jira/browse/FLINK-18996 > > Tianwang Li 于2020年9月22日周二 下午8:26写道: > > > 使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。 > > > > > > 【join】 > > > > > SELECT `b`.`rowtime`, > > > `a`.`c

[flink-1.10.2] Blink SQL 超用内存严重

2020-09-22 文章 Tianwang Li
使用 Blink SQL 实现 interval join + hop window 超用内存比较严重。 【join】 > SELECT `b`.`rowtime`, > `a`.`c_id`, > `b`.`openid` > FROM `test_table_a` AS `a` > INNER JOIN `test_table_b` AS `b` ON `a`.`RoomID` = `b`.`RoomID` > AND `a`.`openid` = `b`.`openid` > AND `b`.`rowtime` BETWEEN ASYMMETRIC `a`.`rowtime`

Flink1.10.0 的checkpoint越来越大

2020-09-10 文章 Tianwang Li
Flink1.10.0 的checkpoint越来越大, 但是,我到TM机器上看,flink-io-xxx 目录也没有那么大,是统计指标大问题吗? 307 > COMPLETED > 30/30 20:55:40 20:55:54 14s 8.62 GB 0 B > 306 > COMPLETED > 30/30 20:50:40 20:50:55 15s 8.59 GB 0 B > 305 > COMPLETED > 30/30 20:45:40 20:45:54 13s 8.56 GB 0 B > 304 > COMPLETED > 30/30 20:40:40 20:40:55

Flink如何实现至多一次(At Most Once)

2020-09-02 文章 Tianwang Li
我们有一些场景,对实时性要求高,同时对数据重复会有比较大大影响。 我想关闭checkpoint,这样是不是能不能保证“至多一次” (At Most Once) ? 这里会不会有什么坑? 另外:我们允许丢失数据。 -- ** tivanli **

Re: Flink SQL 有提供参数支持大小写不敏感吗?

2020-08-27 文章 Tianwang Li
thx 期待这个功能。 Danny Chan 于2020年8月27日周四 下午4:12写道: > Hi ~ > > 要开启大小写不敏感涉及的东西比较多,例如词法解析,catalog 以及部分访问表达式 (a.b.c 或者 a[‘f0’]),社区已经有 issue > 跟进了 [1],预期在 1.12 版本可以解决。 > > [1] https://issues.apache.org/jira/browse/FLINK-16175 > > Best, > Danny Chan > 在 2020年8月27日 +0800 PM3:

Flink SQL 有提供参数支持大小写不敏感吗?

2020-08-27 文章 Tianwang Li
我们对用户在使用习惯了Hive之后,在写一些flink sql对时候经常碰到大小写对困扰。 使用对是默认对catalog。 ``` Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Column 'uid' not found in any table; did you mean 'Uid'? at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at

Re: flink interval join后按窗口聚组问题

2020-08-15 文章 Tianwang Li
展开讨论一些特点从场景。 1、inner join场景。有什么办法取两条流的的rowtime 的max吗? 使用SQL语句的场合,怎么实现? 例如: SELECT if(left.rowtime > right.rowtime, left.rowtime, right.rowtime) as rowtime, ... 如果支持了,那么这种场景我们还是可以在下游进行窗口计算和CEP之类的计算。 Tianwang Li 于2020年8月16日周日 上午10:40写道: > 展开讨论一些特点场景。 > > Benchao Li 于2020年7月6日

Re: flink interval join后按窗口聚组问题

2020-08-15 文章 Tianwang Li
展开讨论一些特点场景。 Benchao Li 于2020年7月6日周一 下午11:08写道: > 我们最开始发现这个现象的时候也有些惊讶,不过后来想了一下感觉也是合理的。 > > 因为双流Join的时间范围有可能会比较大,比如 A流 在 B流的[-10min, +10min],那这样的话, > A流来一条数据,可能会join到几分钟之前的数据,而此时的watermark其实已经大于了那条数据的事件时间。 > > 我个人感觉,这应该就是在更实时的产生Join结果和导致数据时间晚于watermark之间,需要有一个balance。 >

Re: 请教:时间属性字段传递问题,有办法解决吗?

2020-08-06 文章 Tianwang Li
知道了,使用TUMBLE_ROWTIME HOP_ROWTIME SESSION_ROWTIME 可以传递了。 Tianwang Li 于2020年8月6日周四 下午9:12写道: > > 我向做两次的窗口计算。 > 1、第一次5分钟窗口。 > 2、第二次10分钟窗口,使用的上一次窗口的结果。 > > > 我尝试了发生了异常, Window aggregate can only be defined over a time attribute > column, but TIMESTAMP(3) encountered. >

请教:时间属性字段传递问题,有办法解决吗?

2020-08-06 文章 Tianwang Li
我向做两次的窗口计算。 1、第一次5分钟窗口。 2、第二次10分钟窗口,使用的上一次窗口的结果。 我尝试了发生了异常, Window aggregate can only be defined over a time attribute column, but TIMESTAMP(3) encountered. 请问有什么办法可以解决吗? 我希望是一个窗口计算后面可以再接一个窗口计算。 第一次计算: CREATE VIEW tmp_5min AS SELECT max(rowtime) as rowtime, TUMBLE_START(`rowtime`,

Re: 滑动窗口数据存储多份问题

2020-07-14 文章 Tianwang Li
| 为什么使用 datastream api 的话,滑动窗口对于每条数据都会在 state 中存 size / slide 可以使用blink 的SQL,是通过pane 实现的,输出的时候才合并每个pane。参考`PanedWindowAssigner` 张浩 <13669299...@163.com> 于2020年7月6日周一 下午12:49写道: > Hi,all! > 由于第一次咨询,我不确定上一份邮件大家是否收到。 > 想咨询下大家,为什么使用 datastream api 的话,滑动窗口对于每条数据都会在 state 中存 size / slide > 份? > > > |

Re: Flink-1.10.0 source的checkpoint偶尔时间比较长

2020-06-29 文章 Tianwang Li
:58写道: > Hi > > > > 如果任务会经常出现反压的话,可以先解决反压问题,因为任务反压也会影响checkpoint,其次就是可以关注一下你的作业的物理资源指标,比如cpu使用率,内存使用率,gc频率是否特别高,尝试保障物理资源使用率在正常水平。 > > > Best, > Yichao Yang > > > > > ------原始邮件-- > 发件人:"Tianwang Li" 发送时间:2020年6月28日(星期

Re: 回复: 关于拓展 Tuple元组的问题

2020-06-29 文章 Tianwang Li
> > 用 Row 啊,支持 Int.MAX 个元素,还能支持 null 值,不香么? > 用Row 和 Tuple 性能上会有差别吗? Jark Wu 于2020年6月19日周五 下午3:47写道: > 用 Row 啊,支持 Int.MAX 个元素,还能支持 null 值,不香么? > > > On Fri, 19 Jun 2020 at 15:42, Weixubin <18925434...@163.com> wrote: > > > 感谢你的回答,请问可否举一个参照例子? > > > > > > > > > > > > > > > > > > > > > > > > > >

Re: Flink-1.10.0 source的checkpoint偶尔时间比较长

2020-06-28 文章 Tianwang Li
> > 偶尔一两天出现 Checkpoint 超时,看下你的任务中,是否可能存在某类 key 在这一两天会突然增多的情况。 > 我增加每个task处理窗口数据的时间在观察一下, 我这个是测试任务,没有sink输出。 source -> window -> window(统计上一个窗口的输出的记录数,pint 10记录左右) LakeShen 于2020年6月28日周日 上午10:35写道: > Hi Tianwang Li, > > 偶尔一两天出现 Checkpoint 超时,看下你的任务中,是否可能存在某类 key

Re: Flink-1.10.0 source的checkpoint偶尔时间比较长

2020-06-27 文章 Tianwang Li
我补充一下,checkpoint的UI截图如下: https://imgchr.com/i/NgCUgS https://imgchr.com/i/NgChDJ https://imgchr.com/i/NgCT4x > -- ** tivanli **

Flink-1.10.0 source的checkpoint偶尔时间比较长

2020-06-27 文章 Tianwang Li
关于Flink checkpoint偶尔会比较长时间的问题。 *环境与背景:* 版本:flink1.10.0 数据量:每秒约10万左右的记录,数据源是kafka 计算逻辑:滑动窗口统计,每个窗口输出的规模大概1~2千万记录。 是否有反压:任务经常会出现反压(特别是在窗口输出的时候)。 *问题:* 大部分时候checkpoint都是在1分钟内完成,偶尔会出现checkpint需要超过30分钟的(出现的频率不高,1~2天1次)。 source的checkpoint消耗的时间比较长。Trigger checkpoint 到 Starting checkpoint消耗时间比较长。

Re: flinksql如何控制结果输出的频率

2020-03-27 文章 Tianwang Li
第一个场景: 从SQL的角度,增加时间字段精确到分钟为key,格式如-MM-dd HH:mm。这样是不是就可以实现你要到效果了。 flink小猪 <18579099...@163.com> 于2020年3月27日周五 上午11:29写道: > 我有两个需求 > 1.如果我需要开一个一分钟的窗口,但是我希望能够每5分钟输出一次结果(输出每分钟的聚合结果,即输出5行聚合结果),该怎么办? > 2.如果我需要开一个一个小时的窗口,但是我希望能够每5分钟输出一次结果(窗口数据还没全部到达,先输出当前已经聚合的结果),该怎么办? --

Re: flink sql如何支持每隔5分钟触发当日零点到当前5分钟的聚合计算

2020-03-21 文章 Tianwang Li
经处理过了,处理成了 10:00, 10:10, 10:20... 这种10分钟间隔的点,所以按照 time_str > 分组的话,一天下来也就 24*6 个点。 > 在 Flink SQL 中,并不一定要 GROUP BY TUMLBE 才能做类似窗口聚合的操作,直接 GROUP BY hour/min/ts > 也能达到类似的效果。 > 只不过前者不会输出更新,且能自动清理 state,后者会输出更新且不会自动清理 state。 > > Best, > Jark > > On Sat, 21 Mar 2020 at 11:24, Tia

Re: flink sql如何支持每隔5分钟触发当日零点到当前5分钟的聚合计算

2020-03-20 文章 Tianwang Li
SELECT HOP_START (ts, INTERVAL '3' MINUTE, INTERVAL '24' HOUR), HOP_END (ts, INTERVAL '3' MINUTE, INTERVAL '24' HOUR), dt, count(1) as pv, count(distinct userid) as uv FROM t_user_log GROUP BY HOP (ts, INTERVAL '5' MINUTE, INTERVAL '24' HOUR), dt