请教下如何在维表上做聚合操作? 如下操作与预期不符合,不知道是姿势不正确还是其他原因,麻烦大佬些指教下 ~
-- 模拟需求(有点牵强...):
-- 过滤 kafka_user_event 中 event_type = LOGIN 数据,并且关联维表 mysql_user_blacklist 统计对应
user_id 在维表中的次数 -> 即: 在维表上做聚合操作
-- 1. 创建user_blacklist表
CREATE TABLE `user_blacklist` (
`user_id` bigint(20) NOT NULL,
`create_time` datetime NOT NULL,
PRIMARY KEY (`user_id`,`create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
INSERT INTO user_blacklist (`user_id`, `create_time`)
VALUES (1,'2021-01-01 00:00:00'), (1,'2021-01-02 00:00:00'), (2,'2021-01-04
00:00:00');
-- 2. 模拟kafka数据:
-- 第1条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-01 00:00:00"}
-- 第2条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-02 00:00:00"}
-- 操作步骤:
当发送第1条kafka数据得到如下输出:
| OP| user_id| event_type | current_ts| bl_count |
| +I | 1 | LOGIN | 2021-10-01T00:00 | 1 |
| +I | 1 | LOGIN | 2021-10-01T00:00 | 2 |
当再次发送第1条kafka数据得到如下输出:
| +I | 1 | LOGIN | 2021-10-01T00:00 | 3 |
| +I | 1 | LOGIN | 2021-10-01T00:00 | 4 |
— SQL 如下:
create table kafka_user_event
(
`user_id` BIGINT,
`event_type` STRING,
`current_ts` timestamp(3),
`proc_time` AS PROCTIME()
) WITH (
'connector' = 'kafka',
...
);
create table mysql_user_blacklist
(
user_id BIGINT,
create_time timestamp(3),
primary key (user_id,create_time) not enforced
) WITH (
'connector' = 'jdbc',
…
);
create view v2_user_event as (
select t1.`user_id`
, t1.`event_type`
, t1.`current_ts`
, count(1) over ( partition by t2.`user_id` order by t1.`proc_time` ROWS
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) as bl_count
from kafka_user_event t1
left join mysql_user_blacklist FOR SYSTEM_TIME AS OF t1.`proc_time` AS t2 on
t1.`user_id` = t2.`user_id`
where t1.`event_type` = 'LOGIN'
);
select * from v2_user_event;