大家好,
我遇到一个问题一直想不明白原因,想请教大家
我的代码keyby userid 然后使用session window 实现一个按userid聚合 并执行了一个 topN方法。
代码大致如下
// Topn聚合
DataStream itemList = resultDataStream
.assignTimestampsAndWatermarks(
new
BoundedOutOfOrdernessTimestampExtractor<PredictResult>(Time.milliseconds(100))
{
@Override
public long extractTimestamp(PredictResult
predictResult) {
return predictResult.getDate_timestamp();
}
}
)
.keyBy("userId")
.window(EventTimeSessionWindows.withGap(Time.milliseconds(100)))
.process(new TopNService(11));
itemList.print("IRS_RESULT: ");
作业的延迟特别的高,高达30秒,完全无法接受。 起初我以为是自己的 topN方法有问题,但我采用
ProcessTimeSessionWindow后,延迟降低为一秒以内。
使用processtime 的弊端是gap是不好估计,高了影响作业延迟,低了 无法完成预期的聚合,导致报错(且运行不稳定)。
我不太理解为什么会出现这样的情况~还烦请大家给与一点解决思路~~
谢谢
// top n方法
public static class TopNService extends
ProcessWindowFunction<PredictResult, Object, Tuple, TimeWindow> {
private final int topSize;
public TopNService(int topSize) {
this.topSize = topSize;
}
@Override
public void process(Tuple tuple, Context context,
Iterable<PredictResult> iterable, Collector<Object> collector) throws
Exception {
List<PredictResult> allItems = new ArrayList<>();
for (PredictResult predictResult:iterable){
allItems.add(predictResult);
}
allItems.sort(new Comparator<PredictResult>() {
@Override
public int compare(PredictResult o1, PredictResult o2) {
return o2.probability.compareTo(o1.probability);
}
});
int userId = allItems.get(0).userId ;
String logonType=allItems.get(0).getLogonType();
StringBuilder result = new StringBuilder();
for (int i=0;i<topSize;i++) {
PredictResult currentItem = allItems.get(i);
result.append(currentItem.serviceId).append(",");
}
LocalDate localDate = LocalDate.now();
LocalTime localTime = LocalTime.now();
//NXZW_ZNTJ_TOPIC_IRS_RESULT 的数据格式 start
JSONObject resultJson = new JSONObject();
resultJson.put("user_id", userId);
resultJson.put("logon_type", logonType);
resultJson.put("date", localDate + " " + localTime);
JSONArray jsonArray = new JSONArray();
jsonArray.add(resultJson);
resultJson.put("service_id", result.toString());
//NXZW_ZNTJ_TOPIC_IRS_RESULT 的数据格式 end
collector.collect(jsonArray.toString());
}
}