[ 
https://issues.apache.org/jira/browse/KAFKA-20312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

guwensheng updated KAFKA-20312:
-------------------------------
    Description: 
jdk:21 
KafkaClient: 3.9.0

When I use KafkaConsumer poll to pull messages from Kafka, there is a low
probability of encountering a NullPointerException.
After some investigation, I believe this happens because some partitions have
no leader at the moment of the poll (for example, while the Kafka cluster is in
a degraded state), so leaderFor() returns null for those partitions.
The problem stack is as follows:

 
{code:java}
Caused by: java.lang.NullPointerException: element cannot be mapped to a null 
key
at java.base/java.util.Objects.requireNonNull(Unknown Source) ~[?:?]
at java.base/java.util.stream.Collectors.lambda$groupingBy$53(Unknown Source) 
~[?:?]
at java.base/java.util.stream.ReduceOps$3ReducingSink.accept(Unknown Source) 
~[?:?]
at java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(Unknown 
Source) ~[?:?]
at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source) ~[?:?]
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source) 
~[?:?]
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(Unknown 
Source) ~[?:?]
at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source) ~[?:?]
at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source) ~[?:?]
at 
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.regroupPartitionMapByNode(OffsetFetcherUtils.java:187)
 ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.OffsetFetcher.groupListOffsetRequests(OffsetFetcher.java:379)
 ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.OffsetFetcher.resetPositionsAsync(OffsetFetcher.java:214)
 ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.OffsetFetcher.resetPositionsIfNeeded(OffsetFetcher.java:109)
 ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.updateFetchPositions(ClassicKafkaConsumer.java:1258)
 ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.updateAssignmentMetadataIfNeeded(ClassicKafkaConsumer.java:708)
 ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:663)
 ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:643)
 ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:878) 
~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1347)
 ~[kafka-streams-3.9.0-htrunk6.gdd.pub.r1.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:1295)
 ~[kafka-streams-3.9.0-htrunk6.gdd.pub.r1.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:1028)
 ~[kafka-streams-3.9.0-htrunk6.gdd.pub.r1.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
 ~[kafka-streams-3.9.0-htrunk6.gdd.pub.r1.jar:?] {code}
I believe this is the root cause of the NullPointerException, and I would like
a null check to be added in regroupPartitionMapByNode (or its callers) so that
partitions without a known leader are skipped or retried instead of crashing.

  was:
jdk:21 
KafkaClient: 3.9.1

When I use KafkaConsumer poll to pull messages from Kafka, there is a low 
probability that I encounter a null pointer issue.
I thought about it, and this might be because some partitions do not have a 
leader when the Kafka service encounters an exception.
The problem stack is as follows:

 
{code:java}
Caused by: java.lang.NullPointerException: element cannot be mapped to a null 
key
at java.base/java.util.Objects.requireNonNull(Unknown Source) ~[?:?]
at java.base/java.util.stream.Collectors.lambda$groupingBy$53(Unknown Source) 
~[?:?]
at java.base/java.util.stream.ReduceOps$3ReducingSink.accept(Unknown Source) 
~[?:?]
at java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(Unknown 
Source) ~[?:?]
at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source) ~[?:?]
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source) 
~[?:?]
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(Unknown 
Source) ~[?:?]
at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source) ~[?:?]
at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source) ~[?:?]
at 
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.regroupPartitionMapByNode(OffsetFetcherUtils.java:187)
 ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.OffsetFetcher.groupListOffsetRequests(OffsetFetcher.java:379)
 ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.OffsetFetcher.resetPositionsAsync(OffsetFetcher.java:214)
 ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.OffsetFetcher.resetPositionsIfNeeded(OffsetFetcher.java:109)
 ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.updateFetchPositions(ClassicKafkaConsumer.java:1258)
 ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.updateAssignmentMetadataIfNeeded(ClassicKafkaConsumer.java:708)
 ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:663)
 ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:643)
 ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:878) 
~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1347)
 ~[kafka-streams-3.9.0-htrunk6.gdd.pub.r1.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:1295)
 ~[kafka-streams-3.9.0-htrunk6.gdd.pub.r1.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:1028)
 ~[kafka-streams-3.9.0-htrunk6.gdd.pub.r1.jar:?]
at 
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
 ~[kafka-streams-3.9.0-htrunk6.gdd.pub.r1.jar:?] {code}
I understand that this is the cause of the null pointer issue, and I hope that 
null pointer protection can be implemented here.


> KafkaConsumer poll may throw a NullPointerException
> ---------------------------------------------------
>
>                 Key: KAFKA-20312
>                 URL: https://issues.apache.org/jira/browse/KAFKA-20312
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 3.9.1
>            Reporter: guwensheng
>            Priority: Major
>             Fix For: 3.9.0, 4.0.0, 4.1.0
>
>
> jdk:21 
> KafkaClient: 3.9.0
> When I use KafkaConsumer poll to pull messages from Kafka, there is a low 
> probability that I encounter a null pointer issue.
> I thought about it, and this might be because some partitions do not have a 
> leader when the Kafka service encounters an exception.
> The problem stack is as follows:
>  
> {code:java}
> Caused by: java.lang.NullPointerException: element cannot be mapped to a null 
> key
> at java.base/java.util.Objects.requireNonNull(Unknown Source) ~[?:?]
> at java.base/java.util.stream.Collectors.lambda$groupingBy$53(Unknown Source) 
> ~[?:?]
> at java.base/java.util.stream.ReduceOps$3ReducingSink.accept(Unknown Source) 
> ~[?:?]
> at java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(Unknown 
> Source) ~[?:?]
> at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source) ~[?:?]
> at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown 
> Source) ~[?:?]
> at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(Unknown 
> Source) ~[?:?]
> at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source) ~[?:?]
> at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source) ~[?:?]
> at 
> org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.regroupPartitionMapByNode(OffsetFetcherUtils.java:187)
>  ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.OffsetFetcher.groupListOffsetRequests(OffsetFetcher.java:379)
>  ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.OffsetFetcher.resetPositionsAsync(OffsetFetcher.java:214)
>  ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.OffsetFetcher.resetPositionsIfNeeded(OffsetFetcher.java:109)
>  ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.updateFetchPositions(ClassicKafkaConsumer.java:1258)
>  ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.updateAssignmentMetadataIfNeeded(ClassicKafkaConsumer.java:708)
>  ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:663)
>  ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
> at 
> org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:643)
>  ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
> at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:878) 
> ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1347)
>  ~[kafka-streams-3.9.0-htrunk6.gdd.pub.r1.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:1295)
>  ~[kafka-streams-3.9.0-htrunk6.gdd.pub.r1.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:1028)
>  ~[kafka-streams-3.9.0-htrunk6.gdd.pub.r1.jar:?]
> at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
>  ~[kafka-streams-3.9.0-htrunk6.gdd.pub.r1.jar:?] {code}
> I understand that this is the cause of the null pointer issue, and I hope 
> that null pointer protection can be implemented here.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to