额...,说的太对了,  batch任务没问题,但流任务就发生意想不到的问题.

该需求就是翻译原离线SQL(传统数仓), 现要改成实时分析. 结果发现有些需求好像实现不了

非常感谢!



> 在 2021年8月4日,16:50,黑色 <[email protected]> 写道:
> 
> 你这是维表lookup,上流来数据来了,根据on后面的key,是当前去查快照返回结果,不可能是聚合之后的
> 当然你要是batch来了,没问题
> 
> 
> 
> 
> ------------------&nbsp;原始邮件&nbsp;------------------
> 发件人:                                                                          
>                                               "user-zh"                       
>                                                              
> <[email protected]&gt;;
> 发送时间:&nbsp;2021年8月4日(星期三) 下午4:44
> 收件人:&nbsp;"user-zh"<[email protected]&gt;;
> 
> 主题:&nbsp;Re: Flink sql 维表聚合问题请教
> 
> 
> 
> Hi!
> 
> 我查了一下,processing time temporal join 确实还没有实现... 这里可能需要变成 event time temporal
> join[1] 或者双流 join 了。但更好的方法可能是维表本身就已经计算好所需的数据。
> 
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/joins/#event-time-temporal-join
> 
> carlc <[email protected]&gt; 于2021年8月4日周三 下午3:57写道:
> 
> &gt; 感谢大佬回复,我尝试着换种写法,但这样些的话会直接报错。
> &gt;
> &gt; create view v_bl_user_count as (
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; select user_id, count(1)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; from mysql_user_blacklist
> &gt;&nbsp;&nbsp;&nbsp;&nbsp; group by user_id
> &gt; );
> &gt;
> &gt; select t1.`user_id`
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; , t1.`event_type`
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; , t1.`current_ts`
> &gt; from kafka_user_event t1
> &gt; left join v_bl_user_count FOR SYSTEM_TIME AS OF t1.`proc_time` AS t2 on
> &gt; t1.`user_id` = t2.`user_id`
> &gt; where t1.`event_type` = ‘LOGIN’
> &gt;
> &gt; 异常信息:
> &gt; org.apache.flink.table.api.TableException: Processing-time temporal join
> &gt; is not supported yet.
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> &gt; 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> &gt; 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> &gt; 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> &gt; 
> org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
> &gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp; at
> &gt; 
> org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
> &gt;
> &gt;
> &gt;
> &gt;
> &gt; &gt; 在 2021年8月4日,14:18,Caizhi Weng <[email protected]&gt; 写道:
> &gt; &gt;
> &gt; &gt; Hi!
> &gt; &gt;
> &gt; &gt; 这是因为每次维表 join 都会向下游发送两条数据,一共发送了四条,所以最后 count 的结果为 4,是符合预期的。
> &gt; &gt;
> &gt; &gt; 为什么不直接对维表做 agg 呢?当然对维表做 agg 的话,这里就不是 lookup join 而是 process time 
> temporal
> &gt; &gt; table join 了。
> &gt; &gt;
> &gt; &gt; carlc <[email protected]&gt; 于2021年8月4日周三 上午10:41写道:
> &gt; &gt;
> &gt; &gt;&gt; 请教下如何在维表上做聚合操作?&nbsp; 如下操作与预期不符合,不知道是姿势不正确还是其他原因,麻烦大佬些指教下 ~
> &gt; &gt;&gt;
> &gt; &gt;&gt; -- 模拟需求(有点牵强...):
> &gt; &gt;&gt; -- 过滤 kafka_user_event 中 event_type = LOGIN 数据,并且关联维表
> &gt; mysql_user_blacklist
> &gt; &gt;&gt; 统计对应 user_id 在维表中的次数 -&gt; 即: 在维表上做聚合操作
> &gt; &gt;&gt;
> &gt; &gt;&gt; -- 1. 创建user_blacklist表
> &gt; &gt;&gt; CREATE TABLE `user_blacklist` (
> &gt; &gt;&gt; `user_id` bigint(20) NOT NULL,
> &gt; &gt;&gt; `create_time` datetime NOT NULL,
> &gt; &gt;&gt; PRIMARY KEY (`user_id`,`create_time`)
> &gt; &gt;&gt; ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
> &gt; &gt;&gt; INSERT INTO user_blacklist (`user_id`, `create_time`)
> &gt; &gt;&gt; VALUES (1,'2021-01-01 00:00:00'), (1,'2021-01-02 00:00:00'),
> &gt; &gt;&gt; (2,'2021-01-04 00:00:00');
> &gt; &gt;&gt;
> &gt; &gt;&gt; -- 2. 模拟kafka数据:
> &gt; &gt;&gt; -- 第1条: 
> {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-01
> &gt; &gt;&gt; 00:00:00"}
> &gt; &gt;&gt; -- 第2条: 
> {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-02
> &gt; &gt;&gt; 00:00:00"}
> &gt; &gt;&gt;
> &gt; &gt;&gt; -- 操作步骤:
> &gt; &gt;&gt; 当发送第1条kafka数据得到如下输出:
> &gt; &gt;&gt; | OP| user_id| event_type | current_ts| bl_count |
> &gt; &gt;&gt; | +I | 1 | LOGIN | 2021-10-01T00:00 | 1 |
> &gt; &gt;&gt; | +I | 1 | LOGIN | 2021-10-01T00:00 | 2 |
> &gt; &gt;&gt; 当再次发送第1条kafka数据得到如下输出:
> &gt; &gt;&gt; | +I | 1 | LOGIN | 2021-10-01T00:00 | 3 |
> &gt; &gt;&gt; | +I | 1 | LOGIN | 2021-10-01T00:00 | 4 |
> &gt; &gt;&gt;
> &gt; &gt;&gt; — SQL 如下:
> &gt; &gt;&gt;
> &gt; &gt;&gt; create table kafka_user_event
> &gt; &gt;&gt; (
> &gt; &gt;&gt; `user_id` BIGINT,
> &gt; &gt;&gt; `event_type` STRING,
> &gt; &gt;&gt; `current_ts` timestamp(3),
> &gt; &gt;&gt; `proc_time` AS PROCTIME()
> &gt; &gt;&gt; ) WITH (
> &gt; &gt;&gt; 'connector' = 'kafka',
> &gt; &gt;&gt; ...
> &gt; &gt;&gt; );
> &gt; &gt;&gt;
> &gt; &gt;&gt; create table mysql_user_blacklist
> &gt; &gt;&gt; (
> &gt; &gt;&gt; user_id BIGINT,
> &gt; &gt;&gt; create_time timestamp(3),
> &gt; &gt;&gt; primary key (user_id,create_time) not enforced
> &gt; &gt;&gt; ) WITH (
> &gt; &gt;&gt; 'connector' = 'jdbc',
> &gt; &gt;&gt; …
> &gt; &gt;&gt; );
> &gt; &gt;&gt;
> &gt; &gt;&gt; create view v2_user_event as (
> &gt; &gt;&gt; select t1.`user_id`
> &gt; &gt;&gt; , t1.`event_type`
> &gt; &gt;&gt; , t1.`current_ts`
> &gt; &gt;&gt; , count(1) over ( partition by t2.`user_id` order by 
> t1.`proc_time` ROWS
> &gt; &gt;&gt; BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) as bl_count
> &gt; &gt;&gt; from kafka_user_event t1
> &gt; &gt;&gt; left join mysql_user_blacklist FOR SYSTEM_TIME AS OF 
> t1.`proc_time` AS
> &gt; t2
> &gt; &gt;&gt; on t1.`user_id` = t2.`user_id`
> &gt; &gt;&gt; where t1.`event_type` = 'LOGIN'
> &gt; &gt;&gt; );
> &gt; &gt;&gt;
> &gt; &gt;&gt; select * from v2_user_event;
> &gt; &gt;&gt;
> &gt; &gt;&gt;
> &gt;
> &gt;

回复