A user can read serialized bytes from KafkaIO and deserialize them explicitly in a ParDo, which gives complete control over how to handle record errors. This is what I would do if I needed to in my pipeline.
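Here is a minimal sketch of that approach with the Beam Java SDK. KafkaIO reads keys and values as raw `byte[]` using Kafka's `ByteArrayDeserializer`, and a multi-output ParDo does the actual parsing, sending records that fail to a second (dead-letter) output instead of failing the bundle. The names `KafkaDeadLetterSketch`, `DeserializeFn`, `PARSED`, `DEAD_LETTER`, `readWithDeadLetter()` and `parse()` are hypothetical. `parse()` is only a placeholder for whatever `ConfigurableDeserializer` really does.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class KafkaDeadLetterSketch {

  // Main output: successfully deserialized values.
  static final TupleTag<String> PARSED = new TupleTag<String>() {};
  // Additional output: raw key/value bytes of records that failed to deserialize.
  static final TupleTag<KV<byte[], byte[]>> DEAD_LETTER =
      new TupleTag<KV<byte[], byte[]>>() {};

  // Hypothetical placeholder for the real value deserialization logic;
  // here it simply rejects missing or empty payloads.
  static String parse(byte[] value) {
    if (value == null || value.length == 0) {
      throw new IllegalArgumentException("empty or missing payload");
    }
    return new String(value, StandardCharsets.UTF_8);
  }

  // Parses each record and routes failures to DEAD_LETTER.
  static class DeserializeFn extends DoFn<KV<byte[], byte[]>, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      KV<byte[], byte[]> record = c.element();
      try {
        c.output(parse(record.getValue()));
      } catch (RuntimeException e) {
        // Also covers Kafka's SerializationException if a Kafka Deserializer is called here.
        c.output(DEAD_LETTER, record);
      }
    }
  }

  // KafkaIO only passes bytes through; all parsing happens in the ParDo.
  static PCollectionTuple readWithDeadLetter(
      Pipeline pipeline, String bootstrap, List<String> topics) {
    PCollectionTuple results =
        pipeline
            .apply(
                "ReadRawBytes",
                KafkaIO.<byte[], byte[]>read()
                    .withBootstrapServers(bootstrap)
                    .withTopics(topics)
                    .withKeyDeserializer(ByteArrayDeserializer.class)
                    .withValueDeserializer(ByteArrayDeserializer.class)
                    .withoutMetadata())
            .apply(
                "DeserializeOrDeadLetter",
                ParDo.of(new DeserializeFn())
                    .withOutputTags(PARSED, TupleTagList.of(DEAD_LETTER)));
    // Set the dead-letter coder explicitly instead of relying on inference.
    results.get(DEAD_LETTER).setCoder(KvCoder.of(ByteArrayCoder.of(), ByteArrayCoder.of()));
    return results;
  }
}
```

`results.get(PARSED)` then feeds the rest of the pipeline, and `results.get(DEAD_LETTER)` can be written to a separate topic or sink. In this setup the timestamp policy still runs inside KafkaIO, so a throwing timestamp function is not covered by the dead-letter output. One option is to keep that function total (never throwing) and do the stricter validation in the ParDo.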
If there were a transform in Beam that does this, it could be convenient for users in many such scenarios. This is simpler than each source supporting it explicitly.

On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <[email protected]> wrote:
> Given that KafkaIO uses the UnboundedSource framework, this is probably not
> something that can easily be supported. We might be able to support similar
> features when we have Kafka on top of Splittable DoFn though.
> So feel free to create a feature request JIRA for this.
>
> Thanks,
> Cham
>
> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <[email protected]> wrote:
>
>> This is a great question. I've added the dev list to be sure it gets
>> noticed by whoever may know best.
>>
>> Kenn
>>
>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <[email protected]>
>> wrote:
>>
>>> Hi,
>>>
>>> Is there a way to get a Deadletter output from a pipeline that uses a
>>> KafkaIO connector for its input? As `TimestampPolicyFactory.withTimestampFn()`
>>> takes only a SerializableFunction and not a ParDo, how would I be able to
>>> produce a Deadletter output from it?
>>>
>>> I have the following pipeline defined that reads from a KafkaIO input:
>>>
>>>     pipeline.apply(
>>>         KafkaIO.<String, String>read()
>>>             .withBootstrapServers(bootstrap)
>>>             .withTopics(topics)
>>>             .withKeyDeserializer(StringDeserializer.class)
>>>             .withValueDeserializer(ConfigurableDeserializer.class)
>>>             .updateConsumerProperties(
>>>                 ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>>                     inputMessagesConfig))
>>>             .updateConsumerProperties(ImmutableMap.of("auto.offset.reset",
>>>                 "earliest"))
>>>             .updateConsumerProperties(ImmutableMap.of("group.id",
>>>                 "beam-consumers"))
>>>             .updateConsumerProperties(ImmutableMap.of("enable.auto.commit",
>>>                 "true"))
>>>             .withTimestampPolicyFactory(
>>>                 TimestampPolicyFactory.withTimestampFn(
>>>                     new MessageTimestampExtractor(inputMessagesConfig)))
>>>             .withReadCommitted()
>>>             .commitOffsetsInFinalize());
>>>
>>> and I would like to get Deadletter outputs when my timestamp extraction fails.
>>>
>>> Best,
>>> Tobi
