以下是我写的一段代码:在 processElement 中,每进来一条日志就注册一次 EventTime 定时器(用来触发 onTimer):
ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1L);
如果 onTimer 触发,则说明窗口内的所有日志都已处理完毕,然后我要对窗口内的所有数据进行 CTR 计算。但是 onTimer 一直不触发;而我改用
KeyedProcessFunction 就可以正常触发,这是怎么回事呢?
/**
 * Collects the {@link ItemViewCount} records of one key, and when the event-time timer
 * registered at {@code windowEnd + 1} fires, emits the top-N items by CTR as a JSON array.
 *
 * <p>The alert threshold and the value of N are read from broadcast state under
 * {@code Constant.REDIS_BROADCAST_TOPCTR_KEY}; a default {@link TopCTRInfo} is used while no
 * rule has been broadcast yet.
 *
 * <p>NOTE(review): this function runs on a keyed stream connected to a broadcast stream.
 * A two-input operator presumably advances its event-time clock to the minimum watermark of
 * both inputs, so if the broadcast side never emits watermarks the timers registered in
 * {@link #processElement} would never fire. That must be fixed where the broadcast stream is
 * built (not visible here) — confirm against the job wiring.
 */
public class TopCTRProcessFunction extends KeyedBroadcastProcessFunction<Long,
        ItemViewCount, Tuple2<String, TopCTRInfo>, String> {

    /** Keyed buffer of the items received so far for the current key. */
    private ListState<ItemViewCount> itemState;

    /** Descriptor of the broadcast state that holds the top-CTR rule. */
    private MapStateDescriptor<String, TopCTRInfo> ruleMapStateDescr =
            new MapStateDescriptor<>("ruleMapState", String.class, TopCTRInfo.class);

    /**
     * Obtains the keyed list state used to buffer items.
     *
     * @param parameters the configuration handed over by the runtime (unused)
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        ListStateDescriptor<ItemViewCount> itemsStateDesc =
                new ListStateDescriptor<>("itemState-state", ItemViewCount.class);
        itemState = getRuntimeContext().getListState(itemsStateDesc);
    }

    /** Releases the state handle when the function is shut down. */
    @Override
    public void close() throws Exception {
        super.close();
        if (itemState != null) {
            // NOTE(review): keyed state is normally only accessible while an element or timer
            // of some key is being processed; clearing it here may fail or hit an arbitrary
            // key — confirm whether this call is needed at all.
            itemState.clear();
            itemState = null;
        }
    }

    /**
     * Computes the CTR (as a percentage) of the incoming item, buffers the item and registers
     * an event-time timer at {@code windowEnd + 1}.
     *
     * @param value the incoming item; its CTR field is overwritten
     * @param ctx   read-only context giving access to the timer service
     * @param out   collector (unused here; results are emitted from {@link #onTimer})
     */
    @Override
    public void processElement(ItemViewCount value, ReadOnlyContext ctx,
                               Collector<String> out) throws Exception {
        // Compute CTR. Zero clicks or zero impressions both yield 0, so a record with clicks
        // but no impressions cannot produce an infinite CTR.
        boolean noRatio = value.getClkCount() == 0 || value.getImpCount() == 0;
        value.setCtr(noRatio ? 0 : ((double) value.getClkCount() / value.getImpCount() * 100));
        itemState.add(value);
        // Register an event-time timer at windowEnd + 1; when it fires, all items that belong
        // to the windowEnd window are expected to have arrived.
        ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1L);
    }

    /**
     * Stores the broadcast rule under the key it was sent with.
     *
     * @param value pair of (rule key, rule)
     * @param ctx   context giving write access to the broadcast state
     * @param out   collector (unused)
     */
    @Override
    public void processBroadcastElement(Tuple2<String, TopCTRInfo> value,
                                        Context ctx, Collector<String> out) throws Exception {
        ctx.getBroadcastState(ruleMapStateDescr).put(value.f0, value.f1);
    }

    /**
     * Emits the top-N buffered items whose CTR reaches the alert threshold, sorted by CTR in
     * descending order, as one JSON string, and clears the buffer.
     *
     * @param timestamp the timestamp of the firing timer
     * @param ctx       context giving read access to the broadcast state
     * @param out       collector that receives the JSON array
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
            throws Exception {
        ReadOnlyBroadcastState<String, TopCTRInfo> topCTRMap =
                ctx.getBroadcastState(ruleMapStateDescr);
        TopCTRInfo topCTRInfo = topCTRMap.get(Constant.REDIS_BROADCAST_TOPCTR_KEY);
        if (topCTRInfo == null) {
            // No rule broadcast yet: fall back to whatever defaults TopCTRInfo defines.
            topCTRInfo = new TopCTRInfo();
        }

        List<ItemViewCount> allItems = new ArrayList<>();
        for (ItemViewCount item : itemState.get()) {
            if (item.getCtr() >= topCTRInfo.getCtrAlertThreshold()) {
                allItems.add(item);
            }
        }
        itemState.clear();

        // Descending by CTR. Double.compare keeps small differences; casting the scaled
        // difference to int would treat CTRs closer than 0.01 as equal.
        allItems.sort((left, right) -> Double.compare(right.getCtr(), left.getCtr()));

        int limit = Math.max(0, Math.min(topCTRInfo.getTopSize(), allItems.size()));
        List<ItemViewCount> jsonItems = new ArrayList<>(allItems.subList(0, limit));
        out.collect(JSON.toJSONString(jsonItems));
    }
}
以下是可以触发 onTimer 的代码,使用的是 KeyedProcessFunction:
.process(new KeyedProcessFunction<Long, ItemViewCount, String>() {
    // Buffers the items of one key and, when the event-time timer registered at
    // windowEnd + 1 fires, emits the top `topSize` items by CTR as a JSON array.

    /** Maximum number of items emitted per timer firing. */
    private final int topSize = 10;

    /** Keyed buffer of the items received so far for the current key. */
    private ListState<ItemViewCount> itemState;

    /**
     * Obtains the keyed list state used to buffer items.
     *
     * @param parameters the configuration handed over by the runtime
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ListStateDescriptor<ItemViewCount> itemsStateDesc =
                new ListStateDescriptor<>("itemState-state", ItemViewCount.class);
        itemState = getRuntimeContext().getListState(itemsStateDesc);
    }

    /**
     * Computes the CTR (as a percentage) of the incoming item, buffers the item and
     * registers an event-time timer at {@code windowEnd + 1}.
     *
     * @param value the incoming item; its CTR field is overwritten
     * @param ctx   context giving access to the timer service
     * @param out   collector (unused here; results are emitted from onTimer)
     */
    @Override
    public void processElement(ItemViewCount value, Context ctx,
                               Collector<String> out) throws Exception {
        // Compute CTR. Zero clicks or zero impressions both yield 0, so a record with
        // clicks but no impressions cannot produce an infinite CTR.
        boolean noRatio = value.getClkCount() == 0 || value.getImpCount() == 0;
        value.setCtr(noRatio ? 0 : ((double) value.getClkCount() / value.getImpCount() * 100));
        itemState.add(value);
        // Register an event-time timer at windowEnd + 1; when it fires, all items that
        // belong to the windowEnd window are expected to have arrived.
        ctx.timerService().registerEventTimeTimer(value.getWindowEnd() + 1);
    }

    /**
     * Emits the top {@code topSize} buffered items, sorted by CTR in descending order, as
     * one JSON string, and clears the buffer.
     *
     * @param timestamp the timestamp of the firing timer
     * @param ctx       timer context
     * @param out       collector that receives the JSON array
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out)
            throws Exception {
        super.onTimer(timestamp, ctx, out);

        List<ItemViewCount> allItems = new ArrayList<>();
        for (ItemViewCount item : itemState.get()) {
            allItems.add(item);
        }
        itemState.clear();

        // Descending by CTR. Double.compare keeps small differences; casting the scaled
        // difference to int would treat CTRs closer than 0.01 as equal.
        allItems.sort((left, right) -> Double.compare(right.getCtr(), left.getCtr()));

        int limit = Math.min(topSize, allItems.size());
        List<ItemViewCount> jsonItems = new ArrayList<>(allItems.subList(0, limit));
        out.collect(JSON.toJSONString(jsonItems));
    }
})
[email protected]