?????? flink-1.12.0 ?????? ???? lag????

2021-09-21 文章 kcz
tks??




--  --
??: 
   "user-zh"

https://issues.apache.org/jira/browse/FLINK-19449

kcz <573693...@qq.com.invalid> ??2021??9??22?? 11:41??

> 
> behavior,next_bv 
>
>
> ??
> {
>   "user_id": 1,
>   "item_id": 1,
>   "behavior":"pv1"
> }
> {
>   "user_id": 1,
>   "item_id": 1,
>   "behavior":"pv2"
> }
>
>
>
>
>
>
> CREATE TABLE KafkaTable (
>   `user_id` BIGINT,
>   `item_id` BIGINT,
>   `behavior` STRING,
>   proctime as PROCTIME()
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'user_behavior',
>   'properties.bootstrap.servers' = '',
>   'properties.group.id' = 'testGroup',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'json'
> );
>
>
>
> SELECT
> user_id,
> item_id,
> behavior,
> next_bv 
> FROM
> ( SELECT *, lag( behavior, 1 ) over ( PARTITION BY user_id ORDER
> BY proctime ) AS next_bv FROM KafkaTable ) t;



-- 

Best,
Benchao Li

flink-1.12.0 ?????? ???? lag????

2021-09-21 文章 kcz

behavior,next_bv 


??
{
  "user_id": 1,
  "item_id": 1,
  "behavior":"pv1"
}
{
  "user_id": 1,
  "item_id": 1,
  "behavior":"pv2"
}






-- Kafka-backed source table reading JSON records from the `user_behavior` topic.
CREATE TABLE KafkaTable (
    `user_id`  BIGINT,
    `item_id`  BIGINT,
    `behavior` STRING,
    -- Computed column: processing-time attribute, used as the ORDER BY key
    -- of the OVER window in the query that reads this table.
    proctime AS PROCTIME()
)
WITH (
    -- Connector and payload format.
    'connector' = 'kafka',
    'format' = 'json',
    -- Topic and read position.
    'topic' = 'user_behavior',
    'scan.startup.mode' = 'earliest-offset',
    -- Kafka client properties; the broker list is empty in the original post.
    'properties.group.id' = 'testGroup',
    'properties.bootstrap.servers' = ''
);



-- For every event, emit the event's own columns plus the `behavior` value of
-- the preceding event of the same user.
--
-- LAG(behavior, 1) looks one row back inside the user's partition, ordered by
-- the processing-time attribute; the first row of each user has no
-- predecessor. The output column keeps its original name `next_bv`, although
-- the value it carries is the previous row's behavior.
--
-- The window function is projected directly from KafkaTable with an explicit
-- column list, replacing the former `SELECT *` derived table, which also
-- carried the unused `proctime` column.
--
-- NOTE(review): the reply in this thread links FLINK-19449 for the LAG result
-- reported on Flink 1.12 -- confirm the fixed version before relying on this.
SELECT
    user_id,
    item_id,
    behavior,
    LAG(behavior, 1) OVER (
        PARTITION BY user_id
        ORDER BY proctime
    ) AS next_bv
FROM KafkaTable;