soarez commented on code in PR #15335:
URL: https://github.com/apache/kafka/pull/15335#discussion_r1512748010


##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:
##########
@@ -289,13 +289,10 @@ class BrokerMetadataPublisher(
     try {
       // Start log manager, which will perform (potentially lengthy)
       // recovery-from-unclean-shutdown if required.
-      logManager.startup(metadataCache.getAllTopics())
-
-      // Delete partition directories which we're not supposed to have. We have
-      // to do this before starting ReplicaManager, so that the stray replicas
-      // don't block creation of new ones with different IDs but the same names.
-      // See KAFKA-14616 for details.
-      logManager.deleteStrayKRaftReplicas(brokerId, newImage.topics())
+      logManager.startup(
+        metadataCache.getAllTopics(),
+        shouldBeStrayKraftLog = log => LogManager.isStrayKraftReplica(brokerId, newImage.topics(), log)

Review Comment:
   A suggestion, prompted by two concerns:
   
   * `LogManager` shouldn't handle metadata records; these types shouldn't depend on each other. The metadata records should be handled here instead.
   * The argument name `shouldBeStrayKraftLog` reads a bit oddly, namely the 'should' and 'kraft' parts.
   
   ```suggestion
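           isStray = (topicId, partition) => {
             // In KRaft mode every partition must carry a topic ID.
             val id = topicId.getOrElse {
               throw new RuntimeException(s"Partition $partition does not have a topic ID, " +
                 "which is not allowed when running in KRaft mode.")
             }
             // Stray: the image has no such partition, or this broker is not one of its replicas.
             !Option(newImage.topics().getPartition(id, partition.partition()))
               .exists(_.replicas.contains(brokerId))
           }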
   ```
   
   Perhaps `LogManager` should declare a type for this argument, since it's propagated several levels down the call stack?
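   
   A minimal sketch of what such a declared type could look like. Everything here is a hypothetical stand-in, not existing Kafka code: the `IsStray` alias, the simplified `TopicPartition`, and the `startup`/`loadLogs`/`loadLog` chain only mimic the shape of the real call stack, and `java.util.UUID` stands in for Kafka's `Uuid`.
   
   ```scala
   import java.util.UUID

   object StrayTypes {
     // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
     final case class TopicPartition(topic: String, partition: Int)

     // Hypothetical alias: one named type instead of repeating the
     // function signature at every level of the call stack.
     type IsStray = (Option[UUID], TopicPartition) => Boolean

     // Simplified stand-ins for the LogManager call chain. Each level only
     // forwards the predicate; startup returns the partitions classified as stray.
     def startup(logs: Seq[(Option[UUID], TopicPartition)], isStray: IsStray): Seq[TopicPartition] =
       loadLogs(logs, isStray)

     private def loadLogs(logs: Seq[(Option[UUID], TopicPartition)], isStray: IsStray): Seq[TopicPartition] =
       logs.filter { case (id, tp) => loadLog(id, tp, isStray) }.map(_._2)

     // Returns true when the log is classified as stray.
     private def loadLog(id: Option[UUID], tp: TopicPartition, isStray: IsStray): Boolean =
       isStray(id, tp)
   }
   ```
   
   With an alias like this, a later change to the predicate's signature would touch one declaration rather than every method that forwards it.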



##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -354,6 +355,14 @@ class LogManager(logDirs: Seq[File],
     } else if (logDir.getName.endsWith(UnifiedLog.StrayDirSuffix)) {
       addStrayLog(topicPartition, log)
       warn(s"Loaded stray log: $logDir")
+    } else if (shouldBeStrayKraftLog(log)) {
+      // Mark the partition directories we're not supposed to have as stray. We have to do this
+      // during log load because topics may have been recreated with the same name while a disk
+      // was offline.
+      // See KAFKA-16234, KAFKA-16157 and KAFKA-14616 for details.

Review Comment:
   It might help a future reader to clarify that KRaft mode, unlike ZK mode, neither tracks deleted topics nor prevents a topic from being re-created with the same name before every replica has been deleted. As a result, a broker with a to-be-deleted replica in an offline directory has no way to detect this earlier.
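   
   To illustrate the scenario, here is a small self-contained sketch. The model is hypothetical and heavily simplified (`TopicImage`, `OnDiskLog` and this `isStray` are not Kafka classes, and `java.util.UUID` stands in for Kafka's `Uuid`); it only shows why the check has to compare topic IDs rather than names when a topic was deleted and re-created while a directory was offline.
   
   ```scala
   import java.util.UUID

   object OfflineDirScenario {
     // Hypothetical model of the metadata image: topic name -> (topic ID, partition -> replica broker IDs).
     final case class TopicImage(id: UUID, replicas: Map[Int, Set[Int]])
     // Hypothetical model of a log found on disk when the directory comes back online.
     final case class OnDiskLog(topic: String, topicId: UUID, partition: Int)

     // A log is stray when no topic in the image has its topic ID, the partition
     // is unknown, or this broker is not among the partition's replicas.
     def isStray(brokerId: Int, image: Map[String, TopicImage], log: OnDiskLog): Boolean =
       !image.values
         .find(_.id == log.topicId)
         .flatMap(_.replicas.get(log.partition))
         .exists(_.contains(brokerId))
   }
   ```
   
   For example, if topic `foo` was deleted and re-created with a new ID while the disk was offline, the old on-disk log still has the name `foo` but no longer matches any topic ID in the image, so it is classified as stray even though a name-based lookup would succeed.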



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
