junrao commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r638367249



##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1500,50 +1325,67 @@ class Log(@volatile private var _dir: File,
       }
     }
 
-    deleteOldSegments(shouldDelete, RetentionSizeBreach)
+    deleteOldSegments(shouldDelete, RetentionSizeBreach(this))
   }
 
   private def deleteLogStartOffsetBreachedSegments(): Int = {
     def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): 
Boolean = {
       nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
     }
 
-    deleteOldSegments(shouldDelete, StartOffsetBreach)
+    deleteOldSegments(shouldDelete, StartOffsetBreach(this))
   }
 
   def isFuture: Boolean = dir.getName.endsWith(Log.FutureDirSuffix)
 
   /**
    * The size of the log in bytes
    */
-  def size: Long = Log.sizeInBytes(logSegments)
+  def size: Long = localLog.segments.sizeInBytes
 
   /**
-   * The offset metadata of the next message that will be appended to the log
+   * The offset of the next message that will be appended to the log
    */
-  def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata
+  def logEndOffset: Long =  localLog.logEndOffset
 
   /**
-   * The offset of the next message that will be appended to the log
+   * The offset metadata of the next message that will be appended to the log
    */
-  def logEndOffset: Long = nextOffsetMetadata.messageOffset
+  def logEndOffsetMetadata: LogOffsetMetadata = localLog.logEndOffsetMetadata
+
+  private val rollAction = RollAction(

Review comment:
       I feel RollAction actually makes the code harder to understand than
before, so it would be useful to think through whether we could avoid it. In
particular, it seems that anything in postRollAction could just be done in the
caller if we return enough context. We are also taking a producer snapshot in
preRollAction; however, since no new data is being added at that point, it
seems that we could take the producer snapshot in Log.roll() after calling
localLog.roll(), while still holding the Log.lock.
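
   A minimal sketch of what that could look like (assuming localLog.roll() no
longer takes a RollAction and returns the newly created segment, and that
producerStateManager stays on Log as in this diff):

```scala
  def roll(expectedNextOffset: Option[Long] = None): LogSegment = {
    lock synchronized {
      val newSegment = localLog.roll(expectedNextOffset)
      // No data has been appended to newSegment yet, so the snapshot taken here
      // is equivalent to the one preRollAction would have taken.
      producerStateManager.updateMapEndOffset(newSegment.baseOffset)
      producerStateManager.takeSnapshot()
      newSegment
    }
  }
```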

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1500,50 +1325,67 @@ class Log(@volatile private var _dir: File,
       }
     }
 
-    deleteOldSegments(shouldDelete, RetentionSizeBreach)
+    deleteOldSegments(shouldDelete, RetentionSizeBreach(this))
   }
 
   private def deleteLogStartOffsetBreachedSegments(): Int = {
     def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): 
Boolean = {
       nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
     }
 
-    deleteOldSegments(shouldDelete, StartOffsetBreach)
+    deleteOldSegments(shouldDelete, StartOffsetBreach(this))
   }
 
   def isFuture: Boolean = dir.getName.endsWith(Log.FutureDirSuffix)
 
   /**
    * The size of the log in bytes
    */
-  def size: Long = Log.sizeInBytes(logSegments)
+  def size: Long = localLog.segments.sizeInBytes
 
   /**
-   * The offset metadata of the next message that will be appended to the log
+   * The offset of the next message that will be appended to the log
    */
-  def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata
+  def logEndOffset: Long =  localLog.logEndOffset
 
   /**
-   * The offset of the next message that will be appended to the log
+   * The offset metadata of the next message that will be appended to the log
    */
-  def logEndOffset: Long = nextOffsetMetadata.messageOffset
+  def logEndOffsetMetadata: LogOffsetMetadata = localLog.logEndOffsetMetadata
+
+  private val rollAction = RollAction(
+    preRollAction = (newSegment: LogSegment) => {
+      // Take a snapshot of the producer state to facilitate recovery. It is 
useful to have the snapshot
+      // offset align with the new segment offset since this ensures we can 
recover the segment by beginning
+      // with the corresponding snapshot file and scanning the segment data. 
Because the segment base offset
+      // may actually be ahead of the current producer state end offset (which 
corresponds to the log end offset),
+      // we manually override the state offset here prior to taking the 
snapshot.
+      producerStateManager.updateMapEndOffset(newSegment.baseOffset)
+      producerStateManager.takeSnapshot()
+    },
+    postRollAction = (newSegment: LogSegment, deletedSegment: 
Option[LogSegment]) => {
+      deletedSegment.foreach(segment => 
deleteProducerSnapshotAsync(Seq(segment)))

Review comment:
       This seems to have exposed an existing bug. During roll, deletedSegment
will be non-empty if there is an existing segment of size 0 at newOffsetToRoll.
However, since we take a producer snapshot at newOffsetToRoll before calling
postRollAction, we would be deleting the very snapshot we just created.

   In this case, I think we don't need to delete the producer snapshot for
deletedSegment.
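
   One possible shape for that fix (a sketch only, keeping the current
postRollAction signature and the deleteProducerSnapshotAsync helper shown
above):

```scala
    postRollAction = (newSegment: LogSegment, deletedSegment: Option[LogSegment]) => {
      // A segment deleted during roll shares newSegment's base offset, and its
      // snapshot is exactly the one preRollAction just wrote, so leave it alone.
      deletedSegment.filter(_.baseOffset != newSegment.baseOffset)
        .foreach(segment => deleteProducerSnapshotAsync(Seq(segment)))
    }
```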

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1572,144 +1414,69 @@ class Log(@volatile private var _dir: File,
         .map(_.messageOffset)
         .getOrElse(maxOffsetInMessages - Integer.MAX_VALUE)
 
-      roll(Some(rollOffset))
+      localLog.roll(Some(rollOffset), Some(rollAction))
     } else {
       segment
     }
   }
 
   /**
-   * Roll the log over to a new active segment starting with the current 
logEndOffset.
+   * Roll the local log over to a new active segment starting with the current 
logEndOffset.
    * This will trim the index to the exact size of the number of entries it 
currently contains.
    *
    * @return The newly rolled segment
    */
   def roll(expectedNextOffset: Option[Long] = None): LogSegment = {
-    maybeHandleIOException(s"Error while rolling log segment for 
$topicPartition in dir ${dir.getParent}") {
-      val start = time.hiResClockMs()
-      lock synchronized {
-        checkIfMemoryMappedBufferClosed()
-        val newOffset = math.max(expectedNextOffset.getOrElse(0L), 
logEndOffset)
-        val logFile = Log.logFile(dir, newOffset)
-
-        if (segments.contains(newOffset)) {
-          // segment with the same base offset already exists and loaded
-          if (activeSegment.baseOffset == newOffset && activeSegment.size == 
0) {
-            // We have seen this happen (see KAFKA-6388) after shouldRoll() 
returns true for an
-            // active segment of size zero because of one of the indexes is 
"full" (due to _maxEntries == 0).
-            warn(s"Trying to roll a new log segment with start offset 
$newOffset " +
-                 s"=max(provided offset = $expectedNextOffset, LEO = 
$logEndOffset) while it already " +
-                 s"exists and is active with size 0. Size of time index: 
${activeSegment.timeIndex.entries}," +
-                 s" size of offset index: 
${activeSegment.offsetIndex.entries}.")
-            removeAndDeleteSegments(Seq(activeSegment), asyncDelete = true, 
LogRoll)
-          } else {
-            throw new KafkaException(s"Trying to roll a new log segment for 
topic partition $topicPartition with start offset $newOffset" +
-                                     s" =max(provided offset = 
$expectedNextOffset, LEO = $logEndOffset) while it already exists. Existing " +
-                                     s"segment is ${segments.get(newOffset)}.")
-          }
-        } else if (!segments.isEmpty && newOffset < activeSegment.baseOffset) {
-          throw new KafkaException(
-            s"Trying to roll a new log segment for topic partition 
$topicPartition with " +
-            s"start offset $newOffset =max(provided offset = 
$expectedNextOffset, LEO = $logEndOffset) lower than start offset of the active 
segment $activeSegment")
-        } else {
-          val offsetIdxFile = offsetIndexFile(dir, newOffset)
-          val timeIdxFile = timeIndexFile(dir, newOffset)
-          val txnIdxFile = transactionIndexFile(dir, newOffset)
-
-          for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) 
if file.exists) {
-            warn(s"Newly rolled segment file ${file.getAbsolutePath} already 
exists; deleting it first")
-            Files.delete(file.toPath)
-          }
-
-          segments.lastSegment.foreach(_.onBecomeInactiveSegment())
-        }
-
-        // take a snapshot of the producer state to facilitate recovery. It is 
useful to have the snapshot
-        // offset align with the new segment offset since this ensures we can 
recover the segment by beginning
-        // with the corresponding snapshot file and scanning the segment data. 
Because the segment base offset
-        // may actually be ahead of the current producer state end offset 
(which corresponds to the log end offset),
-        // we manually override the state offset here prior to taking the 
snapshot.
-        producerStateManager.updateMapEndOffset(newOffset)
-        producerStateManager.takeSnapshot()
-
-        val segment = LogSegment.open(dir,
-          baseOffset = newOffset,
-          config,
-          time = time,
-          initFileSize = config.initFileSize,
-          preallocate = config.preallocate)
-        addSegment(segment)
-
-        // We need to update the segment base offset and append position data 
of the metadata when log rolls.
-        // The next offset should not change.
-        updateLogEndOffset(nextOffsetMetadata.messageOffset)
-
-        // schedule an asynchronous flush of the old segment
-        scheduler.schedule("flush-log", () => flush(newOffset), delay = 0L)
-
-        info(s"Rolled new log segment at offset $newOffset in 
${time.hiResClockMs() - start} ms.")
-
-        segment
-      }
+    lock synchronized {
+      localLog.roll(expectedNextOffset, Some(rollAction))
     }
   }
 
   /**
    * The number of messages appended to the log since the last flush
    */
-  private def unflushedMessages: Long = this.logEndOffset - this.recoveryPoint
+  private def unflushedMessages: Long = logEndOffset - localLog.recoveryPoint
 
   /**
-   * Flush all log segments
+   * Flush all local log segments
    */
   def flush(): Unit = flush(this.logEndOffset)
 
   /**
-   * Flush log segments for all offsets up to offset-1
+   * Flush local log segments for all offsets up to offset-1
    *
    * @param offset The offset to flush up to (non-inclusive); the new recovery 
point
    */
   def flush(offset: Long): Unit = {
     maybeHandleIOException(s"Error while flushing log for $topicPartition in 
dir ${dir.getParent} with offset $offset") {
-      if (offset > this.recoveryPoint) {
+      if (offset > localLog.recoveryPoint) {
         debug(s"Flushing log up to offset $offset, last flushed: 
$lastFlushTime,  current time: ${time.milliseconds()}, " +
           s"unflushed: $unflushedMessages")
-        val segments = logSegments(this.recoveryPoint, offset)
-        segments.foreach(_.flush())
-        // if there are any new segments, we need to flush the parent 
directory for crash consistency
-        segments.lastOption.filter(_.baseOffset >= 
this.recoveryPoint).foreach(_ => Utils.flushDir(dir.toPath))
-
+        localLog.flush(offset)
         lock synchronized {
-          checkIfMemoryMappedBufferClosed()
-          if (offset > this.recoveryPoint) {
-            this.recoveryPoint = offset
-            lastFlushedTime.set(time.milliseconds)
-          }
+          localLog.markFlushed(offset)
         }
       }
     }
   }
 
   /**
-   * Completely delete this log directory and all contents from the file 
system with no delay
+   * Completely delete the local log directory and all contents from the file 
system with no delay
    */
   private[log] def delete(): Unit = {
     maybeHandleIOException(s"Error while deleting log for $topicPartition in 
dir ${dir.getParent}") {
       lock synchronized {
-        checkIfMemoryMappedBufferClosed()
         producerExpireCheck.cancel(true)
-        removeAndDeleteSegments(logSegments, asyncDelete = false, LogDeletion)
         leaderEpochCache.foreach(_.clear())
-        Utils.delete(dir)
-        // File handlers will be closed if this log is deleted
-        isMemoryMappedBufferClosed = true
+        val deletedSegments = localLog.delete()
+        deleteProducerSnapshotAsync(deletedSegments)

Review comment:
       Hmm, the ordering is a bit weird. We delete the whole directory in
localLog.delete(), yet the producer snapshots live in that directory and are
only scheduled for deletion afterwards.
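
   One way to avoid the separate, later deletion (a sketch; it assumes
localLog.delete() removes the directory recursively, just as the old
Utils.delete(dir) call did, and therefore already takes the snapshot files
with it):

```scala
      lock synchronized {
        producerExpireCheck.cancel(true)
        leaderEpochCache.foreach(_.clear())
        // localLog.delete() removes the entire log directory, including any
        // producer snapshot files in it, so no separate async snapshot
        // deletion is scheduled here.
        localLog.delete()
      }
```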

##########
File path: core/src/test/scala/unit/kafka/log/LocalLogTest.scala
##########
@@ -0,0 +1,734 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.File
+import java.nio.channels.ClosedChannelException
+import java.nio.charset.StandardCharsets
+import java.util.regex.Pattern
+import java.util.{Collections, Properties}
+
+import kafka.server.{FetchDataInfo, KafkaConfig, LogDirFailureChannel, 
LogOffsetMetadata}
+import kafka.utils.{MockTime, Scheduler, TestUtils}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record, 
SimpleRecord}
+import org.apache.kafka.common.utils.{Time, Utils}
+import org.junit.jupiter.api.Assertions.{assertFalse, _}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+
+import scala.jdk.CollectionConverters._
+
+class LocalLogTest {
+
+  import kafka.log.LocalLogTest._
+
+  var config: KafkaConfig = null
+  val tmpDir: File = TestUtils.tempDir()
+  val logDir: File = TestUtils.randomPartitionLogDir(tmpDir)
+  val topicPartition = new TopicPartition("test_topic", 1)
+  val logDirFailureChannel = new LogDirFailureChannel(10)
+  val mockTime = new MockTime()
+  val log: LocalLog = createLocalLogWithActiveSegment(config = 
createLogConfig())
+
+  @BeforeEach
+  def setUp(): Unit = {
+    val props = TestUtils.createBrokerConfig(0, "127.0.0.1:1", port = -1)
+    config = KafkaConfig.fromProps(props)
+  }
+
+  @AfterEach
+  def tearDown(): Unit = {
+    if (!log.isMemoryMappedBufferClosed) {
+      log.close()
+    }
+    Utils.delete(tmpDir)
+  }
+
+  case class KeyValue(key: String, value: String) {
+    def toRecord(timestamp: => Long = mockTime.milliseconds): SimpleRecord = {
+      new SimpleRecord(timestamp, key.getBytes, value.getBytes)
+    }
+  }
+
+  object KeyValue {
+    def fromRecord(record: Record): KeyValue = {
+      val key =
+        if (record.hasKey)
+          StandardCharsets.UTF_8.decode(record.key()).toString
+        else
+          ""
+      val value =
+        if (record.hasValue)
+          StandardCharsets.UTF_8.decode(record.value()).toString
+        else
+          ""
+      KeyValue(key, value)
+    }
+  }
+
+  private def kvsToRecords(keyValues: Iterable[KeyValue]): 
Iterable[SimpleRecord] = {
+    keyValues.map(kv => kv.toRecord())
+  }
+
+  private def recordsToKvs(records: Iterable[Record]): Iterable[KeyValue] = {
+    records.map(r => KeyValue.fromRecord(r))
+  }
+
+  private def appendRecords(records: Iterable[SimpleRecord],
+                            log: LocalLog = log,
+                            initialOffset: Long = 0L): Unit = {
+    log.append(lastOffset = initialOffset + records.size - 1,
+      largestTimestamp = records.head.timestamp,
+      shallowOffsetOfMaxTimestamp = initialOffset,
+      records = MemoryRecords.withRecords(initialOffset, CompressionType.NONE, 
0, records.toList : _*))
+  }
+
+  private def readRecords(log: LocalLog = log,
+                          startOffset: Long = 0L,
+                          maxLength: => Int = log.segments.activeSegment.size,
+                          minOneMessage: Boolean = false,
+                          maxOffsetMetadata: => LogOffsetMetadata = 
log.logEndOffsetMetadata,
+                          includeAbortedTxns: Boolean = false): FetchDataInfo 
= {
+    log.read(startOffset,
+             maxLength,
+             minOneMessage = minOneMessage,
+             maxOffsetMetadata,
+             includeAbortedTxns = includeAbortedTxns)
+  }
+
+  @Test
+  def testLogDeleteSuccess(): Unit = {
+    val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
+    appendRecords(List(record))
+    log.roll()
+    assertEquals(2, log.segments.numberOfSegments)
+    assertFalse(logDir.listFiles.isEmpty)
+    val segmentsBeforeDelete = List[LogSegment]() ++ log.segments.values
+    val deletedSegments = log.delete()
+    assertTrue(log.segments.isEmpty)
+    assertEquals(segmentsBeforeDelete, deletedSegments)
+    assertThrows(classOf[KafkaStorageException], () => 
log.checkIfMemoryMappedBufferClosed())
+    assertFalse(logDir.exists)
+  }
+
+  @Test
+  def testLogDeleteFailureAfterCloseHandlers(): Unit = {
+    log.closeHandlers()
+    assertEquals(1, log.segments.numberOfSegments)
+    val segmentsBeforeDelete = log.segments.values
+    assertThrows(classOf[KafkaStorageException], () => log.delete())
+    assertEquals(1, log.segments.numberOfSegments)
+    assertEquals(segmentsBeforeDelete, log.segments.values)
+    assertTrue(logDir.exists)
+  }
+
+  @Test
+  def testUpdateConfig(): Unit = {
+    val oldConfig = log.config
+    assertEquals(oldConfig, log.config)
+
+    val newConfig = createLogConfig()

Review comment:
       It seems newConfig is always the same as oldConfig, so this doesn't
really verify that a config change takes effect?
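
   For example, the test could build a config that actually differs (a sketch;
it constructs a LogConfig directly with a non-default segment.bytes instead of
going through the createLogConfig helper):

```scala
  @Test
  def testUpdateConfig(): Unit = {
    val oldConfig = log.config
    val props = new Properties()
    props.put(LogConfig.SegmentBytesProp, "1024")  // any value different from the default
    val newConfig = LogConfig(props)

    log.updateConfig(newConfig)
    assertEquals(newConfig, log.config)
    assertNotEquals(oldConfig, log.config)
  }
```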

##########
File path: core/src/test/scala/unit/kafka/log/LocalLogTest.scala
##########
@@ -0,0 +1,734 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.File
+import java.nio.channels.ClosedChannelException
+import java.nio.charset.StandardCharsets
+import java.util.regex.Pattern
+import java.util.{Collections, Properties}
+
+import kafka.server.{FetchDataInfo, KafkaConfig, LogDirFailureChannel, 
LogOffsetMetadata}
+import kafka.utils.{MockTime, Scheduler, TestUtils}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record, 
SimpleRecord}
+import org.apache.kafka.common.utils.{Time, Utils}
+import org.junit.jupiter.api.Assertions.{assertFalse, _}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+
+import scala.jdk.CollectionConverters._
+
+class LocalLogTest {
+
+  import kafka.log.LocalLogTest._
+
+  var config: KafkaConfig = null
+  val tmpDir: File = TestUtils.tempDir()
+  val logDir: File = TestUtils.randomPartitionLogDir(tmpDir)
+  val topicPartition = new TopicPartition("test_topic", 1)
+  val logDirFailureChannel = new LogDirFailureChannel(10)
+  val mockTime = new MockTime()
+  val log: LocalLog = createLocalLogWithActiveSegment(config = 
createLogConfig())
+
+  @BeforeEach
+  def setUp(): Unit = {
+    val props = TestUtils.createBrokerConfig(0, "127.0.0.1:1", port = -1)
+    config = KafkaConfig.fromProps(props)
+  }
+
+  @AfterEach
+  def tearDown(): Unit = {
+    if (!log.isMemoryMappedBufferClosed) {
+      log.close()
+    }
+    Utils.delete(tmpDir)
+  }
+
+  case class KeyValue(key: String, value: String) {
+    def toRecord(timestamp: => Long = mockTime.milliseconds): SimpleRecord = {
+      new SimpleRecord(timestamp, key.getBytes, value.getBytes)
+    }
+  }
+
+  object KeyValue {
+    def fromRecord(record: Record): KeyValue = {
+      val key =
+        if (record.hasKey)
+          StandardCharsets.UTF_8.decode(record.key()).toString
+        else
+          ""
+      val value =
+        if (record.hasValue)
+          StandardCharsets.UTF_8.decode(record.value()).toString
+        else
+          ""
+      KeyValue(key, value)
+    }
+  }
+
+  private def kvsToRecords(keyValues: Iterable[KeyValue]): 
Iterable[SimpleRecord] = {
+    keyValues.map(kv => kv.toRecord())
+  }
+
+  private def recordsToKvs(records: Iterable[Record]): Iterable[KeyValue] = {
+    records.map(r => KeyValue.fromRecord(r))
+  }
+
+  private def appendRecords(records: Iterable[SimpleRecord],
+                            log: LocalLog = log,
+                            initialOffset: Long = 0L): Unit = {
+    log.append(lastOffset = initialOffset + records.size - 1,
+      largestTimestamp = records.head.timestamp,
+      shallowOffsetOfMaxTimestamp = initialOffset,
+      records = MemoryRecords.withRecords(initialOffset, CompressionType.NONE, 
0, records.toList : _*))
+  }
+
+  private def readRecords(log: LocalLog = log,
+                          startOffset: Long = 0L,
+                          maxLength: => Int = log.segments.activeSegment.size,
+                          minOneMessage: Boolean = false,
+                          maxOffsetMetadata: => LogOffsetMetadata = 
log.logEndOffsetMetadata,
+                          includeAbortedTxns: Boolean = false): FetchDataInfo 
= {
+    log.read(startOffset,
+             maxLength,
+             minOneMessage = minOneMessage,
+             maxOffsetMetadata,
+             includeAbortedTxns = includeAbortedTxns)
+  }
+
+  @Test
+  def testLogDeleteSuccess(): Unit = {
+    val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
+    appendRecords(List(record))
+    log.roll()
+    assertEquals(2, log.segments.numberOfSegments)
+    assertFalse(logDir.listFiles.isEmpty)
+    val segmentsBeforeDelete = List[LogSegment]() ++ log.segments.values
+    val deletedSegments = log.delete()
+    assertTrue(log.segments.isEmpty)
+    assertEquals(segmentsBeforeDelete, deletedSegments)
+    assertThrows(classOf[KafkaStorageException], () => 
log.checkIfMemoryMappedBufferClosed())
+    assertFalse(logDir.exists)
+  }
+
+  @Test
+  def testLogDeleteFailureAfterCloseHandlers(): Unit = {
+    log.closeHandlers()
+    assertEquals(1, log.segments.numberOfSegments)
+    val segmentsBeforeDelete = log.segments.values
+    assertThrows(classOf[KafkaStorageException], () => log.delete())
+    assertEquals(1, log.segments.numberOfSegments)
+    assertEquals(segmentsBeforeDelete, log.segments.values)
+    assertTrue(logDir.exists)
+  }
+
+  @Test
+  def testUpdateConfig(): Unit = {
+    val oldConfig = log.config
+    assertEquals(oldConfig, log.config)
+
+    val newConfig = createLogConfig()
+    log.updateConfig(newConfig)
+    assertEquals(newConfig, log.config)
+  }
+
+  @Test
+  def testLogDirRenameToNewDir(): Unit = {
+    val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
+    appendRecords(List(record))
+    log.roll()
+    assertEquals(2, log.segments.numberOfSegments)
+    val newLogDir = TestUtils.randomPartitionLogDir(tmpDir)
+    assertTrue(log.renameDir(newLogDir.getName))
+    assertFalse(logDir.exists())
+    assertTrue(newLogDir.exists())
+    assertEquals(newLogDir, log.dir)
+    assertEquals(newLogDir.getParent, log.parentDir)
+    assertEquals(newLogDir.getParent, log.dir.getParent)
+    log.segments.values.foreach(segment => assertEquals(newLogDir.getPath, 
segment.log.file().getParentFile.getPath))
+    assertEquals(2, log.segments.numberOfSegments)
+  }
+
+  @Test
+  def testLogDirRenameToExistingDir(): Unit = {
+    assertFalse(log.renameDir(log.dir.getName))
+  }
+
+  @Test
+  def testLogFlush(): Unit = {
+    assertEquals(0L, log.recoveryPoint)
+    assertEquals(mockTime.milliseconds, log.lastFlushTime)
+
+    val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
+    appendRecords(List(record))
+    mockTime.sleep(1)
+    val newSegment = log.roll()
+    log.flush(newSegment.baseOffset)
+    log.markFlushed(newSegment.baseOffset)
+    assertEquals(1L, log.recoveryPoint)
+    assertEquals(mockTime.milliseconds, log.lastFlushTime)
+  }
+
+  @Test
+  def testLogAppend(): Unit = {
+    val fetchDataInfoBeforeAppend = readRecords(maxLength = 1)
+    assertTrue(fetchDataInfoBeforeAppend.records.records.asScala.isEmpty)
+
+    mockTime.sleep(1)
+    val keyValues = Seq(KeyValue("abc", "ABC"), KeyValue("de", "DE"))
+    appendRecords(kvsToRecords(keyValues))
+    assertEquals(2L, log.logEndOffset)
+    assertEquals(0L, log.recoveryPoint)
+    val fetchDataInfo = readRecords()
+    assertEquals(2L, fetchDataInfo.records.records.asScala.size)
+    assertEquals(keyValues, 
recordsToKvs(fetchDataInfo.records.records.asScala))
+  }
+
+  @Test
+  def testLogCloseSuccess(): Unit = {
+    val keyValues = Seq(KeyValue("abc", "ABC"), KeyValue("de", "DE"))
+    appendRecords(kvsToRecords(keyValues))
+    log.close()
+    assertThrows(classOf[ClosedChannelException], () => 
appendRecords(kvsToRecords(keyValues), initialOffset = 2L))
+  }
+
+  @Test
+  def testLogCloseIdempotent(): Unit = {
+    log.close()
+    // Check that LocalLog.close() is idempotent
+    log.close()
+  }
+
+  @Test
+  def testLogCloseFailureWhenInMemoryBufferClosed(): Unit = {
+    val keyValues = Seq(KeyValue("abc", "ABC"), KeyValue("de", "DE"))
+    appendRecords(kvsToRecords(keyValues))
+    log.closeHandlers()
+    assertThrows(classOf[KafkaStorageException], () => log.close())
+  }
+
+  @Test
+  def testLogCloseHandlers(): Unit = {
+    val keyValues = Seq(KeyValue("abc", "ABC"), KeyValue("de", "DE"))
+    appendRecords(kvsToRecords(keyValues))
+    log.closeHandlers()
+    assertThrows(classOf[ClosedChannelException],
+                 () => appendRecords(kvsToRecords(keyValues), initialOffset = 
2L))
+  }
+
+  @Test
+  def testLogCloseHandlersIdempotent(): Unit = {
+    log.closeHandlers()
+    // Check that LocalLog.closeHandlers() is idempotent
+    log.closeHandlers()
+  }
+
+  private def testRemoveAndDeleteSegments(asyncDelete: Boolean): Unit = {
+    for (offset <- 0 to 8) {
+      val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
+      appendRecords(List(record), initialOffset = offset)
+      log.roll()
+    }
+
+    assertEquals(10L, log.segments.numberOfSegments)
+
+    class TestDeletionReason extends SegmentDeletionReason {
+      private var _deletedSegments: Iterable[LogSegment] = List[LogSegment]()
+
+      override def logReason(toDelete: List[LogSegment]): Unit = {
+        _deletedSegments = List[LogSegment]() ++ toDelete
+      }
+
+      def deletedSegments: Iterable[LogSegment] = _deletedSegments
+    }
+    val reason = new TestDeletionReason()
+    val toDelete = List[LogSegment]() ++ log.segments.values
+    log.removeAndDeleteSegments(toDelete, asyncDelete = asyncDelete, reason)
+    if (asyncDelete) {
+      mockTime.sleep(log.config.fileDeleteDelayMs + 1)
+    }
+    assertTrue(log.segments.isEmpty)
+    assertEquals(toDelete, reason.deletedSegments)
+    toDelete.foreach(segment => assertTrue(segment.deleted()))
+  }
+
+  @Test
+  def testRemoveAndDeleteSegmentsSync(): Unit = {
+    testRemoveAndDeleteSegments(asyncDelete = false)
+  }
+
+  @Test
+  def testRemoveAndDeleteSegmentsAsync(): Unit = {
+    testRemoveAndDeleteSegments(asyncDelete = true)
+  }
+
+  private def testDeleteSegmentFiles(asyncDelete: Boolean): Unit = {
+    for (offset <- 0 to 8) {
+      val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
+      appendRecords(List(record), initialOffset = offset)
+      log.roll()
+    }
+
+    assertEquals(10L, log.segments.numberOfSegments)
+
+    val toDelete = List[LogSegment]() ++ log.segments.values
+    LocalLog.deleteSegmentFiles(toDelete, asyncDelete = asyncDelete, log.dir, 
log.topicPartition, log.config, log.scheduler, log.logDirFailureChannel, "")
+    if (asyncDelete) {
+      toDelete.foreach {
+        segment =>
+          assertFalse(segment.deleted())
+          assertTrue(segment.hasSuffix(LocalLog.DeletedFileSuffix))
+      }
+      mockTime.sleep(log.config.fileDeleteDelayMs + 1)
+    }
+    toDelete.foreach(segment => assertTrue(segment.deleted()))
+  }
+
+  @Test
+  def testDeleteSegmentFilesSync(): Unit = {
+    testDeleteSegmentFiles(asyncDelete = false)
+  }
+
+  @Test
+  def testDeleteSegmentFilesAsync(): Unit = {
+    testDeleteSegmentFiles(asyncDelete = true)
+  }
+
+  @Test
+  def testDeletableSegmentsFilter(): Unit = {
+    for (offset <- 0 to 8) {
+      val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
+      appendRecords(List(record), initialOffset = offset)
+      log.roll()
+    }
+
+    assertEquals(10L, log.segments.numberOfSegments)
+
+    {
+      val deletable = log.deletableSegments(
+        (segment: LogSegment, _: Option[LogSegment], _: Long) => 
segment.baseOffset <= 5)
+      val expected = log.segments.nonActiveLogSegmentsFrom(0L).filter(segment 
=> segment.baseOffset <= 5)
+      assertEquals(expected, deletable.toList)
+    }
+
+    {
+      val deletable = log.deletableSegments((_: LogSegment, _: 
Option[LogSegment], _: Long) => true)
+      val expected = log.segments.nonActiveLogSegmentsFrom(0L).toList
+      assertEquals(expected, deletable.toList)
+    }
+
+    {
+      val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
+      appendRecords(List(record), initialOffset = 9L)
+      val deletable = log.deletableSegments((_: LogSegment, _: 
Option[LogSegment], _: Long) => true)
+      val expected = log.segments.values.toList
+      assertEquals(expected, deletable.toList)
+    }
+  }
+
+  @Test
+  def testDeletableSegmentsIteration(): Unit = {
+    for (offset <- 0 to 8) {
+      val record = new SimpleRecord(mockTime.milliseconds, "a".getBytes)
+      appendRecords(List(record), initialOffset = offset)
+      log.roll()
+    }
+
+    assertEquals(10L, log.segments.numberOfSegments)
+
+    var offset = 0
+    log.deletableSegments(
+      (segment: LogSegment, nextSegmentOpt: Option[LogSegment], logEndOffset: 
Long) => {
+        assertEquals(offset, segment.baseOffset)
+        val floorSegmentOpt = log.segments.floorSegment(offset)
+        assertTrue(floorSegmentOpt.isDefined)
+        assertEquals(floorSegmentOpt.get, segment)
+        if (offset == log.logEndOffset) {
+          assertFalse(nextSegmentOpt.isDefined)
+        } else {
+          assertTrue(nextSegmentOpt.isDefined)
+          val higherSegmentOpt = log.segments.higherSegment(segment.baseOffset)
+          assertTrue(higherSegmentOpt.isDefined)
+          assertEquals(segment.baseOffset + 1, higherSegmentOpt.get.baseOffset)
+          assertEquals(higherSegmentOpt.get, nextSegmentOpt.get)
+        }
+        assertEquals(log.logEndOffset, logEndOffset)
+        offset += 1
+        true
+      })

Review comment:
       Should we assert something about the result after the
log.deletableSegments() call?
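
   For instance, appended after the existing call (a sketch; the expected value
mirrors the all-true case in testDeletableSegmentsFilter above):

```scala
    // With an all-true predicate, every non-active segment should be reported
    // as deletable, and the call must not have mutated the log.
    val deletable = log.deletableSegments(
      (_: LogSegment, _: Option[LogSegment], _: Long) => true)
    assertEquals(log.segments.nonActiveLogSegmentsFrom(0L).toList, deletable.toList)
    assertEquals(10L, log.segments.numberOfSegments)
```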

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1852,65 +1612,24 @@ class Log(@volatile private var _dir: File,
     logString.toString
   }
 
-  /**
-   * This method deletes the given log segments by doing the following for 
each of them:
-   * <ol>
-   *   <li>It removes the segment from the segment map so that it will no 
longer be used for reads.
-   *   <li>It renames the index and log files by appending .deleted to the 
respective file name
-   *   <li>It can either schedule an asynchronous delete operation to occur in 
the future or perform the deletion synchronously
-   * </ol>
-   * Asynchronous deletion allows reads to happen concurrently without 
synchronization and without the possibility of
-   * physically deleting a file while it is being read.
-   *
-   * This method does not need to convert IOException to KafkaStorageException 
because it is either called before all logs are loaded
-   * or the immediate caller will catch and handle IOException
-   *
-   * @param segments The log segments to schedule for deletion
-   * @param asyncDelete Whether the segment files should be deleted 
asynchronously
-   */
-  private def removeAndDeleteSegments(segments: Iterable[LogSegment],
-                                      asyncDelete: Boolean,
-                                      reason: SegmentDeletionReason): Unit = {
-    if (segments.nonEmpty) {
-      lock synchronized {
-        // As most callers hold an iterator into the `segments` collection and 
`removeAndDeleteSegment` mutates it by
-        // removing the deleted segment, we should force materialization of 
the iterator here, so that results of the
-        // iteration remain valid and deterministic.
-        val toDelete = segments.toList
-        reason.logReason(this, toDelete)
-        toDelete.foreach { segment =>
-          this.segments.remove(segment.baseOffset)
-        }
-        deleteSegmentFiles(toDelete, asyncDelete)
-      }
-    }
-  }
-
-  private def deleteSegmentFiles(segments: Iterable[LogSegment], asyncDelete: 
Boolean, deleteProducerStateSnapshots: Boolean = true): Unit = {
-    Log.deleteSegmentFiles(segments, asyncDelete, 
deleteProducerStateSnapshots, dir, topicPartition,
-      config, scheduler, logDirFailureChannel, producerStateManager, 
this.logIdent)
-  }
-
   private[log] def replaceSegments(newSegments: Seq[LogSegment], oldSegments: 
Seq[LogSegment], isRecoveredSwapFile: Boolean = false): Unit = {

Review comment:
       It seems that we could remove isRecoveredSwapFile, since the caller
always passes false.

##########
File path: core/src/main/scala/kafka/log/Log.scala
##########
@@ -1806,37 +1566,37 @@ class Log(@volatile private var _dir: File,
     endOffset: Long
   ): Unit = {
     logStartOffset = startOffset
-    nextOffsetMetadata = LogOffsetMetadata(endOffset, 
activeSegment.baseOffset, activeSegment.size)
-    recoveryPoint = math.min(recoveryPoint, endOffset)
-    rebuildProducerState(endOffset, producerStateManager)
+    lock synchronized {
+      rebuildProducerState(endOffset, producerStateManager)
+    }

Review comment:
       This change has a couple of issues.
   (1) updateHighWatermark() now only updates the offset, but not the
corresponding offset metadata. The offset metadata is needed when serving fetch
requests, and recomputing it requires an index lookup and a log scan, which can
be expensive. So, we need to preserve the offset metadata during truncate() and
truncateFully().
   (2) I think updateHighWatermark() needs to be called within the lock:
updateHighWatermark() reads the local log's logEndOffset, and we don't want the
logEndOffset to change while updateHighWatermark() is being called.
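
   A sketch of the suggested shape (assuming the LogOffsetMetadata overload of
updateHighWatermark() and the localLog accessors introduced in this PR):

```scala
    logStartOffset = startOffset
    lock synchronized {
      rebuildProducerState(endOffset, producerStateManager)
      // Preserve the full offset metadata so the high watermark keeps its
      // segment position (no index lookup or log scan when serving fetches),
      // and update it under the lock so logEndOffset cannot change concurrently.
      updateHighWatermark(localLog.logEndOffsetMetadata)
    }
```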

##########
File path: core/src/main/scala/kafka/log/LogLoader.scala
##########
@@ -246,17 +262,17 @@ object LogLoader extends Logging {
         return fn
       } catch {
         case e: LogSegmentOffsetOverflowException =>
-          info(s"${params.logIdentifier}Caught segment overflow error: 
${e.getMessage}. Split segment and retry.")
-          Log.splitOverflowedSegment(
+          info(s"${params.logIdentifier} Caught segment overflow error: 
${e.getMessage}. Split segment and retry.")
+          val result = Log.splitOverflowedSegment(
             e.segment,
             params.segments,
             params.dir,
             params.topicPartition,
             params.config,
             params.scheduler,
             params.logDirFailureChannel,
-            params.producerStateManager,
             params.logIdentifier)
+          deleteProducerSnapshotsAsync(result.deletedSegments, params)

Review comment:
       This is unnecessary: during a split, the old segment is replaced with a
new segment that has the same base offset, so result.deletedSegments is always
empty.
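
   In other words, the call could simply be dropped (a sketch of the catch
block without it):

```scala
        case e: LogSegmentOffsetOverflowException =>
          info(s"${params.logIdentifier} Caught segment overflow error: ${e.getMessage}. Split segment and retry.")
          // The split swaps in a new segment at the same base offset, so there
          // are no deleted segments and no producer snapshots to clean up here.
          Log.splitOverflowedSegment(
            e.segment,
            params.segments,
            params.dir,
            params.topicPartition,
            params.config,
            params.scheduler,
            params.logDirFailureChannel,
            params.logIdentifier)
```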

##########
File path: core/src/test/scala/unit/kafka/log/LocalLogTest.scala
##########
@@ -0,0 +1,734 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.File
+import java.nio.channels.ClosedChannelException
+import java.nio.charset.StandardCharsets
+import java.util.regex.Pattern
+import java.util.{Collections, Properties}
+
+import kafka.server.{FetchDataInfo, KafkaConfig, LogDirFailureChannel, 
LogOffsetMetadata}
+import kafka.utils.{MockTime, Scheduler, TestUtils}
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.common.errors.KafkaStorageException
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, Record, 
SimpleRecord}
+import org.apache.kafka.common.utils.{Time, Utils}
+import org.junit.jupiter.api.Assertions.{assertFalse, _}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+
+import scala.jdk.CollectionConverters._
+
+class LocalLogTest {
+
+  import kafka.log.LocalLogTest._
+
+  var config: KafkaConfig = null
+  val tmpDir: File = TestUtils.tempDir()
+  val logDir: File = TestUtils.randomPartitionLogDir(tmpDir)
+  val topicPartition = new TopicPartition("test_topic", 1)
+  val logDirFailureChannel = new LogDirFailureChannel(10)
+  val mockTime = new MockTime()
+  val log: LocalLog = createLocalLogWithActiveSegment(config = 
createLogConfig())
+
+  @BeforeEach
+  def setUp(): Unit = {
+    val props = TestUtils.createBrokerConfig(0, "127.0.0.1:1", port = -1)
+    config = KafkaConfig.fromProps(props)
+  }
+
+  @AfterEach
+  def tearDown(): Unit = {
+    if (!log.isMemoryMappedBufferClosed) {

Review comment:
       Should we use log.checkIfMemoryMappedBufferClosed() here instead of
reading the isMemoryMappedBufferClosed flag directly?
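
   A sketch of what tearDown() could look like with that (treating the
KafkaStorageException thrown by checkIfMemoryMappedBufferClosed() as the
"already closed" signal, as testLogDeleteSuccess does above):

```scala
  @AfterEach
  def tearDown(): Unit = {
    try {
      log.checkIfMemoryMappedBufferClosed()
      log.close()
    } catch {
      case _: KafkaStorageException => // the test body already closed or deleted the log
    }
    Utils.delete(tmpDir)
  }
```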




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

