??????????lookup??????????????????????on??????key,??????????????????????????????????????????
??????????batch????????????
------------------ 原始邮件 ------------------
发件人:
"user-zh"
<[email protected]>;
发送时间: 2021年8月4日(星期三) 下午4:44
收件人: "user-zh"<[email protected]>;
????: Re: Flink sql ????????????????
Hi??
????????????processing time temporal join ??????????????... ????????????????
event time temporal
join[1] ???????? join ??????????????????????????????????????????????????????
[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/sql/queries/joins/#event-time-temporal-join
carlc <[email protected]> 于2021年8月4日周三 下午3:57写道：
> ????????????????????????????????????????????????????????
>
> create view v_bl_user_count as (
> select user_id, count(1)
> from mysql_user_blacklist
> group by user_id
> );
>
> select t1.`user_id`
> , t1.`event_type`
> , t1.`current_ts`
> from kafka_user_event t1
> left join v_bl_user_count FOR SYSTEM_TIME AS OF t1.`proc_time` AS t2 on
> t1.`user_id` = t2.`user_id`
> where t1.`event_type` = 'LOGIN'
>
> ????????:
> org.apache.flink.table.api.TableException: Processing-time temporal join
> is not supported yet.
> at
>
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.createJoinOperator(StreamExecTemporalJoin.scala:273)
> at
>
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoinToCoProcessTranslator.getJoinOperator(StreamExecTemporalJoin.scala:224)
> at
>
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:115)
> at
>
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecTemporalJoin.translateToPlanInternal(StreamExecTemporalJoin.scala:56)
> at
>
org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:59)
>
>
>
>
> > 在 2021年8月4日，14:18，Caizhi Weng <[email protected]> 写道：
> >
> > Hi??
> >
> > ???????????????? join
???????????????????????????????????????????????? count ????????
4????????????????
> >
> > ???????????????????? agg ???????????????? agg ???????????????? lookup
join ???? process time temporal
> > table join ????
> >
> > carlc <[email protected]> 于2021年8月4日周三 上午10:41写道：
> >
> >> ?????????????????????????????
?????????????????????????????????????????????????????????????????????? ~
> >>
> >> -- ????????(????????...):
> >> -- ???? kafka_user_event ?? event_type = LOGIN ??????????????????
> mysql_user_blacklist
> >> ???????? user_id ?????????????? -> ??: ??????????????????
> >>
> >> -- 1. ????user_blacklist??
> >> CREATE TABLE `user_blacklist` (
> >> `user_id` bigint(20) NOT NULL,
> >> `create_time` datetime NOT NULL,
> >> PRIMARY KEY (`user_id`,`create_time`)
> >> ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
> >> INSERT INTO user_blacklist (`user_id`, `create_time`)
> >> VALUES (1,'2021-01-01 00:00:00'), (1,'2021-01-02 00:00:00'),
> >> (2,'2021-01-04 00:00:00');
> >>
> >> -- 2. ????kafka????:
> >> -- ??1??:
{"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-01
> >> 00:00:00"}
> >> -- ??2??:
{"user_id":1,"event_type":"LOGIN","current_ts":"2021-10-02
> >> 00:00:00"}
> >>
> >> -- ????????:
> >> ????????1??kafka????????????????:
> >> | OP| user_id| event_type | current_ts| bl_count |
> >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 1 |
> >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 2 |
> >> ????????????1??kafka????????????????:
> >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 3 |
> >> | +I | 1 | LOGIN | 2021-10-01T00:00 | 4 |
> >>
> >> ?? SQL ????:
> >>
> >> create table kafka_user_event
> >> (
> >> `user_id` BIGINT,
> >> `event_type` STRING,
> >> `current_ts` timestamp(3),
> >> `proc_time` AS PROCTIME()
> >> ) WITH (
> >> 'connector' = 'kafka',
> >> ...
> >> );
> >>
> >> create table mysql_user_blacklist
> >> (
> >> user_id BIGINT,
> >> create_time timestamp(3),
> >> primary key (user_id,create_time) not enforced
> >> ) WITH (
> >> 'connector' = 'jdbc',
> >> ...
> >> );
> >>
> >> create view v2_user_event as (
> >> select t1.`user_id`
> >> , t1.`event_type`
> >> , t1.`current_ts`
> >> , count(1) over ( partition by t2.`user_id` order by
t1.`proc_time` ROWS
> >> BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW ) as bl_count
> >> from kafka_user_event t1
> >> left join mysql_user_blacklist FOR SYSTEM_TIME AS OF
t1.`proc_time` AS
> t2
> >> on t1.`user_id` = t2.`user_id`
> >> where t1.`event_type` = 'LOGIN'
> >> );
> >>
> >> select * from v2_user_event;
> >>
> >>
>
>