Yes, that would be fine. The user could then use a ParDo that sends records whose timestamps it cannot parse to a dead-letter output, and calls outputWithTimestamp [1] for everything else.
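Concretely, such a ParDo could look like the sketch below. ParseTimestampDoFn, the String payloads, and the leading-epoch-millis record format are illustrative assumptions, not something from this thread:

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.TupleTag;
import org.joda.time.Instant;

// Hypothetical DoFn: records whose timestamp parses go to the main output
// via outputWithTimestamp; everything else goes to a dead-letter tag.
public class ParseTimestampDoFn extends DoFn<String, String> {
  public static final TupleTag<String> MAIN = new TupleTag<String>() {};
  public static final TupleTag<String> DEAD_LETTER = new TupleTag<String>() {};

  // Pure helper: leading epoch-millis field (assumed format), or null if unparsable.
  static Long tryParseEpochMillis(String record) {
    try {
      return Long.parseLong(record.split(",", 2)[0]);
    } catch (NumberFormatException e) {
      return null;
    }
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    Long millis = tryParseEpochMillis(c.element());
    if (millis != null) {
      c.outputWithTimestamp(c.element(), new Instant(millis));
    } else {
      c.output(DEAD_LETTER, c.element()); // unparsable -> dead-letter output
    }
  }
}
```

It would be applied with ParDo.of(new ParseTimestampDoFn()).withOutputTags(ParseTimestampDoFn.MAIN, TupleTagList.of(ParseTimestampDoFn.DEAD_LETTER)).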
1: https://beam.apache.org/releases/javadoc/2.7.0/org/apache/beam/sdk/transforms/DoFn.WindowedContext.html#outputWithTimestamp-org.apache.beam.sdk.values.TupleTag-T-org.joda.time.Instant-

On Wed, Oct 24, 2018 at 1:21 PM Raghu Angadi <[email protected]> wrote:

Thanks. So returning min timestamp is OK, right (assuming the application is fine with what it means)?

On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik <[email protected]> wrote:

All records in Apache Beam have a timestamp. The default timestamp is the min timestamp defined here:
https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48

On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi <[email protected]> wrote:

That would be fine, I guess. What's the timestamp for a record that doesn't have one?

On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik <[email protected]> wrote:

You would have to return min timestamp for all records, otherwise the watermark may have advanced and you would be outputting records that are droppably late.

On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi <[email protected]> wrote:

To be clear, returning min_timestamp for unparsable records should not affect the watermark.

On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi <[email protected]> wrote:

How about returning min_timestamp? Those records would be dropped or redirected by the ParDo after that. Btw, TimestampPolicyFactory.withTimestampFn() is not a public API; is this pipeline defined under the kafkaio package?

On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik <[email protected]> wrote:

In this case, the user is attempting to handle errors when parsing the timestamp.
The timestamp controls the watermark for the UnboundedSource; how would they control the watermark in a downstream ParDo?

On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <[email protected]> wrote:

KafkaIO does return bytes, and I think most sources should, unless there is a good reason not to. Given that, do we think Beam should provide a transform that makes it simpler to handle dead-letter output? I think there was a thread about it in the past.

On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <[email protected]> wrote:

Ah nice. Yeah, if the user can return full bytes instead of applying a function that would result in an exception, this can be extracted by a ParDo down the line.

On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <[email protected]> wrote:

As Raghu said, just apply a regular ParDo and return a PCollectionTuple; after that you can extract your success records (TupleTag) and your dead-letter records (TupleTag) and do whatever you want with them.

On Wed, Oct 24, 2018 at 05:18 Raghu Angadi <[email protected]> wrote:

The user can read serialized bytes from KafkaIO and deserialize explicitly in a ParDo, which gives complete control over how to handle record errors. This is what I would do if I needed it in my pipeline.

If there is a transform in Beam that does this, it could be convenient for users in many such scenarios. This is simpler than each source supporting it explicitly.
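The PCollectionTuple pattern described above could be sketched as follows. MyRecord, its comma-separated wire format, and the rawRecords input (an assumed PCollection&lt;String&gt; of Kafka values) are invented for illustration:

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

// Hypothetical record type with a parser that throws on malformed input.
class MyRecord {
  final long id;
  final String value;

  MyRecord(long id, String value) {
    this.id = id;
    this.value = value;
  }

  static MyRecord parse(String raw) {
    String[] parts = raw.split(",", 2);
    if (parts.length != 2) {
      throw new IllegalArgumentException("malformed record: " + raw);
    }
    return new MyRecord(Long.parseLong(parts[0]), parts[1]);
  }
}

// Success and dead-letter tags; anonymous subclasses preserve generics.
final TupleTag<MyRecord> success = new TupleTag<MyRecord>() {};
final TupleTag<String> deadLetter = new TupleTag<String>() {};

PCollectionTuple result = rawRecords.apply(
    "DeserializeWithDlq",
    ParDo.of(new DoFn<String, MyRecord>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        try {
          c.output(MyRecord.parse(c.element()));
        } catch (Exception e) {
          // Keep the raw payload so it can be inspected or replayed later.
          c.output(deadLetter, c.element());
        }
      }
    }).withOutputTags(success, TupleTagList.of(deadLetter)));

PCollection<MyRecord> parsed = result.get(success);
PCollection<String> failed = result.get(deadLetter);
```

The failed collection can then be written wherever the dead-letter records should land, e.g. back to a Kafka topic or to files.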
On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <[email protected]> wrote:

Given that KafkaIO uses the UnboundedSource framework, this is probably not something that can easily be supported. We might be able to support similar features when we have Kafka on top of Splittable DoFn, though. So feel free to create a feature request JIRA for this.

Thanks,
Cham

On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <[email protected]> wrote:

This is a great question. I've added the dev list to be sure it gets noticed by whoever may know best.

Kenn

On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <[email protected]> wrote:

Hi,

Is there a way to get a dead-letter output from a pipeline that uses a KafkaIO connector for its input? As `TimestampPolicyFactory.withTimestampFn()` takes only a SerializableFunction and not a ParDo, how would I be able to produce a dead-letter output from it?
I have the following pipeline defined that reads from a KafkaIO input:

    pipeline.apply(
        KafkaIO.<String, String>read()
            .withBootstrapServers(bootstrap)
            .withTopics(topics)
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(ConfigurableDeserializer.class)
            .updateConsumerProperties(
                ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
            .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
            .updateConsumerProperties(ImmutableMap.of("group.id", "beam-consumers"))
            .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
            .withTimestampPolicyFactory(
                TimestampPolicyFactory.withTimestampFn(
                    new MessageTimestampExtractor(inputMessagesConfig)))
            .withReadCommitted()
            .commitOffsetsInFinalize())

and I'd like to get dead-letter outputs when my timestamp extraction fails.

Best,
Tobi
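The min-timestamp fallback suggested earlier in the thread could look roughly like this. Since TimestampPolicyFactory.withTimestampFn() is noted above as not a public API, treat this purely as a sketch of the idea; the payload-is-epoch-millis format and the millisOrDefault helper are assumptions:

```java
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

class SafeTimestamps {
  // Pure helper: epoch millis from the payload, or a fallback when unparsable.
  static long millisOrDefault(String payload, long fallback) {
    try {
      return Long.parseLong(payload);
    } catch (NumberFormatException e) {
      return fallback;
    }
  }

  // Never throws: unparsable records get the min timestamp, so the read
  // succeeds and a downstream ParDo can redirect them to a dead-letter output.
  static SerializableFunction<KafkaRecord<String, String>, Instant> safeExtractor() {
    return record ->
        new Instant(
            millisOrDefault(
                record.getKV().getValue(),
                BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis()));
  }
}
```

Downstream, a ParDo can compare each element's timestamp against BoundedWindow.TIMESTAMP_MIN_VALUE and route matches to the dead-letter output, as discussed above.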
