大家好, 我遇到一个问题一直想不明白原因,想请教大家
我的代码keyby userid 然后使用session window 实现一个按userid聚合 并执行了一个 topN方法。 代码大致如下 // Topn聚合 DataStream itemList = resultDataStream .assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor<PredictResult>(Time.milliseconds(100)) { @Override public long extractTimestamp(PredictResult predictResult) { return predictResult.getDate_timestamp(); } } ) .keyBy("userId") .window(EventTimeSessionWindows.withGap(Time.milliseconds(100))) .process(new TopNService(11)); itemList.print("IRS_RESULT: "); 作业的延迟特别的高,高达30秒,完全无法接受。 起初我以为是自己的 topN方法有问题,但我采用 ProcessTimeSessionWindow后,延迟降低为一秒以内。 使用processtime 的弊端是gap是不好估计,高了影响作业延迟,低了 无法完成预期的聚合,导致报错(且运行不稳定)。 我不太理解为什么会出现这样的情况~还烦请大家给与一点解决思路~~ 谢谢 // top n方法 public static class TopNService extends ProcessWindowFunction<PredictResult, Object, Tuple, TimeWindow> { private final int topSize; public TopNService(int topSize) { this.topSize = topSize; } @Override public void process(Tuple tuple, Context context, Iterable<PredictResult> iterable, Collector<Object> collector) throws Exception { List<PredictResult> allItems = new ArrayList<>(); for (PredictResult predictResult:iterable){ allItems.add(predictResult); } allItems.sort(new Comparator<PredictResult>() { @Override public int compare(PredictResult o1, PredictResult o2) { return o2.probability.compareTo(o1.probability); } }); int userId = allItems.get(0).userId ; String logonType=allItems.get(0).getLogonType(); StringBuilder result = new StringBuilder(); for (int i=0;i<topSize;i++) { PredictResult currentItem = allItems.get(i); result.append(currentItem.serviceId).append(","); } LocalDate localDate = LocalDate.now(); LocalTime localTime = LocalTime.now(); //NXZW_ZNTJ_TOPIC_IRS_RESULT 的数据格式 start JSONObject resultJson = new JSONObject(); resultJson.put("user_id", userId); resultJson.put("logon_type", logonType); resultJson.put("date", localDate + " " + localTime); JSONArray jsonArray = new JSONArray(); jsonArray.add(resultJson); resultJson.put("service_id", result.toString()); //NXZW_ZNTJ_TOPIC_IRS_RESULT 的数据格式 end collector.collect(jsonArray.toString()); } }