jiafu1115 commented on code in PR #21361:
URL: https://github.com/apache/kafka/pull/21361#discussion_r2739368431


##########
storage/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogManager.java:
##########
@@ -907,28 +908,69 @@ private void maybeUpdateCopiedOffset(UnifiedLog log) 
throws RemoteStorageExcepti
             }
         }
 
+        /**
+         * Check if segment has already expired based on remote storage's 
retention time.
+         */
+        private boolean isSegmentExpiredByTimeForRemoteStorage(LogSegment 
segment, long retentionMs) throws IOException {
+            if (retentionMs <= 0) {
+                return false;
+            }
+            return time.milliseconds() - segment.largestTimestamp() > 
retentionMs;
+        }
+
+        /**
+         * Check if segment has already expired based on remote storage's 
retention size.
+         */
+        private boolean isSegmentExpiredBySizeForRemoteStorage(LogSegment 
segment, long retentionBytes, long logSize, long accumulatedSkippedSize) {
+            if (retentionBytes <= 0) {
+                return false;
+            }
+            return (logSize - retentionBytes - accumulatedSkippedSize) > 
segment.size();
+        }
+
         /**
          *  Segments which match the following criteria are eligible for 
copying to remote storage:
          *  1) Segment is not the active segment and
          *  2) Segment end-offset is less than the last-stable-offset as 
remote storage should contain only
          *     committed/acked messages
+         * 
+         * Additionally, if a segment has already expired based on remote 
storage's retention configuration,
+         * it will be skipped from upload and logStartOffset will be updated 
to allow local deletion.
+         *
          * @param log The log from which the segments are to be copied
          * @param fromOffset The offset from which the segments are to be 
copied
          * @param lastStableOffset The last stable offset of the log
          * @return candidate log segments to be copied to remote storage
          */
-        List<EnrichedLogSegment> candidateLogSegments(UnifiedLog log, Long 
fromOffset, Long lastStableOffset) {
+        List<EnrichedLogSegment> candidateLogSegments(UnifiedLog log, Long 
fromOffset, Long lastStableOffset) throws IOException {
             List<EnrichedLogSegment> candidateLogSegments = new ArrayList<>();
             List<LogSegment> segments = log.logSegments(fromOffset, 
Long.MAX_VALUE);
-            if (!segments.isEmpty()) {
-                for (int idx = 1; idx < segments.size(); idx++) {
-                    LogSegment previousSeg = segments.get(idx - 1);
-                    LogSegment currentSeg = segments.get(idx);
-                    if (currentSeg.baseOffset() <= lastStableOffset) {
-                        candidateLogSegments.add(new 
EnrichedLogSegment(previousSeg, currentSeg.baseOffset()));
-                    }
+            if (segments.isEmpty()) {
+                return candidateLogSegments;
+            }
+            long retentionMs = log.config() != null ? log.config().retentionMs 
: -1;
+            long retentionSize = log.config() != null ? 
log.config().retentionSize : -1;
+            // Compute log.size() once when retention is size-based; skip when 
not needed to avoid wasted work.
+            long logSize = retentionSize > 0 ? log.size() : -1;
+            long accumulatedSkippedSize = 0;
+            for (int idx = 1; idx < segments.size(); idx++) {
+                LogSegment previousSeg = segments.get(idx - 1);
+                LogSegment currentSeg = segments.get(idx);
+                if (currentSeg.baseOffset() > lastStableOffset) {
+                    continue;
+                }
+                
+                if (isSegmentExpiredByTimeForRemoteStorage(previousSeg, 
retentionMs) || 
+                    isSegmentExpiredBySizeForRemoteStorage(previousSeg, 
retentionSize, logSize, accumulatedSkippedSize)) {
+                    long newLogStartOffset = currentSeg.baseOffset();
+                    log.maybeIncrementLogStartOffset(newLogStartOffset, 
LogStartOffsetIncrementReason.SegmentExpiredByRemoteRetention);
+                    logger.info("Segment {} has already expired based on 
remote storage's retention configuration. Skipping upload and incrementing 
logStartOffset to {} to allow local deletion.",
+                            previousSeg, newLogStartOffset);
+                    accumulatedSkippedSize += previousSeg.size();
+                    continue;

Review Comment:
   Note: A segment in the middle of the log should not be skipped. Expired 
segments should be found by starting from the oldest segment and stopping as 
soon as the user-supplied predicate returns false.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to