Yes, the output of ConvertFromKafkaRecord has a timestamp:

KafkaRecord<String, T> record = c.element();
KV<String, T> entry = Objects.requireNonNull(record).getKV();
String key = convertKey(entry.getKey());
T value = convertValue(entry.getValue());
c.outputWithTimestamp(KV.of(key, value), Instant.now());
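For context on the timestamp question: when a DoFn reassigns timestamps with outputWithTimestamp, Beam rejects (with an IllegalArgumentException) any output timestamp earlier than the input element's timestamp minus the DoFn's allowed skew, which DoFn#getAllowedTimestampSkew() sets to Duration.ZERO by default. The sketch below is a plain-Java model of that lower-bound check only. It uses java.time instead of Beam's Joda types, the class and method names are hypothetical, and it is not Beam's implementation.

```java
import java.time.Duration;
import java.time.Instant;

/**
 * Plain-Java model (hypothetical, not Beam code) of the lower-bound rule for
 * outputWithTimestamp: the new timestamp may not be earlier than the input
 * element's timestamp minus the allowed skew (Duration.ZERO by default).
 */
final class TimestampSkewCheck {
  private TimestampSkewCheck() {}

  /** Returns true if outputTs would pass the lower-bound check for an input stamped inputTs. */
  static boolean isAllowed(Instant inputTs, Instant outputTs, Duration allowedSkew) {
    // Moving a timestamp forward is always accepted; moving it backward is
    // tolerated only up to allowedSkew.
    return !outputTs.isBefore(inputTs.minus(allowedSkew));
  }

  public static void main(String[] args) {
    Instant kafkaTs = Instant.parse("2023-11-17T04:00:00Z");
    // Typical case: the worker's wall clock (Instant.now()) is later than the record timestamp.
    System.out.println(isAllowed(kafkaTs, kafkaTs.plusSeconds(5), Duration.ZERO)); // true
    // Clock skew: the record timestamp is ahead of the worker's clock.
    System.out.println(isAllowed(kafkaTs, kafkaTs.minusSeconds(5), Duration.ZERO)); // false
    // The same backward shift passes once a skew allowance covers it.
    System.out.println(isAllowed(kafkaTs, kafkaTs.minusSeconds(5), Duration.ofSeconds(10))); // true
  }
}
```

With Instant.now(), the shift is normally forward and therefore accepted; it would only be rejected if a Kafka record timestamp were ahead of the worker's clock.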
thanks
Sigalit

On Fri, Nov 17, 2023 at 4:36 AM Sachin Mittal <sjmit...@gmail.com> wrote:

> Do you add a timestamp to every record you output in the
> ConvertFromKafkaRecord step or in any step before that?
>
> On Fri, 17 Nov 2023 at 4:07 AM, Sigalit Eliazov <e.siga...@gmail.com> wrote:
>
>> Hi,
>>
>> In our pipeline, we've encountered an issue with the GroupByKey step.
>> After running for some time, messages stop progressing through the
>> GroupByKey step, and the pipeline stalls.
>> To troubleshoot this issue, we added debug logging before and after the
>> GroupByKey step.
>>
>> We are using Beam 2.50 with Flink 1.16, running with only 1 task manager,
>> 2 slots, parallelism 2, and no HA.
>>
>> Any insights or suggestions?
>> The messages are KVs of a String and an Avro message.
>>
>> PCollection<KV<String, IpSessionInfoWithKey>> ipSessionInput = pipeline
>>     .apply("readIpSessionInfo", KafkaTransform.readAvroMessageFromKafka(
>>         pipelineUtil.getBootstrapServers(),
>>         options.getSourceKafkaTopic(),
>>         PIPELINE_NAME,
>>         IpSessionInfo.class,
>>         IpSessionInfoDeserializer.class))
>>
>>     .apply("ConvertIpSessionFromKafka", ParDo.of(new ConvertFromKafkaRecord<>()))
>>
>>     .apply(Window.<KV<String, IpSessionInfo>>into(new GlobalWindows())
>>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>         .withAllowedLateness(Duration.ZERO)
>>         .discardingFiredPanes())
>>
>>     .apply("DebugLogBeforeGroupByKey", ParDo.of(new DoFn<KV<String, IpSessionInfo>, KV<String, IpSessionInfo>>() {
>>         @DoFn.ProcessElement
>>         public void processElement(ProcessContext c) {
>>             KV<String, IpSessionInfo> element = c.element();
>>             log.atInfo().log("Before GroupByKey: " + element);
>>             c.output(element);
>>         }
>>     }))
>>
>>     .apply(GroupByKey.create())
>>     .apply("DebugLogAfterGroupByKey", ParDo.of(new DoFn<KV<String, Iterable<IpSessionInfo>>, KV<String, Iterable<IpSessionInfo>>>() {
>>         @ProcessElement
>>         public void processElement(ProcessContext c) {
>>             KV<String, Iterable<IpSessionInfo>> groupedElement = c.element();
>>             log.atInfo().log("After GroupByKey: " + groupedElement);
>>             c.output(groupedElement);
>>         }
>>     }))
>>     .apply(ParDo.of(new ConvertIpSession()));
>>
>> thanks
>> Sigalit
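The windowing in the quoted pipeline fires on element count rather than on the watermark. In the GlobalWindow, Repeatedly.forever(AfterPane.elementCountAtLeast(1)) asks GroupByKey to emit a pane for a key as soon as at least one element for that key is buffered, and discardingFiredPanes() drops those elements from state after each firing. The toy model below illustrates the output shape this configuration should produce in the "After GroupByKey" log. It is plain Java, not Beam or Flink code; the class name is hypothetical, and it simplifies the runner by evaluating the trigger once per incoming bundle.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Toy, single-threaded model (hypothetical, not Beam or Flink code) of a
 * GroupByKey in the GlobalWindow with
 * Repeatedly.forever(AfterPane.elementCountAtLeast(1)) and discardingFiredPanes().
 * Simplification: the trigger is checked once after each incoming bundle.
 */
final class CountTriggerModel {
  // Per-key buffered values, in first-arrival order of the keys.
  private final Map<String, List<String>> buffered = new LinkedHashMap<>();

  /** Feeds one bundle of (key, value) pairs and returns the panes that fire. */
  List<Map.Entry<String, List<String>>> processBundle(List<Map.Entry<String, String>> bundle) {
    for (Map.Entry<String, String> kv : bundle) {
      buffered.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
    }
    List<Map.Entry<String, List<String>>> fired = new ArrayList<>();
    for (Map.Entry<String, List<String>> e : buffered.entrySet()) {
      // elementCountAtLeast(1): any key holding at least one element fires a pane.
      if (!e.getValue().isEmpty()) {
        fired.add(Map.entry(e.getKey(), List.copyOf(e.getValue())));
      }
    }
    // discardingFiredPanes(): fired elements are dropped from state, so the
    // next pane for the same key contains only newly arrived elements.
    buffered.clear();
    return fired;
  }

  public static void main(String[] args) {
    CountTriggerModel gbk = new CountTriggerModel();
    // prints [a=[s1, s3], b=[s2]]
    System.out.println(gbk.processBundle(List.of(
        Map.entry("a", "s1"), Map.entry("b", "s2"), Map.entry("a", "s3"))));
    // prints [a=[s4]]
    System.out.println(gbk.processBundle(List.of(Map.entry("a", "s4"))));
  }
}
```

Under this model, each "Before GroupByKey" line should soon be followed by an "After GroupByKey" line for the same key, usually carrying a small Iterable. So, at least in Beam's model, a persistent gap between the two logs is not explained by the trigger waiting for the watermark.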