Hi 如果 processing time window 没有问题,但是 event time window 有问题的话,那需要考虑 event-time 下的 watermark 生成逻辑是否符合预期,如果 watermark 没有超过 window 结束时间,则一直不会被触发。
Best, Congxian 李佳宸 <[email protected]> 于2020年5月22日周五 下午3:27写道: > 大家好, > > 我遇到一个问题一直想不明白原因,想请教大家 > > 我的代码keyby userid 然后使用session window 实现一个按userid聚合 并执行了一个 topN方法。 > > 代码大致如下 > // Topn聚合 > DataStream itemList = resultDataStream > .assignTimestampsAndWatermarks( > new > > BoundedOutOfOrdernessTimestampExtractor<PredictResult>(Time.milliseconds(100)) > { > @Override > public long extractTimestamp(PredictResult > predictResult) { > return predictResult.getDate_timestamp(); > } > } > ) > .keyBy("userId") > > .window(EventTimeSessionWindows.withGap(Time.milliseconds(100))) > .process(new TopNService(11)); > itemList.print("IRS_RESULT: "); > > > 作业的延迟特别的高,高达30秒,完全无法接受。 起初我以为是自己的 topN方法有问题,但我采用 > ProcessTimeSessionWindow后,延迟降低为一秒以内。 > 使用processtime 的弊端是gap是不好估计,高了影响作业延迟,低了 无法完成预期的聚合,导致报错(且运行不稳定)。 > 我不太理解为什么会出现这样的情况~还烦请大家给与一点解决思路~~ > > > 谢谢 > > // top n方法 > > public static class TopNService extends > ProcessWindowFunction<PredictResult, Object, Tuple, TimeWindow> { > > private final int topSize; > > public TopNService(int topSize) { > this.topSize = topSize; > } > @Override > public void process(Tuple tuple, Context context, > Iterable<PredictResult> iterable, Collector<Object> collector) throws > Exception { > List<PredictResult> allItems = new ArrayList<>(); > for (PredictResult predictResult:iterable){ > allItems.add(predictResult); > } > allItems.sort(new Comparator<PredictResult>() { > @Override > public int compare(PredictResult o1, PredictResult o2) { > return o2.probability.compareTo(o1.probability); > } > }); > int userId = allItems.get(0).userId ; > String logonType=allItems.get(0).getLogonType(); > StringBuilder result = new StringBuilder(); > for (int i=0;i<topSize;i++) { > PredictResult currentItem = allItems.get(i); > result.append(currentItem.serviceId).append(","); > } > LocalDate localDate = LocalDate.now(); > LocalTime localTime = LocalTime.now(); > //NXZW_ZNTJ_TOPIC_IRS_RESULT 的数据格式 start > JSONObject resultJson = new JSONObject(); > resultJson.put("user_id", userId); > resultJson.put("logon_type", logonType); > resultJson.put("date", localDate + " " + localTime); > JSONArray jsonArray = new JSONArray(); > jsonArray.add(resultJson); > resultJson.put("service_id", result.toString()); > //NXZW_ZNTJ_TOPIC_IRS_RESULT 的数据格式 end > collector.collect(jsonArray.toString()); > } > } >
