额...,说的太对了, batch任务没问题,但流任务就发生意想不到的问题.
该需求就是翻译原离线SQL(传统数仓), 现要改成实时分析. 结果发现有些需求好像实现不了 非常感谢! > 在 2021年8月4日,16:50,黑色 <[email protected]> 写道: > > 你这是维表lookup,上流来数据来了,根据on后面的key,是当前去查快照返回结果,不可能是聚合之后的 > 当然你要是batch来了,没问题 > > > > > ------------------ 原始邮件 ------------------ > 发件人: > "user-zh" > > <[email protected]>; > 发送时间: 2021年8月4日(星期三) 下午4:44 > 收件人: "user-zh"<[email protected]>; > > 主题: Re: Flink sql 维表聚合问题请教 > > > > Hi! > > 我查了一下,processing time temporal join 确实还没有实现... 这里可能需要变成 event time temporal > join[1] 或者双流 join 了。但更好的方法可能是维表本身就已经计算好所需的数据。 > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/joins/#event-time-temporal-join > > carlc <[email protected]> 于2021年8月4日周三 下午3:57写道: > > > 感谢大佬回复,我尝试着换种写法,但这样些的话会直接报错。 > > > > create view v_bl_user_count as ( > > select user_id, count(1) > > from mysql_user_blacklist > > group by user_id > > ); > > > > select t1.`user_id` > > , t1.`event_type` > > , t1.`current_ts` > > from kafka_user_event t1 > > left join v_bl_user_count FOR SYSTEM_TIME AS OF t1.`proc_time` AS t2 on > > t1.`user_id` = t2.`user_id` > > where t1.`event_type` = ‘LOGIN’ > > > > 异常信息: > > org.apache.flink.table.api.TableException: Processing-time temporal join > > is not supported yet. > > at > > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273) > > at > > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224) > > at > > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115) > > at > > > org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56) > > at > > > org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59) > > > > > > > > > > > 在 2021年8月4日,14:18,Caizhi Weng <[email protected]> 写道: > > > > > > Hi! > > > > > > 这是因为每次维表 join 都会向下游发送两条数据,一共发送了四条,所以最后 count 的结果为 4,是符合预期的。 > > > > > > 为什么不直接对维表做 agg 呢?当然对维表做 agg 的话,这里就不是 lookup join 而是 process time > temporal > > > table join 了。 > > > > > > carlc <[email protected]> 于2021年8月4日周三 上午10:41写道: > > > > > >> 请教下如何在维表上做聚合操作? 如下操作与预期不符合,不知道是姿势不正确还是其他原因,麻烦大佬些指教下 ~ > > >> > > >> -- 模拟需求(有点牵强...): > > >> -- 过滤 kafka_user_event 中 event_type = LOGIN 数据,并且关联维表 > > mysql_user_blacklist > > >> 统计对应 user_id 在维表中的次数 -> 即: 在维表上做聚合操作 > > >> > > >> -- 1. 创建user_blacklist表 > > >> CREATE TABLE `user_blacklist` ( > > >> `user_id` bigint(20) NOT NULL, > > >> `create_time` datetime NOT NULL, > > >> PRIMARY KEY (`user_id`,`create_time`) > > >> ) ENGINE=InnoDB DEFAULT CHARSET=utf8; > > >> INSERT INTO user_blacklist (`user_id`, `create_time`) > > >> VALUES (1,'2021-01-01 00:00:00'), (1,'2021-01-02 00:00:00'), > > >> (2,'2021-01-04 00:00:00'); > > >> > > >> -- 2. 模拟kafka数据: > > >> -- 第1条: > {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-01 > > >> 00:00:00"} > > >> -- 第2条: > {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-02 > > >> 00:00:00"} > > >> > > >> -- 操作步骤: > > >> 当发送第1条kafka数据得到如下输出: > > >> | OP| user_id| event_type | current_ts| bl_count | > > >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 1 | > > >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 2 | > > >> 当再次发送第1条kafka数据得到如下输出: > > >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 3 | > > >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 4 | > > >> > > >> — SQL 如下: > > >> > > >> create table kafka_user_event > > >> ( > > >> `user_id` BIGINT, > > >> `event_type` STRING, > > >> `current_ts` timestamp(3), > > >> `proc_time` AS PROCTIME() > > >> ) WITH ( > > >> 'connector' = 'kafka', > > >> ... > > >> ); > > >> > > >> create table mysql_user_blacklist > > >> ( > > >> user_id BIGINT, > > >> create_time timestamp(3), > > >> primary key (user_id,create_time) not enforced > > >> ) WITH ( > > >> 'connector' = 'jdbc', > > >> … > > >> ); > > >> > > >> create view v2_user_event as ( > > >> select t1.`user_id` > > >> , t1.`event_type` > > >> , t1.`current_ts` > > >> , count(1) over ( partition by t2.`user_id` order by > t1.`proc_time` ROWS > > >> BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) as bl_count > > >> from kafka_user_event t1 > > >> left join mysql_user_blacklist FOR SYSTEM_TIME AS OF > t1.`proc_time` AS > > t2 > > >> on t1.`user_id` = t2.`user_id` > > >> where t1.`event_type` = 'LOGIN' > > >> ); > > >> > > >> select * from v2_user_event; > > >> > > >> > > > >
