Re: [PR] KAFKA-16082 Avoid resuming future replica if current replica is in the same directory [kafka]
chia7712 merged PR #15777: URL: https://github.com/apache/kafka/pull/15777 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16082 Avoid resuming future replica if current replica is in the same directory [kafka]
chia7712 commented on PR #15777: URL: https://github.com/apache/kafka/pull/15777#issuecomment-2071783047 The failed test is fixed by https://github.com/chia7712/kafka/commit/6e998cffdd33e343945877ccee1fec8337c7d57d
Re: [PR] KAFKA-16082 Avoid resuming future replica if current replica is in the same directory [kafka]
gaurav-narula commented on PR #15777: URL: https://github.com/apache/kafka/pull/15777#issuecomment-2070018194 @chia7712 Rebased. PTAL :)
Re: [PR] KAFKA-16082 Avoid resuming future replica if current replica is in the same directory [kafka]
chia7712 commented on PR #15777: URL: https://github.com/apache/kafka/pull/15777#issuecomment-2070011423 #15776 is merged, so please rebase :)
Re: [PR] KAFKA-16082 Avoid resuming future replica if current replica is in the same directory [kafka]
gaurav-narula commented on PR #15777: URL: https://github.com/apache/kafka/pull/15777#issuecomment-2068874029 CC: @soarez @chia7712
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
chia7712 merged PR #15136: URL: https://github.com/apache/kafka/pull/15136
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
chia7712 commented on PR #15136: URL: https://github.com/apache/kafka/pull/15136#issuecomment-2067228882 The failed `testParseAndValidateAddressesWithReverseLookup` is tracked in #15758. Will merge this.
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
gaurav-narula commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1571030504

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1178,6 +1178,33 @@ class LogManager(logDirs: Seq[File],
     }
   }

+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+      val tp = futureLog.topicPartition
+      if (cleaner != null) {

Review Comment: Added a comment in [3d78e55](https://github.com/apache/kafka/pull/15136/commits/3d78e5567fd16aa77f0b9f8c510ad26f0e6443c6)
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
chia7712 commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1571012160

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1189,49 +1217,62 @@ class LogManager(logDirs: Seq[File],
       val sourceLog = currentLogs.get(topicPartition)
       val destLog = futureLogs.get(topicPartition)
-      info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
       if (sourceLog == null)
         throw new KafkaStorageException(s"The current replica for $topicPartition is offline")
       if (destLog == null)
         throw new KafkaStorageException(s"The future replica for $topicPartition is offline")
-      destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true)
-      // the metrics tags still contain "future", so we have to remove it.
-      // we will add metrics back after sourceLog remove the metrics
-      destLog.removeLogMetrics()
-      destLog.updateHighWatermark(sourceLog.highWatermark)
+      info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
+      replaceCurrentWithFutureLog(Option(sourceLog), destLog, updateHighWatermark = true)
+      info(s"The current replica is successfully replaced with the future replica for $topicPartition")
+    }
+  }
+
+  def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: UnifiedLog, updateHighWatermark: Boolean = false): Unit = {

Review Comment:
> I think that's not necessarily true. findAbandonedFutureLogs may find a sourceLog if the log directory is online, but I don't think it's safe to update the high watermark even then, because sourceLog may be ahead of futureLog.

You are right!
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
gaurav-narula commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1570959086

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1189,49 +1217,62 @@ class LogManager(logDirs: Seq[File],
[...same hunk as quoted above, ending at...]
+  def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: UnifiedLog, updateHighWatermark: Boolean = false): Unit = {

Review Comment: I think that's not necessarily true. `findAbandonedFutureLogs` may find a `sourceLog` if the log directory is online, but I don't think it's safe to update the high watermark even then, because sourceLog may be ahead of futureLog.
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
chia7712 commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1570922398

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1189,49 +1217,62 @@ class LogManager(logDirs: Seq[File],
[...same hunk as quoted above, ending at...]
+  def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: UnifiedLog, updateHighWatermark: Boolean = false): Unit = {

Review Comment: It seems we don't need `updateHighWatermark`, since we call `updateHighWatermark` only if `sourceLog` is defined.
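The thread above hinges on when it is safe to copy the high watermark from the current replica onto the promoted future replica. A minimal, self-contained sketch of that guard follows; `FakeLog` and `replaceCurrentWithFutureLog` here are hypothetical stand-ins, not the real `kafka.log.UnifiedLog` or `LogManager` API:

```java
import java.util.Optional;

// Sketch: copy the high watermark from the current (source) replica only
// when a source log exists AND the caller explicitly opted in. On the
// abandoned-future-log recovery path the source may be ahead of the future
// log, so the HW must not be copied there.
public class ReplaceLogSketch {
    static final class FakeLog {
        final String name;
        long highWatermark;
        FakeLog(String name, long hw) { this.name = name; this.highWatermark = hw; }
    }

    static void replaceCurrentWithFutureLog(Optional<FakeLog> sourceLog,
                                            FakeLog destLog,
                                            boolean updateHighWatermark) {
        // Guard both conditions: an absent source log must never drive the HW.
        if (updateHighWatermark && sourceLog.isPresent()) {
            destLog.highWatermark = sourceLog.get().highWatermark;
        }
        // ...rename dirs, swap metrics, update cached maps (elided)...
    }

    public static void main(String[] args) {
        // Normal future-replica promotion: HW is taken from the source.
        FakeLog promoted = new FakeLog("future", 5L);
        replaceCurrentWithFutureLog(Optional.of(new FakeLog("current", 42L)), promoted, true);
        System.out.println(promoted.highWatermark); // 42

        // Abandoned-future-log recovery: keep the future log's own HW.
        FakeLog recovered = new FakeLog("abandoned-future", 5L);
        replaceCurrentWithFutureLog(Optional.of(new FakeLog("stale-current", 42L)), recovered, false);
        System.out.println(recovered.highWatermark); // 5
    }
}
```

As the reviewers conclude, the flag is still needed because "source log exists" alone is not a sufficient condition on the recovery path.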
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
chia7712 commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1570920951

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1178,6 +1178,33 @@ class LogManager(logDirs: Seq[File],
[...same hunk as quoted above, ending at...]
+      if (cleaner != null) {

Review Comment: Maybe we need to add comments for `abortAndPauseCleaning`. `recoverAbandonedFutureLogs` is called during startup, and calling `abortAndPauseCleaning` here is a bit weird to me.
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
gaurav-narula commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1569131419

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1189,50 +1216,61 @@ class LogManager(logDirs: Seq[File],
       val sourceLog = currentLogs.get(topicPartition)
       val destLog = futureLogs.get(topicPartition)
-      info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
       if (sourceLog == null)
         throw new KafkaStorageException(s"The current replica for $topicPartition is offline")
       if (destLog == null)
         throw new KafkaStorageException(s"The future replica for $topicPartition is offline")
-      destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true)
-      // the metrics tags still contain "future", so we have to remove it.
-      // we will add metrics back after sourceLog remove the metrics
-      destLog.removeLogMetrics()
-      destLog.updateHighWatermark(sourceLog.highWatermark)
+      replaceCurrentWithFutureLog(Option(sourceLog), destLog, updateHighWatermark = true)
+    }
+  }
+
+  def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: UnifiedLog, updateHighWatermark: Boolean = false): Unit = {
+    val topicPartition = destLog.topicPartition
+    info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")

-      // Now that future replica has been successfully renamed to be the current replica
-      // Update the cached map and log cleaner as appropriate.
-      futureLogs.remove(topicPartition)
-      currentLogs.put(topicPartition, destLog)
-      if (cleaner != null) {
-        cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, destLog.parentDirFile)
-        resumeCleaning(topicPartition)
-      }
+    destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true)
+    // the metrics tags still contain "future", so we have to remove it.
+    // we will add metrics back after sourceLog remove the metrics
+    destLog.removeLogMetrics()
+    if (updateHighWatermark && sourceLog.isDefined) {
+      destLog.updateHighWatermark(sourceLog.get.highWatermark)
+    }

-      try {
-        sourceLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), shouldReinitialize = true)
+    // Now that future replica has been successfully renamed to be the current replica
+    // Update the cached map and log cleaner as appropriate.
+    futureLogs.remove(topicPartition)
+    currentLogs.put(topicPartition, destLog)
+    if (cleaner != null) {
+      cleaner.alterCheckpointDir(topicPartition, sourceLog.map(_.parentDirFile), destLog.parentDirFile)

Review Comment: Addressed in [d02fc0b](https://github.com/apache/kafka/pull/15136/commits/d02fc0b9abeacf95096d4275232ea0cf829d836a)
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
gaurav-narula commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1570449270

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1189,50 +1216,63 @@ class LogManager(logDirs: Seq[File],
       val sourceLog = currentLogs.get(topicPartition)
       val destLog = futureLogs.get(topicPartition)
-      info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
       if (sourceLog == null)
         throw new KafkaStorageException(s"The current replica for $topicPartition is offline")
       if (destLog == null)
         throw new KafkaStorageException(s"The future replica for $topicPartition is offline")
-      destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true)
-      // the metrics tags still contain "future", so we have to remove it.
-      // we will add metrics back after sourceLog remove the metrics
-      destLog.removeLogMetrics()
-      destLog.updateHighWatermark(sourceLog.highWatermark)
+      replaceCurrentWithFutureLog(Option(sourceLog), destLog, updateHighWatermark = true)
+    }
+  }

-      // Now that future replica has been successfully renamed to be the current replica
-      // Update the cached map and log cleaner as appropriate.
-      futureLogs.remove(topicPartition)
-      currentLogs.put(topicPartition, destLog)
-      if (cleaner != null) {
-        cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, destLog.parentDirFile)
-        resumeCleaning(topicPartition)
+  def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: UnifiedLog, updateHighWatermark: Boolean = false): Unit = {
+    val topicPartition = destLog.topicPartition
+    info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")

Review Comment: Addressed this and other comments in [31412a9](https://github.com/apache/kafka/pull/15136/commits/31412a9485b63731a49b5d44d6dc6fbeaf52dd0f)
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
gaurav-narula commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1570449270

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1189,50 +1216,63 @@ class LogManager(logDirs: Seq[File],
[...same hunk as quoted above, ending at...]
+    info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")

Review Comment: Addressed this and other comments in [d089929](https://github.com/apache/kafka/pull/15136/commits/d089929b5a92c0be308f685980e2f787b652ed86)
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
gaurav-narula commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1570371982

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1178,6 +1178,33 @@ class LogManager(logDirs: Seq[File],
[...same hunk as quoted above, ending at...]
+      if (cleaner != null) {

Review Comment:
> I guess cleaner.abortAndPauseCleaning is added because we call resumeCleaning later, and it will cause error if we don't call abortAndPauseCleaning here?

That's correct if the cleaning operation hasn't started. The cleaning operation is scheduled on a separate thread, so we cannot be sure whether the `inProgress` map in `LogCleanerManager` has a key for the given topicPartition at the time we iterate over these logs.
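The exchange above is about why `abortAndPauseCleaning` must run even during startup: the cleaner works on its own thread, so recovery cannot know whether a partition already has an in-progress entry. A hedged miniature of that pause/resume contract, with hypothetical names rather than the real `LogCleanerManager` API:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: abortAndPauseCleaning must leave the partition PAUSED whether or
// not a cleaning pass had started, so that the later resumeCleaning call is
// always balanced.
public class CleanerPauseSketch {
    enum State { IN_PROGRESS, PAUSED }

    static final class MiniCleanerManager {
        private final Map<String, State> inProgress = new ConcurrentHashMap<>();

        void startCleaning(String tp) { inProgress.put(tp, State.IN_PROGRESS); }

        // Safe whether or not cleaning had already started for tp.
        void abortAndPauseCleaning(String tp) { inProgress.put(tp, State.PAUSED); }

        void resumeCleaning(String tp) {
            if (inProgress.get(tp) != State.PAUSED)
                throw new IllegalStateException(tp + " was not paused");
            inProgress.remove(tp);
        }
    }

    public static void main(String[] args) {
        MiniCleanerManager mgr = new MiniCleanerManager();

        // Case 1: the cleaner thread never reached this partition.
        mgr.abortAndPauseCleaning("topic-0");
        mgr.resumeCleaning("topic-0");          // balanced, no error

        // Case 2: a cleaning pass was already in flight.
        mgr.startCleaning("topic-1");
        mgr.abortAndPauseCleaning("topic-1");
        mgr.resumeCleaning("topic-1");          // also balanced

        System.out.println("resume succeeded in both cases");
    }
}
```

Resuming without a prior pause is the failure mode the pairing avoids, which is why the recovery path pauses unconditionally before calling `replaceCurrentWithFutureLog`.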
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
chia7712 commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1569178614

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1189,50 +1216,63 @@ class LogManager(logDirs: Seq[File],
[...same hunk as quoted above, ending at...]
+  def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: UnifiedLog, updateHighWatermark: Boolean = false): Unit = {
+    val topicPartition = destLog.topicPartition
+    info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")

Review Comment: `sourceLog` could be empty now, so maybe we need to revise the log message.

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1178,6 +1178,33 @@ class LogManager(logDirs: Seq[File],
[...same hunk as quoted above, ending at...]
+      if (cleaner != null) {

Review Comment: we can replace this with `abortAndPauseCleaning(tp)`

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1178,6 +1178,33 @@ class LogManager(logDirs: Seq[File],
[...same hunk as quoted above, continuing with...]
+      if (cleaner != null) {
+        cleaner.abortAndPauseCleaning(tp)
+      }
+
+      replaceCurrentWithFutureLog(currentLog, futureLog)
+
+      info(s"Successfully renamed abandoned future log for $tp")

Review Comment: line#1189 will print `info(s"The current replica is successfully replaced with the future replica for $topicPartition")` and that is not much related to fact, right?

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1178,6 +1178,33 @@ class LogManager(logDirs: Seq[File],
[...same hunk as quoted above, ending at...]
+      if (cleaner != null) {

Review Comment: I guess `cleaner.abortAndPauseCleaning` is added because we call `resumeCleaning` later, and it will cause an error if we don't call `abortAndPauseCleaning` here?
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
gaurav-narula commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1569131419

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1189,50 +1216,61 @@ class LogManager(logDirs: Seq[File],
[...same hunk as quoted above, ending at...]
+      cleaner.alterCheckpointDir(topicPartition, sourceLog.map(_.parentDirFile), destLog.parentDirFile)

Review Comment: Addressed in [062e932](https://github.com/apache/kafka/pull/15136/commits/062e932f260ce9e1df9571b2fc982c63cbaf0f7c)
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
chia7712 commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1568904632

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1189,50 +1216,61 @@ class LogManager(logDirs: Seq[File],
[...same hunk as quoted above, ending at...]
+      cleaner.alterCheckpointDir(topicPartition, sourceLog.map(_.parentDirFile), destLog.parentDirFile)

Review Comment: It seems `cleaner.alterCheckpointDir` will do nothing if `sourceLog` is empty. Maybe we can revert those changes and run `alterCheckpointDir` only if `sourceLog` is defined.
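The suggestion above is to guard the checkpoint move at the call site rather than pass an `Option` into the cleaner. A small sketch of that call-site guard; `MiniCleaner`, `onReplace`, and the directory strings are illustrative, not the real cleaner API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Sketch: call alterCheckpointDir only when a source log actually exists,
// instead of making alterCheckpointDir accept an Optional and silently no-op.
public class CheckpointGuardSketch {
    static final class MiniCleaner {
        final Map<String, String> checkpointDirs = new HashMap<>(); // tp -> parent dir

        void alterCheckpointDir(String tp, String srcDir, String dstDir) {
            // Move the checkpoint entry only if it currently lives in srcDir.
            if (srcDir.equals(checkpointDirs.get(tp))) checkpointDirs.put(tp, dstDir);
        }
    }

    static void onReplace(MiniCleaner cleaner, String tp,
                          Optional<String> sourceDir, String destDir) {
        // No source log => no checkpoint entry to move => skip the call.
        sourceDir.ifPresent(src -> cleaner.alterCheckpointDir(tp, src, destDir));
    }

    public static void main(String[] args) {
        MiniCleaner cleaner = new MiniCleaner();
        cleaner.checkpointDirs.put("tp-0", "/data0");

        onReplace(cleaner, "tp-0", Optional.of("/data0"), "/data1"); // moved
        onReplace(cleaner, "tp-1", Optional.empty(), "/data1");      // skipped

        System.out.println(cleaner.checkpointDirs); // {tp-0=/data1}
    }
}
```

Keeping the guard at the call site leaves the cleaner's signature unchanged, which is the revert the reviewer proposes.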
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
gaurav-narula commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1568882032

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1176,6 +1176,42 @@ class LogManager(logDirs: Seq[File],
     }
   }

+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+      val tp = futureLog.topicPartition
+
+      futureLog.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true)
+      futureLog.removeLogMetrics()
+      futureLogs.remove(tp)
+
+      currentLog.foreach { log =>
+        log.removeLogMetrics()
+        log.renameDir(UnifiedLog.logDeleteDirName(tp), shouldReinitialize = false)
+        addLogToBeDeleted(log)
+        info(s"Old log for partition ${tp} is renamed to ${log.dir.getAbsolutePath} and is scheduled for deletion")
+      }
+
+      currentLogs.put(tp, futureLog)
+      futureLog.newMetrics()
+
+      info(s"Successfully renamed abandoned future log for $tp")

Review Comment: I've refactored LogManager to use `replaceCurrentWithFutureLog` in [b87a21f](https://github.com/apache/kafka/pull/15136/commits/b87a21f3bd0eea6e4083a2d14b41053361f7b40a). PTAL
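The hunk above shows the overall recovery sequence: promote the abandoned future log to current and schedule any same-partition current log for deletion. A map-based sketch of that sequence with stand-in types (not the real `LogManager`; paths and the `.future` suffix handling are simplified assumptions):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of abandoned-future-log recovery: the future replica is renamed
// to the normal log-dir name and becomes the current replica; the old
// current replica (if any) is queued for deletion.
public class AbandonedFutureLogSketch {
    static final Map<String, String> currentLogs = new HashMap<>();  // tp -> dir
    static final Map<String, String> futureLogs = new HashMap<>();   // tp -> dir
    static final List<String> logsToDelete = new ArrayList<>();

    static void recoverAbandonedFutureLog(String tp) {
        String futureDir = futureLogs.remove(tp);
        if (futureDir == null) return; // nothing to recover
        // Old current replica (if any) is renamed aside and queued for deletion.
        String oldDir = currentLogs.get(tp);
        if (oldDir != null) logsToDelete.add(oldDir);
        // The future replica takes over under the normal log-dir name.
        currentLogs.put(tp, futureDir.replace(".future", ""));
    }

    public static void main(String[] args) {
        currentLogs.put("tp-0", "/data0/tp-0");
        futureLogs.put("tp-0", "/data0/tp-0.future");

        recoverAbandonedFutureLog("tp-0");

        System.out.println(currentLogs.get("tp-0")); // /data0/tp-0
        System.out.println(logsToDelete);            // [/data0/tp-0]
    }
}
```

The refactor discussed in this thread folds this sequence into the shared `replaceCurrentWithFutureLog` helper so both the normal promotion path and the recovery path go through one implementation.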
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
gaurav-narula commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1568883111

## core/src/test/java/kafka/testkit/BrokerNode.java:
@@ -81,8 +82,7 @@ public BrokerNode build(
        logDataDirectories = Collections.
            singletonList(String.format("combined_%d", id));
    } else {
-       logDataDirectories = Collections.
-           singletonList(String.format("broker_%d_data0", id));
+       logDataDirectories = Collections.unmodifiableList(Arrays.asList(String.format("broker_%d_data0", id), String.format("broker_%d_data1", id)));

Review Comment: Rebased to use the new method in the builder.
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
chia7712 commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1566938172

## core/src/test/java/kafka/testkit/BrokerNode.java:
@@ -81,8 +82,7 @@ public BrokerNode build(
        logDataDirectories = Collections.
            singletonList(String.format("combined_%d", id));
    } else {
-       logDataDirectories = Collections.
-           singletonList(String.format("broker_%d_data0", id));
+       logDataDirectories = Collections.unmodifiableList(Arrays.asList(String.format("broker_%d_data0", id), String.format("broker_%d_data1", id)));

Review Comment: I have created https://issues.apache.org/jira/browse/KAFKA-16559
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
gaurav-narula commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1565638692

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1176,6 +1176,42 @@ class LogManager(logDirs: Seq[File],
    }
  }

+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+      val tp = futureLog.topicPartition
+
+      futureLog.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true)
+      futureLog.removeLogMetrics()
+      futureLogs.remove(tp)
+
+      currentLog.foreach { log =>
+        log.removeLogMetrics()
+        log.renameDir(UnifiedLog.logDeleteDirName(tp), shouldReinitialize = false)
+        addLogToBeDeleted(log)
+        info(s"Old log for partition ${tp} is renamed to ${log.dir.getAbsolutePath} and is scheduled for deletion")
+      }
+
+      currentLogs.put(tp, futureLog)
+      futureLog.newMetrics()
+
+      info(s"Successfully renamed abandoned future log for $tp")

Review Comment: That's a great suggestion! I'm going to try refactoring this to use `replaceCurrentWithFutureLog`
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
chia7712 commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1565572433

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1176,6 +1176,42 @@ class LogManager(logDirs: Seq[File],
    }
  }

+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+      val tp = futureLog.topicPartition
+
+      futureLog.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true)
+      futureLog.removeLogMetrics()
+      futureLogs.remove(tp)
+
+      currentLog.foreach { log =>
+        log.removeLogMetrics()
+        log.renameDir(UnifiedLog.logDeleteDirName(tp), shouldReinitialize = false)
+        addLogToBeDeleted(log)
+        info(s"Old log for partition ${tp} is renamed to ${log.dir.getAbsolutePath} and is scheduled for deletion")

Review Comment: `${tp}` -> `$tp`
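The nit above is purely stylistic: in Scala string interpolation, braces are only required when the interpolated expression is more than a bare identifier. A tiny standalone illustration:

```scala
object InterpolationDemo {
  def main(args: Array[String]): Unit = {
    val tp = "foo-0"
    // For a bare identifier, $tp and ${tp} produce identical strings;
    // braces are only needed for compound expressions like ${tp.length}.
    assert(s"partition $tp" == s"partition ${tp}")
    println(s"partition $tp has a name of length ${tp.length}")
  }
}
```

Dropping the redundant braces is the conventional style in the Kafka codebase and in Scala generally.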
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
chia7712 commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1565571409

## core/src/test/java/kafka/testkit/BrokerNode.java:
@@ -81,8 +82,7 @@ public BrokerNode build(
        logDataDirectories = Collections.
            singletonList(String.format("combined_%d", id));
    } else {
-       logDataDirectories = Collections.
-           singletonList(String.format("broker_%d_data0", id));
+       logDataDirectories = Collections.unmodifiableList(Arrays.asList(String.format("broker_%d_data0", id), String.format("broker_%d_data1", id)));

Review Comment: Personally, I'd like to change `setNumBrokerNodes(int)` to `setBrokerNodes(int, int)`, where the second argument defines the number of data folders. After all, not all tests require 2+ folders. Maybe we can address that in another PR.
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
chia7712 commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1565569471

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1176,6 +1176,42 @@ class LogManager(logDirs: Seq[File],
    }
  }

+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+      val tp = futureLog.topicPartition
+
+      futureLog.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true)
+      futureLog.removeLogMetrics()
+      futureLogs.remove(tp)
+
+      currentLog.foreach { log =>
+        log.removeLogMetrics()
+        log.renameDir(UnifiedLog.logDeleteDirName(tp), shouldReinitialize = false)
+        addLogToBeDeleted(log)
+        info(s"Old log for partition ${tp} is renamed to ${log.dir.getAbsolutePath} and is scheduled for deletion")
+      }
+
+      currentLogs.put(tp, futureLog)
+      futureLog.newMetrics()
+
+      info(s"Successfully renamed abandoned future log for $tp")

Review Comment: It seems to me that we need to call `closeHandlers` at least, if we don't do the refactor in this PR.
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
showuon commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1562307197

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1173,6 +1173,35 @@ class LogManager(logDirs: Seq[File],
    }
  }

+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { log =>
+      val tp = log.topicPartition
+
+      log.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true)
+      log.removeLogMetrics()
+      futureLogs.remove(tp)
+
+      currentLogs.put(tp, log)
+      log.newMetrics()
+
+      info(s"Successfully renamed abandoned future log for $tp")
+    }
+  }
+
+  private def findAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Iterable[UnifiedLog] = {
+    futureLogs.values.flatMap { log =>
+      val topicId = log.topicId.getOrElse {
+        throw new RuntimeException(s"The log dir $log does not have a topic ID, " +
+          "which is not allowed when running in KRaft mode.")
+      }
+      val partitionId = log.topicPartition.partition()
+      Option(newTopicsImage.getPartition(topicId, partitionId))
+        .filter(pr => directoryId(log.parentDir).contains(pr.directory(brokerId)))
+        .map(_ => log)

Review Comment: I agree option (b) is better. But for this:

> We can still run into trouble if the directory with the main replica is offline. At some point that will cause a crash if the directory ever comes back online. But there's nothing we can do about that here. Maybe future work could improve how the broker handles loading conflicting logs.

I believe this PR: https://github.com/apache/kafka/pull/15335 should already fix this issue. @OmniaGM, could you help confirm it? If not, I think we should not promote the future log when the main replica is offline, to avoid causing issues later.
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
showuon commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1562284319

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1176,6 +1176,42 @@ class LogManager(logDirs: Seq[File],
    }
  }

+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+      val tp = futureLog.topicPartition
+
+      futureLog.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true)
+      futureLog.removeLogMetrics()
+      futureLogs.remove(tp)
+
+      currentLog.foreach { log =>
+        log.removeLogMetrics()
+        log.renameDir(UnifiedLog.logDeleteDirName(tp), shouldReinitialize = false)
+        addLogToBeDeleted(log)
+        info(s"Old log for partition ${tp} is renamed to ${log.dir.getAbsolutePath} and is scheduled for deletion")
+      }
+
+      currentLogs.put(tp, futureLog)
+      futureLog.newMetrics()
+
+      info(s"Successfully renamed abandoned future log for $tp")

Review Comment: I can see we basically do the same things `replaceCurrentWithFutureLog` does, so why duplicate the code here again? Could we call `replaceCurrentWithFutureLog` with additional parameters indicating that we want to skip some actions? As it stands, we don't close the `currentLog`, which is a potential resource leak; using `replaceCurrentWithFutureLog` would avoid that issue. WDYT?
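The resource-leak concern above can be illustrated with a small standalone sketch. The `Log` type and `replaceCurrentWithFuture` below are hypothetical stand-ins (not Kafka's `UnifiedLog` or `LogManager.replaceCurrentWithFutureLog`): the point is that file handles on the replaced log should be closed before it is queued for asynchronous deletion.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a log that holds open file handles.
class Log(val name: String) {
  var handlersOpen = true
  def closeHandlers(): Unit = handlersOpen = false
}

object ReplaceLogSketch {
  // Deletion happens later, on a background thread in the real broker.
  val logsToBeDeleted = mutable.Queue.empty[Log]

  // Replace `current` with `future`, returning the promoted log.
  def replaceCurrentWithFuture(current: Option[Log], future: Log): Log = {
    current.foreach { old =>
      old.closeHandlers()          // close handles first to avoid a leak
      logsToBeDeleted.enqueue(old) // then schedule the old log for deletion
    }
    future
  }
}
```

Centralizing this in one replacement routine, as the reviewer suggests, means the close-before-delete ordering only has to be right in one place.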
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
showuon commented on PR #15136: URL: https://github.com/apache/kafka/pull/15136#issuecomment-2047306952 Will check it again within this week. Thanks.
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
soarez commented on PR #15136: URL: https://github.com/apache/kafka/pull/15136#issuecomment-2046971782 @showuon PTAL
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
soarez commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1557442275

## core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
@@ -1437,7 +1438,9 @@ class KRaftClusterTest {
      assertEquals(Set(1, 2), info.get.isr().asScala.toSet)
    }

-    // Modify foo-0 so that it refers to a future replica
+    // Modify foo-0 so that it refers to a future replica.
+    // This has the same effect as the main replica being in an offline
+    // log dir and the broker crashing just at the time of promotion

Review Comment: I think I know what you mean, but I don't know if this is easy to understand in its current phrasing. Here's a suggestion: This is equivalent to a failure during the promotion of the future replica and a restart with the directory for the main replica being offline.

## core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
@@ -1456,6 +1459,76 @@ class KRaftClusterTest {
      cluster.close()
    }
  }
+
+  @Test
+  def testAbandonedFutureReplicaRecovered_mainReplicaInOnlineLogDir(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setBootstrapMetadataVersion(MetadataVersion.IBP_3_7_IV2).
+        setNumBrokerNodes(3).
+        setNumControllerNodes(1).build()).
+      build()
+    try {
+      cluster.format()
+      cluster.startup()
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        val broker0 = cluster.brokers().get(0)
+        val broker1 = cluster.brokers().get(1)
+        val foo0 = new TopicPartition("foo", 0)
+
+        admin.createTopics(Arrays.asList(
+          new NewTopic("foo", 3, 3.toShort))).all().get()
+
+        // Wait until foo-0 is created on broker0.
+        TestUtils.retry(6) {
+          assertTrue(broker0.logManager.getLog(foo0).isDefined)
+        }
+
+        // Shut down broker0 and wait until the ISR of foo-0 is set to [1, 2]
+        broker0.shutdown()
+        TestUtils.retry(6) {
+          val info = broker1.metadataCache.getPartitionInfo("foo", 0)
+          assertTrue(info.isDefined)
+          assertEquals(Set(1, 2), info.get.isr().asScala.toSet)
+        }
+
+        val log = broker0.logManager.getLog(foo0).get
+
+        // Copy foo-0 to another log dir
+        val parentDir = log.parentDir
+        var targetParentDir = parentDir.substring(0, parentDir.length - 1)
+        if (parentDir.endsWith("0")) {
+          targetParentDir += "1"
+        } else {
+          targetParentDir += "0"
+        }

Review Comment: This assumes your change to `BrokerNode.build()` and its specific naming pattern. We'd get a more robust test if instead you access the directory list on the broker, filter out `parentDir`, and choose one of the remaining log dirs.

## core/src/test/java/kafka/testkit/BrokerNode.java:
@@ -81,8 +82,7 @@ public BrokerNode build(
        logDataDirectories = Collections.
            singletonList(String.format("combined_%d", id));
    } else {
-       logDataDirectories = Collections.
-           singletonList(String.format("broker_%d_data0", id));
+       logDataDirectories = Collections.unmodifiableList(Arrays.asList(String.format("broker_%d_data0", id), String.format("broker_%d_data1", id)));

Review Comment: I'm wondering if this is going to break test cases that don't support JBOD; if so, there should be some new failing tests.

## core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
@@ -1456,6 +1459,76 @@ class KRaftClusterTest {
      cluster.close()
    }
  }
+
+  @Test
+  def testAbandonedFutureReplicaRecovered_mainReplicaInOnlineLogDir(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setBootstrapMetadataVersion(MetadataVersion.IBP_3_7_IV2).
+        setNumBrokerNodes(3).
+        setNumControllerNodes(1).build()).
+      build()
+    try {
+      cluster.format()
+      cluster.startup()
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        val broker0 = cluster.brokers().get(0)
+        val broker1 = cluster.brokers().get(1)
+        val foo0 = new TopicPartition("foo", 0)
+
+        admin.createTopics(Arrays.asList(
+          new NewTopic("foo", 3, 3.toShort))).all().get()
+
+        // Wait until foo-0 is created on broker0.
+        TestUtils.retry(6) {
+          assertTrue(broker0.logManager.getLog(foo0).isDefined)
+        }
+
+        // Shut down broker0 and wait until the ISR of foo-0 is set to [1, 2]
+        broker0.shutdown()
+        TestUtils.retry(6) {
+          val info = broker1.metadataCache.getPartitionInfo("foo", 0)
+          assertTrue(info.isDefined)
+          assertEquals(Set(1, 2), info.get.isr().asScala.toSet)
+        }
+
+
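The reviewer's suggested directory selection (take the broker's log-dir list, filter out the replica's current parent dir, and pick any remaining dir) can be reduced to a standalone sketch; the directory names here are illustrative only:

```scala
object PickTargetDir {
  // Pick any log dir other than the one currently hosting the replica.
  // Returns None if the broker has only one log dir.
  def pickOther(logDirs: Seq[String], parentDir: String): Option[String] =
    logDirs.filterNot(_ == parentDir).headOption
}
```

Unlike the string-munging in the test (`substring` plus an `endsWith("0")` check), this does not depend on the `broker_%d_dataN` naming pattern, so the test keeps working if the testkit's directory naming ever changes.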
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
gaurav-narula commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1551349514

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1173,6 +1173,35 @@ class LogManager(logDirs: Seq[File],
    }
  }

+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { log =>
+      val tp = log.topicPartition
+
+      log.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true)
+      log.removeLogMetrics()
+      futureLogs.remove(tp)
+
+      currentLogs.put(tp, log)
+      log.newMetrics()
+
+      info(s"Successfully renamed abandoned future log for $tp")
+    }
+  }
+
+  private def findAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Iterable[UnifiedLog] = {
+    futureLogs.values.flatMap { log =>
+      val topicId = log.topicId.getOrElse {
+        throw new RuntimeException(s"The log dir $log does not have a topic ID, " +
+          "which is not allowed when running in KRaft mode.")
+      }
+      val partitionId = log.topicPartition.partition()
+      Option(newTopicsImage.getPartition(topicId, partitionId))
+        .filter(pr => directoryId(log.parentDir).contains(pr.directory(brokerId)))
+        .map(_ => log)

Review Comment: Addressed in 430546fa5a78118497ada6373607f6a25c78a8e8. Please take a look.
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
soarez commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1549709937

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1173,6 +1173,35 @@ class LogManager(logDirs: Seq[File],
    }
  }

+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { log =>
+      val tp = log.topicPartition
+
+      log.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true)
+      log.removeLogMetrics()
+      futureLogs.remove(tp)
+
+      currentLogs.put(tp, log)
+      log.newMetrics()
+
+      info(s"Successfully renamed abandoned future log for $tp")
+    }
+  }
+
+  private def findAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Iterable[UnifiedLog] = {
+    futureLogs.values.flatMap { log =>
+      val topicId = log.topicId.getOrElse {
+        throw new RuntimeException(s"The log dir $log does not have a topic ID, " +
+          "which is not allowed when running in KRaft mode.")
+      }
+      val partitionId = log.topicPartition.partition()
+      Option(newTopicsImage.getPartition(topicId, partitionId))
+        .filter(pr => directoryId(log.parentDir).contains(pr.directory(brokerId)))
+        .map(_ => log)

Review Comment: If we didn't know whether the future log was caught up or not, then I'd prefer (a), but at this point I can't conceive of a scenario, other than a failure during replica promotion, that would cause the future log to be in the directory assigned in the metadata. So I agree the two logs will likely be either caught up or very close, and it makes more sense to do (b): promote the future log and delete the main one.

We can still run into trouble if the directory with the main replica is offline. At some point that will cause a crash if the directory ever comes back online, but there's nothing we can do about that here. Maybe future work could improve how the broker handles loading conflicting logs.
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
gaurav-narula commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1549695928

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1173,6 +1173,35 @@ class LogManager(logDirs: Seq[File],
    }
  }

+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { log =>
+      val tp = log.topicPartition
+
+      log.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true)
+      log.removeLogMetrics()
+      futureLogs.remove(tp)
+
+      currentLogs.put(tp, log)
+      log.newMetrics()
+
+      info(s"Successfully renamed abandoned future log for $tp")
+    }
+  }
+
+  private def findAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Iterable[UnifiedLog] = {
+    futureLogs.values.flatMap { log =>
+      val topicId = log.topicId.getOrElse {
+        throw new RuntimeException(s"The log dir $log does not have a topic ID, " +
+          "which is not allowed when running in KRaft mode.")
+      }
+      val partitionId = log.topicPartition.partition()
+      Option(newTopicsImage.getPartition(topicId, partitionId))
+        .filter(pr => directoryId(log.parentDir).contains(pr.directory(brokerId)))
+        .map(_ => log)

Review Comment: Thanks for the feedback. For (2), we have a couple of options. We can either: (a) ignore the future replica (say in dir2) if the main replica exists in an online log dir (say dir1), or (b) promote the future replica (in dir2) and remove the main replica (in dir1).

(a) would result in ReplicaManager spawning a replicaAlterLogDir thread for the future replica and correcting the assignment to dir1, only for it to be changed back again to dir2 when the replicaAlterLogDir thread finishes its job. Refer https://github.com/apache/kafka/blob/acecd370cc3b25f12926e7a4664a2648f08c6c9a/core/src/main/scala/kafka/server/ReplicaManager.scala#L2734 and https://github.com/apache/kafka/blob/acecd370cc3b25f12926e7a4664a2648f08c6c9a/core/src/main/scala/kafka/server/ReplicaManager.scala#L2745

Since in these scenarios the future replica is almost caught up with the main replica, I'm leaning towards option (b) to avoid more reassignments. Please let me know if you feel otherwise.
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
showuon commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1544295936

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1173,6 +1173,35 @@ class LogManager(logDirs: Seq[File],
    }
  }

+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { log =>
+      val tp = log.topicPartition
+
+      log.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true)
+      log.removeLogMetrics()
+      futureLogs.remove(tp)
+
+      currentLogs.put(tp, log)
+      log.newMetrics()
+
+      info(s"Successfully renamed abandoned future log for $tp")
+    }
+  }
+
+  private def findAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Iterable[UnifiedLog] = {
+    futureLogs.values.flatMap { log =>
+      val topicId = log.topicId.getOrElse {
+        throw new RuntimeException(s"The log dir $log does not have a topic ID, " +
+          "which is not allowed when running in KRaft mode.")
+      }
+      val partitionId = log.topicPartition.partition()
+      Option(newTopicsImage.getPartition(topicId, partitionId))
+        .filter(pr => directoryId(log.parentDir).contains(pr.directory(brokerId)))
+        .map(_ => log)

Review Comment: For (1), I think we'll catch up until the high watermark before publishing the image, so it should be safe. For (2), good suggestion.

## core/src/main/scala/kafka/server/ReplicaManager.scala:
@@ -2745,10 +2745,10 @@ class ReplicaManager(val config: KafkaConfig,
        "local leaders.")
      replicaFetcherManager.removeFetcherForPartitions(localLeaders.keySet)
      localLeaders.forKeyValue { (tp, info) =>
+       val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2)
        getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) =>
          try {
            val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)

Review Comment: Same here, unnecessary change.
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
soarez commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1511125756

## core/src/main/scala/kafka/server/ReplicaManager.scala:
@@ -2790,7 +2791,6 @@ class ReplicaManager(val config: KafkaConfig,
          // is unavailable. This is required to ensure that we include the partition's
          // high watermark in the checkpoint file (see KAFKA-1647).
          val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
-         val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2)

Review Comment: It seems the changes in this file are no longer necessary?

## core/src/main/scala/kafka/log/LogManager.scala:
@@ -1173,6 +1173,35 @@ class LogManager(logDirs: Seq[File],
    }
  }

+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { log =>
+      val tp = log.topicPartition
+
+      log.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true)
+      log.removeLogMetrics()
+      futureLogs.remove(tp)
+
+      currentLogs.put(tp, log)
+      log.newMetrics()
+
+      info(s"Successfully renamed abandoned future log for $tp")
+    }
+  }
+
+  private def findAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Iterable[UnifiedLog] = {
+    futureLogs.values.flatMap { log =>
+      val topicId = log.topicId.getOrElse {
+        throw new RuntimeException(s"The log dir $log does not have a topic ID, " +
+          "which is not allowed when running in KRaft mode.")
+      }
+      val partitionId = log.topicPartition.partition()
+      Option(newTopicsImage.getPartition(topicId, partitionId))
+        .filter(pr => directoryId(log.parentDir).contains(pr.directory(brokerId)))
+        .map(_ => log)

Review Comment:
1. Do we have the guarantee that the topic will be in the new topics image? Can't the topic be in a later metadata delta if a compaction hasn't yet occurred?
2. Shouldn't we check whether the main replica is on one of the other directories? If the main replica is on an offline dir (as in the scenario described in the issue), the broker will refuse to start once it is restarted with that directory online, as it will see two main replicas. If the log directory for the main replica is online, we should be able to detect that here.
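The detection logic under discussion reduces to a standalone sketch. The types here are deliberately simplified stand-ins (a plain map instead of Kafka's `TopicsImage`, a case class instead of `UnifiedLog`): a future log counts as "abandoned" when the metadata image assigns its partition to the very directory the future log lives in, i.e. the promotion was recorded in metadata but the rename never completed.

```scala
// Simplified stand-in for a future replica on disk.
case class FutureLog(topicId: String, partition: Int, parentDirId: String)

object AbandonedFutureLogs {
  // assignments: (topicId, partition) -> directory id assigned in the metadata image
  def find(futureLogs: Seq[FutureLog],
           assignments: Map[(String, Int), String]): Seq[FutureLog] =
    futureLogs.filter { log =>
      // Abandoned iff metadata already points the partition at the future log's dir.
      assignments.get((log.topicId, log.partition)).contains(log.parentDirId)
    }
}
```

Future logs whose partition is assigned elsewhere (or absent from the image) are left alone, which matches the reviewer's point that the image, not the on-disk layout alone, decides promotion.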
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
soarez commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r1509052378

## core/src/main/scala/kafka/server/ReplicaManager.scala:
@@ -2859,10 +2867,10 @@ class ReplicaManager(val config: KafkaConfig,
        "local leaders.")
      replicaFetcherManager.removeFetcherForPartitions(localLeaders.keySet)
      localLeaders.forKeyValue { (tp, info) =>
-       getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) =>
+       val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2)
+       getOrCreatePartition(tp, delta, info.topicId, isLocalFollower = false, partitionAssignedDirectoryId).foreach { case (partition, isNew) =>

Review Comment: > `isLocalFollower = false`

I don't think this is ok. Setting this to false disables the fix when the broker is the leader, which can happen with RF=1, or with RF=N when the broker comes back into the ISR (e.g. because there were no new messages and the other replicas are all shut down). I had a conversation with @gaurav-narula about this, and the plan is to apply this fix earlier, when the logs are being loaded.
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
OmniaGM commented on PR #15136: URL: https://github.com/apache/kafka/pull/15136#issuecomment-1914798524 Hi @rondagostino, @showuon and @cmccabe, could one of you please take a look at this PR and merge it if you are happy with the changes?
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
gaurav-narula commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r169837

## core/src/main/scala/kafka/server/ReplicaManager.scala:
@@ -2773,6 +2776,12 @@ class ReplicaManager(val config: KafkaConfig,
          Some(partition, false)

        case HostedPartition.None =>
+         var isNew = true

Review Comment: I don't think that would work because `isNew` is mutated inside the callback, which may be executed conditionally.

Edit: Perhaps I can have `maybeRecoverAbandonedFutureLog` return a boolean instead.
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
OmniaGM commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r157092

## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
## @@ -5592,6 +5592,63 @@ class ReplicaManagerTest {
     }
   }

+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testDeltaFollowerRecoverAbandonedFutureReplica(enableRemoteStorage: Boolean): Unit = {
+    // Given
+    val localId = 1
+    val topicPartition = new TopicPartition("foo", 0)
+
+    val mockReplicaFetcherManager = mock(classOf[ReplicaFetcherManager])
+    val mockReplicaAlterLogDirsManager = mock(classOf[ReplicaAlterLogDirsManager])
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(
+      timer = new MockTimer(time),
+      brokerId = localId,
+      mockReplicaFetcherManager = Some(mockReplicaFetcherManager),
+      mockReplicaAlterLogDirsManager = Some(mockReplicaAlterLogDirsManager),
+      enableRemoteStorage = enableRemoteStorage,
+      shouldMockLog = true
+    )
+
+    val directoryId1 = Uuid.randomUuid()
+    val directoryId2 = Uuid.randomUuid()
+    val mockFutureLog = setupMockLog("/KAFKA-16082-test", isFuture = true)
+
+    val mockLogMgr = replicaManager.logManager
+    when(
+      mockLogMgr.getLog(topicPartition, isFuture = true)
+    ).thenReturn {
+      Some(mockFutureLog)
+    }
+    when(mockLogMgr.getLog(topicPartition)).thenReturn {
+      None
+    }
+
+    when(
+      mockLogMgr.directoryId(mockFutureLog.parentDir)
+    ).thenReturn {
+      Some(directoryId1)

Review Comment:
same as above

## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
## @@ -4831,15 +4831,15 @@ class ReplicaManagerTest {
     try {
       val foo0 = new TopicPartition("foo", 0)
       val emptyDelta = new TopicsDelta(TopicsImage.EMPTY)
-      val (fooPart, fooNew) = replicaManager.getOrCreatePartition(foo0, emptyDelta, FOO_UUID).get
+      val (fooPart, fooNew) = replicaManager.getOrCreatePartition(foo0, emptyDelta, FOO_UUID, false, None).get

Review Comment:
Any reason why we are not setting `partitionAssignedDirectoryId` to `None` as a default in the `replicaManager.getOrCreatePartition` signature?

## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
## @@ -5592,6 +5592,63 @@ class ReplicaManagerTest {
     }
   }

+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testDeltaFollowerRecoverAbandonedFutureReplica(enableRemoteStorage: Boolean): Unit = {
+    // Given
+    val localId = 1
+    val topicPartition = new TopicPartition("foo", 0)
+
+    val mockReplicaFetcherManager = mock(classOf[ReplicaFetcherManager])
+    val mockReplicaAlterLogDirsManager = mock(classOf[ReplicaAlterLogDirsManager])
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(
+      timer = new MockTimer(time),
+      brokerId = localId,
+      mockReplicaFetcherManager = Some(mockReplicaFetcherManager),
+      mockReplicaAlterLogDirsManager = Some(mockReplicaAlterLogDirsManager),
+      enableRemoteStorage = enableRemoteStorage,
+      shouldMockLog = true
+    )
+
+    val directoryId1 = Uuid.randomUuid()
+    val directoryId2 = Uuid.randomUuid()
+    val mockFutureLog = setupMockLog("/KAFKA-16082-test", isFuture = true)
+
+    val mockLogMgr = replicaManager.logManager
+    when(
+      mockLogMgr.getLog(topicPartition, isFuture = true)
+    ).thenReturn {

Review Comment:
`{` is used to hold a block, but in this case it is a simple value. So maybe replace all `.thenReturn { Some(mockFutureLog) }` with `.thenReturn(Some(mockFutureLog))`.

## core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
## @@ -5592,6 +5592,63 @@ class ReplicaManagerTest {
     }
   }

+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testDeltaFollowerRecoverAbandonedFutureReplica(enableRemoteStorage: Boolean): Unit = {
+    // Given
+    val localId = 1
+    val topicPartition = new TopicPartition("foo", 0)
+
+    val mockReplicaFetcherManager = mock(classOf[ReplicaFetcherManager])
+    val mockReplicaAlterLogDirsManager = mock(classOf[ReplicaAlterLogDirsManager])
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(
+      timer = new MockTimer(time),
+      brokerId = localId,
+      mockReplicaFetcherManager = Some(mockReplicaFetcherManager),
+      mockReplicaAlterLogDirsManager = Some(mockReplicaAlterLogDirsManager),
+      enableRemoteStorage = enableRemoteStorage,
+      shouldMockLog = true
+    )
+
+    val directoryId1 = Uuid.randomUuid()
+    val directoryId2 = Uuid.randomUuid()
+    val mockFutureLog = setupMockLog("/KAFKA-16082-test", isFuture = true)
+
+    val mockLogMgr = replicaManager.logManager
+    when(
+      mockLogMgr.getLog(topicPartition, isFuture = true)
+    ).thenReturn {
+      Some(mockFutureLog)
+    }
+    when(mockLogMgr.getLog(topicPartition)).thenReturn {
+      None

Review Comment:
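The reviewer's default-parameter suggestion can be sketched as follows. The signature below is a deliberately simplified stand-in for `ReplicaManager.getOrCreatePartition` (the real method takes a `TopicsDelta` and a topic ID): giving `partitionAssignedDirectoryId` a default of `None` means existing call sites compile unchanged.

```scala
object DefaultParamSketch {
  final case class Partition(topic: String, index: Int)

  // Simplified stand-in signature: the two new parameters carry defaults,
  // so pre-existing two-argument callers need no changes.
  def getOrCreatePartition(topic: String,
                           index: Int,
                           isLocalFollower: Boolean = false,
                           partitionAssignedDirectoryId: Option[String] = None): (Partition, Boolean) = {
    // Treat the partition as "new" only when no directory was already assigned
    (Partition(topic, index), partitionAssignedDirectoryId.isEmpty)
  }

  def main(args: Array[String]): Unit = {
    // Existing call sites stay as they were:
    val (part, isNew) = getOrCreatePartition("foo", 0)
    println(s"${part.topic}-${part.index} isNew=$isNew")
  }
}
```

Callers that do care can pass the argument by name, e.g. `getOrCreatePartition("foo", 0, partitionAssignedDirectoryId = Some("dir-1"))`.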
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
OmniaGM commented on code in PR #15136: URL: https://github.com/apache/kafka/pull/15136#discussion_r142323

## core/src/main/scala/kafka/log/LogManager.scala:
## @@ -1156,6 +1156,32 @@ class LogManager(logDirs: Seq[File],
     }
   }

+  def maybeRecoverAbandonedFutureLog(topicPartition: TopicPartition, partitionAssignedDirectoryId: Option[Uuid], callback: () => Unit): Unit = {
+    logCreationOrDeletionLock synchronized {
+      for {
+        futureLog <- getLog(topicPartition, isFuture = true)
+        futureDirId <- directoryId(futureLog.parentDir)
+        assignedDirId <- partitionAssignedDirectoryId
+        if futureDirId == assignedDirId
+      } {
+        val sourceLog = futureLog

Review Comment:
`sourceLog` can be moved into the `for` block.
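The refactor the reviewer suggests uses a `=` value binding inside the comprehension. A minimal sketch, with `Log`, `getLog` and `directoryId` as toy stand-ins for the `LogManager` API:

```scala
object ForBindingSketch {
  final case class Log(parentDir: String)

  // Toy stand-ins for LogManager.getLog / LogManager.directoryId
  def getLog(isFuture: Boolean): Option[Log] = Some(Log("/d1/foo-0.future"))
  def directoryId(dir: String): Option[String] = Some("dir-1")

  // Returns the future log's parent dir when its directory matches the
  // assigned one; `sourceLog` now lives inside the `for` block itself.
  def recoveredDir(partitionAssignedDirectoryId: Option[String]): Option[String] =
    for {
      futureLog <- getLog(isFuture = true)
      futureDirId <- directoryId(futureLog.parentDir)
      assignedDirId <- partitionAssignedDirectoryId
      if futureDirId == assignedDirId
      sourceLog = futureLog // `=` binds a val inside the comprehension
    } yield sourceLog.parentDir

  def main(args: Array[String]): Unit =
    println(recoveredDir(Some("dir-1")))
}
```

The `sourceLog = futureLog` binding is ordinary for-comprehension syntax (it desugars to a `map` over the preceding generators), so the separate `val` in the body becomes unnecessary.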
Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]
gaurav-narula commented on PR #15136: URL: https://github.com/apache/kafka/pull/15136#issuecomment-1878536740

CC: @OmniaGM @pprovenzano