soarez commented on code in PR #15335: URL: https://github.com/apache/kafka/pull/15335#discussion_r1512748010
########## core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala ##########

```diff
@@ -289,13 +289,10 @@ class BrokerMetadataPublisher(
     try {
       // Start log manager, which will perform (potentially lengthy)
       // recovery-from-unclean-shutdown if required.
-      logManager.startup(metadataCache.getAllTopics())
-
-      // Delete partition directories which we're not supposed to have. We have
-      // to do this before starting ReplicaManager, so that the stray replicas
-      // don't block creation of new ones with different IDs but the same names.
-      // See KAFKA-14616 for details.
-      logManager.deleteStrayKRaftReplicas(brokerId, newImage.topics())
+      logManager.startup(
+        metadataCache.getAllTopics(),
+        shouldBeStrayKraftLog = log => LogManager.isStrayKraftReplica(brokerId, newImage.topics(), log)
```

Review Comment:
   A suggestion, due to the following concerns:

   * `LogManager` shouldn't handle metadata records; these types shouldn't depend on each other. The metadata records should be handled here instead.
   * The argument name looks a bit strange, namely the 'should' and 'kraft' parts.

   ```suggestion
        isStray = (topicId, partition) => Option(newImage.topics().getPartition(topicId.getOrElse {
          throw new RuntimeException(s"Partition $partition does not have a topic ID, " +
            "which is not allowed when running in KRaft mode.")
        }, partition.partition())).exists(_.replicas.contains(brokerId))
   ```

   Perhaps `LogManager` should declare a type for this argument, since it's propagated down the call stack several levels?

########## core/src/main/scala/kafka/log/LogManager.scala ##########

```diff
@@ -354,6 +355,14 @@ class LogManager(logDirs: Seq[File],
       } else if (logDir.getName.endsWith(UnifiedLog.StrayDirSuffix)) {
         addStrayLog(topicPartition, log)
         warn(s"Loaded stray log: $logDir")
+      } else if (shouldBeStrayKraftLog(log)) {
+        // Mark the partition directories we're not supposed to have as stray. We have to do this
+        // during log load because topics may have been recreated with the same name while a disk
+        // was offline.
+        // See KAFKA-16234, KAFKA-16157 and KAFKA-14616 for details.
```

Review Comment:
   Perhaps it could help a future reader to clarify that KRaft mode (as opposed to ZooKeeper) does not track deleted topics, nor does it prevent them from being re-created with the same name before every replica has been deleted, so there is no way for a broker with a to-be-deleted replica in an offline directory to detect this earlier.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
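The type-alias suggestion in the first comment can be sketched with stand-in types. None of the names below (`IsStray`, `isStrayFor`, `PartitionRegistration`, the `String` topic IDs) are the real Kafka classes; this is an illustrative reading which assumes a stray log is one the metadata image does not assign to this broker:

```scala
// Stand-in types for illustration only; the real code would use the
// kafka.common and org.apache.kafka.image classes instead.
final case class TopicPartition(topic: String, partition: Int)
final case class PartitionRegistration(replicas: Seq[Int])

object LogManager {
  // The function type the reviewer suggests declaring once in LogManager and
  // propagating down the call stack: given a log's optional topic ID and its
  // topic-partition, decide whether the log is stray. LogManager itself never
  // touches metadata record types.
  type IsStray = (Option[String], TopicPartition) => Boolean
}

// Publisher side: build the predicate from the current metadata image.
def isStrayFor(brokerId: Int,
               image: Map[(String, Int), PartitionRegistration]): LogManager.IsStray =
  (topicIdOpt, tp) => {
    val topicId = topicIdOpt.getOrElse(
      throw new RuntimeException(s"Partition $tp does not have a topic ID, " +
        "which is not allowed when running in KRaft mode."))
    // Stray when the image has no such partition under this topic ID, or when
    // this broker is not among its replicas.
    !image.get((topicId, tp.partition)).exists(_.replicas.contains(brokerId))
  }
```

Note the negation: the suggested snippet's `exists(_.replicas.contains(brokerId))` expresses replica membership, so an `isStray` predicate would presumably invert it; the exact polarity here is an assumption, not taken from the PR.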
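As background for the second comment: in KRaft, a topic re-created under the same name gets a fresh topic ID in the metadata image, and that ID mismatch is what makes the stray check possible at log-load time even though the name still matches. A minimal sketch with hypothetical names and plain `String` IDs:

```scala
// A log directory records the topic ID it was created with; the metadata
// image maps topic names to their current IDs. If the IDs differ, the topic
// was deleted and re-created while this log's directory was offline, so the
// log on disk belongs to the old incarnation and is stray.
final case class StoredLogMeta(topicName: String, topicId: String)

def wasRecreated(stored: StoredLogMeta, imageIdsByName: Map[String, String]): Boolean =
  imageIdsByName.get(stored.topicName).exists(_ != stored.topicId)
```

A topic that is simply absent from the image (fully deleted, not re-created) is a separate case and is deliberately not flagged by this helper.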