Given that KafkaIO uses UnboundeSource framework, this is probably not
something that can easily be supported. We might be able to support similar
features when we have Kafka on top of Splittable DoFn though. So feel free
to create a feature request JIRA for this.

Thanks,
Cham

On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <[email protected]> wrote:

> This is a great question. I've added the dev list to be sure it gets
> noticed by whoever may know best.
>
> Kenn
>
> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <[email protected]>
> wrote:
>
>>
>> Hi,
>>
>> Is there a way to get a Deadletter Output from a pipeline that uses a
>> KafkaIO
>> connector for it's input? As `TimestampPolicyFactory.withTimestampFn()`
>> takes
>> only a SerializableFunction and not a ParDo, how would I be able to
>> produce a
>> Deadletter output from it?
>>
>> I have the following pipeline defined that reads from a KafkaIO input:
>>
>> pipeline.apply(
>>   KafkaIO.<String, String>read()
>>     .withBootstrapServers(bootstrap)
>>     .withTopics(topics)
>>     .withKeyDeserializer(StringDeserializer.class)
>>     .withValueDeserializer(ConfigurableDeserializer.class)
>>     .updateConsumerProperties(
>>         ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>> inputMessagesConfig))
>>     .updateConsumerProperties(ImmutableMap.of("auto.offset.reset",
>> "earliest"))
>>     .updateConsumerProperties(ImmutableMap.of("group.id",
>> "beam-consumers"))
>>     .updateConsumerProperties(ImmutableMap.of("enable.auto.commit",
>> "true"))
>>     .withTimestampPolicyFactory(
>>         TimestampPolicyFactory.withTimestampFn(
>>             new MessageTimestampExtractor(inputMessagesConfig)))
>>     .withReadCommitted()
>>     .commitOffsetsInFinalize())
>>
>>
>> and I like to get deadletter outputs when my timestamp extraction fails.
>>
>> Best,
>> Tobi
>>
>>

Reply via email to