Re: Error when the flink api consumes kafka (avro) data

2023-04-18 post by Shammon FY
Hi

This looks like a data-parsing error. Check whether the source schema matches the actual data.
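
A quick way to check: derive the schema that the SQL 'avro' format writes from the same column definitions, and compare it with the reader schema used in the DataStream job. A mismatch here (for example plain "string" fields where the writer used the ["null","string"] unions that Flink SQL produces for nullable columns) can surface as exactly this kind of "Malformed data" error. A minimal sketch, assuming flink-avro and the Table API are on the classpath (PrintAvroSchema is just an illustrative name, not code from this thread):

import org.apache.avro.Schema;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;

public class PrintAvroSchema {
    public static void main(String[] args) {
        // Same row as in the DDL below; Flink SQL columns are nullable by
        // default, so the avro format writes ["null","string"] unions,
        // not plain strings.
        DataType rowType = DataTypes.ROW(
                DataTypes.FIELD("a", DataTypes.STRING()),
                DataTypes.FIELD("b", DataTypes.STRING()),
                DataTypes.FIELD("c", DataTypes.STRING()));

        // The schema the SQL 'avro' format actually writes with; compare it
        // with the schema passed to AvroDeserializationSchema.forGeneric.
        Schema schema = AvroSchemaConverter.convertToSchema(rowType.getLogicalType());
        System.out.println(schema.toString(true));
    }
}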

On Tue, Apr 18, 2023 at 2:46 PM kcz <573693...@qq.com.invalid> wrote:

> Version: 1.15.2
> 1. First, a SQL job writes the json data into kafka in avro format.
> 2. Then a DataStream API job consumes and parses the avro data from kafka.
> --Error stack trace below--
> Caused by: java.io.IOException: Failed to deserialize consumer record due
> to
> at
> org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:56)
> at
> org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:33)
> at
> org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:143)
> at
> org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
> at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
> at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
> at
> org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
> at
> org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
> at
> org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is
> negative: -49
> at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:308)
> at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:208)
> at
> org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:470)
> at
> org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:460)
> at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:192)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
> at
> org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:260)
> at
> org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
> at
> org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
> at
> org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
> at
> org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:142)
> at
> org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
> at
> org.apache.flink.connector.kafka.source.reader.deserializer.KafkaValueOnlyDeserializationSchemaWrapper.deserialize(KafkaValueOnlyDeserializationSchemaWrapper.java:51)
> at
> org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter.emitRecord(KafkaRecordEmitter.java:53)
> ... 14 more
> Disconnected from the target VM, address: '127.0.0.1:60668', transport:
> 'socket'
>
>
> Process finished with exit code 1
>
>
>
> --Step 1: SQL code below--
> create table test (
>   a string,
>   b string,
>   c string
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'sink',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'format' = 'avro',
>   'properties.allow.auto.create.topics' = 'true'
> );
>
> create table test_consumer (
>   a string,
>   b string,
>   c string
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'source',
>   'properties.bootstrap.servers' = 'localhost:9092',
>   'format' = 'json',
>   'properties.allow.auto.create.topics' = 'true',
>   'properties.group.id' = 'group',
>   'scan.startup.mode' = 'latest-offset'
> );
>
> insert into test select * from test_consumer;
>
>
>
>
> -Step 2: DataStream API code that consumes the kafka avro data below-
>
>
> public 
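
For reference, a minimal sketch of what a 1.15 DataStream consumer for the avro topic above could look like (the schema string mirrors the nullable a/b/c columns from the DDL; the topic, group id, and class name are assumptions, not the poster's actual code):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AvroKafkaConsumer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Reader schema matching what the SQL sink writes: three nullable
        // string columns. Using plain "string" here instead of the union is
        // a typical cause of "Malformed data. Length is negative".
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"record\",\"fields\":["
                        + "{\"name\":\"a\",\"type\":[\"null\",\"string\"],\"default\":null},"
                        + "{\"name\":\"b\",\"type\":[\"null\",\"string\"],\"default\":null},"
                        + "{\"name\":\"c\",\"type\":[\"null\",\"string\"],\"default\":null}]}");

        KafkaSource<GenericRecord> source = KafkaSource.<GenericRecord>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("sink") // the topic the SQL job writes to
                .setGroupId("group")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(AvroDeserializationSchema.forGeneric(schema))
                .build();

        env.fromSource(source, WatermarkStrategy.noWatermarks(), "kafka-avro-source")
                .print();
        env.execute("avro-consumer");
    }
}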

flink api??????

2021-04-07 post by op
A question about the flink api:
1. Can connect on two keyedstreams be used to do a key join?
2. What is the difference between coprocessfunction and keyedcoprocessfunction?
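
For reference on both points, a minimal sketch of connecting two keyed streams and handling them with a KeyedCoProcessFunction (the stream contents, keys, and class name are illustrative assumptions; CoProcessFunction is the non-keyed variant, with no access to keyed state or the current key):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class ConnectKeyedStreams {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> left =
                env.fromElements(Tuple2.of("k1", 1), Tuple2.of("k2", 2));
        DataStream<Tuple2<String, String>> right =
                env.fromElements(Tuple2.of("k1", "a"), Tuple2.of("k2", "b"));

        // connect() pairs the two streams; keyBy on both sides co-partitions
        // them so elements with the same key reach the same parallel instance.
        left.connect(right)
                .keyBy(l -> l.f0, r -> r.f0)
                // On a keyed connected stream the function is a
                // KeyedCoProcessFunction, which can use keyed state and
                // read the current key from the context.
                .process(new KeyedCoProcessFunction<String, Tuple2<String, Integer>,
                        Tuple2<String, String>, String>() {
                    @Override
                    public void processElement1(Tuple2<String, Integer> value, Context ctx,
                            Collector<String> out) throws Exception {
                        out.collect(ctx.getCurrentKey() + " <- left " + value.f1);
                    }

                    @Override
                    public void processElement2(Tuple2<String, String> value, Context ctx,
                            Collector<String> out) throws Exception {
                        out.collect(ctx.getCurrentKey() + " <- right " + value.f1);
                    }
                })
                .print();

        env.execute("connect-keyed-streams");
    }
}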