Hi folks,
I am trying to consume avro data from Kafka in Flink. The data is produced by 
Kafka connect using AvroConverter. I have created a 
AvroDeserializationSchema.java 
<https://gist.github.com/datafibers/ae9d624b6db44865ae14defe8a838123> used by 
Flink consumer. Then, I use following code to read it.

public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
              Properties properties = new Properties();
              properties.setProperty("bootstrap.servers", “localhost:9092");
              properties.setProperty("zookeeper.connect", “localhost:2181”);
Schema schema = new Parser().parse("{" + "\"name\": \"test\", "
                                           + "\"type\": \"record\", "
                                           + "\"fields\": "
                                           +" [ "
                                           + "  { \"name\": \"name\", \"type\": 
\"string\" },"
                                           + "  { \"name\": \"symbol\", 
\"type\": \"string\" },"
                                           + "  { \"name\": \"exchange\", 
\"type\": \"string\"}"
                                           + "] "
                                           +"}");

              AvroDeserializationSchema avroSchema = new 
AvroDeserializationSchema<>(schema);
              FlinkKafkaConsumer09<GenericRecord> kafkaConsumer = 
                new FlinkKafkaConsumer09<>("myavrotopic",avroSchema, 
properties);
              DataStream<GenericRecord> messageStream = 
env.addSource(kafkaConsumer);
              messageStream.rebalance().print();
              env.execute("Flink AVRO KAFKA Test");
}

Once, I run the code, I am able to get the schema information only as follows.
{"name":"", "symbol":"", "exchange":""}
{"name":"", "symbol":"", "exchange":""}
{"name":"", "symbol":"", "exchange":""}
{"name":"", "symbol":"", "exchange":”"}

Could anyone help to find out the issues why I cannot decode it?

Further troubleshooting, I found out if I use a kafka producer here 
<https://gist.github.com/datafibers/d063b255b50fa34515c0ac9e24d4485c> to send 
the avro data especially using kafka.serializer.DefaultEncoder. Above code can 
get correct result. Does any body know how to either set DefaultEncoder in 
Kafka Connect or set it when writing customized kafka connect? Or in the other 
way, how should I modify the AvroDeserializationSchema.java for instead?

Thanks, I’ll post this to the Kafka user group as well.
Will




Reply via email to