hello
我在使用flink-sql1.11版本是使用到了map类型,但是我遇到了问题,当map中的value为空时会产生空指针异常,下面附上我的错误以及源代码
org.apache.flink.runtime.JobException: Recovery is suppressed by
NoRestartBackoffTimeStrategy
        at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at
org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at
org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at
org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:185)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at
org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:179)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at
org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:503)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at
org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:386)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
~[?:1.8.0_152]
        at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
~[?:1.8.0_152]
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
~[?:1.8.0_152]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_152]
        at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
[flink-dist_2.11-1.11.1.jar:1.11.1]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
[flink-dist_2.11-1.11.1.jar:1.11.1]
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
[flink-dist_2.11-1.11.1.jar:1.11.1]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
[flink-dist_2.11-1.11.1.jar:1.11.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
[flink-dist_2.11-1.11.1.jar:1.11.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
[flink-dist_2.11-1.11.1.jar:1.11.1]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
[flink-dist_2.11-1.11.1.jar:1.11.1]
        at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
[flink-dist_2.11-1.11.1.jar:1.11.1]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
[flink-dist_2.11-1.11.1.jar:1.11.1]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
[flink-dist_2.11-1.11.1.jar:1.11.1]
        at akka.actor.ActorCell.invoke(ActorCell.scala:561)
[flink-dist_2.11-1.11.1.jar:1.11.1]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
[flink-dist_2.11-1.11.1.jar:1.11.1]
        at akka.dispatch.Mailbox.run(Mailbox.scala:225)
[flink-dist_2.11-1.11.1.jar:1.11.1]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
[flink-dist_2.11-1.11.1.jar:1.11.1]
        at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[flink-dist_2.11-1.11.1.jar:1.11.1]
        at
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[flink-dist_2.11-1.11.1.jar:1.11.1]
        at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[flink-dist_2.11-1.11.1.jar:1.11.1]
        at
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
[flink-dist_2.11-1.11.1.jar:1.11.1]
Caused by: java.io.IOException: Failed to deserialize Avro record.
        at
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:151)
~[flink-avro-1.11.1.jar:1.11.1]
        at
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75)
~[flink-avro-1.11.1.jar:1.11.1]
        at
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
~[lexus-flink_2.11-0.1.jar:?]
        at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
~[lexus-flink_2.11-0.1.jar:?]
        at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
~[lexus-flink_2.11-0.1.jar:?]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
~[lexus-flink_2.11-0.1.jar:?]
        at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
Caused by: java.lang.NullPointerException
        at
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createConverter$57e941b$5(AvroRowDataDeserializationSchema.java:253)
~[flink-avro-1.11.1.jar:1.11.1]
        at
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createMapConverter$7941d275$1(AvroRowDataDeserializationSchema.java:315)
~[flink-avro-1.11.1.jar:1.11.1]
        at
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createNullableConverter$c3bac5d8$1(AvroRowDataDeserializationSchema.java:222)
~[flink-avro-1.11.1.jar:1.11.1]
        at
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.lambda$createRowConverter$80d8b6bd$1(AvroRowDataDeserializationSchema.java:207)
~[flink-avro-1.11.1.jar:1.11.1]
        at
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:149)
~[flink-avro-1.11.1.jar:1.11.1]
        at
org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:75)
~[flink-avro-1.11.1.jar:1.11.1]
        at
org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:81)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:56)
~[lexus-flink_2.11-0.1.jar:?]
        at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:181)
~[lexus-flink_2.11-0.1.jar:?]
        at
org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
~[lexus-flink_2.11-0.1.jar:?]
        at
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
~[lexus-flink_2.11-0.1.jar:?]
        at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
~[flink-dist_2.11-1.11.1.jar:1.11.1]
        at
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
~[flink-dist_2.11-1.11.1.jar:1.11.1]

private static DeserializationRuntimeConverter
createMapConverter(LogicalType type) {
                final DeserializationRuntimeConverter keyConverter = 
createConverter(
                        DataTypes.STRING().getLogicalType());
                final DeserializationRuntimeConverter valueConverter = 
createConverter(
                        extractValueTypeToAvroMap(type));
                return avroObject -> {
                        final Map<?, ?> map = (Map<?, ?>) avroObject;
                        Map<Object, Object> result = new HashMap<>();
                        for (Map.Entry<?, ?> entry : map.entrySet()) {
                                Object key = 
keyConverter.convert(entry.getKey());
                                Object value = 
valueConverter.convert(entry.getValue());
                                result.put(key, value);
                        }
                        return new GenericMapData(result);
                };
        }





--
Sent from: http://apache-flink.147419.n8.nabble.com/

回复