taochang????????????????????????????????????????????????????????????????????????
[17:00,18:00)????????watermark??5???????? (event time - 5) ???? >=
18:00????????????????????????????????????????????????????
------------------ ???????? ------------------
??????:
"user-zh"
<[email protected]>;
????????: 2020??9??3??(??????) ????10:35
??????: "user-zh"<[email protected]>;"[email protected]"<[email protected]>;
????: Re: ??????????????????????????????????????
????????????????????????????????????????????org.apache.flink.streaming.api.windowing.windows.TimeWindow????
getWindowStartWithOffset??????????????????17-18????????????????????2020-09-01
18:00:00.0
????????????????????????????????2020-09-01 18:00:00.0
??????????????2020-09-01 18:00:00.001??????
??????????wartermarker5??????????2020-09-01 18:00:05.001 ??????
?? 2020/9/2 15:20, [email protected] ????:
> ????????????flink SQL,????????????tumble
window????????????????????????????????????????????????????????????????????????
>
//????eventtime??????????watermark
> DataStream<Tuple4<String,String,String,Long>>
withTimestampsAndWatermarksDS = singleDS.assignTimestampsAndWatermarks(
> WatermarkStrategy
>
.<Tuple4<String,String,String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
> //.<Tuple4<String,String,String,Long>>forMonotonousTimestamps()
> .withIdleness(Duration.ofSeconds(10))
//????????????????????watermark
> .withTimestampAssigner((event, timestamp)->event.f3));
>
> StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
> tenv.registerDataStream(
> "log",
> withTimestampsAndWatermarksDS,
> "appid,bugid,eventid,rowtime.rowtime,proctime.proctime");
>
> String sql = "select appid,eventid,cnt," +
>
"(starttime + interval '8' hour ) as stime," +
>
"(endtime + interval '8' hour ) as etime " +
>
"from (select appid,eventid,count(*) as cnt," +
>
"TUMBLE_START(rowtime,INTERVAL '1' HOUR) as starttime," +
>
"TUMBLE_END(rowtime,INTERVAL '1' HOUR) as endtime " +
>
"from log group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' HOUR),TIME
'00:00:00')"; //????????????????????????????
>
> Table table = tenv.sqlQuery(sql);
> DataStream<Result> dataStream = tenv.toAppendStream(table,
Result.class);
>
> ??????????????
> (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
> (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
> (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
> (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
????????2020-09-01 18:00:00.0??????????????????????????????????
> (400030024,123123123123,others,1599030999000) //2020/9/2 15:16:39
????????????????????????
> ResultHour{appid=400030024,eventid=others,cnt=4, stime=2020-09-01
17:00:00.0, etime=2020-09-01 18:00:00.0, SystemTime=1599031415481 //2020/9/2
15:23:35}
> ????????????????????????????????