Re: [PR] [fix] [broker] fix topic partitions was expanded even if disabled topic level replication [pulsar]
poorbarcode merged PR #22769: URL: https://github.com/apache/pulsar/pull/22769 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [fix] [broker] fix topic partitions was expanded even if disabled topic level replication [pulsar]
Demogorgon314 commented on code in PR #22769:
URL: https://github.com/apache/pulsar/pull/22769#discussion_r1612915366

## pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java: ##
@@ -698,10 +708,19 @@ public void testDeleteNonPartitionedTopic() throws Exception {
         // Verify replicator works.
         verifyReplicationWorks(topicName);
 
+        admin1.topicPolicies().setCompactionThreshold(topicName, 1000_000);
+        Awaitility.await().atMost(Duration.ofSeconds(3600)).untilAsserted(() -> {
+            assertEquals(admin2.topicPolicies().getCompactionThreshold(topicName), 1000_000);
+        });
+
         // Disable replication.
         setTopicLevelClusters(topicName, Arrays.asList(cluster1), admin1, pulsar1);
         setTopicLevelClusters(topicName, Arrays.asList(cluster2), admin2, pulsar2);
+        Thread.sleep(3 * 1000);

Review Comment:
Should we use `Awaitility` to verify the condition?
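For context on the suggestion above: a polling wait returns as soon as the condition holds, whereas a fixed `Thread.sleep` always pays the full delay and can still be too short on a slow machine. The following is only a minimal plain-Java sketch of that idea, not Awaitility's implementation and not code from this PR; `PollingWait` and `awaitTrue` are hypothetical names introduced for illustration.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

public class PollingWait {
    /**
     * Hypothetical helper: polls the condition until it returns true or the
     * timeout elapses. Returns true if the condition was met in time.
     */
    public static boolean awaitTrue(BooleanSupplier condition, Duration timeout, Duration pollInterval)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true; // Stop waiting as soon as the condition holds.
            }
            if (System.nanoTime() - deadline >= 0) {
                return false; // Timed out without seeing the condition.
            }
            Thread.sleep(pollInterval.toMillis());
        }
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // Stand-in for an asynchronous update that becomes visible after about 200 ms.
        BooleanSupplier visible = () -> System.currentTimeMillis() - start >= 200;
        boolean met = awaitTrue(visible, Duration.ofSeconds(3), Duration.ofMillis(50));
        System.out.println("condition met: " + met);
    }
}
```

In the test under review, the condition would presumably be whatever state the 3-second sleep is waiting for; the thread does not spell that state out, so it is left abstract here.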
Re: [PR] [fix] [broker] fix topic partitions was expanded even if disabled topic level replication [pulsar]
poorbarcode commented on code in PR #22769:
URL: https://github.com/apache/pulsar/pull/22769#discussion_r1612788710

## pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java: ##
@@ -748,4 +767,55 @@ public void testDeletePartitionedTopic() throws Exception {
                     .persistentTopicExists(TopicName.get(topicName).getPartition(1)).join());
         }
     }
+
+    @Test
+    public void testNoExpandTopicPartitionsWhenDisableTopicLevelReplication() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
+        admin1.topics().createPartitionedTopic(topicName, 2);
+
+        // Verify replicator works.
+        verifyReplicationWorks(topicName);
+
+        // Disable topic level replication.
+        setTopicLevelClusters(topicName, Arrays.asList(cluster1), admin1, pulsar1);
+        setTopicLevelClusters(topicName, Arrays.asList(cluster2), admin2, pulsar2);
+
+        // Expand topic.
+        admin1.topics().updatePartitionedTopic(topicName, 3);
+        assertEquals(admin1.topics().getPartitionedTopicMetadata(topicName).partitions, 3);
+
+        // Wait for async tasks that were triggered by expanding topic partitions.
+        Thread.sleep(3 * 1000);
+
+        // Verify: the topics on the remote cluster did not been expanded.
+        assertEquals(admin2.topics().getPartitionedTopicMetadata(topicName).partitions, 2);
+
+        cleanupTopics(() -> {
+            admin1.topics().deletePartitionedTopic(topicName, false);
+            admin2.topics().deletePartitionedTopic(topicName, false);
+        });
+    }
+
+    @Test
+    public void testExpandTopicPartitionsOnNamespaceLevelReplication() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
+        admin1.topics().createPartitionedTopic(topicName, 2);
+
+        // Verify replicator works.
+        verifyReplicationWorks(topicName);
+
+        // Expand topic.
+        admin1.topics().updatePartitionedTopic(topicName, 3);
+        assertEquals(admin1.topics().getPartitionedTopicMetadata(topicName).partitions, 3);
+
+        // Verify: the topics on the remote cluster did not been expanded.

Review Comment:
Fixed, thanks.
Re: [PR] [fix] [broker] fix topic partitions was expanded even if disabled topic level replication [pulsar]
poorbarcode commented on code in PR #22769:
URL: https://github.com/apache/pulsar/pull/22769#discussion_r1612788299

## pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java: ##
@@ -698,10 +708,19 @@ public void testDeleteNonPartitionedTopic() throws Exception {
         // Verify replicator works.
         verifyReplicationWorks(topicName);
 
+        admin1.topicPolicies().setCompactionThreshold(topicName, 1000_000);
+        Awaitility.await().atMost(Duration.ofSeconds(3600)).untilAsserted(() -> {
+            assertEquals(admin2.topicPolicies().getCompactionThreshold(topicName), 1000_000);
+        });
+
         // Disable replication.
         setTopicLevelClusters(topicName, Arrays.asList(cluster1), admin1, pulsar1);
         setTopicLevelClusters(topicName, Arrays.asList(cluster2), admin2, pulsar2);
+        Thread.sleep(3 * 1000);
+        TopicPolicies topicPolicies1 = pulsar1.getTopicPoliciesService().getTopicPolicies(TopicName.get(topicName));

Review Comment:
Removed.

## pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java: ##
@@ -698,10 +708,19 @@ public void testDeleteNonPartitionedTopic() throws Exception {
         // Verify replicator works.
         verifyReplicationWorks(topicName);
 
+        admin1.topicPolicies().setCompactionThreshold(topicName, 1000_000);
+        Awaitility.await().atMost(Duration.ofSeconds(3600)).untilAsserted(() -> {

Review Comment:
Sorry, that was for debugging; I have removed it.

## pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java: ##
@@ -698,10 +708,19 @@ public void testDeleteNonPartitionedTopic() throws Exception {
         // Verify replicator works.
         verifyReplicationWorks(topicName);
 
+        admin1.topicPolicies().setCompactionThreshold(topicName, 1000_000);
+        Awaitility.await().atMost(Duration.ofSeconds(3600)).untilAsserted(() -> {
+            assertEquals(admin2.topicPolicies().getCompactionThreshold(topicName), 1000_000);
+        });
+
         // Disable replication.
         setTopicLevelClusters(topicName, Arrays.asList(cluster1), admin1, pulsar1);
         setTopicLevelClusters(topicName, Arrays.asList(cluster2), admin2, pulsar2);
+        Thread.sleep(3 * 1000);

Review Comment:
Because the operation that calls the expanding API is executed asynchronously.
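A note on why a wait is hard to avoid in the negative case: when a test must show that an asynchronous task does *not* change something (for example, that the remote partition count stays at 2), polling until a condition becomes true does not apply. One alternative to a single check after a fixed sleep is to require that the condition holds throughout an observation window and to fail at the first violation. The sketch below shows that idea in plain Java under stated assumptions; `HoldsDuring` and `staysTrue` are hypothetical names, and the `AtomicInteger` merely stands in for the remote partition count.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

public class HoldsDuring {
    /**
     * Hypothetical helper: returns true only if the condition is true on every
     * poll during the window; returns false at the first violation.
     */
    public static boolean staysTrue(BooleanSupplier condition, Duration window, Duration pollInterval)
            throws InterruptedException {
        long deadline = System.nanoTime() + window.toNanos();
        while (System.nanoTime() - deadline < 0) {
            if (!condition.getAsBoolean()) {
                return false; // Fail fast: the unwanted change was observed.
            }
            Thread.sleep(pollInterval.toMillis());
        }
        // Final check once the window has elapsed.
        return condition.getAsBoolean();
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the partition count reported by the remote cluster.
        AtomicInteger remotePartitions = new AtomicInteger(2);
        boolean unchanged = staysTrue(() -> remotePartitions.get() == 2,
                Duration.ofMillis(300), Duration.ofMillis(50));
        System.out.println("remote partitions unchanged: " + unchanged);
    }
}
```

The window length is still a guess about how long the asynchronous task can take, so this narrows the problem with a fixed sleep rather than eliminating it.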
Re: [PR] [fix] [broker] fix topic partitions was expanded even if disabled topic level replication [pulsar]
Demogorgon314 commented on code in PR #22769:
URL: https://github.com/apache/pulsar/pull/22769#discussion_r1612624344

## pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java: ##
@@ -748,4 +767,55 @@ public void testDeletePartitionedTopic() throws Exception {
                     .persistentTopicExists(TopicName.get(topicName).getPartition(1)).join());
         }
     }
+
+    @Test
+    public void testNoExpandTopicPartitionsWhenDisableTopicLevelReplication() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
+        admin1.topics().createPartitionedTopic(topicName, 2);
+
+        // Verify replicator works.
+        verifyReplicationWorks(topicName);
+
+        // Disable topic level replication.
+        setTopicLevelClusters(topicName, Arrays.asList(cluster1), admin1, pulsar1);
+        setTopicLevelClusters(topicName, Arrays.asList(cluster2), admin2, pulsar2);
+
+        // Expand topic.
+        admin1.topics().updatePartitionedTopic(topicName, 3);
+        assertEquals(admin1.topics().getPartitionedTopicMetadata(topicName).partitions, 3);
+
+        // Wait for async tasks that were triggered by expanding topic partitions.
+        Thread.sleep(3 * 1000);
+
+        // Verify: the topics on the remote cluster did not been expanded.
+        assertEquals(admin2.topics().getPartitionedTopicMetadata(topicName).partitions, 2);
+
+        cleanupTopics(() -> {
+            admin1.topics().deletePartitionedTopic(topicName, false);
+            admin2.topics().deletePartitionedTopic(topicName, false);
+        });
+    }
+
+    @Test
+    public void testExpandTopicPartitionsOnNamespaceLevelReplication() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
+        admin1.topics().createPartitionedTopic(topicName, 2);
+
+        // Verify replicator works.
+        verifyReplicationWorks(topicName);
+
+        // Expand topic.
+        admin1.topics().updatePartitionedTopic(topicName, 3);
+        assertEquals(admin1.topics().getPartitionedTopicMetadata(topicName).partitions, 3);
+
+        // Verify: the topics on the remote cluster did not been expanded.

Review Comment:
The code comment is wrong.
Re: [PR] [fix] [broker] fix topic partitions was expanded even if disabled topic level replication [pulsar]
Demogorgon314 commented on code in PR #22769:
URL: https://github.com/apache/pulsar/pull/22769#discussion_r1612619986

## pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java: ##
@@ -698,10 +708,19 @@ public void testDeleteNonPartitionedTopic() throws Exception {
         // Verify replicator works.
         verifyReplicationWorks(topicName);
 
+        admin1.topicPolicies().setCompactionThreshold(topicName, 1000_000);
+        Awaitility.await().atMost(Duration.ofSeconds(3600)).untilAsserted(() -> {
+            assertEquals(admin2.topicPolicies().getCompactionThreshold(topicName), 1000_000);
+        });
+
         // Disable replication.
         setTopicLevelClusters(topicName, Arrays.asList(cluster1), admin1, pulsar1);
         setTopicLevelClusters(topicName, Arrays.asList(cluster2), admin2, pulsar2);
+        Thread.sleep(3 * 1000);

Review Comment:
Why do we sleep 3s here?
Re: [PR] [fix] [broker] fix topic partitions was expanded even if disabled topic level replication [pulsar]
Demogorgon314 commented on code in PR #22769:
URL: https://github.com/apache/pulsar/pull/22769#discussion_r1612619392

## pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java: ##
@@ -698,10 +708,19 @@ public void testDeleteNonPartitionedTopic() throws Exception {
         // Verify replicator works.
         verifyReplicationWorks(topicName);
 
+        admin1.topicPolicies().setCompactionThreshold(topicName, 1000_000);
+        Awaitility.await().atMost(Duration.ofSeconds(3600)).untilAsserted(() -> {

Review Comment:
I see that this check will retry many times and can take a very long time. Is this expected?
Re: [PR] [fix] [broker] fix topic partitions was expanded even if disabled topic level replication [pulsar]
Demogorgon314 commented on code in PR #22769:
URL: https://github.com/apache/pulsar/pull/22769#discussion_r1612618664

## pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java: ##
@@ -698,10 +708,19 @@ public void testDeleteNonPartitionedTopic() throws Exception {
         // Verify replicator works.
         verifyReplicationWorks(topicName);
 
+        admin1.topicPolicies().setCompactionThreshold(topicName, 1000_000);
+        Awaitility.await().atMost(Duration.ofSeconds(3600)).untilAsserted(() -> {
+            assertEquals(admin2.topicPolicies().getCompactionThreshold(topicName), 1000_000);
+        });
+
         // Disable replication.
         setTopicLevelClusters(topicName, Arrays.asList(cluster1), admin1, pulsar1);
         setTopicLevelClusters(topicName, Arrays.asList(cluster2), admin2, pulsar2);
+        Thread.sleep(3 * 1000);
+        TopicPolicies topicPolicies1 = pulsar1.getTopicPoliciesService().getTopicPolicies(TopicName.get(topicName));

Review Comment:
`topicPolicies1` and `topicPolicies2` are never used.
Re: [PR] [fix] [broker] fix topic partitions was expanded even if disabled topic level replication [pulsar]
poorbarcode commented on PR #22769:
URL: https://github.com/apache/pulsar/pull/22769#issuecomment-2128348138

/pulsarbot rerun-failure-checks