Hi,

I am using Flink 1.0.3 and FlinkKafkaConsumer08 to read Avro data from Kafka. I
have the Avro schema file that was used to write the data to Kafka.
The documentation at
https://ci.apache.org/projects/flink/flink-docs-release-0.8/example_connectors.html
says that using the GenericData.Record type is possible with Flink, but not
recommended: since the record contains the full schema, it is very data
intensive and thus probably slow to use. So what is the recommended way to
read Avro data from Kafka using Flink?

public static void main(String[] args) throws Exception {
       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
       Properties properties = new Properties();
       properties.setProperty("bootstrap.servers", "dojo3xxxxx:9092,dojoxxxxx:9092,dojoxxxxx:9092");
       properties.setProperty("zookeeper.connect", "dojo3xxxxx:2181,dojoxxxxx:2181,dojoxxxxx:2181");
       properties.setProperty("group.id", "Zeeshantest");
       AvroDeserializationSchema<GenericData.Record> avroSchema = new AvroDeserializationSchema<>(GenericData.Record.class);
       FlinkKafkaConsumer08<GenericData.Record> kafkaConsumer = new FlinkKafkaConsumer08<>("myavrotopic", avroSchema, properties);
       DataStream<GenericData.Record> messageStream = env.addSource(kafkaConsumer);
       messageStream.rebalance().print();
       env.execute("Flink AVRO KAFKA Test");
}

This is the AvroDeserializationSchema that I am using.


public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

       private static final long serialVersionUID = 4330538776656642778L;

       private final Class<T> avroType;
       private transient DatumReader<T> reader;
       private transient BinaryDecoder decoder;

       public AvroDeserializationSchema(Class<T> avroType) {
              this.avroType = avroType;
       }

       @Override
       public T deserialize(byte[] message) {
              ensureInitialized();
              try {
                     decoder = DecoderFactory.get().binaryDecoder(message, decoder);
                     return reader.read(null, decoder);
              } catch (Exception e) {
                     throw new RuntimeException(e);
              }
       }

       @Override
       public boolean isEndOfStream(T nextElement) {
              return false;
       }

       @Override
       public TypeInformation<T> getProducedType() {
              return TypeExtractor.getForClass(avroType);
       }

       private void ensureInitialized() {
              if (reader == null) {
                     if (org.apache.avro.specific.SpecificRecordBase.class.isAssignableFrom(avroType)) {
                           reader = new SpecificDatumReader<T>(avroType);
                     } else {
                           reader = new ReflectDatumReader<T>(avroType);
                     }
              }
       }
}

When I run this, I get java.lang.Exception: Not a Specific class: class
org.apache.avro.generic.GenericData$Record.

Thanks & Regards
Zeeshan Alam