Re: Suggestion to let KafkaIO support the deserializer API with headers
Thanks Luke. I took a stab at this in https://issues.apache.org/jira/browse/BEAM-10865, which also outlines in detail the avenues explored across the range of Kafka APIs to support, and how record headers and, eventually, the (Extended)Deserializer APIs evolved. Tested with kafka-clients 1.0.0 through the latest release.

Best,
Lourens
Re: Suggestion to let KafkaIO support the deserializer API with headers
Sounds good.

Note that you'll also want to update ReadFromKafkaDoFn [1] and provide tests that cover both code paths, to make sure we don't regress and stop providing headers.

1: https://github.com/apache/beam/blob/cfa448d121297398312d09c531258a72b413488b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java#L309
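A regression test along the lines Luke suggests could use a probe deserializer that records which overload was invoked. The sketch below uses simplified stand-ins for Kafka's `Deserializer` and `Headers` interfaces rather than the real `org.apache.kafka` classes, and all names are illustrative, not Beam's actual test code:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the Kafka interfaces, for illustration only.
interface Headers {
    byte[] lastHeader(String key);
}

interface Deserializer<T> {
    T deserialize(String topic, byte[] data);

    default T deserialize(String topic, Headers headers, byte[] data) {
        return deserialize(topic, data);
    }
}

// A probe that records which overload the reader invoked, so a test can
// assert the header-aware path is taken and guard against regressions.
class RecordingDeserializer implements Deserializer<String> {
    final List<String> calls = new ArrayList<>();

    @Override
    public String deserialize(String topic, byte[] data) {
        calls.add("simple");
        return new String(data, StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(String topic, Headers headers, byte[] data) {
        calls.add("with-headers");
        return new String(data, StandardCharsets.UTF_8);
    }
}
```

A test would feed records through the reader with this probe installed and assert that only "with-headers" invocations were recorded.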
Suggestion to let KafkaIO support the deserializer API with headers
Hi everyone,

We bumped into an API issue with the deserializer called when constructing KafkaRecord instances in the KafkaIO module. I wanted to float this past the mailing list for discussion first before exploring further.

The call site in question: KafkaIO only invokes the deserializer through the simplified API that does not include the Kafka record headers, even though they are available to pass as an argument:
https://github.com/apache/beam/blob/release-2.20.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L202-L203

Our SerDes implementation relies on Kafka headers support, which was added to Kafka records via KIP as a clean way to carry metadata without abusing keys or values for that purpose.

Deserializing with headers is also a valid Deserializer API per the official Kafka interface:

* https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L59-L61
* The header-aware method delegates to the simplified version as its default implementation (the simplified method is the one that requires a concrete implementation):
https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L60
* The default behaviour is thus backwards compatible, with a preference for the header-specific API.

We've used the custom SerDes without issues in a complex Connect and Streams pipeline, but bumped into this API divergence: the deserializer API with headers is not preferred as the primary deserialization mechanism.

The same API is used elsewhere:

* It's the default for the stock Java consumer (the header-enabled call falls back to the simplified API by default):
https://github.com/apache/kafka/blob/4cd2396db31418c90005c998d9107ad40df055b2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1362
* Ditto Kafka Connect:
https://github.com/apache/kafka/blob/b399a731a39c28bdd89998edc7c9fd732c56eee1/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java#L48-L64
* And Kafka Streams:
https://github.com/apache/kafka/blob/92828d53b18703000159f4dd7dc8b3170667db25/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java#L65-L66

Any thoughts on the proposed change of passing the additional headers argument on deserialization?

Best,
Lourens
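The backwards-compatibility argument above can be sketched as follows. This is a minimal illustration using simplified stand-ins for Kafka's `Deserializer` and `Headers` interfaces (not the real `org.apache.kafka` classes), and the `VersionedStringDeserializer` and its `schema-version` header are hypothetical examples of a SerDes that reads metadata from headers:

```java
import java.nio.charset.StandardCharsets;

// Simplified stand-in for org.apache.kafka.common.header.Headers.
interface Headers {
    byte[] lastHeader(String key);
}

// Simplified stand-in for org.apache.kafka.common.serialization.Deserializer.
interface Deserializer<T> {
    // The simplified API: implementations must provide this.
    T deserialize(String topic, byte[] data);

    // The header-aware API: its default implementation delegates to the
    // simplified form, which is why preferring it at the call site stays
    // backwards compatible with deserializers that ignore headers.
    default T deserialize(String topic, Headers headers, byte[] data) {
        return deserialize(topic, data);
    }
}

// A hypothetical deserializer that prefers header metadata when present.
class VersionedStringDeserializer implements Deserializer<String> {
    @Override
    public String deserialize(String topic, byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    @Override
    public String deserialize(String topic, Headers headers, byte[] data) {
        byte[] version = headers.lastHeader("schema-version");
        String prefix = version == null
            ? "v?" : new String(version, StandardCharsets.UTF_8);
        return prefix + ":" + new String(data, StandardCharsets.UTF_8);
    }
}
```

A reader that only ever calls the two-argument overload bypasses the header-aware path entirely, even when the record's headers are available to pass, which is the divergence described above.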