Hi, 你的map是什么类型呢?我来复现一下。
奔跑的小飞袁 <[email protected]> 于2020年10月13日周二 下午6:07写道: > hello > 我在使用flink-sql1.11版本是使用到了map类型,但是我遇到了问题,当map中的value为空时会产生空指针异常,下面附上我的错误以及源代码 > org.apache.flink.runtime.JobException: Recovery is suppressed by > NoRestartBackoffTimeStrategy > at > > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > ~[?:1.8.0_152] > at > > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > ~[?:1.8.0_152] > at > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > ~[?:1.8.0_152] > at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_152] > at > > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at > scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at akka.japi.pf > .UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at > scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at akka.actor.Actor$class.aroundReceive(Actor.scala:517) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at akka.actor.ActorCell.invoke(ActorCell.scala:561) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at akka.dispatch.Mailbox.run(Mailbox.scala:225) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at akka.dispatch.Mailbox.exec(Mailbox.scala:235) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at > akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at > > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at > akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > [flink-dist_2.11-1.11.1.jar:1.11.1] > at > > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > [flink-dist_2.11-1.11.1.jar:1.11.1] > Caused by: java.io.IOException: Failed to deserialize Avro record. > at > > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:151) > ~[flink-avro-1.11.1.jar:1.11.1] > at > > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75) > ~[flink-avro-1.11.1.jar:1.11.1] > at > > org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) > ~[lexus-flink_2.11-0.1.jar:?] > at > > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) > ~[lexus-flink_2.11-0.1.jar:?] > at > > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) > ~[lexus-flink_2.11-0.1.jar:?] > at > > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) > ~[lexus-flink_2.11-0.1.jar:?] > at > > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > Caused by: java.lang.NullPointerException > at > > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createConverter$57e941b$5(AvroRowDataDeserializationSchema.java:253) > ~[flink-avro-1.11.1.jar:1.11.1] > at > > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createMapConverter$7941d275$1(AvroRowDataDeserializationSchema.java:315) > ~[flink-avro-1.11.1.jar:1.11.1] > at > > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createNullableConverter$c3bac5d8$1(AvroRowDataDeserializationSchema.java:222) > ~[flink-avro-1.11.1.jar:1.11.1] > at > > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:207) > ~[flink-avro-1.11.1.jar:1.11.1] > at > > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:149) > ~[flink-avro-1.11.1.jar:1.11.1] > at > > org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75) > ~[flink-avro-1.11.1.jar:1.11.1] > at > > org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56) > ~[lexus-flink_2.11-0.1.jar:?] > at > > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181) > ~[lexus-flink_2.11-0.1.jar:?] > at > > org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141) > ~[lexus-flink_2.11-0.1.jar:?] > at > > org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755) > ~[lexus-flink_2.11-0.1.jar:?] > at > > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > at > > org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201) > ~[flink-dist_2.11-1.11.1.jar:1.11.1] > > private static DeserializationRuntimeConverter > createMapConverter(LogicalType type) { > final DeserializationRuntimeConverter keyConverter = > createConverter( > DataTypes.STRING().getLogicalType()); > final DeserializationRuntimeConverter valueConverter = > createConverter( > extractValueTypeToAvroMap(type)); > return avroObject -> { > final Map<?, ?> map = (Map<?, ?>) avroObject; > Map<Object, Object> result = new HashMap<>(); > for (Map.Entry<?, ?> entry : map.entrySet()) { > Object key = > keyConverter.convert(entry.getKey()); > Object value = > valueConverter.convert(entry.getValue()); > result.put(key, value); > } > return new GenericMapData(result); > }; > } > > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ > -- Best, Benchao Li
