Hi Madhukar,

Thanks for your question. When you instantiate the FlinkKafkaConsumer, you
supply a DeserializationSchema in the constructor. You simply create a
class which implements DeserializationSchema and contains the
KafkaAvroDecoder with the schema registry.

Like so:

public class MyAvroDeserializer implements DeserializationSchema<MyType> {

    private KafkaAvroDecoder decoder;

    public MyAvroDeserializer() {
         SchemaRegistryClient schemaRegistry = new
SchemaRegistryClient(...);
         this.decoder = new KafkaAvroDecoder(schemaRegistry);
    }

    public MyType deserialize(byte[] message) throws Exception {
         return (MyType) this.decoder.fromBytes(messages);
    }

    public boolean isEndOfStream(MyType nextElement) {
         return false;
    }

}

Then you supply this class when creating the consumer:

DeserializationSchema<MyType> decoder = new MyAvroDeserializer()
Properties props = new Properties();
OffsetStore offsetStore = FlinkKafkaConsumer.OffsetStore.KAFKA;
FetcherType fetcherType = FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL;

FlinkKafkaConsumer consumer = new FlinkKafkaConsumer("myTopic"), decoder,
props, offsetStore, fetcherType);


Let me know if that works for you.

Best regards,
Max

On Thu, Nov 19, 2015 at 9:51 AM, Madhukar Thota <madhukar.th...@gmail.com>
wrote:

> Hi
>
> I am very new to Avro. Currently I am using confluent Kafka version and I
> am able to write an Avro message to Kafka by storing schema in schema
> registry. Now I need to consume those messages using Flink Kafka Consumer
> and having a hard time to deserialize the messages.
>
> I am looking for an example on how to deserialize Avro message where
> schema is stored in schema registry.
>
> Any help is appreciated. Thanks in Advance.
>
> Thanks,
> Madhu
>
>

Reply via email to