??????????lookup??????????????????????on??????key,??????????????????????????????????????????
??????????batch????????????




------------------ ???????? ------------------
??????: "user-zh" <[email protected]>;
????????: 2021??8??4??(??????) ????4:44
??????: "user-zh"<[email protected]>;

????: Re: Flink sql ????????????????



Hi??

????????????processing time temporal join ??????????????... ???????????????? 
event time temporal
join[1] ???????? join ??????????????????????????????????????????????????????

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/joins/#event-time-temporal-join

carlc <[email protected]> ??2021??8??4?????? ????3:57??????

&gt; ????????????????????????????????????????????????????????
&gt;
> create view v_bl_user_count as (
>     select user_id, count(1)
>     from mysql_user_blacklist
>     group by user_id
> );
>
> select t1.`user_id`
>      , t1.`event_type`
>      , t1.`current_ts`
> from kafka_user_event t1
> left join v_bl_user_count FOR SYSTEM_TIME AS OF t1.`proc_time` AS t2 on
> t1.`user_id` = t2.`user_id`
> where t1.`event_type` = 'LOGIN'
&gt;
&gt; ????????:
> org.apache.flink.table.api.TableException: Processing-time temporal join is not supported yet.
>         at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
>         at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
>         at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
>         at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
>         at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
&gt;
&gt;
&gt;
&gt;
> > ?? 2021??8??4????14:18??Caizhi Weng <[email protected]> ??????
> >
> > Hi??
> >
> > ???????????????? join ???????????????????????????????????????????????? count ???????? 4????????????????
> >
> > ???????????????????? agg ???????????????? agg ???????????????? lookup join ???? process time temporal table join ????
> >
> > carlc <[email protected]> ??2021??8??4?????? ????10:41??????
&gt; &gt;
> >> ?????????????????????????????  ?????????????????????????????????????????????????????????????????????? ~
> >>
> >> -- ????????(????????...):
> >> -- ???? kafka_user_event ?? event_type = LOGIN ?????????????????? mysql_user_blacklist
> >> ???????? user_id ?????????????? -> ??: ??????????????????
> >>
> >> -- 1. ????user_blacklist??
> >> CREATE TABLE `user_blacklist` (
> >> `user_id` bigint(20) NOT NULL,
> >> `create_time` datetime NOT NULL,
> >> PRIMARY KEY (`user_id`,`create_time`)
> >> ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
> >> INSERT INTO user_blacklist (`user_id`, `create_time`)
> >> VALUES (1,'2021-01-01 00:00:00'), (1,'2021-01-02 00:00:00'),
> >> (2,'2021-01-04 00:00:00');
&gt; &gt;&gt;
> >> -- 2. ????kafka????:
> >> -- ??1??: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-01 00:00:00"}
> >> -- ??2??: {"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-02 00:00:00"}
&gt; &gt;&gt;
> >> -- ????????:
> >> ????????1??kafka????????????????:
> >> | OP| user_id| event_type | current_ts| bl_count |
> >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 1 |
> >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 2 |
> >> ????????????1??kafka????????????????:
> >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 3 |
> >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 4 |
> >>
> >> ?? SQL ????:
> >>
> >> create table kafka_user_event
> >> (
> >> `user_id` BIGINT,
> >> `event_type` STRING,
> >> `current_ts` timestamp(3),
> >> `proc_time` AS PROCTIME()
> >> ) WITH (
> >> 'connector' = 'kafka',
> >> ...
> >> );
> >>
> >> create table mysql_user_blacklist
> >> (
> >> user_id BIGINT,
> >> create_time timestamp(3),
> >> primary key (user_id,create_time) not enforced
> >> ) WITH (
> >> 'connector' = 'jdbc',
> >> ...
> >> );
&gt; &gt;&gt;
> >> create view v2_user_event as (
> >> select t1.`user_id`
> >> , t1.`event_type`
> >> , t1.`current_ts`
> >> , count(1) over ( partition by t2.`user_id` order by t1.`proc_time` ROWS
> >> BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) as bl_count
> >> from kafka_user_event t1
> >> left join mysql_user_blacklist FOR SYSTEM_TIME AS OF t1.`proc_time` AS t2
> >> on t1.`user_id` = t2.`user_id`
> >> where t1.`event_type` = 'LOGIN'
> >> );
> >>
> >> select * from v2_user_event;
> >>
> >>
>
>

回复