Hi! I think this is a known limitation of Flink 1.0, and it is fixed in Flink 1.1.
Here is the JIRA ticket: https://issues.apache.org/jira/browse/FLINK-3691
Here is the mail thread: http://mail-archives.apache.org/mod_mbox/flink-user/201604.mbox/%3CCAOFSxKtJXfxRKm2=bplu+xvpwqrwd3c8ynuk3iwk9aqvgrc...@mail.gmail.com%3E

You could try the latest release candidate to get the fix: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/VOTE-Release-Apache-Flink-1-1-0-RC1-td12723.html

The release process is also underway, so the fix should be in a stable release soon.

Greetings,
Stephan

On Tue, Aug 2, 2016 at 4:04 PM, Alam, Zeeshan <zeeshan.a...@fmr.com> wrote:
> Hi,
>
> I am using Flink 1.0.3 and FlinkKafkaConsumer08 to read AVRO data from
> Kafka. I have the AVRO schema file that was used to write the data to
> Kafka. Here
> https://ci.apache.org/projects/flink/flink-docs-release-0.8/example_connectors.html
> you mention that using the GenericData.Record type is possible with Flink,
> but not recommended: since every record carries the full schema, it is very
> data intensive and thus probably slow to use. So what is the recommended
> way to read AVRO data from Kafka using Flink?
>
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>
>     Properties properties = new Properties();
>     properties.setProperty("bootstrap.servers", "dojo3xxxxx:9092,dojoxxxxx:9092,dojoxxxxx:9092");
>     properties.setProperty("zookeeper.connect", "dojo3xxxxx:2181,dojoxxxxx:2181,dojoxxxxx:2181");
>     properties.setProperty("group.id", "Zeeshantest");
>
>     AvroDeserializationSchema<GenericData.Record> avroSchema =
>             new AvroDeserializationSchema<>(GenericData.Record.class);
>     FlinkKafkaConsumer08<GenericData.Record> kafkaConsumer =
>             new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);
>
>     DataStream<GenericData.Record> messageStream = env.addSource(kafkaConsumer);
>     messageStream.rebalance().print();
>     env.execute("Flink AVRO KAFKA Test");
> }
>
> This is the AvroDeserializationSchema that I am using:
>
> public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {
>
>     private static final long serialVersionUID = 4330538776656642778L;
>
>     private final Class<T> avroType;
>     private transient DatumReader<T> reader;
>     private transient BinaryDecoder decoder;
>
>     public AvroDeserializationSchema(Class<T> avroType) {
>         this.avroType = avroType;
>     }
>
>     @Override
>     public T deserialize(byte[] message) {
>         ensureInitialized();
>         try {
>             decoder = DecoderFactory.get().binaryDecoder(message, decoder);
>             return reader.read(null, decoder);
>         } catch (Exception e) {
>             throw new RuntimeException(e);
>         }
>     }
>
>     @Override
>     public boolean isEndOfStream(T nextElement) {
>         return false;
>     }
>
>     @Override
>     public TypeInformation<T> getProducedType() {
>         return TypeExtractor.getForClass(avroType);
>     }
>
>     private void ensureInitialized() {
>         if (reader == null) {
>             if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
>                 reader = new SpecificDatumReader<T>(avroType);
>             } else {
>                 reader = new ReflectDatumReader<T>(avroType);
>             }
>         }
>     }
> }
>
> On running this I am getting java.lang.Exception: Not a Specific class:
> class org.apache.avro.generic.GenericData$Record.
>
> Thanks & Regards
> Zeeshan Alam
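For reference, a sketch of one possible workaround for the quoted code (not from the original thread; class and variable names here are illustrative). The "Not a Specific class" error arises because ReflectDatumReader/SpecificDatumReader need a generated or reflectable class, which GenericData.Record is not. Since the writer's schema file is available, a GenericDatumReader built from that schema can decode the records instead, assuming the schema is passed in as a JSON string (Avro's Schema is not Serializable, so the string is re-parsed lazily):

```java
import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.DeserializationSchema;

// Illustrative sketch: deserializes Kafka messages into Avro GenericRecords
// using an explicit writer schema, avoiding the reflect/specific readers.
public class GenericAvroDeserializationSchema implements DeserializationSchema<GenericRecord> {

    private static final long serialVersionUID = 1L;

    // Keep the schema as a JSON string because org.apache.avro.Schema
    // is not Serializable; parse it lazily on the task managers.
    private final String schemaString;

    private transient GenericDatumReader<GenericRecord> reader;
    private transient BinaryDecoder decoder;

    public GenericAvroDeserializationSchema(String schemaString) {
        this.schemaString = schemaString;
    }

    @Override
    public GenericRecord deserialize(byte[] message) {
        if (reader == null) {
            Schema schema = new Schema.Parser().parse(schemaString);
            reader = new GenericDatumReader<>(schema);
        }
        try {
            // Reuse the decoder instance across records, as in the original code.
            decoder = DecoderFactory.get().binaryDecoder(message, decoder);
            return reader.read(null, decoder);
        } catch (IOException e) {
            throw new RuntimeException("Failed to deserialize Avro record", e);
        }
    }

    @Override
    public boolean isEndOfStream(GenericRecord nextElement) {
        return false;
    }

    @Override
    public TypeInformation<GenericRecord> getProducedType() {
        return TypeExtractor.getForClass(GenericRecord.class);
    }
}
```

It would plug into the quoted main method in place of AvroDeserializationSchema, e.g. new FlinkKafkaConsumer08<>("myavrotopic", new GenericAvroDeserializationSchema(schemaJson), properties). Note that GenericRecord falls back to Flink's generic (Kryo) serialization, which is part of why the docs discourage this type for performance-sensitive jobs.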