chia7712 commented on code in PR #21379:
URL: https://github.com/apache/kafka/pull/21379#discussion_r2801941737


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/Cleaner.java:
##########
@@ -169,9 +176,17 @@ public Map.Entry<Long, CleanerStats> doClean(LogToClean cleanable, long currentT
                 log.name(), new Date(cleanableHorizonMs), new Date(legacyDeleteHorizonMs));
         CleanedTransactionMetadata transactionMetadata = new CleanedTransactionMetadata();
 
+        double sizeRatio = segmentOverflowPartitions.getOrDefault(log.topicPartition(), 1.0);
+        if (sizeRatio != 1.0) {
+            logger.info("Partition {} has overflow history. " + "Reducing effective segment size to {}% for this round.",
+                    log.topicPartition(), sizeRatio * 100);
+        }
+
+        int effectiveMaxSize = (int) (log.config().segmentSize() * sizeRatio);
+
+
         List<List<LogSegment>> groupedSegments = groupSegmentsBySize(
                 log.logSegments(0, endOffset),
-                log.config().segmentSize(),
+                effectiveMaxSize,

Review Comment:
   Yes, we can roll a new segment when either the "size check" or the "overflow check" is triggered.
   
   ```java
                   // 1. Size check: current size + retained size > configured segment size limit
                   // 2. Overflow check: max offset - base offset > Integer.MAX_VALUE
                   boolean willExceedSize = (long) dest.size() + retained.sizeInBytes() > log.config().segmentSize();
                   boolean willOverflow = result.maxOffset() - dest.baseOffset() > Integer.MAX_VALUE;

                   if (willExceedSize || willOverflow) {
                       logger.info("Rolling new segment. Condition met: size_exceeded={}, overflow={}. (Segment size: {}, Batch size: {}, BaseOffset: {}, MaxOffset: {})",
                               willExceedSize, willOverflow, dest.size(), retained.sizeInBytes(), dest.baseOffset(), result.maxOffset());

                       dest = rollNewSegment(log, dest, cleanedSegments, transactionMetadata, retained);
                   }
   ```
   
   However, I have another concern regarding temporary disk usage. If we remove the initial segment grouping entirely, cleaning might require a significant amount of disk space to hold all the cleaned segments simultaneously before the replacement happens.
   
   I believe the grouping logic should be retained, but simplified to serve as a batch-size threshold. That way we can bound the cleaning scope so it does not occupy too much disk space, while still allowing the inner logic to split segments dynamically when needed.
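
   Roughly, the simplified grouping I have in mind would look like the sketch below. This is only an illustration of the batch-size-threshold idea, not the actual `groupSegmentsBySize` implementation: it stands in plain byte counts for `LogSegment` objects, and the `maxBatchBytes` parameter name is hypothetical.
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   public class GroupingSketch {
       // Group consecutive segment sizes into batches whose total stays under
       // maxBatchBytes, so at most one batch of cleaned data sits on disk at a
       // time. Splitting an oversized output segment is left to the inner
       // cleaning loop (the size/overflow roll check above).
       static List<List<Long>> groupByBatchSize(List<Long> segmentSizes, long maxBatchBytes) {
           List<List<Long>> groups = new ArrayList<>();
           List<Long> current = new ArrayList<>();
           long currentBytes = 0;
           for (long size : segmentSizes) {
               if (!current.isEmpty() && currentBytes + size > maxBatchBytes) {
                   groups.add(current);
                   current = new ArrayList<>();
                   currentBytes = 0;
               }
               current.add(size);
               currentBytes += size;
           }
           if (!current.isEmpty()) {
               groups.add(current);
           }
           return groups;
       }
   
       public static void main(String[] args) {
           // Segments of 40, 70, 30, 90 bytes with a 100-byte batch threshold:
           List<List<Long>> groups = groupByBatchSize(List.of(40L, 70L, 30L, 90L), 100L);
           System.out.println(groups); // [[40], [70, 30], [90]]
       }
   }
   ```
   
   Note that a single segment larger than the threshold still becomes its own batch here; the inner roll check would then decide whether its cleaned output needs to be split.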



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
