tks??????????????????



------------------ ???????? ------------------
发件人: "user-zh" <libenc...@apache.org>;
发送时间: 2021年9月22日(星期三) 中午12:04
??????:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题: Re: flink-1.12.0 ?????? ???? lag????



Hi??????????????????bug[1]????????1.13.1????1.4????????????
????????????1.13.1????????1.4????????????????????????????????????????????????

[1] https://issues.apache.org/jira/browse/FLINK-19449

kcz <573693...@qq.com.invalid> 于2021年9月22日周三 上午11:41写道：

> ????????????????????????????????
> behavior,next_bv ????????????????????????????????????????????????
>
>
> ??????????
> {
>   "user_id": 1,
>   "item_id": 1,
>   "behavior":"pv1"
> }
> {
>   "user_id": 1,
>   "item_id": 1,
>   "behavior":"pv2"
> }
>
>
>
>
>
>
> CREATE TABLE KafkaTable (
>   `user_id` BIGINT,
>   `item_id` BIGINT,
>   `behavior` STRING,
>   proctime as PROCTIME()
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'user_behavior',
>   'properties.bootstrap.servers' = '',
>   'properties.group.id' = 'testGroup',
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'json'
> );
>
>
>
> SELECT
>         user_id,
>         item_id,
>         behavior,
>         next_bv
> FROM
>         ( SELECT *, lag( behavior, 1 ) over ( PARTITION BY user_id ORDER BY proctime ) AS next_bv FROM KafkaTable ) t;



-- 

Best,
Benchao Li

回复