Hi,

In our pipeline, we have run into an issue with the GroupByKey step. After
the pipeline has been running for some time, messages appear to stop
progressing through GroupByKey, and data processing stalls.
To troubleshoot this, we added debug logging before and after the
GroupByKey step.

We are using Beam version 2.50 with Flink 1.16.

We are running with only 1 task manager with 2 slots, parallelism 2, and
no HA.

The messages are KV pairs of a String and an Avro message.
Any insights or suggestions?

PCollection<KV<String, IpSessionInfoWithKey>> ipSessionInput = pipeline
    .apply("readIpSessionInfo", KafkaTransform.readAvroMessageFromKafka(
        pipelineUtil.getBootstrapServers(),
        options.getSourceKafkaTopic(),
        PIPELINE_NAME,
        IpSessionInfo.class,
        IpSessionInfoDeserializer.class))

    .apply("ConvertIpSessionFromKafka",
        ParDo.of(new ConvertFromKafkaRecord<>()))

    .apply(Window.<KV<String, IpSessionInfo>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
        .withAllowedLateness(Duration.ZERO)
        .discardingFiredPanes())

    .apply("DebugLogBeforeGroupByKey", ParDo.of(
        new DoFn<KV<String, IpSessionInfo>, KV<String, IpSessionInfo>>() {
            @DoFn.ProcessElement
            public void processElement(ProcessContext c) {
                KV<String, IpSessionInfo> element = c.element();
                log.atInfo().log("Before GroupByKey: " + element);
                c.output(element);
            }
        }))

    .apply(GroupByKey.create())
    .apply("DebugLogAfterGroupByKey", ParDo.of(
        new DoFn<KV<String, Iterable<IpSessionInfo>>,
                 KV<String, Iterable<IpSessionInfo>>>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                KV<String, Iterable<IpSessionInfo>> groupedElement = c.element();
                log.atInfo().log("After GroupByKey: " + groupedElement);
                c.output(groupedElement);
            }
        }))
    .apply(ParDo.of(new ConvertIpSession()));
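
For readers less familiar with the windowing settings used above (global
window, a repeated element-count trigger, discarding fired panes), here is a
rough plain-Java model of the behaviour we expect from them. It is only a
sketch and not Beam code: the class PaneModel and its add method are
hypothetical names invented for illustration, and a real runner may deliver
more than the threshold number of elements in one pane.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy model (hypothetical, not part of Beam) of a per-key buffer with an
 * element-count trigger that discards its contents after each firing.
 */
public class PaneModel {
    private final int threshold;
    private final Map<String, List<String>> buffers = new HashMap<>();

    public PaneModel(int threshold) {
        this.threshold = threshold;
    }

    /**
     * Buffers a value under its key. Returns the contents of the fired pane,
     * or null if the element count is still below the threshold.
     */
    public List<String> add(String key, String value) {
        List<String> buffer = buffers.computeIfAbsent(key, k -> new ArrayList<>());
        buffer.add(value);
        if (buffer.size() < threshold) {
            return null; // trigger has not fired yet
        }
        List<String> pane = new ArrayList<>(buffer);
        buffer.clear(); // "discarding" mode: nothing is carried into the next pane
        return pane;
    }

    public static void main(String[] args) {
        // Threshold 1 mirrors elementCountAtLeast(1): every element fires a pane.
        PaneModel model = new PaneModel(1);
        System.out.println(model.add("key-a", "first"));
        System.out.println(model.add("key-a", "second"));
    }
}
```

Under this model, with a threshold of 1, every "Before GroupByKey" log line
should eventually be matched by an "After GroupByKey" line for the same key.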


thanks
Sigalit
