Hi, 我想使用 Flink MySQL CDC Connector 以 DataStream 方式消费 MySQL Binlog
输出变更数据到下游kafka topic (1),同时监听database schema change事件,将最新的schema数据输出到下游另一个kafka
topic (2),又或者直接注册schema到 confluent / apicurio schema registry,查了一下flink
cdc官方文档[1],并没有这方面的信息。请问应该怎么实现呢?有相关文档或例子么?谢谢!
[1]
https
我想利用flink mysql cdc输出变更数据到kafka,同时将table schema注册到confluent schema
registry,以模拟debezium kafka connect效果[1]。还请指教要如何下手呢?谢谢!
[1] https://blog.csdn.net/OldDirverHelpMe/article/details/107881170
@all:
大家好,Flink Mysql CDC实时同步数据,发现mysql和下游StarRocks的数据量对不上。
StarRocks用的是primary key模型,
版本:
Flink1.13.3
Flink CDC 2.1.1
报错如下:
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received
unexpected exception while polling the records
@all:
Flink mysql cdc凌晨同步报错,流任务都失败了。报错如下:
org.apache.flink.runtime.JobException: Recovery is suppressed by
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=3,
backoffTimeMS=3)
at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure
这个报错日志应该没有关系,是 rest client 的报错,不是正常数据处理流程的报错。
mysql-cdc 没有 jackson json 解析相关的代码。
On Wed, 5 Jan 2022 at 17:09, Fei Han
wrote:
>
> @all:
> Flink mysql cdc同步数据报字段不识别,是什么原因造成的?难道是关键字不识别?报错日志如下:
>
> httpResponseStatus=200 OK}
> org.apache.fli
Hi!
根本原因是 Caused by: java.io.StreamCorruptedException: unexpected block
data,也就是说集群上这个 class 的版本和客户端这个 class 的版本不一致。建议检查集群和客户端的 flink 版本以及 cdc
connector 版本是否一致。
Fei Han 于2022年1月5日周三 19:12写道:
> @all:
> 大家好,在Flink mysql cdc中同步数据时,对接mysql无法同步
> 版本:
> flink1.13.3
> flink mysql cdc
@all:
Flink mysql cdc同步数据报字段不识别,是什么原因造成的?难道是关键字不识别?报错日志如下:
httpResponseStatus=200 OK}
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException:
Unrecognized field "status" (class
org.apache.flink.runtime.rest.messages.ErrorResponseBody),
er-zh"
https://ci.apache.org/projects/flink/flink-docs-master/zh/docs/connectors/table/jdbc/#sink-buffer-flush-max-rows
a <806040...@qq.com.invalid ??2021??10??26?? 9:49??
????????flink??mysqlsinkJdbcDynamicTableSink???
flink??mysqlsinkJdbcDynamicTableSink??checkpoint??1.??flush??mysql??
2.??mysql??qps??sum??1000??/s
由于各种原因,dba进行了数据库主从切换。
目前我采用flink mysql-cdc采集binlog,但是数据库主从切换后,导致binlog的pos不一致。
flink 程序会自动重启,在经过配置的重启策略后就会挂掉,日志打印
org.apache.kafka.connect.errors.ConnectException: The connector is trying to
read binlog starting at GTIDs 3fa7d5bb-65f3-11eb-9413-b0262879b560:1-730774004
and binlog file 'mysql-bin
https://debezium.io/documentation/reference/1.5/connectors/mysql.html#mysql-basic-types
支持
--
Sent from: http://apache-flink.147419.n8.nabble.com/
flink mysql cdc支持mysql的json格式吗?
??flink mysql cdc
1.flink mysql
cdc??mysql??binlog??mysql
; 你会发现文件生成了但是没有 _SUCCESS文件;
> 但是这样指定也不行??
> tEnv.getConfig().setLocalTimeZone(ZoneOffset.ofHours(8));
>
> 它的增删改只是在hive中对数据做了标记;后面可以通过join来处理数据
>
>
>
> hdxg1101300...@163.com
>
> 发件人: chengyanan1...@foxmail.com
> 发送时间: 2020-11-02 13:37
> 收件人: user-zh
> 主题: Re: Re
mysql cdc + hive streaming疑问
你好!
看到你代码里,将增删改信息当做一个字段存到了Hive表中,那么到最后这些操作是怎么合并的呢?
发件人: Rui Li
发送时间: 2020-11-02 10:38
收件人: user-zh
抄送: Jark Wu
主题: Re: flink mysql cdc + hive streaming疑问
Hi,
正常情况是可以自动提交分区的,我看你commit policy指定了metastore+success-file,可以检查一下分区目录下success
file是否创建了。如果success file
你好!
看到你代码里,将增删改信息当做一个字段存到了Hive表中,那么到最后这些操作是怎么合并的呢?
发件人: Rui Li
发送时间: 2020-11-02 10:38
收件人: user-zh
抄送: Jark Wu
主题: Re: flink mysql cdc + hive streaming疑问
Hi,
正常情况是可以自动提交分区的,我看你commit policy指定了metastore+success-file,可以检查一下分区目录下success
file是否创建了。如果success file也没有的话说明没有触发分区提交。另外提交分区时会打印类似
- 你可以用 proc-time
- 或者在你的Source上添加 **UTC时区的Watermark**,注意是 **UTC**,SQL的watermark都是 **UTC**的
On Mon, Nov 2, 2020 at 10:38 AM Rui Li wrote:
> Hi,
>
> 正常情况是可以自动提交分区的,我看你commit policy指定了metastore+success-file,可以检查一下分区目录下success
> file是否创建了。如果success file也没有的话说明没有触发分区提交。另外提交分区时会打印类似这样的日志,可以在log中查找一下
,
发件人: 陈帅
发送时间: 2020年11月1日 下午 05:36
收件人: Jark Wu
抄送: user-zh
主题: Re: flink mysql cdc + hive streaming疑问
最后,在hive shell中执行 “msck repair table team;” 命令后就能查询到写的数据了,难道flink hive
streaming不能自动注册hive分区吗?还是我使用的姿势不对?
陈帅 于2020年11月1日周日 下午5:24写道:
> 改用 TEXTFILE 存储hive表数据以便下载hive文件观
我查过hive文件是有生成的,按照我定义的partition。按照你的建议在ds2这个stream上加了watermark,运行后hive文件也生成了,但同样通过hive
shell查不到数据。
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import
最后,在hive shell中执行 “msck repair table team;” 命令后就能查询到写的数据了,难道flink hive
streaming不能自动注册hive分区吗?还是我使用的姿势不对?
陈帅 于2020年11月1日周日 下午5:24写道:
> 改用 TEXTFILE 存储hive表数据以便下载hive文件观察内容
> ") STORED AS TEXTFILE TBLPROPERTIES ("
>
> 这是生成的hive表建表语句
>
> hive> show create table team;
> OK
> CREATE TABLE `team`(
>
改用 TEXTFILE 存储hive表数据以便下载hive文件观察内容
") STORED AS TEXTFILE TBLPROPERTIES ("
这是生成的hive表建表语句
hive> show create table team;
OK
CREATE TABLE `team`(
`team_id` int,
`team_name` string,
`create_time` string,
`update_time` string,
`op` string)
PARTITIONED BY (
`dt` string,
`hr` string,
之前没加watermark和设置分区是能够写hive文件并查询出来的,只是设置分区后hive文件是生成出来了但查询不出来,所以我感觉跟watermark设置与否没太大关系。
生成的hive分区文件路径类似于 /user/hive/warehouse/ods.db/team/dt=20201101/hr=16/mi=30/
part-dc55d200-dd03-4f26-8a3e-60bfa1dd97f2-0-3
陈帅 于2020年11月1日周日 下午4:43写道:
>
你检查一下 hive 文件是否正常生成了?
我看你上面的代码,kafka->hive 流程中是没有 watermark 的,而"partition-time" 的 trigger policy
是基于 watermark 驱动的,所以可能是这个原因导致 hive 中没有数据。
Best,
Jark
[1]:
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connectors/filesystem.html#sink-partition-commit-trigger
On Sat, 31 Oct
谢谢Jark细致解答,我按照你给的思路试了下。遇到一个问题是,在不开hive分区的情况下写入和读取是没有问题的,但在开启hive表时间分区后,写入是成功了,然而通过hive
shell查不到数据,表结构是正确的。(代码我注释掉了) 能帮忙看下是哪里写得不对吗?
cdc -> kafka示例消息如下
{"data":{"team_id":1001,"team_name":"Sun","create_time":"2020-10-31
11:25:38","update_time":"2020-10-31 11:25:38"},"op":"+I"}
import
1. 是的。目前 Hive不支持直接消费 changlog ,这个主要原因是 hive 对 cdc 的支持不是很好。即使是 hive
ACID/transaction 功能,由于其与其他计算引擎集成的不好,也鲜有人用。
2. cdc -> kafka -> hive streaming 的方案是可行的,不过 kafka -> hive streaming
相当于原始数据同步,到 hive 中仍然是 cdc logs 内容,并没有实时合并,需要用户自己写 query 在 hive
中进行合并。merge过程可以参考这篇文章[1]。
3. 你可以 ts + INTERVAL '8'
hive3可以hive2不可以,换了kafka也没用吧,hive3之前一般都不支持数据仓库的更改。不知道回答的对不对,欢迎指正。
| |
罗显宴
|
|
邮箱:15927482...@163.com
|
签名由 网易邮箱大师 定制
在2020年10月31日 12:06,陈帅 写道:
我想使用flink sql的mysql-cdc connector直接将mysql表数据实时同步进hive,运行后抛
Exception in thread "main" org.apache.flink.table.api.TableException:
AppendStreamTableSink
我想使用flink sql的mysql-cdc connector直接将mysql表数据实时同步进hive,运行后抛
Exception in thread "main" org.apache.flink.table.api.TableException:
AppendStreamTableSink doesn't support consuming update and delete changes
which is produced by node TableSourceScan(table=[[hive_catalog, cdc,
team]], fields=[team_id,
1. 请检查所有的 jar 包是否有破损
2. 只需保留 flink-sql-connector-mysql-cdc-1.1.0.jar,
flink-format-changelog-json-1.1.0.jar 这两个 jar 包,
flink-connector-debezium-1.1.0.jar 和
flink-connector-mysql-cdc-1.1.0.jar 不需要。
Best,
Jark
On Sun, 13 Sep 2020 at 11:10, 陈帅 wrote:
> flink版本是1.11.1,我将
>
flink版本是1.11.1,我将
flink-connector-debezium-1.1.0.jar,
flink-connector-mysql-cdc-1.1.0.jar,
flink-sql-connector-kafka_2.12-1.11.1.jar,
flink-sql-connector-mysql-cdc-1.1.0.jar,
flink-format-changelog-json-1.1.0.jar
下载并加入到 $FLINK_HOME/lib目录,并以embedded模式启动flink sql
client,同时在mysql中插入一张表,然后在flink sql
> 大家好:
>
>
> 想问下目前 flink mysql sink想要实现按时间分库分表 有什么配置项可以使用嘛?
>
>
> 我目前的需求是数据源经过一个简单的etl处理写入到mysql中,供业务实时查询。由于业务场景只需要查询当天的数据(历史数据有另外的离线模块处理),所以想每天的数据单独写入一张表中(表名为xx_MMDD)这样。但目前Flink
> JDBC sink是要指定表名的。请问下有什么简单的实现方案嘛?或者说对于我这样的场景有什么更好的实现方式嘛?
>
>
> 多谢。
>
>
>
>
>
>
>
>
>
>
> --
>
> 张健
大家好:
想问下目前 flink mysql sink想要实现按时间分库分表 有什么配置项可以使用嘛?
我目前的需求是数据源经过一个简单的etl处理写入到mysql中,供业务实时查询。由于业务场景只需要查询当天的数据(历史数据有另外的离线模块处理),所以想每天的数据单独写入一张表中(表名为xx_MMDD)这样。但目前Flink
JDBC sink是要指定表名的。请问下有什么简单的实现方案嘛?或者说对于我这样的场景有什么更好的实现方式嘛?
多谢。
--
张健
31 matches
Mail list logo