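I would like to meet this requirement with a trigger. My idea is to rewrite EventTimeTrigger so that it triggers a purge once when the first record enters the system, and to track that with a ValueState.
The problem is that it fires repeatedly, many times. Also, on the firing at window close, the records received by the WindowFunction are wrong. Am I using it incorrectly?
The custom trigger: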
public class SessionComputeTrigger extends Trigger<SessionConvertBean, TimeWindow> {
    private static final long serialVersionUID = 1L;
    static Logger logger = LoggerFactory.getLogger(SessionComputeTrigger.class);

    private final ReducingStateDescriptor<Long> countDesc =
            new ReducingStateDescriptor<>("count", new SessionComputeTrigger.Sum(), LongSerializer.INSTANCE);
    private final ValueStateDescriptor<Boolean> existDesc =
            new ValueStateDescriptor<>("exist", BooleanSerializer.INSTANCE);

    private SessionComputeTrigger() {
    }

    @Override
    public TriggerResult onElement(SessionConvertBean element, long timestamp,
            TimeWindow window, TriggerContext ctx) throws Exception {
        ValueState<Boolean> exist = ctx.getPartitionedState(existDesc);
        ReducingState<Long> count = ctx.getPartitionedState(countDesc);
        if (exist == null || exist.value() == null || !exist.value()) {
            count.add(1L);
            logger.info("mau fire =====>exits:{} ====>count:{}====>date_time:{}",
                    exist.value(), count.get(), element.dateTime);
            exist.update(true);
            return TriggerResult.FIRE;
        }
        if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
            // if the watermark is already past the window fire immediately
            return TriggerResult.FIRE;
        } else {
            ctx.registerEventTimeTimer(window.maxTimestamp());
            return TriggerResult.CONTINUE;
        }
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        return time == window.maxTimestamp() ? TriggerResult.FIRE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window,
            TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        long windowMaxTimestamp = window.maxTimestamp();
        if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
            ctx.registerEventTimeTimer(windowMaxTimestamp);
        }
    }

    @Override
    public String toString() {
        return "EventTimeTrigger()";
    }

    public static SessionComputeTrigger create() {
        return new SessionComputeTrigger();
    }

    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            logger.info("value1={},value2={}", value1, value2);
            return value1 + value2;
        }
    }
}
Output:
mau fire =====>exits:null ====>count:1====>date_time:2020-05-10T23:22:40.298+0800
(date_time is the event time of the current record)
result=>user:48b132cb-f40b-4cf5-bf7f-b6411525b1982,start time:2020-05-10T23:22:40.298+0800,end time:2020-05-10T23:23:10.298+0800,interval:30000.0,total:1
mau fire =====>exits:null ====>count:1====>date_time:2020-05-10T23:22:43.298+0800
result=>user:48b132cb-f40b-4cf5-bf7f-b6411525b1982,start time:2020-05-10T23:22:40.298+0800,end time:2020-05-10T23:23:13.298+0800,interval:33000.0,total:2
mau fire =====>exits:null ====>count:1====>date_time:2020-05-10T23:22:46.298+0800
result=>user:48b132cb-f40b-4cf5-bf7f-b6411525b1982,start time:2020-05-10T23:22:40.298+0800,end time:2020-05-10T23:23:16.298+0800,interval:36000.0,total:3
mau fire =====>exits:null ====>count:1====>date_time:2020-05-10T23:22:49.298+0800
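
A possible reading of the log above, offered as a sketch rather than a confirmed diagnosis: the end time in each result line moves (23:23:10, 23:23:13, 23:23:16), so every element hands the trigger a TimeWindow with different bounds. Assuming state obtained through ctx.getPartitionedState is scoped to the window it was requested for, and that onMerge does not carry it over, `exist` would read null and `count` would restart at 1 on every element, which is what the log shows. The plain-Java model below is not Flink code; WindowScopedStateSketch, flagSeenPerElement and the mergeState switch are hypothetical names, and it assumes in-order timestamps for a single user.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (NOT Flink API) of a trigger flag stored per window bounds.
class WindowScopedStateSketch {
    static final long GAP = 30_000L; // session gap in ms, matching the 30 s in the log

    /**
     * Feeds in-order event timestamps of one user into a growing session and
     * returns, per element, whether the "exist" flag was found for the window
     * the trigger is called with.
     *
     * @param mergeState true: the flag is moved to the merged window (what an
     *                   onMerge that merges state would achieve);
     *                   false: the flag stays behind under the old window bounds
     */
    static List<Boolean> flagSeenPerElement(long[] timestamps, boolean mergeState) {
        Map<String, Boolean> stateByWindow = new HashMap<>(); // window bounds -> flag
        List<Boolean> seen = new ArrayList<>();
        long start = 0L;
        long end = 0L;
        String previous = null; // bounds of the session before this element
        for (long ts : timestamps) {
            boolean extend = previous != null && ts < end;
            if (extend) {
                end = Math.max(end, ts + GAP); // merging extends the session end
            } else {
                start = ts;                    // first element or gap exceeded
                end = ts + GAP;
            }
            String window = start + "-" + end;
            if (extend && mergeState && !window.equals(previous)) {
                Boolean carried = stateByWindow.remove(previous);
                if (carried != null) {
                    stateByWindow.put(window, carried);
                }
            }
            seen.add(Boolean.TRUE.equals(stateByWindow.get(window)));
            stateByWindow.put(window, true); // corresponds to exist.update(true)
        }
        return seen;
    }
}
```

With mergeState set to false the flag is never found, so the "first record" branch would run for every element; with true it is found from the second element on. For comparison, Flink's built-in CountTrigger calls ctx.mergePartitionedState(stateDesc) in its onMerge. That method accepts only mergeable state such as ReducingState, so the ValueState flag would probably have to be replaced by one; this is worth verifying against the Flink version in use.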
On 2020/5/11 at 4:12 PM, "Benchao Li" <[email protected] on behalf of [email protected]> wrote:
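There is probably no way to do this for now. Alternatively, could you implement it with two queries?
For example, one query computes first_value, and the second is the actual session window.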
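
One way to read the two-query suggestion, sketched in plain Java rather than as Flink SQL: the first pass emits a record as soon as a session's first event arrives, and the second pass emits the summary once the session has closed. TwoQuerySessionSketch, sessionStarts and sessionSummaries are hypothetical names, the 30 s gap is taken from the log earlier in the thread, and the sketch assumes in-order timestamps for a single user.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration (NOT Flink API) of splitting the work into two passes.
class TwoQuerySessionSketch {
    static final long GAP = 30_000L; // session gap in ms (assumption)

    // "Query 1": emit one record per session, at the session's first event.
    static List<String> sessionStarts(long[] timestamps) {
        List<String> out = new ArrayList<>();
        long end = Long.MIN_VALUE;
        for (long t : timestamps) {
            if (t >= end) {
                out.add("open start=" + t);
            }
            end = t + GAP; // each event pushes the session end forward
        }
        return out;
    }

    // "Query 2": emit one summary per session, once the session has closed.
    static List<String> sessionSummaries(long[] timestamps) {
        List<String> out = new ArrayList<>();
        if (timestamps.length == 0) {
            return out;
        }
        long start = timestamps[0];
        long end = start + GAP;
        int count = 0;
        for (long t : timestamps) {
            if (t >= end) { // gap exceeded: close the previous session
                out.add("close start=" + start + " end=" + end + " count=" + count);
                start = t;
                count = 0;
            }
            end = t + GAP;
            count++;
        }
        out.add("close start=" + start + " end=" + end + " count=" + count);
        return out;
    }
}
```

A downstream sink could then upsert on the user and session start, so that the "close" record replaces the earlier "open" record for the same session.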
On Monday, May 11, 2020 at 3:07 PM, gang.gou <[email protected]> wrote:
> Hi,
>
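> I want to use Flink's Session Window to measure how long a user stays
> online and to count the currently active users. At the moment a record
> containing the window's start and end times is emitted only when the
> window ends. How can I emit a record as soon as the window is created
> and then update that record when the window ends? Thanks!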
>
>
--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: [email protected]; [email protected]