junrao commented on code in PR #20334:
URL: https://github.com/apache/kafka/pull/20334#discussion_r2288958876


##########
server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java:
##########
@@ -79,7 +80,13 @@ public AbstractKafkaConfig(ConfigDef definition, Map<?, ?> originals, Map<String
     }
 
     public List<String> logDirs() {
-        return Csv.parseCsvList(Optional.ofNullable(getString(ServerLogConfigs.LOG_DIRS_CONFIG)).orElse(getString(ServerLogConfigs.LOG_DIR_CONFIG)));
+        return Optional.ofNullable(getList(ServerLogConfigs.LOG_DIRS_CONFIG))
+                .orElse(Stream.of(getString(ServerLogConfigs.LOG_DIR_CONFIG))
+                        .map(String::trim).filter(s -> !s.isEmpty()).toList()

Review Comment:
   It's probably better to keep the empty string rather than filter it out. Otherwise, `logDirs()` can return an empty list, and we could fail with IllegalArgumentException when accessing it, which is less intuitive.
   
   Also, if we want to `trim()` values, it's probably better to do that consistently across all configs.
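
A minimal sketch of the concern above, in plain Java rather than Kafka's `AbstractKafkaConfig`. The class and method names are hypothetical. It only shows that trimming and filtering a blank `log.dir` value produces an empty list, while keeping the value yields a one-element list that downstream code can validate explicitly.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper, not part of Kafka.
final class LogDirFallbackSketch {
    // Mirrors the fallback in the diff: trim the single value and drop it if blank.
    static List<String> filtered(String logDir) {
        return Stream.of(logDir)
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    // Alternative suggested in the review: keep the value, even when it is empty.
    static List<String> kept(String logDir) {
        return List.of(logDir);
    }

    public static void main(String[] args) {
        System.out.println(filtered("").size()); // 0: callers reading the first entry would fail
        System.out.println(kept("").size());     // 1: the empty string is still visible to validation
    }
}
```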



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java:
##########
@@ -1887,8 +1887,10 @@ public int deleteOldSegments() throws IOException {
             return deleteLogStartOffsetBreachedSegments() +
                     deleteRetentionSizeBreachedSegments() +
                     deleteRetentionMsBreachedSegments();
-        } else {
+        } else if (config().compact) {
             return deleteLogStartOffsetBreachedSegments();
+        } else {
+            return deleteLogStartOffsetBreachedSegments() + deleteRetentionSizeBreachedSegments();

Review Comment:
   This is not quite right. If `cleanup.policy` is empty, we want to ignore 
`log.retention.bytes` and `log.retention.ms` and only pick up 
`log.local.retention.bytes` and `log.local.retention.ms` if remote storage is 
enabled.
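
The branching described above can be sketched as a small decision function. This is an illustrative model only, not the `UnifiedLog` implementation: the class, method, and check names are hypothetical, and it assumes the first branch in the hunk (not shown in the diff) corresponds to the `delete` policy.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of which retention checks run for each cleanup.policy value.
final class RetentionPolicySketch {
    static List<String> checksFor(boolean delete, boolean compact, boolean remoteStorageEnabled) {
        List<String> checks = new ArrayList<>();
        // Every branch in the diff deletes segments below the log start offset.
        checks.add("logStartOffset");
        if (delete) {
            // Assumed to match the first branch: size- and time-based retention apply.
            checks.add("retentionBytes");
            checks.add("retentionMs");
        } else if (!compact && remoteStorageEnabled) {
            // Empty cleanup.policy: ignore log.retention.*, use only local retention,
            // and only when remote storage is enabled.
            checks.add("localRetentionBytes");
            checks.add("localRetentionMs");
        }
        return checks;
    }

    public static void main(String[] args) {
        System.out.println(checksFor(false, false, true));
        System.out.println(checksFor(false, false, false));
    }
}
```

With an empty policy and remote storage disabled, the sketch returns only the log start offset check, which matches the comment's point that `log.retention.bytes` should not be consulted in that case.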



##########
server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java:
##########
@@ -199,11 +199,14 @@ public static List<Endpoint> listenerListToEndPoints(
     }
 
     public static List<Endpoint> listenerListToEndPoints(
-        String input,
+        List<String> input,
         Function<ListenerName, SecurityProtocol> nameToSecurityProto
     ) {
         List<Endpoint> results = new ArrayList<>();
-        for (String entry : Csv.parseCsvList(input.trim())) {
+        for (String entry : input) {
+            if (entry.isEmpty()) {

Review Comment:
   Hmm, if the entry is empty, we should carry on parsing it and fail with an exception, right?
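
One way to read this comment, sketched in plain Java: an empty entry goes through the same parsing path as any other entry and is rejected there, instead of being special-cased. The parser below is a simplified, hypothetical stand-in for `listenerListToEndPoints`. It assumes a `NAME://host:port` shape, does not handle IPv6 hosts, and returns only listener names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical, simplified listener parser; not Kafka's SocketServerConfigs.
final class ListenerParseSketch {
    // Assumed shape: NAME://host:port, where host may be empty.
    private static final Pattern LISTENER = Pattern.compile("^([^:/]+)://([^:]*):(\\d+)$");

    static List<String> listenerNames(List<String> input) {
        List<String> names = new ArrayList<>();
        for (String entry : input) {
            Matcher m = LISTENER.matcher(entry);
            if (!m.matches()) {
                // An empty entry lands here too, so it is reported rather than skipped.
                throw new IllegalArgumentException("Unable to parse listener entry '" + entry + "'");
            }
            names.add(m.group(1));
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(listenerNames(List.of("PLAINTEXT://localhost:9092")));
    }
}
```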



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.