divijvaidya commented on code in PR #18140: URL: https://github.com/apache/kafka/pull/18140#discussion_r1880421924
########## core/src/test/java/kafka/admin/DeleteTopicTest.java: ########## @@ -239,7 +239,7 @@ public void testDeleteNonExistingTopic(ClusterInstance cluster) throws Exception @ClusterTest(serverProperties = { @ClusterConfigProperty(key = "log.cleaner.enable", value = "true"), @ClusterConfigProperty(key = "log.cleanup.policy", value = "compact"), - @ClusterConfigProperty(key = "log.segment.bytes", value = "100"), + @ClusterConfigProperty(key = "log.segment.bytes", value = "1048576"), Review Comment: does this test really need this config? From what it looks like it does the following: 1. create a topic 2. Write 100 X 3 entries where 100 entries are duplicated 3. Waits for cleaning to complete until 0 offset <-- this looks wrong 4. Deletes the topic 5. Verifies topic deletion Why do we need a segment roll here? (unless I am missing something) ########## core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala: ########## @@ -625,7 +626,7 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup "Config not updated in LogManager") val log = servers.head.logManager.getLog(new TopicPartition(topic, 0)).getOrElse(throw new IllegalStateException("Log not found")) - TestUtils.waitUntilTrue(() => log.config.segmentSize == 4000, "Existing topic config using defaults not updated") + TestUtils.waitUntilTrue(() => log.config.segmentSize == 1048576, "Existing topic config using defaults not updated") Review Comment: You will need to change validation in line 641 as well to retain the original intention of this test `log.logSegments.asScala.exists(_.size > 3000)` ########## core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala: ########## @@ -610,11 +610,12 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, "1000") props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, "1000") props.put(ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_CONFIG, "false") - reconfigureServers(props, perBrokerConfig = false, (ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "4000")) + reconfigureServers(props, perBrokerConfig = false, (ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "1048576")) // Verify that all broker defaults have been updated servers.foreach { server => props.forEach { (k, v) => + TestUtils.waitUntilTrue(() => server.config.originals.get(k) != null, "Configs not present") Review Comment: Thank you for adding this. ########## core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala: ########## @@ -610,11 +610,12 @@ class DynamicBrokerReconfigurationTest extends QuorumTestHarness with SaslSetup props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_BEFORE_MAX_MS_CONFIG, "1000") props.put(ServerLogConfigs.LOG_MESSAGE_TIMESTAMP_AFTER_MAX_MS_CONFIG, "1000") props.put(ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_CONFIG, "false") - reconfigureServers(props, perBrokerConfig = false, (ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "4000")) + reconfigureServers(props, perBrokerConfig = false, (ServerLogConfigs.LOG_SEGMENT_BYTES_CONFIG, "1048576")) Review Comment: This test doesn't really care about the new value that we are reconfiguring it to as long it is a valid value. It just tests if the reconfiguration is successful. Could we set it to something like 2GB? It will ensure that if we change the constraint again in future, we wouldn't have to modify this test. ########## core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala: ########## @@ -1188,7 +1188,7 @@ class KafkaConfigTest { case TopicConfig.COMPRESSION_ZSTD_LEVEL_CONFIG => assertDynamic(kafkaConfigProp, "5", () => config.zstdCompressionLevel) case TopicConfig.SEGMENT_BYTES_CONFIG => - assertDynamic(kafkaConfigProp, 10000, () => config.logSegmentBytes) + assertDynamic(kafkaConfigProp, 1048576, () => config.logSegmentBytes) Review Comment: Doesn't have to be the min possible value. A very large value like 2GB will ensure that we don't have to change it in future ########## core/src/test/scala/unit/kafka/server/LogOffsetTest.scala: ########## @@ -224,14 +224,14 @@ class LogOffsetTest extends BaseRequestTest { val now = Time.SYSTEM.milliseconds + 30000 // pretend it is the future to avoid race conditions with the fs val offsets = log.legacyFetchOffsetsBefore(now, 15) - assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L), offsets) + assertEquals(Seq(20L, 0L), offsets) TestUtils.waitUntilTrue(() => isLeaderLocalOnBroker(topic, topicPartition.partition, broker), "Leader should be elected") val request = ListOffsetsRequest.Builder.forReplica(0, 0) .setTargetTimes(buildTargetTimes(topicPartition, now, 15).asJava).build() val consumerOffsets = findPartition(sendListOffsetsRequest(request).topics.asScala, topicPartition).oldStyleOffsets.asScala - assertEquals(Seq(20L, 18L, 16L, 14L, 12L, 10L, 8L, 6L, 4L, 2L, 0L), consumerOffsets) Review Comment: can you please help me understanding this change? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org