chia7712 commented on code in PR #20334:
URL: https://github.com/apache/kafka/pull/20334#discussion_r2311409416


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServerConfig.java:
##########
@@ -139,12 +139,12 @@ public abstract class RestServerConfig extends 
AbstractConfig {
     public static void addPublicConfig(ConfigDef configDef) {
         addInternalConfig(configDef);
         configDef
-                .define(
-                        REST_EXTENSION_CLASSES_CONFIG,
+                .define(REST_EXTENSION_CLASSES_CONFIG,
                         ConfigDef.Type.LIST,
-                        "",
-                        ConfigDef.Importance.LOW, REST_EXTENSION_CLASSES_DOC
-                ).define(ADMIN_LISTENERS_CONFIG,
+                        List.of(),
+                        ConfigDef.ValidList.anyNonDuplicateValues(true, false),
+                        ConfigDef.Importance.LOW, REST_EXTENSION_CLASSES_DOC)
+                .define(ADMIN_LISTENERS_CONFIG,
                         ConfigDef.Type.LIST,
                         null,
                         new AdminListenersValidator(),

Review Comment:
   It seems `AdminListenersValidator` could be replaced by 
`ConfigDef.ValidList.anyNonDuplicateValues(true, true)`, right?



##########
clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java:
##########
@@ -1006,26 +1006,59 @@ else if (max == null)
     public static class ValidList implements Validator {
 
         final ValidString validString;
+        final boolean isEmptyAllowed;
+        final boolean isNullAllowed;
 
-        private ValidList(List<String> validStrings) {
+        private ValidList(List<String> validStrings, boolean isEmptyAllowed, 
boolean isNullAllowed) {
             this.validString = new ValidString(validStrings);
+            this.isEmptyAllowed = isEmptyAllowed;
+            this.isNullAllowed = isNullAllowed;
+        }
+
+        public static ValidList anyNonDuplicateValues(boolean isEmptyAllowed, 
boolean isNullAllowed) {
+            return new ValidList(List.of(), isEmptyAllowed, isNullAllowed);
         }
 
         public static ValidList in(String... validStrings) {
-            return new ValidList(Arrays.asList(validStrings));
+            return new ValidList(List.of(validStrings), true, false);
+        }
+
+        public static ValidList in(boolean isEmptyAllowed, String... 
validStrings) {
+            if (!isEmptyAllowed && validStrings.length == 0) {
+                throw new IllegalArgumentException("Valid strings list cannot 
be empty for inNonEmpty validator");

Review Comment:
   It seems this method was previously named `inNonEmpty`, and the error 
message still refers to that old name. Could you update the message to match 
the current method name?



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java:
##########
@@ -1906,16 +1906,25 @@ private int deleteSegments(List<LogSegment> deletable, 
SegmentDeletionReason rea
 
     /**
      * If topic deletion is enabled, delete any local log segments that have 
either expired due to time based
-     * retention or because the log size is > retentionSize. Whether or not 
deletion is enabled, delete any local
-     * log segments that are before the log start offset
+     * retention or because the log size is > retentionSize. Empty 
cleanup.policy is the same as delete with 
+     * infinite retention, so we only need to delete local segments if remote 
storage is enabled. Whether or 
+     * not deletion is enabled, delete any local log segments that are before 
the log start offset
      */
     public int deleteOldSegments() throws IOException {
         if (config().delete) {

Review Comment:
   ```java
           if (config().delete)
               return deleteLogStartOffsetBreachedSegments() +
                       deleteRetentionSizeBreachedSegments() +
                       deleteRetentionMsBreachedSegments();
           
           if (config().compact) return deleteLogStartOffsetBreachedSegments();
           
           // add documentation for this new behavior
           if (remoteLogEnabledAndRemoteCopyEnabled())
               return deleteLogStartOffsetBreachedSegments() +
                       deleteRetentionSizeBreachedSegments() +
                       deleteRetentionMsBreachedSegments();
   
           // add documentation for this new behavior
           return deleteLogStartOffsetBreachedSegments();
   ```



##########
server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java:
##########
@@ -63,7 +63,7 @@ public class SocketServerConfigs {
             "is assumed if no explicit mapping is provided and no other 
security protocol is in use.";
 
     public static final String LISTENERS_CONFIG = "listeners";
-    public static final String LISTENERS_DEFAULT = "PLAINTEXT://:9092";
+    public static final List<String> LISTENERS_DEFAULT = 
List.of("PLAINTEXT://:9092");

Review Comment:
   I'm not sure what the rule is for changing the default value from `String` 
to `List<String>`. For example, the default value type of `metric.reporters` is 
still `String` rather than `List<String>`.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/util/SSLUtils.java:
##########
@@ -132,8 +132,7 @@ protected static void 
configureSslContextFactoryAlgorithms(SslContextFactory ssl
         ssl.setProtocol((String) getOrDefault(sslConfigValues, 
SslConfigs.SSL_PROTOCOL_CONFIG, SslConfigs.DEFAULT_SSL_PROTOCOL));
 
         List<String> sslCipherSuites = (List<String>) 
sslConfigValues.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG);
-        if (sslCipherSuites != null)
-            ssl.setIncludeCipherSuites(sslCipherSuites.toArray(new String[0]));
+        ssl.setIncludeCipherSuites(sslCipherSuites.toArray(new String[0]));

Review Comment:
   Ditto on line #126.



##########
core/src/main/scala/kafka/utils/CoreUtils.scala:
##########
@@ -121,16 +122,16 @@ object CoreUtils {
 
   def inWriteLock[T](lock: ReadWriteLock)(fun: => T): T = 
inLock[T](lock.writeLock)(fun)
 
-  def listenerListToEndPoints(listeners: String, securityProtocolMap: 
java.util.Map[ListenerName, SecurityProtocol]): Seq[Endpoint] = {
+  def listenerListToEndPoints(listeners: java.util.List[String], 
securityProtocolMap: java.util.Map[ListenerName, SecurityProtocol]): 
Seq[Endpoint] = {
     listenerListToEndPoints(listeners, securityProtocolMap, 
requireDistinctPorts = true)
   }
 
-  private def checkDuplicateListenerPorts(endpoints: Seq[Endpoint], listeners: 
String): Unit = {
+  private def checkDuplicateListenerPorts(endpoints: Seq[Endpoint], listeners: 
java.util.List[String]): Unit = {
     val distinctPorts = endpoints.map(_.port).distinct
-    require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener 
must have a different port, listeners: $listeners")
+    require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener 
must have a different port, listeners: 
${listeners.stream().collect(Collectors.joining(","))}")

Review Comment:
   Do we really need `listeners.stream().collect(Collectors.joining(","))`? The 
default implementation of `toString` should work fine in this case.



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/util/SSLUtils.java:
##########
@@ -132,8 +132,7 @@ protected static void 
configureSslContextFactoryAlgorithms(SslContextFactory ssl
         ssl.setProtocol((String) getOrDefault(sslConfigValues, 
SslConfigs.SSL_PROTOCOL_CONFIG, SslConfigs.DEFAULT_SSL_PROTOCOL));
 
         List<String> sslCipherSuites = (List<String>) 
sslConfigValues.get(SslConfigs.SSL_CIPHER_SUITES_CONFIG);
-        if (sslCipherSuites != null)
-            ssl.setIncludeCipherSuites(sslCipherSuites.toArray(new String[0]));
+        ssl.setIncludeCipherSuites(sslCipherSuites.toArray(new String[0]));

Review Comment:
   Should we align the behavior with `DefaultSslEngineFactory` to set it only 
if `sslCipherSuites` is not empty?



##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -716,23 +716,21 @@ public void testInterceptorConstructorClose(GroupProtocol 
groupProtocol) {
     @ParameterizedTest
     @EnumSource(GroupProtocol.class)
     public void 
testInterceptorConstructorConfigurationWithExceptionShouldCloseRemainingInstances(GroupProtocol
 groupProtocol) {
-        final int targetInterceptor = 3;
+        final int targetInterceptor = 1;

Review Comment:
   @m1a2st renaming it does not cover the test case of "remaining instances are 
closed", right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to