1. 没有初始的全量数据可能是会有问题的。

3. 你的 format 再解析 update 时,时先发的 before 还是 after?
4. 你的数据在 kafka 中时有序的么?也就是同一 key 的所有数据都在一个 partition 中不?

On Fri, 20 Nov 2020 at 12:46, kandy.wang <[email protected]> wrote:

>
>
>
>
>
>
> 1.是的。  这个程序跑起来的时候,是无状态的,然后开始慢慢积累状态吧。
>
> 2. 没有开启
>
>
>
>
> 在 2020-11-20 11:49:44,"Jark Wu" <[email protected]> 写道:
> >实现上应该没什么问题。
> >
> >1. 你的cdc 数据中是不是没有包括全量数据,直接从某天的增量开始读的?
> >2. 是否开启 mini-batch了?
> >
> >Best,
> >Jark
> >
> >On Fri, 20 Nov 2020 at 11:44, kandy.wang <[email protected]> wrote:
> >
> >> hi Jark:
> >>
> >>
> >> 打了一下log 看了一下聚合相关的几个字段: v_spu_id 、leaving_num 、price
> >> 都没有出现NULL的情况,在sql-client里看了一下也没有出现NULL的情况
> >>
> >> 自定义的format逻辑和canal的类似,insert update delete ,update 消息是需要发送2条
> update_before
> >> update_after,format逻辑是应该这么写的吧。
> >>
> >>
> >>
> >>
> >> 在 2020-11-19 23:13:19,"Jark Wu" <[email protected]> 写道:
> >> >你可以先直接 select * from hive.database.table; 看看每个字段的值是否是正常正确的,有无 null
> >> >值的,以验证你的自定义 format 没有问题。
> >> >
> >> >Best,
> >> >Jark
> >> >
> >> >On Thu, 19 Nov 2020 at 22:41, kandy.wang <[email protected]> wrote:
> >> >
> >> >> --mysql表
> >> >> CREATE TABLE IF NOT EXISTS
> `mysql_realtime_leaving_price_spu_index_agg`(
> >> >>    `id` INT UNSIGNED AUTO_INCREMENT,
> >> >>    `spu_id` BIGINT NOT NULL,
> >> >>    `leaving_price`  DECIMAL(10, 5)
> >> >>    PRIMARY KEY ( `id` ),
> >> >>    unique key idx_spu_id (spu_id)
> >> >> )ENGINE=InnoDB DEFAULT CHARSET=utf8
> >> >>
> >> >>
> >> >> --flink表
> >> >> CREATE TABLE
> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
> >> (
> >> >>    `spu_id` BIGINT ,
> >> >>    `leaving_price`  DECIMAL(10, 5),
> >> >>     PRIMARY KEY ( `spu_id`) NOT ENFORCED
> >> >> ) WITH (
> >> >>   'connector' = 'jdbc',
> >> >>    'url' = 'jdbc:mysql://...',
> >> >>    'table-name' = 'mysql_realtime_leaving_price_spu_index_agg',
> >> >>    'username' = '...',
> >> >>    'password' = '..'
> >> >> );
> >> >>
> >> >>
> >> >> --binlog 2mysql
> >> >>
> >> >> insert into
> hive.temp_flink.mysql_realtime_leaving_price_spu_index_agg
> >> >>
> >> >> SELECT v_spu_id as spu_id,sum(leaving_num*price) as leaving_price
> >> >>
> >> >> FROM hive.database.table
> >> >>
> >> >> group by v_spu_id;
> >> >>
> >> >>
> >> >> hive.database.table是指向公司自己的kafka,实现了类似cannal cdc的那4种消息格式。
> >> >>
> >> >>
> >> >> 问题是 mysql结果表 leaving_price 出现很多NULL的情况,看了一下数据leaving_num  price
> >> >> 字段都不可能出现NULL的情况。就不清楚为啥结果表可能会为NULL。
> >> >> 有什么好的排查思路么?
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >> >>
> >>
>

回复