[ 
https://issues.apache.org/jira/browse/KAFKA-19638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18017528#comment-18017528
 ] 

Jainil Rana commented on KAFKA-19638:
-------------------------------------

Hi @mjsax, I’d like to work on this issue and submit a patch, can you please 
assign it to me?

> NPE in `Processor#init()` accessing state store
> -----------------------------------------------
>
>                 Key: KAFKA-19638
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19638
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 4.2.0
>            Reporter: Matthias J. Sax
>            Priority: Blocker
>             Fix For: 4.2.0
>
>
> As reported on the dev mailing list, we introduced a regression bug via 
> https://issues.apache.org/jira/browse/KAFKA-13722 in 4.1 branch. We did 
> revert the commit 
> ([https://github.com/apache/kafka/commit/f13a22af0b3a48a4ca1bf2ece5b58f31e3b26b7d])
>  for 4.1 release, and want to fix-forward for 4.2 release.
> Stacktrace:
> {code:java}
> 15:29:05 ERROR [STREAMS] KafkaStreams - stream-client [app1] Encountered the 
> following exception during processing and the registered exception handler 
> opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
> org.apache.kafka.streams.errors.StreamsException: failed to initialize 
> processor random-value-processor
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:132)
>  ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:141)
>  ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:1109)
>  ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:297)
>  ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:955)
>  ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:1417)
>  ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:1219)
>  ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:934)
>  ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:894)
>  [kafka-streams-4.2.0-SNAPSHOT.jar:?]
> Caused by: java.lang.NullPointerException: Cannot invoke 
> "org.apache.kafka.streams.processor.internals.ProcessorRecordContext.timestamp()"
>  because the return value of 
> "org.apache.kafka.streams.processor.internals.InternalProcessorContext.recordContext()"
>  is null
>         at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
>  ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:32)
>  ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$6(MeteredKeyValueStore.java:303)
>  ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:901)
>  ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
>         at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:303)
>  ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:123)
>  ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
>         at 
> io.littlehorse.simulations.stateful.app.RandomValueProcessor.init(RandomValueProcessor.java:21)
>  ~[kafka-streams-stateful-unspecified.jar:?]
>         at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:124)
>  ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
>         ... 8 more {code}
> Thanks [~eduwerc] for reporting the issue.
> We clearly have a testing gap, not trying to use a state store within 
> `Processor#init()`. – We need to close this gap.
> However, there is also the question if using zero as surrogate ts for this 
> case (as the old code does), is a good solution or not? – We could try to use 
> stream-time, but for the very first startup of an application, we also do not 
> have stream-time established yet, so we kinda push the can down the road.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to