junrao commented on a change in pull request #11950:
URL: https://github.com/apache/kafka/pull/11950#discussion_r835434811
##########
File path: core/src/main/scala/kafka/log/UnifiedLog.scala
##########
@@ -1355,15 +1355,19 @@ class UnifiedLog(@volatile var logStartOffset: Long,
val numToDelete = deletable.size
if (numToDelete > 0) {
// we must always have at least one segment, so if we are going to
delete all the segments, create a new one first
- if (localLog.segments.numberOfSegments == numToDelete)
- roll()
- lock synchronized {
- localLog.checkIfMemoryMappedBufferClosed()
- // remove the segments for lookups
- localLog.removeAndDeleteSegments(deletable, asyncDelete = true,
reason)
- deleteProducerSnapshots(deletable, asyncDelete = true)
-
maybeIncrementLogStartOffset(localLog.segments.firstSegmentBaseOffset.get,
SegmentDeletion)
+ var segmentsToDelete = deletable
+ if (localLog.segments.numberOfSegments == numToDelete) {
+ val newSegment = roll()
+ if (deletable.last.baseOffset == newSegment.baseOffset) {
+ warn(s"Empty active segment at ${deletable.last.baseOffset} was
deleted and recreated due to $reason")
+ segmentsToDelete = deletable.dropRight(1)
Review comment:
Hmm, I am not sure if this part of the logic is completely right. roll()
eventually calls createAndDeleteSegment() that always deletes the current
active segment. So, in the case when the new segment's offset is different from
the last segment's offset, it seems we will be deleting the last segment twice,
once in createAndDeleteSegment() and another here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]