实现上应该没什么问题。 1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的? 2. 是否开启 mini-batch了?
Best, Jark On Fri, 20 Nov 2020 at 11:44, kandy.wang <[email protected]> wrote: > hi Jark: > > > 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price > 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况 > > 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条 update_before > update_after,format逻辑是应该这么写的吧。 > > > > > 在 2020-11-19 23:13:19,"Jark Wu" <[email protected]> 写道: > >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null > >值的,以验证你的自定义 format 没有问题。 > > > >Best, > >Jark > > > >On Thu, 19 Nov 2020 at 22:41, kandy.wang <[email protected]> wrote: > > > >> --mysql表 > >> CREATE TABLE IF NOT EXISTS `mysql_realtime_leaving_price_spu_index_agg`( > >> `id` INT UNSIGNED AUTO_INCREMENT, > >> `spu_id` BIGINT NOT NULL, > >> `leaving_price` DECIMAL(10, 5) > >> PRIMARY KEY ( `id` ), > >> unique key idx_spu_id (spu_id) > >> )ENGINE=InnoDB DEFAULT CHARSET=utf8 > >> > >> > >> --flink表 > >> CREATE TABLE hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg > ( > >> `spu_id` BIGINT , > >> `leaving_price` DECIMAL(10, 5), > >> PRIMARY KEY ( `spu_id`) NOT ENFORCED > >> ) WITH ( > >> 'connector' = 'jdbc', > >> 'url' = 'jdbc:mysql://...', > >> 'table-name' = 'mysql_realtime_leaving_price_spu_index_agg', > >> 'username' = '...', > >> 'password' = '..' > >> ); > >> > >> > >> --binlog 2mysql > >> > >> insert into hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg > >> > >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price > >> > >> FROM hive.database.table > >> > >> group by v_spu_id; > >> > >> > >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。 > >> > >> > >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num price > >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。 > >> 有什么好的排查思路么? > >> > >> > >> > >> > >> > >> >
