GitHub user rhauch opened a pull request:

    https://github.com/apache/kafka/pull/4319

    [WIP] KAFKA-5142: Add Connect support for message headers (KIP-145)

    *NEW PROPOSAL FOR KIP-145... DO NOT MERGE*
    
    Changed the Connect API and runtime to support message headers as described 
in KIP-145.
    
    The new `Header` interface defines an immutable representation of a Kafka 
header (key-value pair) with support for the Connect value types and schemas. 
This interface provides methods for easily converting between many of the 
built-in primitive, structured, and logical data types.
    
    The new `Headers` interface defines an ordered collection of headers and is 
used to track all headers associated with a `ConnectRecord` (and thus 
`SourceRecord` and `SinkRecord`). The collection allows multiple headers with 
the same key. The `Headers` interface contains methods for adding, removing, 
finding, and modifying headers. Convenience methods allow connectors and 
transforms to easily use and modify the headers for a record.
    
    A new `HeaderConverter` interface is also defined so that the Connect 
runtime can serialize and deserialize headers between the in-memory 
representation and Kafka’s byte[] representation. A new 
`SimpleHeaderConverter` implementation has been added; it serializes to 
strings and deserializes by inferring the schemas. (`Struct` header values are 
serialized without the schemas, so they can only be deserialized as `Map` 
instances without a schema.) The `StringConverter`, `JsonConverter`, and 
`ByteArrayConverter` have all been extended to also be `HeaderConverter` 
implementations. Each connector can be configured with a different header 
converter, although by default the `SimpleHeaderConverter` is used to serialize 
header values as strings without schemas.
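
As a rough illustration of "serialize to strings, deserialize by inferring the type", here is a minimal, self-contained sketch. It is not the `SimpleHeaderConverter` implementation and does not use the Connect API: the class `InferringHeaderCodecSketch`, its method names, and the inference order (boolean, then integer, then floating point, then string) are all assumptions made for the example.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch; the real SimpleHeaderConverter may infer types differently.
final class InferringHeaderCodecSketch {

    // Serializes any value through its string form, encoded as UTF-8; null stays null.
    static byte[] toBytes(Object value) {
        return value == null ? null : String.valueOf(value).getBytes(StandardCharsets.UTF_8);
    }

    // Deserializes by guessing a type from the text alone (assumed inference order).
    static Object fromBytes(byte[] bytes) {
        if (bytes == null) {
            return null;
        }
        String text = new String(bytes, StandardCharsets.UTF_8);
        if (text.equals("true") || text.equals("false")) {
            return Boolean.valueOf(text);
        }
        try {
            return Long.valueOf(text);          // try an integer first
        } catch (NumberFormatException notAnInteger) {
            // not an integer, try floating point next
        }
        try {
            return Double.valueOf(text);
        } catch (NumberFormatException notANumber) {
            return text;                        // fall back to the raw string
        }
    }

    public static void main(String[] args) {
        // An int goes in, but only the text "42" survives, so a Long comes back.
        Object roundTripped = fromBytes(toBytes(42));
        System.out.println(roundTripped.getClass().getSimpleName() + " " + roundTripped);
    }
}
```

The round trip in `main` shows the same kind of information loss the description mentions for `Struct` values: once the schema is dropped, the reader can only recover what the text itself implies.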
    
    Unit and integration tests are added for `ConnectHeader` and 
`ConnectHeaders`, the two implementation classes for headers. Additional tests 
cover the new methods on the `Converter` implementations. Finally, the 
`ConnectRecord` object is already used heavily, so only limited tests needed 
to be added; quite a few of the existing tests already cover the changes.
    
    ### Committer Checklist (excluded from commit message)
    - [ ] Verify design and implementation matches KIP-145
    - [ ] Verify test coverage and CI build status
    - [ ] Verify documentation (including upgrade notes)


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rhauch/kafka kafka-5142-b

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/4319.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4319
    
----
commit 1c35692da19f3c8c92ce60946a69f576878b958a
Author: Randall Hauch <rha...@gmail.com>
Date:   2017-12-05T17:05:00Z

    KAFKA-5142: Add message headers to Connect API (KIP-145)
    
    Changed the Connect API to add message headers as described in KIP-145.
    
    The new `Header` interface defines an immutable representation of a Kafka 
header (name-value pair) with support for the Connect value types and schemas. 
Kafka headers have a string name and a binary value, which doesn’t align well 
with Connect’s existing data and schema mechanisms. Thus, Connect’s 
`Header` interface provides methods for easily converting between many of the 
built-in primitive, structured, and logical data types. And, as discussed 
below, a new `HeaderConverter` interface is added to define how the Kafka 
header binary values are converted to Connect data objects.
    
    The new `Headers` interface defines an ordered collection of headers and is 
used to track all headers associated with a `ConnectRecord`. Like the Kafka 
headers API, the Connect `Headers` interface allows storing multiple headers 
with the same key in an ordered list. The Connect `Headers` interface is 
mutable and has a number of methods that make it easy for connectors and 
transformations to add, modify, and remove headers from the record, and the 
interface is designed to allow chaining multiple mutating methods.
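
The behavior described above (insertion order preserved, duplicate keys allowed, mutating methods that chain) can be sketched with a small stand-alone class. This is only an illustration and not the Connect `Headers` interface: `HeaderListSketch` and every method name on it are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an ordered, mutable header collection.
final class HeaderListSketch {

    // One immutable name-value pair.
    static final class Entry {
        final String key;
        final Object value;

        Entry(String key, Object value) {
            this.key = key;
            this.value = value;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    // Appends a header; earlier headers with the same key are kept.
    HeaderListSketch add(String key, Object value) {
        entries.add(new Entry(key, value));
        return this; // returning this is what makes chaining possible
    }

    // Removes every header with the given key.
    HeaderListSketch remove(String key) {
        entries.removeIf(e -> e.key.equals(key));
        return this;
    }

    // Returns the values of all headers with the given key, in insertion order.
    List<Object> allWithKey(String key) {
        List<Object> values = new ArrayList<>();
        for (Entry e : entries) {
            if (e.key.equals(key)) {
                values.add(e.value);
            }
        }
        return values;
    }

    // Returns the value of the most recently added header with the key, or null.
    Object lastWithKey(String key) {
        for (int i = entries.size() - 1; i >= 0; i--) {
            if (entries.get(i).key.equals(key)) {
                return entries.get(i).value;
            }
        }
        return null;
    }

    int size() {
        return entries.size();
    }

    public static void main(String[] args) {
        HeaderListSketch headers = new HeaderListSketch()
                .add("trace-id", "abc")
                .add("retry", 1)
                .add("retry", 2)
                .remove("trace-id");
        System.out.println(headers.allWithKey("retry")); // prints [1, 2]
    }
}
```

Backing the collection with a list rather than a map is the design choice that keeps both ordering and duplicate keys; a map keyed by header name would silently collapse the two `retry` entries.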
    
    The existing constructors and methods in `ConnectRecord`, `SinkRecord`, and 
`SourceRecord` are unchanged to maintain backward compatibility, and in these 
situations the records will contain an empty `Headers` object that connectors 
and transforms can modify. There is also an additional constructor that allows 
an existing `Headers` to be passed in. A new overloaded form of `newRecord` 
method was created to allow connectors and transforms to create a new record 
with an entirely new `Headers` object.
    
    A new `HeaderConverter` interface is also defined so that the Connect 
runtime can serialize and deserialize headers between the in-memory 
representation and Kafka’s byte[] representation.
    
    Unit and integration tests are added for `ConnectHeader` and 
`ConnectHeaders`, the two implementation classes for headers. The 
`ConnectRecord` object is already used heavily, so only limited tests need to 
be added while quite a few of the existing tests already cover the changes. 
However, new unit tests were added for `SinkRecord` and `SourceRecord` to verify 
the header behavior, including when the `newRecord` methods are called.

commit f398eba326d6c0cc8732770cb3bfc962f0453995
Author: Randall Hauch <rha...@gmail.com>
Date:   2017-12-13T01:27:26Z

    KAFKA-5142: Add message header converters to Connect API (KIP-145)
    
    This is the second commit for the public Connect API changes for KIP-145, 
and deals primarily with `HeaderConverter` implementations.
    
    Connect has three `Converter` implementations, `StringConverter`, 
`JsonConverter` and `ByteArrayConverter`. These were modified to also implement 
`HeaderConverter`, without changing any of the existing functionality.
    
    Like many of our pluggable components in Connect, the `HeaderConverter` 
interface extends `Configurable` and lets implementations expose a 
`ConfigDef` that describes the supported configuration properties, along with a 
`configure` method that initializes the component with the provided 
configuration properties. The `StringConverter`, `JsonConverter` and 
`ByteArrayConverter` were changed to support these methods in a backward 
compatible manner. There are now `StringConverterConfig` and 
`JsonConverterConfig` classes that define the `ConfigDef` for the 
implementations; the `ByteArrayConverter` has no configuration properties and 
doesn't need a config class.
    
    Note that the existing `Converter` interface has a special `configure` 
signature with a parameter that says whether the converter is being used for 
keys or values. This is different from the `Configurable.configure` signature, so 
this commit adds a new `ConverterConfig` abstract class that defines a 
`converter.type` property that can be used to set whether the converter is 
being used for keys, values, or headers. The existing `Converter` methods 
internally set this property based upon the supplied boolean parameter, so the 
default for `converter.type` can be `header`.
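
One way to picture the bridging described above is the following self-contained sketch. It does not use the Connect classes: `TypedConverterSketch`, its method names, and the literal values `key` and `value` are assumptions for illustration. Only the `converter.type` property name and the `header` default come from the text.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical converter showing how a legacy two-argument call could be
// mapped onto a single-map call through the "converter.type" property.
final class TypedConverterSketch {
    static final String TYPE_CONFIG = "converter.type";

    private String type;

    // Single-map form: the role comes from the properties and defaults to "header".
    void configure(Map<String, ?> configs) {
        Object configured = configs.get(TYPE_CONFIG);
        type = configured == null ? "header" : configured.toString();
    }

    // Legacy form: translates the boolean into the property, then delegates.
    void configure(Map<String, ?> configs, boolean isKey) {
        Map<String, Object> withType = new HashMap<>(configs);
        withType.put(TYPE_CONFIG, isKey ? "key" : "value"); // assumed literal values
        configure(withType);
    }

    String type() {
        return type;
    }

    public static void main(String[] args) {
        TypedConverterSketch converter = new TypedConverterSketch();
        converter.configure(new HashMap<String, Object>(), true);
        System.out.println(converter.type()); // prints key
    }
}
```

Because the legacy path always sets the property explicitly, the only callers that ever see the default are those using the single-map form, which is why the default can safely be `header`.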

commit 14cf25a957ce1a7f0207f3fbdc9da5a30d5f3488
Author: Randall Hauch <rha...@gmail.com>
Date:   2017-12-13T01:28:44Z

    KAFKA-5142: Add message headers to Connect runtime (KIP-145)
    
    This is the third commit for KIP-145 and changes the Connect runtime to 
support headers. Each Connect worker now configures a `HeaderConverter` for 
each connector task, in the same way it creates key and value `Converter` 
instances. This is entirely backward compatible, so that existing worker and 
connector configurations will work without changes. By default, the worker will 
use the `SimpleHeaderConverter` to serialize header values as strings and to 
deserialize them by inferring the schemas.

----

