chia7712 commented on code in PR #20334: URL: https://github.com/apache/kafka/pull/20334#discussion_r2311409416
########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServerConfig.java: ########## @@ -139,12 +139,12 @@ public abstract class RestServerConfig extends AbstractConfig { public static void addPublicConfig(ConfigDef configDef) { addInternalConfig(configDef); configDef - .define( - REST_EXTENSION_CLASSES_CONFIG, + .define(REST_EXTENSION_CLASSES_CONFIG, ConfigDef.Type.LIST, - "", - ConfigDef.Importance.LOW, REST_EXTENSION_CLASSES_DOC - ).define(ADMIN_LISTENERS_CONFIG, + List.of(), + ConfigDef.ValidList.anyNonDuplicateValues(true, false), + ConfigDef.Importance.LOW, REST_EXTENSION_CLASSES_DOC) + .define(ADMIN_LISTENERS_CONFIG, ConfigDef.Type.LIST, null, new AdminListenersValidator(), Review Comment: it seems `AdminListenersValidator` could be replaced by `ConfigDef.ValidList.anyNonDuplicateValues(true, true)`, right? ########## clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java: ########## @@ -1006,26 +1006,59 @@ else if (max == null) public static class ValidList implements Validator { final ValidString validString; + final boolean isEmptyAllowed; + final boolean isNullAllowed; - private ValidList(List<String> validStrings) { + private ValidList(List<String> validStrings, boolean isEmptyAllowed, boolean isNullAllowed) { this.validString = new ValidString(validStrings); + this.isEmptyAllowed = isEmptyAllowed; + this.isNullAllowed = isNullAllowed; + } + + public static ValidList anyNonDuplicateValues(boolean isEmptyAllowed, boolean isNullAllowed) { + return new ValidList(List.of(), isEmptyAllowed, isNullAllowed); } public static ValidList in(String... validStrings) { - return new ValidList(Arrays.asList(validStrings)); + return new ValidList(List.of(validStrings), true, false); + } + + public static ValidList in(boolean isEmptyAllowed, String... 
validStrings) { + if (!isEmptyAllowed && validStrings.length == 0) { + throw new IllegalArgumentException("Valid strings list cannot be empty for inNonEmpty validator"); Review Comment: It seems this method was previously named `inNonEmpty`. Could you update the error message to reflect that? ########## storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java: ########## @@ -1906,16 +1906,25 @@ private int deleteSegments(List<LogSegment> deletable, SegmentDeletionReason rea /** * If topic deletion is enabled, delete any local log segments that have either expired due to time based - * retention or because the log size is > retentionSize. Whether or not deletion is enabled, delete any local - * log segments that are before the log start offset + * retention or because the log size is > retentionSize. Empty cleanup.policy is the same as delete with + * infinite retention, so we only need to delete local segments if remote storage is enabled. Whether or + * not deletion is enabled, delete any local log segments that are before the log start offset */ public int deleteOldSegments() throws IOException { if (config().delete) { Review Comment: ```java if (config().delete) return deleteLogStartOffsetBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteRetentionMsBreachedSegments(); if (config().compact) return deleteLogStartOffsetBreachedSegments(); // add documentation for this new behavior if (remoteLogEnabledAndRemoteCopyEnabled()) return deleteLogStartOffsetBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteRetentionMsBreachedSegments(); // add documentation for this new behavior return deleteLogStartOffsetBreachedSegments(); ``` ########## server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java: ########## @@ -63,7 +63,7 @@ public class SocketServerConfigs { "is assumed if no explicit mapping is provided and no other security protocol is in use."; public static final String LISTENERS_CONFIG = 
"listeners"; - public static final String LISTENERS_DEFAULT = "PLAINTEXT://:9092"; + public static final List<String> LISTENERS_DEFAULT = List.of("PLAINTEXT://:9092"); Review Comment: I'm not sure what the rule is for changing the default value from `String` to `List<String>`. For example, the default value type of `metric.reporters` is still `String` rather than `List<String>` ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/util/SSLUtils.java: ########## @@ -132,8 +132,7 @@ protected static void configureSslContextFactoryAlgorithms(SslContextFactory ssl ssl.setProtocol((String) getOrDefault(sslConfigValues, SslConfigs.SSL_PROTOCOL_CONFIG, SslConfigs.DEFAULT_SSL_PROTOCOL)); List<String> sslCipherSuites = (List<String>) sslConfigValues.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG); - if (sslCipherSuites != null) - ssl.setIncludeCipherSuites(sslCipherSuites.toArray(new String[0])); + ssl.setIncludeCipherSuites(sslCipherSuites.toArray(new String[0])); Review Comment: ditto on line#126 ########## core/src/main/scala/kafka/utils/CoreUtils.scala: ########## @@ -121,16 +122,16 @@ object CoreUtils { def inWriteLock[T](lock: ReadWriteLock)(fun: => T): T = inLock[T](lock.writeLock)(fun) - def listenerListToEndPoints(listeners: String, securityProtocolMap: java.util.Map[ListenerName, SecurityProtocol]): Seq[Endpoint] = { + def listenerListToEndPoints(listeners: java.util.List[String], securityProtocolMap: java.util.Map[ListenerName, SecurityProtocol]): Seq[Endpoint] = { listenerListToEndPoints(listeners, securityProtocolMap, requireDistinctPorts = true) } - private def checkDuplicateListenerPorts(endpoints: Seq[Endpoint], listeners: String): Unit = { + private def checkDuplicateListenerPorts(endpoints: Seq[Endpoint], listeners: java.util.List[String]): Unit = { val distinctPorts = endpoints.map(_.port).distinct - require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener must have a different port, listeners: $listeners") + 
require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener must have a different port, listeners: ${listeners.stream().collect(Collectors.joining(","))}") Review Comment: Do we really need `listeners.stream().collect(Collectors.joining(","))`? The default implementation of `toString` should work fine in this case. ########## connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/util/SSLUtils.java: ########## @@ -132,8 +132,7 @@ protected static void configureSslContextFactoryAlgorithms(SslContextFactory ssl ssl.setProtocol((String) getOrDefault(sslConfigValues, SslConfigs.SSL_PROTOCOL_CONFIG, SslConfigs.DEFAULT_SSL_PROTOCOL)); List<String> sslCipherSuites = (List<String>) sslConfigValues.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG); - if (sslCipherSuites != null) - ssl.setIncludeCipherSuites(sslCipherSuites.toArray(new String[0])); + ssl.setIncludeCipherSuites(sslCipherSuites.toArray(new String[0])); Review Comment: Should we align the behavior with `DefaultSslEngineFactory` to set it only if `sslCipherSuites` is not empty? ########## clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java: ########## @@ -716,23 +716,21 @@ public void testInterceptorConstructorClose(GroupProtocol groupProtocol) { @ParameterizedTest @EnumSource(GroupProtocol.class) public void testInterceptorConstructorConfigurationWithExceptionShouldCloseRemainingInstances(GroupProtocol groupProtocol) { - final int targetInterceptor = 3; + final int targetInterceptor = 1; Review Comment: @m1a2st renaming it does not cover the test case of "remaining instances are closed", right? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org