On Thu, Oct 25, 2018 at 10:41 AM Raghu Angadi <[email protected]> wrote:
> On Thu, Oct 25, 2018 at 10:28 AM Chamikara Jayalath <[email protected]> wrote:
>
>> Not sure if I understand why this would require Kafka to behave as two
>> independent sources.
>>
>> Won't setting a non-negative-infinity timestamp (for example processing
>> time) for failed records be enough?
>
> Well, that's what I suggested, and that is what a user seems to have done.
> The question is: what if that is not what we want, and we don't want to mix
> the two together (at least from my reading of Luke's and Kenn's comments,
> which could be off)?

Sorry, I meant setting it at the SDF itself (not from a ParDo) once we have
SDF-based KafkaIO.

>> Also (at least at some point) there were discussions on supporting SDFs
>> to report different watermarks for different outputs. More details are
>> available here:
>> https://docs.google.com/document/d/1BGc8pM1GOvZhwR9SARSVte-20XEoBUxrGJ5gTWXdv3c/edit
>>
>> - Cham

It could even assign a timestamp that makes more logical sense in a
particular application.

On Wed, Oct 24, 2018 at 8:30 PM Kenneth Knowles <[email protected]> wrote:

Forgive me if this is naive or missing something, but here are my thoughts
on these alternatives:

(0) The timestamp has to be pulled out in the source to control the
watermark. Luke's point is important.

(1) If bad records get min_timestamp, and they occur infrequently enough,
then the watermark will advance and they will all be dropped. That will not
allow output to a dead-letter queue.

(2) If you always have min_timestamp records, or if bad records are
frequent, the watermark will never advance, so windows/aggregations would
never be considered complete. Triggers could be used to get output anyhow,
but it would never be a final answer. I think it is not in the spirit of
Beam to work this way.
Pragmatically, no state could ever be freed by a runner.

In SQL there is an actual "dead letter" option when creating a table that
parses from a bytes source. If, for example, a JSON record cannot be parsed
to the expected schema (maybe an Avro record got into the stream, or the
JSON doesn't match the expected schema), it is output as-is to a
user-specified dead-letter queue. I think this same level of support is also
required for records that cannot have timestamps extracted in an unbounded
source.

In an SDF I think the function has enough control to do it all in
"userland", so Cham is right on here.

Kenn

On Wed, Oct 24, 2018 at 6:54 PM Lukasz Cwik <[email protected]> wrote:

That depends on the user's pipeline and on how watermark advancement of the
source may impact elements becoming droppably late if they are emitted with
the minimum timestamp.

On Wed, Oct 24, 2018 at 4:42 PM Raghu Angadi <[email protected]> wrote:

I see.

What I meant was to return min_timestamp for bad records in the timestamp
handler passed to KafkaIO itself, and the correct timestamp for parsable
records. That should work too, right?

On Wed, Oct 24, 2018 at 4:21 PM Lukasz Cwik <[email protected]> wrote:

Yes, that would be fine.

The user could then use a ParDo which outputs to a DLQ for things it can't
parse the timestamp for, and use outputWithTimestamp [1] for everything
else.
1: https://beam.apache.org/releases/javadoc/2.7.0/org/apache/beam/sdk/transforms/DoFn.WindowedContext.html#outputWithTimestamp-org.apache.beam.sdk.values.TupleTag-T-org.joda.time.Instant-

On Wed, Oct 24, 2018 at 1:21 PM Raghu Angadi <[email protected]> wrote:

Thanks. So returning the min timestamp is OK, right (assuming the
application is fine with what it means)?

On Wed, Oct 24, 2018 at 1:17 PM Lukasz Cwik <[email protected]> wrote:

All records in Apache Beam have a timestamp. The default timestamp is the
min timestamp defined here:
https://github.com/apache/beam/blob/279a05604b83a54e8e5a79e13d8761f94841f326/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/BoundedWindow.java#L48

On Wed, Oct 24, 2018 at 12:51 PM Raghu Angadi <[email protected]> wrote:

> On Wed, Oct 24, 2018 at 12:33 PM Lukasz Cwik <[email protected]> wrote:
>
> You would have to return the min timestamp for all records; otherwise the
> watermark may have advanced and you would be outputting records that are
> droppably late.

That would be fine, I guess. What's the timestamp for a record that doesn't
have one?

On Wed, Oct 24, 2018 at 12:25 PM Raghu Angadi <[email protected]> wrote:

To be clear, returning min_timestamp for unparsable records should not
affect the watermark.

On Wed, Oct 24, 2018 at 10:32 AM Raghu Angadi <[email protected]> wrote:

How about returning min_timestamp?
They would be dropped or redirected by the ParDo after that.

Btw, TimestampPolicyFactory.withTimestampFn() is not a public API; is this
pipeline defined under the kafkaio package?

On Wed, Oct 24, 2018 at 10:19 AM Lukasz Cwik <[email protected]> wrote:

In this case, the user is attempting to handle errors when parsing the
timestamp. The timestamp controls the watermark for the UnboundedSource;
how would they control the watermark in a downstream ParDo?

On Wed, Oct 24, 2018 at 9:48 AM Raghu Angadi <[email protected]> wrote:

> On Wed, Oct 24, 2018 at 7:19 AM Chamikara Jayalath <[email protected]>
> wrote:
>
> Ah, nice. Yeah, if the user can return full bytes instead of applying a
> function that would result in an exception, this can be extracted by a
> ParDo down the line.

KafkaIO does return bytes, and I think most sources should, unless there is
a good reason not to.

Given that, do we think Beam should provide a transform that makes it
simpler to handle dead-letter output? I think there was a thread about it
in the past.
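The "deserialize in a ParDo and branch failures to a dead letter" pattern discussed in this thread can be sketched in plain Java. This is only a sketch of the branching logic with hypothetical names; in a real Beam pipeline the two lists below would be the main and dead-letter TupleTags of a multi-output DoFn.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Sketch: deserialize each raw record; successes go to the main output,
// failures go (unmodified) to a dead-letter output. In Beam this would be a
// DoFn emitting to two TupleTags via ParDo.withOutputTags(...).
public class DeadLetterSketch {

    public static final class Outputs {
        public final List<Long> parsed = new ArrayList<>();       // "main" output
        public final List<String> deadLetter = new ArrayList<>(); // "dead letter" output
    }

    // Stands in for the DoFn's per-element processing.
    public static Outputs process(List<String> rawRecords, Function<String, Long> parser) {
        Outputs out = new Outputs();
        for (String raw : rawRecords) {
            try {
                out.parsed.add(parser.apply(raw)); // success: emit parsed value
            } catch (RuntimeException e) {
                out.deadLetter.add(raw);           // failure: emit the raw record as-is
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Outputs out = process(Arrays.asList("42", "garbage", "7"), Long::parseLong);
        // prints parsed=[42, 7] deadLetter=[garbage]
        System.out.println("parsed=" + out.parsed + " deadLetter=" + out.deadLetter);
    }
}
```

The key property is that the bad record itself is never lost: it is forwarded untouched, so the dead-letter side can be written to a separate sink for inspection or replay.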
On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <[email protected]> wrote:

As Raghu said, just apply a regular ParDo and return a PCollectionTuple.
After that you can extract your success records (TupleTag) and your
dead-letter records (TupleTag) and do whatever you want with them.

Raghu Angadi <[email protected]> wrote on Wed, Oct 24, 2018, 05:18:

A user can read serialized bytes from KafkaIO and deserialize them
explicitly in a ParDo, which gives complete control over how to handle
record errors. This is what I would do if I needed to in my pipeline.

If there were a transform in Beam that does this, it could be convenient
for users in many such scenarios. This is simpler than each source
supporting it explicitly.

On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <[email protected]> wrote:

Given that KafkaIO uses the UnboundedSource framework, this is probably not
something that can easily be supported. We might be able to support similar
features when we have Kafka on top of Splittable DoFn, though.

So feel free to create a feature request JIRA for this.
Thanks,
Cham

On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <[email protected]> wrote:

This is a great question. I've added the dev list to be sure it gets
noticed by whoever may know best.

Kenn

On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <[email protected]> wrote:

Hi,

Is there a way to get a dead-letter output from a pipeline that uses a
KafkaIO connector for its input? Since
`TimestampPolicyFactory.withTimestampFn()` takes only a SerializableFunction
and not a ParDo, how would I be able to produce a dead-letter output from
it?
I have the following pipeline defined that reads from a KafkaIO input:

    pipeline.apply(
        KafkaIO.<String, String>read()
            .withBootstrapServers(bootstrap)
            .withTopics(topics)
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(ConfigurableDeserializer.class)
            .updateConsumerProperties(
                ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME, inputMessagesConfig))
            .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
            .updateConsumerProperties(ImmutableMap.of("group.id", "beam-consumers"))
            .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
            .withTimestampPolicyFactory(
                TimestampPolicyFactory.withTimestampFn(
                    new MessageTimestampExtractor(inputMessagesConfig)))
            .withReadCommitted()
            .commitOffsetsInFinalize())

and I would like to get dead-letter outputs when my timestamp extraction
fails.

Best,
Tobi
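One concrete shape for Raghu's suggestion in this thread is to make the timestamp function itself fall back to the minimum timestamp for unparsable records, so a downstream ParDo can redirect them. A plain-Java sketch follows; the payload format (epoch millis as text) and the sentinel constant are assumptions for illustration, and in Beam the sentinel would be BoundedWindow.TIMESTAMP_MIN_VALUE from the SerializableFunction passed to TimestampPolicyFactory.withTimestampFn().

```java
// Sketch: a timestamp extractor that never throws. Parsable records get their
// real event timestamp; unparsable ones get a "minimum timestamp" sentinel so
// a downstream ParDo can recognize and dead-letter them. Names and the payload
// format are hypothetical, not from any Beam API.
public class MinTimestampFallback {

    // Stand-in for Beam's BoundedWindow.TIMESTAMP_MIN_VALUE.
    public static final long TIMESTAMP_MIN_MILLIS = Long.MIN_VALUE / 2;

    // Stand-in for the SerializableFunction handed to
    // TimestampPolicyFactory.withTimestampFn(...).
    public static long extractOrMin(String payload) {
        try {
            return Long.parseLong(payload);  // assumption: payload is epoch millis
        } catch (NumberFormatException e) {
            return TIMESTAMP_MIN_MILLIS;     // bad record: flag with the min timestamp
        }
    }

    public static void main(String[] args) {
        // prints 1540400000000
        System.out.println(extractOrMin("1540400000000"));
        // prints true
        System.out.println(extractOrMin("not-a-timestamp") == TIMESTAMP_MIN_MILLIS);
    }
}
```

As Kenn's alternatives (1) and (2) point out, this only works well when bad records are rare; if min-timestamp records are frequent, the watermark is held back and windows never close, so the downstream redirect-to-DLQ ParDo remains the safer general pattern.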
