chia7712 commented on code in PR #19371: URL: https://github.com/apache/kafka/pull/19371#discussion_r2105869770
########## storage/src/test/java/org/apache/kafka/storage/internals/log/LocalLogTest.java: ########## @@ -198,7 +197,7 @@ public void testUpdateConfig() { assertEquals(oldConfig, log.config()); Properties props = new Properties(); - props.put(TopicConfig.SEGMENT_BYTES_CONFIG, oldConfig.segmentSize + 1); + props.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, oldConfig.segmentSize() + 1); Review Comment: it can keep using `TopicConfig.SEGMENT_BYTES_CONFIG`, right? ########## core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala: ########## @@ -95,29 +95,29 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness { @Test def testDynamicTopicConfigChange(): Unit = { val tp = new TopicPartition("test", 0) - val oldSegmentSize = 1000 + val oldSegmentSize = 2 * 1024 * 1024 val logProps = new Properties() logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, oldSegmentSize.toString) createTopic(tp.topic, 1, 1, logProps) TestUtils.retry(10000) { val logOpt = this.brokers.head.logManager.getLog(tp) assertTrue(logOpt.isDefined) - assertEquals(oldSegmentSize, logOpt.get.config.segmentSize) + assertEquals(oldSegmentSize, logOpt.get.config.segmentSize()) } val newSegmentSize = 2000 val admin = createAdminClient() try { val resource = new ConfigResource(ConfigResource.Type.TOPIC, tp.topic()) - val op = new AlterConfigOp(new ConfigEntry(TopicConfig.SEGMENT_BYTES_CONFIG, newSegmentSize.toString), + val op = new AlterConfigOp(new ConfigEntry(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, newSegmentSize.toString), Review Comment: this test is used to verify the alter, so you can use larger `newSegmentSize` to keep using `TopicConfig.SEGMENT_BYTES_CONFIG` ########## streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java: ########## @@ -248,7 +247,7 @@ public void consumerConfigMustContainStreamPartitionAssignorConfig() { props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 99_999L); props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, 7L); props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "dummy:host"); - props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 100); + props.put(StreamsConfig.topicPrefix("internal.segment.bytes"), 100); Review Comment: this is used to test the updated configs get returned, so we can increase the configured value instead of using internal config. ########## core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala: ########## @@ -1166,4 +1161,25 @@ object KafkaMetadataLogTest { } dir } + + private def createMetadataLogConfig( + internalLogSegmentBytes: Int, + logSegmentMillis: Long, + retentionMaxBytes: Long, + retentionMillis: Long, + internalMaxBatchSizeInBytes: Int, + internalMaxFetchSizeInBytes: Int, + internalDeleteDelayMillis: Long + ): MetadataLogConfig = { + val config: util.Map[String, Any] = util.Map.of( Review Comment: `val config = util.Map.of(` ########## core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala: ########## @@ -1166,4 +1161,25 @@ object KafkaMetadataLogTest { } dir } + + private def createMetadataLogConfig( + internalLogSegmentBytes: Int, + logSegmentMillis: Long, + retentionMaxBytes: Long, + retentionMillis: Long, + internalMaxBatchSizeInBytes: Int, Review Comment: Could you please add default value to `internalMaxBatchSizeInBytes`, `internalMaxFetchSizeInBytes`, and `internalDeleteDelayMillis` to streamline the callers? ########## metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java: ########## @@ -166,6 +166,10 @@ public Map<String, ConfigEntry> resolveEffectiveTopicConfigs( ConfigDef configDef = configDefs.getOrDefault(ConfigResource.Type.TOPIC, EMPTY_CONFIG_DEF); HashMap<String, ConfigEntry> effectiveConfigs = new HashMap<>(); for (ConfigDef.ConfigKey configKey : configDef.configKeys().values()) { + // This config is internal; if the user hasn't set it explicitly, it should not be returned. + if (configKey.internalConfig && !dynamicTopicConfigs.containsKey(configKey.name)) { Review Comment: Have you added test for this change? ########## storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java: ########## @@ -446,6 +447,14 @@ public static List<String> configNames() { return CONFIG.names().stream().sorted().toList(); } + public static List<String> nonInternalConfigNames() { + return CONFIG.configKeys().entrySet() + .stream() + .filter(entry -> !entry.getValue().internalConfig) + .map(Map.Entry::getKey) + .sorted().toList(); + } + public static Optional<String> serverConfigName(String configName) { Review Comment: it is unused now. please remove it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org