[ 
https://issues.apache.org/jira/browse/KAFKA-7273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16948830#comment-16948830
 ] 

Randall Hauch commented on KAFKA-7273:
--------------------------------------

Yes, I think the intention is that the converters are able to modify the 
headers. However, to be clear, Headers does support multiple header 
pairs with the same key, so converters should be careful about blindly adding a 
header that might have the same key as an existing header.
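
To illustrate the duplicate-key concern, here is a minimal sketch. It 
deliberately does not use the Kafka Connect API: SimpleHeaders, Header, add, 
set, and allWithKey are hypothetical stand-ins, and the header key 
"value.type" is made up. It only assumes a header collection that permits 
repeated keys.

```java
import java.util.ArrayList;
import java.util.List;

public class DuplicateHeaderSketch {

    /** One key/value pair (hypothetical stand-in, not a Kafka Connect type). */
    static final class Header {
        final String key;
        final String value;

        Header(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Ordered header collection that permits repeated keys (hypothetical). */
    static final class SimpleHeaders {
        private final List<Header> entries = new ArrayList<>();

        /** Blind add: appends even when the key is already present. */
        void add(String key, String value) {
            entries.add(new Header(key, value));
        }

        /** Replace: drops every existing header with this key, then appends. */
        void set(String key, String value) {
            entries.removeIf(h -> h.key.equals(key));
            entries.add(new Header(key, value));
        }

        /** Returns all values stored under the key, in insertion order. */
        List<String> allWithKey(String key) {
            List<String> values = new ArrayList<>();
            for (Header h : entries) {
                if (h.key.equals(key)) {
                    values.add(h.value);
                }
            }
            return values;
        }
    }

    public static void main(String[] args) {
        SimpleHeaders headers = new SimpleHeaders();
        headers.add("value.type", "upstream");  // written by an earlier component
        headers.add("value.type", "converter"); // blind add: two entries share the key
        System.out.println(headers.allWithKey("value.type").size());

        headers.set("value.type", "converter"); // explicit replace: one entry remains
        System.out.println(headers.allWithKey("value.type"));
    }
}
```

Whether a converter should replace an existing header, append to it, or leave 
it alone depends on who owns that key. The point is only that the choice 
should be deliberate.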


> Converters should have access to headers.
> -----------------------------------------
>
>                 Key: KAFKA-7273
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7273
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>            Reporter: Jeremy Custenborder
>            Assignee: Jeremy Custenborder
>            Priority: Major
>             Fix For: 2.4.0
>
>
> I found myself wanting to build a converter that stored additional type 
> information within headers. The Converter interface does not give a 
> developer access to the headers. I'm not suggesting that we change the 
> method for serializing them, rather that 
> *org.apache.kafka.connect.header.Headers* be passed in for *fromConnectData* 
> and *toConnectData*. For example, something like this:
> {code:java}
> import java.util.Map;
> import org.apache.kafka.connect.data.Schema;
> import org.apache.kafka.connect.data.SchemaAndValue;
> import org.apache.kafka.connect.header.Headers;
>
> public interface Converter {
>   // Proposed header-aware overloads; by default they delegate to the
>   // existing methods, so current implementations keep working.
>   default byte[] fromConnectData(String topic, Headers headers, Schema schema, Object object) {
>     return fromConnectData(topic, schema, object);
>   }
>
>   default SchemaAndValue toConnectData(String topic, Headers headers, byte[] payload) {
>     return toConnectData(topic, payload);
>   }
>
>   // Existing methods, unchanged.
>   void configure(Map<String, ?> configs, boolean isKey);
>   byte[] fromConnectData(String topic, Schema schema, Object value);
>   SchemaAndValue toConnectData(String topic, byte[] value);
> }
> {code}
> This would be a similar approach to what was already done with 
> ExtendedDeserializer and ExtendedSerializer in the Kafka client.



