Re: Suggestion to let KafkaIO support the deserializer API with headers

2020-09-10 Thread Lourens Naude
Thanks Luke,

I took a stab at this in https://issues.apache.org/jira/browse/BEAM-10865.
The ticket also outlines in detail the avenues I explored across the range
of Kafka APIs we need to support, and how Record headers and later the
(Extended)Deserializer APIs evolved. Tested with kafka-clients 1.0.0
through the latest release.

Best,
Lourens

On Fri, Aug 21, 2020 at 5:06 PM Luke Cwik  wrote:



Re: Suggestion to let KafkaIO support the deserializer API with headers

2020-08-21 Thread Luke Cwik
Sounds good.

Note that you'll also want to update ReadFromKafkaDoFn[1] and provide tests
that cover both to make sure we don't regress and stop providing headers.

1:
https://github.com/apache/beam/blob/cfa448d121297398312d09c531258a72b413488b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java#L309
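A minimal sketch of the kind of regression check meant here. It assumes a hypothetical cut-down `ProbeDeserializer` interface (headers modelled as a `Map<String, byte[]>`, not Kafka's `Headers` type) and a hypothetical `decodeValue` helper standing in for the reader's call site. A real test would drive KafkaUnboundedReader and ReadFromKafkaDoFn themselves; this only shows the shape: a recording deserializer that fails the check if the headers-aware overload is not called with the record's headers.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

// Hypothetical mirror of the two Kafka Deserializer overloads (headers as a Map).
interface ProbeDeserializer<T> {
  T deserialize(String topic, byte[] data);

  // By default the headers-aware overload ignores headers and delegates.
  default T deserialize(String topic, Map<String, byte[]> headers, byte[] data) {
    return deserialize(topic, data);
  }
}

// Test double that records whether the headers-aware overload ran, and with what.
class RecordingDeserializer implements ProbeDeserializer<String> {
  Map<String, byte[]> lastHeaders;
  boolean headersOverloadCalled;

  @Override
  public String deserialize(String topic, byte[] data) {
    return new String(data, StandardCharsets.UTF_8);
  }

  @Override
  public String deserialize(String topic, Map<String, byte[]> headers, byte[] data) {
    headersOverloadCalled = true;
    lastHeaders = headers;
    return deserialize(topic, data);
  }
}

class HeaderRegressionCheck {
  // Hypothetical stand-in for the reader's call site; the real Beam code differs.
  static <T> T decodeValue(ProbeDeserializer<T> d, String topic, Map<String, byte[]> headers, byte[] value) {
    return d.deserialize(topic, headers, value);
  }

  public static void main(String[] args) {
    RecordingDeserializer probe = new RecordingDeserializer();
    Map<String, byte[]> headers = Collections.singletonMap("k", new byte[] {42});
    String out = decodeValue(probe, "topic", headers, "v".getBytes(StandardCharsets.UTF_8));
    // Fail loudly if the call site regresses to the simplified overload.
    if (!"v".equals(out) || !probe.headersOverloadCalled || probe.lastHeaders != headers) {
      throw new AssertionError("headers were not passed to the deserializer");
    }
    System.out.println("ok");
  }
}
```

Running the same check against both readers is what would catch one of them silently dropping headers.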

On Fri, Aug 21, 2020 at 8:29 AM Lourens Naude 
wrote:



Suggestion to let KafkaIO support the deserializer API with headers

2020-08-21 Thread Lourens Naude
Hi everyone,

We bumped into an API issue with the deserializer called when constructing
KafkaRecord instances in the KafkaIO module.

I wanted to float this past the mailing list for discussion first before
exploring further.

The call site in question: KafkaIO only calls the deserializer through the
simplified API, which does not include Kafka record headers (even though the
headers are available to pass as an argument):
https://github.com/apache/beam/blob/release-2.20.0/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java#L202-L203

Our SerDes implementation relies on Kafka header support, which was added
to Kafka records via a KIP (KIP-82) as a means to include metadata cleanly
without abusing keys or values for such purposes.
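To illustrate the gap, here is a minimal, self-contained sketch. `SketchDeserializer` is a hypothetical stand-in that mirrors only the two `deserialize` overloads of Kafka's Deserializer (headers modelled as a `Map<String, byte[]>`), and the `encoding` header is an assumed example of metadata carried in headers rather than in keys or values. The two static helpers contrast the current call (headers dropped) with the proposed one (headers passed through).

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Kafka's Deserializer, reduced to the two overloads.
interface SketchDeserializer<T> {
  // Simplified API: every implementation must provide this.
  T deserialize(String topic, byte[] data);

  // Headers-aware API: by default it ignores the headers and delegates.
  default T deserialize(String topic, Map<String, byte[]> headers, byte[] data) {
    return deserialize(topic, data);
  }
}

// Hypothetical SerDes that reads metadata from an assumed "encoding" header.
class EncodingAwareDeserializer implements SketchDeserializer<String> {
  @Override
  public String deserialize(String topic, byte[] data) {
    // Without headers there is no metadata, so fall back to UTF-8.
    return new String(data, StandardCharsets.UTF_8);
  }

  @Override
  public String deserialize(String topic, Map<String, byte[]> headers, byte[] data) {
    byte[] enc = headers == null ? null : headers.get("encoding");
    if (enc != null && "UTF-16BE".equals(new String(enc, StandardCharsets.UTF_8))) {
      return new String(data, StandardCharsets.UTF_16BE);
    }
    return deserialize(topic, data);
  }
}

class CallSiteDemo {
  // Roughly the current behaviour: headers are available but never passed on.
  static <T> T decodeWithoutHeaders(SketchDeserializer<T> d, String topic, Map<String, byte[]> headers, byte[] data) {
    return d.deserialize(topic, data);
  }

  // The proposed behaviour: hand the headers to the headers-aware overload.
  static <T> T decodeWithHeaders(SketchDeserializer<T> d, String topic, Map<String, byte[]> headers, byte[] data) {
    return d.deserialize(topic, headers, data);
  }

  public static void main(String[] args) {
    Map<String, byte[]> headers = new HashMap<>();
    headers.put("encoding", "UTF-16BE".getBytes(StandardCharsets.UTF_8));
    byte[] payload = "hi".getBytes(StandardCharsets.UTF_16BE);
    SketchDeserializer<String> d = new EncodingAwareDeserializer();
    System.out.println(decodeWithHeaders(d, "t", headers, payload));
    System.out.println(decodeWithoutHeaders(d, "t", headers, payload));
  }
}
```

With the headers passed through, the payload decodes back to "hi"; through the simplified call the same bytes are read as UTF-8 and come out garbled.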

The headers-aware method is also a valid Deserializer API per the official
Kafka interface:

* https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L59-L61
* Its default implementation delegates to the simplified version (which
every implementation must provide):
https://github.com/apache/kafka/blob/35a0de32ee3823dfb548a1cd5d5faf4f7c99e4e0/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L60
* The default behaviour is thus backwards compatible, while the
header-specific API is the preferred entry point.
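A cut-down sketch of why that default makes the change backwards compatible. `DelegatingDeserializer` is a hypothetical mirror of the interface shape described above, not the real Kafka type: a deserializer written only against the simplified method still works when a caller always invokes the headers-aware overload, because the default method delegates.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

// Hypothetical mirror of the Deserializer shape: one abstract, one default method.
interface DelegatingDeserializer<T> {
  T deserialize(String topic, byte[] data);

  // Default: ignore headers and fall back to the simplified method.
  default T deserialize(String topic, Map<String, byte[]> headers, byte[] data) {
    return deserialize(topic, data);
  }
}

// A "legacy" deserializer written before headers existed: simplified method only.
class LegacyStringDeserializer implements DelegatingDeserializer<String> {
  @Override
  public String deserialize(String topic, byte[] data) {
    return new String(data, StandardCharsets.UTF_8);
  }
}

class DelegationDemo {
  public static void main(String[] args) {
    DelegatingDeserializer<String> legacy = new LegacyStringDeserializer();
    byte[] payload = "value".getBytes(StandardCharsets.UTF_8);
    Map<String, byte[]> headers = Collections.singletonMap("trace-id", new byte[] {1});
    // A caller that always uses the headers-aware overload still gets "value".
    System.out.println(legacy.deserialize("topic", headers, payload));
  }
}
```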

We've used the custom SerDes without issues in a complex Connect and
Streams pipeline, but bumped into this API divergence in KafkaIO: it does
not prefer the deserializer API with headers as the primary deserialization
mechanism.

The same API is used elsewhere:

* It's the default for the stock Java consumer (the header-enabled method,
which by default calls the simplified API):
https://github.com/apache/kafka/blob/4cd2396db31418c90005c998d9107ad40df055b2/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1362
* Ditto Kafka Connect:
https://github.com/apache/kafka/blob/b399a731a39c28bdd89998edc7c9fd732c56eee1/connect/api/src/main/java/org/apache/kafka/connect/storage/Converter.java#L48-L64
* And Kafka Streams:
https://github.com/apache/kafka/blob/92828d53b18703000159f4dd7dc8b3170667db25/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordDeserializer.java#L65-L66

Any thoughts on the proposed change to pass the additional headers argument
on deserialization?

Best,
Lourens