Hi, 问题应该是 kafka source 配置了多并发运行,但数据量比较少(或者 topic 的 partition 数量小于 task 的并发数量),不是所有的 source task 都消费到了数据并产生 watermark,导致下游聚合算子无法对齐 watermark 触发计算。 可以尝试通过以下办法解决: 1. 将 source 并发控制为 1 2. 为 watermark 策略开始 idleness 处理,参考 [#1]
fromElement 数据源会强制指定并发为 1 [#1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/#dealing-with-idle-sources Best, Weihua On Tue, Feb 7, 2023 at 1:31 PM wei_yuze <wei_y...@qq.com.invalid> wrote: > 您好! > > > > > 我在进行基于事件时间的窗口聚合操作时,使用fromElement数据源可以实现,但替换为Kafka数据源就不行了,但程序并不报错。以下贴出代码。代码中给了两个数据源,分别命名为:streamSource > 和 kafkaSource > 。当使用streamSource生成watermarkedStream的时候,可以完成聚合计算并输出结果。但使用kafkaSource却不行。 > > > > > public class WindowReduceTest2 { public static void > main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > > > // 使用fromElement数据源 > DataStreamSource<Event2> streamSource = > env.fromElements( > new > Event2("Alice", "./home", "2023-02-04 17:10:11"), > new Event2("Bob", > "./cart", "2023-02-04 17:10:12"), > new > Event2("Alice", "./home", "2023-02-04 17:10:13"), > new > Event2("Alice", "./home", "2023-02-04 17:10:15"), > new Event2("Cary", > "./home", "2023-02-04 17:10:16"), > new Event2("Cary", > "./home", "2023-02-04 17:10:16") > ); > > > // 使用Kafka数据源 > JsonDeserializationSchema<Event2> > jsonFormat = new JsonDeserializationSchema<>(Event2.class); > KafkaSource<Event2> source = > KafkaSource.<Event2>builder() > > .setBootstrapServers(Config.KAFKA_BROKERS) > > .setTopics(Config.KAFKA_TOPIC) > > .setGroupId("my-group") > > .setStartingOffsets(OffsetsInitializer.earliest()) > > .setValueOnlyDeserializer(jsonFormat) > .build(); > DataStreamSource<Event2> kafkaSource = > env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source"); > kafkaSource.print(); > > > // 生成watermark,从数据中提取时间作为事件时间 > SingleOutputStreamOperator<Event2> > watermarkedStream = > kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event2>forBoundedOutOfOrderness(Duration.ZERO) > > .withTimestampAssigner(new SerializableTimestampAssigner<Event2>() { > > @Override > > public long extractTimestamp(Event2 element, long recordTimestamp) { > > SimpleDateFormat simpleDateFormat = new > SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); > > Date date = null; > > try { > > date = > simpleDateFormat.parse(element.getTime()); > > } catch (ParseException e) { > > throw new RuntimeException(e); > > } > > long time = date.getTime(); > > System.out.println(time); > > return time; > } > })); > > > // 窗口聚合 > watermarkedStream.map(new MapFunction<Event2, > Tuple2<String, Long>>() { > > @Override > > public Tuple2<String, Long> map(Event2 value) throws Exception { > > // 将数据转换成二元组,方便计算 > > return Tuple2.of(value.getUser(), 1L); > } > }) > .keyBy(r -> > r.f0) > // 设置滚动事件时间窗口 > > .window(TumblingEventTimeWindows.of(Time.seconds(5))) > .reduce(new > ReduceFunction<Tuple2<String, Long>>() { > > @Override > > public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, > Tuple2<String, Long> value2) throws Exception { > > // 定义累加规则,窗口闭合时,向下游发送累加结果 > > return Tuple2.of(value1.f0, value1.f1 + value2.f1); > } > }) > .print("Aggregated > stream"); > > > env.execute(); > } > } > > > > > > > 值得注意的是,若将代码中的 TumblingEventTimeWindows 替换为 TumblingProcessingTimeWindows > ,即使使用 Kafka 数据源也是可以完成聚合计算并输出结果的。 > > > > 感谢您花时间查看这个问题! > Lucas