chia7712 commented on code in PR #19371:
URL: https://github.com/apache/kafka/pull/19371#discussion_r2105869770


##########
storage/src/test/java/org/apache/kafka/storage/internals/log/LocalLogTest.java:
##########
@@ -198,7 +197,7 @@ public void testUpdateConfig() {
         assertEquals(oldConfig, log.config());
 
         Properties props = new Properties();
-        props.put(TopicConfig.SEGMENT_BYTES_CONFIG, oldConfig.segmentSize + 1);
+        props.put(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, 
oldConfig.segmentSize() + 1);

Review Comment:
   This can keep using `TopicConfig.SEGMENT_BYTES_CONFIG`, right?



##########
core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala:
##########
@@ -95,29 +95,29 @@ class DynamicConfigChangeTest extends 
KafkaServerTestHarness {
   @Test
   def testDynamicTopicConfigChange(): Unit = {
     val tp = new TopicPartition("test", 0)
-    val oldSegmentSize = 1000
+    val oldSegmentSize = 2 * 1024 * 1024
     val logProps = new Properties()
     logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, oldSegmentSize.toString)
     createTopic(tp.topic, 1, 1, logProps)
     TestUtils.retry(10000) {
       val logOpt = this.brokers.head.logManager.getLog(tp)
       assertTrue(logOpt.isDefined)
-      assertEquals(oldSegmentSize, logOpt.get.config.segmentSize)
+      assertEquals(oldSegmentSize, logOpt.get.config.segmentSize())
     }
 
     val newSegmentSize = 2000
     val admin = createAdminClient()
     try {
       val resource = new ConfigResource(ConfigResource.Type.TOPIC, tp.topic())
-      val op = new AlterConfigOp(new 
ConfigEntry(TopicConfig.SEGMENT_BYTES_CONFIG, newSegmentSize.toString),
+      val op = new AlterConfigOp(new 
ConfigEntry(LogConfig.INTERNAL_SEGMENT_BYTES_CONFIG, newSegmentSize.toString),

Review Comment:
   This test verifies the alter operation, so you can use a larger 
`newSegmentSize` and keep using `TopicConfig.SEGMENT_BYTES_CONFIG`.



##########
streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java:
##########
@@ -248,7 +247,7 @@ public void 
consumerConfigMustContainStreamPartitionAssignorConfig() {
         props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 99_999L);
         
props.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, 
7L);
         props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "dummy:host");
-        props.put(StreamsConfig.topicPrefix(TopicConfig.SEGMENT_BYTES_CONFIG), 
100);
+        props.put(StreamsConfig.topicPrefix("internal.segment.bytes"), 100);

Review Comment:
   This test verifies that the updated configs get returned, so we can 
increase the configured value instead of using the internal config.



##########
core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala:
##########
@@ -1166,4 +1161,25 @@ object KafkaMetadataLogTest {
     }
     dir
   }
+  
+  private def createMetadataLogConfig(
+    internalLogSegmentBytes: Int,
+    logSegmentMillis: Long,
+    retentionMaxBytes: Long,
+    retentionMillis: Long,
+    internalMaxBatchSizeInBytes: Int,
+    internalMaxFetchSizeInBytes: Int,
+    internalDeleteDelayMillis: Long
+  ): MetadataLogConfig = {
+    val config: util.Map[String, Any] = util.Map.of(

Review Comment:
   `val config = util.Map.of(`



##########
core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala:
##########
@@ -1166,4 +1161,25 @@ object KafkaMetadataLogTest {
     }
     dir
   }
+  
+  private def createMetadataLogConfig(
+    internalLogSegmentBytes: Int,
+    logSegmentMillis: Long,
+    retentionMaxBytes: Long,
+    retentionMillis: Long,
+    internalMaxBatchSizeInBytes: Int,

Review Comment:
   Could you please add default values to `internalMaxBatchSizeInBytes`, 
`internalMaxFetchSizeInBytes`, and `internalDeleteDelayMillis` to streamline 
the callers?



##########
metadata/src/main/java/org/apache/kafka/metadata/KafkaConfigSchema.java:
##########
@@ -166,6 +166,10 @@ public Map<String, ConfigEntry> 
resolveEffectiveTopicConfigs(
         ConfigDef configDef = 
configDefs.getOrDefault(ConfigResource.Type.TOPIC, EMPTY_CONFIG_DEF);
         HashMap<String, ConfigEntry> effectiveConfigs = new HashMap<>();
         for (ConfigDef.ConfigKey configKey : configDef.configKeys().values()) {
+            // This config is internal; if the user hasn't set it explicitly, 
it should not be returned.
+            if (configKey.internalConfig && 
!dynamicTopicConfigs.containsKey(configKey.name)) {

Review Comment:
   Have you added a test for this change?



##########
storage/src/main/java/org/apache/kafka/storage/internals/log/LogConfig.java:
##########
@@ -446,6 +447,14 @@ public static List<String> configNames() {
         return CONFIG.names().stream().sorted().toList();
     }
 
+    public static List<String> nonInternalConfigNames() {
+        return CONFIG.configKeys().entrySet()
+                .stream()
+                .filter(entry -> !entry.getValue().internalConfig)
+                .map(Map.Entry::getKey)
+                .sorted().toList();
+    }
+
     public static Optional<String> serverConfigName(String configName) {

Review Comment:
   It is unused now. Please remove it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to