Hi,看起来是数据解析错误,可以检查一下是不是 source 的 schema 和实际数据不匹配。
On Tue, Apr 18, 2023 at 2:46 PM kcz <573693...@qq.com.invalid> wrote: > 版本:1.15.2 > 1.首先是采用SQL方式,将json数据输入到kafka里面(avro格式) > 2.然后采用DS api方式去接收解析kafka里面的avro数据 > ----------------------报错如下---------------------------------- > Caused by: java.io.IOException: Failed to deserialize consumer record due > to > at > org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56) > at > org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33) > at > org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143) > at > org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385) > at org.apache.flink.streaming.runtime.io > .StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) > at org.apache.flink.streaming.runtime.io > .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519) > at > org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753) > at > org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948) > at > org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927) > at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) > at java.lang.Thread.run(Thread.java:748) > Caused by: org.apache.avro.AvroRuntimeException: Malformed data. 
Length is > negative: -49 > at org.apache.avro.io > .BinaryDecoder.readString(BinaryDecoder.java:308) > at org.apache.avro.io > .ResolvingDecoder.readString(ResolvingDecoder.java:208) > at > org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:470) > at > org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:460) > at > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:192) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) > at > org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260) > at > org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248) > at > org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161) > at > org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154) > at > org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:142) > at > org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82) > at > org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51) > at > org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53) > ... 
14 more > Disconnected from the target VM, address: '127.0.0.1:60668', transport: > 'socket' > > > Process finished with exit code 1 > > > > --------------第一步SQL代码如下------------------------------------------ > create table test ( > a string, > b string, > c string > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'sink', > 'properties.bootstrap.servers' = > 'localhost:9092', > 'format' = 'avro', > > 'properties.allow.auto.create.topics' = 'true' > ); > > > create table test_consumer ( > a string, > b string, > c string > ) WITH ( > 'connector' = 'kafka', > 'topic' = 'source', > 'properties.bootstrap.servers' = > 'localhost:9092', > 'format' = 'json', > > 'properties.allow.auto.create.topics' = 'true', > 'properties.group.id' = 'group', > > 'scan.startup.mode' = > 'latest-offset' > ); > insert into test select * from test_consumer; > > > > > -------------------------第二步API接收kafka > avro代码如下----------------------------- > > > public class KafkaAvroToJson { > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > > env.setParallelism(1); > DeserializationSchema<GenericRecord> deserializer = > AvroDeserializationSchema.forGeneric(AvroSchemaGenerator.generateSchema(AvroTest.class)); > KafkaSource<GenericRecord> kafkaConsumer = > KafkaSource.<GenericRecord>builder() > .setBootstrapServers("localhost:9092") > > .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(deserializer)) > .setTopics("g05-sink") > .setGroupId("GROUP_NAME") > .setStartingOffsets(OffsetsInitializer.latest()) > .build(); > > DataStream<String> jsonStream = env.fromSource(kafkaConsumer, > WatermarkStrategy.noWatermarks(), "kafka source") > .map(Object::toString); > > jsonStream.print(); > > env.execute("Kafka Avro to JSON"); > } > > private static class AvroSchemaGenerator { > public static Schema generateSchema(Class<?> clazz) { > return ReflectData.get().getSchema(clazz); > } > } > } > > > > > > > > >