Re:处理时间的滚动窗口提前触发

2024-04-23 文章 Xuyang
Hi, 我看你使用了System.currentTimeMillis(),有可能是分布式的情况下,多台TM上的机器时间不一致导致的吗?




--

Best!
Xuyang





在 2024-04-20 19:04:14,"hhq" <424028...@qq.com.INVALID> 写道:
>我使用了一个基于处理时间的滚动窗口,窗口大小设置为60s,但是我在窗口的处理函数中比较窗口的结束时间和系统时间,偶尔会发现获取到的系统时间早于窗口结束时间(这里的提前量不大,只有几毫秒,但是我不清楚,这是flink窗口本身的原因还是我代码的问题)我没有找到原因,请求帮助
>
>public static void main(String[] args) throws Exception {
>
>StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>DataStreamSource integerDataStreamSource = env.addSource(new 
> IntegerSource());
>
>integerDataStreamSource
>.keyBy(Integer::intValue)
>.window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
>.process(new IntegerProcessFunction())
>.setParallelism(1);
>
>env.execute();
>}
>
>
>public class IntegerProcessFunction extends ProcessWindowFunctionObject, Integer, TimeWindow> {
>private Logger log;
>@Override
>public void open(Configuration parameters) throws Exception {
>super.open(parameters);
>this.log = Logger.getLogger(IntegerProcessFunction.class);
>}
>
>@Override
>public void process(Integer integer, ProcessWindowFunction Object, Integer, TimeWindow>.Context context, Iterable elements, 
> Collector out) throws Exception {
>long currentTimeMillis = System.currentTimeMillis();
>long end = context.window().getEnd();
>
>if (currentTimeMillis < end) {
>log .info ("bad");
>} else {
>log .info ("good");
>}
>}
>}
>


处理时间的滚动窗口提前触发

2024-04-20 文章 hhq
我使用了一个基于处理时间的滚动窗口,窗口大小设置为60s,但是我在窗口的处理函数中比较窗口的结束时间和系统时间,偶尔会发现获取到的系统时间早于窗口结束时间(这里的提前量不大,只有几毫秒,但是我不清楚,这是flink窗口本身的原因还是我代码的问题)我没有找到原因,请求帮助

public static void main(String[] args) throws Exception {

StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource integerDataStreamSource = env.addSource(new 
IntegerSource());

integerDataStreamSource
.keyBy(Integer::intValue)
.window(TumblingProcessingTimeWindows.of(Time.seconds(60)))
.process(new IntegerProcessFunction())
.setParallelism(1);

env.execute();
}


public class IntegerProcessFunction extends ProcessWindowFunction {
private Logger log;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
this.log = Logger.getLogger(IntegerProcessFunction.class);
}

@Override
public void process(Integer integer, ProcessWindowFunction.Context context, Iterable elements, 
Collector out) throws Exception {
long currentTimeMillis = System.currentTimeMillis();
long end = context.window().getEnd();

if (currentTimeMillis < end) {
log .info ("bad");
} else {
log .info ("good");
}
}
}