dajac commented on a change in pull request #8672:
URL: https://github.com/apache/kafka/pull/8672#discussion_r436704800



##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -273,7 +272,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
    *  6. If the partition is already paused, a new call to this function
    *     will increase the paused count by one.
    */
-  def abortAndPauseCleaning(topicPartition: TopicPartition): Unit = {
+  def abortAndPauseCleaning(topicPartition: TopicPartition, partitionDeleted: Boolean = false): Unit = {

Review comment:
       Actually, we already have `abortCleaning` for this purpose, and `abortCleaning` calls `abortAndPauseCleaning`. I was trying to avoid logging a message when the partition is deleted because it does not add much and literally floods the log when many partitions are deleted.
   
   While re-looking at this, I have done it differently. I found that the log messages were spread between the LogCleanerManager and the LogManager, and that we were logging when resuming cleaning but not consistently in the other cases. I have consolidated all the cases where we want to log explicitly into helper methods. This also avoids having to check `cleaner != null` at every call site. Let me know what you think.
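
   To make the idea concrete, here is a minimal sketch of what such helpers could look like inside `LogManager` (the bodies are illustrative, not the exact code in the commit; `cleaner` and `info` come from the surrounding class):

```scala
// Hypothetical consolidated helpers: the `cleaner != null` check and the
// log message live in one place instead of at every call site.
private def resumeCleaning(topicPartition: TopicPartition): Unit = {
  if (cleaner != null) {
    cleaner.resumeCleaning(Seq(topicPartition))
    info(s"Compaction for partition $topicPartition is resumed")
  }
}

private def abortCleaning(topicPartition: TopicPartition): Unit = {
  if (cleaner != null) {
    cleaner.abortCleaning(topicPartition)
    // Illustrative message; the point is that logging is centralized here.
    info(s"Cleaning for partition $topicPartition is aborted")
  }
}
```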

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -912,12 +969,9 @@ class LogManager(logDirs: Seq[File],
     if (removedLog != null) {
      //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
       if (cleaner != null && !isFuture) {
-        cleaner.abortCleaning(topicPartition)
-        cleaner.updateCheckpoints(removedLog.parentDirFile)
+        cleaner.abortCleaning(topicPartition, partitionDeleted = true)
       }
       removedLog.renameDir(Log.logDeleteDirName(topicPartition))
-      checkpointRecoveryOffsetsAndCleanSnapshot(removedLog.parentDirFile, ArrayBuffer.empty)

Review comment:
       I totally agree with you. I have tried different approaches to keep better encapsulation, but I have not found a really satisfying way to get there. As you pointed out, the involvement of `Partition` makes this complex.
   
   Here is my best idea so far:
   * We remove the calls to `asyncDelete` in the `Partition.delete` method. It seems safe to "delete the partition" while keeping the files on disk, as long as we hold the `replicaStateChangeLock` in the `ReplicaManager`.
   * We use an `asyncDelete` that takes a batch of logs, deletes them, and checkpoints.
   * We rename `Partition.delete` to something like `Partition.close`, as the log is not really deleted any more.
   
   What do you think?
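
   To illustrate the second bullet, here is a rough sketch of what a batched `asyncDelete` could look like in `LogManager`. The helper names (`currentLogs`, `logCreationOrDeletionLock`, `logsByDir`, `addLogToBeDeleted`, and the two checkpoint methods) follow the snippets in this PR, but the signature itself is hypothetical:

```scala
// Hypothetical batched deletion: remove all logs first, then checkpoint each
// affected log directory once instead of once per deleted partition.
def asyncDelete(topicPartitions: Iterable[TopicPartition]): Unit = {
  val removedLogs = logCreationOrDeletionLock synchronized {
    topicPartitions.flatMap { tp =>
      Option(currentLogs.remove(tp)).map { log =>
        if (cleaner != null)
          cleaner.abortCleaning(tp)
        // Rename the directory so the background task can delete it later.
        log.renameDir(Log.logDeleteDirName(tp))
        log
      }
    }
  }

  // Checkpoint each affected log directory once, not once per partition.
  removedLogs.map(_.parentDirFile).toSet.foreach { logDir =>
    val logsToCheckpoint = logsByDir(logDir)
    checkpointRecoveryOffsetsAndCleanSnapshotsInDir(logDir, logsToCheckpoint, ArrayBuffer.empty)
    checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
  }

  // Hand the renamed directories off to the background deletion task.
  removedLogs.foreach(addLogToBeDeleted)
}
```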

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -912,12 +969,9 @@ class LogManager(logDirs: Seq[File],
     if (removedLog != null) {
      //We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
       if (cleaner != null && !isFuture) {
-        cleaner.abortCleaning(topicPartition)
-        cleaner.updateCheckpoints(removedLog.parentDirFile)
+        cleaner.abortCleaning(topicPartition, partitionDeleted = true)
       }
       removedLog.renameDir(Log.logDeleteDirName(topicPartition))
-      checkpointRecoveryOffsetsAndCleanSnapshot(removedLog.parentDirFile, ArrayBuffer.empty)

Review comment:
       I have implemented it to see. You can check out the last commit.

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -878,8 +936,7 @@ class LogManager(logDirs: Seq[File],
        // Now that replica in source log directory has been successfully renamed for deletion.
        // Close the log, update checkpoint files, and enqueue this log to be deleted.
         sourceLog.close()
-        checkpointRecoveryOffsetsAndCleanSnapshot(sourceLog.parentDirFile, ArrayBuffer.empty)
-        checkpointLogStartOffsetsInDir(sourceLog.parentDirFile)
+        checkpointRecoveryAndLogStartOffsetsInDir(sourceLog.parentDirFile)

Review comment:
       Indeed, they are coupled from that perspective. We can keep them 
together in one method though.
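
   For concreteness, the combined helper could look something like this sketch, reusing the two per-directory checkpoint methods from this PR (the actual method in the commit may differ):

```scala
// Hypothetical combined helper: checkpoint recovery offsets (cleaning up
// orphaned snapshots) and log start offsets for a single log directory,
// sharing one logsByDir lookup.
private def checkpointRecoveryAndLogStartOffsetsInDir(logDir: File): Unit = {
  val logsToCheckpoint = logsByDir(logDir)
  checkpointRecoveryOffsetsAndCleanSnapshotsInDir(logDir, logsToCheckpoint, ArrayBuffer.empty)
  checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
}
```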

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -869,17 +903,18 @@ class LogManager(logDirs: Seq[File],
       currentLogs.put(topicPartition, destLog)
       if (cleaner != null) {
        cleaner.alterCheckpointDir(topicPartition, sourceLog.parentDirFile, destLog.parentDirFile)
-        cleaner.resumeCleaning(Seq(topicPartition))
-        info(s"Compaction for partition $topicPartition is resumed")
+        resumeCleaning(topicPartition)
       }
 
       try {
         sourceLog.renameDir(Log.logDeleteDirName(topicPartition))
        // Now that replica in source log directory has been successfully renamed for deletion.
        // Close the log, update checkpoint files, and enqueue this log to be deleted.
         sourceLog.close()
-        checkpointRecoveryOffsetsAndCleanSnapshot(sourceLog.parentDirFile, ArrayBuffer.empty)
-        checkpointLogStartOffsetsInDir(sourceLog.parentDirFile)
+        val logDir = sourceLog.parentDirFile
+        val logsToCheckpoint = logsByDir(logDir)
+        checkpointRecoveryOffsetsAndCleanSnapshotsInDir(logDir, logsToCheckpoint, ArrayBuffer.empty)
+        checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)

Review comment:
       FYI: this block of code appears twice, so it would merit its own method. I haven't done it because I felt that it would mix too many things into one method and the method name would be too long. Therefore, I left the optimisation of computing and reusing `logsByDir` on the caller side. I don't feel too strongly about this though.



