Hi,

> I used the Avro code generation tool to generate a UserView class from a schema, and implemented Kafka's Serializer interface.

When you write Avro data to Kafka with your own Kafka Serializer, are you treating the written format as the Confluent Schema Registry Avro format? When the Confluent Schema Registry handles Avro data it writes an extra MAGIC_BYTE that plain Avro does not have. Try ConfluentRegistryAvroDeserializationSchema on the consuming side; see also [1].
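A minimal sketch of the consumer side, assuming the messages really are in the Confluent wire format and that the Schema Registry runs at http://localhost:8081 (a placeholder, adjust to your setup); it also needs the flink-avro-confluent-registry dependency:

import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

// The Confluent wire format is: 1 magic byte (0x00) + 4-byte schema id + Avro binary payload.
// This deserialization schema strips that framing and looks up the writer schema in the registry.
FlinkKafkaConsumer<GenericRecord> myConsumer = new FlinkKafkaConsumer<>(
        "UserView",
        ConfluentRegistryAvroDeserializationSchema.forGeneric(SCHEMA, "http://localhost:8081"),
        properties);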
Best,
Leonard Xu

[1] https://issues.apache.org/jira/browse/FLINK-16048?focusedCommentId=17036670&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17036670

> On May 29, 2020, at 01:14, hyangvv <[email protected]> wrote:
>
> I used the Avro code generation tool to generate a UserView class from a schema, and implemented Kafka's Serializer interface as follows:
>
> import org.apache.kafka.common.serialization.Serializer;
>
> import java.io.IOException;
>
> public class UserViewSerializer implements Serializer<UserView> {
>     @Override
>     public byte[] serialize(String topic, UserView data) {
>         byte[] array = null;
>         try {
>             // Serialize the generated Avro record to its binary form.
>             array = data.toByteBuffer().array();
>         } catch (IOException e) {
>             e.printStackTrace();
>         }
>         return array;
>     }
> }
>
> I then construct the Kafka producer and write UserView instances to the Kafka topic:
>
> KafkaProducer<String, UserView> producer =
>         new KafkaProducer<>(props, new StringSerializer(), new UserViewSerializer());
>
> In my Flink job I consume the Avro messages from the Kafka topic with FlinkKafkaConsumer, which leads to the problem described in the subject. The code that throws the exception is:
>
> FlinkKafkaConsumer<GenericRecord> myConsumer = new FlinkKafkaConsumer<>(
>         "UserView", AvroDeserializationSchema.forGeneric(SCHEMA), properties);
>
> The exception that fails the job:
>
> Caused by: java.io.EOFException
>     at org.apache.avro.io.BinaryDecoder$InputStreamByteSource.readRaw (BinaryDecoder.java:827)
>     at org.apache.avro.io.BinaryDecoder.doReadBytes (BinaryDecoder.java:349)
>     at org.apache.avro.io.BinaryDecoder.readString (BinaryDecoder.java:263)
>     at org.apache.avro.io.ResolvingDecoder.readString (ResolvingDecoder.java:201)
>     at org.apache.avro.generic.GenericDatumReader.readString (GenericDatumReader.java:422)
>     at org.apache.avro.generic.GenericDatumReader.readString (GenericDatumReader.java:414)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion (GenericDatumReader.java:181)
>     at org.apache.avro.generic.GenericDatumReader.read (GenericDatumReader.java:153)
>     at org.apache.avro.generic.GenericDatumReader.readField (GenericDatumReader.java:232)
>     at org.apache.avro.generic.GenericDatumReader.readRecord (GenericDatumReader.java:222)
>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion (GenericDatumReader.java:175)
>     at org.apache.avro.generic.GenericDatumReader.read (GenericDatumReader.java:153)
>     at org.apache.avro.generic.GenericDatumReader.read (GenericDatumReader.java:145)
>     at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize (AvroDeserializationSchema.java:135)
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize (KafkaDeserializationSchemaWrapper.java:45)
>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop (KafkaFetcher.java:140)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run (FlinkKafkaConsumerBase.java:718)
>     at org.apache.flink.streaming.api.operators.StreamSource.run (StreamSource.java:100)
>     at org.apache.flink.streaming.api.operators.StreamSource.run (StreamSource.java:63)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run (SourceStreamTask.java:200)
>
> Any advice would be much appreciated.
