I used the Avro code generation tool to generate a UserView class from a schema, and then implemented Kafka's Serializer interface for it. The code is as follows:
import org.apache.kafka.common.serialization.Serializer;

import java.io.IOException;

public class UserViewSerializer implements Serializer<UserView> {
    @Override
    public byte[] serialize(String topic, UserView data) {
        byte[] array = null;
        try {
            array = data.toByteBuffer().array();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return array;
    }
}
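One detail in the serializer above that may be worth checking, independent of the exception below: `ByteBuffer.array()` returns the whole backing array and ignores the buffer's position and limit. Whether that matters here depends on how `toByteBuffer()` builds its buffer, which is not shown, so treat the following as a minimal sketch under that assumption. `ByteBufferBytes` and `remainingBytes` are hypothetical names, not part of the original code.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of the original post.
final class ByteBufferBytes {
    private ByteBufferBytes() {}

    /** Copies only the bytes between position and limit; the caller's buffer is left untouched. */
    static byte[] remainingBytes(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate(); // shares content, has its own position/limit
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocate(16);
        buffer.put(new byte[] {1, 2, 3});
        buffer.flip(); // position = 0, limit = 3

        System.out.println(buffer.array().length);         // 16: the whole backing array
        System.out.println(remainingBytes(buffer).length); // 3: only the readable bytes
    }
}
```

If the buffer returned by `toByteBuffer()` is exactly sized, both approaches give the same bytes; copying the remaining bytes is simply the safer default.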
I then construct a Kafka producer and write UserView instances to a Kafka topic:
KafkaProducer<String, UserView> producer =
        new KafkaProducer<>(props, new StringSerializer(), new UserViewSerializer());
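In my Flink job I consume these Avro-formatted messages from the Kafka topic with FlinkKafkaConsumer, and that is where the problem described in the subject line occurs. The code that triggers the exception is: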
FlinkKafkaConsumer<GenericRecord> myConsumer =
        new FlinkKafkaConsumer<>("UserView", AvroDeserializationSchema.forGeneric(SCHEMA), properties);
The exception that causes the job to fail is:

Caused by: java.io.EOFException
    at org.apache.avro.io.BinaryDecoder$InputStreamByteSource.readRaw (BinaryDecoder.java:827)
    at org.apache.avro.io.BinaryDecoder.doReadBytes (BinaryDecoder.java:349)
    at org.apache.avro.io.BinaryDecoder.readString (BinaryDecoder.java:263)
    at org.apache.avro.io.ResolvingDecoder.readString (ResolvingDecoder.java:201)
    at org.apache.avro.generic.GenericDatumReader.readString (GenericDatumReader.java:422)
    at org.apache.avro.generic.GenericDatumReader.readString (GenericDatumReader.java:414)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion (GenericDatumReader.java:181)
    at org.apache.avro.generic.GenericDatumReader.read (GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.readField (GenericDatumReader.java:232)
    at org.apache.avro.generic.GenericDatumReader.readRecord (GenericDatumReader.java:222)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion (GenericDatumReader.java:175)
    at org.apache.avro.generic.GenericDatumReader.read (GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.read (GenericDatumReader.java:145)
    at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize (AvroDeserializationSchema.java:135)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize (KafkaDeserializationSchemaWrapper.java:45)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop (KafkaFetcher.java:140)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run (FlinkKafkaConsumerBase.java:718)
    at org.apache.flink.streaming.api.operators.StreamSource.run (StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run (StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run (SourceStreamTask.java:200)
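
A possible explanation, offered as an assumption rather than a confirmed diagnosis: the trace shows the message being read by a plain BinaryDecoder, which expects raw Avro binary. If the generated `toByteBuffer()` writes Avro's single-object encoding instead (a two-byte marker `C3 01` followed by an 8-byte schema fingerprint, then the record body), the decoder would treat those header bytes as field data and could run off the end of the input while reading a string. The sketch below is a small diagnostic for checking which framing a payload carries. `AvroFraming` and `hasSingleObjectHeader` are hypothetical names introduced only for illustration.

```java
// Hypothetical diagnostic helper, not part of the original post.
final class AvroFraming {
    private AvroFraming() {}

    // Single-object encoding header: 2-byte marker + 8-byte schema fingerprint.
    static final int SINGLE_OBJECT_HEADER_LENGTH = 10;

    /** Returns true if the payload starts with the single-object marker bytes C3 01. */
    static boolean hasSingleObjectHeader(byte[] payload) {
        return payload != null
                && payload.length >= SINGLE_OBJECT_HEADER_LENGTH
                && (payload[0] & 0xFF) == 0xC3
                && (payload[1] & 0xFF) == 0x01;
    }
}
```

Calling `AvroFraming.hasSingleObjectHeader(array)` on the bytes returned by the serializer would show whether the producer and the Flink consumer disagree about the wire format. If they do, the two sides need to be aligned on one encoding.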

Any help would be greatly appreciated.
