Hi,

I am using Flink 1.0.3 and FlinkKafkaConsumer08 to read Avro data from Kafka. I have the Avro schema file that was used to write the data to Kafka. The documentation at https://ci.apache.org/projects/flink/flink-docs-release-0.8/example_connectors.html says that using the GenericData.Record type is possible with Flink but not recommended: since the record contains the full schema, it is very data intensive and thus probably slow to use. So what is the recommended way to read Avro data from Kafka using Flink?
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "dojo3xxxxx:9092,dojoxxxxx:9092,dojoxxxxx:9092");
        properties.setProperty("zookeeper.connect", "dojo3xxxxx:2181,dojoxxxxx:2181,dojoxxxxx:2181");
        properties.setProperty("group.id", "Zeeshantest");

        AvroDeserializationSchema<GenericData.Record> avroSchema =
                new AvroDeserializationSchema<>(GenericData.Record.class);
        FlinkKafkaConsumer08<GenericData.Record> kafkaConsumer =
                new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);

        DataStream<GenericData.Record> messageStream = env.addSource(kafkaConsumer);
        messageStream.rebalance().print();

        env.execute("Flink AVRO KAFKA Test");
    }

This is the AvroDeserializationSchema that I am using.

    public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

        private static final long serialVersionUID = 4330538776656642778L;

        private final Class<T> avroType;
        private transient DatumReader<T> reader;
        private transient BinaryDecoder decoder;

        public AvroDeserializationSchema(Class<T> avroType) {
            this.avroType = avroType;
        }

        @Override
        public T deserialize(byte[] message) {
            ensureInitialized();
            try {
                decoder = DecoderFactory.get().binaryDecoder(message, decoder);
                return reader.read(null, decoder);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public boolean isEndOfStream(T nextElement) {
            return false;
        }

        @Override
        public TypeInformation<T> getProducedType() {
            return TypeExtractor.getForClass(avroType);
        }

        private void ensureInitialized() {
            if (reader == null) {
                if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
                    reader = new SpecificDatumReader<T>(avroType);
                } else {
                    reader = new ReflectDatumReader<T>(avroType);
                }
            }
        }
    }

On running this I am getting java.lang.Exception: Not a Specific class: class org.apache.avro.generic.GenericData$Record.
Thanks & Regards,
Zeeshan Alam