Hello,

Currently, I am using the Confluent Kafka Avro serializer to write to Kafka
and, at the same time, register the schema with the Confluent Schema Registry.

The problem is that our upstream data is deserialized from msgpack and
converted to a HashMap<String, ImmutableValue>, which Avro cannot serialize
directly. The map holds the properties of an event: the key is the field
name and the value is the field value. Since we have many different msgpack
upstreams, each representing one type of event, we don't want to serialize
the way the official sample code does, because that requires providing an
Avro schema file for each upstream, which is hard for us to manage.

So I have two ideas here:

first, since we can use Java reflection to build a Java POJO from a
JSON/HashMap, is it also possible to build an Avro GenericRecord that way?

second, generate the Avro schema by iterating over each incoming HashMap's
keys and values, roughly like the sketch below. I found an implementation here:
https://stackoverflow.com/questions/60269983/is-there-any-way-to-generate-an-avro-schema-from-a-hashmap-of-values
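
To make the second idea concrete, here is a rough sketch of what I have in
mind (the class name MapToAvro is made up, and only the scalar msgpack types
STRING, INTEGER, FLOAT and BOOLEAN are handled; nested arrays/maps, nulls and
binary data would need more cases):

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.msgpack.value.ImmutableValue;

import java.util.Map;

public class MapToAvro {

    // Build an Avro schema by iterating over the map's keys and msgpack value types.
    public static Schema schemaFor(String recordName, Map<String, ImmutableValue> event) {
        SchemaBuilder.FieldAssembler<Schema> fields =
                SchemaBuilder.record(recordName).fields();
        for (Map.Entry<String, ImmutableValue> e : event.entrySet()) {
            switch (e.getValue().getValueType()) {
                case STRING:  fields = fields.requiredString(e.getKey());  break;
                case INTEGER: fields = fields.requiredLong(e.getKey());    break;
                case FLOAT:   fields = fields.requiredDouble(e.getKey());  break;
                case BOOLEAN: fields = fields.requiredBoolean(e.getKey()); break;
                default:
                    throw new IllegalArgumentException("unhandled msgpack type for field " + e.getKey());
            }
        }
        return fields.endRecord();
    }

    // Populate a GenericRecord from the same map, so the serializer gets an
    // Avro record instead of the raw HashMap.
    public static GenericData.Record toRecord(Schema schema, Map<String, ImmutableValue> event) {
        GenericData.Record record = new GenericData.Record(schema);
        for (Map.Entry<String, ImmutableValue> e : event.entrySet()) {
            ImmutableValue v = e.getValue();
            switch (v.getValueType()) {
                case STRING:  record.put(e.getKey(), v.asStringValue().asString());    break;
                case INTEGER: record.put(e.getKey(), v.asIntegerValue().toLong());     break;
                case FLOAT:   record.put(e.getKey(), v.asFloatValue().toDouble());     break;
                case BOOLEAN: record.put(e.getKey(), v.asBooleanValue().getBoolean()); break;
                default:
                    throw new IllegalArgumentException("unhandled msgpack type for field " + e.getKey());
            }
        }
        return record;
    }
}

The serialize() method below could then build the schema and record per
event and call inner.serialize(topic, record) instead of passing the raw
map, though I am not sure how well that plays with subject naming and
schema registration when every event type gets its own generated schema.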

Could anyone provide any recommendations on how to implement this?

package com.wish.log.etl.fn;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.*;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.msgpack.value.ImmutableValue;

import java.util.HashMap;
import java.util.Map;

public class KafkaGenericAvroSerializationSchema
        implements SerializationSchema<Map<String, ImmutableValue>> {

    private final String registryUrl = "";
    private transient KafkaAvroSerializer inner;
    private final String topic;

    public KafkaGenericAvroSerializationSchema (String topic){
        this.topic = topic;
    }

    private void checkInitialized() {
        if (inner == null) {
            Map<String, Object> props = new HashMap<>();
            props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, registryUrl);
            SchemaRegistryClient client = new CachedSchemaRegistryClient(
                    registryUrl,
                    AbstractKafkaAvroSerDeConfig.MAX_SCHEMAS_PER_SUBJECT_DEFAULT);
            inner = new KafkaAvroSerializer(client, props);
        }
    }

    @Override
    public byte[] serialize(Map<String, ImmutableValue> input) {
        // KafkaAvroSerializer is not serializable, needs to be initialized here
        checkInitialized();

        // official sample codes
        String userSchema = "{\"type\":\"record\"," +
                "\"name\":\"myrecord\"," +
                "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"},{\"name\":\"f2\",\"type\":\"int\",\"default\":0}]}";
        Schema.Parser parser = new Schema.Parser();
        Schema schema = parser.parse(userSchema);
        GenericData.Record record = new GenericRecordBuilder(schema)
                .set("f1", "kevin")
                .set("f2", 1234)
                .build();

        // how to serialize my input

        return inner.serialize(topic, input);
    }

}

Best,
Kevin
