junrao commented on code in PR #20334:
URL: https://github.com/apache/kafka/pull/20334#discussion_r2294227533


##########
core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala:
##########
@@ -260,11 +260,13 @@ abstract class QuorumTestHarness extends Logging {
     props.setProperty(MetadataLogConfig.METADATA_LOG_DIR_CONFIG, 
metadataDir.getAbsolutePath)
     val proto = controllerListenerSecurityProtocol.toString
     val securityProtocolMaps = extraControllerSecurityProtocols().map(sc => sc 
+ ":" + sc).mkString(",")
-    val listeners = extraControllerSecurityProtocols().map(sc => sc + 
"://localhost:0").mkString(",")
-    val listenerNames = extraControllerSecurityProtocols().mkString(",")
+    val listeners = extraControllerSecurityProtocols().map(sc => sc + 
"://localhost:0").mkString(",").trim

Review Comment:
   Why is `trim` needed? Ditto below.



##########
clients/src/main/java/org/apache/kafka/common/config/TopicConfig.java:
##########
@@ -159,7 +159,9 @@ public class TopicConfig {
         "<a href=\"#compaction\">log compaction</a>, which retains the latest 
value for each key. " +
         "It is also possible to specify both policies in a comma-separated 
list (e.g. \"delete,compact\"). " +
         "In this case, old segments will be discarded per the retention time 
and size configuration, " +
-        "while retained segments will be compacted.";
+        "while retained segments will be compacted." +
+        "An empty list means infinite retention - no cleanup policies will be 
applied and log segments " +
+        "will be retained indefinitely.";

Review Comment:
   It would be useful to mention that local retention is still enforced with 
remote storage enabled. Also, could we update the doc for cleanup.policy in 
ServerLogConfigs too?



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -561,9 +561,8 @@ public static void 
validateRemoteStorageOnlyIfSystemEnabled(Map<?, ?> props, boo
     @SuppressWarnings("unchecked")
     private static void 
validateRemoteStorageRequiresDeleteCleanupPolicy(Map<?, ?> props) {
         List<String> cleanupPolicy = (List<String>) 
props.get(TopicConfig.CLEANUP_POLICY_CONFIG);
-        Set<String> policySet = cleanupPolicy.stream().map(policy -> 
policy.toLowerCase(Locale.getDefault())).collect(Collectors.toSet());
-        if (!Set.of(TopicConfig.CLEANUP_POLICY_DELETE).equals(policySet)) {
-            throw new ConfigException("Remote log storage only supports topics 
with cleanup.policy=delete");
+        if (!cleanupPolicy.isEmpty() && (cleanupPolicy.size() != 1 || 
!TopicConfig.CLEANUP_POLICY_DELETE.equals(cleanupPolicy.get(0)))) {
+            throw new ConfigException("Remote log storage only supports topics 
with cleanup.policy=delete or cleanup.policy is empty list.");

Review Comment:
   cleanup.policy is empty list => cleanup.policy being an empty list



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java:
##########
@@ -1879,16 +1879,25 @@ private int deleteSegments(List<LogSegment> deletable, 
SegmentDeletionReason rea
 
     /**
      * If topic deletion is enabled, delete any local log segments that have 
either expired due to time based
-     * retention or because the log size is > retentionSize. Whether or not 
deletion is enabled, delete any local
+     * retention or because the log size is > retentionSize. Empty 
cleanup.policy with remote storage enabled 
+     * behaves the same as deletion policy. Whether or not deletion is 
enabled, delete any local

Review Comment:
   Empty cleanup.policy with remote storage enabled behaves the same as 
deletion policy => Empty cleanup.policy is the same as delete with infinite 
retention. So, we only need to delete local segments if remote storage is 
enabled.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to