[ https://issues.apache.org/jira/browse/KAFKA-19638?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18017528#comment-18017528 ]
Jainil Rana commented on KAFKA-19638: ------------------------------------- Hi @mjsax, Iād like to work on this issue and submit a patch, can you please assign it to me? > NPE in `Processor#init()` accessing state store > ----------------------------------------------- > > Key: KAFKA-19638 > URL: https://issues.apache.org/jira/browse/KAFKA-19638 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 4.2.0 > Reporter: Matthias J. Sax > Priority: Blocker > Fix For: 4.2.0 > > > As reported on the dev mailing list, we introduced a regression bug via > https://issues.apache.org/jira/browse/KAFKA-13722 in 4.1 branch. We did > revert the commit > ([https://github.com/apache/kafka/commit/f13a22af0b3a48a4ca1bf2ece5b58f31e3b26b7d]) > for 4.1 release, and want to fix-forward for 4.2 release. > Stacktrace: > {code:java} > 15:29:05 ERROR [STREAMS] KafkaStreams - stream-client [app1] Encountered the > following exception during processing and the registered exception handler > opted to SHUTDOWN_CLIENT. The streams client is going to shut down now. > org.apache.kafka.streams.errors.StreamsException: failed to initialize > processor random-value-processor > at > org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:132) > ~[kafka-streams-4.2.0-SNAPSHOT.jar:?] > at > org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:141) > ~[kafka-streams-4.2.0-SNAPSHOT.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:1109) > ~[kafka-streams-4.2.0-SNAPSHOT.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:297) > ~[kafka-streams-4.2.0-SNAPSHOT.jar:?] > at > org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:955) > ~[kafka-streams-4.2.0-SNAPSHOT.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:1417) > ~[kafka-streams-4.2.0-SNAPSHOT.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:1219) > ~[kafka-streams-4.2.0-SNAPSHOT.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:934) > ~[kafka-streams-4.2.0-SNAPSHOT.jar:?] > at > org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:894) > [kafka-streams-4.2.0-SNAPSHOT.jar:?] > Caused by: java.lang.NullPointerException: Cannot invoke > "org.apache.kafka.streams.processor.internals.ProcessorRecordContext.timestamp()" > because the return value of > "org.apache.kafka.streams.processor.internals.InternalProcessorContext.recordContext()" > is null > at > org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69) > ~[kafka-streams-4.2.0-SNAPSHOT.jar:?] > at > org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:32) > ~[kafka-streams-4.2.0-SNAPSHOT.jar:?] > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$6(MeteredKeyValueStore.java:303) > ~[kafka-streams-4.2.0-SNAPSHOT.jar:?] > at > org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:901) > ~[kafka-streams-4.2.0-SNAPSHOT.jar:?] > at > org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:303) > ~[kafka-streams-4.2.0-SNAPSHOT.jar:?] > at > org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:123) > ~[kafka-streams-4.2.0-SNAPSHOT.jar:?] > at > io.littlehorse.simulations.stateful.app.RandomValueProcessor.init(RandomValueProcessor.java:21) > ~[kafka-streams-stateful-unspecified.jar:?] > at > org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:124) > ~[kafka-streams-4.2.0-SNAPSHOT.jar:?] > ... 8 more {code} > Thanks [~eduwerc] for reporting the issue. > We clearly have a testing gap, not trying to use a state store within > `Processor#init()`. ā We need to close this gap. > However, there is also the question if using zero as surrogate ts for this > case (as the old code does), is a good solution or not? ā We could try to use > stream-time, but for the very first startup of an application, we also do not > have stream-time established yet, so we kinda push the can down the road. -- This message was sent by Atlassian Jira (v8.20.10#820010)