Forgive me if this is naive or missing something, but here are my thoughts on these alternatives:
(0) Timestamp has to be pulled out in the source to control the watermark. Luke's point is important.

(1) If bad records get min_timestamp, and they occur infrequently enough, then the watermark will advance and they will all be dropped. That will not allow output to a dead-letter queue.

(2) If you always have min_timestamp records, or if bad records are frequent, the watermark will never advance, so windows/aggregations would never be considered complete. Triggers could be used to get output anyhow, but it would never be a final answer. I think it is not in the spirit of Beam to work this way. Pragmatically, no state could ever be freed by a runner.

In SQL there is an actual "dead letter" option when creating a table that parses from a bytes source. If, for example, a JSON record cannot be parsed to the expected schema - like maybe an Avro record got into the stream, or the JSON doesn't match the expected schema - it is output as-is to a user-specified dead-letter queue. I think this same level of support is also required for records that cannot have timestamps extracted in an unbounded source.

In an SDF I think the function has enough control to do it all in "userland", so Cham is right on here.

Kenn

On Wed, Oct 24, 2018 at 6:54 PM Lukasz Cwik <[email protected]> wrote:

> That depends on the user's pipeline and how watermark advancement of the
> source may impact elements becoming droppably late if they are emitted
> with the minimum timestamp.
>
> On Wed, Oct 24, 2018 at 4:42 PM Raghu Angadi <[email protected]> wrote:
>
>> I see.
>>
>> What I meant was to return min_timestamp for bad records in the
>> timestamp handler passed to KafkaIO itself, and the correct timestamp
>> for parsable records. That should work too, right?
>>
>> On Wed, Oct 24, 2018 at 4:21 PM Lukasz Cwik <[email protected]> wrote:
>>
>>> Yes, that would be fine.
>>>
>>> The user could then use a ParDo which outputs to a DLQ for things it
>>> can't parse the timestamp for and use outputWithTimestamp[1] for
>>> everything else.
>>>
>>> 1:
>>> https://beam.apache.org/releases/javadoc/2.7.0/org/apache/beam/sdk/transforms/DoFn.WindowedContext.html#outputWithTimestamp-org.apache.beam.sdk.values.TupleTag-T-org.joda.time.Instant-
>>>
>>> On Wed, Oct 24, 2018 at 1:21 PM Raghu Angadi <[email protected]> wrote:
>>>
>>>> Thanks. So returning min timestamp is OK, right (assuming the
>>>> application is fine with what it means)?
>>>>
>>>> On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik <[email protected]> wrote:
>>>>
>>>>> All records in Apache Beam have a timestamp. The default timestamp
>>>>> is the min timestamp defined here:
>>>>> https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48
>>>>>
>>>>> On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi <[email protected]>
>>>>> wrote:
>>>>>
>>>>>> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> You would have to return min timestamp for all records, otherwise
>>>>>>> the watermark may have advanced and you would be outputting
>>>>>>> records that are droppably late.
>>>>>>
>>>>>> That would be fine I guess. What’s the timestamp for a record that
>>>>>> doesn’t have one?
>>>>>>
>>>>>>> On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi <[email protected]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> To be clear, returning min_timestamp for unparsable records
>>>>>>>> should not affect the watermark.
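A minimal sketch of the ParDo Lukasz describes above (dead-letter output for records whose timestamp can't be parsed, outputWithTimestamp for everything else), assuming the source emits all elements with the minimum timestamp. Here rawRecords stands for the source output, and RawRecord, ParsedRecord, parse(), and extractTimestamp() are hypothetical placeholders, not Beam APIs:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.TupleTag;
    import org.apache.beam.sdk.values.TupleTagList;
    import org.joda.time.Instant;

    // Tags for the two outputs: successfully timestamped records and the DLQ.
    final TupleTag<ParsedRecord> parsedTag = new TupleTag<ParsedRecord>() {};
    final TupleTag<RawRecord> deadLetterTag = new TupleTag<RawRecord>() {};

    PCollectionTuple outputs = rawRecords.apply(
        "AssignTimestampsOrDeadLetter",
        ParDo.of(
                new DoFn<RawRecord, ParsedRecord>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    try {
                      ParsedRecord parsed = parse(c.element());      // hypothetical
                      Instant eventTime = extractTimestamp(parsed);  // hypothetical
                      // Elements arrived with min_timestamp, so this only moves
                      // them forward in time; no allowed-skew issue.
                      c.outputWithTimestamp(parsed, eventTime);
                    } catch (Exception e) {
                      // Timestamp unparsable: emit the record as-is to the DLQ,
                      // still carrying the minimum timestamp it arrived with.
                      c.output(deadLetterTag, c.element());
                    }
                  }
                })
            .withOutputTags(parsedTag, TupleTagList.of(deadLetterTag)));

    PCollection<ParsedRecord> parsedRecords = outputs.get(parsedTag);
    PCollection<RawRecord> deadLetters = outputs.get(deadLetterTag);

Per Kenn's points (1) and (2) above, this sketch only behaves well if the source holds the watermark back appropriately: once the watermark advances past min_timestamp, these elements become droppably late.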
>>>>>>>> On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi <[email protected]>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> How about returning min_timestamp? They would be dropped or
>>>>>>>>> redirected by the ParDo after that.
>>>>>>>>> Btw, TimestampPolicyFactory.withTimestampFn() is not a public
>>>>>>>>> API; is this pipeline defined under the kafkaio package?
>>>>>>>>>
>>>>>>>>> On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik <[email protected]>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> In this case, the user is attempting to handle errors when
>>>>>>>>>> parsing the timestamp. The timestamp controls the watermark for
>>>>>>>>>> the UnboundedSource; how would they control the watermark in a
>>>>>>>>>> downstream ParDo?
>>>>>>>>>>
>>>>>>>>>> On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <[email protected]>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <
>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Ah nice. Yeah, if the user can return full bytes instead of
>>>>>>>>>>>> applying a function that would result in an exception, this
>>>>>>>>>>>> can be extracted by a ParDo down the line.
>>>>>>>>>>>
>>>>>>>>>>> KafkaIO does return bytes, and I think most sources should,
>>>>>>>>>>> unless there is a good reason not to.
>>>>>>>>>>> Given that, do we think Beam should provide a transform that
>>>>>>>>>>> makes it simpler to handle dead-letter output? I think there
>>>>>>>>>>> was a thread about it in the past.
>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> As Raghu said,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Just apply a regular ParDo and return a PCollectionTuple;
>>>>>>>>>>>>> after that you can extract your success records (TupleTag)
>>>>>>>>>>>>> and your dead-letter records (TupleTag) and do whatever you
>>>>>>>>>>>>> want with them.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Raghu Angadi <[email protected]> wrote on Wed., Oct 24,
>>>>>>>>>>>>> 2018, 05:18:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> The user can read serialized bytes from KafkaIO and
>>>>>>>>>>>>>> deserialize explicitly in a ParDo, which gives complete
>>>>>>>>>>>>>> control over how to handle record errors. This is what I
>>>>>>>>>>>>>> would do if I needed to in my pipeline.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> If there is a transform in Beam that does this, it could be
>>>>>>>>>>>>>> convenient for users in many such scenarios. This is
>>>>>>>>>>>>>> simpler than each source supporting it explicitly.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Given that KafkaIO uses the UnboundedSource framework,
>>>>>>>>>>>>>>> this is probably not something that can easily be
>>>>>>>>>>>>>>> supported. We might be able to support similar features
>>>>>>>>>>>>>>> when we have Kafka on top of Splittable DoFn though. So
>>>>>>>>>>>>>>> feel free to create a feature request JIRA for this.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>> Cham
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <
>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This is a great question. I've added the dev list to be
>>>>>>>>>>>>>>>> sure it gets noticed by whoever may know best.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <
>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Is there a way to get a dead-letter output from a
>>>>>>>>>>>>>>>>> pipeline that uses a KafkaIO connector for its input? As
>>>>>>>>>>>>>>>>> `TimestampPolicyFactory.withTimestampFn()` takes only a
>>>>>>>>>>>>>>>>> SerializableFunction and not a ParDo, how would I be
>>>>>>>>>>>>>>>>> able to produce a dead-letter output from it?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I have the following pipeline defined that reads from a
>>>>>>>>>>>>>>>>> KafkaIO input:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> pipeline.apply(
>>>>>>>>>>>>>>>>>     KafkaIO.<String, String>read()
>>>>>>>>>>>>>>>>>         .withBootstrapServers(bootstrap)
>>>>>>>>>>>>>>>>>         .withTopics(topics)
>>>>>>>>>>>>>>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>>>>>>>>>>>>>>         .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>>>>>>>>>>>>>         .updateConsumerProperties(
>>>>>>>>>>>>>>>>>             ImmutableMap.of(
>>>>>>>>>>>>>>>>>                 InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>>>>>>>>>>>>>>>>                 inputMessagesConfig))
>>>>>>>>>>>>>>>>>         .updateConsumerProperties(
>>>>>>>>>>>>>>>>>             ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>>>>>>>>>>>>>         .updateConsumerProperties(
>>>>>>>>>>>>>>>>>             ImmutableMap.of("group.id", "beam-consumers"))
>>>>>>>>>>>>>>>>>         .updateConsumerProperties(
>>>>>>>>>>>>>>>>>             ImmutableMap.of("enable.auto.commit", "true"))
>>>>>>>>>>>>>>>>>         .withTimestampPolicyFactory(
>>>>>>>>>>>>>>>>>             TimestampPolicyFactory.withTimestampFn(
>>>>>>>>>>>>>>>>>                 new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>>>>>>>>>>>>>         .withReadCommitted()
>>>>>>>>>>>>>>>>>         .commitOffsetsInFinalize())
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> and I'd like to get dead-letter outputs when my
>>>>>>>>>>>>>>>>> timestamp extraction fails.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Best,
>>>>>>>>>>>>>>>>> Tobi
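A minimal sketch of the bytes-first alternative Raghu and Juan Carlos describe above: read raw bytes from KafkaIO, then deserialize in a ParDo that returns a PCollectionTuple, from which the success records (TupleTag) and dead-letter records (TupleTag) are extracted. MyEvent and deserialize() are hypothetical placeholders, and the log-append-time policy is just one assumed way to keep watermark control inside the source (Luke's point) without running user parsing code there:

    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.io.kafka.TimestampPolicyFactory;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.TupleTag;
    import org.apache.beam.sdk.values.TupleTagList;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    final TupleTag<MyEvent> successTag = new TupleTag<MyEvent>() {};
    final TupleTag<byte[]> deadLetterTag = new TupleTag<byte[]>() {};

    PCollectionTuple results =
        pipeline
            .apply(
                KafkaIO.<byte[], byte[]>read()
                    .withBootstrapServers(bootstrap)
                    .withTopics(topics)
                    .withKeyDeserializer(ByteArrayDeserializer.class)
                    .withValueDeserializer(ByteArrayDeserializer.class)
                    // Watermark comes from broker-side log-append time, so
                    // the source itself never runs user code that can throw.
                    .withTimestampPolicyFactory(TimestampPolicyFactory.withLogAppendTime())
                    .withoutMetadata())
            .apply(
                "DeserializeOrDeadLetter",
                ParDo.of(
                        new DoFn<KV<byte[], byte[]>, MyEvent>() {
                          @ProcessElement
                          public void processElement(ProcessContext c) {
                            byte[] value = c.element().getValue();
                            try {
                              c.output(deserialize(value)); // hypothetical parser
                            } catch (Exception e) {
                              // Emit the raw bytes as-is to the dead-letter output.
                              c.output(deadLetterTag, value);
                            }
                          }
                        })
                    .withOutputTags(successTag, TupleTagList.of(deadLetterTag)));

    PCollection<MyEvent> events = results.get(successTag);
    PCollection<byte[]> deadLetters = results.get(deadLetterTag);

The trade-off is that element timestamps are then the broker's append times rather than event times embedded in the payload; a downstream ParDo can move timestamps forward with outputWithTimestamp, but moving them backward is limited by the DoFn's getAllowedTimestampSkew().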
