[
https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dhruvil Shah updated KAFKA-7678:
--------------------------------
Component/s: streams
> Failed to close producer due to java.lang.NullPointerException
> --------------------------------------------------------------
>
> Key: KAFKA-7678
> URL: https://issues.apache.org/jira/browse/KAFKA-7678
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Jonathan Santilli
> Priority: Major
>
> This occurs when the group is rebalancing in a Kafka Streams application and
> the process (the Kafka Streams application) receives a *SIGTERM* to stop it
> gracefully.
>
>
> {noformat}
> ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] Failed to close producer due to the following error:
> java.lang.NullPointerException
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252)
>     at org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607)
>     at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584)
>     at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408)
>     at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>     at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat}
>
>
> I have checked the code, though, and the method
> `*maybeAbortTransactionAndCloseRecordCollector*` in the `*StreamTask*.*java*`
> class already expects any kind of error to happen, since it catches
> `*Throwable*`.
>
>
>
> {noformat}
> try {
>     recordCollector.close();
> } catch (final Throwable e) {
>     log.error("Failed to close producer due to the following error:", e);
> } finally {
>     producer = null;
> }{noformat}
>
> Should we consider this a bug?
> In my opinion, we could check for a `*null*` producer in the
> `*RecordCollectorImpl*.*java*` class:
> {noformat}
> @Override
> public void close() {
>     log.debug("Closing producer");
>     producer.close();
>     producer = null;
>     checkForException();
> }{noformat}
>
> Change it to:
>
> {noformat}
> @Override
> public void close() {
>     log.debug("Closing producer");
>     if (Objects.nonNull(producer)) {
>         producer.close();
>         producer = null;
>     }
>     checkForException();
> }{noformat}
>
> How does that sound?
>
> Kafka Brokers running 2.0.0
> Kafka Streams and client 2.1.0
> OpenJDK 8
>
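For illustration only, here is a minimal standalone sketch of the null guard proposed above. It does not use any Kafka classes: FakeProducer, UnguardedCollector, GuardedCollector and CloseDemo are hypothetical names invented for this example. It also assumes, purely for the sake of the demonstration, that close() is reached while the producer field is already null (simulated here by calling close() twice); the report itself does not establish how the field becomes null.

```java
// Hypothetical stand-in for a producer; NOT the Kafka Producer interface.
final class FakeProducer {
    int closeCalls = 0;

    void close() {
        closeCalls++;
    }
}

// Same shape as the close() quoted in the report: no null check.
final class UnguardedCollector {
    private FakeProducer producer;

    UnguardedCollector(FakeProducer producer) {
        this.producer = producer;
    }

    void close() {
        producer.close(); // throws NullPointerException if producer is already null
        producer = null;
    }
}

// Same shape as the proposed change: skip the close when producer is null.
final class GuardedCollector {
    private FakeProducer producer;

    GuardedCollector(FakeProducer producer) {
        this.producer = producer;
    }

    void close() {
        if (producer != null) {
            producer.close();
            producer = null;
        }
    }

    boolean isOpen() {
        return producer != null;
    }
}

final class CloseDemo {
    public static void main(String[] args) {
        UnguardedCollector unguarded = new UnguardedCollector(new FakeProducer());
        unguarded.close();
        try {
            unguarded.close(); // assumed scenario: producer is already null here
        } catch (NullPointerException e) {
            System.out.println("unguarded close failed: " + e);
        }

        GuardedCollector guarded = new GuardedCollector(new FakeProducer());
        guarded.close();
        guarded.close(); // no-op, producer is already null
        System.out.println("guarded collector still open: " + guarded.isOpen());
    }
}
```

With the guard in place, a repeated close becomes a no-op rather than an error that the caller has to catch and log. Whether that is the right fix for KAFKA-7678 depends on why the producer is null at that point, which this sketch does not address.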