Hi,

In our pipeline, we've encountered an issue with the GroupByKey step. After running for some time, messages stop progressing through the GroupByKey step, and the pipeline stalls. To troubleshoot this, we added debug logging before and after the GroupByKey step.
We are using Beam version 2.50 with Flink 1.16, running with only 1 task manager with 2 slots, parallelism 2, and no HA. The messages are KVs of a String key and an Avro message value. Any insights or suggestions?

    PCollection<KV<String, IpSessionInfoWithKey>> ipSessionInput = pipeline
        .apply("readIpSessionInfo",
            KafkaTransform.readAvroMessageFromKafka(
                pipelineUtil.getBootstrapServers(),
                options.getSourceKafkaTopic(),
                PIPELINE_NAME,
                IpSessionInfo.class,
                IpSessionInfoDeserializer.class))
        .apply("ConvertIpSessionFromKafka", ParDo.of(new ConvertFromKafkaRecord<>()))
        .apply(Window.<KV<String, IpSessionInfo>>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes())
        .apply("DebugLogBeforeGroupByKey",
            ParDo.of(new DoFn<KV<String, IpSessionInfo>, KV<String, IpSessionInfo>>() {
                @DoFn.ProcessElement
                public void processElement(ProcessContext c) {
                    KV<String, IpSessionInfo> element = c.element();
                    log.atInfo().log("Before GroupByKey: " + element);
                    c.output(element);
                }
            }))
        // Messages stop progressing through this step.
        .apply(GroupByKey.create())
        .apply("DebugLogAfterGroupByKey",
            ParDo.of(new DoFn<KV<String, Iterable<IpSessionInfo>>, KV<String, Iterable<IpSessionInfo>>>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    KV<String, Iterable<IpSessionInfo>> groupedElement = c.element();
                    log.atInfo().log("After GroupByKey: " + groupedElement);
                    c.output(groupedElement);
                }
            }))
        .apply(ParDo.of(new ConvertIpSession()));

Thanks,
Sigalit