kamalcph commented on code in PR #21612:
URL: https://github.com/apache/kafka/pull/21612#discussion_r2870844762


##########
storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java:
##########
@@ -1944,25 +1965,29 @@ private int deleteSegments(List<LogSegment> deletable, 
SegmentDeletionReason rea
      * not deletion is enabled, delete any local log segments that are before 
the log start offset
      */
     public int deleteOldSegments() throws IOException {
+        int deletedSegments;
         if (config().delete) {
-            return deleteLogStartOffsetBreachedSegments() +
+            deletedSegments = deleteLogStartOffsetBreachedSegments() +
                     deleteRetentionSizeBreachedSegments() +
                     deleteRetentionMsBreachedSegments();
         } else if (config().compact) {
-            return deleteLogStartOffsetBreachedSegments();
+            deletedSegments = deleteLogStartOffsetBreachedSegments();
         } else {
             // If cleanup.policy is empty and remote storage is enabled, the 
local log segments will 
             // be cleaned based on the values of log.local.retention.bytes and 
log.local.retention.ms
             if (remoteLogEnabledAndRemoteCopyEnabled()) {
-                return deleteLogStartOffsetBreachedSegments() +
+                deletedSegments = deleteLogStartOffsetBreachedSegments() +
                         deleteRetentionSizeBreachedSegments() +
                         deleteRetentionMsBreachedSegments();
             } else {
                 // If cleanup.policy is empty and remote storage is disabled, 
we should not delete any local log segments 
                 // unless the log start offset advances through deleteRecords
-                return deleteLogStartOffsetBreachedSegments();
+                deletedSegments = deleteLogStartOffsetBreachedSegments();
             }
         }
+        // To save CPU cycles, calculate retentionSizeInPercent only when the 
log-cleaner thread runs
+        retentionSizeInPercentValue.set(calculateRetentionSizeInPercent());

Review Comment:
   Can we run this in a `finally` block so that the size is still calculated
   when log deletion fails? Please also cover this scenario with a test.
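
A minimal sketch of the shape being suggested, not the actual `UnifiedLog` code. `RetentionGaugeSketch`, its `failDeletion` flag and the two placeholder helpers are hypothetical, and the `AtomicInteger` type for `retentionSizeInPercentValue` is an assumption (the diff only shows that it has a `set` method).

```java
import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for UnifiedLog; only the try/finally shape matters here.
class RetentionGaugeSketch {
    // Mirrors retentionSizeInPercentValue from the diff; the AtomicInteger type is an assumption.
    final AtomicInteger retentionSizeInPercentValue = new AtomicInteger(0);
    private final boolean failDeletion;

    RetentionGaugeSketch(boolean failDeletion) {
        this.failDeletion = failDeletion;
    }

    // Placeholder for the chain of delete...BreachedSegments() calls.
    private int deleteSegments() throws IOException {
        if (failDeletion) {
            throw new IOException("simulated segment deletion failure");
        }
        return 1;
    }

    // Placeholder for calculateRetentionSizeInPercent(); returns a fixed value.
    private int calculateRetentionSizeInPercent() {
        return 100;
    }

    int deleteOldSegments() throws IOException {
        try {
            return deleteSegments();
        } finally {
            // Runs on both the normal and the exceptional path, so the gauge
            // is refreshed even when deletion throws.
            retentionSizeInPercentValue.set(calculateRetentionSizeInPercent());
        }
    }
}
```

A test for the error path could then make deletion throw and assert that the gauge was still updated.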



##########
storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java:
##########
@@ -747,4 +753,101 @@ public static MemoryRecords records(List<SimpleRecord> 
records,
         }
         return builder.build();
     }
+
+    /**
+     * Test RetentionSizeInPercent metric for regular (non-tiered) topics.
+     * The metric should only be reported for non-tiered topics with 
size-based retention configured.
+     *
+     * @param remoteLogStorageEnable whether remote log storage is enabled
+     * @param remoteLogCopyDisable whether remote log copy is disabled (only 
relevant when remote storage is enabled)
+     * @param expectedSizeInPercent expected percentage value after retention 
cleanup
+     */
+    @ParameterizedTest
+    @CsvSource({
+        // Remote storage enabled with copy enabled: metric handled by 
RemoteLogManager, returns 0 here
+        "true, false, 0",
+        // Remote storage enabled but copy disabled: metric should be 
calculated (100%)
+        "true, true, 100",
+        // Remote storage disabled: metric should be calculated (100%)
+        "false, false, 100",
+        // Remote storage disabled (remoteLogCopyDisable is ignored): metric 
should be calculated (100%)
+        "false, true, 100"
+    })
+    public void testRetentionSizeInPercentMetric(boolean 
remoteLogStorageEnable, boolean remoteLogCopyDisable, int 
expectedSizeInPercent) throws IOException {
+        Supplier<MemoryRecords> records = () -> 
singletonRecords("test".getBytes());
+        int recordSize = records.get().sizeInBytes();
+        LogConfig logConfig = new LogTestUtils.LogConfigBuilder()
+                .segmentBytes(recordSize * 5)
+                .retentionBytes(recordSize * 10L)
+                .remoteLogStorageEnable(remoteLogStorageEnable)
+                .remoteLogCopyDisable(remoteLogCopyDisable)
+                .build();
+        log = createLog(logDir, logConfig, true);
+
+        String metricName = "name=RetentionSizeInPercent,topic=" + 
log.topicPartition().topic() + 
+                ",partition=" + log.topicPartition().partition();
+
+        // Append some messages to create 3 segments (15 records / 5 records 
per segment = 3 segments)
+        for (int i = 0; i < 15; i++) {
+            log.appendAsLeader(records.get(), 0);
+        }
+
+        // Before deletion, calculate what the percentage should be
+        // Total size = 15 * recordSize, retention = 10 * recordSize
+        // Percentage = (15 * 100) / 10 = 150% (for non-tiered topics)
+        if (!remoteLogStorageEnable || remoteLogCopyDisable) {
+            assertEquals(150, log.calculateRetentionSizeInPercent());
+        }
+
+        log.updateHighWatermark(log.logEndOffset());
+        // For tiered storage tests, simulate remote storage having the data
+        if (remoteLogStorageEnable) {
+            log.updateHighestOffsetInRemoteStorage(9);
+        }
+        log.deleteOldSegments();
+
+        // After deletion: log size should be ~10 * recordSize (2 segments), 
retention = 10 * recordSize
+        // Percentage = (10 * 100) / 10 = 100% (for non-tiered topics)
+        // Verify via Yammer metric (JMX)
+        assertEquals(expectedSizeInPercent, yammerMetricValue(metricName));
+        assertEquals(2, log.numberOfSegments(), "should have 2 segments after 
deletion");
+    }
+
+    @Test
+    public void testRetentionSizeInPercentWithZeroRetention() throws 
IOException {

Review Comment:
   nit: rename `testRetentionSizeInPercentWithZeroRetention` to
   `testRetentionSizeInPercentWithNegativeRetention` or
   `testRetentionSizeInPercentWithInfiniteRetention`.
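
For context on the naming: `retention.bytes = -1` means there is no size limit, so "negative" or "infinite" describes that case more accurately than "zero". Below is a rough sketch of the arithmetic from the test comments above (15 records against a 10-record limit gives 150%). `RetentionPercentSketch` is a hypothetical helper, and returning 0 for a non-positive limit is an assumption; the PR's `calculateRetentionSizeInPercent()` may behave differently.

```java
// Hypothetical helper; the PR's calculateRetentionSizeInPercent() may differ.
class RetentionPercentSketch {
    static int retentionSizeInPercent(long logSizeBytes, long retentionBytes) {
        // retention.bytes = -1 means "no size limit", so there is no meaningful ratio.
        // Returning 0 for any non-positive limit is an assumption of this sketch;
        // it also avoids dividing by zero.
        if (retentionBytes <= 0) {
            return 0;
        }
        return (int) (logSizeBytes * 100 / retentionBytes);
    }
}
```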



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogMetricNames.java:
##########
@@ -24,6 +24,7 @@ public class LogMetricNames {
     public static final String LOG_START_OFFSET = "LogStartOffset";
     public static final String LOG_END_OFFSET = "LogEndOffset";
     public static final String SIZE = "Size";
+    public static final String RETENTION_SIZE_IN_PERCENT = 
"RetentionSizeInPercent";
 
-    public static final List<String> ALL_METRIC_NAMES = 
List.of(NUM_LOG_SEGMENTS, LOG_START_OFFSET, LOG_END_OFFSET, SIZE);
+    public static final List<String> ALL_METRIC_NAMES = 
List.of(NUM_LOG_SEGMENTS, LOG_START_OFFSET, LOG_END_OFFSET, SIZE, 
RETENTION_SIZE_IN_PERCENT);

Review Comment:
   nit: fold this line to keep the line width within 120 chars.
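
One possible folding, shown as a self-contained sketch. `LogMetricNamesSketch` is a hypothetical copy of the class, and the value of `NUM_LOG_SEGMENTS` is assumed because the diff does not show it.

```java
import java.util.List;

// Hypothetical copy of LogMetricNames, reduced to the constants visible in the diff.
class LogMetricNamesSketch {
    // The NUM_LOG_SEGMENTS value is not shown in the diff; "NumLogSegments" is assumed here.
    static final String NUM_LOG_SEGMENTS = "NumLogSegments";
    static final String LOG_START_OFFSET = "LogStartOffset";
    static final String LOG_END_OFFSET = "LogEndOffset";
    static final String SIZE = "Size";
    static final String RETENTION_SIZE_IN_PERCENT = "RetentionSizeInPercent";

    // Breaking after "List.of(" keeps every line well under 120 characters.
    static final List<String> ALL_METRIC_NAMES = List.of(
        NUM_LOG_SEGMENTS, LOG_START_OFFSET, LOG_END_OFFSET, SIZE, RETENTION_SIZE_IN_PERCENT);
}
```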



##########
storage/src/test/java/org/apache/kafka/storage/internals/log/UnifiedLogTest.java:
##########
@@ -747,4 +753,101 @@ public static MemoryRecords records(List<SimpleRecord> 
records,
         }
         return builder.build();
     }
+
+    /**
+     * Test RetentionSizeInPercent metric for regular (non-tiered) topics.
+     * The metric should only be reported for non-tiered topics with 
size-based retention configured.
+     *
+     * @param remoteLogStorageEnable whether remote log storage is enabled
+     * @param remoteLogCopyDisable whether remote log copy is disabled (only 
relevant when remote storage is enabled)
+     * @param expectedSizeInPercent expected percentage value after retention 
cleanup
+     */
+    @ParameterizedTest
+    @CsvSource({
+        // Remote storage enabled with copy enabled: metric handled by 
RemoteLogManager, returns 0 here
+        "true, false, 0",
+        // Remote storage enabled but copy disabled: metric should be 
calculated (100%)
+        "true, true, 100",
+        // Remote storage disabled: metric should be calculated (100%)
+        "false, false, 100",
+        // Remote storage disabled (remoteLogCopyDisable is ignored): metric 
should be calculated (100%)
+        "false, true, 100"
+    })
+    public void testRetentionSizeInPercentMetric(boolean 
remoteLogStorageEnable, boolean remoteLogCopyDisable, int 
expectedSizeInPercent) throws IOException {

Review Comment:
   nit: fold this line as well to stay within 120 chars.


