taochang????????????????????????????????????????????????????????????????????????
 [17:00,18:00)????????watermark??5???????? (event time - 5) ???? >= 
18:00????????????????????????????????????????????????????


------------------ ???????? ------------------
??????:                                                                         
                                               "user-zh"                        
                                                            
<taochangl...@163.com&gt;;
????????:&nbsp;2020??9??3??(??????) ????10:35
??????:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;"samuel....@ubtrobot.com"<samuel....@ubtrobot.com&gt;;

????:&nbsp;Re: ??????????????????????????????????????



????????????????????????????????????????????org.apache.flink.streaming.api.windowing.windows.TimeWindow????
 


getWindowStartWithOffset??????????????????17-18????????????????????2020-09-01 
18:00:00.0
????????????????????????????????2020-09-01 18:00:00.0
??????????????2020-09-01 18:00:00.001??????

??????????wartermarker5??????????2020-09-01 18:00:05.001 ??????

?? 2020/9/2 15:20, samuel....@ubtrobot.com ????:
&gt; ????????????flink SQL,????????????tumble 
window????????????????????????????????????????????????????????????????????????
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 //????eventtime??????????watermark
&gt; DataStream<Tuple4<String,String,String,Long&gt;&gt; 
withTimestampsAndWatermarksDS = singleDS.assignTimestampsAndWatermarks(
&gt; WatermarkStrategy
&gt; 
.<Tuple4<String,String,String,Long&gt;&gt;forBoundedOutOfOrderness(Duration.ofSeconds(5))
&gt; //.<Tuple4<String,String,String,Long&gt;&gt;forMonotonousTimestamps()
&gt; .withIdleness(Duration.ofSeconds(10))&nbsp;&nbsp; 
//????????????????????watermark
&gt; .withTimestampAssigner((event, timestamp)-&gt;event.f3));
&gt;
&gt; StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
&gt; tenv.registerDataStream(
&gt; "log",
&gt; withTimestampsAndWatermarksDS,
&gt; "appid,bugid,eventid,rowtime.rowtime,proctime.proctime");
&gt;
&gt; String sql = "select appid,eventid,cnt," +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 "(starttime + interval '8' hour ) as stime," +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 "(endtime + interval '8' hour ) as etime&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 "from (select appid,eventid,count(*) as cnt," +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 "TUMBLE_START(rowtime,INTERVAL '1' HOUR)&nbsp; as starttime," +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 "TUMBLE_END(rowtime,INTERVAL '1' HOUR)&nbsp; as endtime&nbsp; " +
&gt;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 "from log&nbsp; group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' HOUR),TIME 
'00:00:00')";&nbsp;&nbsp;&nbsp; //????????????????????????????
&gt;
&gt; Table table = tenv.sqlQuery(sql);
&gt; DataStream<Result&gt; dataStream = tenv.toAppendStream(table, 
Result.class);
&gt;
&gt; ??????????????
&gt; (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
&gt; (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
&gt; (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
&gt; (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12 
????????2020-09-01 18:00:00.0??????????????????????????????????
&gt; (400030024,123123123123,others,1599030999000) //2020/9/2 15:16:39 
????????????????????????
&gt; ResultHour{appid=400030024,eventid=others,cnt=4, stime=2020-09-01 
17:00:00.0, etime=2020-09-01 18:00:00.0, SystemTime=1599031415481 //2020/9/2 
15:23:35}
&gt; ????????????????????????????????

回复