Hey Bruno,

Thanks for your response.

Below you can find more details about serialization.


private SpecificAvroSerde<FooKey> fooKeySerde;
...

@PostConstruct
void postConstruct() {
  this.fooKeySerde = configureSerde(true);
}

private <T> SpecificAvroSerde<T> configureSerde(boolean isSerdeForRecordKeys) {
  SpecificAvroSerde<T> specificAvroSerde = new SpecificAvroSerde<>(schemaRegistryClient);
  specificAvroSerde.configure(serdeConfig, isSerdeForRecordKeys);
  return specificAvroSerde;
}
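For completeness, serdeConfig itself isn't shown above. As a rough sketch (class name and URL are placeholders, not from my code; the property keys are the standard Confluent ones), it contains at least the schema registry URL, plus the flag that makes the deserializer return generated SpecificRecord classes such as FooKey:

```java
import java.util.Map;

public class SerdeConfigSketch {

    // Hypothetical sketch of the serdeConfig map passed to configureSerde().
    // "schema.registry.url" and "specific.avro.reader" are the standard
    // Confluent property names; the URL value is a placeholder.
    static Map<String, ?> buildSerdeConfig() {
        return Map.of(
            // where the (de)serializer looks up/registers Avro schemas
            "schema.registry.url", "http://localhost:8081",
            // return generated SpecificRecord types (e.g. FooKey)
            // instead of GenericRecord on deserialization
            "specific.avro.reader", "true"
        );
    }

    public static void main(String[] args) {
        Map<String, ?> cfg = buildSerdeConfig();
        System.out.println(cfg.get("specific.avro.reader")); // prints "true"
    }
}
```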

Where SpecificAvroSerde is a delegate class implemented like this:

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import java.util.Map;
import org.apache.kafka.common.annotation.InterfaceStability.Unstable;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

@Unstable
public class SpecificAvroSerde<T> implements Serde<T> {
  private final Serde<T> inner;

  public SpecificAvroSerde() {
    this.inner = Serdes.serdeFrom(new SpecificAvroSerializer(), new SpecificAvroDeserializer());
  }

  public SpecificAvroSerde(SchemaRegistryClient client) {
    if (client == null) {
      throw new IllegalArgumentException("schema registry client must not be null");
    } else {
      this.inner = Serdes.serdeFrom(new SpecificAvroSerializer(client), new SpecificAvroDeserializer(client));
    }
  }

  public Serializer<T> serializer() {
    return this.inner.serializer();
  }

  public Deserializer<T> deserializer() {
    return this.inner.deserializer();
  }

  public void configure(Map<String, ?> serdeConfig, boolean isSerdeForRecordKeys) {
    this.inner.serializer().configure(serdeConfig, isSerdeForRecordKeys);
    this.inner.deserializer().configure(serdeConfig, isSerdeForRecordKeys);
  }

  public void close() {
    this.inner.serializer().close();
    this.inner.deserializer().close();
  }
}
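To make the delegation concrete, here is a tiny self-contained sketch (stub types stand in for the Confluent serializer/deserializer; nothing here is the real Kafka API) showing what matters for the question at hand: configure() forwards the isSerdeForRecordKeys flag to both inner components, so both are configured as key (de)serializers:

```java
import java.util.HashMap;
import java.util.Map;

// Stand-ins for the Confluent SpecificAvroSerializer/SpecificAvroDeserializer,
// so the delegate pattern can be exercised without Kafka on the classpath.
class StubSerializer {
    boolean configuredAsKey;
    void configure(Map<String, ?> cfg, boolean isKey) { this.configuredAsKey = isKey; }
}

class StubDeserializer {
    boolean configuredAsKey;
    void configure(Map<String, ?> cfg, boolean isKey) { this.configuredAsKey = isKey; }
}

public class DelegateSerdeSketch {
    final StubSerializer serializer = new StubSerializer();
    final StubDeserializer deserializer = new StubDeserializer();

    // Mirrors SpecificAvroSerde.configure(): the flag is forwarded
    // to both the inner serializer and the inner deserializer.
    void configure(Map<String, ?> serdeConfig, boolean isSerdeForRecordKeys) {
        serializer.configure(serdeConfig, isSerdeForRecordKeys);
        deserializer.configure(serdeConfig, isSerdeForRecordKeys);
    }

    public static void main(String[] args) {
        DelegateSerdeSketch serde = new DelegateSerdeSketch();
        serde.configure(new HashMap<>(), true); // configured for record keys
        System.out.println(serde.serializer.configuredAsKey + " "
                + serde.deserializer.configuredAsKey); // prints "true true"
    }
}
```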

Also,

mvn dependency:tree | grep avro

returns:

[INFO] +- io.confluent:kafka-streams-avro-serde:jar:7.0.1:compile
[INFO] |  +- io.confluent:kafka-avro-serializer:jar:7.0.1:compile
[INFO] |  \- org.apache.avro:avro:jar:1.10.1:compile
[INFO] +- in.house.library.containing.avro.schemas:avro:jar:1.18.5:compile
[INFO] |  +- org.apache.avro:avro-maven-plugin:jar:1.10.0:compile
[INFO] |  |  \- org.apache.avro:avro-compiler:jar:1.10.0:compile


Cheers,
Mariusz

On Wed, Sep 20, 2023 at 4:01 PM Bruno Cadonna <cado...@apache.org> wrote:

> Hi Mariusz,
>
> How is fooKey de-/serialized?
>
> I ask that because maybe the serializer for fooKey cannot handle the
> extended enum.
>
> Best,
> Bruno
>
> On 9/20/23 12:22 AM, M M wrote:
> > Hello,
> >
> > This is my first time asking a question on a mailing list, so please
> > forgive any inaccuracies.
> >
> > I have a Kafka Streams application with a Punctuator.
> > Inside the punctuate() I have this code:
> >
> > // fooStore is a state store
> > for (FooKey fooKey : fooKeysRepository.get(Type.FOO)) {
> >   Foo foo = fooStore.get(fooKey); // foo != null
> >
> > This returns a Foo which is not null.
> >
> > FooKey has fields: [string(nullable), enum(nullable), string(nullable)]
> >
> > I extended enum with a new value, say BAZ (I appended that at the end of
> > the enum) so it looks like:
> > "symbols": [
> > "A",
> > "B",
> > ....,
> > "UNKNOWN"
> > "BAZ"
> > ],
> > "default": "UNKNOWN"
> >
> > and suddenly after this change,
> > for (FooKey fooKey : fooKeysRepository.get(Type.FOO)) {
> >   Foo foo = fooStore.get(fooKey); // foo == null
> >
> > I'm at a loss to understand why this is happening. There is no exception,
> > no trace in application logs, nothing.
> >
> > What's more, the FooKey is always represented as {"string", null, "string"},
> > so the value for the enum is consistently null.
> >
> > Cheers,
> > Mariusz
> >
>
