[ 
https://issues.apache.org/jira/browse/KAFKA-13032?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17380020#comment-17380020
 ] 

Matthias J. Sax edited comment on KAFKA-13032 at 7/13/21, 4:26 PM:
-------------------------------------------------------------------

Looking into the code, it seems that your `KeyValueMapper` may return `null` 
for some cases?
{code:java}
stream.map((k,v) -> null);
{code}
Returning `null` is not allowed. You always need to return a `KeyValue` object 
as pointed out in the JavaDocs of `KStream#map`:
{code:java}
The provided {@link KeyValueMapper} must return a {@link KeyValue} type and 
must not return {@code null}.{code}

If that is the case, your are right though that it's hard to debug... The error 
message is not very helpful. Something we might want to improve.


was (Author: mjsax):
Looking into the code, it seems that your `KeyValueMapper` may return `null` 
for some cases?
{code:java}
stream.map((k,v) -> null);
{code}
Returning `null` is not allowed. You always need to return a `KeyValue` object 
as pointed out in the JavaDocs of `KStream#map`:
{code:java}
The provided {@link KeyValueMapper} must return a {@link KeyValue} type and 
must not return {@code null}.
{code}

> Impossible stacktrace
> ---------------------
>
>                 Key: KAFKA-13032
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13032
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.8.0
>            Reporter: Niclas Hedhman
>            Priority: Major
>
> I am presented with a stacktrace that has not a single touch point in my 
> code, so it is incredibly difficult to figure out where the problem could be. 
> I think more RuntimeExceptions need to be caught and pull out information at 
> each level that is providing any additional hint of where we are.
> For instance, each node could prepend its reference/name and one would have a 
> chance to see where we are...
> ```
> org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=0_140, processor=KSTREAM-SOURCE-0000000000, topic=_poll, 
> partition=140, offset=0, stacktrace=java.lang.NullPointerException
>       at 
> org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>       at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
>       at 
> org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>       at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:268)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:214)
>       at 
> org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:50)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>       at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
>       at 
> org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>       at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:268)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:214)
>       at 
> org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:50)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>       at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
>       at 
> org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>       at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
>       at 
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>       at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
>       at 
> org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>       at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>       at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)
>       at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)
>       at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:758)
>       at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:1177)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:753)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
>       at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
> Caused by: java.lang.NullPointerException
>       at 
> org.apache.kafka.streams.kstream.internals.KStreamMap$KStreamMapProcessor.process(KStreamMap.java:42)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>       at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
>       at 
> org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>       at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:268)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:214)
>       at 
> org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:50)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>       at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
>       at 
> org.apache.kafka.streams.kstream.internals.PassThrough$PassThroughProcessor.process(PassThrough.java:33)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>       at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:268)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:214)
>       at 
> org.apache.kafka.streams.kstream.internals.KStreamBranch$KStreamBranchProcessor.process(KStreamBranch.java:50)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>       at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
>       at 
> org.apache.kafka.streams.kstream.internals.KStreamPeek$KStreamPeekProcessor.process(KStreamPeek.java:44)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>       at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
>       at 
> org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:41)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>       at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:172)
>       at 
> org.apache.kafka.streams.kstream.internals.KStreamFilter$KStreamFilterProcessor.process(KStreamFilter.java:43)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorAdapter.process(ProcessorAdapter.java:71)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:181)
>       at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:181)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forwardInternal(ProcessorContextImpl.java:281)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:260)
>       at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:219)
>       at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:86)
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$1(StreamTask.java:731)
>       at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:884)
>       at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:731)
>       ... 4 more
> ```



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to