????????????keyed ??????????????????????????trigger????????????????????????????????????????????????????


????????????????????????key??????????????????????????????????????????24????????????????????????????????????onElement()????????????????????????????????????????????????????FIRE_AND_PURGE????????????????????????????????????????????????valuestate??????????????????????????onEventTime()????????????????????????????2??????????????????????????



1??????keyby??????????????????????????????????watermark??????????????????????????onEventTime()????????????????????key????????????????????key?????????????????????????????????????????????????????????????????????????????? 
 
????????????????????????key??????????????????????????????????????????????????valuestate??????????????????????????????????????????????????????????????????????????????????????????????????????????????????????


2????????????????onElement()????????onEventTime()????????????????TriggerResult??????????????????????????????????????????????????????????????????????????




??????
??????????????
class MyTrigger extends Trigger<RateInfo, TimeWindow&gt;&nbsp;
????????????
class MyProcessFunction extends ProcessWindowFunction<RateInfo, String, String, 
TimeWindow&gt;&nbsp;


????????
&nbsp; &nbsp; &nbsp; WindowedStream<RateInfo, String, TimeWindow&gt; trigger = 
timeStream
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .keyBy(new 
KeySelector<RateInfo, String&gt;() {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Override
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; public 
String getKey(RateInfo value) throws Exception {
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
&nbsp; return value.getId();
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; })
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(16)))
&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .trigger(new 
MyTrigger());


&nbsp; &nbsp; &nbsp; &nbsp; SingleOutputStreamOperator<String&gt; resultStream 
= trigger.process(new MyProcessFunction());







????????????


-----------------
??????
????&nbsp;????
TEL:13737047391
2021??6??11??

回复