[ 
https://issues.apache.org/jira/browse/KAFKA-5983?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16596998#comment-16596998
 ] 

Jordan Moore edited comment on KAFKA-5983 at 8/30/18 2:36 AM:
--------------------------------------------------------------

First, MirrorMaker isn't built on Kafka Connect, so I don't think the "key/value 
converter" properties will work. The internal.*.converter values definitely 
aren't recognized either, which is what the WARN lines in your log are saying.

You could try to make the destination registry a slave of the source one. 
Otherwise, the only reason I see to have two independent registries would be if 
you want topics of the same name in the two clusters with different schemas. 
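
For the slave route, the destination-side registry config could roughly look 
like the sketch below. This is just an illustration, not a tested setup: the 
host names are taken from your configs further down, and the exact store 
settings depend on how the _schemas topic is shared between the clusters.

{code:title=schema-registry.properties (destination, sketch)|borderStyle=solid}
listeners=http://0.0.0.0:8081
# Read schemas from the SOURCE cluster so both registries serve the same ids.
# kafkastore.bootstrap.servers is assumed here; older setups point
# kafkastore.connection.url at the source ZooKeeper instead.
kafkastore.bootstrap.servers=PLAINTEXT://host01:9092
kafkastore.topic=_schemas
# Never become master, so all writes keep going to the source registry
master.eligibility=false
{code}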

My suggestion for a "fix", and something I've worked on before, would be to 
implement a MessageHandler that reads the schema ID out of the source message, 
looks the schema up in the source registry, and then registers it in the 
destination registry for your topic. 

If you use the CachedSchemaRegistryClient class in the handler, then it won't 
be doing an HTTP call for every message. 

See this example handler that renames a topic: 
https://github.com/gwenshap/kafka-examples/tree/master/MirrorMakerHandler
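
Adapting that example, a rough sketch of the schema-copying handler could look 
like the following. The class name SchemaSyncHandler, passing the two registry 
URLs through --message.handler.args, and the "<topic>-value" subject naming 
(the default TopicNameStrategy) are my own assumptions here, not a tested 
implementation.

{code:java|title=SchemaSyncHandler.java (sketch)|borderStyle=solid}
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.avro.Schema;
import org.apache.kafka.clients.producer.ProducerRecord;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;

import kafka.consumer.BaseConsumerRecord;
import kafka.tools.MirrorMaker;

/**
 * Copies the Avro schema referenced by each mirrored record from the source
 * registry to the destination registry, then forwards the record unchanged.
 * Expects "http://source:8081,http://dest:8081" via --message.handler.args.
 */
public class SchemaSyncHandler implements MirrorMaker.MirrorMakerMessageHandler {

    private final CachedSchemaRegistryClient sourceRegistry;
    private final CachedSchemaRegistryClient destRegistry;
    // Remember which source schema ids were already copied so the registries
    // are only hit once per schema, not once per message (the handler is
    // shared by all mirror maker threads, hence the concurrent map).
    private final Map<Integer, Integer> copiedIds = new ConcurrentHashMap<>();

    public SchemaSyncHandler(String args) {
        String[] urls = args.split(",");
        sourceRegistry = new CachedSchemaRegistryClient(urls[0].trim(), 1000);
        destRegistry = new CachedSchemaRegistryClient(urls[1].trim(), 1000);
    }

    @Override
    public List<ProducerRecord<byte[], byte[]>> handle(BaseConsumerRecord record) {
        try {
            // Only the value is handled here; Avro keys would work the same way.
            syncSchema(record.topic(), record.value());
        } catch (Exception e) {
            throw new RuntimeException("Failed to sync schema for topic " + record.topic(), e);
        }
        return Collections.singletonList(new ProducerRecord<byte[], byte[]>(
                record.topic(), record.partition(), record.key(), record.value()));
    }

    private void syncSchema(String topic, byte[] value) throws Exception {
        // Confluent wire format: magic byte 0x0 followed by a 4-byte schema id
        if (value == null || value.length < 5 || value[0] != 0x0) {
            return;
        }
        int sourceId = ByteBuffer.wrap(value, 1, 4).getInt();
        if (!copiedIds.containsKey(sourceId)) {
            // On newer schema-registry-client versions this method is getById(int)
            Schema schema = sourceRegistry.getByID(sourceId);
            // Default value subject name is "<topic>-value"
            int destId = destRegistry.register(topic + "-value", schema);
            copiedIds.put(sourceId, destId);
        }
    }
}
{code}

With the compiled class (and the schema-registry-client jar) on the CLASSPATH, 
MirrorMaker picks it up through its standard --message.handler options, e.g. 
on your Windows host:

{code}
.\bin\windows\kafka-mirror-maker.bat --consumer.config .\etc\kafka\consumer.properties ^
  --producer.config .\etc\kafka\producer.properties --whitelist MY_TOPIC ^
  --message.handler SchemaSyncHandler ^
  --message.handler.args "http://host01:8081,http://host02:8081"
{code}

One caveat with this approach: the destination registry may assign a different 
id than the one embedded in the mirrored bytes, which is another argument for 
the slave-registry setup above, where both registries share the same ids.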



> Cannot mirror Avro-encoded data using the Apache Kafka MirrorMaker
> ------------------------------------------------------------------
>
>                 Key: KAFKA-5983
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5983
>             Project: Kafka
>          Issue Type: Bug
>          Components: core
>    Affects Versions: 0.11.0.0
>         Environment: OS: Linux CentOS 7 and Windows 10
>            Reporter: Giulio Vito de Musso
>            Priority: Major
>              Labels: windows
>
> I'm installing an Apache Kafka MirrorMaker instance to replicate data from 
> one cluster to another cluster. On both the source and the target clusters 
> I'm using the Confluent Avro schema registry, and the data is serialized 
> with Avro.
> I'm using the latest released version of Confluent, 3.3.0 (Kafka 0.11). 
> Moreover, the source broker is on a Windows machine while the target broker 
> is on a Linux machine.
> The two Kafka clusters are independent, thus they have different schema 
> registries.
> These are my configuration files for the MirrorMaker:
> {code:title=consumer.properties|borderStyle=solid}
> group.id=test-mirrormaker-group
> bootstrap.servers=host01:9092
> exclude.internal.topics=true
> client.id=mirror_maker_consumer0
> auto.commit.enabled=false
> # Avro schema registry properties
> key.converter=io.confluent.connect.avro.AvroConverter
> key.converter.schema.registry.url=http://host01:8081
> value.converter=io.confluent.connect.avro.AvroConverter
> value.converter.schema.registry.url=http://host01:8081
> internal.key.converter=org.apache.kafka.connect.json.JsonConverter
> internal.value.converter=org.apache.kafka.connect.json.JsonConverter
> internal.key.converter.schemas.enable=false
> internal.value.converter.schemas.enable=false
> {code}
> {code:title=producer.properties|borderStyle=solid}
> bootstrap.servers=host02:9093
> compression.type=none
> acks=1
> client.id=mirror_maker_producer0
> # Avro schema registry properties
> key.converter=io.confluent.connect.avro.AvroConverter
> key.converter.schema.registry.url=http://host02:8081
> value.converter=io.confluent.connect.avro.AvroConverter
> value.converter.schema.registry.url=http://host02:8081
> internal.key.converter=org.apache.kafka.connect.json.JsonConverter
> internal.value.converter=org.apache.kafka.connect.json.JsonConverter
> internal.key.converter.schemas.enable=false
> internal.value.converter.schemas.enable=false
> {code}
> I run the MirrorMaker on the host01 Windows machine with this command
> {code}
> C:\kafka>.\bin\windows\kafka-mirror-maker.bat --consumer.config 
> .\etc\kafka\consumer.properties --producer.config 
> .\etc\kafka\producer.properties --whitelist=MY_TOPIC
> [2017-09-26 10:09:58,555] WARN The configuration 
> 'internal.key.converter.schemas.enable' was supplied but isn't a known 
> config. (org.apache.kafka.clients.producer.ProducerConfig)
> [2017-09-26 10:09:58,555] WARN The configuration 
> 'value.converter.schema.registry.url' was supplied but isn't a known config. 
> (org.apache.kafka.clients.producer.ProducerConfig)
> [2017-09-26 10:09:58,571] WARN The configuration 'internal.key.converter' was 
> supplied but isn't a known config. 
> (org.apache.kafka.clients.producer.ProducerConfig)
> [2017-09-26 10:09:58,586] WARN The configuration 
> 'internal.value.converter.schemas.enable' was supplied but isn't a known 
> config. (org.apache.kafka.clients.producer.ProducerConfig)
> [2017-09-26 10:09:58,602] WARN The configuration 'internal.value.converter' 
> was supplied but isn't a known config. 
> (org.apache.kafka.clients.producer.ProducerConfig)
> [2017-09-26 10:09:58,633] WARN The configuration 'value.converter' was 
> supplied but isn't a known config. 
> (org.apache.kafka.clients.producer.ProducerConfig)
> [2017-09-26 10:09:58,649] WARN The configuration 'key.converter' was supplied 
> but isn't a known config. (org.apache.kafka.clients.producer.ProducerConfig)
> [2017-09-26 10:09:58,649] WARN The configuration 
> 'key.converter.schema.registry.url' was supplied but isn't a known config. 
> (org.apache.kafka.clients.producer.ProducerConfig)
> [2017-09-26 10:09:58,727] WARN The configuration 
> 'internal.key.converter.schemas.enable' was supplied but isn't a known 
> config. (org.apache.kafka.clients.consumer.ConsumerConfig)
> [2017-09-26 10:09:58,727] WARN The configuration 
> 'value.converter.schema.registry.url' was supplied but isn't a known config. 
> (org.apache.kafka.clients.consumer.ConsumerConfig)
> [2017-09-26 10:09:58,727] WARN The configuration 'internal.key.converter' was 
> supplied but isn't a known config. 
> (org.apache.kafka.clients.consumer.ConsumerConfig)
> [2017-09-26 10:09:58,742] WARN The configuration 'auto.commit.enabled' was 
> supplied but isn't a known config. 
> (org.apache.kafka.clients.consumer.ConsumerConfig)
> [2017-09-26 10:09:58,774] WARN The configuration 
> 'internal.value.converter.schemas.enable' was supplied but isn't a known 
> config. (org.apache.kafka.clients.consumer.ConsumerConfig)
> [2017-09-26 10:09:58,789] WARN The configuration 'internal.value.converter' 
> was supplied but isn't a known config. 
> (org.apache.kafka.clients.consumer.ConsumerConfig)
> [2017-09-26 10:09:58,805] WARN The configuration 'value.converter' was 
> supplied but isn't a known config. 
> (org.apache.kafka.clients.consumer.ConsumerConfig)
> [2017-09-26 10:09:58,805] WARN The configuration 'key.converter' was supplied 
> but isn't a known config. (org.apache.kafka.clients.consumer.ConsumerConfig)
> [2017-09-26 10:09:58,821] WARN The configuration 
> 'key.converter.schema.registry.url' was supplied but isn't a known config. 
> (org.apache.kafka.clients.consumer.ConsumerConfig)
> {code}
> Using the topic UI utility (https://github.com/Landoop/kafka-topics-ui) I can 
> see that the data arrives on the target broker, but it is shown as raw binary, 
> and I think this is caused by the misconfiguration of the schema registry.
> It seems that the MirrorMaker serializes both key and value data with the 
> _ByteArraySerializer_, so it ignores the Avro schema registry configuration:
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/tools/MirrorMaker.scala#L237
> It would be very useful if the Kafka MirrorMaker could read key/value 
> serializer class parameters for the producer and consumer, allowing the Avro 
> schema serde to be configured.


