Hi, I am not using the Confluent Schema Registry Avro format. I simply called the Avro-generated class's toByteBuffer() to implement Kafka's Serializer.
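For what it's worth, if UserView was generated with a recent Avro version (1.8.2+), the generated toByteBuffer() delegates to org.apache.avro.message.BinaryMessageEncoder, which prepends a 10-byte single-object-encoding header (the marker bytes 0xC3 0x01 plus an 8-byte schema fingerprint) before the binary record. That header is not the Confluent MAGIC_BYTE, but it is also not the raw binary Avro that AvroDeserializationSchema.forGeneric(SCHEMA) expects, which would explain the EOFException. Under that assumption, one way to keep the producer unchanged is a custom DeserializationSchema built on the generated decoder. This is only a sketch, not tested:

import java.io.IOException;

import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

// Sketch: decode the Avro single-object encoding produced by
// UserView.toByteBuffer(). UserView.getDecoder() is the static
// BinaryMessageDecoder that the Avro compiler generates alongside
// toByteBuffer(), so it strips the 10-byte header before reading
// the record fields.
public class UserViewDeserializationSchema extends AbstractDeserializationSchema<UserView> {
    @Override
    public UserView deserialize(byte[] message) throws IOException {
        return UserView.getDecoder().decode(message);
    }
}

The consumer would then be constructed as new FlinkKafkaConsumer<>("UserView", new UserViewDeserializationSchema(), properties) instead of going through forGeneric.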
> On May 29, 2020, at 12:31 PM, Leonard Xu <[email protected]> wrote:
>
> Hi,
>
>> I used the Avro code-generation tool to generate a UserView class from a schema, and then implemented Kafka's Serializer interface,
>
> When you write Avro-format data into Kafka with a Kafka Serializer, the data is presumably being written in the Confluent Schema Registry Avro format. Confluent Schema Registry writes an extra MAGIC_BYTE when handling Avro data, which plain Avro does not have. Try ConfluentRegistryAvroDeserializationSchema on the consuming side.
>
> Best,
> Leonard Xu
> [1] https://issues.apache.org/jira/browse/FLINK-16048?focusedCommentId=17036670&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17036670
>
>> On May 29, 2020, at 01:14, hyangvv <[email protected]> wrote:
>>
>> I used the Avro code-generation tool to generate a UserView class from a schema, and then implemented Kafka's Serializer interface. The code is as follows:
>>
>> import org.apache.kafka.common.serialization.Serializer;
>>
>> import java.io.IOException;
>>
>> public class UserViewSerializer implements Serializer<UserView> {
>>     @Override
>>     public byte[] serialize(String topic, UserView data) {
>>         byte[] array = null;
>>         try {
>>             array = data.toByteBuffer().array();
>>         } catch (IOException e) {
>>             e.printStackTrace();
>>         }
>>         return array;
>>     }
>> }
>>
>> I construct the Kafka producer and write UserView instances to the Kafka topic like this:
>>
>> KafkaProducer<String, UserView> producer = new KafkaProducer<>(props, new StringSerializer(), new UserViewSerializer());
>>
>> When my Flink job consumes the Avro-format messages from the Kafka topic with FlinkKafkaConsumer, the problem described in the subject appears. The failing code is:
>>
>> FlinkKafkaConsumer<GenericRecord> myConsumer = new FlinkKafkaConsumer<>("UserView", AvroDeserializationSchema.forGeneric(SCHEMA), properties);
>>
>> The exception that fails the job:
>>
>> Caused by: java.io.EOFException
>>     at org.apache.avro.io.BinaryDecoder$InputStreamByteSource.readRaw (BinaryDecoder.java:827)
>>     at org.apache.avro.io.BinaryDecoder.doReadBytes (BinaryDecoder.java:349)
>>     at org.apache.avro.io.BinaryDecoder.readString (BinaryDecoder.java:263)
>>     at org.apache.avro.io.ResolvingDecoder.readString (ResolvingDecoder.java:201)
>>     at org.apache.avro.generic.GenericDatumReader.readString (GenericDatumReader.java:422)
>>     at org.apache.avro.generic.GenericDatumReader.readString (GenericDatumReader.java:414)
>>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion (GenericDatumReader.java:181)
>>     at org.apache.avro.generic.GenericDatumReader.read (GenericDatumReader.java:153)
>>     at org.apache.avro.generic.GenericDatumReader.readField (GenericDatumReader.java:232)
>>     at org.apache.avro.generic.GenericDatumReader.readRecord (GenericDatumReader.java:222)
>>     at org.apache.avro.generic.GenericDatumReader.readWithoutConversion (GenericDatumReader.java:175)
>>     at org.apache.avro.generic.GenericDatumReader.read (GenericDatumReader.java:153)
>>     at org.apache.avro.generic.GenericDatumReader.read (GenericDatumReader.java:145)
>>     at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize (AvroDeserializationSchema.java:135)
>>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize (KafkaDeserializationSchemaWrapper.java:45)
>>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop (KafkaFetcher.java:140)
>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run (FlinkKafkaConsumerBase.java:718)
>>     at org.apache.flink.streaming.api.operators.StreamSource.run (StreamSource.java:100)
>>     at org.apache.flink.streaming.api.operators.StreamSource.run (StreamSource.java:63)
>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run (SourceStreamTask.java:200)
>>
>> Any pointers would be much appreciated.
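Conversely, if the goal is to keep AvroDeserializationSchema.forGeneric(SCHEMA) on the Flink side, the producer has to emit raw binary Avro with no header at all. A minimal sketch of such a serializer, assuming the same generated UserView class (untested against your setup):

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.common.serialization.Serializer;

// Sketch: a Serializer that emits raw binary Avro (no Confluent
// MAGIC_BYTE, no Avro single-object header), which is the byte
// layout AvroDeserializationSchema.forGeneric(SCHEMA) reads.
public class RawAvroUserViewSerializer implements Serializer<UserView> {
    private final SpecificDatumWriter<UserView> writer =
            new SpecificDatumWriter<>(UserView.class);

    @Override
    public byte[] serialize(String topic, UserView data) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        try {
            writer.write(data, encoder);
            encoder.flush();
        } catch (IOException e) {
            throw new RuntimeException("Avro serialization failed", e);
        }
        return out.toByteArray();
    }
}

With this approach, SCHEMA on the consumer side must be the exact writer schema, e.g. UserView.getClassSchema().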
