Re: [PR] KAFKA-16082 Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-23 Thread via GitHub


chia7712 merged PR #15777:
URL: https://github.com/apache/kafka/pull/15777





Re: [PR] KAFKA-16082 Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-23 Thread via GitHub


chia7712 commented on PR #15777:
URL: https://github.com/apache/kafka/pull/15777#issuecomment-2071783047

   The failed tests are fixed by https://github.com/chia7712/kafka/commit/6e998cffdd33e343945877ccee1fec8337c7d57d





Re: [PR] KAFKA-16082 Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-22 Thread via GitHub


gaurav-narula commented on PR #15777:
URL: https://github.com/apache/kafka/pull/15777#issuecomment-2070018194

   @chia7712 Rebased. PTAL :)





Re: [PR] KAFKA-16082 Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-22 Thread via GitHub


chia7712 commented on PR #15777:
URL: https://github.com/apache/kafka/pull/15777#issuecomment-2070011423

   #15776 is merged, so please rebase the code :)





Re: [PR] KAFKA-16082 Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-22 Thread via GitHub


gaurav-narula commented on PR #15777:
URL: https://github.com/apache/kafka/pull/15777#issuecomment-2068874029

   CC: @soarez @chia7712 





Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-19 Thread via GitHub


chia7712 merged PR #15136:
URL: https://github.com/apache/kafka/pull/15136





Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-19 Thread via GitHub


chia7712 commented on PR #15136:
URL: https://github.com/apache/kafka/pull/15136#issuecomment-2067228882

   The failed `testParseAndValidateAddressesWithReverseLookup` is tracked by #15758. Will merge it.





Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-18 Thread via GitHub


gaurav-narula commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1571030504


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1178,6 +1178,33 @@ class LogManager(logDirs: Seq[File],
     }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+      val tp = futureLog.topicPartition
+      if (cleaner != null) {

Review Comment:
   Added a comment in [3d78e55](https://github.com/apache/kafka/pull/15136/commits/3d78e5567fd16aa77f0b9f8c510ad26f0e6443c6)






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-18 Thread via GitHub


chia7712 commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1571012160


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1189,49 +1217,62 @@ class LogManager(logDirs: Seq[File],
       val sourceLog = currentLogs.get(topicPartition)
       val destLog = futureLogs.get(topicPartition)
 
-      info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
       if (sourceLog == null)
         throw new KafkaStorageException(s"The current replica for $topicPartition is offline")
       if (destLog == null)
         throw new KafkaStorageException(s"The future replica for $topicPartition is offline")
 
-      destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true)
-      // the metrics tags still contain "future", so we have to remove it.
-      // we will add metrics back after sourceLog remove the metrics
-      destLog.removeLogMetrics()
-      destLog.updateHighWatermark(sourceLog.highWatermark)
+      info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
+      replaceCurrentWithFutureLog(Option(sourceLog), destLog, updateHighWatermark = true)
+      info(s"The current replica is successfully replaced with the future replica for $topicPartition")
+    }
+  }
+
+  def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: UnifiedLog, updateHighWatermark: Boolean = false): Unit = {

Review Comment:
   > I think that's not necessarily true. findAbandonedFutureLogs may find a sourceLog if the log directory is online but I don't think it's safe to update the high watermark even then because sourceLog may be ahead of futureLog.
   
   You are right!






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-18 Thread via GitHub


gaurav-narula commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1570959086


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1189,49 +1217,62 @@ class LogManager(logDirs: Seq[File],
       val sourceLog = currentLogs.get(topicPartition)
       val destLog = futureLogs.get(topicPartition)
 
-      info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
       if (sourceLog == null)
         throw new KafkaStorageException(s"The current replica for $topicPartition is offline")
       if (destLog == null)
         throw new KafkaStorageException(s"The future replica for $topicPartition is offline")
 
-      destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true)
-      // the metrics tags still contain "future", so we have to remove it.
-      // we will add metrics back after sourceLog remove the metrics
-      destLog.removeLogMetrics()
-      destLog.updateHighWatermark(sourceLog.highWatermark)
+      info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
+      replaceCurrentWithFutureLog(Option(sourceLog), destLog, updateHighWatermark = true)
+      info(s"The current replica is successfully replaced with the future replica for $topicPartition")
+    }
+  }
+
+  def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: UnifiedLog, updateHighWatermark: Boolean = false): Unit = {

Review Comment:
   I think that's not necessarily true. `findAbandonedFutureLogs` may find a `sourceLog` if the log directory is online, but I don't think it's safe to update the high watermark even then, because sourceLog may be ahead of futureLog.
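
   A minimal sketch of the guarded update being discussed (names follow the PR's diff; illustrative only): keeping the flag lets the recovery path skip the update even when a `sourceLog` is present.
   
   ```scala
   // Update the high watermark only when the caller asked for it AND a source
   // log exists; recoverAbandonedFutureLogs relies on the default false.
   if (updateHighWatermark) {
     sourceLog.foreach(src => destLog.updateHighWatermark(src.highWatermark))
   }
   ```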






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-18 Thread via GitHub


chia7712 commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1570922398


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1189,49 +1217,62 @@ class LogManager(logDirs: Seq[File],
       val sourceLog = currentLogs.get(topicPartition)
       val destLog = futureLogs.get(topicPartition)
 
-      info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
       if (sourceLog == null)
         throw new KafkaStorageException(s"The current replica for $topicPartition is offline")
       if (destLog == null)
         throw new KafkaStorageException(s"The future replica for $topicPartition is offline")
 
-      destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true)
-      // the metrics tags still contain "future", so we have to remove it.
-      // we will add metrics back after sourceLog remove the metrics
-      destLog.removeLogMetrics()
-      destLog.updateHighWatermark(sourceLog.highWatermark)
+      info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
+      replaceCurrentWithFutureLog(Option(sourceLog), destLog, updateHighWatermark = true)
+      info(s"The current replica is successfully replaced with the future replica for $topicPartition")
+    }
+  }
+
+  def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: UnifiedLog, updateHighWatermark: Boolean = false): Unit = {

Review Comment:
   It seems we don't need the `updateHighWatermark` parameter, since we call `updateHighWatermark` only if `sourceLog` is defined.






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-18 Thread via GitHub


chia7712 commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1570920951


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1178,6 +1178,33 @@ class LogManager(logDirs: Seq[File],
     }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+      val tp = futureLog.topicPartition
+      if (cleaner != null) {

Review Comment:
   Maybe we need to add a comment for `abortAndPauseCleaning`. `recoverAbandonedFutureLogs` is called during startup, and calling `abortAndPauseCleaning` here seems a bit weird to me.






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-18 Thread via GitHub


gaurav-narula commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1569131419


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1189,50 +1216,61 @@ class LogManager(logDirs: Seq[File],
       val sourceLog = currentLogs.get(topicPartition)
       val destLog = futureLogs.get(topicPartition)
 
-      info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
       if (sourceLog == null)
         throw new KafkaStorageException(s"The current replica for $topicPartition is offline")
       if (destLog == null)
         throw new KafkaStorageException(s"The future replica for $topicPartition is offline")
 
-      destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true)
-      // the metrics tags still contain "future", so we have to remove it.
-      // we will add metrics back after sourceLog remove the metrics
-      destLog.removeLogMetrics()
-      destLog.updateHighWatermark(sourceLog.highWatermark)
+      replaceCurrentWithFutureLog(Option(sourceLog), destLog, updateHighWatermark = true)
+    }
+  }
+
+  def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: UnifiedLog, updateHighWatermark: Boolean = false): Unit = {
+    val topicPartition = destLog.topicPartition
+    info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
 
-      // Now that future replica has been successfully renamed to be the current replica
-      // Update the cached map and log cleaner as appropriate.
-      futureLogs.remove(topicPartition)
-      currentLogs.put(topicPartition, destLog)
-      if (cleaner != null) {
-        cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, destLog.parentDirFile)
-        resumeCleaning(topicPartition)
-      }
+    destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true)
+    // the metrics tags still contain "future", so we have to remove it.
+    // we will add metrics back after sourceLog remove the metrics
+    destLog.removeLogMetrics()
+    if (updateHighWatermark && sourceLog.isDefined) {
+      destLog.updateHighWatermark(sourceLog.get.highWatermark)
+    }
 
-      try {
-        sourceLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), shouldReinitialize = true)
+    // Now that future replica has been successfully renamed to be the current replica
+    // Update the cached map and log cleaner as appropriate.
+    futureLogs.remove(topicPartition)
+    currentLogs.put(topicPartition, destLog)
+    if (cleaner != null) {
+      cleaner.alterCheckpointDir(topicPartition, sourceLog.map(_.parentDirFile), destLog.parentDirFile)

Review Comment:
   Addressed in [d02fc0b](https://github.com/apache/kafka/pull/15136/commits/d02fc0b9abeacf95096d4275232ea0cf829d836a)






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-18 Thread via GitHub


gaurav-narula commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1570449270


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1189,50 +1216,63 @@ class LogManager(logDirs: Seq[File],
       val sourceLog = currentLogs.get(topicPartition)
       val destLog = futureLogs.get(topicPartition)
 
-      info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
       if (sourceLog == null)
         throw new KafkaStorageException(s"The current replica for $topicPartition is offline")
       if (destLog == null)
         throw new KafkaStorageException(s"The future replica for $topicPartition is offline")
 
-      destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true)
-      // the metrics tags still contain "future", so we have to remove it.
-      // we will add metrics back after sourceLog remove the metrics
-      destLog.removeLogMetrics()
-      destLog.updateHighWatermark(sourceLog.highWatermark)
+      replaceCurrentWithFutureLog(Option(sourceLog), destLog, updateHighWatermark = true)
+    }
+  }
 
-      // Now that future replica has been successfully renamed to be the current replica
-      // Update the cached map and log cleaner as appropriate.
-      futureLogs.remove(topicPartition)
-      currentLogs.put(topicPartition, destLog)
-      if (cleaner != null) {
-        cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, destLog.parentDirFile)
-        resumeCleaning(topicPartition)
+  def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: UnifiedLog, updateHighWatermark: Boolean = false): Unit = {
+    val topicPartition = destLog.topicPartition
+    info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")

Review Comment:
   Addressed this and other comments in [31412a9](https://github.com/apache/kafka/pull/15136/commits/31412a9485b63731a49b5d44d6dc6fbeaf52dd0f)






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-18 Thread via GitHub


gaurav-narula commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1570449270


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1189,50 +1216,63 @@ class LogManager(logDirs: Seq[File],
       val sourceLog = currentLogs.get(topicPartition)
       val destLog = futureLogs.get(topicPartition)
 
-      info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
       if (sourceLog == null)
         throw new KafkaStorageException(s"The current replica for $topicPartition is offline")
       if (destLog == null)
         throw new KafkaStorageException(s"The future replica for $topicPartition is offline")
 
-      destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true)
-      // the metrics tags still contain "future", so we have to remove it.
-      // we will add metrics back after sourceLog remove the metrics
-      destLog.removeLogMetrics()
-      destLog.updateHighWatermark(sourceLog.highWatermark)
+      replaceCurrentWithFutureLog(Option(sourceLog), destLog, updateHighWatermark = true)
+    }
+  }
 
-      // Now that future replica has been successfully renamed to be the current replica
-      // Update the cached map and log cleaner as appropriate.
-      futureLogs.remove(topicPartition)
-      currentLogs.put(topicPartition, destLog)
-      if (cleaner != null) {
-        cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, destLog.parentDirFile)
-        resumeCleaning(topicPartition)
+  def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: UnifiedLog, updateHighWatermark: Boolean = false): Unit = {
+    val topicPartition = destLog.topicPartition
+    info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")

Review Comment:
   Addressed this and other comments in [d089929](https://github.com/apache/kafka/pull/15136/commits/d089929b5a92c0be308f685980e2f787b652ed86)






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-18 Thread via GitHub


gaurav-narula commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1570371982


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1178,6 +1178,33 @@ class LogManager(logDirs: Seq[File],
     }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+      val tp = futureLog.topicPartition
+      if (cleaner != null) {

Review Comment:
   > I guess cleaner.abortAndPauseCleaning is added because we call resumeCleaning later, and it will cause error if we don't call abortAndPauseCleaning here?
   
   That's correct, if the cleaning operation hasn't started. The cleaning operation is scheduled on a separate thread, so we cannot be sure whether the `inProgress` map in `LogCleanerManager` has a key for the given topicPartition at the time we iterate over these logs.
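
   A minimal sketch of that ordering (names follow the PR's diff; illustrative, not the committed code):
   
   ```scala
   // Pause first so that the later resumeCleaning(tp) inside
   // replaceCurrentWithFutureLog cannot fail, regardless of whether the
   // cleaner thread has already registered tp in its inProgress map.
   if (cleaner != null) {
     cleaner.abortAndPauseCleaning(tp) // aborts any in-flight clean and marks tp as paused
   }
   replaceCurrentWithFutureLog(currentLog, futureLog) // calls resumeCleaning(tp) near the end
   ```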






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-17 Thread via GitHub


chia7712 commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1569178614


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1189,50 +1216,63 @@ class LogManager(logDirs: Seq[File],
       val sourceLog = currentLogs.get(topicPartition)
       val destLog = futureLogs.get(topicPartition)
 
-      info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
       if (sourceLog == null)
         throw new KafkaStorageException(s"The current replica for $topicPartition is offline")
       if (destLog == null)
         throw new KafkaStorageException(s"The future replica for $topicPartition is offline")
 
-      destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true)
-      // the metrics tags still contain "future", so we have to remove it.
-      // we will add metrics back after sourceLog remove the metrics
-      destLog.removeLogMetrics()
-      destLog.updateHighWatermark(sourceLog.highWatermark)
+      replaceCurrentWithFutureLog(Option(sourceLog), destLog, updateHighWatermark = true)
+    }
+  }
 
-      // Now that future replica has been successfully renamed to be the current replica
-      // Update the cached map and log cleaner as appropriate.
-      futureLogs.remove(topicPartition)
-      currentLogs.put(topicPartition, destLog)
-      if (cleaner != null) {
-        cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, destLog.parentDirFile)
-        resumeCleaning(topicPartition)
+  def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: UnifiedLog, updateHighWatermark: Boolean = false): Unit = {
+    val topicPartition = destLog.topicPartition
+    info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")

Review Comment:
   `sourceLog` could be empty now, so maybe we need to revise the log message.
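
   One possible revision, purely illustrative (the fallback text is hypothetical):
   
   ```scala
   // sourceLog is an Option here and may be None for an abandoned future log.
   info(s"Attempting to replace current log ${sourceLog.getOrElse("<none>")} with $destLog for $topicPartition")
   ```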



##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1178,6 +1178,33 @@ class LogManager(logDirs: Seq[File],
     }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+      val tp = futureLog.topicPartition
+      if (cleaner != null) {

Review Comment:
   We can replace this with `abortAndPauseCleaning(tp)`.



##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1178,6 +1178,33 @@ class LogManager(logDirs: Seq[File],
     }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+      val tp = futureLog.topicPartition
+      if (cleaner != null) {
+        cleaner.abortAndPauseCleaning(tp)
+      }
+
+      replaceCurrentWithFutureLog(currentLog, futureLog)
+
+      info(s"Successfully renamed abandoned future log for $tp")

Review Comment:
   Line #1189 will print `info(s"The current replica is successfully replaced with the future replica for $topicPartition")`, and that message doesn't quite match what actually happens in this path, right?



##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1178,6 +1178,33 @@ class LogManager(logDirs: Seq[File],
     }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+      val tp = futureLog.topicPartition
+      if (cleaner != null) {

Review Comment:
   I guess `cleaner.abortAndPauseCleaning` is added because we call `resumeCleaning` later, and it would cause an error if we didn't call `abortAndPauseCleaning` here?






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-17 Thread via GitHub


gaurav-narula commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1569131419


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1189,50 +1216,61 @@ class LogManager(logDirs: Seq[File],
       val sourceLog = currentLogs.get(topicPartition)
       val destLog = futureLogs.get(topicPartition)
 
-      info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
       if (sourceLog == null)
         throw new KafkaStorageException(s"The current replica for $topicPartition is offline")
       if (destLog == null)
         throw new KafkaStorageException(s"The future replica for $topicPartition is offline")
 
-      destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true)
-      // the metrics tags still contain "future", so we have to remove it.
-      // we will add metrics back after sourceLog remove the metrics
-      destLog.removeLogMetrics()
-      destLog.updateHighWatermark(sourceLog.highWatermark)
+      replaceCurrentWithFutureLog(Option(sourceLog), destLog, updateHighWatermark = true)
+    }
+  }
+
+  def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: UnifiedLog, updateHighWatermark: Boolean = false): Unit = {
+    val topicPartition = destLog.topicPartition
+    info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
 
-      // Now that future replica has been successfully renamed to be the current replica
-      // Update the cached map and log cleaner as appropriate.
-      futureLogs.remove(topicPartition)
-      currentLogs.put(topicPartition, destLog)
-      if (cleaner != null) {
-        cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, destLog.parentDirFile)
-        resumeCleaning(topicPartition)
-      }
+    destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true)
+    // the metrics tags still contain "future", so we have to remove it.
+    // we will add metrics back after sourceLog remove the metrics
+    destLog.removeLogMetrics()
+    if (updateHighWatermark && sourceLog.isDefined) {
+      destLog.updateHighWatermark(sourceLog.get.highWatermark)
+    }
 
-      try {
-        sourceLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), shouldReinitialize = true)
+    // Now that future replica has been successfully renamed to be the current replica
+    // Update the cached map and log cleaner as appropriate.
+    futureLogs.remove(topicPartition)
+    currentLogs.put(topicPartition, destLog)
+    if (cleaner != null) {
+      cleaner.alterCheckpointDir(topicPartition, sourceLog.map(_.parentDirFile), destLog.parentDirFile)

Review Comment:
   Addressed in [062e932](https://github.com/apache/kafka/pull/15136/commits/062e932f260ce9e1df9571b2fc982c63cbaf0f7c)






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-17 Thread via GitHub


chia7712 commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1568904632


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1189,50 +1216,61 @@ class LogManager(logDirs: Seq[File],
       val sourceLog = currentLogs.get(topicPartition)
       val destLog = futureLogs.get(topicPartition)
 
-      info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
       if (sourceLog == null)
         throw new KafkaStorageException(s"The current replica for $topicPartition is offline")
       if (destLog == null)
         throw new KafkaStorageException(s"The future replica for $topicPartition is offline")
 
-      destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true)
-      // the metrics tags still contain "future", so we have to remove it.
-      // we will add metrics back after sourceLog remove the metrics
-      destLog.removeLogMetrics()
-      destLog.updateHighWatermark(sourceLog.highWatermark)
+      replaceCurrentWithFutureLog(Option(sourceLog), destLog, updateHighWatermark = true)
+    }
+  }
+
+  def replaceCurrentWithFutureLog(sourceLog: Option[UnifiedLog], destLog: UnifiedLog, updateHighWatermark: Boolean = false): Unit = {
+    val topicPartition = destLog.topicPartition
+    info(s"Attempting to replace current log $sourceLog with $destLog for $topicPartition")
 
-      // Now that future replica has been successfully renamed to be the current replica
-      // Update the cached map and log cleaner as appropriate.
-      futureLogs.remove(topicPartition)
-      currentLogs.put(topicPartition, destLog)
-      if (cleaner != null) {
-        cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, destLog.parentDirFile)
-        resumeCleaning(topicPartition)
-      }
+    destLog.renameDir(UnifiedLog.logDirName(topicPartition), shouldReinitialize = true)
+    // the metrics tags still contain "future", so we have to remove it.
+    // we will add metrics back after sourceLog remove the metrics
+    destLog.removeLogMetrics()
+    if (updateHighWatermark && sourceLog.isDefined) {
+      destLog.updateHighWatermark(sourceLog.get.highWatermark)
+    }
 
-      try {
-        sourceLog.renameDir(UnifiedLog.logDeleteDirName(topicPartition), shouldReinitialize = true)
+    // Now that future replica has been successfully renamed to be the current replica
+    // Update the cached map and log cleaner as appropriate.
+    futureLogs.remove(topicPartition)
+    currentLogs.put(topicPartition, destLog)
+    if (cleaner != null) {
+      cleaner.alterCheckpointDir(topicPartition, sourceLog.map(_.parentDirFile), destLog.parentDirFile)

Review Comment:
   It seems `cleaner.alterCheckpointDir` will do nothing if `sourceLog` is 
empty. Maybe we can revert those changes and run `alterCheckpointDir` only if 
`sourceLog` is defined.
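
   A sketch of that suggestion (names follow the diff; one possible shape, not the committed fix):
   
   ```scala
   // Keep alterCheckpointDir's original non-Option signature and invoke it
   // only when the source log actually exists.
   if (cleaner != null) {
     sourceLog.foreach { src =>
       cleaner.alterCheckpointDir(topicPartition, src.parentDirFile, destLog.parentDirFile)
     }
     resumeCleaning(topicPartition)
   }
   ```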






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-17 Thread via GitHub


gaurav-narula commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1568882032


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1176,6 +1176,42 @@ class LogManager(logDirs: Seq[File],
     }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+      val tp = futureLog.topicPartition
+
+      futureLog.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true)
+      futureLog.removeLogMetrics()
+      futureLogs.remove(tp)
+
+      currentLog.foreach { log =>
+        log.removeLogMetrics()
+        log.renameDir(UnifiedLog.logDeleteDirName(tp), shouldReinitialize = false)
+        addLogToBeDeleted(log)
+        info(s"Old log for partition ${tp} is renamed to ${log.dir.getAbsolutePath} and is scheduled for deletion")
+      }
+
+      currentLogs.put(tp, futureLog)
+      futureLog.newMetrics()
+
+      info(s"Successfully renamed abandoned future log for $tp")

Review Comment:
   I've refactored LogManager to use `replaceCurrentWithFutureLog` in [b87a21f](https://github.com/apache/kafka/pull/15136/commits/b87a21f3bd0eea6e4083a2d14b41053361f7b40a). PTAL






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-17 Thread via GitHub


gaurav-narula commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1568883111


##
core/src/test/java/kafka/testkit/BrokerNode.java:
##
@@ -81,8 +82,7 @@ public BrokerNode build(
             logDataDirectories = Collections.
                 singletonList(String.format("combined_%d", id));
         } else {
-            logDataDirectories = Collections.
-                singletonList(String.format("broker_%d_data0", id));
+            logDataDirectories = Collections.unmodifiableList(Arrays.asList(String.format("broker_%d_data0", id), String.format("broker_%d_data1", id)));

Review Comment:
   Rebased to use the new method in the builder






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-16 Thread via GitHub


chia7712 commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1566938172


##
core/src/test/java/kafka/testkit/BrokerNode.java:
##
@@ -81,8 +82,7 @@ public BrokerNode build(
             logDataDirectories = Collections.
                 singletonList(String.format("combined_%d", id));
         } else {
-            logDataDirectories = Collections.
-                singletonList(String.format("broker_%d_data0", id));
+            logDataDirectories = Collections.unmodifiableList(Arrays.asList(String.format("broker_%d_data0", id), String.format("broker_%d_data1", id)));

Review Comment:
   I have created https://issues.apache.org/jira/browse/KAFKA-16559 






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-15 Thread via GitHub


gaurav-narula commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1565638692


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1176,6 +1176,42 @@ class LogManager(logDirs: Seq[File],
     }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+      val tp = futureLog.topicPartition
+
+      futureLog.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true)
+      futureLog.removeLogMetrics()
+      futureLogs.remove(tp)
+
+      currentLog.foreach { log =>
+        log.removeLogMetrics()
+        log.renameDir(UnifiedLog.logDeleteDirName(tp), shouldReinitialize = false)
+        addLogToBeDeleted(log)
+        info(s"Old log for partition ${tp} is renamed to ${log.dir.getAbsolutePath} and is scheduled for deletion")
+      }
+
+      currentLogs.put(tp, futureLog)
+      futureLog.newMetrics()
+
+      info(s"Successfully renamed abandoned future log for $tp")

Review Comment:
   That's a great suggestion! I'm going to try refactoring this to use `replaceCurrentWithFutureLog`.






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-15 Thread via GitHub


chia7712 commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1565572433


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1176,6 +1176,42 @@ class LogManager(logDirs: Seq[File],
     }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+      val tp = futureLog.topicPartition
+
+      futureLog.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true)
+      futureLog.removeLogMetrics()
+      futureLogs.remove(tp)
+
+      currentLog.foreach { log =>
+        log.removeLogMetrics()
+        log.renameDir(UnifiedLog.logDeleteDirName(tp), shouldReinitialize = false)
+        addLogToBeDeleted(log)
+        info(s"Old log for partition ${tp} is renamed to ${log.dir.getAbsolutePath} and is scheduled for deletion")

Review Comment:
   `${tp}` -> `$tp`
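
   For context, braces in Scala string interpolation are only needed around expressions, not bare identifiers; a minimal self-contained illustration:
   
   ```scala
   val tp = "foo-0"
   println(s"partition $tp")          // bare identifier: no braces needed
   println(s"partition ${tp.length}") // expression: braces required
   ```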






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-15 Thread via GitHub


chia7712 commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1565571409


##
core/src/test/java/kafka/testkit/BrokerNode.java:
##
@@ -81,8 +82,7 @@ public BrokerNode build(
             logDataDirectories = Collections.
                 singletonList(String.format("combined_%d", id));
         } else {
-            logDataDirectories = Collections.
-                singletonList(String.format("broker_%d_data0", id));
+            logDataDirectories = Collections.unmodifiableList(Arrays.asList(String.format("broker_%d_data0", id), String.format("broker_%d_data1", id)));

Review Comment:
   Personally, I'd like to change setNumBrokerNodes(int) to setBrokerNodes(int, 
int). The second argument is used to define the number of data folders. After 
all, not all tests require 2+ folders. Maybe we can address that in another PR.
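
   A hypothetical shape of that builder change (the method name and arity are illustrative; `setBrokerNodes` does not exist at this point):
   
   ```scala
   // Hypothetical API sketch, not the current testkit builder.
   new TestKitNodes.Builder().
     setBrokerNodes(3, 2). // 3 brokers, 2 log dirs each
     setNumControllerNodes(1).
     build()
   ```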






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-15 Thread via GitHub


chia7712 commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1565569471


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1176,6 +1176,42 @@ class LogManager(logDirs: Seq[File],
     }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+      val tp = futureLog.topicPartition
+
+      futureLog.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true)
+      futureLog.removeLogMetrics()
+      futureLogs.remove(tp)
+
+      currentLog.foreach { log =>
+        log.removeLogMetrics()
+        log.renameDir(UnifiedLog.logDeleteDirName(tp), shouldReinitialize = false)
+        addLogToBeDeleted(log)
+        info(s"Old log for partition ${tp} is renamed to ${log.dir.getAbsolutePath} and is scheduled for deletion")
+      }
+
+      currentLogs.put(tp, futureLog)
+      futureLog.newMetrics()
+
+      info(s"Successfully renamed abandoned future log for $tp")

Review Comment:
   It seems to me that we need to call `closeHandlers` at least, if we don't do the refactor in this PR.






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-12 Thread via GitHub


showuon commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1562307197


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1173,6 +1173,35 @@ class LogManager(logDirs: Seq[File],
     }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { log =>
+      val tp = log.topicPartition
+
+      log.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true)
+      log.removeLogMetrics()
+      futureLogs.remove(tp)
+
+      currentLogs.put(tp, log)
+      log.newMetrics()
+
+      info(s"Successfully renamed abandoned future log for $tp")
+    }
+  }
+
+  private def findAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Iterable[UnifiedLog] = {
+    futureLogs.values.flatMap { log =>
+      val topicId = log.topicId.getOrElse {
+        throw new RuntimeException(s"The log dir $log does not have a topic ID, " +
+          "which is not allowed when running in KRaft mode.")
+      }
+      val partitionId = log.topicPartition.partition()
+      Option(newTopicsImage.getPartition(topicId, partitionId))
+        .filter(pr => directoryId(log.parentDir).contains(pr.directory(brokerId)))
+        .map(_ => log)

Review Comment:
   I agree option (b) is better. But for this:
   > We can still run into trouble if the directory with the main replica is offline. At some point that will cause a crash if the directory ever comes back online. But there's nothing we can do about that here. Maybe future work could improve how the broker handles loading conflicting logs.
   
   I believe this PR: https://github.com/apache/kafka/pull/15335 should already fix this issue. @OmniaGM, could you help confirm it? If not, I think we should not promote the future log when the main replica is offline, to avoid potential future issues.






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-12 Thread via GitHub


showuon commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1562284319


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1176,6 +1176,42 @@ class LogManager(logDirs: Seq[File],
     }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { case (futureLog, currentLog) =>
+      val tp = futureLog.topicPartition
+
+      futureLog.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true)
+      futureLog.removeLogMetrics()
+      futureLogs.remove(tp)
+
+      currentLog.foreach { log =>
+        log.removeLogMetrics()
+        log.renameDir(UnifiedLog.logDeleteDirName(tp), shouldReinitialize = false)
+        addLogToBeDeleted(log)
+        info(s"Old log for partition ${tp} is renamed to ${log.dir.getAbsolutePath} and is scheduled for deletion")
+      }
+
+      currentLogs.put(tp, futureLog)
+      futureLog.newMetrics()
+
+      info(s"Successfully renamed abandoned future log for $tp")

Review Comment:
   I can see we basically do the same things as `replaceCurrentWithFutureLog` does, so why should we duplicate the code here again? Could we call `replaceCurrentWithFutureLog` with additional parameters to indicate which actions we want to skip?
   
   As it stands, we don't close the `currentLog`, which is a potential resource leak. Using `replaceCurrentWithFutureLog` would avoid that issue. WDYT?






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-10 Thread via GitHub


showuon commented on PR #15136:
URL: https://github.com/apache/kafka/pull/15136#issuecomment-2047306952

   Will check it again within this week. Thanks.





Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-10 Thread via GitHub


soarez commented on PR #15136:
URL: https://github.com/apache/kafka/pull/15136#issuecomment-2046971782

   @showuon PTAL





Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-09 Thread via GitHub


soarez commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1557442275


##
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##
@@ -1437,7 +1438,9 @@ class KRaftClusterTest {
       assertEquals(Set(1, 2), info.get.isr().asScala.toSet)
     }
 
-    // Modify foo-0 so that it refers to a future replica
+    // Modify foo-0 so that it refers to a future replica.
+    // This has the same effect as the main replica being in an offline
+    // log dir and the broker crashing just at the time of promotion

Review Comment:
   I think I know what you mean, but I don't know if this is easy to understand in its current phrasing. Here's a suggestion:
   
   This is equivalent to a failure during the promotion of the future replica and a restart with the directory for the main replica being offline.



##
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##
@@ -1456,6 +1459,76 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testAbandonedFutureReplicaRecovered_mainReplicaInOnlineLogDir(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setBootstrapMetadataVersion(MetadataVersion.IBP_3_7_IV2).
+        setNumBrokerNodes(3).
+        setNumControllerNodes(1).build()).
+      build()
+    try {
+      cluster.format()
+      cluster.startup()
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        val broker0 = cluster.brokers().get(0)
+        val broker1 = cluster.brokers().get(1)
+        val foo0 = new TopicPartition("foo", 0)
+
+        admin.createTopics(Arrays.asList(
+          new NewTopic("foo", 3, 3.toShort))).all().get()
+
+        // Wait until foo-0 is created on broker0.
+        TestUtils.retry(6) {
+          assertTrue(broker0.logManager.getLog(foo0).isDefined)
+        }
+
+        // Shut down broker0 and wait until the ISR of foo-0 is set to [1, 2]
+        broker0.shutdown()
+        TestUtils.retry(6) {
+          val info = broker1.metadataCache.getPartitionInfo("foo", 0)
+          assertTrue(info.isDefined)
+          assertEquals(Set(1, 2), info.get.isr().asScala.toSet)
+        }
+
+        val log = broker0.logManager.getLog(foo0).get
+
+        // Copy foo-0 to another log dir
+        val parentDir = log.parentDir
+        var targetParentDir = parentDir.substring(0, parentDir.length - 1)
+        if (parentDir.endsWith("0")) {
+          targetParentDir += "1"
+        } else {
+          targetParentDir += "0"
+        }

Review Comment:
   This assumes your change to `BrokerNode.build()` and its specific naming 
pattern. We'd get a more robust test if instead you access the directory list 
on the broker, filter out `parentDir`, and choose one of the remaining log dirs.
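
   A sketch of that approach, assuming the broker's `KafkaConfig` exposes the configured directories as `config.logDirs` and that the paths compare equal as plain strings:
   
   ```scala
   // Pick any configured log dir other than the one currently hosting the log.
   val targetParentDir = broker0.config.logDirs.filterNot(_ == parentDir).head
   ```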



##
core/src/test/java/kafka/testkit/BrokerNode.java:
##
@@ -81,8 +82,7 @@ public BrokerNode build(
             logDataDirectories = Collections.
                 singletonList(String.format("combined_%d", id));
         } else {
-            logDataDirectories = Collections.
-                singletonList(String.format("broker_%d_data0", id));
+            logDataDirectories = Collections.unmodifiableList(Arrays.asList(String.format("broker_%d_data0", id), String.format("broker_%d_data1", id)));

Review Comment:
   I'm wondering if this is going to break test cases that don't support JBOD, 
if so there should be some new failing tests.



##
core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala:
##
@@ -1456,6 +1459,76 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testAbandonedFutureReplicaRecovered_mainReplicaInOnlineLogDir(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setBootstrapMetadataVersion(MetadataVersion.IBP_3_7_IV2).
+        setNumBrokerNodes(3).
+        setNumControllerNodes(1).build()).
+      build()
+    try {
+      cluster.format()
+      cluster.startup()
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        val broker0 = cluster.brokers().get(0)
+        val broker1 = cluster.brokers().get(1)
+        val foo0 = new TopicPartition("foo", 0)
+
+        admin.createTopics(Arrays.asList(
+          new NewTopic("foo", 3, 3.toShort))).all().get()
+
+        // Wait until foo-0 is created on broker0.
+        TestUtils.retry(6) {
+          assertTrue(broker0.logManager.getLog(foo0).isDefined)
+        }
+
+        // Shut down broker0 and wait until the ISR of foo-0 is set to [1, 2]
+        broker0.shutdown()
+        TestUtils.retry(6) {
+          val info = broker1.metadataCache.getPartitionInfo("foo", 0)
+          assertTrue(info.isDefined)
+          assertEquals(Set(1, 2), info.get.isr().asScala.toSet)
+        }
+
+

Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-04 Thread via GitHub


gaurav-narula commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1551349514


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1173,6 +1173,35 @@ class LogManager(logDirs: Seq[File],
     }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { log =>
+      val tp = log.topicPartition
+
+      log.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true)
+      log.removeLogMetrics()
+      futureLogs.remove(tp)
+
+      currentLogs.put(tp, log)
+      log.newMetrics()
+
+      info(s"Successfully renamed abandoned future log for $tp")
+    }
+  }
+
+  private def findAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Iterable[UnifiedLog] = {
+    futureLogs.values.flatMap { log =>
+      val topicId = log.topicId.getOrElse {
+        throw new RuntimeException(s"The log dir $log does not have a topic ID, " +
+          "which is not allowed when running in KRaft mode.")
+      }
+      val partitionId = log.topicPartition.partition()
+      Option(newTopicsImage.getPartition(topicId, partitionId))
+        .filter(pr => directoryId(log.parentDir).contains(pr.directory(brokerId)))
+        .map(_ => log)

Review Comment:
   Addressed in 430546fa5a78118497ada6373607f6a25c78a8e8. Please take a look.






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-03 Thread via GitHub


soarez commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1549709937


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1173,6 +1173,35 @@ class LogManager(logDirs: Seq[File],
     }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { log =>
+      val tp = log.topicPartition
+
+      log.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true)
+      log.removeLogMetrics()
+      futureLogs.remove(tp)
+
+      currentLogs.put(tp, log)
+      log.newMetrics()
+
+      info(s"Successfully renamed abandoned future log for $tp")
+    }
+  }
+
+  private def findAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Iterable[UnifiedLog] = {
+    futureLogs.values.flatMap { log =>
+      val topicId = log.topicId.getOrElse {
+        throw new RuntimeException(s"The log dir $log does not have a topic ID, " +
+          "which is not allowed when running in KRaft mode.")
+      }
+      val partitionId = log.topicPartition.partition()
+      Option(newTopicsImage.getPartition(topicId, partitionId))
+        .filter(pr => directoryId(log.parentDir).contains(pr.directory(brokerId)))
+        .map(_ => log)

Review Comment:
   If we didn't know if the future log was caught up or not, then I'd prefer 
(a), but at this point I can't conceive of a different scenario – other than a 
failure during replica promotion – that would cause the future log to be in the 
directory assigned in the metadata. So I agree that the two logs likely will be 
either caught up or very close. So I agree it makes more sense to do (b) - 
promote the future log and delete the main one.
   
   We can still run into trouble if the directory with the main replica is 
offline. At some point that will cause a crash if the directory ever comes back 
online. But there's nothing we can do about that here. Maybe future work could 
improve how the broker handles loading conflicting logs.






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-04-03 Thread via GitHub


gaurav-narula commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1549695928


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1173,6 +1173,35 @@ class LogManager(logDirs: Seq[File],
     }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { log =>
+      val tp = log.topicPartition
+
+      log.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true)
+      log.removeLogMetrics()
+      futureLogs.remove(tp)
+
+      currentLogs.put(tp, log)
+      log.newMetrics()
+
+      info(s"Successfully renamed abandoned future log for $tp")
+    }
+  }
+
+  private def findAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Iterable[UnifiedLog] = {
+    futureLogs.values.flatMap { log =>
+      val topicId = log.topicId.getOrElse {
+        throw new RuntimeException(s"The log dir $log does not have a topic ID, " +
+          "which is not allowed when running in KRaft mode.")
+      }
+      val partitionId = log.topicPartition.partition()
+      Option(newTopicsImage.getPartition(topicId, partitionId))
+        .filter(pr => directoryId(log.parentDir).contains(pr.directory(brokerId)))
+        .map(_ => log)

Review Comment:
   Thanks for the feedback.
   
   For (2), we have a couple of options. We can either:
   
   (a) ignore the future replica (say in dir2) if the main replica exists in an online log dir (say dir1), or
   (b) promote the future replica (in dir2) and remove the main replica (in dir1).
   
   (a) would result in ReplicaManager spawning a replicaAlterLogDir thread for the future replica and correcting the assignment to dir1, only for it to be changed back to dir2 once the replicaAlterLogDir thread finishes its job. Refer to 
https://github.com/apache/kafka/blob/acecd370cc3b25f12926e7a4664a2648f08c6c9a/core/src/main/scala/kafka/server/ReplicaManager.scala#L2734
 and 
https://github.com/apache/kafka/blob/acecd370cc3b25f12926e7a4664a2648f08c6c9a/core/src/main/scala/kafka/server/ReplicaManager.scala#L2745
   
   Since in these scenarios the future replica is almost caught up with the main replica, I'm leaning towards option (b) to avoid further reassignments. Please let me know if you feel otherwise.
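   A minimal, self-contained Scala sketch of option (b)'s bookkeeping (simplified stand-in types; `currentLogs`/`futureLogs` only model LogManager's pools, and the delete is just logged):

    import scala.collection.mutable

    case class Log(dir: String)

    object PromoteFutureLogSketch {
      val currentLogs = mutable.Map[String, Log]()  // keyed by "topic-partition"
      val futureLogs  = mutable.Map[String, Log]()

      // Option (b): the future log becomes the main log; the stale main log,
      // if one exists, would be scheduled for async deletion.
      def promote(tp: String): Option[Log] =
        futureLogs.remove(tp).map { future =>
          val stale = currentLogs.put(tp, future)  // returns the replaced log
          stale.foreach(s => println(s"would schedule delete of ${s.dir}"))
          future
        }
    }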
   






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-03-29 Thread via GitHub


showuon commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1544295936


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1173,6 +1173,35 @@ class LogManager(logDirs: Seq[File],
     }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { log =>
+      val tp = log.topicPartition
+
+      log.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true)
+      log.removeLogMetrics()
+      futureLogs.remove(tp)
+
+      currentLogs.put(tp, log)
+      log.newMetrics()
+
+      info(s"Successfully renamed abandoned future log for $tp")
+    }
+  }
+
+  private def findAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Iterable[UnifiedLog] = {
+    futureLogs.values.flatMap { log =>
+      val topicId = log.topicId.getOrElse {
+        throw new RuntimeException(s"The log dir $log does not have a topic ID, " +
+          "which is not allowed when running in KRaft mode.")
+      }
+      val partitionId = log.topicPartition.partition()
+      Option(newTopicsImage.getPartition(topicId, partitionId))
+        .filter(pr => directoryId(log.parentDir).contains(pr.directory(brokerId)))
+        .map(_ => log)

Review Comment:
   For (1), I think we'll catch up to the high watermark before publishing the image, so it should be safe.
   For (2), good suggestion.



##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2745,10 +2745,10 @@ class ReplicaManager(val config: KafkaConfig,
             "local leaders.")
         replicaFetcherManager.removeFetcherForPartitions(localLeaders.keySet)
         localLeaders.forKeyValue { (tp, info) =>
+          val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2)
           getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) =>
             try {
               val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)

Review Comment:
   Same here, unnecessary change.






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-03-04 Thread via GitHub


soarez commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1511125756


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2790,7 +2791,6 @@ class ReplicaManager(val config: KafkaConfig,
           //   is unavailable. This is required to ensure that we include the partition's
           //   high watermark in the checkpoint file (see KAFKA-1647).
           val state = info.partition.toLeaderAndIsrPartitionState(tp, isNew)
-          val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2)

Review Comment:
   It seems the changes in this file are no longer necessary?



##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1173,6 +1173,35 @@ class LogManager(logDirs: Seq[File],
     }
   }
 
+  def recoverAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Unit = {
+    val abandonedFutureLogs = findAbandonedFutureLogs(brokerId, newTopicsImage)
+    abandonedFutureLogs.foreach { log =>
+      val tp = log.topicPartition
+
+      log.renameDir(UnifiedLog.logDirName(tp), shouldReinitialize = true)
+      log.removeLogMetrics()
+      futureLogs.remove(tp)
+
+      currentLogs.put(tp, log)
+      log.newMetrics()
+
+      info(s"Successfully renamed abandoned future log for $tp")
+    }
+  }
+
+  private def findAbandonedFutureLogs(brokerId: Int, newTopicsImage: TopicsImage): Iterable[UnifiedLog] = {
+    futureLogs.values.flatMap { log =>
+      val topicId = log.topicId.getOrElse {
+        throw new RuntimeException(s"The log dir $log does not have a topic ID, " +
+          "which is not allowed when running in KRaft mode.")
+      }
+      val partitionId = log.topicPartition.partition()
+      Option(newTopicsImage.getPartition(topicId, partitionId))
+        .filter(pr => directoryId(log.parentDir).contains(pr.directory(brokerId)))
+        .map(_ => log)

Review Comment:
   1. Do we have a guarantee that the topic will be in the new topics image? Couldn't the topic be in a later metadata delta if a compaction hasn't yet occurred?
   2. Shouldn't we check whether the main replica is in one of the other directories? If the main replica is in an offline dir (as in the scenario described in the issue), the broker will refuse to start once it is restarted with that directory online, as it will see two main replicas. If the log directory for the main replica is online, we should be able to detect that here.
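   A minimal sketch of the check suggested in (2), with simplified stand-in types (none of these names are from the PR). It classifies each future log by whether its directory is the one assigned in the metadata, and whether a main replica is also visible in an online directory:

    case class LogRef(tp: String, dirId: String)

    sealed trait Resolution
    case object NotAbandoned extends Resolution                    // dir not assigned in metadata
    case object PromoteOnly extends Resolution                     // no main replica visible
    case class PromoteAndDelete(stale: LogRef) extends Resolution  // main replica in an online dir

    def classify(future: LogRef,
                 assignedDir: Map[String, String],   // tp -> dir id from metadata
                 onlineMain: Map[String, LogRef]     // tp -> main log in an online dir
                ): Resolution =
      if (!assignedDir.get(future.tp).contains(future.dirId)) NotAbandoned
      else onlineMain.get(future.tp).map(PromoteAndDelete.apply).getOrElse(PromoteOnly)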






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-03-01 Thread via GitHub


soarez commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r1509052378


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2859,10 +2867,10 @@ class ReplicaManager(val config: KafkaConfig,
             "local leaders.")
         replicaFetcherManager.removeFetcherForPartitions(localLeaders.keySet)
         localLeaders.forKeyValue { (tp, info) =>
-          getOrCreatePartition(tp, delta, info.topicId).foreach { case (partition, isNew) =>
+          val partitionAssignedDirectoryId = directoryIds.find(_._1.topicPartition() == tp).map(_._2)
+          getOrCreatePartition(tp, delta, info.topicId, isLocalFollower = false, partitionAssignedDirectoryId).foreach { case (partition, isNew) =>

Review Comment:
   > `isLocalFollower = false`
   
   I don't think this is ok. Setting this to false disables the fix when the broker is the leader, which can happen with RF=1, or with RF=N when the broker comes back into the ISR (e.g. because there were no new messages and the other replicas are all shut down).
   
   I had a conversation with @gaurav-narula about this, and the plan is to apply this fix earlier, when the logs are being loaded.
   






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-01-29 Thread via GitHub


OmniaGM commented on PR #15136:
URL: https://github.com/apache/kafka/pull/15136#issuecomment-1914798524

   Hi @rondagostino, @showuon and @cmccabe, could one of you take a look at this PR and merge it if you are happy with the changes?





Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-01-08 Thread via GitHub


gaurav-narula commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r169837


##
core/src/main/scala/kafka/server/ReplicaManager.scala:
##
@@ -2773,6 +2776,12 @@ class ReplicaManager(val config: KafkaConfig,
         Some(partition, false)
 
       case HostedPartition.None =>
+        var isNew = true

Review Comment:
   I don't think that would work, because `isNew` is mutated inside the callback, which may be executed conditionally.
   
   Edit: Perhaps I can have `maybeRecoverAbandonedFutureLog` return a boolean instead.
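   A minimal sketch of the "return a boolean" variant (the body here is illustrative, not the PR's implementation); the caller can then derive `isNew` from the return value instead of mutating it inside a callback:

    def maybeRecoverAbandonedFutureLog(tp: String,
                                       assignedDirId: Option[String],
                                       futureDirId: Option[String]): Boolean = {
      // Recover only when the metadata assigns the partition to the directory
      // that currently holds the future log.
      val recovered = assignedDirId.nonEmpty && assignedDirId == futureDirId
      if (recovered) println(s"recovering abandoned future log for $tp")
      recovered
    }

    // val isNew = !maybeRecoverAbandonedFutureLog(tp, assignedDirId, futureDirId)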






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-01-08 Thread via GitHub


OmniaGM commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r157092


##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -5592,6 +5592,63 @@ class ReplicaManagerTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testDeltaFollowerRecoverAbandonedFutureReplica(enableRemoteStorage: Boolean): Unit = {
+    // Given
+    val localId = 1
+    val topicPartition = new TopicPartition("foo", 0)
+
+    val mockReplicaFetcherManager = mock(classOf[ReplicaFetcherManager])
+    val mockReplicaAlterLogDirsManager = mock(classOf[ReplicaAlterLogDirsManager])
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(
+      timer = new MockTimer(time),
+      brokerId = localId,
+      mockReplicaFetcherManager = Some(mockReplicaFetcherManager),
+      mockReplicaAlterLogDirsManager = Some(mockReplicaAlterLogDirsManager),
+      enableRemoteStorage = enableRemoteStorage,
+      shouldMockLog = true
+    )
+
+    val directoryId1 = Uuid.randomUuid()
+    val directoryId2 = Uuid.randomUuid()
+    val mockFutureLog = setupMockLog("/KAFKA-16082-test", isFuture = true)
+
+    val mockLogMgr = replicaManager.logManager
+    when(
+      mockLogMgr.getLog(topicPartition, isFuture = true)
+    ).thenReturn {
+      Some(mockFutureLog)
+    }
+    when(mockLogMgr.getLog(topicPartition)).thenReturn {
+      None
+    }
+
+    when(
+      mockLogMgr.directoryId(mockFutureLog.parentDir)
+    ).thenReturn {
+      Some(directoryId1)

Review Comment:
   same as above



##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -4831,15 +4831,15 @@ class ReplicaManagerTest {
     try {
       val foo0 = new TopicPartition("foo", 0)
       val emptyDelta = new TopicsDelta(TopicsImage.EMPTY)
-      val (fooPart, fooNew) = replicaManager.getOrCreatePartition(foo0, emptyDelta, FOO_UUID).get
+      val (fooPart, fooNew) = replicaManager.getOrCreatePartition(foo0, emptyDelta, FOO_UUID, false, None).get

Review Comment:
   Any reason why we are not setting `partitionAssignedDirectoryId` to `None` as a default in the `replicaManager.getOrCreatePartition` signature?
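   What the suggestion would look like as a signature, sketched with simplified parameter types (the real method also takes a `TopicsDelta` and a topic id `Uuid`); existing call sites could then stay unchanged:

    def getOrCreatePartition(tp: String,
                             isLocalFollower: Boolean = false,
                             partitionAssignedDirectoryId: Option[String] = None): Unit = {
      // ... lookup-or-create logic elided ...
    }

    // getOrCreatePartition(foo0)  // compiles unchanged thanks to the defaults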



##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -5592,6 +5592,63 @@ class ReplicaManagerTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testDeltaFollowerRecoverAbandonedFutureReplica(enableRemoteStorage: Boolean): Unit = {
+    // Given
+    val localId = 1
+    val topicPartition = new TopicPartition("foo", 0)
+
+    val mockReplicaFetcherManager = mock(classOf[ReplicaFetcherManager])
+    val mockReplicaAlterLogDirsManager = mock(classOf[ReplicaAlterLogDirsManager])
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(
+      timer = new MockTimer(time),
+      brokerId = localId,
+      mockReplicaFetcherManager = Some(mockReplicaFetcherManager),
+      mockReplicaAlterLogDirsManager = Some(mockReplicaAlterLogDirsManager),
+      enableRemoteStorage = enableRemoteStorage,
+      shouldMockLog = true
+    )
+
+    val directoryId1 = Uuid.randomUuid()
+    val directoryId2 = Uuid.randomUuid()
+    val mockFutureLog = setupMockLog("/KAFKA-16082-test", isFuture = true)
+
+    val mockLogMgr = replicaManager.logManager
+    when(
+      mockLogMgr.getLog(topicPartition, isFuture = true)
+    ).thenReturn {

Review Comment:
   `{` is used to hold a block, but in this case it is a simple value. So maybe replace all `.thenReturn {Some(mockFutureLog)}` with `.thenReturn(Some(mockFutureLog))`.
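   Applied to the quoted test (reusing its existing vals), the suggested style passes the stubbed value directly, using plain Mockito `when(...).thenReturn(...)`:

    when(mockLogMgr.getLog(topicPartition, isFuture = true))
      .thenReturn(Some(mockFutureLog))
    when(mockLogMgr.getLog(topicPartition))
      .thenReturn(None)
    when(mockLogMgr.directoryId(mockFutureLog.parentDir))
      .thenReturn(Some(directoryId1))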



##
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala:
##
@@ -5592,6 +5592,63 @@ class ReplicaManagerTest {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testDeltaFollowerRecoverAbandonedFutureReplica(enableRemoteStorage: Boolean): Unit = {
+    // Given
+    val localId = 1
+    val topicPartition = new TopicPartition("foo", 0)
+
+    val mockReplicaFetcherManager = mock(classOf[ReplicaFetcherManager])
+    val mockReplicaAlterLogDirsManager = mock(classOf[ReplicaAlterLogDirsManager])
+    val replicaManager = setupReplicaManagerWithMockedPurgatories(
+      timer = new MockTimer(time),
+      brokerId = localId,
+      mockReplicaFetcherManager = Some(mockReplicaFetcherManager),
+      mockReplicaAlterLogDirsManager = Some(mockReplicaAlterLogDirsManager),
+      enableRemoteStorage = enableRemoteStorage,
+      shouldMockLog = true
+    )
+
+    val directoryId1 = Uuid.randomUuid()
+    val directoryId2 = Uuid.randomUuid()
+    val mockFutureLog = setupMockLog("/KAFKA-16082-test", isFuture = true)
+
+    val mockLogMgr = replicaManager.logManager
+    when(
+      mockLogMgr.getLog(topicPartition, isFuture = true)
+    ).thenReturn {
+      Some(mockFutureLog)
+    }
+    when(mockLogMgr.getLog(topicPartition)).thenReturn {
+      None

Review Comment:

Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-01-08 Thread via GitHub


OmniaGM commented on code in PR #15136:
URL: https://github.com/apache/kafka/pull/15136#discussion_r142323


##
core/src/main/scala/kafka/log/LogManager.scala:
##
@@ -1156,6 +1156,32 @@ class LogManager(logDirs: Seq[File],
     }
   }
 
+  def maybeRecoverAbandonedFutureLog(topicPartition: TopicPartition, partitionAssignedDirectoryId: Option[Uuid], callback: () => Unit): Unit = {
+    logCreationOrDeletionLock synchronized {
+      for {
+        futureLog <- getLog(topicPartition, isFuture = true)
+        futureDirId <- directoryId(futureLog.parentDir)
+        assignedDirId <- partitionAssignedDirectoryId
+        if futureDirId == assignedDirId
+      } {
+        val sourceLog = futureLog

Review Comment:
   `sourceLog` can be moved into the `for` block. 
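   A self-contained illustration that a value definition (`name = expr`) is legal mid-comprehension in Scala, which is what the suggestion relies on (toy Option values, not the real log types):

    val futureLog     = Option("foo-0.future")
    val futureDirId   = Option("dirA")
    val assignedDirId = Option("dirA")

    for {
      log <- futureLog
      fd  <- futureDirId
      ad  <- assignedDirId
      if fd == ad
      sourceLog = log  // the alias moved inside the for block
    } println(s"recovering $sourceLog")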






Re: [PR] KAFKA-16082: Avoid resuming future replica if current replica is in the same directory [kafka]

2024-01-05 Thread via GitHub


gaurav-narula commented on PR #15136:
URL: https://github.com/apache/kafka/pull/15136#issuecomment-1878536740

   CC: @OmniaGM @pprovenzano 

