[ 
https://issues.apache.org/jira/browse/KAFKA-10723?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kowshik Prakasam updated KAFKA-10723:
-------------------------------------
    Description: 
*TL;DR:*

The asynchronous shutdown in {{LogManager}} has the shortcoming that if during 
shutdown any of the internal futures fail, then we do not always ensure that 
all futures are completed before {{LogManager.shutdown}} returns. As a result, 
despite the shut down completed message from KafkaServer is seen in the error 
logs, some futures continue to run from inside LogManager attempting to close 
the logs. This is misleading and it could possibly break the general rule of 
avoiding post-shutdown activity in the Broker.

*Description:*

When LogManager is shutting down, exceptions in log closure are handled 
[here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L497-L501].
 However, this 
[line|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L502]
 in the finally clause shuts down the thread pools *asynchronously*. The code: 
_threadPools.foreach(.shutdown())_ initiates an orderly shutdown (for each 
thread pool) in which previously submitted tasks are executed, but no new tasks 
will be accepted (see javadoc link 
[here|https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()])_._
 As a result, if there is an exception during log closure, some of the thread 
pools which are closing logs could be leaked and continue to run in the 
background, after the control returns to the caller (i.e. {{KafkaServer}}). As 
a result, even after the "shut down completed" message is seen in the error 
logs (originating from {{KafkaServer}} shutdown sequence), log closures 
continue to happen in the background, which is misleading.
  

*Proposed options for fixes:*

It seems useful that we maintain the contract with {{KafkaServer}} that after 
{{LogManager.shutdown}} is called once, all tasks that close the logs are 
guaranteed to have completed before the call returns. There are probably couple 
ways to fix this:
 # A simple fix could be to replace {{_threadPools.foreach(.shutdown())_ with 
_threadPools.foreach(.awaitTermination())_}}{{.}} This ensures that we wait for 
all threads to be shutdown before returning the {{_LogManager.shutdown_}} call.
 # An alternate advanced fix could be to just skip creating of checkpoint and 
clean shutdown file only for the affected directory if any of its futures throw 
an error. We continue to wait for all futures to complete for all directories. 
This can require some changes to [this for 
loop|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L481-L496],
 so that we wait for all futures to complete regardless of whether one of them 
threw an error.

  was:
*TL;DR:*

The asynchronous shutdown in {{LogManager}} has the shortcoming that if during 
shutdown any of the internal futures fail, then we do not always ensure that 
all futures are completed before {{LogManager.shutdown}} returns. As a result, 
despite the shut down completed message from KafkaServer is seen in the error 
logs, some futures continue to run from inside LogManager attempting to close 
the logs.

*Description:*

When LogManager is shutting down, exceptions in log closure are handled 
[here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L497-L501].
 However, this 
[line|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L502]
 in the finally clause shuts down the thread pools *asynchronously*. The code: 
_threadPools.foreach(.shutdown())_ initiates an orderly shutdown (for each 
thread pool) in which previously submitted tasks are executed, but no new tasks 
will be accepted (see javadoc link 
[here|https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()])_._
 As a result, if there is an exception during log closure, some of the thread 
pools which are closing logs could be leaked and continue to run in the 
background, after the control returns to the caller (i.e. {{KafkaServer}}). As 
a result, even after the "shut down completed" message is seen in the error 
logs (originating from {{KafkaServer}} shutdown sequence), log closures 
continue to happen in the background, which is quite misleading.
  

*Proposed options for fixes:*

It seems useful that we maintain the contract with {{KafkaServer}} that after 
{{LogManager.shutdown}} is called once, all tasks that close the logs are 
guaranteed to have completed before the call returns. There are probably couple 
ways to fix this:
 # A simple fix could be to replace {{_threadPools.foreach(.shutdown())_ with 
_threadPools.foreach(.awaitTermination())_}}{{.}} This ensures that we wait for 
all threads to be shutdown before returning the {{_LogManager.shutdown_}} call.
 # An alternate advanced fix could be to just skip creating of checkpoint and 
clean shutdown file only for the affected directory if any of its futures throw 
an error. We continue to wait for all futures to complete for all directories. 
This can require some changes to [this for 
loop|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L481-L496],
 so that we wait for all futures to complete regardless of whether one of them 
threw an error.


> LogManager leaks internal thread pool activity during shutdown
> --------------------------------------------------------------
>
>                 Key: KAFKA-10723
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10723
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Kowshik Prakasam
>            Priority: Major
>
> *TL;DR:*
> The asynchronous shutdown in {{LogManager}} has the shortcoming that if 
> during shutdown any of the internal futures fail, then we do not always 
> ensure that all futures are completed before {{LogManager.shutdown}} returns. 
> As a result, despite the shut down completed message from KafkaServer is seen 
> in the error logs, some futures continue to run from inside LogManager 
> attempting to close the logs. This is misleading and it could possibly break 
> the general rule of avoiding post-shutdown activity in the Broker.
> *Description:*
> When LogManager is shutting down, exceptions in log closure are handled 
> [here|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L497-L501].
>  However, this 
> [line|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L502]
>  in the finally clause shuts down the thread pools *asynchronously*. The 
> code: _threadPools.foreach(.shutdown())_ initiates an orderly shutdown (for 
> each thread pool) in which previously submitted tasks are executed, but no 
> new tasks will be accepted (see javadoc link 
> [here|https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/ExecutorService.html#shutdown()])_._
>  As a result, if there is an exception during log closure, some of the thread 
> pools which are closing logs could be leaked and continue to run in the 
> background, after the control returns to the caller (i.e. {{KafkaServer}}). 
> As a result, even after the "shut down completed" message is seen in the 
> error logs (originating from {{KafkaServer}} shutdown sequence), log closures 
> continue to happen in the background, which is misleading.
>   
> *Proposed options for fixes:*
> It seems useful that we maintain the contract with {{KafkaServer}} that after 
> {{LogManager.shutdown}} is called once, all tasks that close the logs are 
> guaranteed to have completed before the call returns. There are probably 
> couple ways to fix this:
>  # A simple fix could be to replace {{_threadPools.foreach(.shutdown())_ with 
> _threadPools.foreach(.awaitTermination())_}}{{.}} This ensures that we wait 
> for all threads to be shutdown before returning the {{_LogManager.shutdown_}} 
> call.
>  # An alternate advanced fix could be to just skip creating of checkpoint and 
> clean shutdown file only for the affected directory if any of its futures 
> throw an error. We continue to wait for all futures to complete for all 
> directories. This can require some changes to [this for 
> loop|https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L481-L496],
>  so that we wait for all futures to complete regardless of whether one of 
> them threw an error.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to