[ https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16845824#comment-16845824 ]
Sebastiaan edited comment on KAFKA-7678 at 5/22/19 12:16 PM:
-------------------------------------------------------------

Hi guys! I think this fix is not complete yet. In version 2.1.1 we are getting a very similar exception, but in the 'flush' method that is called pre-close. This is the full stacktrace:
{code:java}
message: stream-thread [webhook-poster-7034dbb0-7423-476b-98f3-d18db675d6d6-StreamThread-1] Failed while closing StreamTask 1_26 due to the following error:
logger_name: org.apache.kafka.streams.processor.internals.AssignedStreamsTasks
java.lang.NullPointerException: null
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
    at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
    at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
    at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
    at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
    at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758)
{code}
Followed by:
{code:java}
message: task [1_26] Could not close task due to the following error:
logger_name: org.apache.kafka.streams.processor.internals.StreamTask
java.lang.NullPointerException: null
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
    at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
    at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
    at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
    at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
    at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
    at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
    at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code}
If I look at the source code at this point, I see a nice null check in the close method, but not in the flush method that is called just before that:
{code:java}
public void flush() {
    this.log.debug("Flushing producer");
    this.producer.flush();
    this.checkForException();
}

public void close() {
    this.log.debug("Closing producer");
    if (this.producer != null) {
        this.producer.close();
        this.producer = null;
    }
    this.checkForException();
}{code}
Seems to my (ignorant) eye that the flush method should also be wrapped in a null check.

Should this issue be re-opened? Or should I make a new issue and refer to this one?


> Failed to close producer due to java.lang.NullPointerException
> --------------------------------------------------------------
>
>                 Key: KAFKA-7678
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7678
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.1, 2.0.1, 2.1.0
>            Reporter: Jonathan Santilli
>            Assignee: Jonathan Santilli
>            Priority: Minor
>             Fix For: 1.1.2, 2.2.0, 2.1.1, 2.0.2
>
>
> This occurs when the group is rebalancing in a Kafka Stream application and
> the process (the Kafka Stream application) receives a *SIGTERM* to stop it
> gracefully.
>
>
> {noformat}
> ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] Failed to close producer due to the following error:
> java.lang.NullPointerException
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252)
>     at org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607)
>     at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584)
>     at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408)
>     at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>     at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat}
>
>
> I have checked the code, and the method
> `*maybeAbortTransactionAndCloseRecordCollector*` in the `*StreamTask*.*java*`
> class already expects any kind of error to happen, since it catches
> `*Throwable*`.
>
>
>
> {noformat}
> try {
>     recordCollector.close();
> } catch (final Throwable e) {
>     log.error("Failed to close producer due to the following error:", e);
> } finally {
>     producer = null;
> }{noformat}
>
> Should we consider this a bug?
>
> In my opinion, we could check for the `*null*` possibility in the
> `*RecordCollectorImpl*.*java*` class:
> {noformat}
> @Override
> public void close() {
>     log.debug("Closing producer");
>     producer.close();
>     producer = null;
>     checkForException();
> }{noformat}
>
> Change it to:
>
> {noformat}
> @Override
> public void close() {
>     log.debug("Closing producer");
>     if (Objects.nonNull(producer)) {
>         producer.close();
>         producer = null;
>     }
>     checkForException();
> }{noformat}
>
> How does that sound?
>
> Kafka Brokers running 2.0.0
> Kafka Stream and client 2.1.0
> OpenJDK 8
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
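
For the `flush()` NullPointerException reported in the comment above, the guard that `close()` already has could be mirrored in `flush()`. The following is only a minimal sketch of that idea, not the actual Kafka patch: `SketchProducer`, `SketchRecordCollector` and `CountingProducer` are hypothetical stand-ins invented for this example (they are not Kafka classes), and logging and `checkForException()` are left out to keep it self-contained. Whether silently skipping the flush is the right behaviour for Kafka Streams is a question for the maintainers.

```java
// Hypothetical stand-in for the producer; NOT Kafka's real Producer interface.
interface SketchProducer {
    void flush();
    void close();
}

// Hypothetical simplified stand-in for RecordCollectorImpl (flush/close only).
class SketchRecordCollector {
    private SketchProducer producer;

    SketchRecordCollector(SketchProducer producer) {
        this.producer = producer;
    }

    public void flush() {
        // Same guard as close(): once close() has run, producer is null.
        if (producer != null) {
            producer.flush();
        }
    }

    public void close() {
        if (producer != null) {
            producer.close();
            producer = null;
        }
    }
}

// Hypothetical test double that counts how often each method is reached.
class CountingProducer implements SketchProducer {
    int flushes = 0;
    int closes = 0;

    @Override
    public void flush() {
        flushes++;
    }

    @Override
    public void close() {
        closes++;
    }
}

class FlushNullCheckDemo {
    public static void main(String[] args) {
        CountingProducer producer = new CountingProducer();
        SketchRecordCollector collector = new SketchRecordCollector(producer);

        collector.flush(); // reaches the producer
        collector.close(); // closes the producer and nulls the field
        collector.flush(); // skipped by the guard instead of throwing

        System.out.println("flushes=" + producer.flushes + ", closes=" + producer.closes);
    }
}
```

With the guard in place, the second `flush()` after `close()` is a no-op; without it, that call would dereference a null field, which matches the shape of the stack trace in the comment.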