junrao commented on code in PR #20334:
URL: https://github.com/apache/kafka/pull/20334#discussion_r2304758593


##########
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##########
@@ -502,7 +508,7 @@ public class ProducerConfig extends AbstractConfig {
                                 .define(INTERCEPTOR_CLASSES_CONFIG,
                                         Type.LIST,
                                         Collections.emptyList(),
-                                        new ConfigDef.NonNullValidator(),
+                                        ConfigDef.ValidList.anyNonDuplicateValues(true, false),

Review Comment:
   This is an existing issue. Should we change Collections.emptyList() to List.of() above?
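   
   For concreteness, a hedged sketch of what that might look like (only the default changes; the `anyNonDuplicateValues` validator and its two boolean flags are the ones introduced in this PR, so their exact signature is assumed here):
   
   ```java
   // Sketch only, not the exact PR code: keep the new validator from this PR,
   // but use an immutable List.of() as the empty default instead of Collections.emptyList().
   .define(INTERCEPTOR_CLASSES_CONFIG,
           Type.LIST,
           List.of(),
           ConfigDef.ValidList.anyNonDuplicateValues(true, false),
           Importance.LOW,
           INTERCEPTOR_CLASSES_DOC)
   ```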



##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -615,7 +615,7 @@ public class ConsumerConfig extends AbstractConfig {
                                 .define(INTERCEPTOR_CLASSES_CONFIG,
                                         Type.LIST,
                                         Collections.emptyList(),
-                                        new ConfigDef.NonNullValidator(),
+                                        ConfigDef.ValidList.anyNonDuplicateValues(true, false),

Review Comment:
   This is an existing issue. Should we change Collections.emptyList() to List.of() above?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java:
##########
@@ -246,15 +247,19 @@ protected static ConfigDef baseConfigDef() {
                         Importance.LOW,
                         CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
                 .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST,
-                        JmxReporter.class.getName(), Importance.LOW,
+                        JmxReporter.class.getName(),
+                        ConfigDef.ValidList.anyNonDuplicateValues(true, false),
+                        Importance.LOW,
                         CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
                 .define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS,
                         HEADER_CONVERTER_CLASS_DEFAULT,
                         Importance.LOW, HEADER_CONVERTER_CLASS_DOC)
                 .define(HEADER_CONVERTER_VERSION, Type.STRING,
                        HEADER_CONVERTER_VERSION_DEFAULT, Importance.LOW, HEADER_CONVERTER_VERSION_DOC)
-                .define(CONFIG_PROVIDERS_CONFIG, Type.LIST,
+                .define(CONFIG_PROVIDERS_CONFIG,
+                        Type.LIST,
                         Collections.emptyList(),
+                        ConfigDef.ValidList.anyNonDuplicateValues(true, false),

Review Comment:
   This is an existing issue. Should we change Collections.emptyList() to List.of() above?



##########
server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java:
##########
@@ -36,8 +38,8 @@ public class ServerLogConfigs {
 
     public static final String LOG_DIRS_CONFIG = LOG_PREFIX + "dirs";
     public static final String LOG_DIR_CONFIG = LOG_PREFIX + "dir";
-    public static final String LOG_DIR_DEFAULT = "/tmp/kafka-logs";
-    public static final String LOG_DIR_DOC = "The directory in which the log data is kept (supplemental for " + LOG_DIRS_CONFIG + " property)";
+    public static final List<String> LOG_DIR_DEFAULT = List.of("/tmp/kafka-logs");
+    public static final String LOG_DIR_DOC = "The directories in which the log data is kept (supplemental for " + LOG_DIRS_CONFIG + " property)";

Review Comment:
   Perhaps we could say
   
   `A comma-separated list of the directories where the log data is stored. Synonym to LOG_DIRS_CONFIG`
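   
   If it helps, the constant could then read roughly like this (a sketch of the suggested wording only, not the final text):
   
   ```java
   // Hedged sketch of the reviewer's suggested wording for the doc string.
   public static final String LOG_DIR_DOC = "A comma-separated list of the directories where the log data is stored. " +
           "Synonym to the " + LOG_DIRS_CONFIG + " property.";
   ```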



##########
core/src/test/scala/unit/kafka/KafkaConfigTest.scala:
##########
@@ -90,15 +94,34 @@ class KafkaConfigTest {
       "requirement failed: The listeners config must only contain KRaft 
controller listeners from controller.listener.names when 
process.roles=controller")
 
     properties.put(SocketServerConfigs.LISTENERS_CONFIG, "CONTROLLER://:9092")
-    assertBadConfigContainingMessage(properties,
-      "No security protocol defined for listener CONTROLLER")
+    properties.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT")
+    KafkaConfig.fromProps(properties)
+  }
 
+  @Test
+  def testControllerListenerNamesMismatch(): Unit = {
+    val properties = new Properties()
+    properties.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
+    properties.put(KRaftConfigs.NODE_ID_CONFIG, 0)
+    properties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "OTHER")
+    properties.put(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
+    properties.put(SocketServerConfigs.LISTENERS_CONFIG, "CONTROLLER://:9092")
     properties.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT")
+
     assertBadConfigContainingMessage(properties,
       "requirement failed: The listeners config must only contain KRaft 
controller listeners from controller.listener.names when 
process.roles=controller")
+  }
 
-    properties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
-    KafkaConfig.fromProps(properties)
+  @Test
+  def testControllerSecurityProtocolMissing(): Unit = {

Review Comment:
   testControllerSecurityProtocolMissing => testControllerSecurityProtocolMapMissing



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java:
##########
@@ -61,11 +61,13 @@ public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transfor
 
     public static final ConfigDef CONFIG_DEF = new ConfigDef()
             .define(FIELDS_FIELD, ConfigDef.Type.LIST,
-                    NO_DEFAULT_VALUE, new NonEmptyListValidator(),
+                    NO_DEFAULT_VALUE,
+                    new NonEmptyListValidator(),
                     ConfigDef.Importance.HIGH,
                     "Field names in the record whose values are to be copied 
or moved to headers.")
             .define(HEADERS_FIELD, ConfigDef.Type.LIST,
-                    NO_DEFAULT_VALUE, new NonEmptyListValidator(),
+                    NO_DEFAULT_VALUE,
+                    new NonEmptyListValidator(),

Review Comment:
   Changes in this file are not needed.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -713,32 +713,6 @@ public void testInterceptorConstructorClose(GroupProtocol groupProtocol) {
         }
     }
 
-    @ParameterizedTest
-    @EnumSource(GroupProtocol.class)
-    public void testInterceptorConstructorConfigurationWithExceptionShouldCloseRemainingInstances(GroupProtocol groupProtocol) {

Review Comment:
   Why is this test removed?



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -144,8 +144,8 @@ public Optional<String> serverConfigName(String configName) {
 
     public static final ConfigDef SERVER_CONFIG_DEF = new ConfigDef()
            .define(ServerLogConfigs.NUM_PARTITIONS_CONFIG, INT, ServerLogConfigs.NUM_PARTITIONS_DEFAULT, atLeast(1), MEDIUM, ServerLogConfigs.NUM_PARTITIONS_DOC)
-            .define(ServerLogConfigs.LOG_DIR_CONFIG, STRING, ServerLogConfigs.LOG_DIR_DEFAULT, HIGH, ServerLogConfigs.LOG_DIR_DOC)
-            .define(ServerLogConfigs.LOG_DIRS_CONFIG, STRING, null, HIGH, ServerLogConfigs.LOG_DIRS_DOC)
+            .define(ServerLogConfigs.LOG_DIR_CONFIG, LIST, ServerLogConfigs.LOG_DIR_DEFAULT, ConfigDef.ValidList.anyNonDuplicateValues(false, true), HIGH, ServerLogConfigs.LOG_DIR_DOC)

Review Comment:
   This shouldn't be nullable, right?
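   
   If the second boolean flag is the one controlling nullability (as the other call sites in this PR, e.g. `(true, false)` for the interceptor lists and `(false, false)` for `controller.listener.names`, suggest), the fix would presumably just flip that flag:
   
   ```java
   // Sketch, assuming anyNonDuplicateValues(allowEmptyList, allowNull) semantics from this PR.
   .define(ServerLogConfigs.LOG_DIR_CONFIG, LIST, ServerLogConfigs.LOG_DIR_DEFAULT,
           ConfigDef.ValidList.anyNonDuplicateValues(false, false), HIGH, ServerLogConfigs.LOG_DIR_DOC)
   ```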



##########
server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java:
##########
@@ -137,7 +144,8 @@ public Map<ListenerName, SecurityProtocol> effectiveListenerSecurityProtocolMap(
            // 2. No SSL or SASL protocols are used in regular listeners (Note: controller listeners
            //    are not included in 'listeners' config when process.roles=broker)
            if (controllerListenerNames().stream().anyMatch(AbstractKafkaConfig::isSslOrSasl) ||
-                    Csv.parseCsvList(getString(SocketServerConfigs.LISTENERS_CONFIG)).stream()
+                    getList(SocketServerConfigs.LISTENERS_CONFIG).stream()
+                            .map(String::trim)

Review Comment:
   Do we need to call `trim()` here?



##########
server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java:
##########
@@ -70,12 +70,12 @@ public class KRaftConfigs {
     public static final String CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS_DOC = "We will log an error message about controller events that take longer than this threshold.";
 
     public static final ConfigDef CONFIG_DEF =  new ConfigDef()
-            .define(PROCESS_ROLES_CONFIG, LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.ValidList.in("broker", "controller"), HIGH, PROCESS_ROLES_DOC)
+            .define(PROCESS_ROLES_CONFIG, LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.ValidList.in(false, "broker", "controller"), HIGH, PROCESS_ROLES_DOC)
             .define(NODE_ID_CONFIG, INT, ConfigDef.NO_DEFAULT_VALUE, atLeast(0), HIGH, NODE_ID_DOC)
             .define(INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_CONFIG, INT, INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_DEFAULT, null, MEDIUM, INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_DOC)
             .define(BROKER_HEARTBEAT_INTERVAL_MS_CONFIG, INT, BROKER_HEARTBEAT_INTERVAL_MS_DEFAULT, null, MEDIUM, BROKER_HEARTBEAT_INTERVAL_MS_DOC)
             .define(BROKER_SESSION_TIMEOUT_MS_CONFIG, INT, BROKER_SESSION_TIMEOUT_MS_DEFAULT, null, MEDIUM, BROKER_SESSION_TIMEOUT_MS_DOC)
-            .define(CONTROLLER_LISTENER_NAMES_CONFIG, STRING, null, null, HIGH, CONTROLLER_LISTENER_NAMES_DOC)
+            .define(CONTROLLER_LISTENER_NAMES_CONFIG, LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.ValidList.anyNonDuplicateValues(false, false), HIGH, CONTROLLER_LISTENER_NAMES_DOC)

Review Comment:
   This is an existing issue. Under the broker config doc on the website, we need to include it as an essential config.



##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -3187,6 +3187,55 @@ class UnifiedLogTest {
     assertEquals(segments, log.numberOfSegments, "There should be 3 segments remaining")
   }
 
+  @Test
+  def shouldDeleteLocalLogSegmentsWhenPolicyIsEmptyWithSizeRetention(): Unit = {
+    def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes(), timestamp = 10L)
+    val recordSize = createRecords.sizeInBytes
+    val logConfig = LogTestUtils.createLogConfig(
+      segmentBytes = recordSize * 2,
+      localRetentionBytes = recordSize / 2,
+      cleanupPolicy = "",
+      remoteLogStorageEnable = true
+    )
+    val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+
+    for (_ <- 0 until 10)
+      log.appendAsLeader(createRecords, 0)
+
+    val segmentsBefore = log.numberOfSegments
+    log.updateHighWatermark(log.logEndOffset)
+    log.updateHighestOffsetInRemoteStorage(log.logEndOffset - 1)
+    val deleteOldSegments = log.deleteOldSegments()
+
+    assertTrue(log.numberOfSegments < segmentsBefore, "Some segments should be deleted due to size retention")
+    assertTrue(deleteOldSegments > 0, "At least one segment should be deleted")
+  }
+
+  @Test
+  def shouldDeleteLocalLogSegmentsWhenPolicyIsEmptyWithMsRetention(): Unit = {
+    def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes(), timestamp = 10L)
+    val recordSize = createRecords.sizeInBytes
+    val logConfig = LogTestUtils.createLogConfig(
+      segmentBytes = recordSize * 2,
+      localRetentionMs = 10000,
+      cleanupPolicy = "",
+      remoteLogStorageEnable = true
+    )
+    val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+
+    for (_ <- 0 until 10)
+      log.appendAsLeader(createRecords, 0)
+
+    // mark the oldest segment as older than retention.ms
+    log.logSegments.asScala.head.setLastModified(mockTime.milliseconds - 20000)

Review Comment:
   Hmm, why do we need this? The time-based retention should be based on the timestamp in the records, right?


