Hello. I am having issues with an unbounded streaming pipeline that processes data by event time. I can confirm that all the pieces of the pipeline are running, including the aggregation. The problem is that it only runs for one window, which makes me think I've done something wrong with the watermarking and/or timestamping. The value of each Kafka record is an object whose timestamp field holds an epoch time in milliseconds, stored as a string, for example "1592600152163". I have tried everything I can think of to debug this, including adding logging statements, but I can only confirm that things seem to be set correctly. I can see that the currentWatermark values in CustomFieldTimePolicy are getting set, and I can print out the aggregated values that ultimately get created. For some reason, though, the window does not advance and the pipeline does not keep processing data. Things just stop.
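Because the timestamp arrives as an epoch-millisecond string, the conversion can be checked in isolation. The following is a minimal, dependency-free sketch. It uses java.time.Instant only so the demo runs without Beam. Beam itself uses org.joda.time.Instant, whose Instant(long) constructor also takes epoch milliseconds directly, so the intermediate DateTime step is not strictly needed. The class and method names are hypothetical.

```java
import java.time.Instant;

/** Hypothetical helper: converts an epoch-millisecond string to an Instant. */
public class EpochMillisParsing {

    /** Parses a decimal epoch-millisecond string; non-numeric input throws NumberFormatException. */
    static Instant parseEpochMillis(String epochMillis) {
        return Instant.ofEpochMilli(Long.parseLong(epochMillis.trim()));
    }

    public static void main(String[] args) {
        Instant t = parseEpochMillis("1592600152163");
        System.out.println(t);                // 2020-06-19T20:55:52.163Z
        System.out.println(t.toEpochMilli()); // 1592600152163
    }
}
```

If the printed instant matches the expected wall-clock time of the record, the parsing step is not the cause of the stalled windows.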
Any suggestions or ideas on what I've done wrong would be very much appreciated. Thanks for looking!

Roger

Approach: use withTimestampPolicyFactory from org.apache.beam.sdk.io.kafka (documented here <https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withTimestampPolicyFactory-org.apache.beam.sdk.io.kafka.TimestampPolicyFactory->) along with WithTimestamps.of:

```java
pipeline
    .apply("ReadFromKafka", KafkaIO.<Long, NewRelicRecord>read()
        .withKeyDeserializer(LongDeserializer.class)
        .withValueDeserializerAndCoder(
            KafkaJsonDeserializer.class,
            SerializableCoder.of(NewRelicRecord.class))
        .withBootstrapServers(options.getKafkaBrokers())
        .withTopic(options.getKafkaTopic())
        .withMaxReadTime(Duration.standardSeconds(
            Integer.parseInt(options.getKafkaReadTimeout())))
        .withConsumerConfigUpdates(ImmutableMap.of(
            "group.id", options.getKafkaGroupId()))
        .withTimestampPolicyFactory((tp, previousWaterMark) ->
            new CustomFieldTimePolicy(previousWaterMark))
        .withoutMetadata())
    .apply("ExtractPayload", Values.<NewRelicRecord>create())
    .apply("Append event time for PCollection records",
        WithTimestamps.of((NewRelicRecord record) -> {
            long millisecondsFromEpoch = Long.parseLong(record.getTimestamp());
            DateTime jodaDateTime = new DateTime(millisecondsFromEpoch);
            Instant instant = jodaDateTime.toInstant();
            return instant;
        }));
```

Where CustomFieldTimePolicy looks like this:

```java
import java.util.Optional;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.TimestampPolicy;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.DateTime;
import org.joda.time.Instant;

public class CustomFieldTimePolicy extends TimestampPolicy<Long, NewRelicRecord> {
    /** Current watermark holder. */
    private Instant currentWatermark;

    /**
     * Watermark logic.
     *
     * @param previousWatermark the watermark used to determine the current watermark
     */
    public CustomFieldTimePolicy(final Optional<Instant> previousWatermark) {
        currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
    }

    @Override
    public final Instant getTimestampForRecord(
            final PartitionContext ctx, final KafkaRecord<Long, NewRelicRecord> record) {
        long millisecondsFromEpoch =
            Long.parseLong(record.getKV().getValue().getTimestamp());
        DateTime jodaDateTime = new DateTime(millisecondsFromEpoch);
        Instant instant = jodaDateTime.toInstant();
        if (instant.isAfter(currentWatermark)) {
            currentWatermark = instant;
        }
        return instant;
    }

    @Override
    public final Instant getWatermark(final PartitionContext ctx) {
        return currentWatermark;
    }
}
```
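The policy above only moves currentWatermark forward when a record arrives. The sketch below is a simplified, dependency-free model of that rule, not Beam code. It rests on two assumptions: that KafkaIO's reader reports the minimum of its per-partition watermarks, and that a partition with no new records never calls getTimestampForRecord. The optional idle rule is loosely modeled on KafkaIO's CustomTimestampPolicyWithLimitedDelay, which, as I understand it, advances a caught-up partition's watermark to roughly "now minus a max delay". PartitionPolicy, advanceWhenIdle, maxDelayMillis, simulate and the scenario values are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Dependency-free model of per-partition watermarks (epoch millis). Names are hypothetical. */
public class WatermarkModel {

    /** Stand-in for BoundedWindow.TIMESTAMP_MIN_VALUE. */
    static final long MIN_WATERMARK = Long.MIN_VALUE;

    /** Mirrors CustomFieldTimePolicy, plus an optional (hypothetical) idle rule. */
    static class PartitionPolicy {
        private long currentWatermark = MIN_WATERMARK;
        private final boolean advanceWhenIdle; // hypothetical switch, not in the original policy
        private final long maxDelayMillis;     // hypothetical delay used only by the idle rule

        PartitionPolicy(boolean advanceWhenIdle, long maxDelayMillis) {
            this.advanceWhenIdle = advanceWhenIdle;
            this.maxDelayMillis = maxDelayMillis;
        }

        /** Like getTimestampForRecord: the watermark follows the max event time seen so far. */
        long onRecord(long eventTimeMillis) {
            if (eventTimeMillis > currentWatermark) {
                currentWatermark = eventTimeMillis;
            }
            return eventTimeMillis;
        }

        /** Like getWatermark; with advanceWhenIdle, a caught-up partition moves up to now - maxDelay. */
        long getWatermark(long backlog, long nowMillis) {
            if (advanceWhenIdle && backlog == 0) {
                long idleWatermark = nowMillis - maxDelayMillis;
                if (idleWatermark > currentWatermark) {
                    currentWatermark = idleWatermark;
                }
            }
            return currentWatermark;
        }
    }

    /** Reader watermark modeled as the minimum over all partitions (an assumption). */
    static long readerWatermark(List<PartitionPolicy> partitions, long[] backlogs, long nowMillis) {
        long min = Long.MAX_VALUE;
        for (int i = 0; i < partitions.size(); i++) {
            min = Math.min(min, partitions.get(i).getWatermark(backlogs[i], nowMillis));
        }
        return min;
    }

    /** Two partitions; only partition 0 ever receives records. */
    static long simulate(boolean advanceWhenIdle) {
        List<PartitionPolicy> partitions = new ArrayList<>();
        partitions.add(new PartitionPolicy(advanceWhenIdle, 1_000L));
        partitions.add(new PartitionPolicy(advanceWhenIdle, 1_000L));
        partitions.get(0).onRecord(1_000L);
        partitions.get(0).onRecord(2_000L);
        long[] backlogs = {0L, 0L}; // both partitions are caught up
        return readerWatermark(partitions, backlogs, 10_000L);
    }

    public static void main(String[] args) {
        System.out.println(simulate(false)); // -9223372036854775808 (pinned at the minimum)
        System.out.println(simulate(true));  // 9000
    }
}
```

In this model, with the original rule, the idle partition pins the combined watermark at the minimum value. Even on a single busy partition, the watermark stops at the last event time once records stop arriving. Either situation would keep event-time windows from closing. Whether this matches the real pipeline depends on the topic's partition count and traffic.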
