Hello,

Currently I am using the Confluent Kafka Avro serializer to write to Kafka and, at the same time, register the schema with the Confluent Schema Registry.
The problem is that our upstream data is deserialized from msgpack into a HashMap<String, ImmutableValue>, which Avro cannot serialize directly. The map holds the properties of an event: the key is the field name and the value is the field value. Since we have many different msgpack upstreams, each representing one type of event, we don't want to serialize the way the official sample code does, because that requires providing an Avro schema file for each upstream, which is hard for us to manage.

So I have two ideas:

1. Since we can use Java reflection to build a Java POJO from JSON or a HashMap, is it also possible to build an Avro GenericRecord that way?
2. Generate the Avro schema by iterating over each incoming HashMap's keys and values. I found an implementation here: https://stackoverflow.com/questions/60269983/is-there-any-way-to-generate-an-avro-schema-from-a-hashmap-of-values

Could anyone provide recommendations on how to implement this?

```java
package com.wish.log.etl.fn;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.*;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.msgpack.value.ImmutableValue;

import java.util.HashMap;
import java.util.Map;

public class KafkaGenericAvroSerializationSchema implements SerializationSchema<Map<String, ImmutableValue>> {

    private final String registryUrl = "";
    private transient KafkaAvroSerializer inner;
    private final String topic;

    public KafkaGenericAvroSerializationSchema(String topic) {
        this.topic = topic;
    }

    private void checkInitialized() {
        if (inner == null) {
            Map<String, Object> props = new HashMap<>();
            props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
            SchemaRegistryClient client = new CachedSchemaRegistryClient(
                    registryUrl, AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
            inner = new KafkaAvroSerializer(client, props);
        }
    }

    @Override
    public byte[] serialize(Map<String, ImmutableValue> input) {
        // KafkaAvroSerializer is not serializable, so it has to be initialized lazily here
        checkInitialized();

        // official sample code
        String userSchema = "{\"type\":\"record\","
                + "\"name\":\"myrecord\","
                + "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"},"
                + "{\"name\":\"f2\",\"type\":\"int\",\"default\":0}]}";
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(userSchema);
        GenericData.Record record = new GenericRecordBuilder(schema)
                .set("f1", "kevin")
                .set("f2", 1234)
                .build();

        // how to serialize my input?
        return inner.serialize(topic, input);
    }
}
```

Best,
Kevin
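P.S. One building block for either idea is unwrapping each msgpack ImmutableValue into a plain Java object that Avro can hold. A minimal sketch using msgpack-java's Value API; the class name MsgpackUnwrap is mine and the handled types are illustrative, not exhaustive:

```java
import org.msgpack.value.ImmutableValue;

public class MsgpackUnwrap {
    // Convert a msgpack ImmutableValue into a plain Java object
    // (String, Long, Double, Boolean, or null) for use in a GenericRecord.
    public static Object toJavaObject(ImmutableValue v) {
        if (v.isStringValue()) {
            return v.asStringValue().asString();
        } else if (v.isIntegerValue()) {
            return v.asIntegerValue().toLong();
        } else if (v.isFloatValue()) {
            return v.asFloatValue().toDouble();
        } else if (v.isBooleanValue()) {
            return v.asBooleanValue().getBoolean();
        } else if (v.isNilValue()) {
            return null;
        }
        // Fall back to the string representation for maps, arrays, binary, etc.
        return v.toString();
    }
}
```

A real version would probably recurse into map and array values instead of stringifying them.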
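For the second idea, here is a minimal sketch of deriving a schema from the map at runtime with Avro's SchemaBuilder, assuming the values have already been unwrapped to plain Java objects. The class name MapToAvro and the type mapping are illustrative; a production version would need nullable unions, nested records, and a cache of the derived schema per event type:

```java
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

import java.util.Map;

public class MapToAvro {
    // Infer a record schema from the map's keys and the runtime types of its values.
    public static Schema inferSchema(String recordName, Map<String, Object> input) {
        SchemaBuilder.FieldAssembler<Schema> fields =
                SchemaBuilder.record(recordName).fields();
        for (Map.Entry<String, Object> e : input.entrySet()) {
            Object v = e.getValue();
            if (v instanceof Integer) {
                fields = fields.name(e.getKey()).type().intType().noDefault();
            } else if (v instanceof Long) {
                fields = fields.name(e.getKey()).type().longType().noDefault();
            } else if (v instanceof Double) {
                fields = fields.name(e.getKey()).type().doubleType().noDefault();
            } else if (v instanceof Boolean) {
                fields = fields.name(e.getKey()).type().booleanType().noDefault();
            } else {
                // Fall back to string for anything unrecognized.
                fields = fields.name(e.getKey()).type().stringType().noDefault();
            }
        }
        return fields.endRecord();
    }

    // Populate a GenericRecord from the same map.
    public static GenericRecord toRecord(Schema schema, Map<String, Object> input) {
        GenericData.Record record = new GenericData.Record(schema);
        for (Map.Entry<String, Object> e : input.entrySet()) {
            Object v = e.getValue();
            Schema.Type t = schema.getField(e.getKey()).schema().getType();
            record.put(e.getKey(), t == Schema.Type.STRING ? String.valueOf(v) : v);
        }
        return record;
    }
}
```

The serialize method could then pass the resulting GenericRecord to inner.serialize(topic, record) instead of the raw map. One caveat: KafkaAvroSerializer registers a new schema version whenever the derived schema differs, so deriving the schema once per event type (rather than per message) is worth considering.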