junrao commented on a change in pull request #11351:
URL: https://github.com/apache/kafka/pull/11351#discussion_r714188175



##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -482,6 +482,20 @@ class BrokerServer(
       }
      metadataSnapshotter.foreach(snapshotter => CoreUtils.swallow(snapshotter.close(), this))
 
+      /**
+       * We must shutdown the scheduler early because otherwise, the scheduler could touch other
+       * resources that might have been shutdown and cause exceptions.
+       * For example, if we didn't shutdown the scheduler first, when LogManager was closing
+       * partitions one by one, the scheduler might concurrently delete old segments due to
+       * retention. However, the old segments could have been closed by the LogManager, which would
+       * cause an exception and subsequently mark logdir as offline. As a result, the broker would
+       * not flush the remaining partitions or write the clean shutdown marker. Ultimately, the
+       * broker would have to take hours to recover the log during restart and are subject to
+       * potential data loss.

Review comment:
       I don't think an unclean shutdown will cause data loss if acks = all is used.

##########
File path: core/src/main/scala/kafka/utils/KafkaScheduler.scala
##########
@@ -107,7 +107,10 @@ class KafkaScheduler(val threads: Int,
     debug("Scheduling task %s with initial delay %d ms and period %d ms."
        .format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period, unit)))
     this synchronized {
-      ensureRunning()
+      if (!isStarted) {
+        info("Kafka scheduler is not running at the time '%s' is 
scheduled.".format(name))
+        return null

Review comment:
       In UnifiedLog, we have code that uses the returned future:
   
   `val producerExpireCheck = scheduler.schedule(name = "PeriodicProducerExpirationCheck", fun = () => {`
   
   With this change, perhaps we could return an Option and let the caller deal with it accordingly?
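   For illustration, here is a minimal, self-contained sketch of what an Option-returning `schedule()` could look like. The simplified scheduler class and the Option-based signature below are assumptions for discussion, not code from this PR:
   
   ```scala
   import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}
   
   // Simplified stand-in for KafkaScheduler, only to show the Option-returning shape.
   class SketchScheduler {
     private val executor = Executors.newScheduledThreadPool(1)
     @volatile private var started = true
   
     def schedule(name: String, fun: () => Unit, delay: Long, period: Long,
                  unit: TimeUnit = TimeUnit.MILLISECONDS): Option[ScheduledFuture[_]] =
       this synchronized {
         if (!started) {
           // Log and skip the task instead of throwing or returning null.
           println(s"Scheduler is not running; ignoring task '$name'.")
           None
         } else {
           val runnable: Runnable = () => fun()
           Some(executor.scheduleAtFixedRate(runnable, delay, period, unit))
         }
       }
   
     def shutdown(): Unit = { started = false; executor.shutdown() }
   }
   ```
   
   A caller such as UnifiedLog could then handle the stopped case explicitly, e.g. keep the result as an `Option` and cancel with `producerExpireCheck.foreach(_.cancel(false))` at close time instead of assuming a non-null future.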

##########
File path: core/src/main/scala/kafka/utils/KafkaScheduler.scala
##########
@@ -107,7 +107,10 @@ class KafkaScheduler(val threads: Int,
     debug("Scheduling task %s with initial delay %d ms and period %d ms."
        .format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period, unit)))
     this synchronized {
-      ensureRunning()
+      if (!isStarted) {
+        info("Kafka scheduler is not running at the time '%s' is 
scheduled.".format(name))

Review comment:
       Perhaps we could add that we are ignoring the named task.
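   For example, the wording could be something like the following (just a suggestion, not necessarily the message adopted in the PR):
   
   ```scala
   info("Kafka scheduler is not running at the time task '%s' is scheduled. Ignoring it.".format(name))
   ```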

##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -482,6 +482,20 @@ class BrokerServer(
       }
      metadataSnapshotter.foreach(snapshotter => CoreUtils.swallow(snapshotter.close(), this))
 
+      /**
+       * We must shutdown the scheduler early because otherwise, the scheduler could touch other
+       * resources that might have been shutdown and cause exceptions.
+       * For example, if we didn't shutdown the scheduler first, when LogManager was closing
+       * partitions one by one, the scheduler might concurrently delete old segments due to
+       * retention. However, the old segments could have been closed by the LogManager, which would
+       * cause an exception and subsequently mark logdir as offline. As a result, the broker would

Review comment:
       exception => IOException ?

##########
File path: core/src/main/scala/kafka/server/BrokerServer.scala
##########
@@ -499,12 +499,13 @@ class BrokerServer(
       if (clientToControllerChannelManager != null)
         CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)
 
-      if (logManager != null)
-        CoreUtils.swallow(logManager.shutdown(), this)
-      // be sure to shutdown scheduler after log manager
+      // be sure to shutdown scheduler before log manager

Review comment:
       @ijuma and @ccding: The description of ScheduledThreadPoolExecutor.shutdown() says
   
   `This method does not wait for previously submitted tasks to complete execution. Use awaitTermination to do that.`
   
   So we probably want to call awaitTermination() with a timeout like 10 secs to make sure all existing tasks complete before shutting down other components.
   
   We probably can't use shutdownNow() since it interrupts the tasks and could cause an IOException when blocking operations (e.g., force) are applied on a file channel. This would then lead to an unclean shutdown.
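   As a rough sketch of that sequence (the 10 second timeout is just the value suggested above, not a constant from the codebase):
   
   ```scala
   import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}
   
   // Stop accepting new tasks, then wait (bounded) for in-flight tasks to finish.
   def shutdownScheduler(executor: ScheduledThreadPoolExecutor): Unit = {
     executor.shutdown() // does not wait for previously submitted tasks
     if (!executor.awaitTermination(10, TimeUnit.SECONDS))
       System.err.println("Timed out waiting for scheduler tasks to complete.")
     // Deliberately avoid shutdownNow(): interrupting a task blocked in a file
     // channel force() could surface an IOException and lead to an unclean shutdown.
   }
   ```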



