How about returning min_timestamp? They would be dropped or redirected by
the ParDo after that.
Btw, TimestampPolicyFactory.withTimestampFn() is not a public API. Is this
pipeline defined under the kafkaio package?
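
A minimal plain-Java sketch of the min-timestamp idea, with no Beam dependency. The class and method names are hypothetical, `Instant.MIN` is only a stand-in for Beam's minimum timestamp (`BoundedWindow.TIMESTAMP_MIN_VALUE`), and `route` merely plays the role of the downstream ParDo. It assumes the record value is an ISO-8601 string, which may not match the real message format, and it says nothing about how the watermark behaves in a real pipeline.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class TimestampFallbackSketch {
  // Hypothetical sentinel; stands in for Beam's minimum timestamp.
  static final Instant MIN_TIMESTAMP = Instant.MIN;

  // Plays the role of the timestamp function: it never throws and
  // returns the sentinel when the value cannot be parsed.
  static Instant extractTimestamp(String value) {
    if (value == null) {
      return MIN_TIMESTAMP;
    }
    try {
      return Instant.parse(value);
    } catch (DateTimeParseException e) {
      return MIN_TIMESTAMP;
    }
  }

  // Plays the role of the downstream ParDo: records carrying the
  // sentinel timestamp go to the dead-letter list, the rest to good.
  static void route(List<String> records, List<String> good, List<String> deadLetter) {
    for (String record : records) {
      if (extractTimestamp(record).equals(MIN_TIMESTAMP)) {
        deadLetter.add(record);
      } else {
        good.add(record);
      }
    }
  }

  public static void main(String[] args) {
    List<String> good = new ArrayList<>();
    List<String> deadLetter = new ArrayList<>();
    route(Arrays.asList("2018-10-24T10:19:00Z", "not-a-timestamp"), good, deadLetter);
    System.out.println("good=" + good + " deadLetter=" + deadLetter);
  }
}
```

In an actual pipeline the second step would presumably be a ParDo with two output tags rather than two lists.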

On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik <lc...@google.com> wrote:

> In this case, the user is attempting to handle errors when parsing the
> timestamp. The timestamp controls the watermark for the UnboundedSource,
> so how would they control the watermark in a downstream ParDo?
>
> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <rang...@google.com> wrote:
>
>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <chamik...@google.com>
>> wrote:
>>
>>> Ah nice. Yeah, if the user can return the full bytes instead of applying a
>>> function that would result in an exception, this can be extracted by a
>>> ParDo down the line.
>>>
>>
>> KafkaIO does return bytes, and I think most sources should, unless there
>> is a good reason not to.
>> Given that, do we think Beam should provide a transform that makes it
>> simpler to handle deadletter output? I think there was a thread about it in
>> the past.
>>
>>
>>>
>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <jcgarc...@gmail.com>
>>> wrote:
>>>
>>>> As Raghu said,
>>>>
>>>> Just apply a regular ParDo and return a PCollectionTuple. After that you
>>>> can extract your success records (TupleTag) and your dead-letter
>>>> records (TupleTag) and do whatever you want with them.
>>>>
>>>>
>>>> Raghu Angadi <rang...@google.com> wrote on Wed, Oct 24, 2018, 05:18:
>>>>
>>>>> The user can read serialized bytes from KafkaIO and deserialize explicitly
>>>>> in a ParDo, which gives complete control over how to handle record errors.
>>>>> This is what I would do if I needed to in my pipeline.
>>>>>
>>>>> If there is a transform in Beam that does this, it could be convenient
>>>>> for users in many such scenarios. This is simpler than each source
>>>>> supporting it explicitly.
>>>>>
>>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>>>>> chamik...@google.com> wrote:
>>>>>
>>>>>> Given that KafkaIO uses the UnboundedSource framework, this is probably
>>>>>> not something that can easily be supported. We might be able to support
>>>>>> similar features when we have Kafka on top of Splittable DoFn though.
>>>>>>
>>>>>> So feel free to create a feature request JIRA for this.
>>>>>>
>>>>>> Thanks,
>>>>>> Cham
>>>>>>
>>>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <k...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> This is a great question. I've added the dev list to be sure it gets
>>>>>>> noticed by whoever may know best.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>>>>>>> tobias.kay...@ricardo.ch> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Is there a way to get a dead-letter output from a pipeline that uses
>>>>>>>> a KafkaIO connector for its input? As
>>>>>>>> `TimestampPolicyFactory.withTimestampFn()` takes only a
>>>>>>>> SerializableFunction and not a ParDo, how would I be able to
>>>>>>>> produce a dead-letter output from it?
>>>>>>>>
>>>>>>>> I have the following pipeline defined that reads from a KafkaIO
>>>>>>>> input:
>>>>>>>>
>>>>>>>> pipeline.apply(
>>>>>>>>   KafkaIO.<String, String>read()
>>>>>>>>     .withBootstrapServers(bootstrap)
>>>>>>>>     .withTopics(topics)
>>>>>>>>     .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>     .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>     .updateConsumerProperties(
>>>>>>>>         ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("group.id", "beam-consumers"))
>>>>>>>>     .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>>>     .withTimestampPolicyFactory(
>>>>>>>>         TimestampPolicyFactory.withTimestampFn(
>>>>>>>>             new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>>>     .withReadCommitted()
>>>>>>>>     .commitOffsetsInFinalize())
>>>>>>>>
>>>>>>>>
>>>>>>>> and I would like to get dead-letter outputs when my timestamp
>>>>>>>> extraction fails.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Tobi
>>>>>>>>
>>>>>>>>
