Hi,
Is there a way to get a dead-letter output from a pipeline that uses a KafkaIO
connector for its input? Since `TimestampPolicyFactory.withTimestampFn()` takes
only a SerializableFunction and not a ParDo, how would I be able to produce a
dead-letter output from it?
I have the following pipeline defined that reads from a KafkaIO input:
    pipeline.apply(
        KafkaIO.<String, String>read()
            .withBootstrapServers(bootstrap)
            .withTopics(topics)
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(ConfigurableDeserializer.class)
            .updateConsumerProperties(
                ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
            .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
            .updateConsumerProperties(ImmutableMap.of("group.id", "beam-consumers"))
            .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
            .withTimestampPolicyFactory(
                TimestampPolicyFactory.withTimestampFn(
                    new MessageTimestampExtractor(inputMessagesConfig)))
            .withReadCommitted()
            .commitOffsetsInFinalize())
and I would like to get dead-letter outputs when my timestamp extraction fails.
Best,
Tobi