kowshik commented on a change in pull request #11345:
URL: https://github.com/apache/kafka/pull/11345#discussion_r758691775
##########
File path: core/src/main/scala/kafka/log/UnifiedLog.scala
##########
@@ -1504,22 +1504,37 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
    * Flush all local log segments
+   *
+   * @param inclusive should be true during a clean shutdown, and false otherwise. The reason is that
+   * we have to pass logEndOffset + 1 to the `localLog.flush(offset: Long): Unit` function to flush empty
+   * active segments, which is important to make sure we don't lose the empty index file during shutdown.
    */
-  def flush(): Unit = flush(logEndOffset)
+  def flush(inclusive: Boolean): Unit = flush(logEndOffset, inclusive)
 
   /**
    * Flush local log segments for all offsets up to offset-1
    *
    * @param offset The offset to flush up to (non-inclusive); the new recovery point
    */
-  def flush(offset: Long): Unit = {
-    maybeHandleIOException(s"Error while flushing log for $topicPartition in dir ${dir.getParent} with offset $offset") {
-      if (offset > localLog.recoveryPoint) {
-        debug(s"Flushing log up to offset $offset, last flushed: $lastFlushTime, current time: ${time.milliseconds()}, " +
+  def flush(offset: Long): Unit = flush(offset, false)
+
+  /**
+   * Flush local log segments for all offsets up to offset-1 if includingOffset=false; up to offset
+   * if includingOffset=true. The recovery point is set to offset.
+   *
+   * @param offset The offset to flush up to (non-inclusive); the new recovery point
+   * @param includingOffset Whether the flush includes the provided offset.
+   */
+  private def flush(offset: Long, includingOffset: Boolean): Unit = {
+    val flushOffset = if (includingOffset) offset + 1 else offset
+    val recoveryPoint = offset

Review comment:
   Can we call this `newRecoveryPoint`?

##########
File path: core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
##########
@@ -1630,6 +1630,20 @@ class UnifiedLogTest {
     assertThrows(classOf[OffsetOutOfRangeException], () => LogTestUtils.readLog(log, 1026, 1000))
   }
 
+  @Test
+  def testFlushingEmptyActiveSegments(): Unit = {

Review comment:
   Is it possible to add another test in `LogLoaderTest` where we cleanly shut down a log with an empty active segment and then reload the log again? We should expect that the recovery code path doesn't declare the segment to be corrupted.

##########
File path: core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
##########
@@ -1630,6 +1630,20 @@ class UnifiedLogTest {
     assertThrows(classOf[OffsetOutOfRangeException], () => LogTestUtils.readLog(log, 1026, 1000))
   }
 
+  @Test
+  def testFlushingEmptyActiveSegments(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig()
+    val log = createLog(logDir, logConfig)
+    val message = TestUtils.singletonRecords(value = "Test".getBytes, timestamp = mockTime.milliseconds)
+    log.appendAsLeader(message, leaderEpoch = 0)
+    log.roll()
+    assertEquals(2, logDir.listFiles(_.getName.endsWith(".log")).length)
+    assertEquals(1, logDir.listFiles(_.getName.endsWith(".index")).length)
+    log.flush(true)

Review comment:
   Just before this line, can we assert that the active segment is empty? This sets up the reason why we are force-flushing it here.
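To make the requested setup concrete, here is a minimal sketch of the extra assertions, assuming `UnifiedLog` exposes the active segment via `log.activeSegment` (whose `size` is its byte count) and a `recoveryPoint` accessor; the exact assertions that landed in the PR may differ:

```scala
    // After log.roll(), the new active segment should hold no records yet;
    // asserting that motivates the forced flush on the next line.
    assertEquals(0, log.activeSegment.size)
    log.flush(true)
    // Under the PR's semantics, flush(true) flushes up to logEndOffset + 1 and
    // advances the recovery point to logEndOffset, so the empty active segment
    // and its index file survive a clean shutdown.
    assertEquals(log.logEndOffset, log.recoveryPoint)
```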
##########
File path: core/src/main/scala/kafka/log/UnifiedLog.scala
##########
@@ -1504,22 +1504,37 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
    * Flush all local log segments
+   *
+   * @param inclusive should be true during a clean shutdown, and false otherwise. The reason is that
+   * we have to pass logEndOffset + 1 to the `localLog.flush(offset: Long): Unit` function to flush empty
+   * active segments, which is important to make sure we don't lose the empty index file during shutdown.
    */
-  def flush(): Unit = flush(logEndOffset)
+  def flush(inclusive: Boolean): Unit = flush(logEndOffset, inclusive)
 
   /**
    * Flush local log segments for all offsets up to offset-1
    *
    * @param offset The offset to flush up to (non-inclusive); the new recovery point
    */
-  def flush(offset: Long): Unit = {
-    maybeHandleIOException(s"Error while flushing log for $topicPartition in dir ${dir.getParent} with offset $offset") {
-      if (offset > localLog.recoveryPoint) {
-        debug(s"Flushing log up to offset $offset, last flushed: $lastFlushTime, current time: ${time.milliseconds()}, " +
+  def flush(offset: Long): Unit = flush(offset, false)

Review comment:
   There is another flush method defined above with only one parameter, which is a boolean. It is easy to confuse that with this method, which also has only one parameter. So, just for readability, does it make sense to call this `def flushUptoOffsetExclusive(offset: Long)`?

##########
File path: core/src/main/scala/kafka/log/UnifiedLog.scala
##########
@@ -1504,22 +1504,37 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
    * Flush all local log segments
+   *
+   * @param inclusive should be true during a clean shutdown, and false otherwise. The reason is that
+   * we have to pass logEndOffset + 1 to the `localLog.flush(offset: Long): Unit` function to flush empty
+   * active segments, which is important to make sure we don't lose the empty index file during shutdown.
    */
-  def flush(): Unit = flush(logEndOffset)
+  def flush(inclusive: Boolean): Unit = flush(logEndOffset, inclusive)

Review comment:
   Can we call this parameter `forceFlushActiveSegment`? I don't understand what `inclusive` means in this public API.
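Taken together, the two renaming suggestions would make the public surface read roughly as follows. This is only an illustrative sketch of the proposed signatures derived from the hunk above, not the code committed in the PR:

```scala
  /**
   * Flush all local log segments.
   *
   * @param forceFlushActiveSegment should be true during a clean shutdown so that the
   *                                (possibly empty) active segment and its index files
   *                                are flushed as well
   */
  def flush(forceFlushActiveSegment: Boolean): Unit = flush(logEndOffset, forceFlushActiveSegment)

  /**
   * Flush local log segments for all offsets up to offset-1; offset becomes the new recovery point.
   */
  def flushUptoOffsetExclusive(offset: Long): Unit = flush(offset, includingOffset = false)
```

Spelling out `forceFlushActiveSegment` keeps the clean-shutdown intent visible at call sites, which is what the comment above is asking for.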