satishd commented on code in PR #15141:
URL: https://github.com/apache/kafka/pull/15141#discussion_r1444173390


##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1510,10 +1510,21 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           }
         }
         localLog.checkIfMemoryMappedBufferClosed()
-        // remove the segments for lookups
-        localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, 
reason)
-        deleteProducerSnapshots(deletable, asyncDelete = true)
-        
incrementStartOffset(localLog.segments.firstSegmentBaseOffset.getAsLong, 
LogStartOffsetIncrementReason.SegmentDeletion)
+        if (remoteLogEnabled()) {
+          /**
+           * See @link{https://issues.apache.org/jira/browse/KAFKA-16073} for 
the background of this fix
+           */
+          val newLocalLogStartOffset = 
localLog.segments.higherSegment(deletable.last.baseOffset()).get().baseOffset()
+          incrementStartOffset(newLocalLogStartOffset, 
LogStartOffsetIncrementReason.SegmentDeletion)

Review Comment:
   Please add UTs for this change. They should cover different scenarios like 
   * more than 1 deletable segment
   * empty deletable segments or
   * only active segment that needs to be deleted



##########
core/src/main/scala/kafka/log/UnifiedLog.scala:
##########
@@ -1510,10 +1510,21 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           }
         }
         localLog.checkIfMemoryMappedBufferClosed()
-        // remove the segments for lookups
-        localLog.removeAndDeleteSegments(segmentsToDelete, asyncDelete = true, 
reason)
-        deleteProducerSnapshots(deletable, asyncDelete = true)
-        
incrementStartOffset(localLog.segments.firstSegmentBaseOffset.getAsLong, 
LogStartOffsetIncrementReason.SegmentDeletion)
+        if (remoteLogEnabled()) {
+          /**
+           * See @link{https://issues.apache.org/jira/browse/KAFKA-16073} for 
the background of this fix
+           */
+          val newLocalLogStartOffset = 
localLog.segments.higherSegment(deletable.last.baseOffset()).get().baseOffset()

Review Comment:
   I would suggest still making the change to use `segmentsToDelete` instead of 
`deletable` as that is more accurate and covers if we change the logic to 
generate deletable segments in tiered-storage path in future. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to