Re:关于flink sql往postgres写数据遇到的timestamp问题

2020-12-08 文章 李轲
报错信息: Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue. at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213) Caused by: org.apache.flink.table.api.TableException: Unsupported conversion

关于flink sql往postgres写数据遇到的timestamp问题

2020-12-08 文章 李轲
项目需求要向 postgres 中插入数据,用 catalog 之后,插入数据貌似需要和数据库表定义完全一致,而且没找到只插入部分字段的写法 在时间转 TIMESTAMP(6) WITH LOCAL TIME ZONE 时报了错,这个格式是 postgres 中的时间戳定义 select cast(localtimestamp as TIMESTAMP(6) WITH LOCAL TIME ZONE); 有没有什么转换方法?或者只插入部分数据的方法?

Re: flink sql 任务滑动窗口失效

2020-12-08 文章 xushanshan
-- Sent from: http://apache-flink.147419.n8.nabble.com/

flink sql ddl????????????java.lang.IncompatibleClassChangeError: Implementing class

2020-12-08 文章 bigdata
??flink1.10.1,pom??

flink sql 任务滑动窗口失效

2020-12-07 文章 xushanshan
业务场景: 滑动窗口大小5分钟,滑动频率1分钟,使用事件事件做watermark,发现滑动窗口的计算结果输出包含delete状态的数据且窗口计算的触发频率不是配置的1分钟,问题的原因是什么? -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink sql实时计算UpsertStreamTableSink requires that Table has a full primary keys if it is updated

2020-12-07 文章 Leonard Xu
Hi, 你是不是没有订阅flink的用户邮件列表,所以有些邮件你看不到。 你可以发送任意内容的邮件到user-zh-subscr...@flink.apache.org 即可订阅用户邮件列表,订阅后邮件列表里大家的提问和回答你都可以看见了。 [1] https://flink.apache.org/zh/community.html > 在 2020年12月7日,16:50,Leonard Xu 写道: > > Hi, > >> 在 2020年12月7日,16:41,爱成绕指柔

flink sql????????UpsertStreamTableSink

2020-12-07 文章 ??????????
?? flink1.10??hbase Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated. s""" |INSERT INTO ${databaseName}.response_time_sink

flink sql????????UpsertStreamTableSink

2020-12-07 文章 ??????????
?? flink1.10??hbase Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated. s""" |INSERT INTO ${databaseName}.response_time_sink

flink sql????????UpsertStreamTableSink requires that Table has a full primary keys if it is updated

2020-12-07 文章 ??????????
?? flink1.10??hbase Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated. s""" |INSERT INTO ${databaseName}.response_time_sink

Re: flink sql实时计算UpsertStreamTableSink requires that Table has a full primary keys if it is updated

2020-12-07 文章 Leonard Xu
Hi, > 在 2020年12月7日,16:41,爱成绕指柔 <1194803...@qq.com> 写道: > > Exception in thread "main" org.apache.flink.table.api.TableException: > UpsertStreamTableSink requires that Table has a full primary keys if it is > updated. 这个错误是在query 没法推断出主键,而 hbase sink 是一个upsert sink, 需要PK来实现upsert语义。

flink sql????????UpsertStreamTableSink requires that Table has a full primary keys if it is updated

2020-12-07 文章 ??????????
?? flink1.10??hbase Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated. s""" |INSERT INTO ${databaseName}.response_time_sink

flink sql????????UpsertStreamTableSink requires that Table has a full primary keys if it is updated

2020-12-07 文章 ??????????
?? flink1.10??hbase Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated. at

flink sql????????UpsertStreamTableSink requires that Table has a full primary keys if it is updated

2020-12-07 文章 ??????????
?? flink1.10??hbasehttp://apache-flink.147419.n8.nabble.com/; Exception in thread "main" org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is

flink sql ?????????????????? 1.10.1??sql??????????

2020-12-06 文章 ??????
??ab ??flink sql?? flink1.10.1

Re: flink sql client 报错java.net.NoRouteToHostException: 没有到主机的路由

2020-12-05 文章 奚焘
找到原因了,host文件里localhost的ip不对 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink sql实时计算分位数如何实现

2020-12-03 文章 Jark Wu
可以看下UDAF的文档: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/udfs.html#aggregate-functions On Thu, 3 Dec 2020 at 12:06, 爱成绕指柔 <1194803...@qq.com> wrote: > 你好: > 目前flink sql实时计算中没有percentile函数吗?如果没有,如何实现这一功能。 > 期待你的答复,谢谢!

Re: Flink SQL共享source 问题

2020-12-03 文章 Jark Wu
1. 是不是共享了 source,看下 web ui 中的拓扑图就知道了 2. 追数据的时候,或者下游消费速度不一的时候,分区之间消费不均衡是很正常的。 3. 你可以调大 sink 的并发,以及增加 buffer size 来缓解这个问题。 Best, Jark On Wed, 2 Dec 2020 at 19:22, zz wrote: > hi各位: > 目前我有一个任务,source table是读取一个topic生成的,但是有6个sink,使用了多条insert > 语句输出到同一张mysql表中,按照我的理解,这些insert语句 > 应该都是共享这个source

Re: flink sql 1.11.1 貌似出现bug

2020-12-03 文章 Jark Wu
看样子是提交作业超时失败了,请确认 1. flink cluster 已经起来了 2. sql client 的环境与 flink cluster 环境连通 3. sql-client-defaults.yaml 中配置了正确的 gateway-address 地址 (如果是本地 cluster,则不用配置) Best, Jark On Wed, 2 Dec 2020 at 14:12, zzy wrote: > 遇到的问题如下, flink版本1.11.1,sql client 中使用flink sql > > > sql语句如下: &g

Re: flink sql client 报错java.net.NoRouteToHostException: 没有到主机的路由

2020-12-03 文章 Jark Wu
你本地 ping 一下 localhost 看看能不能 ping 通。 另外看看本地有没有开网络代理,有的话关掉试试。 Best, Jark On Tue, 1 Dec 2020 at 09:38, 奚焘 <759928...@qq.com> wrote: > 本人刚学习flink ,下载解压了flink,启动./sql-client.sh embedded ,输入SELECT 'Hello > World';报错 > Flink SQL> SELECT 'Hello World'; > [ERROR] Could not execute

Re: 用代码执行flink sql 报错 Multiple factories for identifier 'jdbc' that implement

2020-12-03 文章 Wei Zhong
这个错误信息显示问题在后续版本已经修复,新版本发布后升级版本就能直接从错误信息中看到是哪些TableFactory冲突了: https://issues.apache.org/jira/browse/FLINK-20186 > 在 2020年12月3日,20:08,Wei Zhong 写道: > > Hi, > >

Re: 用代码执行flink sql 报错 Multiple factories for identifier 'jdbc' that implement

2020-12-03 文章 Wei Zhong
Hi, 现在的查找TableFactory的代码在错误信息显示上似乎存在问题,看不到真实的类名,可以先手动执行一下以下代码查看到底是哪些类被判定为JDBC的DynamicTableSinkFactory了: List result = new LinkedList<>(); ServiceLoader .load(Factory.class, Thread.currentThread().getContextClassLoader()) .iterator() .forEachRemaining(result::add); List jdbcResult =

Re:用代码执行flink sql 报错 Multiple factories for identifier 'jdbc' that implement

2020-12-03 文章 hailongwang
Hi, 你是用哪个版本的呢?有没有自己继承了 DynamicTableSinkFactory 实现的 factoryIdentifier 方法返回 `JDCB` 的 Connector? Best, Hailong 在 2020-12-03 14:44:18,"xuzh" 写道: >错误: > > >Caused by: org.apache.flink.table.api.ValidationException: Multiple factories >for identifier 'jdbc' that implement

Re: Flink SQL使用Tumble窗口函数报NoSuchMethodError functions/AggregateFunction 异常

2020-12-03 文章 JasonLee
hi 从报错信息看应该jar包冲突了,可以贴一下相关的依赖包吗 - Best Wishes JasonLee -- Sent from: http://apache-flink.147419.n8.nabble.com/

??????????flink sql ???? Multiple factories for identifier 'jdbc' that implement

2020-12-02 文章 xuzh
?? Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath DynamicTableSinkFactory ?? package

flink sql??????????????????????

2020-12-02 文章 ??????????
?? flink sql??percentile

Flink SQL????source ????

2020-12-02 文章 zz
hi?? ??source table??topic??6??sinkinsert mysqlinsert source tablekafkakafka

flink sql 1.11.1

2020-12-01 文章 zzy
遇到的问题如下, flink版本1.11.1,sql client 中使用flink sql sql语句如下: CREATE TABLE sls_log_sz_itsp ( request STRING, http_bundleId STRING, upstream_addr STRING, http_appid STRING, bodyUserId STRING, http_sequence STRING, http_version STRING, response_body STRING, uri STRING, bytes_sent STRING, http_userId

flink sql 1.11.1 貌似出现bug

2020-12-01 文章 zzy
遇到的问题如下, flink版本1.11.1,sql client 中使用flink sql sql语句如下: CREATE TABLE sls_log_sz_itsp ( request STRING, http_bundleId STRING, upstream_addr STRING, http_appid STRING, bodyUserId STRING, http_sequence STRING, http_version STRING, response_body STRING, uri STRING, bytes_sent

flink sql 1.11.1 貌似出现bug

2020-12-01 文章 zzy
遇到的问题如下, flink版本1.11.1,sql client 中使用flink sql sql语句如下: CREATE TABLE sls_log_sz_itsp ( request STRING, http_bundleId STRING, upstream_addr STRING, http_appid STRING, bodyUserId STRING, http_sequence STRING, http_version STRING, response_body STRING, uri STRING, bytes_sent

Re:Re:大家有用Flink SQL中的collect函数执行的结果用DataStream后,用什么数据类型匹配该字段的结果,数据类型的类名和需要的依赖是什么?

2020-12-01 文章 18293503878
好的,十分感谢 在 2020-12-01 23:35:05,"hailongwang" <18868816...@163.com> 写道: >Hi, > Collect 函数返回 Multiset 类型 ,可以使用 Map 试试 > > >Best, >Hailong > >在 2020-12-01 18:03:15,"chegg_work" 写道: >>大家有用Flink SQL中的collect函数执行的结果用DataStream后,用什么数据类型匹配该字段的结果,数据类型的类名和需要的依赖是什么?

Re: 【Flink SQL】无法启动 env.yaml

2020-12-01 文章 李轲
,因为已经支持了用DDL这种纯SQL的方式定义表。 > > 推荐你可以拉起sql-client后,用DDL的方式建表 > > 祝好 > Leonard > > > >> 在 2020年12月1日,21:43,李轲 写道: >> >> 在服务器上试用sql-client时,启动指令如下: >> >> ./sql-client.sh embedded -l /root/flink-sql-client/libs/ -d >> /data_gas/flink/

Re:使用flink-sql解析debezium采集的mysql timestamp字段报错

2020-12-01 文章 hailongwang
Hi, 引用 Jark 对邮件列表中另一个相关的问题的回答,详情可查看[1]。 希望对你有帮助。 [1] http://apache-flink.147419.n8.nabble.com/flink-sql-td8884.html#a Best, Hailong 在 2020-12-01 10:09:21,"王羽凡" 写道: >flink-sql-client执行建表: > >CREATE TABLE source_xxx ( > id INT, > ctime TIMESTAMP >) WITH ( >

Re:大家有用Flink SQL中的collect函数执行的结果用DataStream后,用什么数据类型匹配该字段的结果,数据类型的类名和需要的依赖是什么?

2020-12-01 文章 hailongwang
Hi, Collect 函数返回 Multiset 类型 ,可以使用 Map 试试 Best, Hailong 在 2020-12-01 18:03:15,"chegg_work" 写道: >大家有用Flink SQL中的collect函数执行的结果用DataStream后,用什么数据类型匹配该字段的结果,数据类型的类名和需要的依赖是什么?

Re: 【Flink SQL】无法启动 env.yaml

2020-12-01 文章 Leonard Xu
写道: > > 在服务器上试用sql-client时,启动指令如下: > > ./sql-client.sh embedded -l /root/flink-sql-client/libs/ -d > /data_gas/flink/flink-1.11.2/conf/sql-client-defaults.yaml -e > /root/flink-sql-client/sql-client-demo.yml > > 配置如下: > > # 定义表 > tables: > - nam

【Flink SQL】无法启动 env.yaml

2020-12-01 文章 李轲
在服务器上试用sql-client时,启动指令如下: ./sql-client.sh embedded -l /root/flink-sql-client/libs/ -d /data_gas/flink/flink-1.11.2/conf/sql-client-defaults.yaml -e /root/flink-sql-client/sql-client-demo.yml 配置如下: # 定义表 tables: - name: SourceTable type: source-table update-mode: append

大家有用Flink SQL中的collect函数执行的结果用DataStream后,用什么数据类型匹配该字段的结果,数据类型的类名和需要的依赖是什么?

2020-12-01 文章 chegg_work
大家有用Flink SQL中的collect函数执行的结果用DataStream后,用什么数据类型匹配该字段的结果,数据类型的类名和需要的依赖是什么?

Re:Re: flink sql cdc sum 结果出现NULL

2020-11-30 文章 kandy.wang
@Jianzhi Zhang 嗯,是这个原因,感谢 回复。 就是decimal的精度问题 在 2020-12-01 13:24:23,"Jianzhi Zhang" 写道: >是不是你的decimal字段长度太短了,计算结果超出了精度范围导致null的出现 > >> 2020年11月19日 下午10:41,kandy.wang 写道: >> >> --mysql表 >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( >> `id` INT

Re: flink sql cdc sum 结果出现NULL

2020-11-30 文章 Jianzhi Zhang
是不是你的decimal字段长度太短了,计算结果超出了精度范围导致null的出现 > 2020年11月19日 下午10:41,kandy.wang 写道: > > --mysql表 > CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( > `id` INT UNSIGNED AUTO_INCREMENT, > `spu_id` BIGINT NOT NULL, > `leaving_price` DECIMAL(10, 5) > PRIMARY KEY ( `id` ),

使用flink-sql解析debezium采集的mysql timestamp字段报错

2020-11-30 文章 王羽凡
flink-sql-client执行建表: CREATE TABLE source_xxx ( id INT, ctime TIMESTAMP ) WITH ( 'connector' = 'kafka', 'topic' = 'xxx', 'properties.bootstrap.servers' = 'localhost:9092', 'format' = 'debezium-json', 'scan.startup.mode' = 'earliest-offset', 'debezium-json.schema-include' = 'false

Re: flink sql es写入时,用户名密码认证不支持

2020-11-30 文章 zhisheng
不需要,设置用户名和密码就行 Best zhisheng HunterXHunter <1356469...@qq.com> 于2020年12月1日周二 上午9:46写道: > 你说的是es的 xpack 认证吗,需要你载入certificate文件是吗 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >

Re: flink sql es写入时,用户名密码认证不支持

2020-11-30 文章 HunterXHunter
你说的是es的 xpack 认证吗,需要你载入certificate文件是吗 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink sql es写入时,用户名密码认证不支持

2020-11-30 文章 zhisheng
1.12 支持了,参考 https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/elasticsearch.html#username Kyle Zhang 于2020年12月1日周二 上午9:35写道: > Hi,你说的是这个问题么 > > https://issues.apache.org/jira/browse/FLINK-16788 > > On Mon, Nov 30, 2020 at 7:23 PM cljb...@163.com wrote: > > >

flink sql client 报错java.net.NoRouteToHostException: 没有到主机的路由

2020-11-30 文章 奚焘
本人刚学习flink ,下载解压了flink,启动./sql-client.sh embedded ,输入SELECT 'Hello World';报错 Flink SQL> SELECT 'Hello World'; [ERROR] Could not execute SQL statement. Reason: java.net.NoRouteToHostException: 没有到主机的路由 -- Sent from: http://apache-flink.147419.n8.nabble.com/

Re: flink sql es写入时,用户名密码认证不支持

2020-11-30 文章 Kyle Zhang
Hi,你说的是这个问题么 https://issues.apache.org/jira/browse/FLINK-16788 On Mon, Nov 30, 2020 at 7:23 PM cljb...@163.com wrote: > 看了一下官网文档,目前还不支持sql 写入es时进行用户名密码认证,有什么比较简单的解决方法吗? > 除了用api之外。 > > 感谢! > > > > cljb...@163.com >

flink sql es写入时,用户名密码认证不支持

2020-11-30 文章 cljb...@163.com
看了一下官网文档,目前还不支持sql 写入es时进行用户名密码认证,有什么比较简单的解决方法吗? 除了用api之外。 感谢! cljb...@163.com

flink sql cdc做计算后, elastic search connector 多并发sink 会有问题,偶现!数据量较大,目前还没较好复现。请问有人是否也遇到过类似的问题。

2020-11-30 文章 jindy_liu
flink 版本: 1.11.2 * Caused by: [rt_caliber_1000/Rn_SXw45Qk2FY8ujAMpmmQ][[rt_caliber_1000][1]] ElasticsearchException[Elasticsearch exception [type=version_conflict_engine_exception, reason=[64_40108_0_1]: version conflict, required seqNo [95958], primary term [1]. current document has seqNo

Re: Flink SQL 是否存在类似MySQL的group_concat函数

2020-11-29 文章 zhengzhongni
:41, zhengzhongni wrote: > 各位社区大佬, > 您好! > > > 不知Flink SQL中是否存在类似MySQL的group_concat函数的功能: > 例如: > 数据: > +--+---+ > | Id | Name | > +--+---+ > | 10 | Larry | > | 11 | Mike | > | 12 | John | > | 10 | Elon | > | 10 | Bob | >

Re: Flink SQL 是否存在类似MySQL的group_concat函数

2020-11-29 文章 Jark Wu
I think you are looking for LISTAGG [1] which is more SQL standard compliant. Best, Jark [1]: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/functions/systemFunctions.html On Mon, 30 Nov 2020 at 11:41, zhengzhongni wrote: > 各位社区大佬, > 您好! > > > 不知Flink S

Flink SQL 是否存在类似MySQL的group_concat函数

2020-11-29 文章 zhengzhongni
各位社区大佬, 您好! 不知Flink SQL中是否存在类似MySQL的group_concat函数的功能: 例如: 数据: +--+---+ | Id | Name | +--+---+ | 10 | Larry | | 11 | Mike | | 12 | John | | 10 | Elon | | 10 | Bob | | 11 | Sam | +--+———+ 执行SQL :select Id,group_concat(Name SEPARATOR ',') as resultName

回复:FLINK SQL 消费kafka消息乱序问题

2020-11-29 文章 smq
可能是有退款呢,我也做过类似的统计 ---原始邮件--- 发件人: "user-zh-return-9443-374060171=qq.com"

Re: FLINK SQL 消费kafka消息乱序问题

2020-11-29 文章 359502...@qq.com
确定是消息乱序吗?是不是计算的数据类型有问题? 发自我的iPhone > 在 2020年11月27日,下午5:44,bulterman <15618338...@163.com> 写道: > > Hi All, > kafka消息里有一个随时间递增的“成交额”字段,写了一个UDAF统计累加当前成交额与上一条数据的成交额的差值,发现差值有出现负数的情况 > 用工具看topic里的消息是有序的,分区数为1。flink版本1.11.2

Re: Re: flink sql cdc 写数据到mysql,找不到相关的类

2020-11-27 文章 cljb...@163.com
感谢回复! 刚才找到问题了,从maven官网拷贝过来的 pom依赖, scope被设置成 test了。。。改成compile就好了 cljb...@163.com 发件人: Jark Wu 发送时间: 2020-11-27 19:14 收件人: user-zh 主题: Re: flink sql cdc 写数据到mysql,找不到相关的类 估计是你的 flink-json 和框架已经打包进去的 flink-json 冲突了,可能是你加进去的 flink-json 版本不是 1.11.x ? On Fri, 27 Nov 2020 at 19:03, cljb...@163

Re: flink sql cdc 写数据到mysql,找不到相关的类

2020-11-27 文章 Jark Wu
估计是你的 flink-json 和框架已经打包进去的 flink-json 冲突了,可能是你加进去的 flink-json 版本不是 1.11.x ? On Fri, 27 Nov 2020 at 19:03, cljb...@163.com wrote: > 相关的依赖以及添加,不知道如下问题是如何导致,求解! > 已添加的依赖有: > flink-connector-mysql-cdc > flink-format-changelog-json > flink-json > > 报错信息如下: > > java.util.ServiceConfigurationError: >

flink sql cdc 写数据到mysql,找不到相关的类

2020-11-27 文章 cljb...@163.com
相关的依赖以及添加,不知道如下问题是如何导致,求解! 已添加的依赖有: flink-connector-mysql-cdc flink-format-changelog-json flink-json 报错信息如下: java.util.ServiceConfigurationError: org.apache.flink.table.factories.Factory: Provider com.alibaba.ververica.cdc.formats.json.ChangelogJsonFormatFactory could not be instantiated at

FLINK SQL 消费kafka消息乱序问题

2020-11-27 文章 bulterman
Hi All, kafka消息里有一个随时间递增的“成交额”字段,写了一个UDAF统计累加当前成交额与上一条数据的成交额的差值,发现差值有出现负数的情况 用工具看topic里的消息是有序的,分区数为1。flink版本1.11.2

Re: flink sql cdc 如果只处理一次全量数据问题

2020-11-26 文章 俞剑波
你说的有变化是后续的数据库进行增删改操作吗,如果是的话你从checkpoint启动就好了啊 cljb...@163.com 于2020年11月27日周五 上午11:10写道: > 之前一直使用streaming api,这两天开始使用sql。 > 有个疑问,flink sql cdc读取mysql的数据时候,会处理 全量 + 增量数据。 > 那么如果同一个任务上线后,后续有变化,修改后再次上线,这个时候我并不希望处理之前过的数据。这个时候是怎么做呢? > > > cdc里面有进行state保存消费过的changelog的位置吗?这样我重

flink sql cdc 如果只处理一次全量数据问题

2020-11-26 文章 cljb...@163.com
之前一直使用streaming api,这两天开始使用sql。 有个疑问,flink sql cdc读取mysql的数据时候,会处理 全量 + 增量数据。 那么如果同一个任务上线后,后续有变化,修改后再次上线,这个时候我并不希望处理之前过的数据。这个时候是怎么做呢? cdc里面有进行state保存消费过的changelog的位置吗?这样我重新上线的时候从savepoint或者checkpoint进行恢复,是不是就可以了? 感谢! cljb...@163.com

flink sql??????????????????????

2020-11-26 文章 ??????????
?? flink sql??percentile

使用per-job部署成功的flink sql应用但是用applicationMode部署失败,提交到yarn上不到2秒就死掉,并且读取不到日志

2020-11-25 文章 shimin huang
``` 14:56:44.536 [main] ERROR org.apache.flink.client.cli.CliFrontend - Error while running the command. org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn Application Cluster at

?????? flink sql ????mysql ??????????

2020-11-25 文章 ??????
Hi DEBUG??mysql ---- ??: "user-zh"

Re: flink sql 连接mysql 无数据输出

2020-11-25 文章 Kyle Zhang
上面写错了,table要转成stream再打印 On Thu, Nov 26, 2020 at 11:46 AM Kyle Zhang wrote: > 调用executeSql,应该输出到另一张表里,比如printTable才能打印。 > 要不就是bsTableEnv.sqlQuery("select * from meson_budget_data").print()打印出来; > > On Thu, Nov 26, 2020 at 9:54 AM Leonard Xu wrote: > >> Hi >> >>

Re: flink sql 连接mysql 无数据输出

2020-11-25 文章 Kyle Zhang
调用executeSql,应该输出到另一张表里,比如printTable才能打印。 要不就是bsTableEnv.sqlQuery("select * from meson_budget_data").print()打印出来; On Thu, Nov 26, 2020 at 9:54 AM Leonard Xu wrote: > Hi > > 调用了executeSql,作业就已经执行了,不用再调用下面的这个执行方法,看你已经配置了cp,确认下mysql的参数是否正确。 > > > 在 2020年11月25日,18:42,冯草纸 写道: > > > >

Re: flink sql 连接mysql 无数据输出

2020-11-25 文章 Leonard Xu
Hi 调用了executeSql,作业就已经执行了,不用再调用下面的这个执行方法,看你已经配置了cp,确认下mysql的参数是否正确。 > 在 2020年11月25日,18:42,冯草纸 写道: > > env.execute("sql test"); > // bsTableEnv.execute("sql test");

?????? flink sql ????mysql ??????????

2020-11-25 文章 ??????
Hi, ??execute?? StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); EnvironmentSettings settings = EnvironmentSettings.newInstance().useAnyPlanner().inStreamingMode().build(); StreamTableEnvironment bsTableEnv =

??????flink sql ????mysql ??????????

2020-11-25 文章 ??????
mysql ---- ??: "??"

Re: flink sql 连接mysql 无数据输出

2020-11-25 文章 caozhen
是不是没有加这一行代码,tableEnv.execute("test"); AlfredFeng wrote > Hi All, > 我在Idea里用flink-jdbc-connector连接mysql, > 建完表后执行env.executeSql("select * from my_table").print()方法,只打印了表头,没有数据是什么原因? > flink版本1.11.2 -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink sql ????mysql ??????????

2020-11-25 文章 ??????
Hi All, Ideaflink-jdbc-connectormysql, env.executeSql("select * from my_table").print() flink1.11.2

Re: flink sql时间戳字段类型转换问题

2020-11-24 文章 Jark Wu
你可以用这篇文章中的 docker: https://flink.apache.org/2020/07/28/flink-sql-demo-building-e2e-streaming-application.html https://raw.githubusercontent.com/wuchong/flink-sql-demo/v1.11-EN/docker-compose.yml 这个容器里面的 ts 数据格式是 SQL 格式的。 1. 像上述时间格式字段在Flink SQL中应该解析成什么类型? TIMESTAMP WITH LOCAL TIME ZONE, 1.12 的

flink sql时间戳字段类型转换问题

2020-11-24 文章 陈帅
数据源来自Jark项目 https://github.com/wuchong/flink-sql-submit 中的kafka消息,里面user_behavior消息例如 {"user_id": "470572", "item_id":"3760258", "category_id": "1299190", "behavior": "pv", "ts": "2017-11-26T01:00:01Z&

Re: Flink SQL Row里嵌套Array该如何用DDL定义?

2020-11-24 文章 zilong xiao
好的,感谢Benchao的解答~ Benchao Li 于2020年11月24日周二 下午7:49写道: > 从这一行代码看出来的: > > https://github.com/yangyichao-mango/flink-protobuf/blob/616051d74d0973136f931189fd29bd78c0e5/src/main/java/flink/formats/protobuf/ProtobufRowDeserializationSchema.java#L107 > > 现在社区还没有正式支持ProtoBuf

Re: Flink SQL Row里嵌套Array该如何用DDL定义?

2020-11-24 文章 Benchao Li
从这一行代码看出来的: https://github.com/yangyichao-mango/flink-protobuf/blob/616051d74d0973136f931189fd29bd78c0e5/src/main/java/flink/formats/protobuf/ProtobufRowDeserializationSchema.java#L107 现在社区还没有正式支持ProtoBuf Format,不过已经有相关issue和讨论了[1] [1] https://issues.apache.org/jira/browse/FLINK-18202

Re: Flink SQL Row里嵌套Array该如何用DDL定义?

2020-11-24 文章 zilong xiao
这是从哪看出来的呢 求指点,另外如果想用DDL写的schema 应该怎么做呢? Benchao Li 于2020年11月24日周二 下午4:33写道: > 看起来这个format是用的自动推导schema,而不是用的DDL写的schema。 > > zilong xiao 于2020年11月24日周二 下午4:13写道: > > > 用的Flink1.11 不过是用的别人写的format,估计是这里面有bug吧, > > https://github.com/yangyichao-mango/flink-protobuf > > > > Benchao Li

Re: Flink SQL Row里嵌套Array该如何用DDL定义?

2020-11-24 文章 Benchao Li
看起来这个format是用的自动推导schema,而不是用的DDL写的schema。 zilong xiao 于2020年11月24日周二 下午4:13写道: > 用的Flink1.11 不过是用的别人写的format,估计是这里面有bug吧, > https://github.com/yangyichao-mango/flink-protobuf > > Benchao Li 于2020年11月24日周二 下午3:43写道: > > > 看起来你的DDL写的没有什么问题。 > > > > 你用的是哪个Flink版本呢? > > 此外就是可以发下更完整的异常栈么? > > > >

Re: Flink SQL Row里嵌套Array该如何用DDL定义?

2020-11-24 文章 zilong xiao
用的Flink1.11 不过是用的别人写的format,估计是这里面有bug吧, https://github.com/yangyichao-mango/flink-protobuf Benchao Li 于2020年11月24日周二 下午3:43写道: > 看起来你的DDL写的没有什么问题。 > > 你用的是哪个Flink版本呢? > 此外就是可以发下更完整的异常栈么? > > zilong xiao 于2020年11月24日周二 下午2:54写道: > > > Hi Benchao,图片可以看https://imgchr.com/i/DtoGge,期待您的解答~ > > > >

Re: Flink SQL Row里嵌套Array该如何用DDL定义?

2020-11-23 文章 Benchao Li
看起来你的DDL写的没有什么问题。 你用的是哪个Flink版本呢? 此外就是可以发下更完整的异常栈么? zilong xiao 于2020年11月24日周二 下午2:54写道: > Hi Benchao,图片可以看https://imgchr.com/i/DtoGge,期待您的解答~ > > Benchao Li 于2020年11月24日周二 下午2:49写道: > > > 你的图片挂了,可以将图片上传到第三方的图床再发出来;或者直接发送文本。 > > > > zilong xiao 于2020年11月24日周二 上午10:49写道: > > > > > [image:

Re: Flink SQL Row里嵌套Array该如何用DDL定义?

2020-11-23 文章 zilong xiao
Hi Benchao,图片可以看https://imgchr.com/i/DtoGge,期待您的解答~ Benchao Li 于2020年11月24日周二 下午2:49写道: > 你的图片挂了,可以将图片上传到第三方的图床再发出来;或者直接发送文本。 > > zilong xiao 于2020年11月24日周二 上午10:49写道: > > > [image: image.png] > > 如题,尝试用以下方式定义时会遇到异常,求社区大佬指点正确的打开姿势。 > > > > > -- > > Best, > Benchao Li >

Re: Flink SQL Row里嵌套Array该如何用DDL定义?

2020-11-23 文章 Benchao Li
你的图片挂了,可以将图片上传到第三方的图床再发出来;或者直接发送文本。 zilong xiao 于2020年11月24日周二 上午10:49写道: > [image: image.png] > 如题,尝试用以下方式定义时会遇到异常,求社区大佬指点正确的打开姿势。 > -- Best, Benchao Li

Re: Flink SQL的灵异事件----查询语句中增加表中的某个字段时就没法正常查询了。

2020-11-23 文章 jy l
我使用的是release-1.12.0-rc1 Best Jark Wu 于2020年11月24日周二 上午11:42写道: > 看报错像是一个 bug。 请问使用的是哪个版本呢? > 可以去 JIRA issue 提个 issue。 > > Best, > Jark > > On Tue, 24 Nov 2020 at 11:27, jy l wrote: > > > Hi: > > FlinkSQL我在使用时发生一件很诡异的事件。具体如下: > > > > 我的DDL: > > create table if not exists t_order( > > id int

Re: Flink SQL的灵异事件----查询语句中增加表中的某个字段时就没法正常查询了。

2020-11-23 文章 Jark Wu
看报错像是一个 bug。 请问使用的是哪个版本呢? 可以去 JIRA issue 提个 issue。 Best, Jark On Tue, 24 Nov 2020 at 11:27, jy l wrote: > Hi: > FlinkSQL我在使用时发生一件很诡异的事件。具体如下: > > 我的DDL: > create table if not exists t_order( > id int PRIMARY KEY comment '订单id', > timestamps bigint comment '订单创建时间', > orderInformationId string

Flink SQL的灵异事件----查询语句中增加表中的某个字段时就没法正常查询了。

2020-11-23 文章 jy l
Hi: FlinkSQL我在使用时发生一件很诡异的事件。具体如下: 我的DDL: create table if not exists t_order( id int PRIMARY KEY comment '订单id', timestamps bigint comment '订单创建时间', orderInformationId string comment '订单信息ID', userId string comment '用户ID', categoryId int comment '商品类别', productId int comment '商品ID', price

Flink SQL Row里嵌套Array该如何用DDL定义?

2020-11-23 文章 zilong xiao
[image: image.png] 如题,尝试用以下方式定义时会遇到异常,求社区大佬指点正确的打开姿势。

Flink SQL 对延迟数据怎么处理?

2020-11-23 文章 jy l
Hi: 请教一下,FlinkSQL中,我在创建表时设置了watermark并设置了最大延迟,可是还是有数据依旧会迟到晚到,对于这样的数据我们又不想直接丢弃,那这个依旧迟到的数据我该怎么收集?是否有与StreamAPI一样可以将依旧迟到的数据进行分流的方案? 祝好!

flink sql 中是否可以使用 mysql 的存储过程和函数?

2020-11-23 文章 macdoor
需求是这样,mysql中使用 binary(16) 存储 uuid,读取到 flink中需要转换成文本串的uuid,sql是这样 select bin_to_uuid(id, true) as text_uuid from usertable 我尝试使用,报错说 bin_to_uuid 找不到 -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink sql 通过group by 滑窗计算的结果sink到kafka后有重复数据

2020-11-23 文章 dpzhoufengdev
flink sql 通过group by 滑窗计算的结果sink到kafka后有重复数据,每条数据都有两条完全一样的数据。这个是什么原因导致的? 聚合计算的逻辑 Table tableoneHour = tableEnv.sqlQuery( "select appname" + ",productCode" + ",link" +

flink sql 通过group by 滑窗计算的结果sink到kafka后有重复数据

2020-11-23 文章 dpzhoufengdev
flink sql 通过group by 滑窗计算的结果sink到kafka后有重复数据,每条数据都有两条完全一样的数据。这个是什么原因导致的? 聚合计算的逻辑 Table tableoneHour = tableEnv.sqlQuery( "select appname" + ",productCode" + ",link" +

Re:Re: Re: Re: flink sql cdc sum 结果出现NULL

2020-11-19 文章 kandy.wang
1.没有初始的全量数据可能是会有问题的 这个怎么理解,默认情况,就是从kafka group-sets 消费的,怎么才能保证全量数据? 我们这个binlog同步都是增量同步。不会做一次初始化的全量同步。 2.先发的before 后发的after 3. 数据在kafka里,是按照mysql的id主键hash的。是有序的,group key 的所有数据不能保证 都在同 一个 partition 中。由于是按照主键id hash的 在 2020-11-20 13:25:53,"Jark Wu" 写道: >1. 没有初始的全量数据可能是会有问题的。 > >3. 你的

Re:Re: Re: Re: flink sql cdc sum 结果出现NULL

2020-11-19 文章 kandy.wang
1.没有初始的全量数据可能是会有问题的 这个怎么理解,默认情况,就是从kafka group-sets 消费的,怎么才能保证全量数据? 我们这个binlog同步都是增量同步。不会做一次初始化的全量同步。 2.先发的before 后发的after 3. 数据在kafka里,是按照mysql的id主键hash的。是有序的,group key 的所有数据不能保证 都在同 一个 partition 中。由于是按照主键id hash的。 在 2020-11-20 13:25:53,"Jark Wu" 写道: >1. 没有初始的全量数据可能是会有问题的。 > >3. 你的

Re: Re: Re: flink sql cdc sum 结果出现NULL

2020-11-19 文章 Jark Wu
1. 没有初始的全量数据可能是会有问题的。 3. 你的 format 再解析 update 时,时先发的 before 还是 after? 4. 你的数据在 kafka 中时有序的么?也就是同一 key 的所有数据都在一个 partition 中不? On Fri, 20 Nov 2020 at 12:46, kandy.wang wrote: > > > > > > > 1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。 > > 2. 没有开启 > > > > > 在 2020-11-20 11:49:44,"Jark Wu" 写道: >

Re:Re: Re: flink sql cdc sum 结果出现NULL

2020-11-19 文章 kandy.wang
1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。 2. 没有开启 在 2020-11-20 11:49:44,"Jark Wu" 写道: >实现上应该没什么问题。 > >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? >2. 是否开启 mini-batch了? > >Best, >Jark > >On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote: > >> hi Jark: >> >> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id

Re:Re: Re: flink sql cdc sum 结果出现NULL

2020-11-19 文章 kandy.wang
1.是的。 这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。 2. 没有开启 在 2020-11-20 11:49:44,"Jark Wu" 写道: >实现上应该没什么问题。 > >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? >2. 是否开启 mini-batch了? > >Best, >Jark > >On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote: > >> hi Jark: >> >> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num

Re: Re: flink sql cdc sum 结果出现NULL

2020-11-19 文章 Jark Wu
实现上应该没什么问题。 1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? 2. 是否开启 mini-batch了? Best, Jark On Fri, 20 Nov 2020 at 11:44, kandy.wang wrote: > hi Jark: > > > 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price > 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 > > 自定义的format逻辑和canal的类似,insert update delete

Re:Re: flink sql cdc sum 结果出现NULL

2020-11-19 文章 kandy.wang
hi Jark: 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before update_after,format逻辑是应该这么写的吧。 在 2020-11-19 23:13:19,"Jark Wu" 写道: >你可以先直接 select * from hive.database.table;

Re:Re: flink sql cdc sum 结果出现NULL

2020-11-19 文章 kandy.wang
hi Jark: 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before update_after,format逻辑是应该这么写的吧。 在 2020-11-19 23:13:19,"Jark Wu" 写道: >你可以先直接 select * from hive.database.table;

Re: flink sql cdc sum 结果出现NULL

2020-11-19 文章 Jark Wu
你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null 值的,以验证你的自定义 format 没有问题。 Best, Jark On Thu, 19 Nov 2020 at 22:41, kandy.wang wrote: > --mysql表 > CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( >`id` INT UNSIGNED AUTO_INCREMENT, >`spu_id`

flink sql cdc sum 结果出现NULL

2020-11-19 文章 kandy.wang
--mysql表 CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( `id` INT UNSIGNED AUTO_INCREMENT, `spu_id` BIGINT NOT NULL, `leaving_price` DECIMAL(10, 5) PRIMARY KEY ( `id` ), unique key idx_spu_id (spu_id) )ENGINE=InnoDB DEFAULT CHARSET=utf8 --flink表 CREATE

Flink sql-client/sql-gateway 支持multiple insert

2020-11-16 文章 朱广彬
Hi,Community: 目前sql-client和sql-gateway只能支持单条SQL statement,这样就没法利用multiple insert的优化。如下: INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%' INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product LIKE '%Glass%'

Re: Flink sql 无法用!=

2020-11-15 文章 Danny Chan
是的 <> 是 SQL 标准推荐的用法。 jindy_liu <286729...@qq.com> 于2020年11月16日周一 下午2:24写道: > 用<> > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >

Re: Flink sql 无法用!=

2020-11-15 文章 jindy_liu
用<> -- Sent from: http://apache-flink.147419.n8.nabble.com/

Flink sql 无法用!=

2020-11-15 文章 丁浩浩
我想在where条件下用不等号报错,难道flink sql不等号不是!=这个吗? [ERROR] Could not execute SQL statement. Reason: org.apache.calcite.runtime.CalciteException: Bang equal '!=' is not allowed under the current SQL conformance level

??????flink sql hive streaming??????????????????bug

2020-11-15 文章 Excalibur
??hive table read blinkStreamTableEnv.getConfig().getConfiguration().setBoolean("table.dynamic-table-options.enabled", true); Table table=blinkStreamTableEnv.sqlQuery("SELECT * FROM test.table_config /*+ OPTIONS('streaming-source.enable'='true','streaming-source.monitor-interval'

<    5   6   7   8   9   10   11   12   13   14   >