junrao commented on a change in pull request #11950:
URL: https://github.com/apache/kafka/pull/11950#discussion_r836679617
##########
File path: core/src/main/scala/kafka/log/LocalLog.scala
##########
@@ -517,14 +558,16 @@ class LocalLog(@volatile private var _dir: File,
debug(s"Truncate and start at offset $newOffset")
checkIfMemoryMappedBufferClosed()
val segmentsToDelete = List[LogSegment]() ++ segments.values
- removeAndDeleteSegments(segmentsToDelete, asyncDelete = true,
LogTruncation(this))
- segments.add(LogSegment.open(dir,
- baseOffset = newOffset,
- config = config,
- time = time,
- initFileSize = config.initFileSize,
- preallocate = config.preallocate))
+ if (segmentsToDelete.size > 1)
Review comment:
Would it be better to combine the two ifs together into something like the
following? It would also be useful to add a comment explaining why we want to
give the last segment special treatment.
```
if (segmentsToDelete.nonEmpty) {
removeAndDeleteSegments(segmentsToDelete.dropRight(1), asyncDelete =
true, LogTruncation(this))
createAndDeleteSegment(newOffset, segmentsToDelete.last, asyncDelete
= true, LogTruncation(this))
}
```
##########
File path: core/src/main/scala/kafka/log/UnifiedLog.scala
##########
@@ -1355,15 +1355,19 @@ class UnifiedLog(@volatile var logStartOffset: Long,
val numToDelete = deletable.size
if (numToDelete > 0) {
// we must always have at least one segment, so if we are going to
delete all the segments, create a new one first
- if (localLog.segments.numberOfSegments == numToDelete)
- roll()
- lock synchronized {
- localLog.checkIfMemoryMappedBufferClosed()
- // remove the segments for lookups
- localLog.removeAndDeleteSegments(deletable, asyncDelete = true,
reason)
- deleteProducerSnapshots(deletable, asyncDelete = true)
-
maybeIncrementLogStartOffset(localLog.segments.firstSegmentBaseOffset.get,
SegmentDeletion)
+ var segmentsToDelete = deletable
+ if (localLog.segments.numberOfSegments == numToDelete) {
+ val newSegment = roll()
+ if (deletable.last.baseOffset == newSegment.baseOffset) {
+ warn(s"Empty active segment at ${deletable.last.baseOffset} was
deleted and recreated due to $reason")
+ segmentsToDelete = deletable.dropRight(1)
Review comment:
Thanks, Yang. Makes sense to me now.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]