Ah nice. Yeah, if the user can get the full bytes back instead of applying a function that may throw, the bad records can be extracted by a ParDo down the line.
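Something like the following, say (untested sketch; the strict UTF-8 decode is just a stand-in for whatever deserialization a real pipeline needs, and the class and tag names are made up):

    import java.nio.ByteBuffer;
    import java.nio.charset.CharacterCodingException;
    import java.nio.charset.StandardCharsets;
    import org.apache.beam.sdk.io.kafka.KafkaRecord;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.TupleTag;

    public class ParseFn extends DoFn<KafkaRecord<byte[], byte[]>, String> {
      // Main output: successfully parsed records. Side output: raw bytes that
      // failed to parse. Anonymous subclasses keep the element type at runtime.
      public static final TupleTag<String> PARSED = new TupleTag<String>() {};
      public static final TupleTag<byte[]> DEAD_LETTER = new TupleTag<byte[]>() {};

      @ProcessElement
      public void processElement(ProcessContext c) {
        byte[] payload = c.element().getKV().getValue();
        try {
          c.output(parse(payload));          // main output
        } catch (Exception e) {
          c.output(DEAD_LETTER, payload);    // keep the raw bytes for inspection
        }
      }

      // Stand-in parser: a strict UTF-8 decode that throws on invalid bytes.
      // Replace with the real deserializer (JSON, Avro, ...).
      private static String parse(byte[] payload) throws CharacterCodingException {
        return StandardCharsets.UTF_8.newDecoder().decode(ByteBuffer.wrap(payload)).toString();
      }
    }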
On Tue, Oct 23, 2018 at 11:14 PM Juan Carlos Garcia <[email protected]> wrote:

> As Raghu said,
>
> Just apply a regular ParDo and return a PCollectionTuple; after that you
> can extract your success records (TupleTag) and your dead-letter records
> (TupleTag) and do whatever you want with them.
>
> Raghu Angadi <[email protected]> wrote on Wed, Oct 24, 2018, 05:18:
>
>> Users can read serialized bytes from KafkaIO and deserialize explicitly
>> in a ParDo, which gives complete control over how to handle record
>> errors. This is what I would do if I needed to in my pipeline.
>>
>> If there were a transform in Beam that does this, it could be convenient
>> for users in many such scenarios. This is simpler than each source
>> supporting it explicitly.
>>
>> On Tue, Oct 23, 2018 at 8:03 PM Chamikara Jayalath <[email protected]>
>> wrote:
>>
>>> Given that KafkaIO uses the UnboundedSource framework, this is probably
>>> not something that can easily be supported. We might be able to support
>>> similar features when we have Kafka on top of Splittable DoFn, though.
>>>
>>> So feel free to create a feature request JIRA for this.
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Tue, Oct 23, 2018 at 7:43 PM Kenneth Knowles <[email protected]> wrote:
>>>
>>>> This is a great question. I've added the dev list to be sure it gets
>>>> noticed by whoever may know best.
>>>>
>>>> Kenn
>>>>
>>>> On Tue, Oct 23, 2018 at 2:05 AM Kaymak, Tobias <[email protected]>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Is there a way to get a dead-letter output from a pipeline that uses a
>>>>> KafkaIO connector for its input? As
>>>>> `TimestampPolicyFactory.withTimestampFn()` takes only a
>>>>> SerializableFunction and not a ParDo, how would I be able to produce a
>>>>> dead-letter output from it?
>>>>>
>>>>> I have the following pipeline defined that reads from a KafkaIO input:
>>>>>
>>>>> pipeline.apply(
>>>>>     KafkaIO.<String, String>read()
>>>>>         .withBootstrapServers(bootstrap)
>>>>>         .withTopics(topics)
>>>>>         .withKeyDeserializer(StringDeserializer.class)
>>>>>         .withValueDeserializer(ConfigurableDeserializer.class)
>>>>>         .updateConsumerProperties(
>>>>>             ImmutableMap.of(InputMessagesConfig.CONFIG_PROPERTY_NAME,
>>>>>                 inputMessagesConfig))
>>>>>         .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", "earliest"))
>>>>>         .updateConsumerProperties(ImmutableMap.of("group.id", "beam-consumers"))
>>>>>         .updateConsumerProperties(ImmutableMap.of("enable.auto.commit", "true"))
>>>>>         .withTimestampPolicyFactory(
>>>>>             TimestampPolicyFactory.withTimestampFn(
>>>>>                 new MessageTimestampExtractor(inputMessagesConfig)))
>>>>>         .withReadCommitted()
>>>>>         .commitOffsetsInFinalize())
>>>>>
>>>>> and I'd like to get dead-letter outputs when my timestamp extraction
>>>>> fails.
>>>>>
>>>>> Best,
>>>>> Tobi
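Putting the suggestions in this thread together against Tobias's pipeline, the wiring could look roughly like this (sketch; consumer configuration elided, ParseFn and its tags as in the snippet at the top of the thread, bootstrap and topics as in Tobias's code):

    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionTuple;
    import org.apache.beam.sdk.values.TupleTagList;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    // Read raw bytes so deserialization happens in ParseFn, where failures
    // can be routed to the dead-letter output instead of failing the source.
    PCollectionTuple results =
        pipeline
            .apply(
                KafkaIO.<byte[], byte[]>read()
                    .withBootstrapServers(bootstrap)
                    .withTopics(topics)
                    .withKeyDeserializer(ByteArrayDeserializer.class)
                    .withValueDeserializer(ByteArrayDeserializer.class))
            .apply(
                ParDo.of(new ParseFn())
                    .withOutputTags(ParseFn.PARSED, TupleTagList.of(ParseFn.DEAD_LETTER)));

    PCollection<String> parsed = results.get(ParseFn.PARSED);
    PCollection<byte[]> deadLetters = results.get(ParseFn.DEAD_LETTER);
    // deadLetters can now be written to a separate topic, to files, etc.

One consequence of this approach is that timestamp extraction moves out of KafkaIO as well, e.g. into the ParDo via outputWithTimestamp() (mind the allowed-timestamp-skew rules when shifting timestamps backwards), so a record whose timestamp can't be parsed can also be routed to the dead-letter output instead of throwing inside the source.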
