How about returning min_timestamp? Those records would be dropped or redirected by the ParDo after that. Btw, TimestampPolicyFactory.withTimestampFn() is not a public API; is this pipeline defined under the kafkaio package?
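
A rough sketch of that idea, in plain Java with no Beam dependency so it runs on its own. Everything here is hypothetical and for illustration only: the class and method names are made up, java.time.Instant.MIN stands in for the minimum timestamp (in Beam this would presumably be BoundedWindow.TIMESTAMP_MIN_VALUE), and the two lists stand in for the main and deadletter outputs of a downstream ParDo.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, library-free illustration; not KafkaIO code.
final class DeadLetterSketch {

    // Assumption: sentinel standing in for the pipeline's minimum timestamp.
    static final Instant MIN_TIMESTAMP = Instant.MIN;

    // Role of the timestamp function: never throw, return the sentinel
    // when the timestamp cannot be parsed.
    static Instant extractTimestamp(String rawTimestamp) {
        try {
            return Instant.parse(rawTimestamp);
        } catch (DateTimeParseException | NullPointerException e) {
            return MIN_TIMESTAMP;
        }
    }

    // Role of the downstream ParDo: records carrying the sentinel go to the
    // deadletter list, everything else goes to the main list.
    static void route(List<String> records, List<String> good, List<String> deadLetter) {
        for (String record : records) {
            if (extractTimestamp(record).equals(MIN_TIMESTAMP)) {
                deadLetter.add(record);
            } else {
                good.add(record);
            }
        }
    }

    public static void main(String[] args) {
        List<String> good = new ArrayList<>();
        List<String> deadLetter = new ArrayList<>();
        route(Arrays.asList("2018-10-24T10:19:00Z", "not-a-timestamp"), good, deadLetter);
        System.out.println("good=" + good + " deadLetter=" + deadLetter);
    }
}
```

In a real pipeline the routing step would be a ParDo with two output tags, as suggested further down in the quoted thread. This sketch does not address the watermark question raised in the reply quoted below.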

On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik <lc...@google.com> wrote:

> In this case, the user is attempting to handle errors when parsing the
> timestamp. The timestamp controls the watermark for the UnboundedSource,
> how would they control the watermark in a downstream ParDo?
>
> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <rang...@google.com> wrote:
>
>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <chamik...@google.com>
>> wrote:
>>
>>> Ah nice. Yeah, if the user can return full bytes instead of applying a
>>> function that would result in an exception, this can be extracted by a
>>> ParDo down the line.
>>>
>>
>> KafkaIO does return bytes, and I think most sources should, unless there
>> is a good reason not to.
>> Given that, do we think Beam should provide a transform that makes it
>> simpler to handle deadletter output? I think there was a thread about it in
>> the past.
>>
>>>
>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <jcgarc...@gmail.com>
>>> wrote:
>>>
>>>> As Raghu said,
>>>>
>>>> Just apply a regular ParDo and return a PCollectionTuple; after that you
>>>> can extract your Success Records (TupleTag) and your DeadLetter
>>>> records (TupleTag) and do whatever you want with them.
>>>>
>>>>
>>>> Raghu Angadi <rang...@google.com> wrote on Wed, Oct 24, 2018, 05:18:
>>>>
>>>>> User can read serialized bytes from KafkaIO and deserialize explicitly
>>>>> in a ParDo, which gives complete control over how to handle record errors.
>>>>> This is what I would do if I needed to in my pipeline.
>>>>>
>>>>> If there is a transform in Beam that does this, it could be convenient
>>>>> for users in many such scenarios. This is simpler than each source
>>>>> supporting it explicitly.
>>>>>
>>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>>>>> chamik...@google.com> wrote:
>>>>>
>>>>>> Given that KafkaIO uses the UnboundedSource framework, this is probably
>>>>>> not something that can easily be supported. We might be able to support
>>>>>> similar features when we have Kafka on top of Splittable DoFn though.
>>>>>>
>>>>>> So feel free to create a feature request JIRA for this.
>>>>>>
>>>>>> Thanks,
>>>>>> Cham
>>>>>>
>>>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <k...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>>> This is a great question. I've added the dev list to be sure it gets
>>>>>>> noticed by whoever may know best.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>>>>>>> tobias.kay...@ricardo.ch> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> Is there a way to get a Deadletter Output from a pipeline that uses
>>>>>>>> a KafkaIO connector for its input? As
>>>>>>>> `TimestampPolicyFactory.withTimestampFn()` takes
>>>>>>>> only a SerializableFunction and not a ParDo, how would I be able to
>>>>>>>> produce a Deadletter output from it?
>>>>>>>>
>>>>>>>> I have the following pipeline defined that reads from a KafkaIO
>>>>>>>> input:
>>>>>>>>
>>>>>>>> pipeline.apply(
>>>>>>>>     KafkaIO.<String, String>read()
>>>>>>>>         .withBootstrapServers(bootstrap)
>>>>>>>>         .withTopics(topics)
>>>>>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>         .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>         .updateConsumerProperties(
>>>>>>>>             ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>>>>>>>                 inputMessagesConfig))
>>>>>>>>         .updateConsumerProperties(ImmutableMap.of("auto.offset.reset",
>>>>>>>>             "earliest"))
>>>>>>>>         .updateConsumerProperties(ImmutableMap.of("group.id",
>>>>>>>>             "beam-consumers"))
>>>>>>>>         .updateConsumerProperties(ImmutableMap.of("enable.auto.commit",
>>>>>>>>             "true"))
>>>>>>>>         .withTimestampPolicyFactory(
>>>>>>>>             TimestampPolicyFactory.withTimestampFn(
>>>>>>>>                 new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>>>         .withReadCommitted()
>>>>>>>>         .commitOffsetsInFinalize())
>>>>>>>>
>>>>>>>>
>>>>>>>> and I would like to get deadletter outputs when my timestamp extraction
>>>>>>>> fails.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Tobi
>>>>>>>>