大家好,

我遇到一个问题一直想不明白原因,想请教大家

我的代码keyby userid 然后使用session window 实现一个按userid聚合 并执行了一个 topN方法。

代码大致如下
// Topn聚合
DataStream itemList = resultDataStream
                .assignTimestampsAndWatermarks(
                        new
BoundedOutOfOrdernessTimestampExtractor<PredictResult>(Time.milliseconds(100))
{
                            @Override
                             public long extractTimestamp(PredictResult
predictResult) {
                             return predictResult.getDate_timestamp();
                           }
                        }
                )
                .keyBy("userId")

.window(EventTimeSessionWindows.withGap(Time.milliseconds(100)))
                .process(new TopNService(11));
        itemList.print("IRS_RESULT: ");


作业的延迟特别的高,高达30秒,完全无法接受。 起初我以为是自己的 topN方法有问题,但我采用
ProcessTimeSessionWindow后,延迟降低为一秒以内。
使用processtime 的弊端是gap是不好估计,高了影响作业延迟,低了 无法完成预期的聚合,导致报错(且运行不稳定)。
我不太理解为什么会出现这样的情况~还烦请大家给与一点解决思路~~


谢谢

// top n方法

    public static class TopNService extends
ProcessWindowFunction<PredictResult, Object, Tuple, TimeWindow> {

        private final int topSize;

        public TopNService(int topSize) {
            this.topSize = topSize;
        }
        @Override
        public void process(Tuple tuple, Context context,
Iterable<PredictResult> iterable, Collector<Object> collector) throws
Exception {
            List<PredictResult> allItems = new ArrayList<>();
            for (PredictResult predictResult:iterable){
                allItems.add(predictResult);
            }
            allItems.sort(new Comparator<PredictResult>() {
                @Override
                public int compare(PredictResult o1, PredictResult o2) {
                    return o2.probability.compareTo(o1.probability);
                }
            });
            int userId = allItems.get(0).userId ;
            String logonType=allItems.get(0).getLogonType();
            StringBuilder result = new StringBuilder();
            for (int i=0;i<topSize;i++) {
                PredictResult currentItem = allItems.get(i);
                result.append(currentItem.serviceId).append(",");
            }
            LocalDate localDate = LocalDate.now();
            LocalTime localTime = LocalTime.now();
            //NXZW_ZNTJ_TOPIC_IRS_RESULT  的数据格式 start
            JSONObject resultJson = new JSONObject();
            resultJson.put("user_id", userId);
            resultJson.put("logon_type", logonType);
            resultJson.put("date", localDate + " " + localTime);
            JSONArray jsonArray = new JSONArray();
            jsonArray.add(resultJson);
            resultJson.put("service_id", result.toString());
            //NXZW_ZNTJ_TOPIC_IRS_RESULT  的数据格式 end
            collector.collect(jsonArray.toString());
        }
    }

回复