Hi Sigalit,
You should set a TimestampPolicyFactory [1] on the source. Otherwise,
resetting the timestamp in a plain ParDo (ConvertFromKafkaRecord) can
shift the element's timestamp back in time behind the watermark and
subsequently cause the data to be dropped by the GroupByKey transform.
If you don't set a timestamp policy explicitly, the default is processing
time, which is likely causing the effect you observe. An alternative is
to use state and timers in the ConvertFromKafkaRecord transform to make
sure that the transform correctly holds the output watermark using
Timer.withOutputTimestamp [2]. I'd go with the first option, though.
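A minimal sketch of the first option follows. It assumes a String/String topic instead of the Avro deserializer and KafkaTransform helper from the original pipeline, a hypothetical bootstrap address and topic name, a topic that stores CreateTime timestamps, and an arbitrary one-minute maxDelay. The class names are hypothetical. The conversion step calls output() rather than outputWithTimestamp(..., Instant.now()), so each element keeps the event timestamp the policy assigned:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;

public class EventTimeKafkaRead {

  /** Hypothetical stand-in for ConvertFromKafkaRecord that does not re-stamp elements. */
  static class KeepTimestampFn extends DoFn<KafkaRecord<String, String>, KV<String, String>> {
    @ProcessElement
    public void processElement(
        @Element KafkaRecord<String, String> record, OutputReceiver<KV<String, String>> out) {
      // output() inherits the timestamp assigned by the source's TimestampPolicy,
      // keeping element timestamps consistent with the watermark that policy reports.
      out.output(record.getKV());
    }
  }

  static PCollection<KV<String, String>> read(Pipeline p, String bootstrapServers, String topic) {
    return p.apply(
            "ReadFromKafka",
            KafkaIO.<String, String>read()
                .withBootstrapServers(bootstrapServers)
                .withTopic(topic)
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                // Event time = Kafka CreateTime; the watermark trails the largest
                // timestamp seen by maxDelay (one minute is an assumed value).
                .withTimestampPolicyFactory(
                    TimestampPolicyFactory.withCreateTime(Duration.standardMinutes(1))))
        .apply("ConvertFromKafkaRecord", ParDo.of(new KeepTimestampFn()));
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    // Hypothetical bootstrap address and topic name.
    read(p, "localhost:9092", "ip-session-info");
    p.run().waitUntilFinish();
  }
}
```

If the event time lives in the broker's log-append time or inside the payload, TimestampPolicyFactory.withLogAppendTime() or withTimestampFn(...) could be swapped in for withCreateTime.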
Best,
Jan
[1] https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withTimestampPolicyFactory-org.apache.beam.sdk.io.kafka.TimestampPolicyFactory-
[2] https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/state/Timer.html#withOutputTimestamp-org.joda.time.Instant-
On 11/17/23 08:22, Sigalit Eliazov wrote:
Yes, ConvertFromKafkaRecord outputs each element with an explicit timestamp:
KafkaRecord<String, T> record = c.element();
KV<String, T> entry = Objects.requireNonNull(record).getKV();
String key = convertKey(entry.getKey());
T value = convertValue(entry.getValue());
// Re-stamps every element with the current wall-clock time.
c.outputWithTimestamp(KV.of(key, value), Instant.now());
Thanks,
Sigalit
On Fri, Nov 17, 2023 at 4:36 AM Sachin Mittal <sjmit...@gmail.com> wrote:
Do you add a timestamp to every record you output in the
ConvertFromKafkaRecord step, or in any step before that?
On Fri, 17 Nov 2023 at 4:07 AM, Sigalit Eliazov
<e.siga...@gmail.com> wrote:
Hi,
In our pipeline, we've encountered an issue with the GroupByKey step.
After the pipeline has been running for some time, messages seem to
stop progressing through the GroupByKey step, causing the pipeline to
stall.
To troubleshoot this issue, we added debug logging before and after
the GroupByKey step.
We are using Beam 2.50 with Flink 1.16, running a single task manager
with 2 slots, parallelism 2, and no HA.
Any insights or suggestions?
The messages are KVs of a String key and an Avro message value.
PCollection<KV<String, IpSessionInfoWithKey>> ipSessionInput =
    pipeline
        .apply("readIpSessionInfo",
            KafkaTransform.readAvroMessageFromKafka(
                pipelineUtil.getBootstrapServers(),
                options.getSourceKafkaTopic(),
                PIPELINE_NAME,
                IpSessionInfo.class,
                IpSessionInfoDeserializer.class))
        // Converts each KafkaRecord into a KV and assigns it a new timestamp.
        .apply("ConvertIpSessionFromKafka", ParDo.of(new ConvertFromKafkaRecord<>()))
        // Global window that fires a pane for every incoming element.
        .apply(Window.<KV<String, IpSessionInfo>>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .withAllowedLateness(Duration.ZERO)
            .discardingFiredPanes())
        .apply("DebugLogBeforeGroupByKey", ParDo.of(
            new DoFn<KV<String, IpSessionInfo>, KV<String, IpSessionInfo>>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                KV<String, IpSessionInfo> element = c.element();
                log.atInfo().log("Before GroupByKey: " + element);
                c.output(element);
              }
            }))
        // The step where messages stop progressing.
        .apply(GroupByKey.create())
        .apply("DebugLogAfterGroupByKey", ParDo.of(
            new DoFn<KV<String, Iterable<IpSessionInfo>>, KV<String, Iterable<IpSessionInfo>>>() {
              @ProcessElement
              public void processElement(ProcessContext c) {
                KV<String, Iterable<IpSessionInfo>> groupedElement = c.element();
                log.atInfo().log("After GroupByKey: " + groupedElement);
                c.output(groupedElement);
              }
            }))
        .apply(ParDo.of(new ConvertIpSession()));
Thanks,
Sigalit
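To check whether timestamps are the culprit, the debug steps could also log each element's event timestamp, window and pane, which shows directly whether elements are being re-stamped. A minimal sketch, assuming SLF4J for logging (the original pipeline's `log` field is not shown); the class name LogWithTimestampFn is hypothetical:

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Hypothetical pass-through debug step that logs element metadata. */
public class LogWithTimestampFn<T> extends DoFn<T, T> {
  private static final Logger LOG = LoggerFactory.getLogger(LogWithTimestampFn.class);

  private final String label;

  public LogWithTimestampFn(String label) {
    this.label = label;
  }

  @ProcessElement
  public void processElement(
      @Element T element,
      @Timestamp Instant timestamp,
      BoundedWindow window,
      PaneInfo pane,
      OutputReceiver<T> out) {
    // Log the event timestamp alongside the element so re-stamped elements stand out.
    LOG.info("{}: element={} timestamp={} window={} pane={}", label, element, timestamp, window, pane);
    // Emit unchanged; output() keeps the input timestamp.
    out.output(element);
  }
}
```

It could replace either anonymous debug DoFn, e.g. `.apply("DebugLogBeforeGroupByKey", ParDo.of(new LogWithTimestampFn<KV<String, IpSessionInfo>>("Before GroupByKey")))`. The explicit type argument keeps the downstream GroupByKey type-checking.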