Re: flink sql解析kafka数据

2022-07-04 文章 林影
> 回复的原邮件 > | 发件人 | 小昌同学 | > | 发送日期 | 2022年06月30日 15:02 | > | 收件人 | user-zh@flink.apache.org | > | 主题 | flink sql解析kafka数据 | > 各位大佬 请教一下就是我kafka的数据是这样的嵌套格式 ARRAY里面嵌套了ROW类型 我这边想直接通过flink > sql建表语句拿到最里面的字段的值 > 我百度找到了 'json.infer-schema.flatten-nested-columns.enable'=

回复:flink sql解析kafka数据

2022-07-04 文章 JasonLee
Hi 解析嵌套 JSON 可以参考这篇文章哈,https://mp.weixin.qq.com/s/KHVUlOsLSHPzCprRWSJYcA Best JasonLee 回复的原邮件 | 发件人 | 小昌同学 | | 发送日期 | 2022年06月30日 15:02 | | 收件人 | user-zh@flink.apache.org | | 主题 | flink sql解析kafka数据 | 各位大佬 请教一下就是我kafka的数据是这样的嵌套格式 ARRAY里面嵌套了ROW类型 我这边想直接通过flink sql建表语句拿到最里面的字段的值 我百度找

Re: Flink SQL Client 解析 Protobuf

2022-07-04 文章 Min Tu
多谢,我们会去试一下这个PR。 On Sun, Jul 3, 2022 at 5:21 PM Benchao Li wrote: > Hi Min, > > ProtoBuf Format[1] 有一个相关的PR,我们正在推进review和改进,预期是在1.16 > 中可以release出去。你也可以基于这个PR的代码编译打包一下,提前试用一下。 > > [1] https://github.com/apache/flink/pull/14376 > > Min Tu 于2022年7月4日周一 02:38写道: > >

Re: Flink SQL Client 解析 Protobuf

2022-07-03 文章 Benchao Li
Hi Min, ProtoBuf Format[1] 有一个相关的PR,我们正在推进review和改进,预期是在1.16 中可以release出去。你也可以基于这个PR的代码编译打包一下,提前试用一下。 [1] https://github.com/apache/flink/pull/14376 Min Tu 于2022年7月4日周一 02:38写道: > 各位大佬, > > 我们想利用 Flink SQL Client 解析 Kafka 数据流: 对于Kafka 数据流是Json 或者Avro 格式,已经可以解析, > 但是对于 Protobuf 数据

Re: flink sql生成执行图中GroupWindowAggregate算子数不符合预期

2022-06-30 文章 Shengkai Fang
hi. 能展示下具体想要的plan 和实际的 plan 吗? Best, Shengkai 明寒 于2022年7月1日周五 09:50写道: > > HI:​在flink1.12中,对于如下的Sql,生成的执行图中有两个GroupWindowAggregate算子,该如何调整Sql或者配置保证只生成一个GroupWindowAggregate算子 > CREATE TEMPORARY TABLE RawSource ( > `key` STRING, > `accessNum` INT, > `status` STRING, > rowTime

flink sql????????????GroupWindowAggregate????????????????

2022-06-30 文章 ????
HI???6?7??flink1.12??Sql??GroupWindowAggregateSql??GroupWindowAggregate CREATE TEMPORARY TABLE RawSource ( `key` STRING, `accessNum` INT, `status` STRING, rowTime TIMESTAMP(3), WATERMARK FOR rowTime AS

flink sql解析kafka数据

2022-06-30 文章 小昌同学
各位大佬 请教一下就是我kafka的数据是这样的嵌套格式 ARRAY里面嵌套了ROW类型 我这边想直接通过flink sql建表语句拿到最里面的字段的值 我百度找到了 'json.infer-schema.flatten-nested-columns.enable'='true' 但是我在客户端执行的时候 发现识别不到这个字段 有大佬遇到我这样的问题嘛 或者有啥其他的解决法子嘛 CREATE TABLE ccc_test_20220630_2 ( trans_number STRING

Re: Re: flink sql无法读取Hive映射的HBase表

2022-05-18 文章 Jingsong Li
-05-13 15:06 > 收件人: user-zh > 主题: Re: Re: flink sql无法读取Hive映射的HBase表 > Hi, 推荐 https://www.deepl.com/translator > 非常好用 > > 我记得对Hive Custom Storage Handler(hbase)是有问题的 > > Best, > Jingsong > > On Fri, May 13, 2022 at 2:12 PM 18579099...@163.com <18579099...@1

Re: Re: flink sql无法读取Hive映射的HBase表

2022-05-16 文章 18579099...@163.com
第一次弄,不知道这么写的对不对 https://issues.apache.org/jira/projects/FLINK/issues/FLINK-27604 18579099...@163.com 发件人: Jingsong Li 发送时间: 2022-05-13 15:06 收件人: user-zh 主题: Re: Re: flink sql无法读取Hive映射的HBase表 Hi, 推荐 https://www.deepl.com/translator 非常好用 我记得对Hive Custom Storage Handler(hbase)是有问题的 Best

Re:基于flink sql作业失败与取消状况下的作业参数设置咨询(flink-1.14.2)

2022-05-16 文章 RS
Hi, cancel的时候要加savepoint,然后启动的时候指定savepoint应该就不会丢数据了,直接cancel的话是可能丢数据的, checkpoint的作用和你想到可能不一样,你再看看 Thx 在 2022-05-12 10:38:33,"徐战辉" 写道: hello, 请教下,如何设置flink配置及作业参数,在取消作业重新部署、flink作业失败重跑情况下,保证不丢失数据。 目前有一份作业,开启checkpoint, cancel 后重新启动,发现数据会丢失1小部分。 1. flink.conf

Re: Re: flink sql无法读取Hive映射的HBase表

2022-05-13 文章 Jingsong Li
15:11 > 收件人: user-zh > 主题: Re: flink sql无法读取Hive映射的HBase表 > 不好意思,我尝试复现你的问题,但是我没有 hbase 环境,不过看起来是只有当 STORED BY > 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 有问题? > 我之后空了再debug 看看。 > > 不过我看了一下 flink 这块的代码,从 flink 这块的代码来看,应该是 get 这个 hive 表之后,它的 > StorageDescriptor 的 inpu

Re: Re: flink sql无法读取Hive映射的HBase表

2022-05-13 文章 18579099...@163.com
我英文能力不允许啊 18579099...@163.com 发件人: yuxia 发送时间: 2022-05-11 15:11 收件人: user-zh 主题: Re: flink sql无法读取Hive映射的HBase表 不好意思,我尝试复现你的问题,但是我没有 hbase 环境,不过看起来是只有当 STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 有问题? 我之后空了再debug 看看。 不过我看了一下 flink 这块的代码,从 flink 这块的代码来看,应该是 get 这个

回复:基于flink sql作业失败与取消状况下的作业参数设置咨询(flink-1.14.2)

2022-05-12 文章 徐战辉
| | Jerry Guo | | wangyixuhongm...@163.com | 回复的原邮件 | 发件人 | yuxia | | 发送日期 | 2022年5月12日 15:16 | | 收件人 | user-zh | | 主题 | Re: 转发:基于flink sql作业失败与取消状况下的作业参数设置咨询(flink-1.14.2) | hi,可以解释一下具体是想咨询什么问题? Best regards, Yuxia - 原始邮件 - 发件人: "徐战辉" 收件人: "user-zh" 发送时间: 星

回复: 转发:基于flink sql作业失败与取消状况下的作业参数设置咨询(flink-1.14.2)

2022-05-12 文章 徐战辉
件 | 发件人 | yuxia | | 发送日期 | 2022年5月12日 15:16 | | 收件人 | user-zh | | 主题 | Re: 转发:基于flink sql作业失败与取消状况下的作业参数设置咨询(flink-1.14.2) | hi,可以解释一下具体是想咨询什么问题? Best regards, Yuxia - 原始邮件 - 发件人: "徐战辉" 收件人: "user-zh" 发送时间: 星期四, 2022年 5 月 12日 上午 10:53:00 主题: 转发:基于flink sql作业失败与

Re: 转发:基于flink sql作业失败与取消状况下的作业参数设置咨询(flink-1.14.2)

2022-05-12 文章 yuxia
hi,可以解释一下具体是想咨询什么问题? Best regards, Yuxia - 原始邮件 - 发件人: "徐战辉" 收件人: "user-zh" 发送时间: 星期四, 2022年 5 月 12日 上午 10:53:00 主题: 转发:基于flink sql作业失败与取消状况下的作业参数设置咨询(flink-1.14.2) | | Jerry Guo | | wangyixuhongm...@163.com | 转发的原邮件 | 发件人 | 徐战辉 | | 发送日期 | 2022年5月12日 10:

Re:Re: How can I set job parameter in flink sql

2022-05-11 文章 wang
Ok, got it. Thanks so much! Regards, Hunk -- 发自我的网易邮箱手机智能版 在 2022-05-11 16:46:14,yuxia 写道: Hi, AFAK, you can't get the parameter setted via Flink SQL client in udf. If you still want to get the parameters in your udf, you can use the following code to set the parameter: env

转发:基于flink sql作业失败与取消状况下的作业参数设置咨询(flink-1.14.2)

2022-05-11 文章 徐战辉
| | Jerry Guo | | wangyixuhongm...@163.com | 转发的原邮件 | 发件人 | 徐战辉 | | 发送日期 | 2022年5月12日 10:38 | | 收件人 | user-zh@flink.apache.org | | 主题 | 基于flink sql作业失败与取消状况下的作业参数设置咨询(flink-1.14.2) |

基于flink sql作业失败与取消状况下的作业参数设置咨询(flink-1.14.2)

2022-05-11 文章 徐战辉
hello, 请教下,如何设置flink配置及作业参数,在取消作业重新部署、flink作业失败重跑情况下,保证不丢失数据。 目前有一份作业,开启checkpoint, cancel 后重新启动,发现数据会丢失1小部分。 1. flink.conf execution.checkpointing.interval: 1 execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION

Re: How can I set job parameter in flink sql

2022-05-11 文章 yuxia
Hi, AFAK, you can't get the parameter setted via Flink SQL client in udf. If you still want to get the parameters in your udf, you can use the following code to set the parameter: env = StreamExecutionEnvironment.getExecutionEnvironment parameter = new HashMap(); parameter .put

Re: flink sql无法读取Hive映射的HBase表

2022-05-11 文章 yuxia
jira~ https://issues.apache.org/jira/projects/FLINK/summary Best regards, Yuxia - 原始邮件 - 发件人: 18579099...@163.com 收件人: "user-zh" 发送时间: 星期二, 2022年 5 月 10日 上午 10:39:16 主题: Re: Re: flink sql无法读取Hive映射的HBase表 版本: flink:1.13.6 hive:2.1.1-cdh6.2.0 hbase:2.1.0-cdh6.2.0 flinksql执行工具

How can I set job parameter in flink sql

2022-05-11 文章 wang
Hi dear engineer, I want to override the function open() in my UDF, like: | public class BlackListConvertFunction extends ScalarFunction { @Override public void open(FunctionContext context) throws Exception { String path = context.getJobParameter("black_list_path",

How can I set job parameter in flink sql

2022-05-11 文章 wang
Hi dear engineer, I want to override the function open() in my UDF, like: In open() function, I want to fetch the configred value "black_list_path", then simply print that value out. And I config this value in ./sql-client.sh console: SET black_list_path = /root/list.properties Then

Re: Re: flink sql无法读取Hive映射的HBase表

2022-05-09 文章 18579099...@163.com
版本: flink:1.13.6 hive:2.1.1-cdh6.2.0 hbase:2.1.0-cdh6.2.0 flinksql执行工具:flink sql client sql 提交模式:yarn-per-job - flink lib目录下的包 antlr-runtime-3.5.2.jar

Re: flink sql无法读取Hive映射的HBase表

2022-05-09 文章 yuxia
用的是 Hive Catalog 吗? Hive connector 和 Hive 的版本 都是多少呢? 另外,详细堆栈贴一下。 Best regards, Yuxia - 原始邮件 - 发件人: 18579099...@163.com 收件人: "user-zh" 发送时间: 星期一, 2022年 5 月 09日 下午 5:46:02 主题: flink sql无法读取Hive映射的HBase表 我有一部分表的数据是存在hbase上的,平时通过hive加载外部表的方式读取hbase的数据,我想通过flink sql读取hive表的方式

Re: flink sql无法读取Hive映射的HBase表

2022-05-09 文章 Shengkai Fang
hi, 能从日志中拿到更多详细的日志吗?请同时分享下你的执行步骤? Best, Shengkai 18579099...@163.com <18579099...@163.com> 于2022年5月9日周一 17:46写道: > 我有一部分表的数据是存在hbase上的,平时通过hive加载外部表的方式读取hbase的数据,我想通过flink sql读取hive表的方式 > 读取数据(不直接使用flink > 读取hbase是我使用的catalog是hive,不用再写建表语句然后再查),当我用sql-client尝试的时候报错。 > 读取正常的hiv

flink sql无法读取Hive映射的HBase表

2022-05-09 文章 18579099...@163.com
我有一部分表的数据是存在hbase上的,平时通过hive加载外部表的方式读取hbase的数据,我想通过flink sql读取hive表的方式 读取数据(不直接使用flink 读取hbase是我使用的catalog是hive,不用再写建表语句然后再查),当我用sql-client尝试的时候报错。 读取正常的hive是可以正常读取的,但是读取hive on hbase表却报 [ERROR] Could not execute SQL statement. Reason: org.apache.flink.table.catalog.exceptions.CatalogException

Flink SQL??????Java code????debug

2022-04-25 文章 zhiyezou
Ideadebug??

Re: flink-connector和flink-sql-connector的区别

2022-04-21 文章 Shengkai Fang
hi sql jar 往往是 shade 了相关的依赖,而 普通的 jar 则不带有相关的依赖。正如名字所说,在 table api/sql 的情况下建议使用 sql jar,datastream 建议使用 普通的jar。 Best, Shengkai weishishuo...@163.com 于2022年4月21日周四 16:52写道: > > cdc项目中每种connector都分成flink-connector-xxx和flink-sql-connector-xxx,比如flink-connector-mysql-cdc和flink-sql-connector

flink-connector和flink-sql-connector的区别

2022-04-21 文章 weishishuo...@163.com
cdc项目中每种connector都分成flink-connector-xxx和flink-sql-connector-xxx,比如flink-connector-mysql-cdc和flink-sql-connector-mysql-cdc,这两个的区别是什么呢?在什么场景下用前者,什么场景下用后者? weishishuo...@163.com

flink sql 任务中jm Blob server 总是在凌晨报 java.io.exception :unknow opreation 71

2022-04-07 文章 su wenwen
hi,all.想问大家下,是否有遇到过这个问题,flink 1.12 的版本 在线上运行的flink sql 作业,总是在凌晨报错如下: [cid:b11b980a-9bcd-4e7d-993a-e83a9322c66c] blobserver 我理解是传输二进制jar 包,从hdfs 到 本地工作目录。但没发现其他环节出现问题,对任务数据未产生影响。。

Could you please give me a hand about json object in flink sql

2022-03-31 文章 wang
Hi dear engineer, Thanks so much for your precious time reading my email! Resently I'm working on the Flink sql (with version 1.13) in my project and encountered one problem about json format data, hope you can take a look, thanks! Below is the description of my issue. I use kafka as source

Re: 维度表变化,关联结果全部更新在Flink SQL该如何实现

2022-03-22 文章 wushijjian5
1. 两个流的数据分别存入hbase; 2. 然后起新的消费者,两条流做outer join , 并设置一定时间的TTL: A, 能关联上直接输出 B, 右流为空,左流关联hbase补充右流数据 C, 左流为空,右流关联hbase补充左流数据 3, 全局数据根据一个version或ts可以排序去重 > 2022年3月22日 17:07,Michael Ran 写道: > > 可以考虑存储层 局部更新 > 在 2022-03-21 17:00:31,"zns" <865094...@qq.com.INVALID> 写道: >> Cdc join >>

Re:回复:flink-sql,对于kafka 的一些额外参数如何配置

2022-03-22 文章 Michael Ran
因为 KafkaConnectorOptions 里面没有,所有WITH 参数里面不知道如何加入了 在 2022-03-22 18:22:44,"写虫师" 写道: >--原始邮件-- >发件人: > "user-zh"

??????flink-sql??????kafka ??????????????????????

2022-03-22 文章 ??????
---- ??: "user-zh"

flink-sql,对于kafka 的一些额外参数如何配置

2022-03-22 文章 Michael Ran
dear all : 目前用flink1.4 table api +kafka 的情况下,有各种警告,比如: The configuration 'partition.discovery.interval.ms' was supplied but isn't a known config. 这些额外的参数,在SQL WITH参数里面没定义,不知道各位时在哪个位置加入配置的? 有什么建议吗? 感谢!

Re:Re: 维度表变化,关联结果全部更新在Flink SQL该如何实现

2022-03-22 文章 Michael Ran
可以考虑存储层 局部更新 在 2022-03-21 17:00:31,"zns" <865094...@qq.com.INVALID> 写道: >Cdc join > >> 2022年3月21日 14:01,JianWen Huang 写道: >> >> 事实表流A需关联维度表B做数据打宽。需求是当纬度表B发生变化时,关联结果需全部发生变化更新到最新。 >> 例子: >> 变化前: >> A流: >> name gender >> a male >> b male >> c female >> >> 纬度表B: >> name

Re:Re: Re: 维度表变化,关联结果全部更新在Flink SQL该如何实现

2022-03-21 文章 RS
Hi, 你这个例子中,捕获到B的变更CDC,若最终结果表支持部分字段更新,就直接更新结果表就行,都不需要关联, 只要你的B的CDC处理 晚于 A流的join处理就行 如果一定要全部关联的话,ttl又不可行,那你这个数据量会无限增大,后面就无法关联了的,设计肯定得改 在 2022-03-22 09:01:30,"JianWen Huang" 写道: >是的。其实我想到的也是将维度表和事实表都通过Cdc方式做成流,然后regular

Re: Re: 维度表变化,关联结果全部更新在Flink SQL该如何实现

2022-03-21 文章 JianWen Huang
是的。其实我想到的也是将维度表和事实表都通过Cdc方式做成流,然后regular join,实现这样的需求好像只能把双流数据都得存到状态里才可以实现,但是状态会不断增大且业务上不能接受数据不准确的结果,配ttl又不可行。所以来请教大家有没有碰到过这种场景。 casel.chen 于2022年3月22日周二 08:43写道: > > 用cdc join也需要将事实表缓存下来才能实现吧,这就是普通的regular > join,优点是双流驱动,缺点是需要缓存两边的数据,状态会变得很大,建议使用带ssd的rocksdb增量状态后端。 >

Re:Re: 维度表变化,关联结果全部更新在Flink SQL该如何实现

2022-03-21 文章 casel.chen
用cdc join也需要将事实表缓存下来才能实现吧,这就是普通的regular join,优点是双流驱动,缺点是需要缓存两边的数据,状态会变得很大,建议使用带ssd的rocksdb增量状态后端。 业务上如果可以接受超过一定时间范围不用关联的话,还可以设置state ttl 进一步使状态大小可控。 在 2022-03-21 17:00:31,"zns" <865094...@qq.com.INVALID> 写道: >Cdc join > >> 2022年3月21日 14:01,JianWen Huang 写道: >> >>

Re: 维度表变化,关联结果全部更新在Flink SQL该如何实现

2022-03-21 文章 zns
Cdc join > 2022年3月21日 14:01,JianWen Huang 写道: > > 事实表流A需关联维度表B做数据打宽。需求是当纬度表B发生变化时,关联结果需全部发生变化更新到最新。 > 例子: > 变化前: > A流: > name gender > a male > b male > c female > > 纬度表B: > nameage > a 16 > b17 > > 结果: > name gender age > a

维度表变化,关联结果全部更新在Flink SQL该如何实现

2022-03-21 文章 JianWen Huang
事实表流A需关联维度表B做数据打宽。需求是当纬度表B发生变化时,关联结果需全部发生变化更新到最新。 例子: 变化前: A流: name gender a male b male c female 纬度表B: nameage a 16 b17 结果: name gender age a male 16 b male 17 发生变化后: 纬度表B: nameage a 16->17 b

flink sql jdbc sink事务提交问题

2022-02-14 文章 casel.chen
最近在扩展flink sql jdbc connector以支持phoenix数据库,测试debug的时候发现数据能够通过PhoenixStatement.executeBatch()写入,但因为没有提交事务,所以其他人看不到。 源码中PhoenixPreparedStatement.execute()方法会调用executeMutation(statement)方法,继而判断connection.getAutoCommit()与否来执行connection.commit()方法。完了回到PhoenixStatement.executeBatch()执行flushIfNecessary

?????? Flink sql ??????????

2022-02-09 文章 ??????
?? ---- ??: "user-zh"

Re: Flink sql 客户端实现

2022-02-09 文章 JianWen Huang
-- > 发件人: > "user-zh" > > 发送时间:2022年2月10日(星期四) 中午11:19 > 收件人:"user-zh" > 主题:Flink sql 客户端实现 > > > > 我想实现一个Flink sql客户端,以支

??????Flink sql ??????????

2022-02-09 文章 ??????
https://github.com/DataLinkDC/dlink ?? ---- ??: "user-zh"

Flink sql 客户端实现

2022-02-09 文章 JianWen Huang
我想实现一个Flink sql客户端,以支持解析一段Flink sql后提交到对应的集群(K8s yarn等)。Flink sql不需要二次开发支持更多语法特性,能兼容Flink社区版本的语法即可。 部署方式可以支持Native k8s . standalone on K8s .yarn application .perjob 等等。我查看了目前官方的flink sql client.只支持session模式。 请问有没有相关的资料或者开源项目可以进行参考。谢谢。

flink1.12.2??????1.13.5??flink sql??????savepoint????

2022-02-08 文章 If
flink1.12.2??1.13.5??flink sql??savepoint switched from INITIALIZING to FAILED with failure cause: java.lang.Exception: Exception while creating StreamOperatorStateContext

Re: 回复: flink sql 如何提高下游并发度?

2022-01-16 文章 venn
flink 1.12 及以上版本  sql kafka sink 支持参数: sink.parallelism  指定 sink 的并行度 On 2022/1/11 17:06, 许友昌 wrote: hi, 设置了parallelism=10 ,实际上是分配了 10 个 slot,flink 是会共享 slot 的,所以 sink 会有 10 线程。 在2022年1月11日 16:53,RS 写道: Hi, 请教下,比如设置了parallelism=10,source kafka的topic分区为3,那source、后面的处理和sink的并发度是3还是10?

回复: flink sql 如何提高下游并发度?

2022-01-11 文章 许友昌
hi, 设置了parallelism=10 ,实际上是分配了 10 个 slot,flink 是会共享 slot 的,所以 sink 会有 10 线程。 在2022年1月11日 16:53,RS 写道: Hi, 请教下,比如设置了parallelism=10,source kafka的topic分区为3,那source、后面的处理和sink的并发度是3还是10? 如果source是10的话,那还有7个线程就空闲了? 在 2022-01-11 11:10:41,"Caizhi Weng" 写道: Hi! 可以设置 parallelism.default 为需要的并发数。

Re: flink sql 如何提高下游并发度?

2022-01-11 文章 Chang Li
可以的,提供一个思路,读取了kafka的数据后,直接输出原生的字节流后,接一层map算子做序列化相关工作,map算子的并发度你可以自己控制,这样kafka拉取就不会是瓶颈,大量的计算工作放到了map中,而map的并发度是可以自己控制的 Jeff 于2022年1月9日周日 19:45写道: > 当source为kafka时,最大并发度由kafka分区决定的, 有没有办法在不增加kafka分区情况下提高整个任务的并发度呢?

回复: flink sql 如何提高下游并发度?

2022-01-11 文章 JasonLee
hi 是 10 目前 source 还不支持单独设置并发度,但是 sink 是支持的,当然如果没有单独设置的话 sink 也是 10 Best JasonLee 在2022年01月11日 16:52,RS 写道: Hi, 请教下,比如设置了parallelism=10,source kafka的topic分区为3,那source、后面的处理和sink的并发度是3还是10? 如果source是10的话,那还有7个线程就空闲了? 在 2022-01-11 11:10:41,"Caizhi Weng" 写道: Hi! 可以设置 parallelism.default

Re: flink sql 如何提高下游并发度?

2022-01-11 文章 chang li
可以的,提供一个思路,读取了kafka的数据后,直接输出原生的字节流后,接一层map算子做序列化相关工作,map算子的并发度你可以自己控制,这样kafka拉取就不会是瓶颈,大量的计算工作放到了map中,而map的并发度是可以自己控制的 Caizhi Weng 于2022年1月11日周二 11:11写道: > Hi! > > 可以设置 parallelism.default 为需要的并发数。 > > Jeff 于2022年1月9日周日 19:44写道: > > > 当source为kafka时,最大并发度由kafka分区决定的,

Re:Re: flink sql 如何提高下游并发度?

2022-01-11 文章 RS
Hi, 请教下,比如设置了parallelism=10,source kafka的topic分区为3,那source、后面的处理和sink的并发度是3还是10? 如果source是10的话,那还有7个线程就空闲了? 在 2022-01-11 11:10:41,"Caizhi Weng" 写道: >Hi! > >可以设置 parallelism.default 为需要的并发数。 > >Jeff 于2022年1月9日周日 19:44写道: > >> 当source为kafka时,最大并发度由kafka分区决定的, 有没有办法在不增加kafka分区情况下提高整个任务的并发度呢?

Re: flink sql 如何提高下游并发度?

2022-01-10 文章 Caizhi Weng
Hi! 可以设置 parallelism.default 为需要的并发数。 Jeff 于2022年1月9日周日 19:44写道: > 当source为kafka时,最大并发度由kafka分区决定的, 有没有办法在不增加kafka分区情况下提高整个任务的并发度呢?

flink sql动态累计窗口实现问题

2022-01-10 文章 casel.chen
听了FFA2021快手Flink SQL分享有讲到动态累计窗口实现,想问一下Flink开源社区是否有相应实现?或者有相应的JIRA?我们也有这样的使用场景,如果暂时没有的话要如何自己实现?特别是自定义sql语法这块,有没有一些相关的教程?谢谢!

flink sql 如何提高下游并发度?

2022-01-09 文章 Jeff
当source为kafka时,最大并发度由kafka分区决定的, 有没有办法在不增加kafka分区情况下提高整个任务的并发度呢?

Re:Re: 咨询个Flink SQL的问题,如何去除null的字段

2022-01-06 文章 RS
29日周三 16:41写道: > >> Hi, >> 使用Flink SQL消费Kafka写ES,有时候有的字段不存在,不存在的不想写入ES,这种情况怎么处理呢? >> >> >> 比如:源数据有3个字段,a,b,c >> insert into table2 >> select >> a,b,c >> from table1 >> 当b=null的时候,只希望写入a和c >> 当c=null的时候,只希望写入a和b >> >> > >-- > >Best, >Benchao Li

Re: Re: Re: flink sql回撤流sink优化问题

2022-01-06 文章 Benchao Li
> >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> 在 2021-12-25 22:54:19,"郭伟权" 写道: > >> > >> > >结果输出到upsert-kafka,1.13版本,upsert-kafka有两个参数可以在flink里面buffer,可以大大减少输

Re:Re: Re: flink sql回撤流sink优化问题

2022-01-06 文章 casel.chen
t;> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2021-12-25 22:54:19,"郭伟权" 写道: >> >> >结果输出到upsert-kafka,1.13版本,upsert-kafka有两个参数可以在flink里面buffer,可以大大减少输出到kafka的消息的数量 >> > >> >casel

Re: Re: flink sql回撤流sink优化问题

2022-01-06 文章 Benchao Li
sink,另外还是从sink > kafka消费再写入mysql,链路有点长,最好是能在原来作业的基础上在sink前添加一个聚合算子。 > > > > > > > > > > > > > > > > > > 在 2021-12-25 22:54:19,"郭伟权" 写道: > > >结果输出到upsert-kafka,1.13版本,upsert-kafka有两个参数可以在flink里面buffer,可以大大减少输出到kafka的消息的数量 >

Re: Re: Re:咨询个Flink SQL的问题,如何去除null的字段

2021-12-30 文章 Caizhi Weng
Hi! 感谢说明。目前 elastic search sink 确实没有这样的功能。一种方式是如其他回复所说对 SQL 进行判断,不过我觉得更好的方式是写一个自定义的 es sink。es sink 的逻辑详见 ElasticsearchDynamicSink 类,可能只需要实现一个自定义的 RowElasticsearchEmitter 就可以了。 RS 于2021年12月31日周五 10:29写道: > Hi, > 你好,是这样的,从kafka消费的话,如果表定义了 a,b,c三个字段,如果kafka的数据少了一个a,那么在flink sql里面,读出来的就是 &g

Re:Re: Re:咨询个Flink SQL的问题,如何去除null的字段

2021-12-30 文章 RS
Hi, 你好,是这样的,从kafka消费的话,如果表定义了 a,b,c三个字段,如果kafka的数据少了一个a,那么在flink sql里面,读出来的就是 a=null,写入ES的话,就会有个a=null 比如从ES查询数据的话 期望 没有a的时候,查询结果类似 {b=1,c=2} 如果写了a=null进去,查询结果类似 {a=null,b=1,c=2} 这样结果就和期望的不一样了,所以期望是Flink SQL insert的时候 ,不写数值为null字段 在 2021-12-31 10:15:41,"Caizhi Weng" 写道: >Hi! >

Re: Re:咨询个Flink SQL的问题,如何去除null的字段

2021-12-30 文章 Caizhi Weng
>可以使用case when试一下 > >在 2021-12-29 16:40:39,"RS" 写道: > >>Hi, > >>使用Flink SQL消费Kafka写ES,有时候有的字段不存在,不存在的不想写入ES,这种情况怎么处理呢? > >> > >> > >>比如:源数据有3个字段,a,b,c > >>insert into table2 > >>select > >>a,b,c > >>from table1 > >>当b=null的时候,只希望写入a和c > >>当c=null的时候,只希望写入a和b > >> >

Re:Re:咨询个Flink SQL的问题,如何去除null的字段

2021-12-30 文章 RS
有10~20个字段,这样一个个写,手都敲断了,还有其他的方式吗?或者如何开发代码适配到SQL? 在 2021-12-30 11:36:21,"Xuyang" 写道: >可以使用case when试一下 >在 2021-12-29 16:40:39,"RS" 写道: >>Hi, >>使用Flink SQL消费Kafka写ES,有时候有的字段不存在,不存在的不想写入ES,这种情况怎么处理呢? >> >> >>比如:源数据有3个字段,a,b,c >>inser

Re:咨询个Flink SQL的问题,如何去除null的字段

2021-12-29 文章 Xuyang
可以使用case when试一下 在 2021-12-29 16:40:39,"RS" 写道: >Hi, >使用Flink SQL消费Kafka写ES,有时候有的字段不存在,不存在的不想写入ES,这种情况怎么处理呢? > > >比如:源数据有3个字段,a,b,c >insert into table2 >select >a,b,c >from table1 >当b=null的时候,只希望写入a和c >当c=null的时候,只希望写入a和b >

Re:咨询个Flink SQL的问题,如何去除null的字段

2021-12-29 文章 Michael Ran
可以写两个insert 语句,后面用判断分开~。~ 在 2021-12-29 16:40:39,"RS" 写道: >Hi, >使用Flink SQL消费Kafka写ES,有时候有的字段不存在,不存在的不想写入ES,这种情况怎么处理呢? > > >比如:源数据有3个字段,a,b,c >insert into table2 >select >a,b,c >from table1 >当b=null的时候,只希望写入a和c >当c=null的时候,只希望写入a和b >

咨询个Flink SQL的问题,如何去除null的字段

2021-12-29 文章 RS
Hi, 使用Flink SQL消费Kafka写ES,有时候有的字段不存在,不存在的不想写入ES,这种情况怎么处理呢? 比如:源数据有3个字段,a,b,c insert into table2 select a,b,c from table1 当b=null的时候,只希望写入a和c 当c=null的时候,只希望写入a和b

Re:Re: flink sql回撤流sink优化问题

2021-12-26 文章 casel.chen
大大减少输出到kafka的消息的数量 > >casel.chen 于2021年12月23日周四 08:15写道: > >> flink sql中aggregate without >> window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql >> 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游? >> 可以再over window开窗用last_value函数吗?over window支持

Re:Re: flink sql回撤流sink优化问题

2021-12-26 文章 casel.chen
-flush.max-rows >参数 > >[1] : >https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/jdbc/ > > >Zhiwen Sun > > > >On Thu, Dec 23, 2021 at 8:15 AM casel.chen wrote: > >> flink sql中aggregate without >> window产生的统计回撤流sink输出如果不做优化的

Re: flink sql回撤流sink优化问题

2021-12-25 文章 Zhiwen Sun
: > flink sql中aggregate without > window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql > 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游? > 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗? > > > 例如有下面binlog cdc购买数据(订单购买金额会更新): > >

Re: flink sql回撤流sink优化问题

2021-12-25 文章 郭伟权
结果输出到upsert-kafka,1.13版本,upsert-kafka有两个参数可以在flink里面buffer,可以大大减少输出到kafka的消息的数量 casel.chen 于2021年12月23日周四 08:15写道: > flink sql中aggregate without > window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql > 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游? > 可以再ov

Re: flink sql回撤流sink优化问题

2021-12-25 文章 郭伟权
结果输出到upsert-kafka,1.13版本,upsert-kafka有两个参数可以在flink里面buffer,可以大大减少输出到kafka的消息的数量 casel.chen 于2021年12月23日周四 08:15写道: > flink sql中aggregate without > window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql > 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游? > 可以再ov

Flink SQL Calcite 解析出错

2021-12-24 文章 Pinjie Huang
我的原SQL: CREATE TABLE consumer_session_created ( consumer ROW (consumerUuid STRING), clientIp STRING, deviceId STRING, eventInfo ROW < eventTime BIGINT >, ts AS TO_TIMESTAMP(FROM_UNIXTIME(eventInfo.eventTime / 1000, '-MM-dd HH:mm:ss')), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH (

Re: Flink SQL DECIMAL精度问题

2021-12-23 文章 Caizhi Weng
Hi! 当时应该是参考了其他系统(比如 sql server)的精度范围,SQL 标准里好像确实没有提到。 目前如果需要更高精度,可以考虑先用 string 存储,并通过 udf 把 string 转成 java big decimal,运算完之后再变回 string。 Michael Ran 于2021年12月24日周五 10:59写道: > clickhouse 还提供 Decimal64 Decimal128 ,我也想知道38 这个是什么数据库的标准吗? > 在 2021-12-23 19:58:24,"Ada Wong" 写道: >

Flink SQL DECIMAL精度问题

2021-12-23 文章 Ada Wong
最大精度为38,这个是有什么说法吗,为什么不是1000。如果我需要更高精度的DECIMAL我改怎么做?例如我需要DECIMAL(50, 18)

Re: 请教flink sql作业链路延迟监控如何实现

2021-12-22 文章 刘建刚
,如果lag数值较大或持续上升,肯定就有延迟了。收到告警后,再查看下plan,有个busy指标,红色的节点就是有问题的 > > > > > > > > > 在 2021-12-23 08:36:33,"casel.chen" 写道: > >想问一下flink sql作业链路延迟监控如何实现? > >我们的flink > sql作业基本上都是上游接kafka,下游sink到es/hbase/kafka/mongodb/redis/clickhouse/doris这些存储 > >

Re:请教flink sql作业链路延迟监控如何实现

2021-12-22 文章 RS
我是直接监控kafka的lag,如果lag数值较大或持续上升,肯定就有延迟了。收到告警后,再查看下plan,有个busy指标,红色的节点就是有问题的 在 2021-12-23 08:36:33,"casel.chen" 写道: >想问一下flink sql作业链路延迟监控如何实现? >我们的flink >sql作业基本上都是上游接kafka,下游sink到es/hbase/kafka/mongodb/redis/clickhouse/doris这些存储 >想监控如下三种延迟,目前有什么办法实现吗?会有相应的metric

请教flink sql作业链路延迟监控如何实现

2021-12-22 文章 casel.chen
想问一下flink sql作业链路延迟监控如何实现? 我们的flink sql作业基本上都是上游接kafka,下游sink到es/hbase/kafka/mongodb/redis/clickhouse/doris这些存储 想监控如下三种延迟,目前有什么办法实现吗?会有相应的metrics暴露出来吗?目前我们在用的flink版本是1.13.2 1. 端到端的延迟 2. kafka本身的延迟 3. flink处理的延迟

flink sql回撤流sink优化问题

2021-12-22 文章 casel.chen
flink sql中aggregate without window产生的统计回撤流sink输出如果不做优化的话会随着source数据来一条就输出一条,这样会导致下游写入的压力会很大,例如mysql 请问这种统计回撤流sink输出优化的话要怎么cache起来取相同key的最后一条数据sink到下游? 可以再over window开窗用last_value函数吗?over window支持作用在回撤流上吗? 例如有下面binlog cdc购买数据(订单购买金额会更新): orderid. categorydt

Re: 双流窗口内join用flink sql实现的语法是什么?

2021-12-16 文章 Caizhi Weng
Hi! 从 Flink 1.14 开始,Flink SQL 支持 window join [1]。 [1] https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/table/sql/queries/window-join/ casel.chen 于2021年12月17日周五 08:47写道: > 每隔5分钟join来自两条流的数据,用flink sql实现的写法是什么? > 需要先join再窗口计算还是可以直接窗口内join? flink版本是1.13

双流窗口内join用flink sql实现的语法是什么?

2021-12-16 文章 casel.chen
每隔5分钟join来自两条流的数据,用flink sql实现的写法是什么? 需要先join再窗口计算还是可以直接窗口内join? flink版本是1.13

Re: Flink SQL 有办法access State吗

2021-12-14 文章 Caizhi Weng
] https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/table/functions/udfs/#%E8%81%9A%E5%90%88%E5%87%BD%E6%95%B0 Pinjie Huang 于2021年12月14日周二 15:48写道: > 之前的DataStream API 我们可以通过State进行一些复杂的逻辑。比如所有message的某个field的最大值。Flink > SQL有类似的方法吗? >

Flink SQL 有办法access State吗

2021-12-13 文章 Pinjie Huang
之前的DataStream API 我们可以通过State进行一些复杂的逻辑。比如所有message的某个field的最大值。Flink SQL有类似的方法吗?

Re: Re: flink sql支持细粒度的状态配置

2021-12-09 文章 Yun Tang
Hi, 如果你们可以自己实现一套SQL语句到jobgraph的预编译转换IDE,然后在IDE中可以手动配置jobgraph每个算子的配置,应该是可以达到你们的目的 (可能还需要结合细粒度调度模式)。 祝好 唐云 From: gygz...@163.com Sent: Thursday, December 9, 2021 16:14 To: user-zh Subject: 回复: Re: flink sql支持细粒度的状态配置 Hi Yun Tang 感谢你的回复,我们在调研的过程中也发现,正如你所说的生成的

回复: Re: flink sql支持细粒度的状态配置

2021-12-09 文章 gygz...@163.com
TTL的配置再调用每个execNode转换成operator的方法,来做到Transformation级别的TTL控制,这个配置开放给平台的用户,通过Transformation的id做识别,是否能给一些建议 gygz...@163.com 发件人: Yun Tang 发送时间: 2021-12-09 10:57 收件人: user-zh 主题: Re: flink sql支持细粒度的状态配置 Hi 你好, 我认为这是一个很好的需求,对于data stream以及python API来说,state TTL都是通过API逐个配置的,你的需求就可以直接满足。但是对于SQL来说,由于

Re: flink sql支持细粒度的状态配置

2021-12-08 文章 Yun Tang
Hi 你好, 我认为这是一个很好的需求,对于data stream以及python API来说,state TTL都是通过API逐个配置的,你的需求就可以直接满足。但是对于SQL来说,由于相同的SQL语句,不同优化器其生成的执行plan可能会差异很大,很难对某个operator内的state进行TTL进行配置,可能一种方式是增加一些SQL的优化hint,对于你示例中的join语句和groupBy 的count语句配以不同的TTL,但是目前Flink SQL尚未支持该功能。 祝好 唐云 From: gygz

flink sql支持细粒度的状态配置

2021-12-07 文章 gygz...@163.com
Hi all 在我们生产中发现,如果在sql中配置状态的TTL会导致这个 ttl时间全局生效 如果我存在一个如下sql select count(1),region from (select * from A join B on a.uid = b.uid) group by region 如果我配置一个全局的TTL会导致count这个GroupAggFunction的状态被淘汰掉,比如说一天以后累计就被清零 如果不配置,又会导致Regular join的状态增大 这是其中一个场景,这里只是举一个例子 主要是想询问针对

Re:Re:Re:Re: flink sql collect函数使用问题

2021-12-05 文章 casel.chen
需求应该其他人也会遇到吧? >>功能:collect出一个Multiset即map,key是数据本身,value是数据出现的次数,可以按出现次数排序等。 >> 输出可以是去重或不去重的Array(按出现次数排序或不排序),也可以就是map本身 >> >> >>目前collect函数可以输出一个Multiset即map,但要怎么按value即出现次数排序并只输出排序后的keyset,用flink sql要怎么写呢? >> >> >> >> >> >&

Re:Re:Re: flink sql collect函数使用问题

2021-12-03 文章 RS
怎么按value即出现次数排序并只输出排序后的keyset,用flink sql要怎么写呢? > > > > > > > > > > > > > > >在 2021-12-02 09:58:28,"cyril cui" 写道: >>af里acc为个list,merge的时候合并,输出的时候 list拼成string即可 >> >>casel.chen 于2021年12月2日周四 上午9:46写道: >> >>> 使用场景如下

flink sql怎样获取到Multiset类型的keys和values?

2021-12-02 文章 casel.chen
我通过flink sql的collect函数获取到Multiset类型,即Map类型,现在只想获取到所有的Kt集合,请问要怎么用flink sql表达?

Re:Re: flink sql collect函数使用问题

2021-12-02 文章 casel.chen
可我要的最终结果不是string,最好是通用的Row类型,这样的话下次聚合其他维度就不用重复开发UDF了。 类似我这样的需求应该其他人也会遇到吧? 功能:collect出一个Multiset即map,key是数据本身,value是数据出现的次数,可以按出现次数排序等。 输出可以是去重或不去重的Array(按出现次数排序或不排序),也可以就是map本身 目前collect函数可以输出一个Multiset即map,但要怎么按value即出现次数排序并只输出排序后的keyset,用flink sql要怎么写呢? 在 2021-12-02 09

Re: flink sql collect函数使用问题

2021-12-01 文章 cyril cui
af里acc为个list,merge的时候合并,输出的时候 list拼成string即可 casel.chen 于2021年12月2日周四 上午9:46写道: > 使用场景如下,将kafka源表通过flink sql处理成mongodb汇表存入。按照班级进行group > by,输出对应班级所有的学生数据集合。请问用flink sql自带的collect函数能实现吗?如果能的话要怎么写sql? > 如果不能的话要怎么写UDAF,有例子参考吗?谢谢! > > kafka源表: > 班级 学号 姓名 年龄 >

flink sql collect函数使用问题

2021-12-01 文章 casel.chen
使用场景如下,将kafka源表通过flink sql处理成mongodb汇表存入。按照班级进行group by,输出对应班级所有的学生数据集合。请问用flink sql自带的collect函数能实现吗?如果能的话要怎么写sql? 如果不能的话要怎么写UDAF,有例子参考吗?谢谢! kafka源表: 班级 学号 姓名 年龄 1 20001张三 15 2 20011李四 16 1 20002王五 16 2 20012吴六

flink sql group by后收集数据集合问题

2021-12-01 文章 casel.chen
业务中使用flink sql group by操作后想收集每个分组下所有的数据,如下示例: kafka源表: 班级 学号 姓名 年龄 1 20001张三 15 2 20011李四 16 1 20002王五 16 2 20012吴六 15 create table source_table ( class_no: INT, student_no: INT, name: STRING, age

Re: flink sql lookup join中维表不可以是视图吗?

2021-12-01 文章 Tony Wei
info 有 2 筆數據,原先的 SQL 會得到 1 筆 left join 沒成功的數據,上面提供的 SQL 則會輸出 2 筆。 casel.chen 於 2021年12月1日 週三 下午6:33寫道: > lookup join用的维表需要从两张mysql表做关联后得到,因此创建了一个视图。但发现flink sql不支持lookup > join关联视图,会抛 > Temporal Table Join requires primary key in versioned table, but no > primary key can be found.

flink sql lookup join中维表不可以是视图吗?

2021-12-01 文章 casel.chen
lookup join用的维表需要从两张mysql表做关联后得到,因此创建了一个视图。但发现flink sql不支持lookup join关联视图,会抛 Temporal Table Join requires primary key in versioned table, but no primary key can be found. 请问这种情况要怎么解决? CREATE VIEW query_mer_view (mer_cust_id, update_time) AS SELECT a.mer_cust_id, k.update_time FROM

Re:Re: flink sql group by后收集数据问题

2021-12-01 文章 casel.chen
ink-docs-master/zh/docs/dev/table/functions/udfs/#%e8%87%aa%e5%8a%a8%e7%b1%bb%e5%9e%8b%e6%8e%a8%e5%af%bc > >casel.chen 于2021年12月1日周三 上午7:56写道: > >> 业务中使用flink sql group by操作后想收集所有的数据,如下示例: >> >> >> kafka源表: >> 班级 学号 姓名 年龄 >> 1

Re: Flink sql jdbc Partitioned Scan timestamp不生效

2021-11-30 文章 天下五帝东
年12月1日周三 上午9:23写道: > >> Hi: >> 我在使用flink sql jdbc connector测试partitioned scan功能,发现指定 >> scan.partition.column 为timestamp类型时,scan.partition.lower-bound >> >> 和scan.partition.upper-bound指定具体的值后,没有读取到相关数据,哪位大佬帮忙解答下 >> >> 谢谢 >> >>

Re:Re: flink sql group by后收集数据问题

2021-11-30 文章 casel.chen
ocs/dev/table/functions/udfs/#%e8%87%aa%e5%8a%a8%e7%b1%bb%e5%9e%8b%e6%8e%a8%e5%af%bc > >casel.chen 于2021年12月1日周三 上午7:56写道: > >> 业务中使用flink sql group by操作后想收集所有的数据,如下示例: >> >> >> kafka源表: >> 班级 学号 姓名 年龄 >> 1 20001张三 15

Re:Re: flink sql中如何使用异步io关联维表?

2021-11-30 文章 Michael Ran
Hello,咨询一下,目前connector-hbase 的异步join,是能保证顺序的吗? 在 2021-03-05 11:10:41,"Leonard Xu" 写道: >目前Flink SQL 中的connector都没实现异步io关联维表,接口是上已经支持了的,如果是自己实现可以参考[1] >另外,HBase connector 社区有人正在支持异步io关联维表,预计1.13可以使用[2] > >祝好 > >[1]https://github.com/apache/flink/blob/73cdd3d0d9f6a807b3

<    1   2   3   4   5   6   7   8   9   10   >