Ah nice. Yeah, if the user can return the raw bytes instead of applying a
function that might throw an exception, the failed records can be
extracted by a ParDo down the line.
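
Roughly like this (a minimal, untested sketch; MyEvent and parseEvent()
are hypothetical stand-ins for your own event type and parsing logic):

// Assumed imports: org.apache.beam.sdk.transforms.{DoFn,ParDo},
// org.apache.beam.sdk.values.{KV,PCollectionTuple,TupleTag,TupleTagList},
// org.apache.kafka.common.serialization.ByteArrayDeserializer
final TupleTag<MyEvent> parsedTag = new TupleTag<MyEvent>() {};
final TupleTag<byte[]> deadLetterTag = new TupleTag<byte[]>() {};

PCollectionTuple results = pipeline
    .apply(KafkaIO.<byte[], byte[]>read()
        .withBootstrapServers(bootstrap)
        .withTopics(topics)
        // Return the raw bytes; nothing can throw inside the source itself.
        .withKeyDeserializer(ByteArrayDeserializer.class)
        .withValueDeserializer(ByteArrayDeserializer.class)
        .withoutMetadata())
    .apply(ParDo.of(new DoFn<KV<byte[], byte[]>, MyEvent>() {
          @ProcessElement
          public void process(ProcessContext c) {
            try {
              // Deserialize explicitly, downstream of the source.
              c.output(parseEvent(c.element().getValue()));
            } catch (Exception e) {
              // Route the unparseable raw bytes to the dead-letter output.
              c.output(deadLetterTag, c.element().getValue());
            }
          }
        }).withOutputTags(parsedTag, TupleTagList.of(deadLetterTag)));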

On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <[email protected]>
wrote:

> As Raghu said,
>
> Just apply a regular ParDo and return a PCollectionTuple; after that you
> can extract your success records (TupleTag) and your dead-letter records
> (TupleTag) and do whatever you want with them, as sketched below.
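>
> Something along these lines (a sketch only; parsedTag and deadLetterTag
> are the TupleTags declared via withOutputTags, as in the snippet at the
> top of this thread):
>
> PCollection<MyEvent> parsed = results.get(parsedTag);
> PCollection<byte[]> deadLetters = results.get(deadLetterTag);
> // ...then write deadLetters wherever failed records should go,
> // e.g. back to a dead-letter Kafka topic.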
>
>
> Raghu Angadi <[email protected]> schrieb am Mi., 24. Okt. 2018, 05:18:
>
>> The user can read the serialized bytes from KafkaIO and deserialize them
>> explicitly in a ParDo, which gives complete control over how to handle
>> record errors. This is what I would do if I needed this in my pipeline.
>>
>> If Beam had a transform that does this, it could be convenient for users
>> in many such scenarios, and simpler than having each source support it
>> explicitly.
>>
>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <[email protected]>
>> wrote:
>>
>>> Given that KafkaIO uses the UnboundedSource framework, this is probably
>>> not something that can easily be supported. We might be able to support
>>> similar features when we have Kafka on top of Splittable DoFn, though.
>>>
>>> So feel free to create a feature request JIRA for this.
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <[email protected]> wrote:
>>>
>>>> This is a great question. I've added the dev list to be sure it gets
>>>> noticed by whoever may know best.
>>>>
>>>> Kenn
>>>>
>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>>>> [email protected]> wrote:
>>>>
>>>>>
>>>>> Hi,
>>>>>
>>>>> Is there a way to get a dead-letter output from a pipeline that uses a
>>>>> KafkaIO connector for its input? As
>>>>> `TimestampPolicyFactory.withTimestampFn()` takes only a
>>>>> SerializableFunction and not a ParDo, how would I be able to produce a
>>>>> dead-letter output from it?
>>>>>
>>>>> I have the following pipeline defined that reads from a KafkaIO input:
>>>>>
>>>>> pipeline.apply(
>>>>>     KafkaIO.<String, String>read()
>>>>>         .withBootstrapServers(bootstrap)
>>>>>         .withTopics(topics)
>>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>>         .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>         .updateConsumerProperties(ImmutableMap.of(
>>>>>             InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>>>>         .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>         .updateConsumerProperties(ImmutableMap.of("group.id", "beam-consumers"))
>>>>>         .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>>>         .withTimestampPolicyFactory(
>>>>>             TimestampPolicyFactory.withTimestampFn(
>>>>>                 new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>         .withReadCommitted()
>>>>>         .commitOffsetsInFinalize())
>>>>>
>>>>>
>>>>> and I'd like to get dead-letter outputs when my timestamp extraction
>>>>> fails.
>>>>>
>>>>> Best,
>>>>> Tobi
>>>>>
>>>>>
