Hi,

What type is your map? I'll try to reproduce it.

奔跑的小飞袁 <[email protected]> wrote on Tue, Oct 13, 2020 at 6:07 PM:

> hello
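> I am using the map type with flink-sql 1.11 and have run into a problem: when a value in the map is null, a NullPointerException is thrown. My error and the source code are attached below.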
> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>         at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>         at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>         at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>         at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>         at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>         at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>         at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_152]
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_152]
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_152]
>         at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_152]
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>         at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>         at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [flink-dist_2.11-1.11.1.jar:1.11.1]
>         at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [flink-dist_2.11-1.11.1.jar:1.11.1]
>         at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) [flink-dist_2.11-1.11.1.jar:1.11.1]
>         at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [flink-dist_2.11-1.11.1.jar:1.11.1]
>         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) [flink-dist_2.11-1.11.1.jar:1.11.1]
>         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.11.1.jar:1.11.1]
>         at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-dist_2.11-1.11.1.jar:1.11.1]
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:517) [flink-dist_2.11-1.11.1.jar:1.11.1]
>         at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [flink-dist_2.11-1.11.1.jar:1.11.1]
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [flink-dist_2.11-1.11.1.jar:1.11.1]
>         at akka.actor.ActorCell.invoke(ActorCell.scala:561) [flink-dist_2.11-1.11.1.jar:1.11.1]
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [flink-dist_2.11-1.11.1.jar:1.11.1]
>         at akka.dispatch.Mailbox.run(Mailbox.scala:225) [flink-dist_2.11-1.11.1.jar:1.11.1]
>         at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [flink-dist_2.11-1.11.1.jar:1.11.1]
>         at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [flink-dist_2.11-1.11.1.jar:1.11.1]
>         at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [flink-dist_2.11-1.11.1.jar:1.11.1]
>         at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [flink-dist_2.11-1.11.1.jar:1.11.1]
>         at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) [flink-dist_2.11-1.11.1.jar:1.11.1]
> Caused by: java.io.IOException: Failed to deserialize Avro record.
>         at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:151) ~[flink-avro-1.11.1.jar:1.11.1]
>         at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75) ~[flink-avro-1.11.1.jar:1.11.1]
>         at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>         at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[lexus-flink_2.11-0.1.jar:?]
>         at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[lexus-flink_2.11-0.1.jar:?]
>         at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[lexus-flink_2.11-0.1.jar:?]
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[lexus-flink_2.11-0.1.jar:?]
>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
> Caused by: java.lang.NullPointerException
>         at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createConverter$57e941b$5(AvroRowDataDeserializationSchema.java:253) ~[flink-avro-1.11.1.jar:1.11.1]
>         at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createMapConverter$7941d275$1(AvroRowDataDeserializationSchema.java:315) ~[flink-avro-1.11.1.jar:1.11.1]
>         at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createNullableConverter$c3bac5d8$1(AvroRowDataDeserializationSchema.java:222) ~[flink-avro-1.11.1.jar:1.11.1]
>         at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:207) ~[flink-avro-1.11.1.jar:1.11.1]
>         at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:149) ~[flink-avro-1.11.1.jar:1.11.1]
>         at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75) ~[flink-avro-1.11.1.jar:1.11.1]
>         at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>         at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) ~[lexus-flink_2.11-0.1.jar:?]
>         at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) ~[lexus-flink_2.11-0.1.jar:?]
>         at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) ~[lexus-flink_2.11-0.1.jar:?]
>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) ~[lexus-flink_2.11-0.1.jar:?]
>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>         at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>         at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) ~[flink-dist_2.11-1.11.1.jar:1.11.1]
>
> private static DeserializationRuntimeConverter createMapConverter(LogicalType type) {
>     final DeserializationRuntimeConverter keyConverter =
>             createConverter(DataTypes.STRING().getLogicalType());
>     final DeserializationRuntimeConverter valueConverter =
>             createConverter(extractValueTypeToAvroMap(type));
>     return avroObject -> {
>         final Map<?, ?> map = (Map<?, ?>) avroObject;
>         Map<Object, Object> result = new HashMap<>();
>         for (Map.Entry<?, ?> entry : map.entrySet()) {
>             Object key = keyConverter.convert(entry.getKey());
>             Object value = valueConverter.convert(entry.getValue());
>             result.put(key, value);
>         }
>         return new GenericMapData(result);
>     };
> }
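
One possible reading of the trace and the snippet above: the NullPointerException comes from a converter invoked inside createMapConverter, and the pasted code obtains valueConverter through createConverter directly, so a null map value would reach a converter that does not expect null. The trace does show a createNullableConverter frame, but only around the map itself. Below is a minimal sketch of that failure mode and of a null-passing wrapper. It is plain Java with no Flink dependency; Converter, TO_STRING, nullable and convertMap are hypothetical names made up for illustration, not Flink's API and not necessarily how Flink addresses it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class NullSafeMapConversion {

    // Hypothetical stand-in for a per-type runtime converter (not Flink's interface).
    interface Converter extends Function<Object, Object> {}

    // Throws NullPointerException on null input, like any converter
    // that calls a method on its argument without checking it first.
    static final Converter TO_STRING = obj -> obj.toString();

    // Wraps a converter so that a null input is passed through as null.
    static Converter nullable(Converter inner) {
        return obj -> obj == null ? null : inner.apply(obj);
    }

    // Same loop shape as the quoted snippet: convert every key and value.
    static Map<Object, Object> convertMap(
            Map<?, ?> input, Converter keyConverter, Converter valueConverter) {
        Map<Object, Object> result = new HashMap<>();
        for (Map.Entry<?, ?> entry : input.entrySet()) {
            Object key = keyConverter.apply(entry.getKey());
            Object value = valueConverter.apply(entry.getValue());
            result.put(key, value);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> input = new HashMap<>();
        input.put("a", "1");
        input.put("b", null); // null map value, as in the report

        try {
            convertMap(input, TO_STRING, TO_STRING);
        } catch (NullPointerException e) {
            System.out.println("plain value converter: NullPointerException");
        }

        // With the wrapper, the null value is kept as null instead of failing.
        System.out.println(convertMap(input, TO_STRING, nullable(TO_STRING)));
    }
}
```

Only the value converter is wrapped here, since Avro map keys are strings and are not expected to be null.
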
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 

Best,
Benchao Li
