Thanks for the question! Interesting...

I haven't gone deep into the details yet (I will!), but could it be related to 
this change? [1][2]
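
To make sure we mean the same thing by "reader" and "writer" schema (your point 3 below), here is a minimal toy sketch in plain Java. It does not use Avro, Beam or Confluent classes at all: `SchemaResolutionSketch`, `Field`, the two schemas and all method names are hypothetical, and the positional "slots" only imitate how a generated class stores fields by index. It assumes, as in your scenario, that the generated class has an extra field with a default which the registry schema lacks.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: imitates writer/reader schema resolution, not Avro's real API.
final class SchemaResolutionSketch {

    /** Toy field: a name plus an optional default (null means "no default"). */
    static final class Field {
        final String name;
        final String defaultValue;

        Field(String name, String defaultValue) {
            this.name = name;
            this.defaultValue = defaultValue;
        }
    }

    // Hypothetical schema stored in the registry (what the producer wrote with).
    static final List<Field> REGISTRY_SCHEMA =
        Arrays.asList(new Field("id", null), new Field("name", null));

    // Hypothetical evolved schema baked into the generated class (one added field).
    static final List<Field> GENERATED_CLASS_SCHEMA =
        Arrays.asList(new Field("id", null), new Field("email", "unknown"), new Field("name", null));

    /** Writes values positionally in writer-schema order; the payload carries no field names. */
    static List<String> encode(List<Field> writerSchema, Map<String, String> record) {
        List<String> payload = new ArrayList<>();
        for (Field f : writerSchema) {
            payload.add(record.get(f.name));
        }
        return payload;
    }

    /** Decodes with the writer schema, then lays the values out in reader-schema positions. */
    static List<String> resolve(List<Field> writerSchema, List<Field> readerSchema, List<String> payload) {
        if (payload.size() != writerSchema.size()) {
            throw new IllegalArgumentException("payload was not written with this writer schema");
        }
        Map<String, String> written = new HashMap<>();
        for (int i = 0; i < writerSchema.size(); i++) {
            written.put(writerSchema.get(i).name, payload.get(i));
        }
        List<String> slots = new ArrayList<>();
        for (Field f : readerSchema) {
            if (written.containsKey(f.name)) {
                slots.add(written.get(f.name));
            } else if (f.defaultValue != null) {
                slots.add(f.defaultValue); // field added after the data was written
            } else {
                throw new IllegalStateException("no value or default for field " + f.name);
            }
        }
        return slots;
    }

    /** Reads slots by position using the generated class layout, whatever reader schema produced them. */
    static Map<String, String> asGeneratedClass(List<String> slots) {
        Map<String, String> fields = new LinkedHashMap<>();
        for (int i = 0; i < GENERATED_CLASS_SCHEMA.size(); i++) {
            fields.put(GENERATED_CLASS_SCHEMA.get(i).name, i < slots.size() ? slots.get(i) : null);
        }
        return fields;
    }

    public static void main(String[] args) {
        Map<String, String> record = new HashMap<>();
        record.put("id", "1");
        record.put("name", "Ada");
        List<String> payload = encode(REGISTRY_SCHEMA, record);

        // Reader schema taken from the generated class: prints {id=1, email=unknown, name=Ada}
        System.out.println(asGeneratedClass(resolve(REGISTRY_SCHEMA, GENERATED_CLASS_SCHEMA, payload)));

        // Reader schema also taken from the registry: prints {id=1, email=Ada, name=null}
        System.out.println(asGeneratedClass(resolve(REGISTRY_SCHEMA, REGISTRY_SCHEMA, payload)));
    }
}
```

If the real deserializer behaves like the second call, that would match the field order mismatches you are seeing, but I have not verified that against the actual code yet.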



> On 28 Jul 2022, at 22:43, Cristian Constantinescu <> wrote:
> Attaching these two links which kinda point in the same direction as my 
> previous e-mail:
> <>
> <>
> On Thu, Jul 28, 2022 at 4:31 PM Cristian Constantinescu <> wrote:
> Hi everyone,
> When using KafkaIO to deserialize to Avro SpecificRecords in combination with 
> ConfluentSchemaRegistryDeserializerProvider, deserialization fails when the 
> schema in the Avro-generated classes (the SpecificRecords) and the schema 
> registry schema (used to serialize the given message) do not match.
> My scenario is that my Avro generated classes are ahead of what's in the 
> schema registry. So when deserialization happens, it tries to use the schema 
> registry schema to deserialize to the SpecificRecord class and that fails 
> with field order mismatches or field type mismatches depending on the 
> situation.
> I know you're thinking that my schema evolution is bad and that I should go 
> sit in the corner. However, only new fields are added to the schema, so it 
> should not be an issue.
> Has anyone seen this happen to them?
> What I think happens:
> 1. ConfluentSchemaRegistryDeserializerProvider configures the 
> AbstractKafkaAvroDeserializer to use the Confluent schema as the reader schema. 
> [1][2]
> 2. If specific.avro.reader is set to true, the 
> ConfluentSchemaRegistryDeserializer (Beam owned) [3] eventually calls 
> AbstractKafkaAvroDeserializer (Confluent owned)[4]. Effectively, the 
> ConfluentSchemaRegistryDeserializer sets the reader schema and the 
> AbstractKafkaAvroDeserializer sets the writer schema. However, both schemas 
> are fetched from the schema registry. Both classes fetch the same schema 
> separately.
> 3. Now this is a problem because, to my understanding, the writer schema 
> tells Avro which schema was used to serialize the object, and the reader 
> schema tells Avro what to deserialize those bytes to, in case it's not the 
> same schema. I'm really not sure about this, but I think that's how it works.
> 4. Because both the reader and the writer schema are fetched from the schema 
> registry, and the evolved schema of our SpecificRecord class is never used, 
> deserialization to that class fails.
> 5. This is why I think that if specific.avro.reader is set to true, the 
> ConfluentSchemaRegistryDeserializer class should pass the schema fetched from 
> the SpecificRecord class on line [1].
> Would anyone be able to confirm or refute whether the above logic makes sense?
> As for my issue, as a workaround, I just wrote a DeserializerProvider that 
> does exactly what the ConfluentSchemaRegistryDeserializerProvider does, but 
> passes in the schema fetched from my SpecificRecord class, and deserialization 
> works properly.
> Cheers,
> Cristian
> [1] <>
> [2] <>
> [3] <>
> [4] <>
