Flink MySQL CDC 注册 schema registry 问题

2022-04-22 文章 casel.chen
Hi, 我想使用 Flink MySQL CDC Connector 以 DataStream 方式消费 MySQL Binlog 输出变更数据到下游kafka topic (1),同时监听database schema change事件,将最新的schema数据输出到下游另一个kafka topic (2),又或者直接注册schema到 confluent / apicurio schema registry,查了一下flink cdc官方文档[1],并没有这方面的信息。请问应该怎么实现呢?有相关文档或例子么?谢谢! [1] https

flink mysql cdc注册confluent schema registry

2022-01-29 文章 casel.chen
我想利用flink mysql cdc输出变更数据到kafka,同时将table schema注册到confluent schema registry,以模拟debezium kafka connect效果[1]。还请指教要如何下手呢?谢谢! [1] https://blog.csdn.net/OldDirverHelpMe/article/details/107881170

Flink mysql CDC 进程正常,但发现数据丢失了

2022-01-17 文章 Fei Han
@all: 大家好,Flink Mysql CDC实时同步数据,发现mysql和下游StarRocks的数据量对不上。 StarRocks用的是primary key模型, 版本: Flink1.13.3 Flink CDC 2.1.1 报错如下: Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records

Flink mysql cdc凌晨同步报错

2022-01-12 文章 Fei Han
@all: Flink mysql cdc凌晨同步报错,流任务都失败了。报错如下: org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3, backoffTimeMS=3) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure

Re: flink mysql cdc同步字段不识别

2022-01-06 文章 Jark Wu
这个报错日志应该没有关系,是 rest client 的报错,不是正常数据处理流程的报错。 mysql-cdc 没有 jackson json 解析相关的代码。 On Wed, 5 Jan 2022 at 17:09, Fei Han wrote: > > @all: > Flink mysql cdc同步数据报字段不识别,是什么原因造成的?难道是关键字不识别?报错日志如下: > > httpResponseStatus=200 OK} > org.apache.fli

Re: flink mysql cdc 同步数据报错

2022-01-05 文章 Caizhi Weng
Hi! 根本原因是 Caused by: java.io.StreamCorruptedException: unexpected block data,也就是说集群上这个 class 的版本和客户端这个 class 的版本不一致。建议检查集群和客户端的 flink 版本以及 cdc connector 版本是否一致。 Fei Han 于2022年1月5日周三 19:12写道: > @all: > 大家好,在Flink mysql cdc中同步数据时,对接mysql无法同步 > 版本: > flink1.13.3 > flink mysql cdc

flink mysql cdc同步字段不识别

2022-01-05 文章 Fei Han
@all: Flink mysql cdc同步数据报字段不识别,是什么原因造成的?难道是关键字不识别?报错日志如下: httpResponseStatus=200 OK} org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "status" (class org.apache.flink.runtime.rest.messages.ErrorResponseBody),

?????? flink??mysql????

2021-10-26 文章 zya
er-zh" https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/connectors/table/jdbc/#sink-buffer-flush-max-rows a <806040...@qq.com.invalid ??2021??10??26?? 9:49?? ????????flink??mysqlsinkJdbcDynamicTableSink???

flink??mysql????

2021-10-25 文章 a
flink??mysqlsinkJdbcDynamicTableSink??checkpoint??1.??flush??mysql?? 2.??mysql??qps??sum??1000??/s

mysql主从切换导致通过flink mysql-cdc消费binlog 点位出错

2021-06-03 文章 董建
由于各种原因,dba进行了数据库主从切换。 目前我采用flink mysql-cdc采集binlog,但是数据库主从切换后,导致binlog的pos不一致。 flink 程序会自动重启,在经过配置的重启策略后就会挂掉,日志打印 org.apache.kafka.connect.errors.ConnectException: The connector is trying to read binlog starting at GTIDs 3fa7d5bb-65f3-11eb-9413-b0262879b560:1-730774004 and binlog file 'mysql-bin

Re: flink mysql cdc支持mysql的json格式吗?

2021-05-23 文章 hk__lrzy
https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-basic-types 支持 -- Sent from: http://apache-flink.147419.n8.nabble.com/

flink mysql cdc支持mysql的json格式吗?

2021-05-18 文章 董建
flink mysql cdc支持mysql的json格式吗?

flink mysql cdc????

2021-04-22 文章 ????
??flink mysql cdc 1.flink mysql cdc??mysql??binlog??mysql

Re: Re: flink mysql cdc + hive streaming疑问

2020-11-02 文章 陈帅
; 你会发现文件生成了但是没有 _SUCCESS文件; > 但是这样指定也不行?? > tEnv.getConfig().setLocalTimeZone(ZoneOffset.ofHours(8)); > > 它的增删改只是在hive中对数据做了标记;后面可以通过join来处理数据 > > > > hdxg1101300...@163.com > > 发件人: chengyanan1...@foxmail.com > 发送时间: 2020-11-02 13:37 > 收件人: user-zh > 主题: Re: Re

Re: Re: flink mysql cdc + hive streaming疑问

2020-11-02 文章 hdxg1101300...@163.com
mysql cdc + hive streaming疑问 你好! 看到你代码里,将增删改信息当做一个字段存到了Hive表中,那么到最后这些操作是怎么合并的呢? 发件人: Rui Li 发送时间: 2020-11-02 10:38 收件人: user-zh 抄送: Jark Wu 主题: Re: flink mysql cdc + hive streaming疑问 Hi, 正常情况是可以自动提交分区的,我看你commit policy指定了metastore+success-file,可以检查一下分区目录下success file是否创建了。如果success file

Re: Re: flink mysql cdc + hive streaming疑问

2020-11-01 文章 chengyanan1...@foxmail.com
你好! 看到你代码里,将增删改信息当做一个字段存到了Hive表中,那么到最后这些操作是怎么合并的呢? 发件人: Rui Li 发送时间: 2020-11-02 10:38 收件人: user-zh 抄送: Jark Wu 主题: Re: flink mysql cdc + hive streaming疑问 Hi, 正常情况是可以自动提交分区的,我看你commit policy指定了metastore+success-file,可以检查一下分区目录下success file是否创建了。如果success file也没有的话说明没有触发分区提交。另外提交分区时会打印类似

Re: flink mysql cdc + hive streaming疑问

2020-11-01 文章 Jingsong Li
- 你可以用 proc-time - 或者在你的Source上添加 **UTC时区的Watermark**,注意是 **UTC**,SQL的watermark都是 **UTC**的 On Mon, Nov 2, 2020 at 10:38 AM Rui Li wrote: > Hi, > > 正常情况是可以自动提交分区的,我看你commit policy指定了metastore+success-file,可以检查一下分区目录下success > file是否创建了。如果success file也没有的话说明没有触发分区提交。另外提交分区时会打印类似这样的日志,可以在log中查找一下

回复: flink mysql cdc + hive streaming疑问

2020-11-01 文章 Zhang Yuxiao
, 发件人: 陈帅 发送时间: 2020年11月1日 下午 05:36 收件人: Jark Wu 抄送: user-zh 主题: Re: flink mysql cdc + hive streaming疑问 最后,在hive shell中执行 “msck repair table team;” 命令后就能查询到写的数据了,难道flink hive streaming不能自动注册hive分区吗?还是我使用的姿势不对? 陈帅 于2020年11月1日周日 下午5:24写道: > 改用 TEXTFILE 存储hive表数据以便下载hive文件观

Re: flink mysql cdc + hive streaming疑问

2020-11-01 文章 陈帅
我查过hive文件是有生成的,按照我定义的partition。按照你的建议在ds2这个stream上加了watermark,运行后hive文件也生成了,但同样通过hive shell查不到数据。 import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import org.apache.flink.api.common.serialization.SimpleStringSchema; import

Re: flink mysql cdc + hive streaming疑问

2020-11-01 文章 陈帅
最后,在hive shell中执行 “msck repair table team;” 命令后就能查询到写的数据了,难道flink hive streaming不能自动注册hive分区吗?还是我使用的姿势不对? 陈帅 于2020年11月1日周日 下午5:24写道: > 改用 TEXTFILE 存储hive表数据以便下载hive文件观察内容 > ") STORED AS TEXTFILE TBLPROPERTIES (" > > 这是生成的hive表建表语句 > > hive> show create table team; > OK > CREATE TABLE `team`( >

Re: flink mysql cdc + hive streaming疑问

2020-11-01 文章 陈帅
改用 TEXTFILE 存储hive表数据以便下载hive文件观察内容 ") STORED AS TEXTFILE TBLPROPERTIES (" 这是生成的hive表建表语句 hive> show create table team; OK CREATE TABLE `team`( `team_id` int, `team_name` string, `create_time` string, `update_time` string, `op` string) PARTITIONED BY ( `dt` string, `hr` string,

Re: flink mysql cdc + hive streaming疑问

2020-11-01 文章 陈帅
之前没加watermark和设置分区是能够写hive文件并查询出来的,只是设置分区后hive文件是生成出来了但查询不出来,所以我感觉跟watermark设置与否没太大关系。 生成的hive分区文件路径类似于 /user/hive/warehouse/ods.db/team/dt=20201101/hr=16/mi=30/ part-dc55d200-dd03-4f26-8a3e-60bfa1dd97f2-0-3 陈帅 于2020年11月1日周日 下午4:43写道: >

Re: flink mysql cdc + hive streaming疑问

2020-10-31 文章 Jark Wu
你检查一下 hive 文件是否正常生成了? 我看你上面的代码,kafka->hive 流程中是没有 watermark 的,而"partition-time" 的 trigger policy 是基于 watermark 驱动的,所以可能是这个原因导致 hive 中没有数据。 Best, Jark [1]: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html#sink-partition-commit-trigger On Sat, 31 Oct

Re: flink mysql cdc + hive streaming疑问

2020-10-31 文章 陈帅
谢谢Jark细致解答,我按照你给的思路试了下。遇到一个问题是,在不开hive分区的情况下写入和读取是没有问题的,但在开启hive表时间分区后,写入是成功了,然而通过hive shell查不到数据,表结构是正确的。(代码我注释掉了) 能帮忙看下是哪里写得不对吗? cdc -> kafka示例消息如下 {"data":{"team_id":1001,"team_name":"Sun","create_time":"2020-10-31 11:25:38","update_time":"2020-10-31 11:25:38"},"op":"+I"} import

Re: flink mysql cdc + hive streaming疑问

2020-10-30 文章 Jark Wu
1. 是的。目前 Hive不支持直接消费 changlog ,这个主要原因是 hive 对 cdc 的支持不是很好。即使是 hive ACID/transaction 功能,由于其与其他计算引擎集成的不好,也鲜有人用。 2. cdc -> kafka -> hive streaming 的方案是可行的,不过 kafka -> hive streaming 相当于原始数据同步,到 hive 中仍然是 cdc logs 内容,并没有实时合并,需要用户自己写 query 在 hive 中进行合并。merge过程可以参考这篇文章[1]。 3. 你可以 ts + INTERVAL '8'

回复:flink mysql cdc + hive streaming疑问

2020-10-30 文章 罗显宴
hive3可以hive2不可以,换了kafka也没用吧,hive3之前一般都不支持数据仓库的更改。不知道回答的对不对,欢迎指正。 | | 罗显宴 | | 邮箱:15927482...@163.com | 签名由 网易邮箱大师 定制 在2020年10月31日 12:06,陈帅 写道: 我想使用flink sql的mysql-cdc connector直接将mysql表数据实时同步进hive,运行后抛 Exception in thread "main" org.apache.flink.table.api.TableException: AppendStreamTableSink

flink mysql cdc + hive streaming疑问

2020-10-30 文章 陈帅
我想使用flink sql的mysql-cdc connector直接将mysql表数据实时同步进hive,运行后抛 Exception in thread "main" org.apache.flink.table.api.TableException: AppendStreamTableSink doesn't support consuming update and delete changes which is produced by node TableSourceScan(table=[[hive_catalog, cdc, team]], fields=[team_id,

Re: 通过sql client操作flink mysql-cdc异常

2020-09-14 文章 Jark Wu
1. 请检查所有的 jar 包是否有破损 2. 只需保留 flink-sql-connector-mysql-cdc-1.1.0.jar, flink-format-changelog-json-1.1.0.jar 这两个 jar 包, flink-connector-debezium-1.1.0.jar 和 flink-connector-mysql-cdc-1.1.0.jar 不需要。 Best, Jark On Sun, 13 Sep 2020 at 11:10, 陈帅 wrote: > flink版本是1.11.1,我将 >

通过sql client操作flink mysql-cdc异常

2020-09-12 文章 陈帅
flink版本是1.11.1,我将 flink-connector-debezium-1.1.0.jar, flink-connector-mysql-cdc-1.1.0.jar, flink-sql-connector-kafka_2.12-1.11.1.jar, flink-sql-connector-mysql-cdc-1.1.0.jar, flink-format-changelog-json-1.1.0.jar 下载并加入到 $FLINK_HOME/lib目录,并以embedded模式启动flink sql client,同时在mysql中插入一张表,然后在flink sql

Re: Flink Mysql sink按时间分库分表

2020-08-05 文章 Leonard Xu
> 大家好: > > > 想问下目前 flink mysql sink想要实现按时间分库分表 有什么配置项可以使用嘛? > > > 我目前的需求是数据源经过一个简单的etl处理写入到mysql中,供业务实时查询。由于业务场景只需要查询当天的数据(历史数据有另外的离线模块处理),所以想每天的数据单独写入一张表中(表名为xx_MMDD)这样。但目前Flink > JDBC sink是要指定表名的。请问下有什么简单的实现方案嘛?或者说对于我这样的场景有什么更好的实现方式嘛? > > > 多谢。 > > > > > > > > > > > -- > > 张健

Flink Mysql sink按时间分库分表

2020-08-05 文章 张健
大家好: 想问下目前 flink mysql sink想要实现按时间分库分表 有什么配置项可以使用嘛? 我目前的需求是数据源经过一个简单的etl处理写入到mysql中,供业务实时查询。由于业务场景只需要查询当天的数据(历史数据有另外的离线模块处理),所以想每天的数据单独写入一张表中(表名为xx_MMDD)这样。但目前Flink JDBC sink是要指定表名的。请问下有什么简单的实现方案嘛?或者说对于我这样的场景有什么更好的实现方式嘛? 多谢。 -- 张健