Well, if every input record's timestamp is X, watermark staying at X is the right answer, no? But I am not sure where the disagreement is, actually. I might be mistaken.
KafkaIO has a few built-in policies for watermarks and timestamps that cover most use cases (including server time, which has the benefit of providing a perfect watermark). It also gives users fairly complete control over these if they choose. It seems reasonable for a policy to base its watermark only on parsable records, and to ignore unparsable records for watermark calculation. It could even assign a timestamp that makes more logical sense in a particular application.

On Wed, Oct 24, 2018 at 8:30 PM Kenneth Knowles <[email protected]> wrote:

> Forgive me if this is naive or missing something, but here are my thoughts on these alternatives:
>
> (0) Timestamp has to be pulled out in the source to control the watermark. Luke's point is important.
>
> (1) If bad records get min_timestamp, and they occur infrequently enough, then the watermark will advance and they will all be dropped. That will not allow output to a dead-letter queue.
>
> (2) If you always have min_timestamp records, or if bad records are frequent, the watermark will never advance, so windows/aggregations would never be considered complete. Triggers could be used to get output anyhow, but it would never be a final answer. I think it is not in the spirit of Beam to work this way. Pragmatically, no state could ever be freed by a runner.
>
> In SQL there is an actual "dead letter" option when creating a table that parses from a bytes source. If, for example, a JSON record cannot be parsed to the expected schema - like maybe an Avro record got in the stream, or the JSON doesn't match the expected schema - it is output as-is to a user-specified dead-letter queue. I think this same level of support is also required for records that cannot have timestamps extracted in an unbounded source.
>
> In an SDF I think the function has enough control to do it all in "userland", so Cham is right on here.
>
> Kenn
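Kenn's cases (1) and (2), versus the parsable-records-only policy suggested above, can be illustrated with a toy model outside Beam. This is only a sketch under simplifying assumptions: the watermark is approximated as the minimum assigned timestamp in a batch, `OptionalLong.empty()` stands in for an unparsable record, and the class and method names are made up for illustration (Beam's real watermark machinery lives in the source and its timestamp policy, not in user code like this).

```java
import java.util.List;
import java.util.OptionalLong;

public class WatermarkSketch {
    static final long MIN_TS = Long.MIN_VALUE;

    // Policy A: bad records are assigned MIN_TS and included in the
    // watermark computation. A single bad record pins the watermark at
    // MIN_TS, so windows never close (Kenn's case (2)).
    public static long watermarkIncludingBad(List<OptionalLong> assigned) {
        return assigned.stream()
                .mapToLong(t -> t.orElse(MIN_TS))
                .min().orElse(MIN_TS);
    }

    // Policy B: the watermark is based only on parsable records; unparsable
    // records are ignored for watermark purposes, so it can still advance.
    public static long watermarkIgnoringBad(List<OptionalLong> assigned) {
        return assigned.stream()
                .filter(OptionalLong::isPresent)
                .mapToLong(OptionalLong::getAsLong)
                .min().orElse(MIN_TS);
    }

    public static void main(String[] args) {
        List<OptionalLong> batch =
                List.of(OptionalLong.of(100L), OptionalLong.empty(), OptionalLong.of(200L));
        System.out.println(watermarkIncludingBad(batch)); // pinned at Long.MIN_VALUE by the bad record
        System.out.println(watermarkIgnoringBad(batch));  // 100: advances past the bad record
    }
}
```

Under policy B the bad record itself still needs a timestamp (min_timestamp, say) and a downstream ParDo to redirect it, which is the rest of the discussion below.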
On Wed, Oct 24, 2018 at 6:54 PM Lukasz Cwik <[email protected]> wrote:

> That depends on the user's pipeline and how watermark advancement of the source may impact elements becoming droppably late if they are emitted with the minimum timestamp.

On Wed, Oct 24, 2018 at 4:42 PM Raghu Angadi <[email protected]> wrote:

> I see.
>
> What I meant was to return min_timestamp for bad records in the timestamp handler passed to KafkaIO itself, and the correct timestamp for parsable records. That should work too, right?

On Wed, Oct 24, 2018 at 4:21 PM Lukasz Cwik <[email protected]> wrote:

> Yes, that would be fine.
>
> The user could then use a ParDo which outputs to a DLQ for things it can't parse the timestamp for and use outputWithTimestamp [1] for everything else.
>
> 1: https://beam.apache.org/releases/javadoc/2.7.0/org/apache/beam/sdk/transforms/DoFn.WindowedContext.html#outputWithTimestamp-org.apache.beam.sdk.values.TupleTag-T-org.joda.time.Instant-

On Wed, Oct 24, 2018 at 1:21 PM Raghu Angadi <[email protected]> wrote:

> Thanks. So returning min timestamp is OK, right (assuming the application is fine with what it means)?

On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik <[email protected]> wrote:

> All records in Apache Beam have a timestamp. The default timestamp is the min timestamp defined here: https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48

On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi <[email protected]> wrote:

> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik <[email protected]> wrote:
>
> > You would have to return min timestamp for all records, otherwise the watermark may have advanced and you would be outputting records that are droppably late.
>
> That would be fine, I guess. What's the timestamp for a record that doesn't have one?

On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi <[email protected]> wrote:

> To be clear, returning min_timestamp for unparsable records should not affect the watermark.

On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi <[email protected]> wrote:

> How about returning min_timestamp? They would be dropped or redirected by the ParDo after that.
> Btw, TimestampPolicyFactory.withTimestampFn() is not a public API, is this pipeline defined under the kafkaio package?

On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik <[email protected]> wrote:

> In this case, the user is attempting to handle errors when parsing the timestamp. The timestamp controls the watermark for the UnboundedSource; how would they control the watermark in a downstream ParDo?

On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <[email protected]> wrote:

> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <[email protected]> wrote:
>
> > Ah nice.
> > Yeah, if user can return full bytes instead of applying a function that would result in an exception, this can be extracted by a ParDo down the line.
>
> KafkaIO does return bytes, and I think most sources should, unless there is a good reason not to.
> Given that, do we think Beam should provide a transform that makes it simpler to handle dead-letter output? I think there was a thread about it in the past.

On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <[email protected]> wrote:

> As Raghu said, just apply a regular ParDo and return a PCollectionTuple; after that you can extract your success records (TupleTag) and your dead-letter records (TupleTag) and do whatever you want with them.

On Wed, Oct 24, 2018 at 05:18 Raghu Angadi <[email protected]> wrote:

> User can read serialized bytes from KafkaIO and deserialize explicitly in a ParDo, which gives complete control over how to handle record errors. This is what I would do if I needed to in my pipeline.
>
> If there is a transform in Beam that does this, it could be convenient for users in many such scenarios. This is simpler than each source supporting it explicitly.

On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <[email protected]> wrote:

> Given that KafkaIO uses the UnboundedSource framework, this is probably not something that can easily be supported.
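The regular-ParDo-plus-PCollectionTuple pattern described above reduces to routing each record to one of two outputs. Here is a minimal plain-Java sketch of that split, outside Beam; the class and method names are hypothetical, and in an actual pipeline the two lists would correspond to the success and dead-letter TupleTag outputs of a multi-output DoFn.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class DeadLetterSplit {
    // Holds the two outputs: successfully parsed records, and the raw form
    // of records that failed to parse (the dead-letter side).
    public static class Split<T> {
        public final List<T> parsed = new ArrayList<>();
        public final List<String> deadLetter = new ArrayList<>();
    }

    // Apply `parse` to each raw record; on failure, keep the raw record for
    // the dead-letter output instead of dropping it or failing the batch.
    public static <T> Split<T> split(List<String> raw, Function<String, T> parse) {
        Split<T> out = new Split<>();
        for (String r : raw) {
            try {
                out.parsed.add(parse.apply(r));
            } catch (RuntimeException e) {
                out.deadLetter.add(r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Split<Integer> s = split(List.of("1", "oops", "3"), Integer::parseInt);
        System.out.println(s.parsed);     // [1, 3]
        System.out.println(s.deadLetter); // [oops]
    }
}
```

The key property is that parsing failures never throw past the split: every input record ends up in exactly one of the two outputs.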
> We might be able to support similar features when we have Kafka on top of Splittable DoFn, though. So feel free to create a feature request JIRA for this.
>
> Thanks,
> Cham

On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <[email protected]> wrote:

> This is a great question. I've added the dev list to be sure it gets noticed by whoever may know best.
>
> Kenn

On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <[email protected]> wrote:

> Hi,
>
> Is there a way to get a dead-letter output from a pipeline that uses a KafkaIO connector for its input? As `TimestampPolicyFactory.withTimestampFn()` takes only a SerializableFunction and not a ParDo, how would I be able to produce a dead-letter output from it?
> I have the following pipeline defined that reads from a KafkaIO input:
>
>     pipeline.apply(
>         KafkaIO.<String, String>read()
>             .withBootstrapServers(bootstrap)
>             .withTopics(topics)
>             .withKeyDeserializer(StringDeserializer.class)
>             .withValueDeserializer(ConfigurableDeserializer.class)
>             .updateConsumerProperties(
>                 ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
>             .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>             .updateConsumerProperties(ImmutableMap.of("group.id", "beam-consumers"))
>             .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>             .withTimestampPolicyFactory(
>                 TimestampPolicyFactory.withTimestampFn(
>                     new MessageTimestampExtractor(inputMessagesConfig)))
>             .withReadCommitted()
>             .commitOffsetsInFinalize())
>
> and I would like to get dead-letter outputs when my timestamp extraction fails.
>
> Best,
> Tobi
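One way to read the advice upthread: wrap the timestamp extraction in a try/catch, fall back to the minimum timestamp for unparsable records, and route those records to a dead-letter output in a downstream ParDo. A plain-Java sketch of just the fallback step, outside Beam: `MIN_TIMESTAMP` here is an illustrative sentinel standing in for Beam's `BoundedWindow.TIMESTAMP_MIN_VALUE`, and `extractOrMin` is a hypothetical helper, not a KafkaIO API.

```java
import java.time.Instant;

public class TimestampFallback {
    // Sentinel "min timestamp": records carrying it sort before all real
    // data. The exact value is illustrative; Beam uses
    // BoundedWindow.TIMESTAMP_MIN_VALUE for this role.
    public static final Instant MIN_TIMESTAMP = Instant.EPOCH.minusSeconds(1_000_000_000L);

    // Try to parse an epoch-millis timestamp out of a raw record; fall back
    // to the sentinel min timestamp when the record is unparsable, instead
    // of throwing inside the timestamp function.
    public static Instant extractOrMin(String rawRecord) {
        try {
            return Instant.ofEpochMilli(Long.parseLong(rawRecord.trim()));
        } catch (RuntimeException e) {
            return MIN_TIMESTAMP;
        }
    }

    public static void main(String[] args) {
        System.out.println(extractOrMin("1540375200000"));              // a parsable epoch-millis value
        System.out.println(extractOrMin("oops").equals(MIN_TIMESTAMP)); // true
    }
}
```

A downstream ParDo can then test for the sentinel and send those records to the dead-letter side, as discussed in the replies above.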
