我使用了一个基于处理时间的滚动窗口,窗口大小设置为60s,但是我在窗口的处理函数中比较窗口的结束时间和系统时间,偶尔会发现获取到的系统时间早于窗口结束时间(这里的提前量不大,只有几毫秒,但是我不清楚,这是flink窗口本身的原因还是我代码的问题)我没有找到原因,请求帮助

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
    DataStreamSource<Integer> integerDataStreamSource = env.addSource(new 
IntegerSource());

    integerDataStreamSource
            .keyBy(Integer::intValue)
                    .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
                            .process(new IntegerProcessFunction())
                                    .setParallelism(1);

    env.execute();
}


public class IntegerProcessFunction extends ProcessWindowFunction<Integer, 
Object, Integer, TimeWindow> {
    private Logger log;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.log = Logger.getLogger(IntegerProcessFunction.class);
    }

    @Override
    public void process(Integer integer, ProcessWindowFunction<Integer, Object, 
Integer, TimeWindow>.Context context, Iterable<Integer> elements, 
Collector<Object> out) throws Exception {
        long currentTimeMillis = System.currentTimeMillis();
        long end = context.window().getEnd();

        if (currentTimeMillis < end) {
            log <http://log.info/>.info <http://log.info/>("bad");
        } else {
            log <http://log.info/>.info <http://log.info/>("good");
        }
    }
}

回复