Hi,

Is there a way to get a dead-letter output from a pipeline that uses a
KafkaIO connector for its input? Since
`TimestampPolicyFactory.withTimestampFn()` takes only a
SerializableFunction and not a ParDo, how can I produce a dead-letter
output from it?

I have the following pipeline defined that reads from a KafkaIO input:

pipeline.apply(
  KafkaIO.<String, String>read()
    .withBootstrapServers(bootstrap)
    .withTopics(topics)
    .withKeyDeserializer(StringDeserializer.class)
    .withValueDeserializer(ConfigurableDeserializer.class)
    .updateConsumerProperties(
        ImmutableMap.of(
            InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
    .updateConsumerProperties(
        ImmutableMap.of("auto.offset.reset", "earliest"))
    .updateConsumerProperties(ImmutableMap.of("group.id", "beam-consumers"))
    .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
    .withTimestampPolicyFactory(
        TimestampPolicyFactory.withTimestampFn(
            new MessageTimestampExtractor(inputMessagesConfig)))
    .withReadCommitted()
    .commitOffsetsInFinalize())


and I would like to get dead-letter outputs when my timestamp extraction
fails.
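
To make the intent concrete, here is a minimal plain-Java sketch (no Beam
involved) of the behaviour I am after: records whose timestamp can be
extracted go to the main output, and the raw records that fail go to a
separate dead-letter output. The names DeadLetterSketch, Result, route and
extractTimestamp are hypothetical and only for illustration;
extractTimestamp stands in for my real extractor and is assumed to throw on
malformed input.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.List;

public class DeadLetterSketch {

    /** Holds the two outputs: extracted timestamps and rejected raw records. */
    static final class Result {
        final List<Instant> good = new ArrayList<>();
        final List<String> deadLetters = new ArrayList<>();
    }

    /**
     * Hypothetical stand-in for the real extraction logic.
     * Assumption: the message itself is an ISO-8601 instant, and malformed
     * input raises DateTimeParseException.
     */
    static Instant extractTimestamp(String message) {
        return Instant.parse(message);
    }

    /** Routes each message to the main output or to the dead-letter output. */
    static Result route(List<String> messages) {
        Result result = new Result();
        for (String message : messages) {
            try {
                result.good.add(extractTimestamp(message));
            } catch (DateTimeParseException e) {
                // Extraction failed: keep the raw record instead of failing the read.
                result.deadLetters.add(message);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Result r = route(List.of("2020-01-01T00:00:00Z", "not-a-timestamp"));
        System.out.println("good=" + r.good + " deadLetters=" + r.deadLetters);
    }
}
```

In the pipeline above, the equivalent of the catch branch is what I cannot
express, because the timestamp function can only return a value.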

Best,
Tobi
