dajac commented on a change in pull request #8672:
URL: https://github.com/apache/kafka/pull/8672#discussion_r436704800
##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -273,7 +272,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   * 6. If the partition is already paused, a new call to this function
   *    will increase the paused count by one.
   */
-  def abortAndPauseCleaning(topicPartition: TopicPartition): Unit = {
+  def abortAndPauseCleaning(topicPartition: TopicPartition, partitionDeleted: Boolean = false): Unit = {

Review comment:
   Actually, we already have `abortCleaning` for this purpose, and `abortCleaning` calls `abortAndPauseCleaning`. I was trying to avoid logging a message when the partition is deleted because it does not add much and literally floods the log when many partitions are deleted.

   While re-looking at this, I have done it differently. I found that the log messages were spread between the LogCleanerManager and the LogManager, and that we were logging when resuming cleaning in some cases but not in others. I have consolidated all the cases where we want to log explicitly into helper methods. This also spares callers from always having to check whether `cleaner != null`. Let me know what you think.

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -912,12 +969,9 @@ class LogManager(logDirs: Seq[File],
     if (removedLog != null) {
       //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
       if (cleaner != null && !isFuture) {
-        cleaner.abortCleaning(topicPartition)
-        cleaner.updateCheckpoints(removedLog.parentDirFile)
+        cleaner.abortCleaning(topicPartition, partitionDeleted = true)
       }
       removedLog.renameDir(Log.logDeleteDirName(topicPartition))
-      checkpointRecoveryOffsetsAndCleanSnapshot(removedLog.parentDirFile, ArrayBuffer.empty)

Review comment:
   I totally agree with you. I have tried different approaches to keep better encapsulation but haven't found a really satisfying one yet. As you pointed out, the involvement of `Partition` makes this complex. Here is my best idea so far:
   * We remove the calls to `asyncDelete` in the `Partition.delete` method. It seems safe to "delete the partition" while keeping the files on disk, given that the `replicaStateChangeLock` is held in the `ReplicaManager`.
   * We use an `asyncDelete` that takes a batch of logs, deletes them, and checkpoints.
   * We rename `Partition.delete` to something like `Partition.close`, as the log is not really deleted any more.

   What do you think?

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -912,12 +969,9 @@ class LogManager(logDirs: Seq[File],
     if (removedLog != null) {
       //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
       if (cleaner != null && !isFuture) {
-        cleaner.abortCleaning(topicPartition)
-        cleaner.updateCheckpoints(removedLog.parentDirFile)
+        cleaner.abortCleaning(topicPartition, partitionDeleted = true)
       }
       removedLog.renameDir(Log.logDeleteDirName(topicPartition))
-      checkpointRecoveryOffsetsAndCleanSnapshot(removedLog.parentDirFile, ArrayBuffer.empty)

Review comment:
   I have implemented it so you can see how it looks. You can check out the last commit.
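To make the two proposals above concrete, here is a minimal sketch of the helper-method consolidation, assuming a simplified stand-in for `LogManager`. Only the method names and the pairing of the `cleaner != null` guard with the log message are taken from the diffs in this thread; everything else is illustrative:

```scala
import org.apache.kafka.common.TopicPartition

// Minimal stub standing in for the real LogCleaner.
class LogCleanerStub {
  def resumeCleaning(partitions: Iterable[TopicPartition]): Unit = ()
  def abortAndPauseCleaning(topicPartition: TopicPartition): Unit = ()
}

// Sketch only: each helper owns both the null guard and the log message, so
// call sites neither repeat the `cleaner != null` check nor log inconsistently.
class LogManagerSketch(cleaner: LogCleanerStub /* null when cleaning is disabled */) {
  private def info(msg: String): Unit = println(msg)

  def resumeCleaning(topicPartition: TopicPartition): Unit = {
    if (cleaner != null) {
      cleaner.resumeCleaning(Seq(topicPartition))
      info(s"Cleaning for partition $topicPartition is resumed")
    }
  }

  def abortAndPauseCleaning(topicPartition: TopicPartition): Unit = {
    if (cleaner != null) {
      cleaner.abortAndPauseCleaning(topicPartition)
      info(s"Cleaning for partition $topicPartition is aborted and paused")
    }
  }
}
```

And a rough sketch of the batched `asyncDelete` idea floated in the comments above. All members referenced here (`currentLogs`, `addLogToBeDeleted`, `checkpointRecoveryAndLogStartOffsetsInDir`) are assumed `LogManager` internals, named after the diffs in this thread where possible, not the actual PR code:

```scala
// Sketch only: delete a batch of logs, then checkpoint each affected log
// directory once instead of once per deleted partition.
def asyncDelete(topicPartitions: Iterable[TopicPartition]): Unit = {
  val removedLogs = topicPartitions.flatMap { tp =>
    Option(currentLogs.remove(tp)).map { log =>
      if (cleaner != null)
        cleaner.abortCleaning(tp, partitionDeleted = true)
      log.renameDir(Log.logDeleteDirName(tp))
      addLogToBeDeleted(log) // the actual file deletion happens on a background thread
      log
    }
  }
  // The payoff of batching: one checkpoint per affected directory.
  removedLogs.map(_.parentDirFile).toSet.foreach(checkpointRecoveryAndLogStartOffsetsInDir)
}
```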
##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -878,8 +936,7 @@ class LogManager(logDirs: Seq[File],
       // Now that replica in source log directory has been successfully renamed for deletion.
       // Close the log, update checkpoint files, and enqueue this log to be deleted.
       sourceLog.close()
-      checkpointRecoveryOffsetsAndCleanSnapshot(sourceLog.parentDirFile, ArrayBuffer.empty)
-      checkpointLogStartOffsetsInDir(sourceLog.parentDirFile)
+      checkpointRecoveryAndLogStartOffsetsInDir(sourceLog.parentDirFile)

Review comment:
   Indeed, they are coupled from that perspective. We can keep them together in one method though.

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -869,17 +903,18 @@ class LogManager(logDirs: Seq[File],
       currentLogs.put(topicPartition, destLog)
       if (cleaner != null) {
         cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, destLog.parentDirFile)
-        cleaner.resumeCleaning(Seq(topicPartition))
-        info(s"Compaction for partition $topicPartition is resumed")
+        resumeCleaning(topicPartition)
       }
       try {
         sourceLog.renameDir(Log.logDeleteDirName(topicPartition))
         // Now that replica in source log directory has been successfully renamed for deletion.
         // Close the log, update checkpoint files, and enqueue this log to be deleted.
         sourceLog.close()
-        checkpointRecoveryOffsetsAndCleanSnapshot(sourceLog.parentDirFile, ArrayBuffer.empty)
-        checkpointLogStartOffsetsInDir(sourceLog.parentDirFile)
+        val logDir = sourceLog.parentDirFile
+        val logsToCheckpoint = logsByDir(logDir)
+        checkpointRecoveryOffsetsAndCleanSnapshotsInDir(logDir, logsToCheckpoint, ArrayBuffer.empty)
+        checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)

Review comment:
   FYI: this block of code appears twice, so it would merit its own method. I haven't done it because I felt that it would mix too many things in one method and the method name would be too long. Therefore, I left the optimisation of computing and reusing `logsByDir` on the caller side. I don't feel too strongly about this though.
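For illustration, the extraction considered (and deliberately not done) in the last comment might look like the following sketch. The three methods called here are the ones shown in the diff above; the helper itself and its name, `checkpointOffsetsInDir`, are hypothetical:

```scala
// Sketch only: extracting the twice-repeated block into one helper inside
// LogManager. The PR intentionally left the block inline so that callers can
// compute `logsByDir(logDir)` once and reuse it for other work.
private def checkpointOffsetsInDir(logDir: File): Unit = {
  val logsToCheckpoint = logsByDir(logDir)
  checkpointRecoveryOffsetsAndCleanSnapshotsInDir(logDir, logsToCheckpoint, ArrayBuffer.empty)
  checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
}
```

The trade-off named in the comment is visible here: a caller that also needs `logsToCheckpoint` for other work would end up computing `logsByDir(logDir)` twice.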