[ https://issues.apache.org/jira/browse/KAFKA-5164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ewen Cheslack-Postava updated KAFKA-5164:
-----------------------------------------
       Resolution: Fixed
    Fix Version/s: 0.11.1.0
                   0.11.0.0
           Status: Resolved  (was: Patch Available)

Issue resolved by pull request 3198
[https://github.com/apache/kafka/pull/3198]

> SetSchemaMetadata does not replace the schemas in structs correctly
> -------------------------------------------------------------------
>
>                 Key: KAFKA-5164
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5164
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.10.2.1
>            Reporter: Ewen Cheslack-Postava
>            Assignee: Randall Hauch
>             Fix For: 0.11.0.0, 0.11.1.0
>
>
> In SetSchemaMetadataTest we verify that the name and version of the schema in 
> the record have been replaced:
> https://github.com/apache/kafka/blob/trunk/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java#L62
> However, when the key or value is a Struct, the schema is attached both to 
> the record and to the Struct itself. We correctly rebuild the record:
> https://github.com/apache/kafka/blob/trunk/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java#L77
> https://github.com/apache/kafka/blob/trunk/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java#L104
> https://github.com/apache/kafka/blob/trunk/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java#L119
> But if the key or value is a Struct, it still carries the old schema 
> embedded in it.
> This can ultimately cause validation in downstream code, such as a converter, 
> to fail, even for a very simple change like renaming a schema:
> {code}
> (org.apache.kafka.connect.runtime.WorkerTask:141)
> org.apache.kafka.connect.errors.DataException: Mismatching struct schema
>     at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:471)
>     at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:295)
>     at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:73)
>     at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:196)
>     at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:167)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {code}
> The fix is probably to check whether the key or value is a Struct when we 
> apply a new schema and, if so, copy it into a new Struct built with that schema.
> This issue only appears when a transformation changes the schema without 
> modifying the data, so I think SetSchemaMetadata is currently the only 
> transformation affected.
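
A minimal sketch of the problem, assuming the Kafka Connect data API (org.apache.kafka:connect-api) is on the classpath. The class name StaleStructSchemaDemo and the helper renamed(...) are hypothetical: renamed only approximates how a new record schema with a different name and version might be built, and is not SetSchemaMetadata's actual code. The point it illustrates is that replacing the record-level schema leaves the existing Struct value pointing at the old schema.

```java
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class StaleStructSchemaDemo {

    // Hypothetical helper: builds a copy of a struct schema with a new name and
    // version, reusing the original field schemas unchanged.
    static Schema renamed(Schema original, String newName, int newVersion) {
        SchemaBuilder builder = SchemaBuilder.struct().name(newName).version(newVersion);
        for (Field field : original.fields()) {
            builder.field(field.name(), field.schema());
        }
        return builder.build();
    }

    public static void main(String[] args) {
        Schema original = SchemaBuilder.struct()
                .name("com.example.Original")
                .version(1)
                .field("id", Schema.INT32_SCHEMA)
                .build();
        Struct value = new Struct(original).put("id", 42);

        // Only the record-level schema is replaced; the Struct value is reused as-is,
        // so it still references the original schema.
        Schema updated = renamed(original, "com.example.Renamed", 2);

        System.out.println("record schema: " + updated.name());
        System.out.println("struct schema: " + value.schema().name());
        System.out.println("schemas equal: " + updated.equals(value.schema()));
    }
}
```

Running main prints com.example.Renamed for the record schema but com.example.Original for the Struct, and reports that the two schemas are not equal; that disagreement is the kind of mismatch a converter reports as "Mismatching struct schema".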
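
A sketch of the fix proposed in the issue (detect a Struct and rebuild it with the new schema), again assuming connect-api is on the classpath. The method name updateSchemaIn and the class UpdateStructSchema are illustrative and are not claimed to match the code merged in pull request 3198. The sketch assumes the updated schema has exactly the same fields as the original, which holds when only the schema name and version change.

```java
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class UpdateStructSchema {

    // If the key or value is a Struct, copy its field values into a new Struct that
    // carries the updated schema. Any other value (primitive, map, list, null) is
    // returned unchanged, because it does not embed a schema of its own.
    static Object updateSchemaIn(Object keyOrValue, Schema updatedSchema) {
        if (keyOrValue instanceof Struct) {
            Struct original = (Struct) keyOrValue;
            Struct copy = new Struct(updatedSchema);
            for (Field field : updatedSchema.fields()) {
                // Shallow copy by field name: nested values keep their own schemas,
                // which is fine when only top-level schema metadata changed.
                copy.put(field.name(), original.get(field.name()));
            }
            return copy;
        }
        return keyOrValue;
    }

    public static void main(String[] args) {
        Schema original = SchemaBuilder.struct().name("com.example.Original").version(1)
                .field("id", Schema.INT32_SCHEMA)
                .field("label", Schema.OPTIONAL_STRING_SCHEMA)
                .build();
        Schema updated = SchemaBuilder.struct().name("com.example.Renamed").version(2)
                .field("id", Schema.INT32_SCHEMA)
                .field("label", Schema.OPTIONAL_STRING_SCHEMA)
                .build();

        Struct value = new Struct(original).put("id", 42).put("label", "hello");
        Struct fixed = (Struct) updateSchemaIn(value, updated);

        System.out.println(fixed.schema().name()); // com.example.Renamed
        System.out.println(fixed.get("id"));       // 42
    }
}
```

Copying the values rather than mutating the original Struct keeps the input record untouched, and Struct.put validates each copied value against the new field schemas as it goes.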



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
