Given that KafkaIO uses the UnboundedSource framework, this is probably not something that can easily be supported. We might be able to support similar features once we have KafkaIO on top of Splittable DoFn, though. So feel free to create a feature request JIRA for this.

Thanks,
Cham

On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <[email protected]> wrote:

> This is a great question. I've added the dev list to be sure it gets
> noticed by whoever may know best.
>
> Kenn
>
> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <[email protected]> wrote:
>
>> Hi,
>>
>> Is there a way to get a Deadletter Output from a pipeline that uses a
>> KafkaIO connector for its input? As
>> `TimestampPolicyFactory.withTimestampFn()` takes only a
>> SerializableFunction and not a ParDo, how would I be able to produce a
>> Deadletter output from it?
>>
>> I have the following pipeline defined that reads from a KafkaIO input:
>>
>> pipeline.apply(
>>     KafkaIO.<String, String>read()
>>         .withBootstrapServers(bootstrap)
>>         .withTopics(topics)
>>         .withKeyDeserializer(StringDeserializer.class)
>>         .withValueDeserializer(ConfigurableDeserializer.class)
>>         .updateConsumerProperties(
>>             ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>         .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>         .updateConsumerProperties(ImmutableMap.of("group.id", "beam-consumers"))
>>         .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>         .withTimestampPolicyFactory(
>>             TimestampPolicyFactory.withTimestampFn(
>>                 new MessageTimestampExtractor(inputMessagesConfig)))
>>         .withReadCommitted()
>>         .commitOffsetsInFinalize())
>>
>> and I would like to get deadletter outputs when my timestamp extraction
>> fails.
>>
>> Best,
>> Tobi
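
Since the timestamp function itself cannot emit a second output, one possible workaround is to have it return a fallback instead of throwing, and to do the dead-letter split in a step after the read. The sketch below only illustrates that routing logic in plain Java, without any Beam dependency, so it can be run on its own. Everything in it is hypothetical: the class `DeadLetterSketch`, the methods `extractTimestamp` and `route`, and the assumed `epochMillis|payload` message format are not part of KafkaIO or of the pipeline quoted above. In a real pipeline the split would presumably live in a ParDo with an additional output, and the choice of fallback timestamp would need care because it can affect the watermark.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;

public class DeadLetterSketch {

    /** Holds the two outputs of the split: accepted records and rejects. */
    static final class Routed {
        final List<String> main = new ArrayList<>();
        final List<String> deadLetter = new ArrayList<>();
    }

    /**
     * Hypothetical extractor: assumes messages look like "epochMillis|payload".
     * Returns an empty value instead of throwing when no timestamp can be parsed.
     */
    static OptionalLong extractTimestamp(String message) {
        int sep = message.indexOf('|');
        if (sep <= 0) {
            // No separator, or nothing before it: treat as unparsable.
            return OptionalLong.empty();
        }
        try {
            return OptionalLong.of(Long.parseLong(message.substring(0, sep)));
        } catch (NumberFormatException e) {
            return OptionalLong.empty();
        }
    }

    /** Sends each message to main or deadLetter depending on whether extraction succeeded. */
    static Routed route(List<String> messages) {
        Routed out = new Routed();
        for (String m : messages) {
            if (extractTimestamp(m).isPresent()) {
                out.main.add(m);
            } else {
                out.deadLetter.add(m);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Routed r = route(Arrays.asList(
                "1540285500000|ok",
                "not-a-timestamp|bad",
                "no-separator"));
        System.out.println("main=" + r.main);
        System.out.println("deadLetter=" + r.deadLetter);
    }
}
```

The point of the design is that the failure is detected in a place where two outputs are available, rather than inside the SerializableFunction, which can only return a single value.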
