kowshik commented on a change in pull request #11345:
URL: https://github.com/apache/kafka/pull/11345#discussion_r758691775



##########
File path: core/src/main/scala/kafka/log/UnifiedLog.scala
##########
@@ -1504,22 +1504,37 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
    * Flush all local log segments
+   *
+   * @param inclusive should be true during a clean shutdown, and false otherwise. The reason is that
+   * we have to pass logEndOffset + 1 to the `localLog.flush(offset: Long): Unit` function to flush empty
+   * active segments, which is important to make sure we don't lose the empty index file during shutdown.
    */
-  def flush(): Unit = flush(logEndOffset)
+  def flush(inclusive: Boolean): Unit = flush(logEndOffset, inclusive)
 
   /**
    * Flush local log segments for all offsets up to offset-1
    *
   * @param offset The offset to flush up to (non-inclusive); the new recovery point
    */
-  def flush(offset: Long): Unit = {
-    maybeHandleIOException(s"Error while flushing log for $topicPartition in dir ${dir.getParent} with offset $offset") {
-      if (offset > localLog.recoveryPoint) {
-        debug(s"Flushing log up to offset $offset, last flushed: $lastFlushTime,  current time: ${time.milliseconds()}, " +
+  def flush(offset: Long): Unit = flush(offset, false)
+
+  /**
+   * Flush local log segments for all offsets up to offset-1 if includingOffset=false; up to offset
+   * if includingOffset=true. The recovery point is set to offset.
+   *
+   * @param offset The offset to flush up to (non-inclusive); the new recovery point
+   * @param includingOffset Whether the flush includes the provided offset.
+   */
+  private def flush(offset: Long, includingOffset: Boolean): Unit = {
+    val flushOffset = if (includingOffset) offset + 1  else offset
+    val recoveryPoint = offset

Review comment:
       Can we call this `newRecoveryPoint` ?
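
For reference, the rename would look roughly like this against the diff above (a sketch; the elided rest of the method body is unchanged):

```scala
private def flush(offset: Long, includingOffset: Boolean): Unit = {
  val flushOffset = if (includingOffset) offset + 1 else offset
  // `newRecoveryPoint` makes it explicit that this value becomes the new recovery point.
  val newRecoveryPoint = offset
  // ... rest of the method unchanged, using newRecoveryPoint ...
}
```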

##########
File path: core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
##########
@@ -1630,6 +1630,20 @@ class UnifiedLogTest {
     assertThrows(classOf[OffsetOutOfRangeException], () => LogTestUtils.readLog(log, 1026, 1000))
   }
 
+  @Test
+  def testFlushingEmptyActiveSegments(): Unit = {

Review comment:
       Is it possible to add another test in `LogLoaderTest` where we cleanly shut down a log with an empty active segment and then reload it? We should expect that the recovery code path does not declare the segment corrupted.
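
A minimal sketch of such a test, assuming `LogLoaderTest` has (or can reuse) helpers analogous to `createLog`/`LogTestUtils.createLogConfig` from `UnifiedLogTest`; the test name and the `lastShutdownClean` flag are assumptions for illustration, not the final implementation:

```scala
@Test
def testLoadLogAfterCleanShutdownWithEmptyActiveSegment(): Unit = {
  val logConfig = LogTestUtils.createLogConfig()
  var log = createLog(logDir, logConfig)
  log.appendAsLeader(TestUtils.singletonRecords(value = "Test".getBytes,
    timestamp = mockTime.milliseconds), leaderEpoch = 0)
  log.roll() // rolling leaves behind an empty active segment
  assertEquals(0, log.activeSegment.size)
  log.close() // clean shutdown: should flush the empty active segment inclusively

  // Reload as after a clean shutdown: recovery must not declare the empty
  // active segment corrupted, so both segments should still be present.
  log = createLog(logDir, logConfig, lastShutdownClean = true)
  assertEquals(2, log.numberOfSegments)
  assertEquals(0, log.activeSegment.size)
  log.close()
}
```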

##########
File path: core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
##########
@@ -1630,6 +1630,20 @@ class UnifiedLogTest {
     assertThrows(classOf[OffsetOutOfRangeException], () => LogTestUtils.readLog(log, 1026, 1000))
   }
 
+  @Test
+  def testFlushingEmptyActiveSegments(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig()
+    val log = createLog(logDir, logConfig)
+    val message = TestUtils.singletonRecords(value = "Test".getBytes, timestamp = mockTime.milliseconds)
+    log.appendAsLeader(message, leaderEpoch = 0)
+    log.roll()
+    assertEquals(2, logDir.listFiles(_.getName.endsWith(".log")).length)
+    assertEquals(1, logDir.listFiles(_.getName.endsWith(".index")).length)
+    log.flush(true)

Review comment:
       Just before this line, can we assert that the active segment is empty? That sets up the reason why we are force-flushing it here.
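
For instance, something like this just before the `log.flush(true)` call (a sketch; `activeSegment.size` reports the number of bytes in the active segment):

```scala
// After the roll, the new active segment has no records yet, which is
// exactly why it needs a forced flush to persist its (empty) index files.
assertEquals(0, log.activeSegment.size)
```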

##########
File path: core/src/main/scala/kafka/log/UnifiedLog.scala
##########
@@ -1504,22 +1504,37 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
    * Flush all local log segments
+   *
+   * @param inclusive should be true during a clean shutdown, and false otherwise. The reason is that
+   * we have to pass logEndOffset + 1 to the `localLog.flush(offset: Long): Unit` function to flush empty
+   * active segments, which is important to make sure we don't lose the empty index file during shutdown.
    */
-  def flush(): Unit = flush(logEndOffset)
+  def flush(inclusive: Boolean): Unit = flush(logEndOffset, inclusive)
 
   /**
    * Flush local log segments for all offsets up to offset-1
    *
   * @param offset The offset to flush up to (non-inclusive); the new recovery point
    */
-  def flush(offset: Long): Unit = {
-    maybeHandleIOException(s"Error while flushing log for $topicPartition in dir ${dir.getParent} with offset $offset") {
-      if (offset > localLog.recoveryPoint) {
-        debug(s"Flushing log up to offset $offset, last flushed: $lastFlushTime,  current time: ${time.milliseconds()}, " +
+  def flush(offset: Long): Unit = flush(offset, false)

Review comment:
       There is another `flush` method defined above that also takes a single parameter (a boolean), so it is easy to confuse the two. Just for readability, does it make sense to rename this to `def flushUptoOffsetExclusive(offset: Long)`?
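
The suggested rename would read roughly like this (a sketch using the reviewer's proposed name, not necessarily the merged code):

```scala
// The exclusive upper bound is now explicit in the method name; it delegates
// to the private two-argument flush with includingOffset = false.
def flushUptoOffsetExclusive(offset: Long): Unit = flush(offset, includingOffset = false)
```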

##########
File path: core/src/main/scala/kafka/log/UnifiedLog.scala
##########
@@ -1504,22 +1504,37 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
    * Flush all local log segments
+   *
+   * @param inclusive should be true during a clean shutdown, and false otherwise. The reason is that
+   * we have to pass logEndOffset + 1 to the `localLog.flush(offset: Long): Unit` function to flush empty
+   * active segments, which is important to make sure we don't lose the empty index file during shutdown.
    */
-  def flush(): Unit = flush(logEndOffset)
+  def flush(inclusive: Boolean): Unit = flush(logEndOffset, inclusive)

Review comment:
       Can we call this parameter `forceFlushActiveSegment`? I don't understand what `inclusive` means in this public API.
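
A sketch of the public API with the proposed parameter name (the doc comment wording here is illustrative, not the final text):

```scala
/**
 * Flush all local log segments.
 *
 * @param forceFlushActiveSegment true during a clean shutdown: the (possibly empty)
 *                                active segment is flushed as well, so its index files
 *                                are not lost across the restart.
 */
def flush(forceFlushActiveSegment: Boolean): Unit = flush(logEndOffset, forceFlushActiveSegment)
```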



