Hi Sigalit,

You should set a TimestampPolicyFactory [1] on the source. Otherwise, resetting the timestamp in a plain ParDo (ConvertFromKafkaRecord) can shift an element's timestamp back in time, behind the watermark, and the GroupByKey transform can then drop the data. If you don't set a timestamp policy explicitly, the default is processing time, which is likely causing the effect you observe.

An alternative is to use state and timers in the ConvertFromKafkaRecord transform, so that the transform correctly holds the output watermark using Timer.withOutputTimestamp [2]. I'd go with option 1, though.
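A minimal sketch of option 1, under some assumptions: the broker address, the topic name, the String value type (standing in for the Avro message), and the 30-second maximum delay are placeholders, and the choice of the create-time policy is only one possibility that presumes the records carry producer-side timestamps. The point is that the timestamp is assigned once at the source and the conversion step no longer calls outputWithTimestamp with Instant.now().

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;

public class TimestampPolicySketch {

  /** Converts records without touching the timestamp assigned by the source. */
  static class ConvertFromKafkaRecord
      extends DoFn<KafkaRecord<String, String>, KV<String, String>> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      KV<String, String> entry = c.element().getKV();
      // c.output keeps the input element's timestamp; no Instant.now() here.
      c.output(KV.of(entry.getKey(), entry.getValue()));
    }
  }

  static PCollection<KV<String, String>> read(Pipeline pipeline) {
    return pipeline
        .apply("readFromKafka",
            KafkaIO.<String, String>read()
                .withBootstrapServers("localhost:9092") // placeholder address
                .withTopic("ip-session-info")           // placeholder topic name
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                // Assumption: event time is Kafka's create time, and records
                // are expected to arrive at most 30 seconds out of order.
                .withTimestampPolicyFactory(
                    TimestampPolicyFactory.withCreateTime(Duration.standardSeconds(30))))
        .apply("ConvertFromKafka", ParDo.of(new ConvertFromKafkaRecord()));
  }
}
```

If the topic is configured for log append time instead, TimestampPolicyFactory.withLogAppendTime() would be the corresponding choice.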

Best,

 Jan

[1] https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withTimestampPolicyFactory-org.apache.beam.sdk.io.kafka.TimestampPolicyFactory-
[2] https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/state/Timer.html#withOutputTimestamp-org.joda.time.Instant-

On 11/17/23 08:22, Sigalit Eliazov wrote:
Yes, ConvertFromKafkaRecord outputs every element with a timestamp:
KafkaRecord<String, T> record = c.element();
KV<String, T> entry = Objects.requireNonNull(record).getKV();
String key = convertKey(entry.getKey());
T value = convertValue(entry.getValue());
c.outputWithTimestamp(KV.of(key, value), Instant.now());
thanks
Sigalit

On Fri, Nov 17, 2023 at 4:36 AM Sachin Mittal <sjmit...@gmail.com> wrote:

    Do you add a timestamp to every record you output in the
    ConvertFromKafkaRecord step, or in any step before that?

    On Fri, 17 Nov 2023 at 4:07 AM, Sigalit Eliazov
    <e.siga...@gmail.com> wrote:

        Hi,

        In our pipeline, we've encountered an issue with the
        |GroupByKey| step. After some time of running, it seems that
        messages are not progressing through the |GroupByKey| step,
        causing the pipeline to stall in data processing.

        To troubleshoot this issue, we added debug logging before and
        after the |GroupByKey| step.

        We are using Beam version 2.50 with Flink 1.16.

        We are running with only 1 task manager, 2 slots, parallelism 2,
        and no HA.

        Any insights or suggestions?

        The messages are KVs of a String key and an Avro message value.

        PCollection<KV<String, IpSessionInfoWithKey>> ipSessionInput = pipeline
            .apply("readIpSessionInfo", KafkaTransform.readAvroMessageFromKafka(
                pipelineUtil.getBootstrapServers(),
                options.getSourceKafkaTopic(),
                PIPELINE_NAME,
                IpSessionInfo.class,
                IpSessionInfoDeserializer.class))
            .apply("ConvertIpSessionFromKafka", ParDo.of(new ConvertFromKafkaRecord<>()))
            .apply(Window.<KV<String, IpSessionInfo>>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes())
            .apply("DebugLogBeforeGroupByKey", ParDo.of(
                new DoFn<KV<String, IpSessionInfo>, KV<String, IpSessionInfo>>() {
                    @DoFn.ProcessElement
                    public void processElement(ProcessContext c) {
                        KV<String, IpSessionInfo> element = c.element();
                        log.atInfo().log("Before GroupByKey: " + element);
                        c.output(element);
                    }
                }))
            .apply(GroupByKey.create())
            .apply("DebugLogAfterGroupByKey", ParDo.of(
                new DoFn<KV<String, Iterable<IpSessionInfo>>, KV<String, Iterable<IpSessionInfo>>>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        KV<String, Iterable<IpSessionInfo>> groupedElement = c.element();
                        log.atInfo().log("After GroupByKey: " + groupedElement);
                        c.output(groupedElement);
                    }
                }))
            .apply(ParDo.of(new ConvertIpSession()));


        thanks
        Sigalit
