Thanks. So returning the min timestamp is OK, right (assuming the application is fine with what it means)?
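[For illustration, a minimal sketch of what "returning the min timestamp" could look like in the timestamp function. The epoch-millis payload and its parsing are hypothetical; SerializableFunction, KafkaRecord, and BoundedWindow.TIMESTAMP_MIN_VALUE are the Beam pieces involved:]

    import org.apache.beam.sdk.io.kafka.KafkaRecord;
    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.joda.time.Instant;

    // Falls back to the min timestamp when the record's embedded timestamp
    // cannot be parsed; records stamped this way can then be dropped or
    // redirected by a ParDo downstream.
    SerializableFunction<KafkaRecord<String, String>, Instant> timestampFn =
        record -> {
          try {
            // Hypothetical: the value carries an epoch-millis timestamp.
            return new Instant(Long.parseLong(record.getKV().getValue()));
          } catch (NumberFormatException e) {
            return BoundedWindow.TIMESTAMP_MIN_VALUE;
          }
        };

[This would be plugged in via TimestampPolicyFactory.withTimestampFn(), with the caveat Raghu raises below that it is not meant as a public API.]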
On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik <[email protected]> wrote:

> All records in Apache Beam have a timestamp. The default timestamp is the
> min timestamp defined here:
> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48
>
> On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi <[email protected]> wrote:
>
>> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik <[email protected]> wrote:
>>
>>> You would have to return the min timestamp for all records, otherwise
>>> the watermark may have advanced and you would be outputting records
>>> that are droppably late.
>>
>> That would be fine, I guess. What's the timestamp for a record that
>> doesn't have one?
>>
>>> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi <[email protected]> wrote:
>>>
>>>> To be clear, returning min_timestamp for unparsable records should
>>>> not affect the watermark.
>>>>
>>>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi <[email protected]> wrote:
>>>>
>>>>> How about returning min_timestamp? They would be dropped or
>>>>> redirected by the ParDo after that.
>>>>> Btw, TimestampPolicyFactory.withTimestampFn() is not a public API;
>>>>> is this pipeline defined under the kafkaio package?
>>>>>
>>>>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik <[email protected]> wrote:
>>>>>
>>>>>> In this case, the user is attempting to handle errors when parsing
>>>>>> the timestamp. The timestamp controls the watermark for the
>>>>>> UnboundedSource; how would they control the watermark in a
>>>>>> downstream ParDo?
>>>>>>
>>>>>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <[email protected]> wrote:
>>>>>>
>>>>>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <[email protected]> wrote:
>>>>>>>
>>>>>>>> Ah nice. Yeah, if the user can return full bytes instead of
>>>>>>>> applying a function that would result in an exception, this can
>>>>>>>> be extracted by a ParDo down the line.
>>>>>>>
>>>>>>> KafkaIO does return bytes, and I think most sources should, unless
>>>>>>> there is a good reason not to.
>>>>>>> Given that, do we think Beam should provide a transform that makes
>>>>>>> it simpler to handle deadletter output? I think there was a thread
>>>>>>> about it in the past.
>>>>>>>
>>>>>>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> As Raghu said,
>>>>>>>>>
>>>>>>>>> just apply a regular ParDo and return a PCollectionTuple. After
>>>>>>>>> that you can extract your success records (TupleTag) and your
>>>>>>>>> deadletter records (TupleTag) and do whatever you want with them.
>>>>>>>>>
>>>>>>>>> Raghu Angadi <[email protected]> wrote on Wed, Oct 24, 2018, 05:18:
>>>>>>>>>
>>>>>>>>>> Users can read serialized bytes from KafkaIO and deserialize
>>>>>>>>>> explicitly in a ParDo, which gives complete control over how to
>>>>>>>>>> handle record errors. This is what I would do if I needed to in
>>>>>>>>>> my pipeline.
>>>>>>>>>>
>>>>>>>>>> If there is a transform in Beam that does this, it could be
>>>>>>>>>> convenient for users in many such scenarios. This is simpler
>>>>>>>>>> than each source supporting it explicitly.
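[For illustration, a sketch of the ParDo-with-output-tags pattern Raghu and Juan Carlos describe above; rawValues (a PCollection<byte[]> of Kafka record values), MyEvent, and parseEvent() are hypothetical stand-ins:]

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.TupleTag;
    import org.apache.beam.sdk.values.TupleTagList;

    // One tag for successfully parsed records, one for the raw bytes of
    // records that failed to deserialize (the deadletter output).
    final TupleTag<MyEvent> parsedTag = new TupleTag<MyEvent>() {};
    final TupleTag<byte[]> deadLetterTag = new TupleTag<byte[]>() {};

    PCollectionTuple results =
        rawValues.apply(
            "ParseOrDeadLetter",
            ParDo.of(new DoFn<byte[], MyEvent>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    try {
                      c.output(parseEvent(c.element())); // hypothetical parser
                    } catch (Exception e) {
                      // Route the raw bytes to the deadletter output.
                      c.output(deadLetterTag, c.element());
                    }
                  }
                })
                .withOutputTags(parsedTag, TupleTagList.of(deadLetterTag)));

    PCollection<MyEvent> parsed = results.get(parsedTag);
    PCollection<byte[]> deadLetters = results.get(deadLetterTag);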
>>>>>>>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Given that KafkaIO uses the UnboundedSource framework, this is
>>>>>>>>>>> probably not something that can easily be supported. We might
>>>>>>>>>>> be able to support similar features when we have Kafka on top
>>>>>>>>>>> of Splittable DoFn, though. So feel free to create a feature
>>>>>>>>>>> request JIRA for this.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Cham
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> This is a great question. I've added the dev list to be sure
>>>>>>>>>>>> it gets noticed by whoever may know best.
>>>>>>>>>>>>
>>>>>>>>>>>> Kenn
>>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Is there a way to get a deadletter output from a pipeline
>>>>>>>>>>>>> that uses a KafkaIO connector for its input? As
>>>>>>>>>>>>> `TimestampPolicyFactory.withTimestampFn()` takes only a
>>>>>>>>>>>>> SerializableFunction and not a ParDo, how would I be able to
>>>>>>>>>>>>> produce a deadletter output from it?
>>>>>>>>>>>>>
>>>>>>>>>>>>> I have the following pipeline defined that reads from a
>>>>>>>>>>>>> KafkaIO input:
>>>>>>>>>>>>>
>>>>>>>>>>>>> pipeline.apply(
>>>>>>>>>>>>>     KafkaIO.<String, String>read()
>>>>>>>>>>>>>         .withBootstrapServers(bootstrap)
>>>>>>>>>>>>>         .withTopics(topics)
>>>>>>>>>>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>>>>         .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>>>>>>         .updateConsumerProperties(
>>>>>>>>>>>>>             ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>>>>>>>>>>>>>         .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>>>>>>>>         .updateConsumerProperties(ImmutableMap.of("group.id", "beam-consumers"))
>>>>>>>>>>>>>         .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>>>>>>>>         .withTimestampPolicyFactory(
>>>>>>>>>>>>>             TimestampPolicyFactory.withTimestampFn(
>>>>>>>>>>>>>                 new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>>>>>>>>         .withReadCommitted()
>>>>>>>>>>>>>         .commitOffsetsInFinalize())
>>>>>>>>>>>>>
>>>>>>>>>>>>> and I would like to get deadletter outputs when my timestamp
>>>>>>>>>>>>> extraction fails.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Tobi
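[To connect the two halves of the thread: a sketch of the byte-oriented read suggested above, so deserialization and dead-lettering happen in a downstream ParDo rather than inside the source. Variable names mirror Tobias's pipeline; ByteArrayDeserializer comes from the Kafka client library:]

    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.io.kafka.KafkaRecord;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    // Read raw bytes; parsing moves into a ParDo like the one sketched
    // earlier (e.g., after extracting the record values), where failures
    // can be tagged as deadletters instead of failing in the source.
    PCollection<KafkaRecord<byte[], byte[]>> records =
        pipeline.apply(
            KafkaIO.<byte[], byte[]>read()
                .withBootstrapServers(bootstrap)
                .withTopics(topics)
                .withKeyDeserializer(ByteArrayDeserializer.class)
                .withValueDeserializer(ByteArrayDeserializer.class)
                .commitOffsetsInFinalize());

[Timestamps for well-formed records could then be assigned downstream, e.g. with outputWithTimestamp, subject to the watermark caveat Lukasz raises above.]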
