Hi

如果 processing time window 没有问题,但是 event time window 有问题的话,那需要考虑 event-time
下的 watermark 生成逻辑是否符合预期,如果 watermark 没有超过 window 结束时间,则一直不会被触发。

Best,
Congxian


李佳宸 <[email protected]> 于2020年5月22日周五 下午3:27写道:

> 大家好,
>
> 我遇到一个问题一直想不明白原因,想请教大家
>
> 我的代码keyby userid 然后使用session window 实现一个按userid聚合 并执行了一个 topN方法。
>
> 代码大致如下
> // Topn聚合
> DataStream itemList = resultDataStream
>                 .assignTimestampsAndWatermarks(
>                         new
>
> BoundedOutOfOrdernessTimestampExtractor<PredictResult>(Time.milliseconds(100))
> {
>                             @Override
>                              public long extractTimestamp(PredictResult
> predictResult) {
>                              return predictResult.getDate_timestamp();
>                            }
>                         }
>                 )
>                 .keyBy("userId")
>
> .window(EventTimeSessionWindows.withGap(Time.milliseconds(100)))
>                 .process(new TopNService(11));
>         itemList.print("IRS_RESULT: ");
>
>
> 作业的延迟特别的高,高达30秒,完全无法接受。 起初我以为是自己的 topN方法有问题,但我采用
> ProcessTimeSessionWindow后,延迟降低为一秒以内。
> 使用processtime 的弊端是gap是不好估计,高了影响作业延迟,低了 无法完成预期的聚合,导致报错(且运行不稳定)。
> 我不太理解为什么会出现这样的情况~还烦请大家给与一点解决思路~~
>
>
> 谢谢
>
> // top n方法
>
>     public static class TopNService extends
> ProcessWindowFunction<PredictResult, Object, Tuple, TimeWindow> {
>
>         private final int topSize;
>
>         public TopNService(int topSize) {
>             this.topSize = topSize;
>         }
>         @Override
>         public void process(Tuple tuple, Context context,
> Iterable<PredictResult> iterable, Collector<Object> collector) throws
> Exception {
>             List<PredictResult> allItems = new ArrayList<>();
>             for (PredictResult predictResult:iterable){
>                 allItems.add(predictResult);
>             }
>             allItems.sort(new Comparator<PredictResult>() {
>                 @Override
>                 public int compare(PredictResult o1, PredictResult o2) {
>                     return o2.probability.compareTo(o1.probability);
>                 }
>             });
>             int userId = allItems.get(0).userId ;
>             String logonType=allItems.get(0).getLogonType();
>             StringBuilder result = new StringBuilder();
>             for (int i=0;i<topSize;i++) {
>                 PredictResult currentItem = allItems.get(i);
>                 result.append(currentItem.serviceId).append(",");
>             }
>             LocalDate localDate = LocalDate.now();
>             LocalTime localTime = LocalTime.now();
>             //NXZW_ZNTJ_TOPIC_IRS_RESULT  的数据格式 start
>             JSONObject resultJson = new JSONObject();
>             resultJson.put("user_id", userId);
>             resultJson.put("logon_type", logonType);
>             resultJson.put("date", localDate + " " + localTime);
>             JSONArray jsonArray = new JSONArray();
>             jsonArray.add(resultJson);
>             resultJson.put("service_id", result.toString());
>             //NXZW_ZNTJ_TOPIC_IRS_RESULT  的数据格式 end
>             collector.collect(jsonArray.toString());
>         }
>     }
>

回复