[ https://issues.apache.org/jira/browse/KAFKA-7273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jeremy Custenborder updated KAFKA-7273:
---------------------------------------
    Description: 
I found myself wanting to build a converter that stored additional type 
information within headers. The Converter interface does not give a developer 
access to the headers. I'm not suggesting that we change the 
method for serializing them, rather that 
*org.apache.kafka.connect.header.Headers* be passed in to *fromConnectData* 
and *toConnectData*. For example, something like this:
{code:java}
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.header.Headers;

public interface Converter {
    // Proposed header-aware overloads. The defaults ignore the headers and
    // delegate to the existing methods, so current converters keep working.
    default byte[] fromConnectData(String topic, Headers headers, Schema schema, Object value) {
        return fromConnectData(topic, schema, value);
    }

    default SchemaAndValue toConnectData(String topic, Headers headers, byte[] value) {
        return toConnectData(topic, value);
    }

    // Existing methods, unchanged.
    void configure(Map<String, ?> configs, boolean isKey);

    byte[] fromConnectData(String topic, Schema schema, Object value);

    SchemaAndValue toConnectData(String topic, byte[] value);
}

{code}
This would be a similar approach to what was already done with 
ExtendedDeserializer and ExtendedSerializer in the Kafka client.
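
To illustrate the intent, here is a minimal, self-contained sketch of the default-method approach. It deliberately does not use the real Connect classes: *SketchConverter*, *PlainStringConverter*, *TypeTaggingConverter*, and the *sketch.type* header name are hypothetical stand-ins, and headers are modelled as a plain *Map<String, String>* rather than *org.apache.kafka.connect.header.Headers*. It only shows that an existing converter keeps working unchanged while a new one can opt in to reading and writing headers.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Demo entry point. All types in this file are simplified stand-ins,
// not the real Kafka Connect API.
class HeaderConverterDemo {
    public static void main(String[] args) {
        Map<String, String> headers = new LinkedHashMap<>();

        // A converter written before the change: headers are passed in but ignored.
        SketchConverter legacy = new PlainStringConverter();
        byte[] plain = legacy.fromConnectData("orders", headers, 42);
        System.out.println(new String(plain, StandardCharsets.UTF_8) + " " + headers);

        // A converter that opts in: it records the value type in a header.
        SketchConverter tagging = new TypeTaggingConverter();
        byte[] tagged = tagging.fromConnectData("orders", headers, 42);
        Object restored = tagging.toConnectData("orders", headers, tagged);
        System.out.println(restored.getClass().getSimpleName() + " " + headers);
    }
}

// Stand-in for the Converter interface (schemas omitted for brevity).
interface SketchConverter {
    byte[] fromConnectData(String topic, Object value);

    Object toConnectData(String topic, byte[] payload);

    // Header-aware overloads: by default they delegate to the methods above.
    default byte[] fromConnectData(String topic, Map<String, String> headers, Object value) {
        return fromConnectData(topic, value);
    }

    default Object toConnectData(String topic, Map<String, String> headers, byte[] payload) {
        return toConnectData(topic, payload);
    }
}

// An "existing" converter that knows nothing about headers.
class PlainStringConverter implements SketchConverter {
    @Override
    public byte[] fromConnectData(String topic, Object value) {
        return String.valueOf(value).getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public Object toConnectData(String topic, byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }
}

// A converter that stores type information in a (hypothetical) header.
class TypeTaggingConverter extends PlainStringConverter {
    static final String TYPE_HEADER = "sketch.type";

    @Override
    public byte[] fromConnectData(String topic, Map<String, String> headers, Object value) {
        headers.put(TYPE_HEADER, value instanceof Integer ? "int32" : "string");
        return fromConnectData(topic, value);
    }

    @Override
    public Object toConnectData(String topic, Map<String, String> headers, byte[] payload) {
        String text = new String(payload, StandardCharsets.UTF_8);
        if ("int32".equals(headers.get(TYPE_HEADER))) {
            return Integer.valueOf(text);
        }
        return text;
    }
}
```

In this sketch the caller always invokes the header-aware overloads, which is what the framework would presumably do; converters that do not override them behave exactly as before.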

  was:
I found myself wanting to build a converter that stored additional type 
information within headers. The Converter interface does not give a developer 
access to the headers. I'm not suggesting that we change the 
method for serializing them, rather that 
*org.apache.kafka.connect.header.Headers* be passed in to *fromConnectData* 
and *toConnectData*. For example, something like this:
{code:java}
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.storage.Converter;

public interface ExtendedConverter extends Converter {
    byte[] fromConnectData(String topic, Headers headers, Schema schema, Object object);

    SchemaAndValue toConnectData(String topic, Headers headers, byte[] payload);
}

{code}
This would be a similar approach to what was already done with 
ExtendedDeserializer and ExtendedSerializer in the Kafka client.


> Converters should have access to headers.
> -----------------------------------------
>
>                 Key: KAFKA-7273
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7273
>             Project: Kafka
>          Issue Type: Improvement
>          Components: KafkaConnect
>            Reporter: Jeremy Custenborder
>            Priority: Major



