[ 
https://issues.apache.org/jira/browse/KAFKA-10428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17190370#comment-17190370
 ] 

Jennifer Thompson commented on KAFKA-10428:
-------------------------------------------

Setting 

{{"header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"}}

in the connector config fixes the issue.
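
For context, here is a minimal sketch of where that setting might sit in a MirrorSourceConnector config. The connector name, cluster aliases, and topic below are hypothetical placeholders; only the "header.converter" entry is taken from this comment. The name/config JSON layout is assumed to be the one used when submitting a connector to the Connect REST API.

```python
import json

# Hypothetical connector name, cluster aliases, and topic: placeholders only.
# The "header.converter" entry is the one setting taken from the comment above.
connector = {
    "name": "mirror-source-example",
    "config": {
        "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
        "source.cluster.alias": "source",
        "target.cluster.alias": "target",
        "topics": "example-topic",
        # Pass header values through as raw bytes instead of converting them
        # to a string representation.
        "header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    },
}

# Serialize to the JSON document that would be submitted for the connector.
payload = json.dumps(connector, indent=2)
print(payload)
```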

> Mirror Maker connect applies base64 encoding to string headers
> --------------------------------------------------------------
>
>                 Key: KAFKA-10428
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10428
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 2.4.0, 2.5.0, 2.6.0
>            Reporter: Jennifer Thompson
>            Priority: Major
>
> MirrorSourceTask takes the header value as bytes from the ConsumerRecord, 
> which does not have a header schema, and adds it to the SourceRecord headers 
> using "addBytes". This uses Schema.BYTES as the schema for the header, and 
> somehow, base64 encoding gets applied when the record gets committed.
> This means that my original header value "with_headers" (created with a 
> Python producer and stored as a 12-byte array) becomes the string value 
> "d2l0aF9oZWFkZXJz", a 16-byte array that is the base64 encoding of the 
> original. If I try to preempt this by starting with "d2l0aF9oZWFkZXJz" and 
> base64-encoding the headers everywhere, the value just gets double-encoded 
> to "ZDJsMGFGOW9aV0ZrWlhKeg==" after passing through the MirrorSourceTask.
> I think the base64 encoding may be coming from Values#append 
> (https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java#L674),
> but I'm not sure how. That is invoked by 
> SimpleHeaderConverter#fromConnectHeader via Values#convertToString.
> SimpleHeaderConverter#toConnectHeader produces the correct schema in this 
> case, and solves the problem for me, but it seems to guess at the schema, so 
> I'm not sure if it is the right solution. Since schemas seem to be required 
> for SourceRecord headers, but not available from ConsumerRecord headers, I'm 
> not sure what other option we have. I will open a PR with this solution.
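
The byte values quoted in the description can be checked with a short standalone sketch. It uses only Python's standard base64 module and does not touch Kafka, so it confirms the arithmetic of the single and double encoding but says nothing about where in Connect the encoding is applied.

```python
import base64

# Header value as produced by the Python producer in the report (12 bytes).
original = b"with_headers"

# One pass of base64, as observed after the record goes through MirrorSourceTask.
once = base64.b64encode(original)

# A second pass, as observed when the header was already base64-encoded upstream.
twice = base64.b64encode(once)

print(len(original), original)
print(len(once), once)
print(len(twice), twice)
```

The 12-byte input grows to 16 bytes after one pass and to 24 bytes after two, matching the values in the report.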



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
