[
https://issues.apache.org/jira/browse/KAFKA-19638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Matthias J. Sax updated KAFKA-19638:
------------------------------------
Description:
As reported on the dev mailing list, we introduced a regression via
https://issues.apache.org/jira/browse/KAFKA-13722 in the 4.1 branch. We reverted
the commit
([https://github.com/apache/kafka/commit/f13a22af0b3a48a4ca1bf2ece5b58f31e3b26b7d])
for the 4.1 release and want to fix forward for the 4.2 release.
Stacktrace:
{code:java}
15:29:05 ERROR [STREAMS] KafkaStreams - stream-client [app1] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
org.apache.kafka.streams.errors.StreamsException: failed to initialize processor random-value-processor
    at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:132) ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:141) ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeTopology(StreamTask.java:1109) ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.completeRestoration(StreamTask.java:297) ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
    at org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:955) ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:1417) ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:1219) ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:934) ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:894) [kafka-streams-4.2.0-SNAPSHOT.jar:?]
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.kafka.streams.processor.internals.ProcessorRecordContext.timestamp()" because the return value of "org.apache.kafka.streams.processor.internals.InternalProcessorContext.recordContext()" is null
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69) ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:32) ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$6(MeteredKeyValueStore.java:303) ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
    at org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:901) ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:303) ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
    at org.apache.kafka.streams.processor.internals.AbstractReadWriteDecorator$KeyValueStoreReadWriteDecorator.put(AbstractReadWriteDecorator.java:123) ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
    at io.littlehorse.simulations.stateful.app.RandomValueProcessor.init(RandomValueProcessor.java:21) ~[kafka-streams-stateful-unspecified.jar:?]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:124) ~[kafka-streams-4.2.0-SNAPSHOT.jar:?]
    ... 8 more
{code}
Thanks [~eduwerc] for reporting the issue.
We clearly have a testing gap: no existing test uses a state store within
`Processor#init()`. We need to close this gap.
However, there is also the question of whether using zero as a surrogate
timestamp for this case (as the old code does) is a good solution. We could try
to use stream-time, but on the very first startup of an application stream-time
is not established yet either, so that would only kick the can down the road.
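To make the trade-off concrete, here is a minimal, self-contained sketch of the fallback order under discussion. It is a model only, not the Kafka Streams code: `RecordContext`, `unguardedTimestamp`, `surrogateTimestamp`, and the `NO_STREAM_TIME` sentinel are hypothetical names introduced for illustration. It assumes the record context is null while `init()` runs, as the stack trace above indicates.

```java
public class SurrogateTimestampSketch {

    /** Hypothetical stand-in for the record context; assumed null while init() runs. */
    static final class RecordContext {
        final long timestamp;

        RecordContext(long timestamp) {
            this.timestamp = timestamp;
        }
    }

    /** Assumed sentinel meaning "no stream-time established yet". */
    static final long NO_STREAM_TIME = -1L;

    /** Models the failing pattern: dereferences the context without a null check. */
    static long unguardedTimestamp(RecordContext context) {
        return context.timestamp; // throws NullPointerException when context is null
    }

    /** Prefers the record timestamp, then stream-time, then zero as a last resort. */
    static long surrogateTimestamp(RecordContext context, long streamTime) {
        if (context != null) {
            return context.timestamp;
        }
        if (streamTime != NO_STREAM_TIME) {
            return streamTime;
        }
        return 0L; // very first startup: neither a record nor stream-time is available
    }

    public static void main(String[] args) {
        System.out.println(surrogateTimestamp(new RecordContext(42L), 100L)); // prints 42
        System.out.println(surrogateTimestamp(null, 100L));                   // prints 100
        System.out.println(surrogateTimestamp(null, NO_STREAM_TIME));         // prints 0
    }
}
```

The last call shows why stream-time alone does not settle the question: on a first startup the sketch still ends up at zero (or some other agreed default), so the choice of surrogate value has to be made either way.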
> NPE in `Processor#init()` accessing state store
> -----------------------------------------------
>
> Key: KAFKA-19638
> URL: https://issues.apache.org/jira/browse/KAFKA-19638
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 4.2.0
> Reporter: Matthias J. Sax
> Priority: Blocker
> Fix For: 4.2.0