Hi Andrew,
What version of Kafka Streams are you using?
Since 2.7, there is a null check for the source node [1].
The following ticket might be related:
https://issues.apache.org/jira/browse/KAFKA-10205
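
A minimal sketch for checking the version from inside the app, in case it is not obvious from the build file. It assumes the Kafka Streams jar ships a classpath resource named /kafka/kafka-streams-version.properties with a "version" key; that path, the class name and both helper methods are illustrative and not taken from this thread. If the resource is missing, it reports "unknown".

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

public class StreamsVersionCheck {

    // Assumption: the Kafka Streams jar provides this resource on the classpath.
    static final String VERSION_RESOURCE = "/kafka/kafka-streams-version.properties";

    /** Returns the version found on the classpath, or "unknown" if it cannot be read. */
    static String detectVersion() {
        try (InputStream in = StreamsVersionCheck.class.getResourceAsStream(VERSION_RESOURCE)) {
            if (in == null) {
                return "unknown";
            }
            Properties props = new Properties();
            props.load(in);
            return props.getProperty("version", "unknown").trim();
        } catch (IOException e) {
            return "unknown";
        }
    }

    /** True if the version string is 2.7 or newer; only major and minor are compared. */
    static boolean isAtLeast27(String version) {
        String[] parts = version.split("[.-]");
        if (parts.length < 2) {
            return false;
        }
        try {
            int major = Integer.parseInt(parts[0]);
            int minor = Integer.parseInt(parts[1]);
            return major > 2 || (major == 2 && minor >= 7);
        } catch (NumberFormatException e) {
            // Not a numeric version such as "unknown".
            return false;
        }
    }

    public static void main(String[] args) {
        String version = detectVersion();
        System.out.println("Kafka Streams version: " + version);
        System.out.println("2.7 or newer: " + isAtLeast27(version));
    }
}
```

Running this with the same classpath as the application should show whether the deployed version is already at 2.7 or later.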
Best,
Bruno
[1] https://github.com/apache/kafka/blob/20028e24cca91422b8f02fdbf45d2cd9ef24c901/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L1285
On 06.06.23 01:55, An, Hongguo (CORP) wrote:
Hi:
Every time I restart my Kafka Streams app, it fails and I have to reset the
app. The error is:
org.apache.kafka.streams.errors.StreamsException: stream-thread [microapi-unified-profile-data-sync.dit-1067669c-2364-440d-a1c3-69ad45cc301d-StreamThread-4] Failed to rebalance.
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:972)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:863)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:819)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:788)
Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.kafka.streams.processor.internals.SourceNode.getTimestampExtractor()" because "source" is null
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:230)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:172)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:459)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:410)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:395)
    at org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks(TaskManager.java:148)
    at org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:107)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:294)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:285)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:424)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:358)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:353)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1251)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1220)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
    at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:84)
    at brave.kafka.clients.TracingConsumer.poll(TracingConsumer.java:78)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:963)
It works after a reset. Please help.
Thanks
Andrew