Jennifer Thompson created KAFKA-10428:
-----------------------------------------
Summary: Mirror Maker connect applies base64 encoding to string headers
Key: KAFKA-10428
URL: https://issues.apache.org/jira/browse/KAFKA-10428
Project: Kafka
Issue Type: Bug
Components: mirrormaker
Affects Versions: 2.6.0, 2.5.0, 2.4.0
Reporter: Jennifer Thompson

MirrorSourceTask reads each header value as raw bytes from the ConsumerRecord,
whose headers carry no schema, and adds it to the SourceRecord headers using
"addBytes". This gives the header the schema Schema.BYTES_SCHEMA, and somehow
base64 encoding gets applied when the record gets committed.

This means that my original header value "with_headers" (created with a Python
producer and stored as a 12-byte array) becomes the string "d2l0aF9oZWFkZXJz",
a 16-byte array that is the base64 encoding of the original. If I try to
preempt this by base64 encoding the headers everywhere and sending
"d2l0aF9oZWFkZXJz" to start with, the value just gets double encoded to
"ZDJsMGFGOW9aV0ZrWlhKeg==" after passing through MirrorSourceTask.
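
The values quoted above can be checked with a few lines of Python. This is only
a sketch using the standard library's base64 module; the helper name "b64" is
made up for illustration and has nothing to do with Kafka's own code.

```python
import base64


def b64(text: str) -> str:
    """Base64-encode the UTF-8 bytes of `text` and return the result as text."""
    return base64.b64encode(text.encode("utf-8")).decode("ascii")


original = "with_headers"   # header value written by the producer
once = b64(original)        # value observed after mirroring
twice = b64(once)           # value observed when the header was pre-encoded

print(len(original), len(once), len(twice))
print(once)
print(twice)
```

Every 3 input bytes become 4 output characters, which is why the 12-byte
header grows to 16 bytes and the 16-byte pre-encoded header grows to 24.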

I think the base64 encoding may be coming from Values#append
(https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java#L674),
but I'm not sure how. That method is invoked by
SimpleHeaderConverter#fromConnectHeader via Values#convertToString.

Using SimpleHeaderConverter#toConnectHeader instead produces the correct schema
in this case and solves the problem for me, but it seems to guess at the
schema, so I'm not sure it is the right solution. Since schemas seem to be
required for SourceRecord headers but are not available from ConsumerRecord
headers, I'm not sure what other option we have. I will open a PR with this
solution.
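
Why a string schema avoids the problem can be illustrated with a toy model.
This is not Kafka code: "serialize_header" and "guess_header_value" are
hypothetical stand-ins, and the model assumes only that a bytes-typed value is
rendered as base64 text while a string-typed value is written unchanged.

```python
import base64


def serialize_header(value) -> str:
    """Toy stand-in for a header-to-string step (assumption, not Kafka's code):
    bytes are rendered as base64 text, anything else via str()."""
    if isinstance(value, bytes):
        return base64.b64encode(value).decode("ascii")
    return str(value)


def guess_header_value(raw: bytes):
    """Hypothetical schema guess: treat the raw bytes as UTF-8 text when they
    decode cleanly, otherwise keep them as bytes."""
    try:
        return raw.decode("utf-8")
    except UnicodeDecodeError:
        return raw


raw = b"with_headers"
as_bytes = serialize_header(raw)                       # bytes-typed header
as_string = serialize_header(guess_header_value(raw))  # string-typed header
print(as_bytes)
print(as_string)
```

In this model the guess is harmless for text headers, but a binary header that
happens to be valid UTF-8 would be treated as a string, which is the kind of
ambiguity that makes guessing at the schema uncomfortable.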
--
This message was sent by Atlassian Jira
(v8.3.4#803005)