taochang???????????????????????????????????????????????????????????????????????? [17:00,18:00)????????watermark??5???????? (event time - 5) ???? >= 18:00????????????????????????????????????????????????????
------------------ 原始邮件 ------------------ 发件人: "user-zh" <taochangl...@163.com>; 发送时间: 2020年9月3日(星期四) ????10:35 收件人: "user-zh"<user-zh@flink.apache.org>;"samuel....@ubtrobot.com"<samuel....@ubtrobot.com>; 主题: Re: ?????????????????????????????????????? ????????????????????????????????????????????org.apache.flink.streaming.api.windowing.windows.TimeWindow???? getWindowStartWithOffset??????????????????17-18????????????????????2020-09-01 18:00:00.0 ????????????????????????????????2020-09-01 18:00:00.0 ??????????????2020-09-01 18:00:00.001?????? ??????????watermark 5??????????2020-09-01 18:00:05.001 ?????? 在 2020/9/2 15:20, samuel....@ubtrobot.com 写道: > ????????????flink SQL,????????????tumble window???????????????????????????????????????????????????????????????????????? > //????eventtime??????????watermark > DataStream<Tuple4<String,String,String,Long>> withTimestampsAndWatermarksDS = singleDS.assignTimestampsAndWatermarks( > WatermarkStrategy > .<Tuple4<String,String,String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5)) > //.<Tuple4<String,String,String,Long>>forMonotonousTimestamps() > .withIdleness(Duration.ofSeconds(10)) //????????????????????watermark > .withTimestampAssigner((event, timestamp)->event.f3)); > > StreamTableEnvironment tenv = StreamTableEnvironment.create(env); > tenv.registerDataStream( > "log", > withTimestampsAndWatermarksDS, > "appid,bugid,eventid,rowtime.rowtime,proctime.proctime"); > > String sql = "select appid,eventid,cnt," + > "(starttime + interval '8' hour ) as stime," + > "(endtime + interval '8' hour ) as etime " + > "from (select appid,eventid,count(*) as cnt," + > "TUMBLE_START(rowtime,INTERVAL '1' HOUR) as starttime," + > "TUMBLE_END(rowtime,INTERVAL '1' HOUR) as endtime " + > "from log group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' HOUR),TIME '00:00:00')"; //???????????????????????????? 
> > Table table = tenv.sqlQuery(sql); > DataStream<Result> dataStream = tenv.toAppendStream(table, Result.class); > > ?????????????? > (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12 > (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12 > (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12 > (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12 ????????2020-09-01 18:00:00.0?????????????????????????????????? > (400030024,123123123123,others,1599030999000) //2020/9/2 15:16:39 ???????????????????????? > ResultHour{appid=400030024,eventid=others,cnt=4, stime=2020-09-01 17:00:00.0, etime=2020-09-01 18:00:00.0, SystemTime=1599031415481 //2020/9/2 15:23:35} > ????????????????????????????????