Hi Madhukar,

Thanks for your question. When you instantiate the FlinkKafkaConsumer, you supply a DeserializationSchema in the constructor. You simply create a class which implements DeserializationSchema and contains the KafkaAvroDecoder with the schema registry.
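
For background on why the decoder needs the schema registry: messages written by the Confluent Avro serializer are, to my understanding of Confluent's wire format, framed as a magic byte (0), then a 4-byte big-endian schema ID, then the Avro-encoded payload. The decoder uses that ID to fetch the writer schema from the registry. A minimal, self-contained sketch of reading that header follows; the class and method names are hypothetical and are not part of Flink or the Confluent libraries:

```java
import java.nio.ByteBuffer;

// Hypothetical helper for illustration only; not a Flink or Confluent class.
class ConfluentHeader {

    // Assumption: Confluent's framing starts with a single zero byte.
    static final byte MAGIC_BYTE = 0x0;

    /** Returns the schema ID stored in bytes 1..4 of a Confluent-framed message. */
    static int schemaId(byte[] message) {
        if (message == null || message.length < 5) {
            throw new IllegalArgumentException("message too short for Confluent framing");
        }
        // ByteBuffer reads multi-byte values in big-endian order by default.
        ByteBuffer buffer = ByteBuffer.wrap(message);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("unknown magic byte");
        }
        return buffer.getInt();
    }

    public static void main(String[] args) {
        // Magic byte 0, schema ID 42, then two arbitrary payload bytes.
        byte[] framed = {0, 0, 0, 0, 42, 2, 6};
        System.out.println(schemaId(framed));
    }
}
```

In practice you would not parse this yourself; KafkaAvroDecoder handles the header and the registry lookup internally. The sketch is only meant to show why plain Avro decoding without the registry fails on these messages.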

The deserializer class could look like this:

    public class MyAvroDeserializer implements DeserializationSchema<MyType> {

        private KafkaAvroDecoder decoder;

        public MyAvroDeserializer() {
            SchemaRegistryClient schemaRegistry = new SchemaRegistryClient(...);
            this.decoder = new KafkaAvroDecoder(schemaRegistry);
        }

        // Decodes one raw Kafka message into MyType via the schema registry.
        public MyType deserialize(byte[] message) throws Exception {
            return (MyType) this.decoder.fromBytes(message);
        }

        // The Kafka stream is unbounded, so no element marks its end.
        public boolean isEndOfStream(MyType nextElement) {
            return false;
        }
    }

Then you supply this class when creating the consumer:

    DeserializationSchema<MyType> decoder = new MyAvroDeserializer();
    Properties props = new Properties();
    OffsetStore offsetStore = FlinkKafkaConsumer.OffsetStore.KAFKA;
    FetcherType fetcherType = FlinkKafkaConsumer.FetcherType.LEGACY_LOW_LEVEL;
    FlinkKafkaConsumer<MyType> consumer =
        new FlinkKafkaConsumer<>("myTopic", decoder, props, offsetStore, fetcherType);

Let me know if that works for you.

Best regards,
Max

On Thu, Nov 19, 2015 at 9:51 AM, Madhukar Thota <madhukar.th...@gmail.com> wrote:
> Hi
>
> I am very new to Avro. Currently I am using confluent Kafka version and I
> am able to write an Avro message to Kafka by storing schema in schema
> registry. Now I need to consume those messages using Flink Kafka Consumer
> and having a hard time to deserialize the messages.
>
> I am looking for an example on how to deserialize Avro message where
> schema is stored in schema registry.
>
> Any help is appreciated. Thanks in Advance.
>
> Thanks,
> Madhu