junrao commented on code in PR #20334:
URL: https://github.com/apache/kafka/pull/20334#discussion_r2304758593
##########
clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java:
##########
@@ -502,7 +508,7 @@ public class ProducerConfig extends AbstractConfig {
                 .define(INTERCEPTOR_CLASSES_CONFIG,
                         Type.LIST,
                         Collections.emptyList(),
-                        new ConfigDef.NonNullValidator(),
+                        ConfigDef.ValidList.anyNonDuplicateValues(true, false),

Review Comment:
   This is an existing issue. Should we change Collections.emptyList() to List.of() above?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java:
##########
@@ -615,7 +615,7 @@ public class ConsumerConfig extends AbstractConfig {
                 .define(INTERCEPTOR_CLASSES_CONFIG,
                         Type.LIST,
                         Collections.emptyList(),
-                        new ConfigDef.NonNullValidator(),
+                        ConfigDef.ValidList.anyNonDuplicateValues(true, false),

Review Comment:
   This is an existing issue. Should we change Collections.emptyList() to List.of() above?



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java:
##########
@@ -246,15 +247,19 @@ protected static ConfigDef baseConfigDef() {
                         Importance.LOW,
                         CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
                 .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST,
-                        JmxReporter.class.getName(), Importance.LOW,
+                        JmxReporter.class.getName(),
+                        ConfigDef.ValidList.anyNonDuplicateValues(true, false),
+                        Importance.LOW,
                         CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
                 .define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS, HEADER_CONVERTER_CLASS_DEFAULT, Importance.LOW, HEADER_CONVERTER_CLASS_DOC)
                 .define(HEADER_CONVERTER_VERSION, Type.STRING, HEADER_CONVERTER_VERSION_DEFAULT, Importance.LOW, HEADER_CONVERTER_VERSION_DOC)
-                .define(CONFIG_PROVIDERS_CONFIG, Type.LIST,
+                .define(CONFIG_PROVIDERS_CONFIG,
+                        Type.LIST,
                         Collections.emptyList(),
+                        ConfigDef.ValidList.anyNonDuplicateValues(true, false),

Review Comment:
   This is an existing issue. Should we change Collections.emptyList() to List.of() above?



##########
server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java:
##########
@@ -36,8 +38,8 @@ public class ServerLogConfigs {
     public static final String LOG_DIRS_CONFIG = LOG_PREFIX + "dirs";
     public static final String LOG_DIR_CONFIG = LOG_PREFIX + "dir";
-    public static final String LOG_DIR_DEFAULT = "/tmp/kafka-logs";
-    public static final String LOG_DIR_DOC = "The directory in which the log data is kept (supplemental for " + LOG_DIRS_CONFIG + " property)";
+    public static final List<String> LOG_DIR_DEFAULT = List.of("/tmp/kafka-logs");
+    public static final String LOG_DIR_DOC = "The directories in which the log data is kept (supplemental for " + LOG_DIRS_CONFIG + " property)";

Review Comment:
   Perhaps we could say `A comma-separated list of the directories where the log data is stored. Synonym to LOG_DIRS_CONFIG`.
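   For illustration, the constant might then look roughly like this (just a sketch of the suggested wording, not final text):

```java
public static final String LOG_DIR_DOC = "A comma-separated list of the directories where the log data is stored. " +
        "Synonym to " + LOG_DIRS_CONFIG + ".";
```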
##########
core/src/test/scala/unit/kafka/KafkaConfigTest.scala:
##########
@@ -90,15 +94,34 @@ class KafkaConfigTest {
       "requirement failed: The listeners config must only contain KRaft controller listeners from controller.listener.names when process.roles=controller")
     properties.put(SocketServerConfigs.LISTENERS_CONFIG, "CONTROLLER://:9092")
-    assertBadConfigContainingMessage(properties,
-      "No security protocol defined for listener CONTROLLER")
+    properties.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT")
+    KafkaConfig.fromProps(properties)
+  }
+
+  @Test
+  def testControllerListenerNamesMismatch(): Unit = {
+    val properties = new Properties()
+    properties.put(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
+    properties.put(KRaftConfigs.NODE_ID_CONFIG, 0)
+    properties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "OTHER")
+    properties.put(QuorumConfig.QUORUM_BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
+    properties.put(SocketServerConfigs.LISTENERS_CONFIG, "CONTROLLER://:9092")
     properties.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, "CONTROLLER:PLAINTEXT")
+    assertBadConfigContainingMessage(properties, "requirement failed: The listeners config must only contain KRaft controller listeners from controller.listener.names when process.roles=controller")
+  }
-    properties.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
-    KafkaConfig.fromProps(properties)
+
+  @Test
+  def testControllerSecurityProtocolMissing(): Unit = {

Review Comment:
   testControllerSecurityProtocolMissing => testControllerSecurityProtocolMapMissing



##########
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java:
##########
@@ -61,11 +61,13 @@ public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transfor
     public static final ConfigDef CONFIG_DEF = new ConfigDef()
             .define(FIELDS_FIELD, ConfigDef.Type.LIST,
-                    NO_DEFAULT_VALUE, new NonEmptyListValidator(),
+                    NO_DEFAULT_VALUE,
+                    new NonEmptyListValidator(),
                     ConfigDef.Importance.HIGH,
                     "Field names in the record whose values are to be copied or moved to headers.")
             .define(HEADERS_FIELD, ConfigDef.Type.LIST,
-                    NO_DEFAULT_VALUE, new NonEmptyListValidator(),
+                    NO_DEFAULT_VALUE,
+                    new NonEmptyListValidator(),

Review Comment:
   Changes in this file are not needed.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java:
##########
@@ -713,32 +713,6 @@ public void testInterceptorConstructorClose(GroupProtocol groupProtocol) {
         }
     }
-    @ParameterizedTest
-    @EnumSource(GroupProtocol.class)
-    public void testInterceptorConstructorConfigurationWithExceptionShouldCloseRemainingInstances(GroupProtocol groupProtocol) {

Review Comment:
   Why is this test removed?
##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -144,8 +144,8 @@ public Optional<String> serverConfigName(String configName) {
     public static final ConfigDef SERVER_CONFIG_DEF = new ConfigDef()
             .define(ServerLogConfigs.NUM_PARTITIONS_CONFIG, INT, ServerLogConfigs.NUM_PARTITIONS_DEFAULT, atLeast(1), MEDIUM, ServerLogConfigs.NUM_PARTITIONS_DOC)
-            .define(ServerLogConfigs.LOG_DIR_CONFIG, STRING, ServerLogConfigs.LOG_DIR_DEFAULT, HIGH, ServerLogConfigs.LOG_DIR_DOC)
-            .define(ServerLogConfigs.LOG_DIRS_CONFIG, STRING, null, HIGH, ServerLogConfigs.LOG_DIRS_DOC)
+            .define(ServerLogConfigs.LOG_DIR_CONFIG, LIST, ServerLogConfigs.LOG_DIR_DEFAULT, ConfigDef.ValidList.anyNonDuplicateValues(false, true), HIGH, ServerLogConfigs.LOG_DIR_DOC)

Review Comment:
   This shouldn't be nullable, right?



##########
server/src/main/java/org/apache/kafka/server/config/AbstractKafkaConfig.java:
##########
@@ -137,7 +144,8 @@ public Map<ListenerName, SecurityProtocol> effectiveListenerSecurityProtocolMap(
        // 2. No SSL or SASL protocols are used in regular listeners (Note: controller listeners
        //    are not included in 'listeners' config when process.roles=broker)
        if (controllerListenerNames().stream().anyMatch(AbstractKafkaConfig::isSslOrSasl) ||
-            Csv.parseCsvList(getString(SocketServerConfigs.LISTENERS_CONFIG)).stream()
+            getList(SocketServerConfigs.LISTENERS_CONFIG).stream()
+                .map(String::trim)

Review Comment:
   Do we need to call `trim()` here?



##########
server/src/main/java/org/apache/kafka/server/config/KRaftConfigs.java:
##########
@@ -70,12 +70,12 @@ public class KRaftConfigs {
     public static final String CONTROLLER_PERFORMANCE_ALWAYS_LOG_THRESHOLD_MS_DOC = "We will log an error message about controller events that take longer than this threshold.";
     public static final ConfigDef CONFIG_DEF = new ConfigDef()
-            .define(PROCESS_ROLES_CONFIG, LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.ValidList.in("broker", "controller"), HIGH, PROCESS_ROLES_DOC)
+            .define(PROCESS_ROLES_CONFIG, LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.ValidList.in(false, "broker", "controller"), HIGH, PROCESS_ROLES_DOC)
             .define(NODE_ID_CONFIG, INT, ConfigDef.NO_DEFAULT_VALUE, atLeast(0), HIGH, NODE_ID_DOC)
             .define(INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_CONFIG, INT, INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_DEFAULT, null, MEDIUM, INITIAL_BROKER_REGISTRATION_TIMEOUT_MS_DOC)
             .define(BROKER_HEARTBEAT_INTERVAL_MS_CONFIG, INT, BROKER_HEARTBEAT_INTERVAL_MS_DEFAULT, null, MEDIUM, BROKER_HEARTBEAT_INTERVAL_MS_DOC)
             .define(BROKER_SESSION_TIMEOUT_MS_CONFIG, INT, BROKER_SESSION_TIMEOUT_MS_DEFAULT, null, MEDIUM, BROKER_SESSION_TIMEOUT_MS_DOC)
-            .define(CONTROLLER_LISTENER_NAMES_CONFIG, STRING, null, null, HIGH, CONTROLLER_LISTENER_NAMES_DOC)
+            .define(CONTROLLER_LISTENER_NAMES_CONFIG, LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.ValidList.anyNonDuplicateValues(false, false), HIGH, CONTROLLER_LISTENER_NAMES_DOC)

Review Comment:
   This is an existing issue. In the broker config doc on the website, we need to include this as an essential config.
##########
core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala:
##########
@@ -3187,6 +3187,55 @@ class UnifiedLogTest {
     assertEquals(segments, log.numberOfSegments, "There should be 3 segments remaining")
   }
+
+  @Test
+  def shouldDeleteLocalLogSegmentsWhenPolicyIsEmptyWithSizeRetention(): Unit = {
+    def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes(), timestamp = 10L)
+    val recordSize = createRecords.sizeInBytes
+    val logConfig = LogTestUtils.createLogConfig(
+      segmentBytes = recordSize * 2,
+      localRetentionBytes = recordSize / 2,
+      cleanupPolicy = "",
+      remoteLogStorageEnable = true
+    )
+    val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+
+    for (_ <- 0 until 10)
+      log.appendAsLeader(createRecords, 0)
+
+    val segmentsBefore = log.numberOfSegments
+    log.updateHighWatermark(log.logEndOffset)
+    log.updateHighestOffsetInRemoteStorage(log.logEndOffset - 1)
+    val deleteOldSegments = log.deleteOldSegments()
+
+    assertTrue(log.numberOfSegments < segmentsBefore, "Some segments should be deleted due to size retention")
+    assertTrue(deleteOldSegments > 0, "At least one segment should be deleted")
+  }
+
+  @Test
+  def shouldDeleteLocalLogSegmentsWhenPolicyIsEmptyWithMsRetention(): Unit = {
+    def createRecords = TestUtils.singletonRecords("test".getBytes, key = "test".getBytes(), timestamp = 10L)
+    val recordSize = createRecords.sizeInBytes
+    val logConfig = LogTestUtils.createLogConfig(
+      segmentBytes = recordSize * 2,
+      localRetentionMs = 10000,
+      cleanupPolicy = "",
+      remoteLogStorageEnable = true
+    )
+    val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+
+    for (_ <- 0 until 10)
+      log.appendAsLeader(createRecords, 0)
+
+    // mark the oldest segment as older the retention.ms
+    log.logSegments.asScala.head.setLastModified(mockTime.milliseconds - 20000)

Review Comment:
   Hmm, why do we need this? The time-based retention should be based on the timestamp in the records, right?
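   If local time-based retention is indeed driven by the record timestamps (the records above are appended with `timestamp = 10L`, far in the past relative to the mock clock), the `setLastModified` call could presumably be dropped and the tail of the test would just mirror the size-retention test above. A rough, untested sketch:

```scala
// Records were appended with timestamp = 10L, already far beyond
// local.retention.ms relative to mockTime, so time-based retention
// should fire without touching the oldest segment's lastModified time.
val segmentsBefore = log.numberOfSegments
log.updateHighWatermark(log.logEndOffset)
log.updateHighestOffsetInRemoteStorage(log.logEndOffset - 1)
val deleted = log.deleteOldSegments()

assertTrue(log.numberOfSegments < segmentsBefore, "Some segments should be deleted due to time retention")
assertTrue(deleted > 0, "At least one segment should be deleted")
```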