感谢大佬回复,我尝试着换种写法,但这样写的话会直接报错。
-- Per-user count of blacklist rows, intended as the versioned/build side of
-- a temporal join against the kafka_user_event stream.
create view v_bl_user_count as (
    select user_id
         -- Name the aggregate: without an alias the planner assigns a
         -- generated column name (e.g. EXPR$1) that downstream queries
         -- cannot reference.
         , count(*) as bl_count
    from mysql_user_blacklist
    group by user_id
);
-- NOTE(review): this join of a processing-time attribute (`proc_time`)
-- against an aggregated view is exactly what the planner rejects with
-- "Processing-time temporal join is not supported yet" (see stack trace
-- below). Fixing the quoting below removes the *syntax* error only; the
-- join itself must be restructured (e.g. lookup-join the base table and
-- aggregate downstream) to get past the planner limitation.
select t1.`user_id`
     , t1.`event_type`
     , t1.`current_ts`
from kafka_user_event t1
left join v_bl_user_count FOR SYSTEM_TIME AS OF t1.`proc_time` AS t2
    on t1.`user_id` = t2.`user_id`
-- was ‘LOGIN' — a curly "smart quote" pasted from a mail client is not a
-- valid SQL string delimiter and fails to parse.
where t1.`event_type` = 'LOGIN'
异常信息:
org.apache.flink.table.api.TableException: Processing-time temporal join is not
supported yet.
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
at
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
at
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
> 在 2021年8月4日,14:18,Caizhi Weng <[email protected]> 写道:
>
> Hi!
>
> 这是因为每次维表 join 都会向下游发送两条数据,一共发送了四条,所以最后 count 的结果为 4,是符合预期的。
>
> 为什么不直接对维表做 agg 呢?当然对维表做 agg 的话,这里就不是 lookup join 而是 process time temporal
> table join 了。
>
> carlc <[email protected]> 于2021年8月4日周三 上午10:41写道:
>
>> 请教下如何在维表上做聚合操作? 如下操作与预期不符合,不知道是姿势不正确还是其他原因,麻烦大佬些指教下 ~
>>
>> -- 模拟需求(有点牵强...):
>> -- 过滤 kafka_user_event 中 event_type = LOGIN 数据,并且关联维表 mysql_user_blacklist
>> 统计对应 user_id 在维表中的次数 -> 即: 在维表上做聚合操作
>>
>> -- 1. 创建user_blacklist表
>> CREATE TABLE `user_blacklist` (
>> `user_id` bigint(20) NOT NULL,
>> `create_time` datetime NOT NULL,
>> PRIMARY KEY (`user_id`,`create_time`)
>> ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
>> INSERT INTO user_blacklist (`user_id`, `create_time`)
>> VALUES (1,'2021-01-01 00:00:00'), (1,'2021-01-02 00:00:00'),
>> (2,'2021-01-04 00:00:00');
>>
>> -- 2. 模拟kafka数据:
>> -- 第1条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-01
>> 00:00:00"}
>> -- 第2条: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-02
>> 00:00:00"}
>>
>> -- 操作步骤:
>> 当发送第1条kafka数据得到如下输出:
>> | OP| user_id| event_type | current_ts| bl_count |
>> | +I | 1 | LOGIN | 2021-10-01T00:00 | 1 |
>> | +I | 1 | LOGIN | 2021-10-01T00:00 | 2 |
>> 当再次发送第1条kafka数据得到如下输出:
>> | +I | 1 | LOGIN | 2021-10-01T00:00 | 3 |
>> | +I | 1 | LOGIN | 2021-10-01T00:00 | 4 |
>>
>> -- SQL 如下:
>>
>> create table kafka_user_event
>> (
>> `user_id` BIGINT,
>> `event_type` STRING,
>> `current_ts` timestamp(3),
>> `proc_time` AS PROCTIME()
>> ) WITH (
>> 'connector' = 'kafka',
>> ...
>> );
>>
>> create table mysql_user_blacklist
>> (
>> user_id BIGINT,
>> create_time timestamp(3),
>> primary key (user_id,create_time) not enforced
>> ) WITH (
>> 'connector' = 'jdbc',
>> ...
>> );
>>
>> create view v2_user_event as (
>> select t1.`user_id`
>> , t1.`event_type`
>> , t1.`current_ts`
>> , count(1) over ( partition by t2.`user_id` order by t1.`proc_time` ROWS
>> BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) as bl_count
>> from kafka_user_event t1
>> left join mysql_user_blacklist FOR SYSTEM_TIME AS OF t1.`proc_time` AS t2
>> on t1.`user_id` = t2.`user_id`
>> where t1.`event_type` = 'LOGIN'
>> );
>>
>> select * from v2_user_event;
>>
>>