kamalcph commented on code in PR #20811:
URL: https://github.com/apache/kafka/pull/20811#discussion_r2648864098
##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java:
##########
@@ -370,4 +375,58 @@ public void testInitializationFailure() throws
IOException, InterruptedException
Exit.resetExitProcedure();
}
}
+
+ @ClusterTest
+ public void testRemoteLogMetadataTopicMinIsr() throws ExecutionException,
InterruptedException {
+ // Initialize the manager which will create the __remote_log_metadata
topic
+ TopicBasedRemoteLogMetadataManager rlmm = topicBasedRlmm();
+ verifyRemoteLogMetadataTopicMinIsr(rlmm, (short) 2, "default value");
+ }
+
+ @ClusterTest
+ public void testRemoteLogMetadataTopicMinIsrWithCustomValue() throws
ExecutionException, InterruptedException, IOException {
+ // Create a manager with custom min.isr value
+ short customMinIsr = 3;
+ Map<String, Object> overrideProps = Map.of(
+
TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP,
customMinIsr
+ );
+
+ TopicBasedRemoteLogMetadataManager customRlmm =
RemoteLogMetadataManagerTestUtils.builder()
+ .bootstrapServers(clusterInstance.bootstrapServers())
+ .overrideRemoteLogMetadataManagerProps(overrideProps)
+ .build();
+
+ try {
+ verifyRemoteLogMetadataTopicMinIsr(customRlmm, customMinIsr,
"custom value");
+ } finally {
+ customRlmm.close();
+ }
Review Comment:
```suggestion
try (TopicBasedRemoteLogMetadataManager customRlmm =
RemoteLogMetadataManagerTestUtils.builder()
.bootstrapServers(clusterInstance.bootstrapServers())
.overrideRemoteLogMetadataManagerProps(overrideProps)
.build()) {
verifyRemoteLogMetadataTopicMinIsr(customRlmm, customMinIsr,
"custom value");
}
```
##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java:
##########
@@ -370,4 +375,58 @@ public void testInitializationFailure() throws
IOException, InterruptedException
Exit.resetExitProcedure();
}
}
+
+ @ClusterTest
+ public void testRemoteLogMetadataTopicMinIsr() throws ExecutionException,
InterruptedException {
+ // Initialize the manager which will create the __remote_log_metadata
topic
+ TopicBasedRemoteLogMetadataManager rlmm = topicBasedRlmm();
+ verifyRemoteLogMetadataTopicMinIsr(rlmm, (short) 2, "default value");
+ }
+
+ @ClusterTest
+ public void testRemoteLogMetadataTopicMinIsrWithCustomValue() throws
ExecutionException, InterruptedException, IOException {
+ // Create a manager with custom min.isr value
+ short customMinIsr = 3;
+ Map<String, Object> overrideProps = Map.of(
+
TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP,
customMinIsr
+ );
+
+ TopicBasedRemoteLogMetadataManager customRlmm =
RemoteLogMetadataManagerTestUtils.builder()
+ .bootstrapServers(clusterInstance.bootstrapServers())
+ .overrideRemoteLogMetadataManagerProps(overrideProps)
+ .build();
+
+ try {
+ verifyRemoteLogMetadataTopicMinIsr(customRlmm, customMinIsr,
"custom value");
+ } finally {
+ customRlmm.close();
+ }
+ }
+
+ private void
verifyRemoteLogMetadataTopicMinIsr(TopicBasedRemoteLogMetadataManager rlmm,
+ short expectedMinIsr,
+ String valueDescription)
+ throws ExecutionException, InterruptedException {
+ try (Admin admin = clusterInstance.admin()) {
+ String metadataTopic =
TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
+
+ // Wait for the topic to be created
+ // RemoteLogMetadataManagerTestUtils uses 3 partitions for the
metadata topic
+ clusterInstance.waitTopicCreation(metadataTopic, 3);
+
+ // Verify the topic exists
+ assertTrue(rlmm.doesTopicExist(admin, metadataTopic));
+
+ // Describe the topic configs to verify min.insync.replicas
+ ConfigResource topicResource = new
ConfigResource(ConfigResource.Type.TOPIC, metadataTopic);
+ DescribeConfigsResult describeResult =
admin.describeConfigs(List.of(topicResource));
+ Config config = describeResult.all().get().get(topicResource);
+
+ assertTrue(config != null, "Topic config should not be null");
Review Comment:
Please replace `assertTrue(config != null, ...)` with `assertNotNull(config, ...)`.
##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java:
##########
@@ -370,4 +375,58 @@ public void testInitializationFailure() throws
IOException, InterruptedException
Exit.resetExitProcedure();
}
}
+
+ @ClusterTest
+ public void testRemoteLogMetadataTopicMinIsr() throws ExecutionException,
InterruptedException {
+ // Initialize the manager which will create the __remote_log_metadata
topic
+ TopicBasedRemoteLogMetadataManager rlmm = topicBasedRlmm();
+ verifyRemoteLogMetadataTopicMinIsr(rlmm, (short) 2, "default value");
+ }
+
+ @ClusterTest
+ public void testRemoteLogMetadataTopicMinIsrWithCustomValue() throws
ExecutionException, InterruptedException, IOException {
+ // Create a manager with custom min.isr value
+ short customMinIsr = 3;
+ Map<String, Object> overrideProps = Map.of(
+
TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP,
customMinIsr
+ );
+
+ TopicBasedRemoteLogMetadataManager customRlmm =
RemoteLogMetadataManagerTestUtils.builder()
+ .bootstrapServers(clusterInstance.bootstrapServers())
+ .overrideRemoteLogMetadataManagerProps(overrideProps)
+ .build();
+
+ try {
+ verifyRemoteLogMetadataTopicMinIsr(customRlmm, customMinIsr,
"custom value");
+ } finally {
+ customRlmm.close();
+ }
+ }
+
+ private void
verifyRemoteLogMetadataTopicMinIsr(TopicBasedRemoteLogMetadataManager rlmm,
+ short expectedMinIsr,
+ String valueDescription)
Review Comment:
nit: align the parameter indentation.
##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java:
##########
@@ -370,4 +375,58 @@ public void testInitializationFailure() throws
IOException, InterruptedException
Exit.resetExitProcedure();
}
}
+
+ @ClusterTest
+ public void testRemoteLogMetadataTopicMinIsr() throws ExecutionException,
InterruptedException {
+ // Initialize the manager which will create the __remote_log_metadata
topic
+ TopicBasedRemoteLogMetadataManager rlmm = topicBasedRlmm();
+ verifyRemoteLogMetadataTopicMinIsr(rlmm, (short) 2, "default value");
Review Comment:
```suggestion
verifyRemoteLogMetadataTopicMinIsr(remoteLogMetadataManager, (short)
2, "default value");
```
##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java:
##########
@@ -370,4 +375,58 @@ public void testInitializationFailure() throws
IOException, InterruptedException
Exit.resetExitProcedure();
}
}
+
+ @ClusterTest
+ public void testRemoteLogMetadataTopicMinIsr() throws ExecutionException,
InterruptedException {
+ // Initialize the manager which will create the __remote_log_metadata
topic
+ TopicBasedRemoteLogMetadataManager rlmm = topicBasedRlmm();
+ verifyRemoteLogMetadataTopicMinIsr(rlmm, (short) 2, "default value");
+ }
+
+ @ClusterTest
+ public void testRemoteLogMetadataTopicMinIsrWithCustomValue() throws
ExecutionException, InterruptedException, IOException {
+ // Create a manager with custom min.isr value
+ short customMinIsr = 3;
+ Map<String, Object> overrideProps = Map.of(
+
TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP,
customMinIsr
+ );
+
+ TopicBasedRemoteLogMetadataManager customRlmm =
RemoteLogMetadataManagerTestUtils.builder()
+ .bootstrapServers(clusterInstance.bootstrapServers())
+ .overrideRemoteLogMetadataManagerProps(overrideProps)
+ .build();
+
+ try {
+ verifyRemoteLogMetadataTopicMinIsr(customRlmm, customMinIsr,
"custom value");
+ } finally {
+ customRlmm.close();
+ }
+ }
+
+ private void
verifyRemoteLogMetadataTopicMinIsr(TopicBasedRemoteLogMetadataManager rlmm,
+ short expectedMinIsr,
+ String valueDescription)
+ throws ExecutionException, InterruptedException {
+ try (Admin admin = clusterInstance.admin()) {
+ String metadataTopic =
TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
+
+ // Wait for the topic to be created
+ // RemoteLogMetadataManagerTestUtils uses 3 partitions for the
metadata topic
+ clusterInstance.waitTopicCreation(metadataTopic, 3);
Review Comment:
```suggestion
clusterInstance.waitTopicCreation(metadataTopic,
RemoteLogMetadataManagerTestUtils.METADATA_TOPIC_PARTITIONS_COUNT);
```
##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java:
##########
@@ -370,4 +375,58 @@ public void testInitializationFailure() throws
IOException, InterruptedException
Exit.resetExitProcedure();
}
}
+
+ @ClusterTest
+ public void testRemoteLogMetadataTopicMinIsr() throws ExecutionException,
InterruptedException {
+ // Initialize the manager which will create the __remote_log_metadata
topic
+ TopicBasedRemoteLogMetadataManager rlmm = topicBasedRlmm();
+ verifyRemoteLogMetadataTopicMinIsr(rlmm, (short) 2, "default value");
+ }
+
+ @ClusterTest
+ public void testRemoteLogMetadataTopicMinIsrWithCustomValue() throws
ExecutionException, InterruptedException, IOException {
+ // Create a manager with custom min.isr value
+ short customMinIsr = 3;
+ Map<String, Object> overrideProps = Map.of(
+
TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_MIN_ISR_PROP,
customMinIsr
+ );
+
+ TopicBasedRemoteLogMetadataManager customRlmm =
RemoteLogMetadataManagerTestUtils.builder()
+ .bootstrapServers(clusterInstance.bootstrapServers())
+ .overrideRemoteLogMetadataManagerProps(overrideProps)
+ .build();
+
+ try {
+ verifyRemoteLogMetadataTopicMinIsr(customRlmm, customMinIsr,
"custom value");
+ } finally {
+ customRlmm.close();
+ }
+ }
+
+ private void
verifyRemoteLogMetadataTopicMinIsr(TopicBasedRemoteLogMetadataManager rlmm,
+ short expectedMinIsr,
+ String valueDescription)
+ throws ExecutionException, InterruptedException {
+ try (Admin admin = clusterInstance.admin()) {
+ String metadataTopic =
TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
+
+ // Wait for the topic to be created
+ // RemoteLogMetadataManagerTestUtils uses 3 partitions for the
metadata topic
+ clusterInstance.waitTopicCreation(metadataTopic, 3);
+
+ // Verify the topic exists
+ assertTrue(rlmm.doesTopicExist(admin, metadataTopic));
+
+ // Describe the topic configs to verify min.insync.replicas
+ ConfigResource topicResource = new
ConfigResource(ConfigResource.Type.TOPIC, metadataTopic);
+ DescribeConfigsResult describeResult =
admin.describeConfigs(List.of(topicResource));
+ Config config = describeResult.all().get().get(topicResource);
+
+ assertTrue(config != null, "Topic config should not be null");
+ ConfigEntry minIsrEntry =
config.get(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG);
+ assertTrue(minIsrEntry != null, "min.insync.replicas config should
exist");
Review Comment:
Please replace `assertTrue(minIsrEntry != null, ...)` with `assertNotNull(minIsrEntry, ...)`.
##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java:
##########
@@ -192,4 +194,20 @@ private void assertMaskedSensitiveConfigurations(String
configString) {
Arrays.stream(sensitiveConfigKeys)
.forEach(config -> assertTrue(configString.contains(config +
"=(redacted)")));
}
+
+ @Test
+ public void testDefaultMinIsr() {
+ Map<String, Object> props = createValidConfigProps(new HashMap<>(),
new HashMap<>(), new HashMap<>());
Review Comment:
Instead of `new HashMap<>()`, can it be replaced with `Collections.emptyMap()`?
```suggestion
Map<String, Object> props =
createValidConfigProps(Collections.emptyMap(), Collections.emptyMap(),
Collections.emptyMap());
```
##########
storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java:
##########
@@ -192,4 +194,20 @@ private void assertMaskedSensitiveConfigurations(String
configString) {
Arrays.stream(sensitiveConfigKeys)
.forEach(config -> assertTrue(configString.contains(config +
"=(redacted)")));
}
+
+ @Test
+ public void testDefaultMinIsr() {
+ Map<String, Object> props = createValidConfigProps(new HashMap<>(),
new HashMap<>(), new HashMap<>());
+ TopicBasedRemoteLogMetadataManagerConfig rlmmConfig = new
TopicBasedRemoteLogMetadataManagerConfig(props);
+ assertEquals(DEFAULT_REMOTE_LOG_METADATA_TOPIC_MIN_ISR,
rlmmConfig.metadataTopicMinIsr());
+ }
+
+ @Test
+ public void testCustomMinIsr() {
+ Map<String, Object> props = createValidConfigProps(new HashMap<>(),
new HashMap<>(), new HashMap<>());
Review Comment:
Same as above: replace `new HashMap<>()` with `Collections.emptyMap()`:
```suggestion
Map<String, Object> props =
createValidConfigProps(Collections.emptyMap(), Collections.emptyMap(),
Collections.emptyMap());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]