You would have to return min timestamp for all records, otherwise the
watermark may have advanced and you would be outputting records that are
droppably late.
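The fallback being discussed (give unparsable records a minimum timestamp, then drop or redirect them downstream) can be sketched framework-free. `TimestampFallback`, `extractOrMin`, and the ISO-8601 payload format are illustrative assumptions; `MIN_TIMESTAMP` stands in for Beam's `BoundedWindow.TIMESTAMP_MIN_VALUE`:

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;

public class TimestampFallback {
  // Stand-in for Beam's BoundedWindow.TIMESTAMP_MIN_VALUE sentinel.
  static final Instant MIN_TIMESTAMP = Instant.MIN;

  /** Returns the parsed ISO-8601 timestamp, or MIN_TIMESTAMP when unparsable. */
  static Instant extractOrMin(String raw) {
    if (raw == null) {
      return MIN_TIMESTAMP;
    }
    try {
      return Instant.parse(raw);
    } catch (DateTimeParseException e) {
      // Unparsable record: assign the minimum timestamp so the watermark
      // is unaffected and a later ParDo can drop or redirect the record.
      return MIN_TIMESTAMP;
    }
  }

  public static void main(String[] args) {
    System.out.println(extractOrMin("2018-10-24T12:25:00Z"));
    System.out.println(extractOrMin("not-a-timestamp").equals(MIN_TIMESTAMP)); // true
  }
}
```

In an actual KafkaIO pipeline the equivalent function would be the `SerializableFunction` handed to `TimestampPolicyFactory.withTimestampFn(...)`, with a subsequent ParDo routing min-timestamp records to a dead-letter output.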
On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi <[email protected]> wrote:

> To be clear, returning min_timestamp for unparsable records should not
> affect the watermark.
>
> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi <[email protected]> wrote:
>
>> How about returning min_timestamp? They would be dropped or redirected by
>> the ParDo after that.
>> Btw, TimestampPolicyFactory.withTimestampFn() is not a public API; is
>> this pipeline defined under the kafkaio package?
>>
>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik <[email protected]> wrote:
>>
>>> In this case, the user is attempting to handle errors when parsing the
>>> timestamp. The timestamp controls the watermark for the UnboundedSource;
>>> how would they control the watermark in a downstream ParDo?
>>>
>>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <[email protected]> wrote:
>>>
>>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
>>>> [email protected]> wrote:
>>>>
>>>>> Ah nice. Yeah, if the user can return full bytes instead of applying
>>>>> a function that would result in an exception, this can be extracted
>>>>> by a ParDo down the line.
>>>>
>>>> KafkaIO does return bytes, and I think most sources should, unless
>>>> there is a good reason not to.
>>>> Given that, do we think Beam should provide a transform that makes it
>>>> simpler to handle deadletter output? I think there was a thread about
>>>> it in the past.
>>>>
>>>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
>>>>> [email protected]> wrote:
>>>>>
>>>>>> As Raghu said,
>>>>>>
>>>>>> Just apply a regular ParDo and return a PCollectionTuple; after that
>>>>>> you can extract your Success records (TupleTag) and your DeadLetter
>>>>>> records (TupleTag) and do whatever you want with them.
>>>>>>
>>>>>> Raghu Angadi <[email protected]> schrieb am Mi., 24. Okt. 2018, 05:18:
>>>>>>
>>>>>>> User can read serialized bytes from KafkaIO and deserialize
>>>>>>> explicitly in a ParDo, which gives complete control on how to
>>>>>>> handle record errors. This is what I would do if I needed to in my
>>>>>>> pipeline.
>>>>>>>
>>>>>>> If there is a transform in Beam that does this, it could be
>>>>>>> convenient for users in many such scenarios. It is simpler than
>>>>>>> each source supporting it explicitly.
>>>>>>>
>>>>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>>>>>>> [email protected]> wrote:
>>>>>>>
>>>>>>>> Given that KafkaIO uses the UnboundedSource framework, this is
>>>>>>>> probably not something that can easily be supported. We might be
>>>>>>>> able to support similar features when we have Kafka on top of
>>>>>>>> Splittable DoFn, though. So feel free to create a feature request
>>>>>>>> JIRA for this.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Cham
>>>>>>>>
>>>>>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> This is a great question. I've added the dev list to be sure it
>>>>>>>>> gets noticed by whoever may know best.
>>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>>>>>>>>> [email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> Is there a way to get a Deadletter Output from a pipeline that
>>>>>>>>>> uses a KafkaIO connector for its input? As
>>>>>>>>>> `TimestampPolicyFactory.withTimestampFn()` takes only a
>>>>>>>>>> SerializableFunction and not a ParDo, how would I be able to
>>>>>>>>>> produce a Deadletter output from it?
>>>>>>>>>>
>>>>>>>>>> I have the following pipeline defined that reads from a KafkaIO
>>>>>>>>>> input:
>>>>>>>>>>
>>>>>>>>>> pipeline.apply(
>>>>>>>>>>     KafkaIO.<String, String>read()
>>>>>>>>>>         .withBootstrapServers(bootstrap)
>>>>>>>>>>         .withTopics(topics)
>>>>>>>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>         .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>>>         .updateConsumerProperties(
>>>>>>>>>>             ImmutableMap.of(
>>>>>>>>>>                 InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>>>>>>>>>         .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>>>>>         .updateConsumerProperties(ImmutableMap.of("group.id", "beam-consumers"))
>>>>>>>>>>         .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>>>>>         .withTimestampPolicyFactory(
>>>>>>>>>>             TimestampPolicyFactory.withTimestampFn(
>>>>>>>>>>                 new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>>>>>         .withReadCommitted()
>>>>>>>>>>         .commitOffsetsInFinalize())
>>>>>>>>>>
>>>>>>>>>> and I would like to get deadletter outputs when my timestamp
>>>>>>>>>> extraction fails.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Tobi
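The deserialize-in-a-ParDo pattern suggested in the thread (read raw bytes from KafkaIO, route good records and unparsable ones to separate outputs) can be sketched framework-free. `DeadLetterSplit`, its field names, and the ISO-8601 payload format are illustrative assumptions; in an actual Beam pipeline `process` would be a `@ProcessElement` method of a `DoFn` emitting to two `TupleTag`s of a `PCollectionTuple`:

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.List;

/**
 * Framework-free sketch of the two-output split: deserialize raw Kafka
 * bytes yourself, send good records to a "success" output and unparsable
 * ones to a "dead letter" output.
 */
public class DeadLetterSplit {
  final List<Instant> success = new ArrayList<>();
  final List<String> deadLetters = new ArrayList<>();

  void process(byte[] rawValue) {
    String payload = new String(rawValue, StandardCharsets.UTF_8);
    try {
      success.add(Instant.parse(payload));  // analogous to c.output(mainTag, ...)
    } catch (DateTimeParseException e) {
      deadLetters.add(payload);             // analogous to c.output(deadLetterTag, ...)
    }
  }

  public static void main(String[] args) {
    DeadLetterSplit split = new DeadLetterSplit();
    split.process("2018-10-24T12:25:00Z".getBytes(StandardCharsets.UTF_8));
    split.process("oops".getBytes(StandardCharsets.UTF_8));
    System.out.println(split.success.size() + " ok, " + split.deadLetters.size() + " dead");
  }
}
```

Because the parse failure is handled inside the ParDo rather than inside the source's timestamp function, the watermark of the UnboundedSource is never driven by a bad record.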
