showuon commented on code in PR #16681:
URL: https://github.com/apache/kafka/pull/16681#discussion_r1726561154
##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########

@@ -1209,60 +1195,80 @@ void cleanupExpiredRemoteLogSegments() throws RemoteStorageException, ExecutionE
             long logStartOffset = log.logStartOffset();
             long logEndOffset = log.logEndOffset();
-            Optional<RetentionSizeData> retentionSizeData = buildRetentionSizeData(log.config().retentionSize,
-                    log.onlyLocalLogSegmentsSize(), logEndOffset, epochWithOffsets);
-            Optional<RetentionTimeData> retentionTimeData = buildRetentionTimeData(log.config().retentionMs);
-
-            RemoteLogRetentionHandler remoteLogRetentionHandler = new RemoteLogRetentionHandler(retentionSizeData, retentionTimeData);
-            Iterator<Integer> epochIterator = epochWithOffsets.navigableKeySet().iterator();
-            boolean canProcess = true;
-            List<RemoteLogSegmentMetadata> segmentsToDelete = new ArrayList<>();
             long sizeOfDeletableSegmentsBytes = 0L;
-            while (canProcess && epochIterator.hasNext()) {
-                Integer epoch = epochIterator.next();
-                Iterator<RemoteLogSegmentMetadata> segmentsIterator = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, epoch);
-                while (canProcess && segmentsIterator.hasNext()) {
-                    if (isCancelled()) {
-                        logger.info("Returning from remote log segments cleanup for the remaining segments as the task state is changed.");
-                        return;
-                    }
-                    RemoteLogSegmentMetadata metadata = segmentsIterator.next();
-                    if (segmentIdsBeingCopied.contains(metadata.remoteLogSegmentId())) {
-                        logger.debug("Copy for the segment {} is currently in process. Skipping cleanup for it and the remaining segments",
-                                metadata.remoteLogSegmentId());
-                        canProcess = false;
-                        continue;
-                    }
-                    if (RemoteLogSegmentState.DELETE_SEGMENT_FINISHED.equals(metadata.state())) {
+            final List<RemoteLogSegmentMetadata> segmentsToDelete = new ArrayList<>();
+            final List<RemoteLogSegmentMetadata> validSegments = new ArrayList<>();
+            for (Integer remoteLeaderEpoch: epochWithOffsets.navigableKeySet()) {
+                Iterator<RemoteLogSegmentMetadata> it = remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, remoteLeaderEpoch);
+                while (it.hasNext()) {
+                    final RemoteLogSegmentMetadata segment = it.next();
+
+                    // We can remove all segments in COPY_SEGMENT_STARTED but the last one as they are dangling
+                    if (segment.state().equals(RemoteLogSegmentState.COPY_SEGMENT_STARTED) && it.hasNext()) {

Review Comment:
While implementing the fix, I found this check is not correct: if there are 5 consecutive upload failures, we will have 5 metadata entries with different segment IDs, all in `COPY_SEGMENT_STARTED` and with the same epoch, say 0. So when we call `remoteLogMetadataManager.listRemoteLogSegments(topicIdPartition, 0)`, we'll get all 5 segments, but this check treats every dangling segment except the last one as removable — `it.hasNext()` is false for the final entry — so the last dangling segment is still counted, while all 5 of them should be excluded from the remote log size calculation. In my PR, I fix it by counting only the segments in the `COPY_SEGMENT_FINISHED` and `DELETE_SEGMENT_STARTED` states (a minimal sketch of that filter follows below). Let me know if you have any comments. Thanks.
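To make the intended filter concrete, here is a minimal, self-contained sketch of the state-based size calculation — the class and method names (`RemoteLogSizeSketch`, `remoteLogSizeBytes`) are hypothetical and not the actual code in the PR:

```java
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Set;

import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;

// Hypothetical helper for illustration only (not the code in the PR).
final class RemoteLogSizeSketch {

    // Only segments that are fully copied, or whose deletion has merely started,
    // are known to occupy space in remote storage.
    private static final Set<RemoteLogSegmentState> COUNTED_STATES = EnumSet.of(
            RemoteLogSegmentState.COPY_SEGMENT_FINISHED,
            RemoteLogSegmentState.DELETE_SEGMENT_STARTED);

    static long remoteLogSizeBytes(Iterator<RemoteLogSegmentMetadata> segments) {
        long sizeBytes = 0L;
        while (segments.hasNext()) {
            RemoteLogSegmentMetadata segment = segments.next();
            // COPY_SEGMENT_STARTED entries (e.g. those left behind by consecutive
            // upload failures) are skipped unconditionally, whether or not they are
            // the last entry returned for an epoch.
            if (COUNTED_STATES.contains(segment.state())) {
                sizeBytes += segment.segmentSizeInBytes();
            }
        }
        return sizeBytes;
    }
}
```

The point of filtering by state rather than by `it.hasNext()` is that the decision no longer depends on a segment's position in the per-epoch iteration, so every dangling `COPY_SEGMENT_STARTED` entry is excluded, including the last one.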