tks??
-- --
??:
"user-zh"
https://issues.apache.org/jira/browse/FLINK-19449
kcz <573693...@qq.com.invalid> ??2021??9??22?? 11:41??
>
> behavior,next_bv
>
>
> ??
> {
> "user_id": 1,
> "item_id": 1,
> "behavior":"pv1"
> }
> {
> "user_id": 1,
> "item_id": 1,
> "behavior":"pv2"
> }
>
>
>
>
>
>
> CREATE TABLE KafkaTable (
> `user_id` BIGINT,
> `item_id` BIGINT,
> `behavior` STRING,
> proctime as PROCTIME()
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'user_behavior',
> 'properties.bootstrap.servers' = '',
> 'properties.group.id' = 'testGroup',
> 'scan.startup.mode' = 'earliest-offset',
> 'format' = 'json'
> );
>
>
>
> SELECT
> user_id,
> item_id,
> behavior,
> next_bv
> FROM
> ( SELECT *, lag( behavior,
1 ) over ( PARTITION BY user_id ORDER
> BY proctime ) AS next_bv FROM KafkaTable ) t;
--
Best,
Benchao Li