junrao commented on a change in pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#discussion_r714188175
##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -482,6 +482,20 @@ class BrokerServer(
     }
     metadataSnapshotter.foreach(snapshotter => CoreUtils.swallow(snapshotter.close(), this))

+    /**
+     * We must shutdown the scheduler early because otherwise, the scheduler could touch other
+     * resources that might have been shutdown and cause exceptions.
+     * For example, if we didn't shutdown the scheduler first, when LogManager was closing
+     * partitions one by one, the scheduler might concurrently delete old segments due to
+     * retention. However, the old segments could have been closed by the LogManager, which would
+     * cause an exception and subsequently mark logdir as offline. As a result, the broker would
+     * not flush the remaining partitions or write the clean shutdown marker. Ultimately, the
+     * broker would have to take hours to recover the log during restart and be subject to
+     * potential data loss.

Review comment:
   I don't think unclean shutdown will cause data loss if acks = all is used.

##########
File path: core/src/main/scala/kafka/utils/KafkaScheduler.scala
##########
@@ -107,7 +107,10 @@ class KafkaScheduler(val threads: Int,
     debug("Scheduling task %s with initial delay %d ms and period %d ms."
       .format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period, unit)))
     this synchronized {
-      ensureRunning()
+      if (!isStarted) {
+        info("Kafka scheduler is not running at the time '%s' is scheduled.".format(name))
+        return null

Review comment:
   In UnifiedLog, we have code that uses the returned future:
   ` val producerExpireCheck = scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => { `
   With this change, perhaps we could return an Option and let the caller deal with it accordingly?
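The Option-returning approach suggested above could look roughly like the following. This is a minimal illustrative sketch in Java rather than Kafka's actual Scala code; the class and method names are hypothetical, and only the shape of the idea (returning an empty value instead of `null` once the scheduler has stopped, so callers like the `PeriodicProducerExpirationCheck` in UnifiedLog must handle absence explicitly) is taken from the discussion.

```java
import java.util.Optional;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not Kafka's KafkaScheduler: schedule() returns
// Optional.empty() once the scheduler has been shut down, instead of a
// surprising null future.
public class OptionalScheduler {
    private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
    private boolean started = true;

    public synchronized Optional<ScheduledFuture<?>> schedule(
            String name, Runnable task, long delayMs, long periodMs) {
        if (!started) {
            // Log and ignore the named task, as suggested in the review.
            System.out.printf("Scheduler is not running; ignoring task '%s'.%n", name);
            return Optional.empty();
        }
        return Optional.of(
            executor.scheduleAtFixedRate(task, delayMs, periodMs, TimeUnit.MILLISECONDS));
    }

    public synchronized void shutdown() throws InterruptedException {
        started = false;
        executor.shutdown();                              // reject new tasks
        executor.awaitTermination(10, TimeUnit.SECONDS);  // drain in-flight tasks
    }
}
```

A caller would then write `schedule(...).foreach(...)`-style handling (or `ifPresent` in Java) rather than null-checking the future.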
##########
File path: core/src/main/scala/kafka/utils/KafkaScheduler.scala
##########
@@ -107,7 +107,10 @@ class KafkaScheduler(val threads: Int,
     debug("Scheduling task %s with initial delay %d ms and period %d ms."
       .format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period, unit)))
     this synchronized {
-      ensureRunning()
+      if (!isStarted) {
+        info("Kafka scheduler is not running at the time '%s' is scheduled.".format(name))

Review comment:
   Perhaps we could add that we are ignoring the named task.

##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -482,6 +482,20 @@ class BrokerServer(
     }
     metadataSnapshotter.foreach(snapshotter => CoreUtils.swallow(snapshotter.close(), this))

+    /**
+     * We must shutdown the scheduler early because otherwise, the scheduler could touch other
+     * resources that might have been shutdown and cause exceptions.
+     * For example, if we didn't shutdown the scheduler first, when LogManager was closing
+     * partitions one by one, the scheduler might concurrently delete old segments due to
+     * retention. However, the old segments could have been closed by the LogManager, which would

Review comment:
   exception => IOException ?

##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -499,12 +499,13 @@ class BrokerServer(
       if (clientToControllerChannelManager != null)
         CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)

-      if (logManager != null)
-        CoreUtils.swallow(logManager.shutdown(), this)
-      // be sure to shutdown scheduler after log manager
+      // be sure to shutdown scheduler before log manager

Review comment:
   @ijuma and @ccding: The description of ScheduledThreadPoolExecutor.shutdown() says:
   `This method does not wait for previously submitted tasks to complete execution. Use awaitTermination to do that.`
   So, we probably want to call awaitTermination() with a timeout like 10 secs to make sure all existing tasks complete before shutting down other components. We probably can't use shutdownNow() since it interrupts the task and could cause an IOException when blocking operations (e.g., force) are applied on a file channel. This would then lead to an unclean shutdown.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
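The shutdown-then-await pattern junrao describes in the last review comment can be sketched as follows. This is an illustration in plain Java, not the PR's actual Scala change; the class name and the 10-second timeout are assumptions taken from the suggestion in the thread.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class GracefulSchedulerShutdown {
    // Drain the scheduler before closing the components its tasks touch.
    // shutdown() rejects new tasks (and, by default, cancels pending
    // periodic ones) but does not wait for in-flight tasks;
    // awaitTermination() gives them a bounded grace period. shutdownNow()
    // is avoided because interrupting a thread blocked in a file-channel
    // operation (e.g. force) raises an IOException, leading to an unclean
    // shutdown.
    public static boolean shutdown(ScheduledThreadPoolExecutor scheduler)
            throws InterruptedException {
        scheduler.shutdown();
        // 10s is the timeout suggested in the review; returns false if
        // tasks are still running when it elapses.
        return scheduler.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

Only after this returns would the caller proceed to shut down the LogManager and the remaining components.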