chia7712 opened a new pull request, #15637:
URL: https://github.com/apache/kafka/pull/15637

   It seems to me the root cause is that `LogDirFailureHandler#testIOExceptionDuringLogRoll` does not clean up `directoryIds` while holding `replicaStateChangeLock`. Hence, the metadata event thread can observe the intermediate state of the failure handling and then assume the deleted folder is still online (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/cluster/Partition.scala#L877).
   
   ```scala
     private def createLogInAssignedDirectoryId(partitionState: LeaderAndIsrPartitionState, highWatermarkCheckpoints: OffsetCheckpoints, topicId: Option[Uuid], targetLogDirectoryId: Option[Uuid]): Unit = {
       targetLogDirectoryId match {
         case Some(directoryId) =>
           // [CHIA] if logManager.onlineLogDirId(directoryId) returns true for the failed directory,
           // there are two possible results:
           // 1) KafkaStorageException is thrown by `LogManager.getOrCreateLog`
           // 2) log is hosted by another directory (id) rather than targetLogDirectoryId
           if (logManager.onlineLogDirId(directoryId) || !logManager.hasOfflineLogDirs() || directoryId == DirectoryId.UNASSIGNED) {
             createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId, targetLogDirectoryId)
           } else {
             warn(s"Skipping creation of log because there are potentially offline log " +
               s"directories and log may already exist there. directoryId=$directoryId, " +
               s"topicId=$topicId, targetLogDirectoryId=$targetLogDirectoryId")
           }

         case None =>
           createLogIfNotExists(partitionState.isNew, isFutureReplica = false, highWatermarkCheckpoints, topicId)
       }
     }
   ```
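
To make the suspected interleaving concrete, here is a toy model under stated assumptions; it is not Kafka's `LogManager`. `ToyLogManager`, `markDirOffline`, and `forgetDirId` are hypothetical names, and the model assumes the failure handling updates the set of live directories and the set of online directory IDs in two separate steps without holding a lock. A check that runs between the two steps sees the failed directory's ID as online, which is the situation that lets the `onlineLogDirId(directoryId)` branch above go ahead with log creation.

```scala
import scala.collection.mutable

// Hypothetical toy model, NOT Kafka's LogManager. It keeps only the two pieces
// of state that the check in createLogInAssignedDirectoryId depends on.
final class ToyLogManager(pathToId: Map[String, String]) {
  // Log directory paths that can still host logs.
  private val liveDirs = mutable.Set.empty[String] ++= pathToId.keys
  // Directory IDs reported as online (models the directoryIds bookkeeping).
  private val onlineIds = mutable.Set.empty[String] ++= pathToId.values

  def onlineLogDirId(id: String): Boolean = onlineIds.contains(id)
  def isLive(path: String): Boolean = liveDirs.contains(path)

  // Assumption: failure handling is two separate, unsynchronized steps.
  def markDirOffline(path: String): Unit = liveDirs -= path
  def forgetDirId(path: String): Unit = onlineIds -= pathToId(path)
}

val lm = new ToyLogManager(Map("/tmp/kafka-logs-0" -> "dir-id-0"))
lm.markDirOffline("/tmp/kafka-logs-0")
// A reader running between the two steps sees the ID as online
// even though its directory can no longer host logs.
val sawStaleOnlineId =
  lm.onlineLogDirId("dir-id-0") && !lm.isLive("/tmp/kafka-logs-0")
lm.forgetDirId("/tmp/kafka-logs-0")
```

The interleaving is forced sequentially here so the intermediate state is observed deterministically; in the real test it depends on thread timing, which is why the failure is flaky.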
   
   Hence, there are two options to stabilize `testIOExceptionDuringLogRoll`:
   
   1. Call `logManager.handleLogDirFailure(dir)` while holding `replicaStateChangeLock` (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala#L2473) to avoid the race condition.
   
   2. Change the assertion to accept either an empty folder or a folder hosted by a different directory (ID) (https://github.com/apache/kafka/blob/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala#L1715).
   
   BTW, option 2 means we would allow another directory to replace `targetLogDirectoryId` when creating a log. That violates the doc comment "@param targetLogDirectoryId The directory Id that should host the the partition's topic." (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/LogManager.scala#L1015)
   
   In short, this PR adopts option 1.
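
A minimal sketch of the idea behind option 1, under stated assumptions: it is a toy model, not the actual `ReplicaManager` change. `LockedFailureHandlingDemo`, `stateChangeLock`, `liveDirs`, and `onlineIds` are hypothetical stand-ins, and the real type of `replicaStateChangeLock` is not assumed. Both steps of the failure handling and the reader's check run under the same lock, so the reader can only see the state before or after the whole update, never the intermediate one.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Hypothetical toy model of option 1, NOT Kafka code.
final class LockedFailureHandlingDemo {
  // Stand-in for replicaStateChangeLock. Fair only so the writer is not
  // starved by the reader's tight loop in this demo.
  private val stateChangeLock = new ReentrantLock(true)

  private def inLock[T](body: => T): T = {
    stateChangeLock.lock()
    try body
    finally stateChangeLock.unlock()
  }

  /** Runs the race `rounds` times; returns true if the reader ever saw an
    * online directory ID whose directory was already offline. */
  def sawIntermediateState(rounds: Int): Boolean = {
    val saw = new AtomicBoolean(false)
    for (_ <- 1 to rounds) {
      val liveDirs = mutable.Set("/tmp/kafka-logs-0")
      val onlineIds = mutable.Set("dir-id-0")
      val done = new AtomicBoolean(false)

      // Models the metadata event thread checking directory state.
      val reader = new Thread(() => {
        while (!done.get()) inLock {
          if (onlineIds.contains("dir-id-0") && !liveDirs.contains("/tmp/kafka-logs-0"))
            saw.set(true)
        }
      })
      // Models failure handling: both updates happen in one critical section.
      val writer = new Thread(() => inLock {
        liveDirs -= "/tmp/kafka-logs-0"
        onlineIds -= "dir-id-0"
        ()
      })

      reader.start()
      writer.start()
      writer.join()
      done.set(true)
      reader.join()
    }
    saw.get()
  }
}

val demo = new LockedFailureHandlingDemo
val sawIt = demo.sawIntermediateState(rounds = 100)
```

Dropping the two `inLock` calls would let the reader occasionally record the intermediate state, but that outcome is timing-dependent, so the sketch only checks the locked case.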
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
