[
https://issues.apache.org/jira/browse/KAFKA-20312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
guwensheng updated KAFKA-20312:
-------------------------------
Description:
jdk:21
KafkaClient: 3.9.0
When I call KafkaConsumer poll to fetch messages from Kafka, I occasionally
encounter a NullPointerException.
My guess is that this happens because some partitions do not have a leader
while the Kafka service is in an error state.
The problem stack is as follows:
{code:java}
Caused by: java.lang.NullPointerException: element cannot be mapped to a null
key
at java.base/java.util.Objects.requireNonNull(Unknown Source) ~[?:?]
at java.base/java.util.stream.Collectors.lambda$groupingBy$53(Unknown Source)
~[?:?]
at java.base/java.util.stream.ReduceOps$3ReducingSink.accept(Unknown Source)
~[?:?]
at java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(Unknown
Source) ~[?:?]
at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source) ~[?:?]
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
~[?:?]
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(Unknown
Source) ~[?:?]
at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source) ~[?:?]
at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source) ~[?:?]
at
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.regroupPartitionMapByNode(OffsetFetcherUtils.java:187)
~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at
org.apache.kafka.clients.consumer.internals.OffsetFetcher.groupListOffsetRequests(OffsetFetcher.java:379)
~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at
org.apache.kafka.clients.consumer.internals.OffsetFetcher.resetPositionsAsync(OffsetFetcher.java:214)
~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at
org.apache.kafka.clients.consumer.internals.OffsetFetcher.resetPositionsIfNeeded(OffsetFetcher.java:109)
~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at
org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.updateFetchPositions(ClassicKafkaConsumer.java:1258)
~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at
org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.updateAssignmentMetadataIfNeeded(ClassicKafkaConsumer.java:708)
~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at
org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:663)
~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at
org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:643)
~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:878)
~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1347)
~[kafka-streams-3.9.0-htrunk6.gdd.pub.r1.jar:?]
at
org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:1295)
~[kafka-streams-3.9.0-htrunk6.gdd.pub.r1.jar:?]
at
org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:1028)
~[kafka-streams-3.9.0-htrunk6.gdd.pub.r1.jar:?]
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
~[kafka-streams-3.9.0-htrunk6.gdd.pub.r1.jar:?] {code}
I believe the missing partition leader is the cause of the
NullPointerException, and I hope that a null check can be added here.
was:
jdk:21
KafkaClient: 3.9.1
When I use KafkaConsumer poll to pull messages from Kafka, there is a low
probability that I encounter a null pointer issue.
I thought about it, and this might be because some partitions do not have a
leader when the Kafka service encounters an exception.
The problem stack is as follows:
{code:java}
Caused by: java.lang.NullPointerException: element cannot be mapped to a null
key
at java.base/java.util.Objects.requireNonNull(Unknown Source) ~[?:?]
at java.base/java.util.stream.Collectors.lambda$groupingBy$53(Unknown Source)
~[?:?]
at java.base/java.util.stream.ReduceOps$3ReducingSink.accept(Unknown Source)
~[?:?]
at java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(Unknown
Source) ~[?:?]
at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source) ~[?:?]
at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown Source)
~[?:?]
at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(Unknown
Source) ~[?:?]
at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source) ~[?:?]
at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source) ~[?:?]
at
org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.regroupPartitionMapByNode(OffsetFetcherUtils.java:187)
~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at
org.apache.kafka.clients.consumer.internals.OffsetFetcher.groupListOffsetRequests(OffsetFetcher.java:379)
~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at
org.apache.kafka.clients.consumer.internals.OffsetFetcher.resetPositionsAsync(OffsetFetcher.java:214)
~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at
org.apache.kafka.clients.consumer.internals.OffsetFetcher.resetPositionsIfNeeded(OffsetFetcher.java:109)
~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at
org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.updateFetchPositions(ClassicKafkaConsumer.java:1258)
~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at
org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.updateAssignmentMetadataIfNeeded(ClassicKafkaConsumer.java:708)
~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at
org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:663)
~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at
org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:643)
~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:878)
~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
at
org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1347)
~[kafka-streams-3.9.0-htrunk6.gdd.pub.r1.jar:?]
at
org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:1295)
~[kafka-streams-3.9.0-htrunk6.gdd.pub.r1.jar:?]
at
org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:1028)
~[kafka-streams-3.9.0-htrunk6.gdd.pub.r1.jar:?]
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
~[kafka-streams-3.9.0-htrunk6.gdd.pub.r1.jar:?] {code}
I understand that this is the cause of the null pointer issue, and I hope that
null pointer protection can be implemented here.
> KafkaConsumer poll may throw NullPointerException
> ---------------------------------------------------
>
> Key: KAFKA-20312
> URL: https://issues.apache.org/jira/browse/KAFKA-20312
> Project: Kafka
> Issue Type: Bug
> Components: clients
> Affects Versions: 3.9.1
> Reporter: guwensheng
> Priority: Major
> Fix For: 3.9.0, 4.0.0, 4.1.0
>
>
> jdk:21
> KafkaClient: 3.9.0
> When I call KafkaConsumer poll to fetch messages from Kafka, I occasionally
> encounter a NullPointerException.
> My guess is that this happens because some partitions do not have a leader
> while the Kafka service is in an error state.
> The problem stack is as follows:
>
> {code:java}
> Caused by: java.lang.NullPointerException: element cannot be mapped to a null
> key
> at java.base/java.util.Objects.requireNonNull(Unknown Source) ~[?:?]
> at java.base/java.util.stream.Collectors.lambda$groupingBy$53(Unknown Source)
> ~[?:?]
> at java.base/java.util.stream.ReduceOps$3ReducingSink.accept(Unknown Source)
> ~[?:?]
> at java.base/java.util.HashMap$EntrySpliterator.forEachRemaining(Unknown
> Source) ~[?:?]
> at java.base/java.util.stream.AbstractPipeline.copyInto(Unknown Source) ~[?:?]
> at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(Unknown
> Source) ~[?:?]
> at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(Unknown
> Source) ~[?:?]
> at java.base/java.util.stream.AbstractPipeline.evaluate(Unknown Source) ~[?:?]
> at java.base/java.util.stream.ReferencePipeline.collect(Unknown Source) ~[?:?]
> at
> org.apache.kafka.clients.consumer.internals.OffsetFetcherUtils.regroupPartitionMapByNode(OffsetFetcherUtils.java:187)
> ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.OffsetFetcher.groupListOffsetRequests(OffsetFetcher.java:379)
> ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.OffsetFetcher.resetPositionsAsync(OffsetFetcher.java:214)
> ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.OffsetFetcher.resetPositionsIfNeeded(OffsetFetcher.java:109)
> ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.updateFetchPositions(ClassicKafkaConsumer.java:1258)
> ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.updateAssignmentMetadataIfNeeded(ClassicKafkaConsumer.java:708)
> ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:663)
> ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
> at
> org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.poll(ClassicKafkaConsumer.java:643)
> ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
> at
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:878)
> ~[kafka-clients-3.9.0-htrunk7.gdd.pub.r1.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1347)
> ~[kafka-streams-3.9.0-htrunk6.gdd.pub.r1.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:1295)
> ~[kafka-streams-3.9.0-htrunk6.gdd.pub.r1.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:1028)
> ~[kafka-streams-3.9.0-htrunk6.gdd.pub.r1.jar:?]
> at
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:777)
> ~[kafka-streams-3.9.0-htrunk6.gdd.pub.r1.jar:?] {code}
> I believe the missing partition leader is the cause of the
> NullPointerException, and I hope that a null check can be added here.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)