Yes, ConvertFromKafkaRecord outputs every record with a timestamp:

KafkaRecord<String, T> record = c.element();
KV<String, T> entry = Objects.requireNonNull(record).getKV();
String key = convertKey(entry.getKey());
T value = convertValue(entry.getValue());
c.outputWithTimestamp(KV.of(key, value), Instant.now());


Thanks,

Sigalit


On Fri, Nov 17, 2023 at 4:36 AM Sachin Mittal <sjmit...@gmail.com> wrote:

> Do you add a timestamp to every record you output in the
> ConvertFromKafkaRecord step, or in any step before that?
>
> On Fri, 17 Nov 2023 at 4:07 AM, Sigalit Eliazov <e.siga...@gmail.com>
> wrote:
>
>> Hi,
>>
>> In our pipeline, we've encountered an issue with the GroupByKey step.
>> After some time of running, it seems that messages are not progressing
>> through the GroupByKey step, causing the pipeline to stall in data
>> processing.
>> To troubleshoot this issue, we added debug logging before and after the
>> GroupByKey step.
>>
>> We are using Beam version 2.50 with Flink 1.16.
>>
>> We run with a single task manager with 2 slots, parallelism 2, and no HA.
>>
>> Any insights or suggestions?
>> The messages are KV pairs of a String key and an Avro message value.
>>
>> PCollection<KV<String, IpSessionInfoWithKey>> ipSessionInput = pipeline
>>     .apply("readIpSessionInfo", KafkaTransform.readAvroMessageFromKafka(
>>         pipelineUtil.getBootstrapServers(),
>>         options.getSourceKafkaTopic(),
>>         PIPELINE_NAME,
>>         IpSessionInfo.class,
>>         IpSessionInfoDeserializer.class))
>>
>>     .apply("ConvertIpSessionFromKafka", ParDo.of(new ConvertFromKafkaRecord<>()))
>>
>>     .apply(Window.<KV<String, IpSessionInfo>>into(new GlobalWindows())
>>         .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
>>         .withAllowedLateness(Duration.ZERO)
>>         .discardingFiredPanes())
>>
>>     .apply("DebugLogBeforeGroupByKey", ParDo.of(
>>         new DoFn<KV<String, IpSessionInfo>, KV<String, IpSessionInfo>>() {
>>             @DoFn.ProcessElement
>>             public void processElement(ProcessContext c) {
>>                 KV<String, IpSessionInfo> element = c.element();
>>                 log.atInfo().log("Before GroupByKey: " + element);
>>                 c.output(element);
>>             }
>>         }))
>>
>>     // Messages stop progressing at this step.
>>     .apply(GroupByKey.create())
>>
>>     .apply("DebugLogAfterGroupByKey", ParDo.of(
>>         new DoFn<KV<String, Iterable<IpSessionInfo>>, KV<String, Iterable<IpSessionInfo>>>() {
>>             @ProcessElement
>>             public void processElement(ProcessContext c) {
>>                 KV<String, Iterable<IpSessionInfo>> groupedElement = c.element();
>>                 log.atInfo().log("After GroupByKey: " + groupedElement);
>>                 c.output(groupedElement);
>>             }
>>         }))
>>     .apply(ParDo.of(new ConvertIpSession()));
>>
>>
>> thanks
>> Sigalit
>>
>
