????????????????????????????????????????????
public class ImmediatelyTrigger<T> extends Trigger<T, TimeWindow> {
/** Gap, in milliseconds, that an element's timestamp must exceed past the last recorded trigger time to cause a firing. */
private final long milliseconds;

/**
 * Descriptor for the partitioned {@code Long} state named "last-trigger-time", which holds the
 * reference timestamp that {@code onElement} compares incoming element timestamps against.
 * Never reassigned, hence final.
 */
private final ValueStateDescriptor<Long> triggerStateDescriptor =
        new ValueStateDescriptor<>("last-trigger-time", Long.class);

/**
 * Creates a trigger whose early firings are spaced by at least the given interval.
 *
 * @param time the interval; converted to milliseconds once, at construction
 */
public ImmediatelyTrigger(Time time) {
    milliseconds = time.toMilliseconds();
}
/**
 * Decides for each incoming element whether the window should fire early.
 *
 * <p>The last trigger time is kept in partitioned state and starts out as the window start.
 * When an element's timestamp is more than {@code milliseconds} past that stored time, the
 * stored time is moved to the element's timestamp and the window fires.
 *
 * <p>An event-time timer for {@code window.maxTimestamp()} is also registered (unless the
 * watermark has already reached it) so that {@code onEventTime} is invoked at the end of the
 * window; this uses the same guard as {@code onMerge}.
 *
 * @param element   the element that arrived (not inspected)
 * @param timestamp the timestamp of the element
 * @param window    the window the element was assigned to
 * @param ctx       trigger context used to access state, watermark and timers
 * @return {@code FIRE} when the interval has elapsed since the last trigger time,
 *         otherwise {@code CONTINUE}
 * @throws Exception if state access fails
 */
@Override
public TriggerResult onElement(T element, long timestamp, TimeWindow window,
        TriggerContext ctx) throws Exception {
    // Schedule the end-of-window callback instead of relying on timers owned by someone else.
    long windowMaxTimestamp = window.maxTimestamp();
    if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
        ctx.registerEventTimeTimer(windowMaxTimestamp);
    }

    ValueState<Long> lastTriggerTimeState = ctx.getPartitionedState(triggerStateDescriptor);
    Long ltt = lastTriggerTimeState.value();
    if (ltt == null) {
        // First element seen for this state: measure the interval from the window start.
        ltt = window.getStart();
        lastTriggerTimeState.update(ltt);
    }
    if (timestamp > ltt + milliseconds) {
        lastTriggerTimeState.update(timestamp);
        return TriggerResult.FIRE;
    }
    return TriggerResult.CONTINUE;
}
/**
 * Handles a processing-time timer callback: fires only when the timer time equals the
 * window's maximum timestamp.
 *
 * <p>NOTE(review): nothing in this class registers a processing-time timer, so this callback
 * is presumably never reached when the trigger is used on its own - confirm.
 *
 * @param time   the time the timer fired at
 * @param window the window the timer belongs to
 * @param ctx    trigger context (unused)
 * @return {@code FIRE} if {@code time} equals {@code window.maxTimestamp()}, else {@code CONTINUE}
 * @throws Exception declared by the overridden method; not thrown by this implementation
 */
@Override
public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)
        throws Exception {
    if (time != window.maxTimestamp()) {
        return TriggerResult.CONTINUE;
    }
    return TriggerResult.FIRE;
}
/**
 * Handles an event-time timer callback: fires the window when the timer time is exactly the
 * window's maximum timestamp, i.e. at the end of the window.
 *
 * @param time   the event time the timer fired at
 * @param window the window the timer belongs to
 * @param ctx    trigger context (unused)
 * @return {@code FIRE} if {@code time} equals {@code window.maxTimestamp()}, else {@code CONTINUE}
 * @throws Exception declared by the overridden method; not thrown by this implementation
 */
@Override
public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx)
        throws Exception {
    final boolean atEndOfWindow = (time == window.maxTimestamp());
    if (atEndOfWindow) {
        return TriggerResult.FIRE;
    }
    return TriggerResult.CONTINUE;
}
/**
 * Releases everything this trigger holds for the given window: the end-of-window event-time
 * timer and the stored last trigger time.
 *
 * @param window the window being purged
 * @param ctx    trigger context used to access timers and state
 * @throws Exception if state access fails
 */
@Override
public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
    // onMerge registers a timer at window.maxTimestamp(); remove it so it does not outlive the window.
    ctx.deleteEventTimeTimer(window.maxTimestamp());
    ctx.getPartitionedState(triggerStateDescriptor).clear();
}
/**
 * Called when windows are merged into {@code window}: re-registers the end-of-window
 * event-time timer for the merged window, unless the watermark has already reached it.
 *
 * <p>NOTE(review): this class does not override {@code canMerge()}, and the last-trigger-time
 * state is not carried over here - verify whether use with merging window assigners is
 * actually intended.
 *
 * @param window the window that results from the merge
 * @param ctx    merge context used to read the watermark and register timers
 * @throws Exception declared by the overridden method
 */
@Override
public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
    final long endOfWindow = window.maxTimestamp();
    final boolean alreadyReached = endOfWindow <= ctx.getCurrentWatermark();
    if (alreadyReached) {
        return;
    }
    ctx.registerEventTimeTimer(endOfWindow);
}
}

streamOperator
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<EventItem>() {
    @Override
    public long extractAscendingTimestamp(EventItem eventItem) {
        // The event's window end is used as its event-time timestamp.
        return eventItem.getWindowEnd();
    }
})
// Turn every event into an (itemId, 1L) pair so that counts can be summed.
.map(eventItem -> Tuple2.of(eventItem.getItemId(), 1L))
// NOTE(review): position 1 is the constant count; keying by item id would be keyBy(0) - confirm intent.
.keyBy(1)
// (Original comment was lost to an encoding error; it mentions 23:59:59:999, presumably the
// end of the daily window - TODO confirm.)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(+16)))
// Custom trigger from above, constructed with a 5-second interval.
.trigger(new ImmediatelyTrigger<>(Time.seconds(5)))
// Keep the first tuple's f0 and add up the counts.
.reduce((ReduceFunction<Tuple2<String, Long>>) (t0, t1) -> Tuple2.of(t0.f0, t0.f1 + t1.f1))
.addSink(textLongSink);
------------------ Original ------------------
From: "taowang"<[email protected]>;
Date: Wed, Mar 6, 2019 03:20 PM
To: "user-zh"<[email protected]>;"??????"<[email protected]>;
Subject: Re:????????????????????????????????????????
??????.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(+16)))
????????????????Trigger??????????????????????????????Trigger??????
public class WindowTrigger extends Trigger
@Override
public TriggerResult onElement(final T element, final long timestamp, final
TimeWindow window, final TriggerContext ctx) {
return TriggerResult.FIRE;
}
@Override
public TriggerResult onProcessingTime(final long time, final TimeWindow window,
final TriggerContext ctx) {
return TriggerResult.CONTINUE;
}
@Override
public TriggerResult onEventTime(final long time, final TimeWindow window,
final TriggerContext ctx) {
return TriggerResult.CONTINUE;
}
@Override
public void clear(final TimeWindow window, final TriggerContext ctx) {
ctx.deleteEventTimeTimer(window.maxTimestamp());
}
}
????????????????
?? ...
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(+16)))
.trigger(new WindowTrigger<>())
?? ...
*********************
?? 2019??3??6?? +0800 11:52??????@?????? <[email protected]>????????
> ??????????????????????????????????????????key????????????????????????????????????????????????????
> ????????????????
> ?6?9????????????windows????Tupel2??????reduce??????????????????????????ProcessTimeWindow????????????????windwos????evictor(CountEvictor.of(1))????????????
> ?6?9??????????
> streamOperator
>     .assignTimestampsAndWatermarks(new
> AscendingTimestampExtractor<EventItem>() {
>     @Override
>     public long extractAscendingTimestamp(EventItem eventItem) {
>       return eventItem.getWindowEnd();
>     }
>     })
>     .map(eventItem -> Tuple2.of(eventItem.getItemId(), 1L))
>     .keyBy(1)
>     // ????????????23:59:59:999??????????????????
>     .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(+16)))
>     // ??window??key????????????
>     .reduce((x1,x2)->new Tuple2<>(x2._1(),x1._1()+x2._2()))
>
>     // ??5??????ProcessTime????????????????????????????
>     .keyBy(1)
>     .window(TumblingProcessingTimeWindows.of(Time.minutes(5)))
>     .evictor(CountEvictor.of(1))
>     .reduce((ReduceFunction) (value1, value2) -> value2)
>
>     .addSink(textLongSink);
>
>
>
> ??????????????????????????????????????????????