Hey Bruno,

Thanks for your response.
Below you can find more details about serialization.

private SpecificAvroSerde<FooKey> fooKeySerde;
...
@PostConstruct
void postConstruct() {
    this.fooKeySerde = configureSerde(true);
}

private <T> SpecificAvroSerde<T> configureSerde(boolean isSerdeForRecordKeys) {
    SpecificAvroSerde<T> specificAvroSerde = new SpecificAvroSerde<>(schemaRegistryClient);
    specificAvroSerde.configure(serdeConfig, isSerdeForRecordKeys);
    return specificAvroSerde;
}

Where SpecificAvroSerde is a delegate class implemented like that:

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import java.util.Map;
import org.apache.kafka.common.annotation.InterfaceStability.Unstable;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

@Unstable
public class SpecificAvroSerde<T> implements Serde<T> {
    private final Serde<T> inner;

    public SpecificAvroSerde() {
        this.inner = Serdes.serdeFrom(new SpecificAvroSerializer(), new SpecificAvroDeserializer());
    }

    public SpecificAvroSerde(SchemaRegistryClient client) {
        if (client == null) {
            throw new IllegalArgumentException("schema registry client must not be null");
        } else {
            this.inner = Serdes.serdeFrom(new SpecificAvroSerializer(client), new SpecificAvroDeserializer(client));
        }
    }

    public Serializer<T> serializer() {
        return this.inner.serializer();
    }

    public Deserializer<T> deserializer() {
        return this.inner.deserializer();
    }

    public void configure(Map<String, ?> serdeConfig, boolean isSerdeForRecordKeys) {
        this.inner.serializer().configure(serdeConfig, isSerdeForRecordKeys);
        this.inner.deserializer().configure(serdeConfig, isSerdeForRecordKeys);
    }

    public void close() {
        this.inner.serializer().close();
        this.inner.deserializer().close();
    }
}

Also, mvn dependency:tree | grep avro returns:

mvn dependency:tree | grep avro
[INFO] +- io.confluent:kafka-streams-avro-serde:jar:7.0.1:compile
[INFO] |  +- io.confluent:kafka-avro-serializer:jar:7.0.1:compile
[INFO] |  \- org.apache.avro:avro:jar:1.10.1:compile
[INFO] +- in.house.library.containing.avro.schemas:avro:jar:1.18.5:compile
[INFO] |  +- org.apache.avro:avro-maven-plugin:jar:1.10.0:compile
[INFO] |  |  \- org.apache.avro:avro-compiler:jar:1.10.0:compile

Cheers,
Mariusz

On Wed, Sep 20, 2023 at 4:01 PM Bruno Cadonna <cado...@apache.org> wrote:

> Hi Mariusz,
>
> How is fooKey de-/serialized?
>
> I ask that because maybe the serializer for fooKey cannot handle the
> extended enum.
>
> Best,
> Bruno
>
> On 9/20/23 12:22 AM, M M wrote:
> > Hello,
> >
> > This is my first time asking a question on a mailing list, so please
> > forgive me any inaccuracies.
> >
> > I am having a Kafka Streams application with a Punctuator.
> > Inside the punctuate() I have this code:
> >
> > // fooStore is a state store
> > for (FooKey fooKey : fooKeysRepository.get(Type.FOO)) { Foo foo =
> > fooStore.get(fooKey); // foo != null
> >
> > This returns a Foo which is not null.
> >
> > FooKey has fields: [string(nullable), enum(nullable), string(nullable)]
> >
> > I extended enum with a new value, say BAZ (I appended that at the end of
> > the enum) so it looks like:
> > "symbols": [
> > "A",
> > "B",
> > ....,
> > "UNKNOWN"
> > "BAZ"
> > ],
> > "default": "UNKNOWN"
> >
> > and suddenly after this change,
> > for (FooKey fooKey : fooKeysRepository.get(Type.FOO)) { Foo foo =
> > fooStore.get(fooKey); // foo == null
> >
> > I'm at a loss to understand why this is happening. There is no exception,
> > no trace in application logs, nothing.
> >
> > What's more, the FooKey is always represented as {"string", null, "string"}
> > so the value for enum is consistently null.
> >
> > Cheers,
> > Mariusz