junrao commented on a change in pull request #11345:
URL: https://github.com/apache/kafka/pull/11345#discussion_r754503632



##########
File path: core/src/main/scala/kafka/log/UnifiedLog.scala
##########
@@ -1504,22 +1504,30 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   /**
    * Flush all local log segments
+   *
+   * @param inclusive should be true during a clean shutdown, and false 
otherwise. The reason is that
+   * we have to pass logEngOffset + 1 to the `localLog.flush(offset: Long): 
Unit` function to flush empty
+   * active segments, which is important to make sure we don't lose the empty 
index file during shutdown.
    */
-  def flush(): Unit = flush(logEndOffset)
+  def flush(inclusive: Boolean): Unit = flush(logEndOffset, inclusive)
 
   /**
    * Flush local log segments for all offsets up to offset-1
    *
    * @param offset The offset to flush up to (non-inclusive); the new recovery 
point
    */
-  def flush(offset: Long): Unit = {
-    maybeHandleIOException(s"Error while flushing log for $topicPartition in 
dir ${dir.getParent} with offset $offset") {
-      if (offset > localLog.recoveryPoint) {
-        debug(s"Flushing log up to offset $offset, last flushed: 
$lastFlushTime,  current time: ${time.milliseconds()}, " +
+  def flush(offset: Long): Unit = flush(offset, false)
+
+  private def flush(offset: Long, includingOffset: Boolean): Unit = {

Review comment:
       Could we add a comment on how includingOffset affects the recovery point?

##########
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##########
@@ -212,7 +212,7 @@ final class KafkaMetadataLog private (
   }
 
   override def flush(): Unit = {
-    log.flush()
+    log.flush(true)

Review comment:
       This is called on every log append. We only need to do the inclusive 
part in close().




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to