Thanks for letting us now. I hope we can improve the Avro support in the
1.15 release.
Maybe the `"name" : "KafkaAvroMessage", "namespace" : "xxx"` causes the
exception then? Otherwise the schema looks identical.
Regards,
Timo
On 30.08.21 11:00, Wayne wrote:
I use the way of writing code to load the schema to read the data, and
it can be parsed normally. I debug sql execution process and found that
the schema generated by sql is indeed inconsistent with the original
schema of avro, so the final judgment is that the sql was written
incorrectly, resulting in no Piece together the same schema
The following is the schema generated by this sql:
{"type":"record","name":"record","fields":[{"name":"aaa","type":"string"},{"name":"bbb","type":["null","string"],"default":null},{"name":"ccc","type":["null","string"],"default":null},{"name":"ddd","type":"string"}]}
At 2021-08-30 15:03:49, "Timo Walther" <twal...@apache.org> wrote:
Hi,
could it be that there is some corrupt record in your Kafka topic? Maybe
you can read from a different offset to verify that. In general, I
cannot spot an obivious mistake in your schema.
Regards,
Timo
On 28.08.21 14:32, Wayne wrote:
i have Apache Avro schema
我的avro schema 如下
|{ "type" : "record", "name" : "KafkaAvroMessage", "namespace" : "xxx",
"fields" : [ { "name" : "aaa", "type" : "string" }, { "name" : "bbb",
"type" : [ "null", "string" ], "default" : null },{ "name" : "ccc",
"type" : [ "null", "string" ] }, { "name" : "ddd", "type" : "string",
"default" : "" } ] } |
The sql worte is like this
我下的sql如下
|CREATE TABLE xxx ( `aaa` STRING NOT NULL, `bbb` STRING , `ccc` STRING ,
`ddd` STRING NOT NULL ) WITH( ... 'format' = 'avro' ); |
|
|
报错如下
Exception in thread "main" java.lang.RuntimeException: Failed to fetch
next result at
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:109)
at
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:80)
at
org.apache.flink.table.planner.sinks.SelectTableSinkBase$RowIteratorWrapper.hasNext(SelectTableSinkBase.java:117)
at
org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:350)
at
org.apache.flink.table.utils.PrintUtils.printAsTableauForm(PrintUtils.java:149)
at
org.apache.flink.table.api.internal.TableResultImpl.print(TableResultImpl.java:154)
at com.stubhub.wyane.flink.avro.avroTest4.main(avroTest4.java:50) Caused
by: java.io.IOException: Failed to fetch job execution result at
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:169)
at
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:118)
at
org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106)
... 6 more Caused by: java.util.concurrent.ExecutionException:
org.apache.flink.runtime.client.JobExecutionException: Job execution
failed. at
java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at
java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1928)
at
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:167)
... 8 more Caused by:
org.apache.flink.runtime.client.JobExecutionException: Job execution
failed. at
org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at
org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:117)
at
java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
at
java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:628)
at
java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:1996)
at
org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:114)
at
org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:166)
... 8 more Caused by: org.apache.flink.runtime.JobException: Recovery is
suppressed by NoRestartBackoffTimeStrategy at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
at
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
at
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:669)
at
org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
at sun.reflect.GeneratedMethodAccessor13.invoke(Unknown Source) at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498) at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at
scala.PartialFunction.applyOrElse(PartialFunction.scala:127) at
scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) at
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) at
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) at
akka.actor.Actor.aroundReceive(Actor.scala:517) at
akka.actor.Actor.aroundReceive$(Actor.scala:515) at
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at
akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at
akka.actor.ActorCell.invoke(ActorCell.scala:561) at
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at
akka.dispatch.Mailbox.run(Mailbox.scala:225) at
akka.dispatch.Mailbox.exec(Mailbox.scala:235) at
akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.io.IOException: Failed to deserialize Avro record. at
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:101)
at
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44)
at
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
at
org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:113)
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:263)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 20 at
org.apache.flink.avro.shaded.org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
at
org.apache.flink.avro.shaded.org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
at
org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
at
org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
at
org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
at
org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
at
org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
at
org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
at
org.apache.flink.avro.shaded.org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at
org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:139)
at
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:98)
... 9 more Process finished with exit code 1
我的sql写的哪里有问题