Thanks for the question! Interesting...

I didn’t go deep into the details yet (I will!), but could it be related to this 
change? [1][2]

[1] https://issues.apache.org/jira/browse/BEAM-10759
[2] https://github.com/apache/beam/pull/12630

—
Alexey

> On 28 Jul 2022, at 22:43, Cristian Constantinescu <zei...@gmail.com> wrote:
> 
> Attaching these two links which kinda point in the same direction as my 
> previous e-mail:
> 
> https://ambitious.systems/avro-writers-vs-readers-schema
> https://ambitious.systems/avro-schema-resolution
> 
> On Thu, Jul 28, 2022 at 4:31 PM Cristian Constantinescu <zei...@gmail.com> wrote:
> Hi everyone,
> 
> When using KafkaIO to deserialize to Avro SpecificRecords in combination with 
> ConfluentSchemaRegistryDeserializerProvider, deserialization fails when the schema in the 
> Avro-generated classes (the SpecificRecords) and the schema registry schema 
> (used to serialize the given message) mismatch.
> 
> My scenario is that my Avro-generated classes are ahead of what's in the 
> schema registry. So when deserialization happens, it tries to use the schema 
> registry schema to deserialize to the SpecificRecord class and that fails 
> with field order mismatches or field type mismatches depending on the 
> situation.
> 
> I know you're thinking that my schema evolution is bad and that I should go 
> sit in the corner. However, only new fields are added to the schema, so it 
> should not be an issue.
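> 
> (Avro itself agrees with that, for what it's worth: a quick check with 
> org.apache.avro.SchemaCompatibility, using made-up schemas where the only 
> change is one added field with a default, reports the pair as compatible.)
> 
```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaCompatibility;

public class EvolutionCheck {

  // Hypothetical before/after pair: the only change is one added field,
  // and that field carries a default value.
  static final Schema OLD = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
      + "{\"name\":\"id\",\"type\":\"string\"}]}");

  static final Schema EVOLVED = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
      + "{\"name\":\"id\",\"type\":\"string\"},"
      + "{\"name\":\"email\",\"type\":\"string\",\"default\":\"n/a\"}]}");

  // True when data written with OLD can be read with EVOLVED as the reader schema.
  static boolean compatible() {
    return SchemaCompatibility.checkReaderWriterCompatibility(EVOLVED, OLD).getType()
        == SchemaCompatibility.SchemaCompatibilityType.COMPATIBLE;
  }

  public static void main(String[] args) {
    System.out.println(compatible());
  }
}
```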
> 
> Has anyone seen this happen to them?
> 
> What I think happens:
> 
> 1. ConfluentSchemaRegistryDeserializerProvider configures the 
> AbstractKafkaDeserializer to use the confluent schema as the reader schema. 
> [1][2]
> 2. If specific.avro.reader is set to true, the 
> ConfluentSchemaRegistryDeserializer (Beam owned) [3] eventually calls 
> AbstractKafkaAvroDeserializer (Confluent owned) [4]. Effectively, the 
> ConfluentSchemaRegistryDeserializer sets the reader schema and the 
> AbstractKafkaAvroDeserializer sets the writer schema. However, both schemas 
> are fetched from the schema registry, with each class fetching the same 
> schema separately.
> 3. Now this is a problem because, to my understanding, the writer schema 
> tells Avro what schema was used to serialize the object, and the reader 
> schema tells Avro what to deserialize those bytes to, in case it's 
> not the same schema. I'm really not sure about this, but I think that's how 
> it works.
> 4. Because both the reader and writer schemas are fetched from the schema registry, 
> while our SpecificRecord class has an evolved schema that is never used, 
> deserialization to that class fails.
> 5. This is why I think that, if specific.avro.reader is set to true, the 
> ConfluentSchemaRegistryDeserializer class should pass the schema fetched from 
> the SpecificRecord class on line [1].
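> 
> To illustrate step 3 with plain Avro, outside Kafka entirely (the schemas 
> below are made up, not from my actual project): a record written with the 
> old schema reads back cleanly when the evolved schema is passed as the 
> reader schema, with the default filling the new field.
> 
```java
import java.io.ByteArrayOutputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class SchemaResolutionDemo {

  // Hypothetical "old" schema, standing in for what the registry holds.
  static final Schema WRITER = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
      + "{\"name\":\"id\",\"type\":\"string\"}]}");

  // Hypothetical evolved schema, standing in for the generated
  // SpecificRecord's schema: one new field, with a default.
  static final Schema READER = new Schema.Parser().parse(
      "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
      + "{\"name\":\"id\",\"type\":\"string\"},"
      + "{\"name\":\"email\",\"type\":\"string\",\"default\":\"n/a\"}]}");

  // Serialize with the writer schema, then deserialize passing BOTH schemas:
  // the writer schema decodes the bytes, the reader schema resolves them.
  static GenericRecord roundTrip() throws Exception {
    GenericRecord rec = new GenericData.Record(WRITER);
    rec.put("id", "42");

    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder enc = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(WRITER).write(rec, enc);
    enc.flush();

    BinaryDecoder dec = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    return new GenericDatumReader<GenericRecord>(WRITER, READER).read(null, dec);
  }

  public static void main(String[] args) throws Exception {
    GenericRecord decoded = roundTrip();
    // The new field is filled from the reader schema's default.
    System.out.println(decoded.get("id") + " " + decoded.get("email"));
  }
}
```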
> 
> Would anyone be able to confirm or refute the above logic?
> 
> As for my issue, as a workaround, I just wrote a DeserializerProvider that 
> does exactly what the ConfluentSchemaRegistryDeserializerProvider does, but 
> passes in the schema fetched from my SpecificRecord class, and deserialization 
> works properly.
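> 
> Roughly, the workaround boils down to something like this (the class name is 
> made up; only KafkaAvroDeserializer and its three-argument deserialize are 
> the real Confluent APIs, and this sketch needs the Confluent serializer 
> dependency on the classpath):
> 
```java
import org.apache.avro.Schema;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;

// Sketch of a deserializer that keeps the registry lookup for the writer
// schema but takes the reader schema from the locally generated class.
public class LocalReaderSchemaDeserializer extends KafkaAvroDeserializer {

  private final Schema readerSchema;

  public LocalReaderSchemaDeserializer(SchemaRegistryClient client, Schema readerSchema) {
    super(client);
    // Reader schema comes from the generated class (e.g. MyRecord.getClassSchema()),
    // not from the registry, so a locally evolved schema is honored.
    this.readerSchema = readerSchema;
  }

  @Override
  public Object deserialize(String topic, byte[] bytes) {
    // The writer schema is still looked up in the registry via the schema id
    // embedded in the payload; only the reader schema differs.
    return deserialize(topic, bytes, readerSchema);
  }
}
```
> 
> A custom DeserializerProvider then constructs this with the generated 
> class's schema (e.g. MyRecord.getClassSchema(), where MyRecord stands in 
> for the actual generated class) and specific.avro.reader=true in the 
> consumer config, mirroring what happens at [3].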
> 
> Cheers,
> Cristian
> 
> [1] https://github.com/apache/beam/blob/3a6100d7af5abd3655afe9e8cd52f406044979df/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProvider.java#L133
> [2] https://github.com/apache/beam/blob/3a6100d7af5abd3655afe9e8cd52f406044979df/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProvider.java#L144
> [3] https://github.com/apache/beam/blob/3a6100d7af5abd3655afe9e8cd52f406044979df/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConfluentSchemaRegistryDeserializerProvider.java#L175
> [4] https://github.com/confluentinc/schema-registry/blob/2c2a356755b000e524123f9676ace98bd3c74f59/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroDeserializer.java#L152
