Re: [PR] [fix] [broker] fix topic partitions was expanded even if disabled topic level replication [pulsar]

2024-05-27 Thread via GitHub


poorbarcode merged PR #22769:
URL: https://github.com/apache/pulsar/pull/22769


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [fix] [broker] fix topic partitions was expanded even if disabled topic level replication [pulsar]

2024-05-23 Thread via GitHub


Demogorgon314 commented on code in PR #22769:
URL: https://github.com/apache/pulsar/pull/22769#discussion_r1612915366


##
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java:
##
@@ -698,10 +708,19 @@ public void testDeleteNonPartitionedTopic() throws Exception {
         // Verify replicator works.
         verifyReplicationWorks(topicName);
 
+        admin1.topicPolicies().setCompactionThreshold(topicName, 1000_000);
+        Awaitility.await().atMost(Duration.ofSeconds(3600)).untilAsserted(() -> {
+            assertEquals(admin2.topicPolicies().getCompactionThreshold(topicName), 1000_000);
+        });
+
         // Disable replication.
         setTopicLevelClusters(topicName, Arrays.asList(cluster1), admin1, pulsar1);
         setTopicLevelClusters(topicName, Arrays.asList(cluster2), admin2, pulsar2);
 
+        Thread.sleep(3 * 1000);

Review Comment:
   Should we use `Awaitility` to verify the condition?
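
   To illustrate the suggestion: a fixed `Thread.sleep` always waits the full interval, while a polling wait returns as soon as the condition holds and fails only after a bounded timeout. The sketch below is a minimal, stdlib-only stand-in for that idea. It is not Awaitility's implementation and not the code from this PR; `PollingSketch`, `awaitUntil`, and the simulated partition counter are hypothetical names used only for illustration.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical helper: a stdlib-only illustration of "poll until true or time out".
final class PollingSketch {

    /**
     * Re-evaluates the condition every pollInterval until it returns true
     * or the timeout elapses. Returns true if the condition was met in time.
     */
    public static boolean awaitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(Math.max(1, pollInterval.toMillis()));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulated asynchronous state change (stands in for a remote metadata update).
        AtomicInteger partitions = new AtomicInteger(2);
        Thread updater = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            partitions.set(3);
        });
        updater.start();

        // Returns shortly after the update instead of sleeping for a fixed time.
        boolean expanded = awaitUntil(() -> partitions.get() == 3,
                Duration.ofSeconds(5), Duration.ofMillis(10));
        System.out.println("expanded=" + expanded);
        updater.join();
    }
}
```

   A short timeout keeps a failing test from hanging; the 3600-second limit in the hunk above would let a broken check run for up to an hour.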






Re: [PR] [fix] [broker] fix topic partitions was expanded even if disabled topic level replication [pulsar]

2024-05-23 Thread via GitHub


poorbarcode commented on code in PR #22769:
URL: https://github.com/apache/pulsar/pull/22769#discussion_r1612788710


##
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java:
##
@@ -748,4 +767,55 @@ public void testDeletePartitionedTopic() throws Exception {
                     .persistentTopicExists(TopicName.get(topicName).getPartition(1)).join());
         }
     }
+
+    @Test
+    public void testNoExpandTopicPartitionsWhenDisableTopicLevelReplication() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
+        admin1.topics().createPartitionedTopic(topicName, 2);
+
+        // Verify replicator works.
+        verifyReplicationWorks(topicName);
+
+        // Disable topic level replication.
+        setTopicLevelClusters(topicName, Arrays.asList(cluster1), admin1, pulsar1);
+        setTopicLevelClusters(topicName, Arrays.asList(cluster2), admin2, pulsar2);
+
+        // Expand topic.
+        admin1.topics().updatePartitionedTopic(topicName, 3);
+        assertEquals(admin1.topics().getPartitionedTopicMetadata(topicName).partitions, 3);
+
+        // Wait for async tasks that were triggered by expanding topic partitions.
+        Thread.sleep(3 * 1000);
+
+        // Verify: the topics on the remote cluster did not been expanded.
+        assertEquals(admin2.topics().getPartitionedTopicMetadata(topicName).partitions, 2);
+
+        cleanupTopics(() -> {
+            admin1.topics().deletePartitionedTopic(topicName, false);
+            admin2.topics().deletePartitionedTopic(topicName, false);
+        });
+    }
+
+    @Test
+    public void testExpandTopicPartitionsOnNamespaceLevelReplication() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
+        admin1.topics().createPartitionedTopic(topicName, 2);
+
+        // Verify replicator works.
+        verifyReplicationWorks(topicName);
+
+        // Expand topic.
+        admin1.topics().updatePartitionedTopic(topicName, 3);
+        assertEquals(admin1.topics().getPartitionedTopicMetadata(topicName).partitions, 3);
+
+        // Verify: the topics on the remote cluster did not been expanded.

Review Comment:
   Fixed, thanks.






Re: [PR] [fix] [broker] fix topic partitions was expanded even if disabled topic level replication [pulsar]

2024-05-23 Thread via GitHub


poorbarcode commented on code in PR #22769:
URL: https://github.com/apache/pulsar/pull/22769#discussion_r1612788299


##
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java:
##
@@ -698,10 +708,19 @@ public void testDeleteNonPartitionedTopic() throws Exception {
         // Verify replicator works.
         verifyReplicationWorks(topicName);
 
+        admin1.topicPolicies().setCompactionThreshold(topicName, 1000_000);
+        Awaitility.await().atMost(Duration.ofSeconds(3600)).untilAsserted(() -> {
+            assertEquals(admin2.topicPolicies().getCompactionThreshold(topicName), 1000_000);
+        });
+
         // Disable replication.
         setTopicLevelClusters(topicName, Arrays.asList(cluster1), admin1, pulsar1);
         setTopicLevelClusters(topicName, Arrays.asList(cluster2), admin2, pulsar2);
 
+        Thread.sleep(3 * 1000);
+        TopicPolicies topicPolicies1 = pulsar1.getTopicPoliciesService().getTopicPolicies(TopicName.get(topicName));

Review Comment:
   Removed.



##
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java:
##
@@ -698,10 +708,19 @@ public void testDeleteNonPartitionedTopic() throws Exception {
         // Verify replicator works.
         verifyReplicationWorks(topicName);
 
+        admin1.topicPolicies().setCompactionThreshold(topicName, 1000_000);
+        Awaitility.await().atMost(Duration.ofSeconds(3600)).untilAsserted(() -> {

Review Comment:
   Sorry, that was for debugging; I removed it.



##
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java:
##
@@ -698,10 +708,19 @@ public void testDeleteNonPartitionedTopic() throws Exception {
         // Verify replicator works.
         verifyReplicationWorks(topicName);
 
+        admin1.topicPolicies().setCompactionThreshold(topicName, 1000_000);
+        Awaitility.await().atMost(Duration.ofSeconds(3600)).untilAsserted(() -> {
+            assertEquals(admin2.topicPolicies().getCompactionThreshold(topicName), 1000_000);
+        });
+
         // Disable replication.
         setTopicLevelClusters(topicName, Arrays.asList(cluster1), admin1, pulsar1);
         setTopicLevelClusters(topicName, Arrays.asList(cluster2), admin2, pulsar2);
 
+        Thread.sleep(3 * 1000);

Review Comment:
   Because the operation that calls the partition-expansion API is executed asynchronously.
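
   One consequence of asynchronous execution for the tests in this PR: where a test expects that something does not happen (for example, the remote partition count staying at 2), a polling wait that stops on the first success would pass immediately, before the asynchronous task has had a chance to run. A rough sketch of the alternative, checking that a condition keeps holding for a whole observation window, is shown below. It is stdlib-only and purely illustrative; `HoldsForSketch`, `holdsFor`, and the simulated counter are hypothetical names, not part of Pulsar or Awaitility.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical helper: verifies that a condition stays true for an entire window.
final class HoldsForSketch {

    /**
     * Re-evaluates the condition every pollInterval for the length of the window.
     * Returns false as soon as the condition is violated, true if it held throughout.
     */
    public static boolean holdsFor(BooleanSupplier condition, Duration window, Duration pollInterval)
            throws InterruptedException {
        long deadline = System.nanoTime() + window.toNanos();
        while (System.nanoTime() - deadline < 0) {
            if (!condition.getAsBoolean()) {
                return false; // Fail fast: the unwanted change happened.
            }
            Thread.sleep(Math.max(1, pollInterval.toMillis()));
        }
        // Final check at the end of the window.
        return condition.getAsBoolean();
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulated remote partition count that nothing modifies.
        AtomicInteger remotePartitions = new AtomicInteger(2);
        boolean unchanged = holdsFor(() -> remotePartitions.get() == 2,
                Duration.ofMillis(300), Duration.ofMillis(20));
        System.out.println("remote partitions unchanged: " + unchanged);
    }
}
```

   Unlike a bare sleep followed by a single assertion, this fails as soon as the unwanted change is observed, although it still has to wait out the full window to pass.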






Re: [PR] [fix] [broker] fix topic partitions was expanded even if disabled topic level replication [pulsar]

2024-05-23 Thread via GitHub


Demogorgon314 commented on code in PR #22769:
URL: https://github.com/apache/pulsar/pull/22769#discussion_r1612624344


##
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java:
##
@@ -748,4 +767,55 @@ public void testDeletePartitionedTopic() throws Exception {
                     .persistentTopicExists(TopicName.get(topicName).getPartition(1)).join());
         }
     }
+
+    @Test
+    public void testNoExpandTopicPartitionsWhenDisableTopicLevelReplication() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
+        admin1.topics().createPartitionedTopic(topicName, 2);
+
+        // Verify replicator works.
+        verifyReplicationWorks(topicName);
+
+        // Disable topic level replication.
+        setTopicLevelClusters(topicName, Arrays.asList(cluster1), admin1, pulsar1);
+        setTopicLevelClusters(topicName, Arrays.asList(cluster2), admin2, pulsar2);
+
+        // Expand topic.
+        admin1.topics().updatePartitionedTopic(topicName, 3);
+        assertEquals(admin1.topics().getPartitionedTopicMetadata(topicName).partitions, 3);
+
+        // Wait for async tasks that were triggered by expanding topic partitions.
+        Thread.sleep(3 * 1000);
+
+        // Verify: the topics on the remote cluster did not been expanded.
+        assertEquals(admin2.topics().getPartitionedTopicMetadata(topicName).partitions, 2);
+
+        cleanupTopics(() -> {
+            admin1.topics().deletePartitionedTopic(topicName, false);
+            admin2.topics().deletePartitionedTopic(topicName, false);
+        });
+    }
+
+    @Test
+    public void testExpandTopicPartitionsOnNamespaceLevelReplication() throws Exception {
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + replicatedNamespace + "/tp_");
+        admin1.topics().createPartitionedTopic(topicName, 2);
+
+        // Verify replicator works.
+        verifyReplicationWorks(topicName);
+
+        // Expand topic.
+        admin1.topics().updatePartitionedTopic(topicName, 3);
+        assertEquals(admin1.topics().getPartitionedTopicMetadata(topicName).partitions, 3);
+
+        // Verify: the topics on the remote cluster did not been expanded.

Review Comment:
   The code comment is wrong: judging by the test name, this test expects the topic on the remote cluster to be expanded.






Re: [PR] [fix] [broker] fix topic partitions was expanded even if disabled topic level replication [pulsar]

2024-05-23 Thread via GitHub


Demogorgon314 commented on code in PR #22769:
URL: https://github.com/apache/pulsar/pull/22769#discussion_r1612619986


##
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java:
##
@@ -698,10 +708,19 @@ public void testDeleteNonPartitionedTopic() throws Exception {
         // Verify replicator works.
         verifyReplicationWorks(topicName);
 
+        admin1.topicPolicies().setCompactionThreshold(topicName, 1000_000);
+        Awaitility.await().atMost(Duration.ofSeconds(3600)).untilAsserted(() -> {
+            assertEquals(admin2.topicPolicies().getCompactionThreshold(topicName), 1000_000);
+        });
+
         // Disable replication.
         setTopicLevelClusters(topicName, Arrays.asList(cluster1), admin1, pulsar1);
         setTopicLevelClusters(topicName, Arrays.asList(cluster2), admin2, pulsar2);
 
+        Thread.sleep(3 * 1000);

Review Comment:
   Why do we sleep 3s here?






Re: [PR] [fix] [broker] fix topic partitions was expanded even if disabled topic level replication [pulsar]

2024-05-23 Thread via GitHub


Demogorgon314 commented on code in PR #22769:
URL: https://github.com/apache/pulsar/pull/22769#discussion_r1612619392


##
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java:
##
@@ -698,10 +708,19 @@ public void testDeleteNonPartitionedTopic() throws Exception {
         // Verify replicator works.
         verifyReplicationWorks(topicName);
 
+        admin1.topicPolicies().setCompactionThreshold(topicName, 1000_000);
+        Awaitility.await().atMost(Duration.ofSeconds(3600)).untilAsserted(() -> {

Review Comment:
   This check can retry many times and, with a 3600-second timeout, take a very long time. Is this expected?






Re: [PR] [fix] [broker] fix topic partitions was expanded even if disabled topic level replication [pulsar]

2024-05-23 Thread via GitHub


Demogorgon314 commented on code in PR #22769:
URL: https://github.com/apache/pulsar/pull/22769#discussion_r1612618664


##
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java:
##
@@ -698,10 +708,19 @@ public void testDeleteNonPartitionedTopic() throws Exception {
         // Verify replicator works.
         verifyReplicationWorks(topicName);
 
+        admin1.topicPolicies().setCompactionThreshold(topicName, 1000_000);
+        Awaitility.await().atMost(Duration.ofSeconds(3600)).untilAsserted(() -> {
+            assertEquals(admin2.topicPolicies().getCompactionThreshold(topicName), 1000_000);
+        });
+
         // Disable replication.
         setTopicLevelClusters(topicName, Arrays.asList(cluster1), admin1, pulsar1);
         setTopicLevelClusters(topicName, Arrays.asList(cluster2), admin2, pulsar2);
 
+        Thread.sleep(3 * 1000);
+        TopicPolicies topicPolicies1 = pulsar1.getTopicPoliciesService().getTopicPolicies(TopicName.get(topicName));

Review Comment:
   `topicPolicies1` and `topicPolicies2` are never used.






Re: [PR] [fix] [broker] fix topic partitions was expanded even if disabled topic level replication [pulsar]

2024-05-23 Thread via GitHub


poorbarcode commented on PR #22769:
URL: https://github.com/apache/pulsar/pull/22769#issuecomment-2128348138

   /pulsarbot rerun-failure-checks

