yyu1993 commented on a change in pull request #11950:
URL: https://github.com/apache/kafka/pull/11950#discussion_r835676088



##########
File path: core/src/main/scala/kafka/log/UnifiedLog.scala
##########
@@ -1355,15 +1355,19 @@ class UnifiedLog(@volatile var logStartOffset: Long,
       val numToDelete = deletable.size
       if (numToDelete > 0) {
         // we must always have at least one segment, so if we are going to delete all the segments, create a new one first
-        if (localLog.segments.numberOfSegments == numToDelete)
-          roll()
-        lock synchronized {
-          localLog.checkIfMemoryMappedBufferClosed()
-          // remove the segments for lookups
-          localLog.removeAndDeleteSegments(deletable, asyncDelete = true, reason)
-          deleteProducerSnapshots(deletable, asyncDelete = true)
-          maybeIncrementLogStartOffset(localLog.segments.firstSegmentBaseOffset.get, SegmentDeletion)
+        var segmentsToDelete = deletable
+        if (localLog.segments.numberOfSegments == numToDelete) {
+          val newSegment = roll()
+          if (deletable.last.baseOffset == newSegment.baseOffset) {
+            warn(s"Empty active segment at ${deletable.last.baseOffset} was deleted and recreated due to $reason")
+            segmentsToDelete = deletable.dropRight(1)

Review comment:
       In roll(), we only call createAndDeleteSegment() when the new segment's
base offset equals the last segment's base offset and the last segment is
empty. When the new segment's offset is different, roll() does not delete the
last segment, so it is deleted as part of segmentsToDelete instead.
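
The two cases can be sketched with a toy model. This is only an illustration under stated assumptions: `Segment`, `RollSketch.roll` and `RollSketch.segmentsToDelete` are hypothetical stand-ins, not the real `LogSegment`, `LocalLog` or `UnifiedLog` code, and the sketch ignores locking, async deletion and producer snapshots.

```scala
// Toy model only: these names are hypothetical, not Kafka's API.
final case class Segment(baseOffset: Long, size: Int)

object RollSketch {
  // Models the behaviour described above: an empty last segment that sits at
  // the roll offset is deleted and recreated; otherwise the new segment is
  // appended and the old last segment is left in place.
  def roll(segments: Vector[Segment], logEndOffset: Long): (Vector[Segment], Segment) = {
    val newSegment = Segment(logEndOffset, 0)
    segments.lastOption match {
      case Some(last) if last.baseOffset == logEndOffset && last.size == 0 =>
        (segments.init :+ newSegment, newSegment) // delete-and-recreate case
      case _ =>
        (segments :+ newSegment, newSegment)      // plain append case
    }
  }

  // Mirrors the check in the diff: drop the last deletable segment only when
  // roll() has already replaced it with a segment at the same base offset.
  def segmentsToDelete(deletable: Vector[Segment], newSegment: Segment): Vector[Segment] =
    if (deletable.nonEmpty && deletable.last.baseOffset == newSegment.baseOffset)
      deletable.dropRight(1)
    else
      deletable
}
```

With a non-empty last segment, say `Vector(Segment(0, 10), Segment(10, 5))` rolled at offset 15, the new segment's base offset differs, so every deletable segment stays in the list. With an empty last segment, say `Vector(Segment(0, 10), Segment(10, 0))` rolled at offset 10, the last entry is dropped because the roll already replaced it.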





