Do you add a timestamp to every record you output in the ConvertFromKafkaRecord step, or in any step before it?
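One way to check is to log the timestamp and window that Beam has attached to each element just before the GroupByKey. The following is a minimal sketch of a pass-through debug DoFn. It uses Beam's `@Timestamp` and `BoundedWindow` process-method parameters and assumes SLF4J is on the classpath. `LogTimestampFn` and its `describe` helper are hypothetical names, not part of Beam or of your pipeline. It could stand in for the existing DebugLogBeforeGroupByKey step, e.g. `.apply("DebugLogBeforeGroupByKey", ParDo.of(new LogTimestampFn<KV<String, IpSessionInfo>>("Before GroupByKey")))`.

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Hypothetical debug step: emits every element unchanged and logs the
 * event-time timestamp and window that Beam has attached to it.
 */
class LogTimestampFn<T> extends DoFn<T, T> {
  private static final Logger LOG = LoggerFactory.getLogger(LogTimestampFn.class);

  // Prefix for each log line, e.g. "Before GroupByKey".
  private final String label;

  LogTimestampFn(String label) {
    this.label = label;
  }

  // Pure helper that builds the log message, kept separate so it is easy to check.
  static String describe(String label, Object element, Instant timestamp, Object window) {
    return label + ": " + element + " timestamp=" + timestamp + " window=" + window;
  }

  @ProcessElement
  public void processElement(
      @Element T element,
      @Timestamp Instant timestamp, // the element's event-time timestamp
      BoundedWindow window,         // GlobalWindow for the pipeline in the question
      OutputReceiver<T> out) {
    LOG.info(describe(label, element, timestamp, window));
    // Pass the element through without changing its timestamp.
    out.output(element);
  }
}
```

If every logged timestamp is identical or far in the past, the records probably never received event-time timestamps. If you assign them in a DoFn with `outputWithTimestamp`, note that Beam rejects an output timestamp earlier than the input element's timestamp (beyond the DoFn's allowed skew) by throwing an exception.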
On Fri, 17 Nov 2023 at 4:07 AM, Sigalit Eliazov <e.siga...@gmail.com> wrote:
> Hi,
>
> In our pipeline, we've encountered an issue with the GroupByKey step.
> After running for some time, it seems that messages stop progressing
> through the GroupByKey step, causing the pipeline to stall.
> To troubleshoot this issue, we added debug logging before and after the
> GroupByKey step.
>
> We are using Beam version 2.50 with Flink 1.16.
>
> We are running with only 1 task manager, 2 slots, parallelism 2, and no HA.
>
> Any insights or suggestions?
> The messages are KVs of String and an Avro message.
>
> PCollection<KV<String, IpSessionInfoWithKey>> ipSessionInput = pipeline
>     .apply("readIpSessionInfo", KafkaTransform.readAvroMessageFromKafka(
>         pipelineUtil.getBootstrapServers(),
>         options.getSourceKafkaTopic(),
>         PIPELINE_NAME,
>         IpSessionInfo.class,
>         IpSessionInfoDeserializer.class))
>
>     .apply("ConvertIpSessionFromKafka", ParDo.of(new ConvertFromKafkaRecord<>()))
>
>     .apply(Window.<KV<String, IpSessionInfo>>into(new GlobalWindows())
>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>         .withAllowedLateness(Duration.ZERO)
>         .discardingFiredPanes())
>
>     .apply("DebugLogBeforeGroupByKey", ParDo.of(new DoFn<KV<String, IpSessionInfo>, KV<String, IpSessionInfo>>() {
>         @DoFn.ProcessElement
>         public void processElement(ProcessContext c) {
>             KV<String, IpSessionInfo> element = c.element();
>             log.atInfo().log("Before GroupByKey: " + element);
>             c.output(element);
>         }
>     }))
>
>     .apply(GroupByKey.create())
>     .apply("DebugLogAfterGroupByKey", ParDo.of(new DoFn<KV<String, Iterable<IpSessionInfo>>, KV<String, Iterable<IpSessionInfo>>>() {
>         @ProcessElement
>         public void processElement(ProcessContext c) {
>             KV<String, Iterable<IpSessionInfo>> groupedElement = c.element();
>             log.atInfo().log("After GroupByKey: " + groupedElement);
>             c.output(groupedElement);
>         }
>     }))
>     .apply(ParDo.of(new ConvertIpSession()));
>
> Thanks,
> Sigalit