Re: flink sql聚合后collect收集数据问题

2021-08-11 文章 Caizhi Weng
; > > 1. 请问这个select语句要怎么写? > select name, collect(color) as colors from source_table group by > tumble(ts, interval '5' seconds) > 这里collect(color)返回的是multiset类型,怎样转成Array类型呢? > > > 2. 如果array元素很多,我只想取其中N个,该怎么写flink sql? > > 3, 若取出现次数最多的前N个,又该怎么写flink sql? > select na

flink sql聚合后collect收集数据问题

2021-08-11 文章 casel.chen
source_table group by tumble(ts, interval '5' seconds) 这里collect(color)返回的是multiset类型,怎样转成Array类型呢? 2. 如果array元素很多,我只想取其中N个,该怎么写flink sql? 3, 若取出现次数最多的前N个,又该怎么写flink sql? select name, collect(color) as colors from ( select name, color from ( select *, ROW_NUMBER() OVER (PARTITION BY name ORDER

Re:Re: Flink SQL向下兼容吗?

2021-08-11 文章 casel.chen
如果只是数据同步作业,例如从kafka消费将数据存入下游db,这种弱“状态”作业能跨版本兼容么? 在 2021-08-11 16:54:56,"Leonard Xu" 写道: >这里的SQL是指DDL还是DML,通常 DML都是兼容的,且一般不会有不兼容的升级, >DDL 语法 各家 SQL 方言都有自己的语法,这个比较灵活,FLINK SQL 的DDL 各个版本稍有不同,但 Flink SQL 新版本都是兼容老的 >DDL的, >只是新版本上的DDL语法如果提供了更丰富的功能,那么老版本的DDL则不能提供

flink sql聚合后collect收集数据问题

2021-08-11 文章 casel.chen
source_table group by tumble(ts, interval '5' seconds) 这里collect(color)返回的是multiset类型,怎样转成Array类型呢? 2. 如果array元素很多,我只想取其中N个,该怎么写flink sql? 3, 若取出现次数最多的前N个,又该怎么写flink sql? select name, collect(color) as colors from ( select name, color from ( select *, ROW_NUMBER() OVER (PARTITION BY name ORDER

Re: Flink SQL向下兼容吗?

2021-08-11 文章 Leonard Xu
这里的SQL是指DDL还是DML,通常 DML都是兼容的,且一般不会有不兼容的升级, DDL 语法 各家 SQL 方言都有自己的语法,这个比较灵活,FLINK SQL 的DDL 各个版本稍有不同,但 Flink SQL 新版本都是兼容老的 DDL的, 只是新版本上的DDL语法如果提供了更丰富的功能,那么老版本的DDL则不能提供 。 所以我理解你关心的兼容性问题是不存在的,但请注意如果你的SQL作业是有状态的,需要带状态升级,这些状态都是跨版本不兼容的。 祝好, Leonard > 在 2021年8月10日,11:44,Jason Lee 写道: > &

Flink SQL向下兼容吗?

2021-08-09 文章 Jason Lee
各位大佬好, 请教一个问题,我们最近打算升级Flink 版本,想问一下升级之后的已有任务的SQL会兼容到新版本吗? 比如我升级到1.13,那我1.10的SQL语法能被兼容吗? 感恩 | | Chuang Li | | jasonlee1...@163.com | 签名由网易邮箱大师定制

Re: 如何通过Flink-sql 实现3个kafka-topic的join以及后续的窗口聚合计算操作?

2021-08-09 文章 Caizhi Weng
2021年8月10日周二 上午9:35写道: > 同行们,大家好, > > 请教一个问题,现在有3个kafka的topic:device consumer order > 想用Flink-sql计算出多个聚合指标,比如:过去12小时,每个deviceid下的订单量。 > 我是这么设计的: > > 1. 先通过 create table with(...='kafka') ... 注册出 table1 table2 table3 > ,指定事件时间、water mark > 2. 进行3张表的关联: > >

如何通过Flink-sql 实现3个kafka-topic的join以及后续的窗口聚合计算操作?

2021-08-09 文章 wang guanglei
同行们,大家好, 请教一个问题,现在有3个kafka的topic:device consumer order 想用Flink-sql计算出多个聚合指标,比如:过去12小时,每个deviceid下的订单量。 我是这么设计的: 1. 先通过 create table with(...='kafka') ... 注册出 table1 table2 table3 ,指定事件时间、water mark 2. 进行3张表的关联: create temporary view wide_table as ( select *** from

flink sql sink到hbase报错hbase版本问题

2021-08-04 文章 tomhan_th
请教各位大佬一个问题, 使用flink sql sink数据到hbase (flink版本 1.13.1 hbase版本2.2.6) 提交任务后,一直报错误是 java.lang.RuntimeException: hbase-default.xml file seems to be for an older version of HBase (2.2.3), this version is 2.2.6 已经在连接器参数里面配置了 'properties.hbase.defaults.for.version.skip'='true', hbase-default.xml也配置了跳

Re: Flink sql 维表聚合问题请教

2021-08-04 文章 carlc
始邮件-- > 发件人: > "user-zh" > > 发送时间:2021年8月4日(星期三) 下午4:44 > 收件人:"user-zh" &g

?????? Flink sql ????????????????

2021-08-04 文章 ????
??lookup??on??key,?? ??batch ---- ??:

Re: Flink sql 维表聚合问题请教

2021-08-04 文章 Caizhi Weng
Hi! 我查了一下,processing time temporal join 确实还没有实现... 这里可能需要变成 event time temporal join[1] 或者双流 join 了。但更好的方法可能是维表本身就已经计算好所需的数据。 [1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/joins/#event-time-temporal-join carlc 于2021年8月4日周三 下午3:57写道: >

Re: Flink sql 维表聚合问题请教

2021-08-04 文章 carlc
感谢大佬回复,我尝试着换种写法,但这样些的话会直接报错。 create view v_bl_user_count as ( select user_id, count(1) from mysql_user_blacklist group by user_id ); select t1.`user_id` , t1.`event_type` , t1.`current_ts` from kafka_user_event t1 left join v_bl_user_count FOR SYSTEM_TIME AS OF

Re: Flink sql 维表聚合问题请教

2021-08-04 文章 Caizhi Weng
Hi! 这是因为每次维表 join 都会向下游发送两条数据,一共发送了四条,所以最后 count 的结果为 4,是符合预期的。 为什么不直接对维表做 agg 呢?当然对维表做 agg 的话,这里就不是 lookup join 而是 process time temporal table join 了。 carlc 于2021年8月4日周三 上午10:41写道: > 请教下如何在维表上做聚合操作? 如下操作与预期不符合,不知道是姿势不正确还是其他原因,麻烦大佬些指教下 ~ > > -- 模拟需求(有点牵强...): > -- 过滤 kafka_user_event 中

flink sql统计IP出现次数TopN问题

2021-08-03 文章 casel.chen
场景:实时统计用户访问日志数据,求一分钟内访问事件发生次数超过5次的用户,其不同source_ip出现次数最多前3个的事件 源表数据 user_name, source_ip, ts 张三, 100, 00:08 张三, 104, 00:12 张三, 100, 00:15 张三, 101, 00:35 张三, 100, 00:38 张三, 102, 00:40 张三, 102, 00:45 张三, 101, 00:47 张三, 100, 00:55 张三, 100, 01:15 李四, 200, 01:17 李四, 200, 01:19 李四, 200, 01:27 王五,

Flink sql 维表聚合问题请教

2021-08-03 文章 carlc
请教下如何在维表上做聚合操作? 如下操作与预期不符合,不知道是姿势不正确还是其他原因,麻烦大佬些指教下 ~ -- 模拟需求(有点牵强...): -- 过滤 kafka_user_event 中 event_type = LOGIN 数据,并且关联维表 mysql_user_blacklist 统计对应 user_id 在维表中的次数 -> 即: 在维表上做聚合操作 -- 1. 创建user_blacklist表 CREATE TABLE `user_blacklist` ( `user_id` bigint(20) NOT NULL, `create_time` datetime

Re:Re: 回复:回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?

2021-08-02 文章 Ye Chen
t;> 需求:table t 有三个字段(a,b,c) >> 我们的场景只想根据主键a更新部分字段b,其余的字段c原有的值保持不变, >> >> 例如mysql 支持: >> insert into t(a,b) select 1,2 on duplicate key update b=2; >> 主键重复的时候只更新字段b,字段c的值不变。 >> 但是flink sql 目前只支持全字段更新:insert into t(a,b,c) select 1,2,3 。 >> 我在sql-client测试了一下:inser

Re: 回复:回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?

2021-08-02 文章 Tony Wei
ink/flink-docs-master/docs/dev/table/sql/insert/#syntax Ye Chen 於 2021年8月2日 週一 下午4:08寫道: > 你好,我们用的1.11版本。 > > 需求:table t 有三个字段(a,b,c) > 我们的场景只想根据主键a更新部分字段b,其余的字段c原有的值保持不变, > > 例如mysql 支持: > insert into t(a,b) select 1,2 on duplicate key update b=2; > 主键重复的时候只更新字段b,字段c的值不变。 >

Re:回复:回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?

2021-08-02 文章 Ye Chen
你好,我们用的1.11版本。 需求:table t 有三个字段(a,b,c) 我们的场景只想根据主键a更新部分字段b,其余的字段c原有的值保持不变, 例如mysql 支持: insert into t(a,b) select 1,2 on duplicate key update b=2; 主键重复的时候只更新字段b,字段c的值不变。 但是flink sql 目前只支持全字段更新:insert into t(a,b,c) select 1,2,3 。 我在sql-client测试了一下:insert into t(a,b) select 1,2 on duplicate key

回复:回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?

2021-08-02 文章 silence
用的什么版本这个应该已经在1.13中修复了https://issues.apache.org/jira/browse/FLINK-18726 不行的话可以在ddl中限制列的数量 -- 发件人:Ye Chen 发送时间:2021年8月2日(星期一) 11:37 收件人:user-zh ; silence 主 题:Re:回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ? 你好,我试了一下,如

Re:Re: 回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?

2021-08-02 文章 Ye Chen
需求:现有table t 三个字段 CREATE TABLE t ( abigint, bbigint, cbigint, PRIMARY KEY (a) NOT ENFORCED ) WITH ( ... ); 我们的场景只想根据主键a更新部分字段b,其余的字段c保持不变, 例如mysql 支持:insert into t(a,b) select 1,2 on duplicate key update b=2; 主键重复的时候只更新字段b,字段c的值不变。但是flink sql 目前只支持全字段更新:insert into t

Re: 回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?

2021-08-02 文章 Shengkai Fang
bigint, > PRIMARY KEY (a) NOT ENFORCED > ) WITH ( > ... > ); > 我们的场景只想根据主键a更新部分字段b,其余的字段c保持不变, > 例如mysql 支持 insert into t(a,b,c) select '1','2','3' on duplicate key > update b='4';主键重复的时候只更新字段b,字段c的值不变。 > 我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。 &

Re:回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?

2021-08-01 文章 Ye Chen
时候只更新字段b,字段c的值不变。 我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。 请问这种根据主键更新部分字段的场景 使用flink sql应该怎么处理? 在 2021-08-02 10:47:55,"silence" 写道: >如果只想更新部分字段的话可以试下 >insert into t(a,b) select a,b from x > > >--

Re:回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?

2021-08-01 文章 Ye Chen
(a,b,c) select '1','2','3' on duplicate key update b='4';主键重复的时候只更新字段b,字段c的值不变。 我在官方文档中没找到这个用法,sql-client也测试了一下也不支持 on duplicate key update,会报错。 请问这种部分字段更新的场景 使用flink sql应该怎么处理? 在 2021-08-02 10:08:28,"silence" 写道: >你在你的sink ddl定义了主键会自动的按主键进行upsert的 >参考https://ci.apache.org/proj

回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?

2021-08-01 文章 silence
如果只想更新部分字段的话可以试下 insert into t(a,b) select a,b from x -- 发件人:Ye Chen 发送时间:2021年7月30日(星期五) 17:57 收件人:user-zh 主 题:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ? 现有table CREATE TABLE t ( abigint, b

回复:场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?

2021-08-01 文章 silence
你在你的sink ddl定义了主键会自动的按主键进行upsert的 参考https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/#idempotent-writes -- 发件人:Ye Chen 发送时间:2021年7月30日(星期五) 17:57 收件人:user-zh 主 题:场景题:Flink SQL 不支持 INSERT

场景题:Flink SQL 不支持 INSERT INTO… ON DUPLICATE KEY UPDATE ?

2021-07-30 文章 Ye Chen
一下也不支持 on duplicate key update,会报错。 请问这种部分字段更新的场景 使用flink sql应该怎么处理?

Re:回复:回复:flink sql 依赖隔离

2021-07-26 文章 Michael Ran
突, >>目前初步想法是每个udf绑定一个jarpath,使用独立的classloader进行加载,避免和主jar以及其他udf之间的依赖冲突 >>-- >>发件人:Michael Ran >>发送时间:2021年7月22日(星期四) 20:07 >>收件人:user-zh ; silence >>主 题:Re:flink sql 依赖隔离 >> >>通过任务进行隔离引用呗。你们美团已经是k8s了吧? >>在 2021-07-05 14:06:53,"

回复:回复:flink sql 依赖隔离

2021-07-25 文章 silence
就是单独引用的啊,但任务逻辑比较复杂时会同时混用多个udf这个是没法避免的啊 -- 发件人:Michael Ran 发送时间:2021年7月23日(星期五) 17:42 收件人:user-zh ; silence 主 题:Re:回复:flink sql 依赖隔离 建议上传的时候单独放,提交任务的时候 拉下来单独引用 在 2021-07-23 11:01:59,"silence" 写道: > >这边目前主要还是yarn,

Re:回复:flink sql 依赖隔离

2021-07-23 文章 Michael Ran
,避免和主jar以及其他udf之间的依赖冲突 >-- >发件人:Michael Ran >发送时间:2021年7月22日(星期四) 20:07 >收件人:user-zh ; silence >主 题:Re:flink sql 依赖隔离 > >通过任务进行隔离引用呗。你们美团已经是k8s了吧? >在 2021-07-05 14:06:53,"silence" 写道: >>请教大家目前flink sql有没有办法做到依赖隔离 >>比如connector,format,udf(这个最重要)等, >

回复:flink sql 依赖隔离

2021-07-22 文章 silence
主 题:Re:flink sql 依赖隔离 通过任务进行隔离引用呗。你们美团已经是k8s了吧? 在 2021-07-05 14:06:53,"silence" 写道: >请教大家目前flink sql有没有办法做到依赖隔离 >比如connector,format,udf(这个最重要)等, >很多情况下不同用户定义的组件会经常混用,会有很多的依赖冲突问题难以解决。 >目前有没有办法做到udf的依赖隔离(比如不同的udf使用独立的jar和classloader),或者社区对此有没有什么规划

Re:Re: flink sql 依赖隔离

2021-07-22 文章 Michael Ran
写道: > >> 通过任务进行隔离引用呗。你们美团已经是k8s了吧? >> 在 2021-07-05 14:06:53,"silence" 写道: >> >请教大家目前flink sql有没有办法做到依赖隔离 >> >比如connector,format,udf(这个最重要)等, >> >很多情况下不同用户定义的组件会经常混用,会有很多的依赖冲突问题难以解决。 >> >目前有没有办法做到udf的依赖隔离(比如不同的udf使用独立的jar和classloader),或者社区对此有没有什么规划 >> > > >-- >Best Regards > >Jeff Zhang

Re: flink sql 依赖隔离

2021-07-22 文章 Jeff Zhang
Zeppelin 支持依赖的动态加载 https://www.yuque.com/jeffzhangjianfeng/gldg8w/rn6g1s https://www.yuque.com/jeffzhangjianfeng/gldg8w/dthfu2 Michael Ran 于2021年7月22日周四 下午8:07写道: > 通过任务进行隔离引用呗。你们美团已经是k8s了吧? > 在 2021-07-05 14:06:53,"silence" 写道: > >请教大家目前flink sql有没有办法做到依赖隔离 > >比如

Re: Re: 能否使用命令行的 -C 命令 加载 flink sql 的 connector?

2021-07-16 文章 Caizhi Weng
> at > org.apache.hadoop.hive.conf.HiveConf.clinit(HiveConf.java:146) > ... 24 more > 在 2021-07-16 16:10:10,"Caizhi Weng" 写道: > >Hi! > > > >理论上可行,可以尝试一下。但要注意 -C 指定的路径必须是所有节点都能访问到,如果指定的是一个本地路径,那么所有节点的本地路径下都要有相应的 > >connector jar。 > > > >niko 于2021年7月16日周五 下午3:21写道: > > > >> 能否使用命令行的 -C 命令 加载 flink sql 的 connector? >

Re:Re: 能否使用命令行的 -C 命令 加载 flink sql 的 connector?

2021-07-16 文章 niko
16日周五 下午3:21写道: > >> 能否使用命令行的 -C 命令 加载 flink sql 的 connector?

Re: 能否使用命令行的 -C 命令 加载 flink sql 的 connector?

2021-07-16 文章 Caizhi Weng
Hi! 理论上可行,可以尝试一下。但要注意 -C 指定的路径必须是所有节点都能访问到,如果指定的是一个本地路径,那么所有节点的本地路径下都要有相应的 connector jar。 niko 于2021年7月16日周五 下午3:21写道: > 能否使用命令行的 -C 命令 加载 flink sql 的 connector?

能否使用命令行的 -C 命令 加载 flink sql 的 connector?

2021-07-16 文章 niko
能否使用命令行的 -C 命令 加载 flink sql 的 connector?

Re: flink sql使用HepPlanner进行编译优化

2021-07-13 文章 terry Huang
flink sql的不确定性优化相比于批处理看起来是比较少的,另外我们使用的版本(flink-1.8)的实现并没有统计信息,因此我们在尝试使用 Hep Planner来提高编译速度。不知道是否会导致其它问题,比如语义变化等 Caizhi Weng 于2021年7月14日周三 上午10:08写道: > Hi! > > Hep planner 是一个 rule based 的 planner,较多用于确定性的优化上。Volcano planner 是一个 cost > based 的 planner,多用于不确定性的优化(例如 join 方式的选择,build 端

Re: flink sql使用HepPlanner进行编译优化

2021-07-13 文章 Caizhi Weng
Hi! Hep planner 是一个 rule based 的 planner,较多用于确定性的优化上。Volcano planner 是一个 cost based 的 planner,多用于不确定性的优化(例如 join 方式的选择,build 端的选择等),需要靠统计信息等进行决策。目前 Flink 两者均有应用。 terry Huang 于2021年7月13日周二 下午7:31写道: > 大佬们好,目前Flink sql使用calcite 的Volcano > > Planner进行逻辑计划优化,但是我们的实践下来觉得编译时间有点长,我们准备使用HepPl

flink sql使用HepPlanner进行编译优化

2021-07-13 文章 terry Huang
大佬们好,目前Flink sql使用calcite 的Volcano Planner进行逻辑计划优化,但是我们的实践下来觉得编译时间有点长,我们准备使用HepPlanner来做优化。请问,这么做是否会带来致命问题或者flink sql 使用Volcano planner的原因是什么呢

Re:Re: Flink Sql 1.13 UDF ERROR

2021-07-11 文章 Roc Marshal
Hi, Jingsong. 最新的类型推导相对于之前版本的类型推导更加严格,对schema的非空限制校验也更加细致。 在之前提到的例子中使用基本类型做UDF参数, 表示跟UDF中参数相关的列必须非空,而在创建视图时,每个类型默认的非空限制为false,因此出现了之前描述的问题。 祝好。 Best Roc. 在 2021-06-29 11:02:55,"Jingsong Li" 写道: >Hi, > >你可以创建个JIRA,让Timo看看,UDAF引入了新的类型推导,可能有问题 > >Best, >Jingsong > >On

flink sql cdc数据按主键keyby入库问题

2021-07-10 文章 casel.chen
场景:mysql数据实时同步到mongodb. 上游mysql binlog日志发到一个kafka topic, 不保证同一个主键的记录发到相同的partition,为了保证下游sink mongodb同一主键的所有记录按序保存,所以需要按主键keyby。然后下游再批量写入mongodb。 问题:flink sql有办法解决上述问题?如果可以的话,要怎么写? create table person_source ( id BIGINT PRIMARY KEY NOT FORCED, name STRING, age BIGINT

flink-sql有没有类似hive里distribute by的功能

2021-07-08 文章 woods
flink-sql有没有类似hive里distribute by的功能,数据行根据某个字段hash到不同的 task中

flink-sql 连接kafka报错

2021-07-08 文章 yanyunpeng
/kafka/internals/ClosableBlockingQueue;Ljava/lang/String;JZLorg/apache/flink/metrics/MetricGroup;Lorg/apache/flink/metrics/MetricGroup;)V flink-sql 查询kafka kafka版本2.4 connector版本flink-sql-connector-kafka_2.11-1.11.2.jar 请求 这是什么原因 是 connector的版本问题有?

Re: Flink SQL MYSQL schema 特性问题

2021-07-07 文章 Terry Wang
Hi~ 你需要的应该是flink sql里提供的catalog功能 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/#postgres-database-as-a-catalog 目前PostgresCatalog实现了jdbc catalog,MysqlCatalog没有支持,有一些资料可以参考实现: https://vendanner.github.io/2020/11/25/Flink-SQL-%E4%B9%8B

Flink SQL MYSQL schema 特性问题

2021-07-07 文章 Roc Marshal
Hi, 请问目前的 Flink SQL 在创建source表的时候支持自动拉取所有的表列信息并解析吗? 谢谢。 Best, Roc.

回复:flink sql 依赖隔离

2021-07-05 文章 silence
没用放在lib下,是启动时通过-C动态添加udf jar,一个sql作业可能会用到很多udf,可能是不同的用户写的,所以经常会出现依赖冲突 -- 发件人:yzhhui 发送时间:2021年7月5日(星期一) 14:09 收件人:user-zh@flink.apache.org ; silence 抄 送:user-zh 主 题:回复:flink sql 依赖隔离 提交任务的时候提交自己的jar就好了,这个不要放公共lib下 就OK 在2021年

flink sql 依赖隔离

2021-07-05 文章 silence
请教大家目前flink sql有没有办法做到依赖隔离 比如connector,format,udf(这个最重要)等, 很多情况下不同用户定义的组件会经常混用,会有很多的依赖冲突问题难以解决。 目前有没有办法做到udf的依赖隔离(比如不同的udf使用独立的jar和classloader),或者社区对此有没有什么规划

回复:flink sql 空闲数据源场景如何配置

2021-06-30 文章 杨光跃
15:36,silence 写道: 可参考 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/config/#table-exec-source-idle-timeout -- 发件人:杨光跃 发送时间:2021年6月30日(星期三) 10:54 收件人:user-zh@flink.apache.org 主 题:flink sql 空闲数据源场景如何配置

回复:flink sql 空闲数据源场景如何配置

2021-06-30 文章 杨光跃
-- 发件人:杨光跃 发送时间:2021年6月30日(星期三) 10:54 收件人:user-zh@flink.apache.org 主 题:flink sql 空闲数据源场景如何配置 在代码中可以通过 .withIdleness(Duration.ofMinutes(1)); 指定空闲数据源也触发水印,那么sql中怎么表示呢 | | 杨光跃 | | yangguangyuem...@163.com | 签名由网易邮箱大师定制

回复:flink sql 空闲数据源场景如何配置

2021-06-30 文章 silence
可参考 https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/config/#table-exec-source-idle-timeout -- 发件人:杨光跃 发送时间:2021年6月30日(星期三) 10:54 收件人:user-zh@flink.apache.org 主 题:flink sql 空闲数据源场景如何配置 在代码中可以通过

flink sql 空闲数据源场景如何配置

2021-06-29 文章 杨光跃
在代码中可以通过 .withIdleness(Duration.ofMinutes(1)); 指定空闲数据源也触发水印,那么sql中怎么表示呢 | | 杨光跃 | | yangguangyuem...@163.com | 签名由网易邮箱大师定制

Re: Flink Sql 1.13 UDF ERROR

2021-06-28 文章 Jingsong Li
Hi, 你可以创建个JIRA,让Timo看看,UDAF引入了新的类型推导,可能有问题 Best, Jingsong On Tue, Jun 29, 2021 at 7:10 AM Roc Marshal wrote: > > > Hi, All. > > > 请教一个在最新的1.13.1 api升级调研中遇到的问题,谢谢大家: > > > 版本: 1.13.1 > 运行模式: IDE-application > --- > about udf define...

Flink Sql 1.13 UDF ERROR

2021-06-28 文章 Roc Marshal
Hi, All. 请教一个在最新的1.13.1 api升级调研中遇到的问题,谢谢大家: 版本: 1.13.1 运行模式: IDE-application --- about udf define... public static class UDFAggregateFunction extends AggregateFunction { //返回最终结果 @Override public

Flink Sql 1.13 UDF ERROR

2021-06-28 文章 Roc Marshal
Hi, All. 请教一个在最新的1.13.1 api升级调研中遇到的问题,谢谢大家: 版本: 1.13.1 运行模式: IDE-application --- about udf define... public static class UDFAggregateFunction extends AggregateFunction { //返回最终结果 @Override public

Re: flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-28 文章 Wei JI10 季伟
Hi, 貌似是jar包冲突了,我再确认确认。 在 2021/6/28 下午2:33,“王刚” 写入: 注意:此封邮件来自于公司外部,请注意信息安全! Attention: This email comes from outside of the company, please pay attention to the information security! 把flink parquet包放在flink客户端lib包下试试呢

Re: flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-28 文章 王刚
把flink parquet包放在flink客户端lib包下试试呢 原始邮件 发件人: Wei JI10 季伟 收件人: user-zh@flink.apache.org 发送时间: 2021年6月28日(周一) 14:14 主题: Re: flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath 您好, 我没有设置scope,我看jar包中是有org/apache/flink/formats/parquet/这个目录的... 在 2021

Re: flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-28 文章 Wei JI10 季伟
您好, 我没有设置scope,我看jar包中是有org/apache/flink/formats/parquet/这个目录的... 在 2021/6/28 下午12:47,“zhisheng” 写入: 注意:此封邮件来自于公司外部,请注意信息安全! Attention: This email comes from outside of the company, please pay attention to the information security! 看下你引入的 jar 包是咋引入的,scope 设置的是 provided 吧?

Re: flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-27 文章 zhisheng
看下你引入的 jar 包是咋引入的,scope 设置的是 provided 吧? Wei JI10 季伟 于2021年6月28日周一 下午12:19写道: > 您好, > 版本都是1.12.3的,有其他方式能够定位到这个问题出在哪么? > > 在 2021/6/28 上午11:59,“Jingsong Li” 写入: > > 注意:此封邮件来自于公司外部,请注意信息安全! > Attention: This email comes from outside of the company, please pay > attention to the

Re: flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-27 文章 Wei JI10 季伟
您好, 版本都是1.12.3的,有其他方式能够定位到这个问题出在哪么? 在 2021/6/28 上午11:59,“Jingsong Li” 写入: 注意:此封邮件来自于公司外部,请注意信息安全! Attention: This email comes from outside of the company, please pay attention to the information security! Hi, 你的版本check下?集群和flink-parquet是同一个版本吗? BEST, Jingsong

Re: flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-27 文章 Jingsong Li
Hi, 你的版本check下?集群和flink-parquet是同一个版本吗? BEST, Jingsong On Mon, Jun 28, 2021 at 11:22 AM Wei JI10 季伟 wrote: > 您好, > 不是sql client,是在yarn环境上跑的,运行的jar包中引入了上面的依赖,然后就看到有这个报错。 > > -- Best, Jingsong Lee

Re: flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-27 文章 Wei JI10 季伟
您好, 不是sql client,是在yarn环境上跑的,运行的jar包中引入了上面的依赖,然后就看到有这个报错。

Re: flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-27 文章 zhisheng
使用的是 sql client 测试的 sql 吗?如果是的话,记得在 flink lib 目录下添加 flink-sql-parquet jar 包,然后重启集群和 sql client Wei JI10 季伟 于2021年6月28日周一 上午9:35写道: > 您好, > 添加的parquet 依赖如下,不知道全不全 > > org.apache.flink > flink-parquet_${scala.binary.version} > ${flink.version} > >

Re: flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-27 文章 Wei JI10 季伟
您好, 添加的parquet 依赖如下,不知道全不全 org.apache.flink flink-parquet_${scala.binary.version} ${flink.version} org.apache.parquet parquet-avro 1.10.1 

Re: flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-27 文章 Zhiwen Sun
parquet 相关依赖增加了吗? Zhiwen Sun On Sun, Jun 27, 2021 at 3:57 PM Wei JI10 季伟 wrote: > Hi: > 在使用flink sql connector的filesytem时,指定format为parquet。抛出异常信息 > Caused by: org.apache.flink.table.api.ValidationException: Could not find > any format factory for identifier 'parquet' in t

flink sql报错: Could not find any format factory for identifier 'parquet' in the classpath

2021-06-27 文章 Wei JI10 季伟
Hi: 在使用flink sql connector的filesytem时,指定format为parquet。抛出异常信息 Caused by: org.apache.flink.table.api.ValidationException: Could not find any format factory for identifier 'parquet' in the classpath. at org.apache.flink.table.filesystem.FileSystemTableSource

Re: flink sql cdc如何获取元数据

2021-06-22 文章 Leonard Xu
Hello, Flink sql cdc 还不支持获取元数据, 获取元数据的业务场景通常是怎么样的呢? 祝好, Leonard > 在 2021年6月23日,08:21,casel.chen 写道: > > flink sql cdc如何获取元数据?像数据库名,表名,操作类型,binlog产生时间等。 > > > create table xxx_tbl ( > k_op varchar, -- 操作类型 > k_database varchar, -- 数据库名 > k_table varc

flink sql cdc如何获取元数据

2021-06-22 文章 casel.chen
flink sql cdc如何获取元数据?像数据库名,表名,操作类型,binlog产生时间等。 create table xxx_tbl ( k_op varchar, -- 操作类型 k_database varchar, -- 数据库名 k_table varchar, -- 表名 k_ts. BIGINT, -- binlog产生时间 idBIGINT, name. varchar ) with ( 'connector' = 'mysql-cdc', . 'meta.fields-prefix' = 'k_' )

Re:?????? flink sql??????????????????

2021-06-21 文章 Michael Ran
thub.com/DataLinkDC/dlink > > > > >---- >??: "todd": 2021??6??16??(??) 5:48 >??: "user-zh": Re: flink sql?? > > > >?

Re: Re:Re: Re: Re:Re: flink sql job 提交到yarn上报错

2021-06-19 文章 JasonLee
hi 先执行一下 export HADOOP_CLASSPATH=`hadoop classpath` 就可以了 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: Re: flink sql cdc做计算后, elastic search connector 多并发sink 会有问题,偶现!数据量较大,目前还没较好复现。请问有人是否也遇到过类似的问题。

2021-06-17 文章 Jark Wu
社区最近重新设计了 mysql-cdc 的实现,可以支持全量阶段并发读取、checkpoint,移除全局锁依赖。 可以关注 GitHub 仓库的动态 https://github.com/ververica/flink-cdc-connectors。 7月的 meetup 上也会分享相关设计和实现,敬请期待。 Best, Jark On Thu, 17 Jun 2021 at 09:34, casel.chen wrote: > Flink CDC什么时候能够支持修改并行度,进行细粒度的资源控制?目前我也遇到flink sql > cdc写mysql遇到数据同步跟不上数

Re: Re:Re: Re: Re:Re: flink sql job 提交到yarn上报错

2021-06-17 文章 yangpengyi
请问该问题有解决吗?我使用FLINK yarn-per-job方式提交到yarn集群也出现了这个错误 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re:Re: flink sql cdc做计算后, elastic search connector 多并发sink 会有问题,偶现!数据量较大,目前还没较好复现。请问有人是否也遇到过类似的问题。

2021-06-16 文章 casel.chen
Flink CDC什么时候能够支持修改并行度,进行细粒度的资源控制?目前我也遇到flink sql cdc写mysql遇到数据同步跟不上数据写入速度问题,何时能支持像mysql并行复制这种机制呢? 在 2021-06-16 17:27:14,"Leonard Xu" 写道: >看起来和 Flink-CDC 关系不大,看异常栈是 ES 侧抛出的异常 version_conflict_engine_exception, >可以查下这个异常,看下是不是有写(其他作业/业务 也在写同步表)冲突。 > >祝好, &

?????? flink sql??????????????????

2021-06-16 文章 ??????
FlinkSql WebIDE?? FlinkSQLSQL??SqlCli?? https://github.com/DataLinkDC/dlink ---- ??:

Re: flink sql平台多版本支持问题

2021-06-16 文章 todd
补充一种使用Flink api提交方式,参考:https://github.com/todd5167/flink-spark-submiter。 任务提交、状态获取继承统一的接口,上层服务在引用时,通过spi的方式进行加载即可。 缺点: - 需要对Flink client源码、类加载机制有了解。 优点: - 良好的外部集成 - 不需要额外部署服务 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink sql cdc做计算后, elastic search connector 多并发sink 会有问题,偶现!数据量较大,目前还没较好复现。请问有人是否也遇到过类似的问题。

2021-06-16 文章 Leonard Xu
看起来和 Flink-CDC 关系不大,看异常栈是 ES 侧抛出的异常 version_conflict_engine_exception, 可以查下这个异常,看下是不是有写(其他作业/业务 也在写同步表)冲突。 祝好, Leonard > 在 2021年6月16日,17:05,mokaful <649713...@qq.com> 写道: > > 相同问题,请问有处理方式吗 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink sql cdc做计算后, elastic search connector 多并发sink 会有问题,偶现!数据量较大,目前还没较好复现。请问有人是否也遇到过类似的问题。

2021-06-16 文章 mokaful
相同问题,请问有处理方式吗 -- Sent from: http://apache-flink.147419.n8.nabble.com/

??????Flink SQL ????????????DynamoDB

2021-06-14 文章 Asahi Lee
https://flink-packages.org/packages/streaming-flink-dynamodb-connector ---- ??: "user-zh"

flink sql cdc到kafka,如何提高sink并行度?

2021-06-14 文章 casel.chen
flink sql cdc发到kafka,显示下游写kafka并行度只有1,有没有办法提高并行度呢? 显示job-parallelism, table.exec.resource.default-parallelism, parallelism.default 都是 24,但execution graph显示parallelism还是1,我设置了pipeline.operator-chaining=false

Flink SQL 是否支持读写DynamoDB

2021-06-14 文章 wang guanglei
Hey 社区的同行们好, 请问 Flink 1.10以及以后的版本会支持通过Flink SQL 读写 DynamoDB么?有对应的connector么? 谢谢。

Re: flink sql平台多版本支持问题

2021-06-12 文章 casel.chen
t; >> 在 2021-06-13 07:21:46,"Jeff Zhang" 写道: >> >另外一个选择是flink on zeppelin,可以调用flink on zeppelin的rest api,把zeppelin当做是flink >> >job server, zeppelin天然支持flink 1.10之后的所有版本。钉钉群:32803524 >> > >> >casel.chen 于2021年6月12日周六 下午5:56写道: >> > >&g

Re: Re: flink sql平台多版本支持问题

2021-06-12 文章 Jeff Zhang
所有版本。钉钉群:32803524 > > > >casel.chen 于2021年6月12日周六 下午5:56写道: > > > >> 需求背景: > >> 因为不同Flink版本SQL底层实现不相同,同一个带状态的Flink SQL作业目前跨版本升级社区还不支持。所以如果基于Flink > >> SQL开发实时计算平台的话,平台需要支持不同Flink版本。早期的Flink SQL作业用的是1.11,最新的Flink > >> SQL作业采用的是1.13开发的。 >

Re:Re: flink sql cdc数据同步至mysql

2021-06-12 文章 casel.chen
请问 flink sql cdc 场景下如何增大下游sink端并行度? 我试了修改default.parallism=2参数,并且将operator chain参数设置成false,并没有效果。 而后,我将作业分成两步:首先 源mysql cdc sink到 upsert kafka,再从 upsert kafka sink到 目标mysql。是想通过kafka partition增大sink并行度 初步测试效果是可以的,kafka建了3个partitions,每个partitions都按主键hash分配到数据,下游并行度跟partitions个数对齐。 以下是作业内容

Re:Re: flink sql平台多版本支持问题

2021-06-12 文章 casel.chen
有版本。钉钉群:32803524 > >casel.chen 于2021年6月12日周六 下午5:56写道: > >> 需求背景: >> 因为不同Flink版本SQL底层实现不相同,同一个带状态的Flink SQL作业目前跨版本升级社区还不支持。所以如果基于Flink >> SQL开发实时计算平台的话,平台需要支持不同Flink版本。早期的Flink SQL作业用的是1.11,最新的Flink >> SQL作业采用的是1.13开发的。 >> >> >> 而让平台支持不同Flink版本,我能想

Re: flink sql平台多版本支持问题

2021-06-12 文章 Jeff Zhang
另外一个选择是flink on zeppelin,可以调用flink on zeppelin的rest api,把zeppelin当做是flink job server, zeppelin天然支持flink 1.10之后的所有版本。钉钉群:32803524 casel.chen 于2021年6月12日周六 下午5:56写道: > 需求背景: > 因为不同Flink版本SQL底层实现不相同,同一个带状态的Flink SQL作业目前跨版本升级社区还不支持。所以如果基于Flink > SQL开发实时计算平台的话,平台需要支持不同Flink版本。早期的Flink SQL作业

flink sql平台多版本支持问题

2021-06-12 文章 casel.chen
需求背景: 因为不同Flink版本SQL底层实现不相同,同一个带状态的Flink SQL作业目前跨版本升级社区还不支持。所以如果基于Flink SQL开发实时计算平台的话,平台需要支持不同Flink版本。早期的Flink SQL作业用的是1.11,最新的Flink SQL作业采用的是1.13开发的。 而让平台支持不同Flink版本,我能想到有三种实现方案: 1. 平台直接调用 flink run 或 flink run-application 提交作业 优点:实现简单,每个flink版本都会带这个shell脚本 缺点:受限于脚本提供的功能,像语法检查、格式化sql、获取sql执行

Re: Re:Re: flink sql维表延迟join如何实现?

2021-06-11 文章 chenchencc
temporary join -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re:Re: flink sql维表延迟join如何实现?

2021-06-11 文章 casel.chen
有例子吗?或者相关资料连接也行 在 2021-06-11 12:40:10,"chenchencc" <1353637...@qq.com> 写道: >使用事件时间就可以延时 > > > >-- >Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink sql维表延迟join如何实现?

2021-06-10 文章 chenchencc
使用事件时间就可以延时 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink sql维表延迟join如何实现?

2021-06-10 文章 Smile
我们之前试过用 session window 来实现,只要流数据有唯一键就可以按唯一键开 session window 来把整个流(而不是关联不上的部分数据)延迟,能凑合用 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re:Re: flink sql cdc数据同步至mysql

2021-06-10 文章 casel.chen
针对现在flink sql cdc下游并行度无法修改问题,是否可以分两步实现?谢谢! 1. flink sql cdc发到下游kafka,通过 upsert kafka connector,以debezium或canal格式,kafka topic开多个分区 2. 再从kafka消费,通过flink sql同步到最终mysql库 在 2021-06-08 19:49:40,"Leonard Xu" 写道: >试着回答下这两个问题。 > >> flink 1.12的jdbc connector不支持 sink.

flink sql cdc支持额外字段问题

2021-06-10 文章 casel.chen
flink sql cdc写入kafka,期望kafka消息带上数据库database,表table,变更时间和变更标记+I/-U/+U/-D这几个特殊字段,目前来看是做不到的,对吗?

flink sql维表延迟join如何实现?

2021-06-09 文章 casel.chen
延迟join主要是为了解决维表数据后于事实表数据到达问题。java代码可以实现,那flink sql这块能否通过sql hint解决呢?有没有示例?

回复:flink sql写mysql中文乱码问题

2021-06-09 文章 Jason Lee
c:mysql://host:3306/datav_test?useUnicode=true=utf8 本地运行flink sql 作业插入中文是正常显示的,一部署到测试服务器跑就会出现中文乱码。有何修复建议?谢谢! 在 2021-05-19 17:52:01,"Michael Ran" 写道: 数据库的字段字符编码 在 2021-05-18 18:19:31,"casel.chen" 写道: 我的URL连接串已经使用了 useUnicode=truecharacterEncoding

Re: flink sql cdc数据同步至mysql

2021-06-08 文章 Leonard Xu
试着回答下这两个问题。 > flink 1.12的jdbc connector不支持 sink.parallism 参数,是不是说只能和上游cdc > connector保持相同的并行度1?如果是的话,下游sink会有瓶颈,因为表中存有大量的历史数据,将存量数据同步到目标表需要花费不少时间,而新的业务数据也源源不断进来,导致追不上。这要如何解决? 是的,关键问题是cdc connector为了保证数据一致性只能单并发,所以作业也只能单并发。这个需要cdc connector支持多并发读取,下游sink自然就能解决。 > flink 1.13的jdbc connector新增

flink sql cdc数据同步至mysql

2021-06-08 文章 casel.chen
flink 1.12的jdbc connector不支持 sink.parallism 参数,是不是说只能和上游cdc connector保持相同的并行度1?如果是的话,下游sink会有瓶颈,因为表中存有大量的历史数据,将存量数据同步到目标表需要花费不少时间,而新的业务数据也源源不断进来,导致追不上。这要如何解决? flink 1.13的jdbc connector新增 sink.parallism 参数,问题是如果增大并行度的话能否确保同一个主键记录发到同一个sink task呢?SQL正确的写法是什么?

Re: Flink sql 状态过期后,checkpoint 大小没变化

2021-06-08 文章 chenchencc
你好,我也遇到这个问题,flink 1.12.2 sql,想问下 1.有什么方式能本地物理上删除那些ttl过期的数据吗 2.有什么方式能checkpoint时候删除ttl过期的数据吗?让checkpoint数据不再继续增长? -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 如何获取flink sql的血缘关系?

2021-06-07 文章 LakeShen
一种方法就是借助 Flink SQL Parser,解析你的 SQL,然后获取到不同的 SQL node, 然后每个 SQL Node 都有对应的类型,以及 connector 后面的 with 参数,你需要自己在 写代码判定一下即可。本质是通过解析 SQL,来获取血缘关系。 Best, LakeShen casel.chen 于2021年6月8日周二 上午12:05写道: > 如何获取flink sql的血缘关系?如:表A -> 表B。有代码示例吗?谢谢!

flink sql 从savepoint 重启遭遇新旧状态序列化不匹配的问题

2021-06-07 文章 mangguozhi
各位好,我在flink 1.13中使用flink sql 在一次修改代码后的重启任务中,报以下错误: For heap backends, the new state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@a5b17bdb) must not be incompatible with the old state serializer (org.apache.flink.api.common.typeutils.base.MapSerializer@e5a9c6d8). 我更改了sql中

如何获取flink sql的血缘关系?

2021-06-07 文章 casel.chen
如何获取flink sql的血缘关系?如:表A -> 表B。有代码示例吗?谢谢!

<    1   2   3   4   5   6   7   8   9   10   >