Do you add a timestamp to every record you output in the
ConvertFromKafkaRecord step, or in any step before that?

On Fri, 17 Nov 2023 at 4:07 AM, Sigalit Eliazov <e.siga...@gmail.com> wrote:

> Hi,
>
> In our pipeline, we've encountered an issue with the GroupByKey step.
> After some time of running, it seems that messages are not progressing
> through the GroupByKey step, causing the pipeline to stall in data
> processing.
> To troubleshoot this issue, we added debug logging before and after the
> GroupByKey step.
>
> We are using Beam version 2.50 with Flink 1.16.
>
> We are running with a single task manager with 2 slots, parallelism 2, and no HA.
>
> Any insights or suggestions?
> The messages are KVs with a String key and an Avro message as the value.
>
> PCollection<KV<String, IpSessionInfoWithKey>> ipSessionInput = pipeline
>     .apply("readIpSessionInfo", KafkaTransform.readAvroMessageFromKafka(
>         pipelineUtil.getBootstrapServers(),
>         options.getSourceKafkaTopic(),
>         PIPELINE_NAME,
>         IpSessionInfo.class,
>         IpSessionInfoDeserializer.class))
>
>     .apply("ConvertIpSessionFromKafka",
>         ParDo.of(new ConvertFromKafkaRecord<>()))
>
>     .apply(Window.<KV<String, IpSessionInfo>>into(new GlobalWindows())
>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>         .withAllowedLateness(Duration.ZERO)
>         .discardingFiredPanes())
>
>     .apply("DebugLogBeforeGroupByKey", ParDo.of(
>         new DoFn<KV<String, IpSessionInfo>, KV<String, IpSessionInfo>>() {
>             @DoFn.ProcessElement
>             public void processElement(ProcessContext c) {
>                 KV<String, IpSessionInfo> element = c.element();
>                 log.atInfo().log("Before GroupByKey: " + element);
>                 c.output(element);
>             }
>         }))
>
>     .apply(GroupByKey.create())  // <-- the step where messages stop progressing
>
>     .apply("DebugLogAfterGroupByKey", ParDo.of(
>         new DoFn<KV<String, Iterable<IpSessionInfo>>, KV<String, Iterable<IpSessionInfo>>>() {
>             @ProcessElement
>             public void processElement(ProcessContext c) {
>                 KV<String, Iterable<IpSessionInfo>> groupedElement = c.element();
>                 log.atInfo().log("After GroupByKey: " + groupedElement);
>                 c.output(groupedElement);
>             }
>         }))
>     .apply(ParDo.of(new ConvertIpSession()));
>
>
> thanks
> Sigalit
>
