Hi everyone,

When using KafkaIO to deserialize Avro messages into SpecificRecords in combination with ConfluentSchemaRegistryDeserializerProvider, deserialization fails when the schema in the Avro-generated classes (the SpecificRecords) and the schema registry schema (the one used to serialize the given message) mismatch.
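For context, here is roughly how I am reading from Kafka. This is only a sketch: the broker address, topic, subject, and the MyRecord class stand in for my real setup.

import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.ConfluentSchemaRegistryDeserializerProvider;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.LongDeserializer;

public class ReadPipeline {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    pipeline.apply(
        KafkaIO.<Long, MyRecord>read() // MyRecord: my Avro-generated SpecificRecord (placeholder)
            .withBootstrapServers("broker:9092")
            .withTopic("my_topic")
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(
                ConfluentSchemaRegistryDeserializerProvider.of(
                    "http://schema-registry:8081", "my_topic-value"))
            // This flag makes the Confluent deserializer produce SpecificRecords.
            .withConsumerConfigUpdates(
                Collections.singletonMap("specific.avro.reader", (Object) true)));
    pipeline.run();
  }
}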
My scenario is that my Avro-generated classes are ahead of what's in the schema registry. So when deserialization happens, the schema registry schema is used to deserialize into the SpecificRecord class, and that fails with field-order or field-type mismatches, depending on the situation. I know you're thinking that my schema evolution is bad and that I should go sit in the corner. However, only new fields are added to the schema, so it should not be an issue. Has anyone seen this happen to them?

What I think happens:

1. ConfluentSchemaRegistryDeserializerProvider configures the ConfluentSchemaRegistryDeserializer to use the Confluent registry schema as the reader schema. [1][2]

2. If specific.avro.reader is set to true, the ConfluentSchemaRegistryDeserializer (Beam-owned) [3] eventually calls AbstractKafkaAvroDeserializer (Confluent-owned) [4]. Effectively, the ConfluentSchemaRegistryDeserializer sets the reader schema and the AbstractKafkaAvroDeserializer sets the writer schema. However, both schemas are fetched from the schema registry, and each class fetches the same schema separately.

3. This is a problem because, to my understanding, the writer schema tells Avro which schema was used to serialize the object, and the reader schema tells Avro which schema to deserialize those bytes into, in case the two are not the same. I'm really not sure about this, but I think that's how Avro schema resolution works (the first sketch below demonstrates it).

4. Because both the reader and the writer schema are fetched from the schema registry, the evolved schema in our SpecificRecord class is never used, and deserialization into that class fails.

5. This is why I think that, if specific.avro.reader is set to true, the ConfluentSchemaRegistryDeserializer should be given the schema fetched from the SpecificRecord class at [1].

Would anyone be able to confirm or refute the above logic?

As for my issue: as a workaround, I wrote a DeserializerProvider that does exactly what ConfluentSchemaRegistryDeserializerProvider does, but passes in the schema fetched from my SpecificRecord class, and deserialization works properly. The second sketch below shows the shape of it.
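To illustrate point 3, here is a small, self-contained plain-Avro example (no Kafka or registry involved) of reader/writer schema resolution. The schemas and field names are made up:

import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class ReaderWriterSchemaDemo {
  public static void main(String[] args) throws Exception {
    // v1: the schema the producer serialized with (the "writer" schema).
    Schema writer = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"long\"}]}");
    // v2: the evolved schema the consumer wants (the "reader" schema),
    // with one added field that has a default.
    Schema reader = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
            + "{\"name\":\"id\",\"type\":\"long\"},"
            + "{\"name\":\"email\",\"type\":\"string\",\"default\":\"unknown\"}]}");

    // Serialize a record with the writer (v1) schema.
    GenericRecord v1 = new GenericData.Record(writer);
    v1.put("id", 42L);
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(writer).write(v1, encoder);
    encoder.flush();

    // Deserialize with BOTH schemas: the writer schema says how the bytes
    // are laid out, the reader schema says what shape we want back. Avro
    // resolves the difference and fills the new field from its default.
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    GenericRecord v2 =
        new GenericDatumReader<GenericRecord>(writer, reader).read(null, decoder);
    System.out.println(v2); // {"id": 42, "email": "unknown"}
  }
}

If both schemas come from the registry, the evolved fields of the generated class never enter this resolution step, which is what I believe bites me.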
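And here is the shape of my workaround. This is a sketch from memory rather than the exact code I shipped: it assumes Beam's public DeserializerProvider interface and Confluent's AbstractKafkaAvroDeserializer, and the class names SpecificRecordDeserializerProvider and SpecificReaderDeserializer are made up. The only real difference from ConfluentSchemaRegistryDeserializerProvider is where the reader schema comes from.

import java.util.HashMap;
import java.util.Map;
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificRecord;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.io.kafka.DeserializerProvider;
import org.apache.kafka.common.serialization.Deserializer;

public class SpecificRecordDeserializerProvider<T extends SpecificRecord>
    implements DeserializerProvider<T> {

  private final String schemaRegistryUrl;
  private final Class<T> recordClass;

  public SpecificRecordDeserializerProvider(String schemaRegistryUrl, Class<T> recordClass) {
    this.schemaRegistryUrl = schemaRegistryUrl;
    this.recordClass = recordClass;
  }

  @Override
  public Deserializer<T> getDeserializer(Map<String, ?> configs, boolean isKey) {
    Map<String, Object> csrConfig = new HashMap<>(configs);
    csrConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
    csrConfig.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);
    // The key difference from ConfluentSchemaRegistryDeserializerProvider:
    // the reader schema comes from the generated class, not from the registry.
    Deserializer<T> deserializer =
        new SpecificReaderDeserializer<>(SpecificData.get().getSchema(recordClass));
    deserializer.configure(csrConfig, isKey);
    return deserializer;
  }

  @Override
  public Coder<T> getCoder(CoderRegistry coderRegistry) {
    return AvroCoder.of(recordClass);
  }

  // Mirrors what Beam's package-private ConfluentSchemaRegistryDeserializer does,
  // except that the reader schema is injected by the provider above.
  private static class SpecificReaderDeserializer<T extends SpecificRecord>
      extends AbstractKafkaAvroDeserializer implements Deserializer<T> {

    private final Schema readerSchema;

    SpecificReaderDeserializer(Schema readerSchema) {
      this.readerSchema = readerSchema;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
      configure(new KafkaAvroDeserializerConfig(configs));
    }

    @Override
    @SuppressWarnings("unchecked")
    public T deserialize(String topic, byte[] bytes) {
      // AbstractKafkaAvroDeserializer fetches the writer schema from the
      // registry (via the id embedded in the payload) and resolves it
      // against our reader schema.
      return (T) deserialize(bytes, readerSchema);
    }
  }
}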
Cheers,
Cristian

[1] https://github.com/apache/beam/blob/3a6100d7af5abd3655afe9e8cd52f406044979df/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProvider.java#L133
[2] https://github.com/apache/beam/blob/3a6100d7af5abd3655afe9e8cd52f406044979df/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProvider.java#L144
[3] https://github.com/apache/beam/blob/3a6100d7af5abd3655afe9e8cd52f406044979df/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProvider.java#L175
[4] https://github.com/confluentinc/schema-registry/blob/2c2a356755b000e524123f9676ace98bd3c74f59/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java#L152