Hi
看着是解析数据错误,可以检查一下是不是source的schema和数据不匹配
On Tue, Apr 18, 2023 at 2:46 PM kcz <573693...@qq.com.invalid> wrote:
> 版本:1.15.2
>1.首先是采用SQL方式,将json数据输入到kafka里面(avro格式)
>2.然后采用DS api方式去接收解析kafka里面的avro数据
> --报错如下--
> Caused by: java.io.IOException: Failed to deserialize consumer record due
> to
> at
> org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
> at
> org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
> at
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
> at
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
> at org.apache.flink.streaming.runtime.io
> .StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
> at org.apache.flink.streaming.runtime.io
> .StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is
> negative: -49
> at org.apache.avro.io
> .BinaryDecoder.readString(BinaryDecoder.java:308)
> at org.apache.avro.io
> .ResolvingDecoder.readString(ResolvingDecoder.java:208)
> at
> org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:470)
> at
> org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:460)
> at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:192)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
> at
> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
> at
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
> at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
> at
> org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:142)
> at
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
> at
> org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
> at
> org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
> ... 14 more
> Disconnected from the target VM, address: '127.0.0.1:60668', transport:
> 'socket'
>
>
> Process finished with exit code 1
>
>
>
> --第一步SQL代码如下--
>create table test (
> a string,
> b string,
> c string
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'sink',
> 'properties.bootstrap.servers' =
> 'localhost:9092',
> 'format' = 'avro',
>
> 'properties.allow.auto.create.topics' = 'true'
> );
>
>
> create table test_consumer (
> a string,
> b string,
> c string
> ) WITH (
> 'connector' = 'kafka',
> 'topic' = 'source',
> 'properties.bootstrap.servers' =
> 'localhost:9092',
> 'format' = 'json',
>
> 'properties.allow.auto.create.topics' = 'true',
> 'properties.group.id' = 'group',
>
> 'scan.startup.mode' =
> 'latest-offset'
> );
> insert into test select * from test_consumer;
>
>
>
>
> -第二步API接收kafka
> avro代码如下-
>
>
> public