Hi, I'm not using the Confluent Schema Registry Avro format; I just call the toByteBuffer() method of the Avro-generated class to implement Kafka's Serializer.
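One note on that: in recent Avro versions (1.8.2+) the generated toByteBuffer() goes through BinaryMessageEncoder, so the bytes carry Avro's single-object header (a two-byte marker plus an eight-byte schema fingerprint), which AvroDeserializationSchema.forGeneric does not expect; that would be consistent with the EOFException below. If that is the case, a deserialization schema built on the generated fromByteBuffer() counterpart should read the records back. A minimal sketch (UserView being the generated class from this thread):

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

    public class UserViewDeserializationSchema extends AbstractDeserializationSchema<UserView> {
        @Override
        public UserView deserialize(byte[] message) throws IOException {
            // fromByteBuffer() is generated alongside toByteBuffer() and
            // reads the same single-object encoding, header included.
            return UserView.fromByteBuffer(ByteBuffer.wrap(message));
        }
    }

This can be passed to the FlinkKafkaConsumer in place of AvroDeserializationSchema.forGeneric(SCHEMA).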



> On May 29, 2020, at 12:31 PM, Leonard Xu <[email protected]> wrote:
> 
> Hi,
>> I generated a UserView class from a schema with the Avro code-generation tool, and implemented Kafka's Serializer interface,
> When you write Avro-format data into Kafka with a Kafka Serializer, I suspect the data is being written in the Confluent Schema Registry Avro format.
> The Confluent Schema Registry writes an extra MAGIC_BYTE when handling Avro data, which plain Avro does not have. Try consuming with ConfluentRegistryAvroDeserializationSchema.
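> For reference, the consumer side would then look roughly like this (the registry URL is an assumption):
> 
>     FlinkKafkaConsumer<GenericRecord> myConsumer = new FlinkKafkaConsumer<>(
>         "UserView",
>         ConfluentRegistryAvroDeserializationSchema.forGeneric(SCHEMA, "http://localhost:8081"),
>         properties);
> 
> This variant strips the registry framing (the magic byte and the 4-byte schema id) before handing the payload to Avro.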
> 
> 
> Best,
> Leonard Xu
> [1] https://issues.apache.org/jira/browse/FLINK-16048?focusedCommentId=17036670&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17036670
> 
> 
>> On May 29, 2020, at 01:14, hyangvv <[email protected]> wrote:
>> 
>> I generated a UserView class from a schema with the Avro code-generation tool, then implemented Kafka's Serializer interface as follows:
>> import org.apache.kafka.common.serialization.Serializer;
>> 
>> import java.io.IOException;
>> 
>> public class UserViewSerializer implements Serializer<UserView> {
>>     @Override
>>     public byte[] serialize(String topic, UserView data) {
>>         byte[] array = null;
>>         try {
>>             // toByteBuffer() is generated for UserView by the Avro compiler.
>>             array = data.toByteBuffer().array();
>>         } catch (IOException e) {
>>             e.printStackTrace();
>>         }
>>         return array;
>>     }
>> }
>> The Kafka producer that writes UserView instances into the topic is constructed as follows:
>> KafkaProducer<String, UserView> producer =
>>     new KafkaProducer<>(props, new StringSerializer(), new UserViewSerializer());
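>> Here props was a minimal configuration along these lines (the broker address is illustrative):
>> 
>>     Properties props = new Properties();
>>     props.put("bootstrap.servers", "localhost:9092");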
>> In the Flink program, a FlinkKafkaConsumer consumes the Avro messages from the topic, and that is where the problem described in the subject appears. The failing code:
>> FlinkKafkaConsumer<GenericRecord> myConsumer = new FlinkKafkaConsumer<>(
>>     "UserView", AvroDeserializationSchema.forGeneric(SCHEMA), properties);
>> The exception that causes the job to fail is below (a byte-level check of what is on the wire follows the stack trace):
>> 
>> Caused by: java.io.EOFException
>>   at org.apache.avro.io.BinaryDecoder$InputStreamByteSource.readRaw (BinaryDecoder.java:827)
>>   at org.apache.avro.io.BinaryDecoder.doReadBytes (BinaryDecoder.java:349)
>>   at org.apache.avro.io.BinaryDecoder.readString (BinaryDecoder.java:263)
>>   at org.apache.avro.io.ResolvingDecoder.readString (ResolvingDecoder.java:201)
>>   at org.apache.avro.generic.GenericDatumReader.readString (GenericDatumReader.java:422)
>>   at org.apache.avro.generic.GenericDatumReader.readString (GenericDatumReader.java:414)
>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion (GenericDatumReader.java:181)
>>   at org.apache.avro.generic.GenericDatumReader.read (GenericDatumReader.java:153)
>>   at org.apache.avro.generic.GenericDatumReader.readField (GenericDatumReader.java:232)
>>   at org.apache.avro.generic.GenericDatumReader.readRecord (GenericDatumReader.java:222)
>>   at org.apache.avro.generic.GenericDatumReader.readWithoutConversion (GenericDatumReader.java:175)
>>   at org.apache.avro.generic.GenericDatumReader.read (GenericDatumReader.java:153)
>>   at org.apache.avro.generic.GenericDatumReader.read (GenericDatumReader.java:145)
>>   at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize (AvroDeserializationSchema.java:135)
>>   at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize (KafkaDeserializationSchemaWrapper.java:45)
>>   at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop (KafkaFetcher.java:140)
>>   at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run (FlinkKafkaConsumerBase.java:718)
>>   at org.apache.flink.streaming.api.operators.StreamSource.run (StreamSource.java:100)
>>   at org.apache.flink.streaming.api.operators.StreamSource.run (StreamSource.java:63)
>>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run (SourceStreamTask.java:200)
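>> If it helps, dumping the first bytes of a record with a plain byte-array consumer shows which framing is actually on the wire (a quick sketch; broker address and class name are assumptions):
>> 
>>     import java.time.Duration;
>>     import java.util.Collections;
>>     import java.util.Properties;
>>     import org.apache.kafka.clients.consumer.ConsumerRecord;
>>     import org.apache.kafka.clients.consumer.KafkaConsumer;
>>     import org.apache.kafka.common.serialization.ByteArrayDeserializer;
>>     import org.apache.kafka.common.serialization.StringDeserializer;
>> 
>>     public class WireFormatCheck {
>>         public static void main(String[] args) {
>>             Properties p = new Properties();
>>             p.put("bootstrap.servers", "localhost:9092"); // assumed broker
>>             p.put("group.id", "debug");
>>             p.put("auto.offset.reset", "earliest");
>>             try (KafkaConsumer<String, byte[]> c = new KafkaConsumer<>(
>>                     p, new StringDeserializer(), new ByteArrayDeserializer())) {
>>                 c.subscribe(Collections.singletonList("UserView"));
>>                 for (ConsumerRecord<String, byte[]> r : c.poll(Duration.ofSeconds(5))) {
>>                     byte[] v = r.value();
>>                     if (v != null && v.length >= 2) {
>>                         // Confluent registry frames start with 0x00; Avro
>>                         // single-object encoding starts with 0xC3 0x01.
>>                         System.out.printf("%02X %02X%n", v[0], v[1]);
>>                     }
>>                 }
>>             }
>>         }
>>     }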
>> 
>> Any guidance would be much appreciated.
>> 
> 
