Hi

This looks like a data-parsing error. Check whether the source's schema matches the data.
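To be concrete: the 'avro' format in your first step serializes each row with a schema that Flink derives from the table schema (three nullable STRING columns), while your DataStream job derives its reader schema by reflection over a POJO (AvroTest). If the two schemas differ in field names, order, or nullability, the binary decoder misreads a length prefix, which produces exactly this kind of "Length is negative" failure. A minimal sketch of deriving the reader schema from the same row type instead (assuming your columns really are a, b, c STRING as in the DDL; this would replace the AvroSchemaGenerator part of your main method):

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.formats.avro.AvroDeserializationSchema;
    import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
    import org.apache.flink.table.api.DataTypes;

    // Build the Avro schema from the same row type the SQL sink wrote with,
    // so reader and writer schemas line up byte for byte.
    Schema readerSchema = AvroSchemaConverter.convertToSchema(
            DataTypes.ROW(
                    DataTypes.FIELD("a", DataTypes.STRING()),
                    DataTypes.FIELD("b", DataTypes.STRING()),
                    DataTypes.FIELD("c", DataTypes.STRING()))
                    .getLogicalType());

    DeserializationSchema<GenericRecord> deserializer =
            AvroDeserializationSchema.forGeneric(readerSchema);

Note that Flink maps a nullable STRING column to union(null, string) in Avro, so a reflection-derived schema with plain string fields will not match what the SQL sink actually wrote.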

On Tue, Apr 18, 2023 at 2:46 PM kcz <573693...@qq.com.invalid> wrote:

> Version: 1.15.2
>                1. First, SQL is used to write JSON data into Kafka (in Avro format)
>                2. Then the DataStream API is used to consume and parse the Avro data from Kafka
> ---------------------- Error as follows ----------------------------------
> Caused by: java.io.IOException: Failed to deserialize consumer record due to
>         at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
>         at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
>         at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
>         at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
>         at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>         at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
>         at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
>         at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
>         at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
>         at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
>         at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -49
>         at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:308)
>         at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:208)
>         at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:470)
>         at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:460)
>         at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:192)
>         at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
>         at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
>         at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
>         at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
>         at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
>         at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
>         at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:142)
>         at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
>         at org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
>         at org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
>         ... 14 more
> Disconnected from the target VM, address: '127.0.0.1:60668', transport: 'socket'
>
>
> Process finished with exit code 1
>
>
>
> -------------- Step 1: SQL code as follows ------------------------------------------
>               create table test (
>                 a string,
>                 b string,
>                 c string
>               ) WITH (
>                 'connector' = 'kafka',
>                 'topic' = 'sink',
>                 'properties.bootstrap.servers' = 'localhost:9092',
>                 'format' = 'avro',
>                 'properties.allow.auto.create.topics' = 'true'
>               );
>
>               create table test_consumer (
>                 a string,
>                 b string,
>                 c string
>               ) WITH (
>                 'connector' = 'kafka',
>                 'topic' = 'source',
>                 'properties.bootstrap.servers' = 'localhost:9092',
>                 'format' = 'json',
>                 'properties.allow.auto.create.topics' = 'true',
>                 'properties.group.id' = 'group',
>                 'scan.startup.mode' = 'latest-offset'
>               );
>
>               insert into test select * from test_consumer;
>
>
>
>
> ------------------------- Step 2: DataStream API code for consuming the Kafka Avro data as follows -----------------------------
>
>
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.avro.reflect.ReflectData;
> import org.apache.flink.api.common.eventtime.WatermarkStrategy;
> import org.apache.flink.api.common.serialization.DeserializationSchema;
> import org.apache.flink.connector.kafka.source.KafkaSource;
> import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
> import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
> import org.apache.flink.formats.avro.AvroDeserializationSchema;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> public class KafkaAvroToJson {
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         env.setParallelism(1);
>
>         // Reader schema derived by reflection from the AvroTest POJO
>         DeserializationSchema<GenericRecord> deserializer =
>                 AvroDeserializationSchema.forGeneric(AvroSchemaGenerator.generateSchema(AvroTest.class));
>
>         KafkaSource<GenericRecord> kafkaConsumer = KafkaSource.<GenericRecord>builder()
>                 .setBootstrapServers("localhost:9092")
>                 .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(deserializer))
>                 .setTopics("g05-sink")
>                 .setGroupId("GROUP_NAME")
>                 .setStartingOffsets(OffsetsInitializer.latest())
>                 .build();
>
>         DataStream<String> jsonStream = env
>                 .fromSource(kafkaConsumer, WatermarkStrategy.noWatermarks(), "kafka source")
>                 .map(Object::toString);
>
>         jsonStream.print();
>
>         env.execute("Kafka Avro to JSON");
>     }
>
>     private static class AvroSchemaGenerator {
>         public static Schema generateSchema(Class<?> clazz) {
>             return ReflectData.get().getSchema(clazz);
>         }
>     }
> }
>
