Also, I forgot to mention that I am running this on Flink 1.8 using the FlinkRunner (beam-runners-flink-1.8).
On 2020/06/19 22:41:10, Roger wrote:
> Hello.
>
> I am having issues with an unbounded streaming processing pipeline using
> event times for processing. I can confirm that all the pieces of the
> pipeline are running, including the aggregation. The problem is that it is
> only running for one window. This makes me think I've done something wrong
> with the watermarking and/or timestamping. The value of the Kafka object is
> an object with a timestamp set as an epoch string in milliseconds, for
> example "1592600152163". I have tried everything I can think of to debug,
> including adding logging statements, but can only confirm that things
> seem to be set correctly. I can see that the CustomFieldTimePolicy values
> for currentWatermark are getting set. I can print out the aggregated values
> that ultimately get created as well. For some reason, though, the window
> does not advance and I cannot continuously process the pipeline data.
> Things just stop.
>
> Any suggestions or ideas on what I've done wrong would be very much
> appreciated.
> Thanks for looking!
> Roger
>
> *Approach: Use withTimestampPolicyFactory from org.apache.beam.sdk.io.kafka
> here <https://beam.apache.org/releases/javadoc/2.4.0/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#withTimestampPolicyFactory-org.apache.beam.sdk.io.kafka.TimestampPolicyFactory->
> along with WithTimestamps.of*
>
> pipeline
>     .apply("ReadFromKafka", KafkaIO.<Long, NewRelicRecord>read()
>         .withKeyDeserializer(LongDeserializer.class)
>         .withValueDeserializerAndCoder(
>             KafkaJsonDeserializer.class, SerializableCoder.of(NewRelicRecord.class))
>         .withBootstrapServers(options.getKafkaBrokers())
>         .withTopic(options.getKafkaTopic())
>         .withMaxReadTime(Duration.standardSeconds(
>             Integer.parseInt(options.getKafkaReadTimeout())))
>         .withConsumerConfigUpdates(ImmutableMap.of(
>             "group.id", options.getKafkaGroupId()))
>         .withTimestampPolicyFactory((tp, previousWaterMark) ->
>             new CustomFieldTimePolicy(previousWaterMark))
>         .withoutMetadata()
>     )
>     .apply("ExtractPayload", Values.<NewRelicRecord>create())
>     .apply("Append event time for PCollection records",
>         WithTimestamps.of((NewRelicRecord record) -> {
>
>             long millisecondsFromEpoch = Long.parseLong(record.getTimestamp());
>             DateTime jodaDateTime = new DateTime(millisecondsFromEpoch);
>             Instant instant = jodaDateTime.toInstant();
>
>             return instant;
>         }));
>
> Where CustomFieldTimePolicy looks like this:
>
> public class CustomFieldTimePolicy extends TimestampPolicy<Long, NewRelicRecord> {
>
>     /** current watermark holder. */
>     private Instant currentWatermark;
>
>     /** watermark logic.
>      * @param previousWatermark the watermark used to determine current watermark
>      */
>     public CustomFieldTimePolicy(final Optional<Instant> previousWatermark) {
>         currentWatermark = previousWatermark.orElse(BoundedWindow.TIMESTAMP_MIN_VALUE);
>     }
>
>     @Override
>     public final Instant getTimestampForRecord(
>             final PartitionContext ctx, final KafkaRecord<Long, NewRelicRecord> record) {
>
>         long millisecondsFromEpoch = Long.parseLong(record.getKV().getValue().getTimestamp());
>         DateTime jodaDateTime = new DateTime(millisecondsFromEpoch);
>
>         Instant instant = jodaDateTime.toInstant();
>
>         if (instant.isAfter(currentWatermark)) {
>             currentWatermark = instant;
>         }
>
>         return instant;
>     }
>
>     @Override
>     public final Instant getWatermark(final PartitionContext ctx) {
>         return currentWatermark;
>     }
> }
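
To reason about the policy quoted above outside of Beam, here is a minimal plain-JDK model of the logic in CustomFieldTimePolicy. It is a sketch under stated assumptions, not Beam code: the class name WatermarkModel is hypothetical, it uses java.time.Instant instead of Joda's Instant, and Instant.MIN stands in for BoundedWindow.TIMESTAMP_MIN_VALUE. It also shows that the round trip through DateTime is unnecessary, since an epoch-millisecond value converts to an Instant directly (in Joda, `new Instant(millis)` does the same).

```java
import java.time.Instant;
import java.util.Optional;

// Plain-JDK model of CustomFieldTimePolicy's logic (hypothetical class; not Beam's TimestampPolicy).
class WatermarkModel {
    // Assumption: Instant.MIN stands in for Beam's BoundedWindow.TIMESTAMP_MIN_VALUE.
    static final Instant MIN_WATERMARK = Instant.MIN;

    private Instant currentWatermark;

    WatermarkModel(Optional<Instant> previousWatermark) {
        // Resume from the previous watermark if one is given, otherwise start at the minimum.
        currentWatermark = previousWatermark.orElse(MIN_WATERMARK);
    }

    // Parses an epoch-millisecond string such as "1592600152163" straight into an Instant
    // and advances the watermark to the largest timestamp seen so far.
    Instant timestampForRecord(String epochMillis) {
        Instant instant = Instant.ofEpochMilli(Long.parseLong(epochMillis));
        if (instant.isAfter(currentWatermark)) {
            currentWatermark = instant;
        }
        return instant;
    }

    Instant watermark() {
        return currentWatermark;
    }

    public static void main(String[] args) {
        WatermarkModel policy = new WatermarkModel(Optional.empty());
        policy.timestampForRecord("1592600152163");
        policy.timestampForRecord("1592600100000"); // an older record does not move the watermark back
        System.out.println(policy.watermark());     // prints 2020-06-19T20:55:52.163Z
    }
}
```

In this model, as in the quoted policy, the watermark only moves when a record with a later timestamp is read; if no newer records arrive, it stays where it is.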

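Since the symptom is that only one window is ever emitted, it may help to model when a window fires. With Beam's default trigger, a window emits its result once the watermark passes the window's last timestamp (its end minus 1 ms). The sketch below is a plain-JDK model, not Beam's API: FixedWindowModel is a hypothetical name, zero-offset fixed windows are assumed, and the one-minute size is an arbitrary choice because the pipeline's actual windowing is not shown. It illustrates that, under a policy like the one above, each later window can only fire after records with later timestamps have pushed the watermark past that window's end.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical plain-JDK model of fixed windows and a watermark-based (default) trigger.
class FixedWindowModel {
    private final Duration size;

    FixedWindowModel(Duration size) {
        this.size = size;
    }

    // Start of the fixed window containing ts (zero offset assumed).
    Instant windowStart(Instant ts) {
        long sizeMs = size.toMillis();
        long ms = ts.toEpochMilli();
        return Instant.ofEpochMilli(ms - Math.floorMod(ms, sizeMs));
    }

    // Exclusive end of the window containing ts.
    Instant windowEnd(Instant ts) {
        return windowStart(ts).plus(size);
    }

    // The window fires once the watermark is past its last timestamp (end - 1 ms).
    boolean wouldFire(Instant elementTs, Instant watermark) {
        Instant maxTimestamp = windowEnd(elementTs).minusMillis(1);
        return watermark.isAfter(maxTimestamp);
    }

    public static void main(String[] args) {
        FixedWindowModel w = new FixedWindowModel(Duration.ofMinutes(1));
        Instant first = Instant.ofEpochMilli(1592600152163L); // 2020-06-19T20:55:52.163Z
        Instant later = first.plusSeconds(60);                // 2020-06-19T20:56:52.163Z
        Instant watermark = later; // watermark has advanced to the latest record seen
        System.out.println(w.wouldFire(first, watermark)); // true: [20:55, 20:56) is complete
        System.out.println(w.wouldFire(later, watermark)); // false: [20:56, 20:57) is still open
    }
}
```

The second window in this model stays open until some record timestamped at or after 20:57:00 arrives and advances the watermark.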