All records in Apache Beam have a timestamp. The default timestamp is the min timestamp defined here: https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48
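For illustration, a minimal sketch of how that default interacts with a DoFn that assigns timestamps explicitly. The DoFn name, the KV<String, Long> element shape, and the epoch-millis convention are assumptions for the example, not from the thread:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Instant;

// Illustrative DoFn: stamps each element with an epoch-millis timestamp
// carried in its value, falling back to Beam's default min timestamp.
class StampTimestampFn extends DoFn<KV<String, Long>, String> {
  @ProcessElement
  public void process(ProcessContext ctx) {
    KV<String, Long> element = ctx.element();
    Instant ts = element.getValue() != null
        ? new Instant(element.getValue())
        : BoundedWindow.TIMESTAMP_MIN_VALUE; // the constant linked above
    // Note: moving a timestamp backwards relative to the input element is
    // limited by the DoFn's getAllowedTimestampSkew().
    ctx.outputWithTimestamp(element.getKey(), ts);
  }
}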
On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi <[email protected]> wrote:

> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik <[email protected]> wrote:
>
>> You would have to return min timestamp for all records otherwise the
>> watermark may have advanced and you would be outputting records that are
>> droppably late.
>
> That would be fine I guess. What's the timestamp for a record that doesn't
> have one?
>
>> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi <[email protected]> wrote:
>>
>>> To be clear, returning min_timestamp for unparsable records should not
>>> affect the watermark.
>>>
>>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi <[email protected]> wrote:
>>>
>>>> How about returning min_timestamp? Those records would be dropped or
>>>> redirected by the ParDo after that.
>>>> Btw, TimestampPolicyFactory.withTimestampFn() is not a public API; is
>>>> this pipeline defined under the kafkaio package?
>>>>
>>>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> In this case, the user is attempting to handle errors when parsing the
>>>>> timestamp. The timestamp controls the watermark for the
>>>>> UnboundedSource; how would they control the watermark in a downstream
>>>>> ParDo?
>>>>>
>>>>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <[email protected]> wrote:
>>>>>
>>>>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath
>>>>>> <[email protected]> wrote:
>>>>>>
>>>>>>> Ah nice. Yeah, if the user can return full bytes instead of applying
>>>>>>> a function that would result in an exception, this can be extracted
>>>>>>> by a ParDo down the line.
>>>>>>
>>>>>> KafkaIO does return bytes, and I think most sources should, unless
>>>>>> there is a good reason not to.
>>>>>> Given that, do we think Beam should provide a transform that makes it
>>>>>> simpler to handle dead-letter output? I think there was a thread about
>>>>>> it in the past.
>>>>>>
>>>>>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia
>>>>>>> <[email protected]> wrote:
>>>>>>>
>>>>>>>> As Raghu said,
>>>>>>>>
>>>>>>>> just apply a regular ParDo and return a PCollectionTuple; after that
>>>>>>>> you can extract your success records (TupleTag) and your dead-letter
>>>>>>>> records (TupleTag) and do whatever you want with them.
>>>>>>>>
>>>>>>>> Raghu Angadi <[email protected]> wrote on Wed, Oct 24, 2018, 05:18:
>>>>>>>>
>>>>>>>>> Users can read serialized bytes from KafkaIO and deserialize
>>>>>>>>> explicitly in a ParDo, which gives complete control over how to
>>>>>>>>> handle record errors. This is what I would do if I needed to in my
>>>>>>>>> pipeline.
>>>>>>>>>
>>>>>>>>> If there were a transform in Beam that did this, it could be
>>>>>>>>> convenient for users in many such scenarios. This is simpler than
>>>>>>>>> each source supporting it explicitly.
>>>>>>>>>
>>>>>>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath
>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Given that KafkaIO uses the UnboundedSource framework, this is
>>>>>>>>>> probably not something that can easily be supported. We might be
>>>>>>>>>> able to support similar features when we have Kafka on top of
>>>>>>>>>> Splittable DoFn, though. So feel free to create a feature request
>>>>>>>>>> JIRA for this.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Cham
>>>>>>>>>>
>>>>>>>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> This is a great question. I've added the dev list to be sure it
>>>>>>>>>>> gets noticed by whoever may know best.
>>>>>>>>>>>
>>>>>>>>>>> Kenn
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias
>>>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> Is there a way to get a dead-letter output from a pipeline that
>>>>>>>>>>>> uses a KafkaIO connector for its input? As
>>>>>>>>>>>> `TimestampPolicyFactory.withTimestampFn()` takes only a
>>>>>>>>>>>> SerializableFunction and not a ParDo, how would I be able to
>>>>>>>>>>>> produce a dead-letter output from it?
>>>>>>>>>>>>
>>>>>>>>>>>> I have the following pipeline defined that reads from a KafkaIO
>>>>>>>>>>>> input:
>>>>>>>>>>>>
>>>>>>>>>>>> pipeline.apply(
>>>>>>>>>>>>     KafkaIO.<String, String>read()
>>>>>>>>>>>>         .withBootstrapServers(bootstrap)
>>>>>>>>>>>>         .withTopics(topics)
>>>>>>>>>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>>>         .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>>>>>         .updateConsumerProperties(
>>>>>>>>>>>>             ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>>>>>>>>>>>                 inputMessagesConfig))
>>>>>>>>>>>>         .updateConsumerProperties(
>>>>>>>>>>>>             ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>>>>>>>         .updateConsumerProperties(
>>>>>>>>>>>>             ImmutableMap.of("group.id", "beam-consumers"))
>>>>>>>>>>>>         .updateConsumerProperties(
>>>>>>>>>>>>             ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>>>>>>>         .withTimestampPolicyFactory(
>>>>>>>>>>>>             TimestampPolicyFactory.withTimestampFn(
>>>>>>>>>>>>                 new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>>>>>>>         .withReadCommitted()
>>>>>>>>>>>>         .commitOffsetsInFinalize())
>>>>>>>>>>>>
>>>>>>>>>>>> and I would like to get dead-letter outputs when my timestamp
>>>>>>>>>>>> extraction fails.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Tobi
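A minimal sketch of Raghu's min-timestamp fallback, assuming the message value itself carries an ISO-8601 timestamp; SafeTimestampFn and that payload format are assumptions for the example, not from the thread:

import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

// Sketch: never throw out of the timestamp function. Unparsable records get
// the min timestamp, which does not advance the watermark, and a downstream
// ParDo can then drop or redirect anything stamped with that sentinel.
class SafeTimestampFn
    implements SerializableFunction<KafkaRecord<String, String>, Instant> {
  @Override
  public Instant apply(KafkaRecord<String, String> record) {
    try {
      // Assumed payload format: an ISO-8601 instant in the message value.
      return Instant.parse(record.getKV().getValue());
    } catch (RuntimeException e) {
      return BoundedWindow.TIMESTAMP_MIN_VALUE;
    }
  }
}

Downstream, a ParDo can compare ProcessContext.timestamp() against BoundedWindow.TIMESTAMP_MIN_VALUE and route those elements to a dead-letter sink. Note Raghu's caveat above that TimestampPolicyFactory.withTimestampFn() is not a public API.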
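And a sketch of the approach the thread converges on: read raw bytes from KafkaIO and deserialize in a multi-output ParDo, so parse failures land in a dead-letter PCollection instead of failing inside the source. The tag names and the epoch-millis payload are hypothetical; pipeline, bootstrap, and topics are carried over from Tobi's snippet:

import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Hypothetical tags: one for parsed records, one for the dead letters.
final TupleTag<KV<String, Long>> parsedTag = new TupleTag<KV<String, Long>>() {};
final TupleTag<KV<String, byte[]>> deadLetterTag = new TupleTag<KV<String, byte[]>>() {};

PCollectionTuple results =
    pipeline
        .apply(KafkaIO.<String, byte[]>read()
            .withBootstrapServers(bootstrap)
            .withTopics(topics)
            .withKeyDeserializer(StringDeserializer.class)
            // Defer value parsing: hand raw bytes to the ParDo below.
            .withValueDeserializer(ByteArrayDeserializer.class)
            .withoutMetadata())
        .apply(ParDo.of(new DoFn<KV<String, byte[]>, KV<String, Long>>() {
          @ProcessElement
          public void process(ProcessContext ctx) {
            KV<String, byte[]> element = ctx.element();
            try {
              // Placeholder parse: payload is an epoch-millis string.
              long ts = Long.parseLong(
                  new String(element.getValue(), StandardCharsets.UTF_8));
              ctx.output(KV.of(element.getKey(), ts));
            } catch (RuntimeException e) {
              ctx.output(deadLetterTag, element); // redirect instead of crashing
            }
          }
        }).withOutputTags(parsedTag, TupleTagList.of(deadLetterTag)));

PCollection<KV<String, Long>> parsed = results.get(parsedTag);
PCollection<KV<String, byte[]>> deadLetters = results.get(deadLetterTag);

This is the PCollectionTuple pattern Juan Carlos describes: the main output carries successes, and deadLetters can be written wherever failures should go.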
