[
https://issues.apache.org/jira/browse/KAFKA-13505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17454057#comment-17454057
]
Guus De Graeve commented on KAFKA-13505:
----------------------------------------
[~jcustenborder] we got it to work using your transformer! You have no idea how
much you helped us out. Thanks a lot.
For reference, this is how we fixed it in our connector configs:
{code:java}
...
"transforms": "NormalizeSchema",
"transforms.NormalizeSchema.type":"com.github.jcustenborder.kafka.connect.transform.common.NormalizeSchema$Value",
... {code}
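For anyone who finds this later: the gist of the transform, as I understand it, is that it remembers the newest value schema it has seen per schema name and rewrites older records onto it, so the sink never observes a schema "downgrade" mid-stream. Below is a rough, hypothetical sketch of that idea against Kafka Connect's public SMT API; the class name and details are made up, see jcustenborder's kafka-connect-transform-common for the real implementation. Note that for brevity the sketch reuses Connect's own SchemaProjector, which is the very class that rejects evolved enums (see the report below); the real transform does its own conversion.
{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaProjector;
import org.apache.kafka.connect.transforms.Transformation;

// Hypothetical sketch only: keep the newest value schema seen per schema
// name and project older records onto it before they reach the sink.
public class NormalizeValueSchemaSketch<R extends ConnectRecord<R>> implements Transformation<R> {
  private final Map<String, Schema> latestByName = new HashMap<>();

  @Override
  public R apply(R record) {
    Schema schema = record.valueSchema();
    if (schema == null || schema.name() == null) {
      return record; // schemaless records pass through untouched
    }
    // Remember whichever schema version is newer for this schema name.
    Schema target = latestByName.merge(schema.name(), schema,
        (known, incoming) -> known.version() != null && incoming.version() != null
            && incoming.version() > known.version() ? incoming : known);
    if (target == schema) {
      return record; // already on the latest known schema
    }
    // Project the old-schema value forward onto the latest schema.
    Object value = SchemaProjector.project(schema, record.value(), target);
    return record.newRecord(record.topic(), record.kafkaPartition(),
        record.keySchema(), record.key(), target, value, record.timestamp());
  }

  @Override
  public ConfigDef config() {
    return new ConfigDef();
  }

  @Override
  public void configure(Map<String, ?> configs) {
  }

  @Override
  public void close() {
  }
}
{code}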
> Kafka Connect should respect Avro 1.10.X enum defaults spec
> -----------------------------------------------------------
>
> Key: KAFKA-13505
> URL: https://issues.apache.org/jira/browse/KAFKA-13505
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Reporter: Guus De Graeve
> Priority: Major
>
> We are using Kafka Connect to pipe data from Kafka topics into parquet files
> on S3. Our Kafka data is serialised using Avro (schema registry). We use the
> Amazon S3 Sink Connector for this.
> Up until recently we would set "schema.compatibility" to "NONE" in our
> connectors, but this had the painful side effect that during deploys of our
> application we got huge file explosions (lots of very small files in HDFS /
> S3). This happens because Kafka Connect will create a new file every time the
> schema id of a log changes compared to the previous log. During deploys of
> our applications (which can take up to 20 minutes), multiple logs with mixed
> schema ids are inevitable, and given the huge volume of logs, file explosions
> of up to a million files weren't uncommon.
> To solve this problem we switched all our connectors' "schema.compatibility"
> to "BACKWARD", which should only create a new file when a higher schema id is
> detected and deserialise all logs with the latest known schema id, resulting
> in only one new file per deploy.
> An example connector config:
> {code:java}
> {
>   "name": "hdfs-Project_Test_Subject",
>   "config": {
>     "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
>     "partition.duration.ms": "86400000",
>     "topics.dir": "/user/kafka/Project",
>     "hadoop.conf.dir": "/opt/hadoop/conf",
>     "flush.size": "1000000",
>     "schema.compatibility": "BACKWARD",
>     "topics": "Project_Test_Subject",
>     "timezone": "UTC",
>     "hdfs.url": "hdfs://hadoophost:9000",
>     "value.converter.value.subject.name.strategy": "io.confluent.kafka.serializers.subject.TopicNameStrategy",
>     "rotate.interval.ms": "7200000",
>     "locale": "C",
>     "hadoop.home": "/opt/hadoop",
>     "logs.dir": "/user/kafka/_logs",
>     "format.class": "io.confluent.connect.hdfs.parquet.ParquetFormat",
>     "partitioner.class": "io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
>     "name": "hdfs-Project_Test_Subject",
>     "errors.tolerance": "all",
>     "storage.class": "io.confluent.connect.hdfs.storage.HdfsStorage",
>     "path.format": "YYYY/MM/dd"
>   }
> }{code}
> However, we have lots of enum fields in our data records (avro schemas) to
> which values (symbols) get added frequently, and this is causing our Kafka
> Connect connectors to fail with these kinds of errors:
> {code:java}
> Schema parameters not equal. source parameters:
> {io.confluent.connect.avro.enum.default.testfield=null,
> io.confluent.connect.avro.Enum=Ablo.testfield,
> io.confluent.connect.avro.Enum.null=null,
> io.confluent.connect.avro.Enum.value1=value1,
> io.confluent.connect.avro.Enum.value2=value2} and target parameters:
> {io.confluent.connect.avro.enum.default.testfield=null,
> io.confluent.connect.avro.Enum=Ablo.testfield,
> io.confluent.connect.avro.Enum.null=null,
> io.confluent.connect.avro.Enum.value1=value1,
> io.confluent.connect.avro.Enum.value2=value2,
> io.confluent.connect.avro.Enum.value3=value3}{code}
> Since the Avro 1.10.x specification, enums support default values, which
> makes schema evolution possible even when adding values (symbols) to an
> enum. When testing our schemas for compatibility using the Schema Registry
> API we always get "is_compatible" => true, so schema evolution should in
> theory not be a problem.
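> A minimal Avro schema illustrating such an enum default, reconstructed from
> the parameter names in the error above (record and field names are just our
> test case):
> {code:java}
> {
>   "type": "record",
>   "name": "TestRecord",
>   "namespace": "Ablo",
>   "fields": [
>     {
>       "name": "testfield",
>       "type": {
>         "type": "enum",
>         "name": "testfield",
>         "symbols": ["null", "value1", "value2", "value3"],
>         "default": "null"
>       }
>     }
>   ]
> }{code}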
> The error above is thrown in the *SchemaProjector* class which is part of
> Kafka Connect, more specifically in the function
> {*}checkMaybeCompatible(){*}. It seems like this function does not respect
> the Avro 1.10.x specification for enum schema evolution, though I'm not sure
> whether it is meant to. As we currently don't have any other route to fix
> this issue and returning to the "NONE" schema compatibility is not an option
> considering the file explosions, we're kinda stuck here.
> This issue was discussed in more detail on the Confluent forum in this thread:
> [https://forum.confluent.io/t/should-will-kafka-connect-support-schema-evolution-using-avro-1-10-x-enum-defaults/3076/8]
> Adem from Confluent is quite confident this is a bug and asked me to file a
> bug report here.