Re: [PR] [improve] [broker] Create partitioned topics automatically when enable topic level replication [pulsar]

2024-04-24 Thread via GitHub


poorbarcode merged PR #22537:
URL: https://github.com/apache/pulsar/pull/22537


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve] [broker] Create partitioned topics automatically when enable topic level replication [pulsar]

2024-04-23 Thread via GitHub


poorbarcode commented on PR #22537:
URL: https://github.com/apache/pulsar/pull/22537#issuecomment-2074019426

   Rebase master


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve] [broker] Create partitioned topics automatically when enable topic level replication [pulsar]

2024-04-23 Thread via GitHub


gaoran10 commented on code in PR #22537:
URL: https://github.com/apache/pulsar/pull/22537#discussion_r1576003194


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java:
##
@@ -621,35 +625,82 @@ protected void 
internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
 
 private void 
internalCreatePartitionedTopicToReplicatedClustersInBackground(int 
numPartitions) {
 getNamespaceReplicatedClustersAsync(namespaceName)
-.thenAccept(clusters -> {
-for (String cluster : clusters) {
-if 
(!cluster.equals(pulsar().getConfiguration().getClusterName())) {
-// this call happens in the background without 
async composition. completion is logged.
-pulsar().getPulsarResources().getClusterResources()
-.getClusterAsync(cluster)
-.thenCompose(clusterDataOp ->
-((TopicsImpl) 
pulsar().getBrokerService()
-
.getClusterPulsarAdmin(cluster,
-
clusterDataOp).topics())
-
.createPartitionedTopicAsync(
-
topicName.getPartitionedTopicName(),
-numPartitions,
-true, null))
-.whenComplete((__, ex) -> {
-if (ex != null) {
-log.error(
-"[{}] Failed to create 
partitioned topic {} in cluster {}.",
-clientAppId(), topicName, 
cluster, ex);
-} else {
-log.info(
-"[{}] Successfully created 
partitioned topic {} in "
-+ "cluster {}",
-clientAppId(), topicName, 
cluster);
-}
-});
-}
+.thenAccept(clusters -> {
+// this call happens in the background without async 
composition. completion is logged.
+
internalCreatePartitionedTopicToReplicatedClustersInBackground(clusters, 
numPartitions);
+});
+}
+
+protected Map> 
internalCreatePartitionedTopicToReplicatedClustersInBackground(
+Set clusters, int numPartitions) {
+final String shortTopicName = topicName.getPartitionedTopicName();
+Map> tasksForAllClusters = new 
HashMap<>();
+for (String cluster : clusters) {
+if (cluster.equals(pulsar().getConfiguration().getClusterName())) {
+continue;
+}
+ClusterResources clusterResources = 
pulsar().getPulsarResources().getClusterResources();
+CompletableFuture createRemoteTopicFuture = new 
CompletableFuture<>();
+tasksForAllClusters.put(cluster, createRemoteTopicFuture);
+
clusterResources.getClusterAsync(cluster).whenComplete((clusterData, ex1) -> {
+if (ex1 != null) {
+// Unexpected error, such as NPE. Catch all error to avoid 
the "createRemoteTopicFuture" stuck.
+log.error("[{}] An un-expected error occurs when trying to 
create partitioned topic {} in cluster"
++ " {}.", clientAppId(), topicName, 
cluster, ex1);
+createRemoteTopicFuture.completeExceptionally(new 
RestException(ex1));
+return;
+}
+// Get cluster data success.
+TopicsImpl topics =
+(TopicsImpl) 
pulsar().getBrokerService().getClusterPulsarAdmin(cluster, 
clusterData).topics();
+topics.createPartitionedTopicAsync(shortTopicName, 
numPartitions, true, null)
+.whenComplete((ignore, ex2) -> {
+if (ex2 == null) {
+// Create success.
+log.info("[{}] Successfully created partitioned topic 
{} in cluster {}",
+clientAppId(), topicName, cluster);
+createRemoteTopicFuture.complete(null);
+return;
+}
+// Create topic on the remote cluster error.
+Throwable unwrapEx2 = 
FutureUtil.unwrapCompletionException(ex2);
+// The topic has been created before, check 

Re: [PR] [improve] [broker] Create partitioned topics automatically when enable topic level replication [pulsar]

2024-04-22 Thread via GitHub


poorbarcode commented on code in PR #22537:
URL: https://github.com/apache/pulsar/pull/22537#discussion_r1574733176


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java:
##
@@ -621,35 +625,82 @@ protected void 
internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
 
 private void 
internalCreatePartitionedTopicToReplicatedClustersInBackground(int 
numPartitions) {
 getNamespaceReplicatedClustersAsync(namespaceName)
-.thenAccept(clusters -> {
-for (String cluster : clusters) {
-if 
(!cluster.equals(pulsar().getConfiguration().getClusterName())) {
-// this call happens in the background without 
async composition. completion is logged.
-pulsar().getPulsarResources().getClusterResources()
-.getClusterAsync(cluster)
-.thenCompose(clusterDataOp ->
-((TopicsImpl) 
pulsar().getBrokerService()
-
.getClusterPulsarAdmin(cluster,
-
clusterDataOp).topics())
-
.createPartitionedTopicAsync(
-
topicName.getPartitionedTopicName(),
-numPartitions,
-true, null))
-.whenComplete((__, ex) -> {
-if (ex != null) {
-log.error(
-"[{}] Failed to create 
partitioned topic {} in cluster {}.",
-clientAppId(), topicName, 
cluster, ex);
-} else {
-log.info(
-"[{}] Successfully created 
partitioned topic {} in "
-+ "cluster {}",
-clientAppId(), topicName, 
cluster);
-}
-});
-}
+.thenAccept(clusters -> {
+// this call happens in the background without async 
composition. completion is logged.
+
internalCreatePartitionedTopicToReplicatedClustersInBackground(clusters, 
numPartitions);
+});
+}
+
+protected Map> 
internalCreatePartitionedTopicToReplicatedClustersInBackground(
+Set clusters, int numPartitions) {
+final String shortTopicName = topicName.getPartitionedTopicName();
+Map> tasksForAllClusters = new 
HashMap<>();
+for (String cluster : clusters) {
+if (cluster.equals(pulsar().getConfiguration().getClusterName())) {
+continue;
+}
+ClusterResources clusterResources = 
pulsar().getPulsarResources().getClusterResources();
+CompletableFuture createRemoteTopicFuture = new 
CompletableFuture<>();
+tasksForAllClusters.put(cluster, createRemoteTopicFuture);
+
clusterResources.getClusterAsync(cluster).whenComplete((clusterData, ex1) -> {
+if (ex1 != null) {
+// Unexpected error, such as NPE. Catch all error to avoid 
the "createRemoteTopicFuture" stuck.
+log.error("[{}] An un-expected error occurs when trying to 
create partitioned topic {} in cluster"
++ " {}.", clientAppId(), topicName, 
cluster, ex1);
+createRemoteTopicFuture.completeExceptionally(new 
RestException(ex1));
+return;
+}
+// Get cluster data success.
+TopicsImpl topics =
+(TopicsImpl) 
pulsar().getBrokerService().getClusterPulsarAdmin(cluster, 
clusterData).topics();
+topics.createPartitionedTopicAsync(shortTopicName, 
numPartitions, true, null)
+.whenComplete((ignore, ex2) -> {
+if (ex2 == null) {
+// Create success.
+log.info("[{}] Successfully created partitioned topic 
{} in cluster {}",
+clientAppId(), topicName, cluster);
+createRemoteTopicFuture.complete(null);
+return;
+}
+// Create topic on the remote cluster error.
+Throwable unwrapEx2 = 
FutureUtil.unwrapCompletionException(ex2);
+// The topic has been created before, che

Re: [PR] [improve] [broker] Create partitioned topics automatically when enable topic level replication [pulsar]

2024-04-22 Thread via GitHub


poorbarcode commented on code in PR #22537:
URL: https://github.com/apache/pulsar/pull/22537#discussion_r1574733176


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java:
##
@@ -621,35 +625,82 @@ protected void 
internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
 
 private void 
internalCreatePartitionedTopicToReplicatedClustersInBackground(int 
numPartitions) {
 getNamespaceReplicatedClustersAsync(namespaceName)
-.thenAccept(clusters -> {
-for (String cluster : clusters) {
-if 
(!cluster.equals(pulsar().getConfiguration().getClusterName())) {
-// this call happens in the background without 
async composition. completion is logged.
-pulsar().getPulsarResources().getClusterResources()
-.getClusterAsync(cluster)
-.thenCompose(clusterDataOp ->
-((TopicsImpl) 
pulsar().getBrokerService()
-
.getClusterPulsarAdmin(cluster,
-
clusterDataOp).topics())
-
.createPartitionedTopicAsync(
-
topicName.getPartitionedTopicName(),
-numPartitions,
-true, null))
-.whenComplete((__, ex) -> {
-if (ex != null) {
-log.error(
-"[{}] Failed to create 
partitioned topic {} in cluster {}.",
-clientAppId(), topicName, 
cluster, ex);
-} else {
-log.info(
-"[{}] Successfully created 
partitioned topic {} in "
-+ "cluster {}",
-clientAppId(), topicName, 
cluster);
-}
-});
-}
+.thenAccept(clusters -> {
+// this call happens in the background without async 
composition. completion is logged.
+
internalCreatePartitionedTopicToReplicatedClustersInBackground(clusters, 
numPartitions);
+});
+}
+
+protected Map> 
internalCreatePartitionedTopicToReplicatedClustersInBackground(
+Set clusters, int numPartitions) {
+final String shortTopicName = topicName.getPartitionedTopicName();
+Map> tasksForAllClusters = new 
HashMap<>();
+for (String cluster : clusters) {
+if (cluster.equals(pulsar().getConfiguration().getClusterName())) {
+continue;
+}
+ClusterResources clusterResources = 
pulsar().getPulsarResources().getClusterResources();
+CompletableFuture createRemoteTopicFuture = new 
CompletableFuture<>();
+tasksForAllClusters.put(cluster, createRemoteTopicFuture);
+
clusterResources.getClusterAsync(cluster).whenComplete((clusterData, ex1) -> {
+if (ex1 != null) {
+// Unexpected error, such as NPE. Catch all error to avoid 
the "createRemoteTopicFuture" stuck.
+log.error("[{}] An un-expected error occurs when trying to 
create partitioned topic {} in cluster"
++ " {}.", clientAppId(), topicName, 
cluster, ex1);
+createRemoteTopicFuture.completeExceptionally(new 
RestException(ex1));
+return;
+}
+// Get cluster data success.
+TopicsImpl topics =
+(TopicsImpl) 
pulsar().getBrokerService().getClusterPulsarAdmin(cluster, 
clusterData).topics();
+topics.createPartitionedTopicAsync(shortTopicName, 
numPartitions, true, null)
+.whenComplete((ignore, ex2) -> {
+if (ex2 == null) {
+// Create success.
+log.info("[{}] Successfully created partitioned topic 
{} in cluster {}",
+clientAppId(), topicName, cluster);
+createRemoteTopicFuture.complete(null);
+return;
+}
+// Create topic on the remote cluster error.
+Throwable unwrapEx2 = 
FutureUtil.unwrapCompletionException(ex2);
+// The topic has been created before, che

Re: [PR] [improve] [broker] Create partitioned topics automatically when enable topic level replication [pulsar]

2024-04-21 Thread via GitHub


gaoran10 commented on code in PR #22537:
URL: https://github.com/apache/pulsar/pull/22537#discussion_r1574055874


##
pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java:
##
@@ -621,35 +625,82 @@ protected void 
internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n
 
 private void 
internalCreatePartitionedTopicToReplicatedClustersInBackground(int 
numPartitions) {
 getNamespaceReplicatedClustersAsync(namespaceName)
-.thenAccept(clusters -> {
-for (String cluster : clusters) {
-if 
(!cluster.equals(pulsar().getConfiguration().getClusterName())) {
-// this call happens in the background without 
async composition. completion is logged.
-pulsar().getPulsarResources().getClusterResources()
-.getClusterAsync(cluster)
-.thenCompose(clusterDataOp ->
-((TopicsImpl) 
pulsar().getBrokerService()
-
.getClusterPulsarAdmin(cluster,
-
clusterDataOp).topics())
-
.createPartitionedTopicAsync(
-
topicName.getPartitionedTopicName(),
-numPartitions,
-true, null))
-.whenComplete((__, ex) -> {
-if (ex != null) {
-log.error(
-"[{}] Failed to create 
partitioned topic {} in cluster {}.",
-clientAppId(), topicName, 
cluster, ex);
-} else {
-log.info(
-"[{}] Successfully created 
partitioned topic {} in "
-+ "cluster {}",
-clientAppId(), topicName, 
cluster);
-}
-});
-}
+.thenAccept(clusters -> {
+// this call happens in the background without async 
composition. completion is logged.
+
internalCreatePartitionedTopicToReplicatedClustersInBackground(clusters, 
numPartitions);
+});
+}
+
+protected Map> 
internalCreatePartitionedTopicToReplicatedClustersInBackground(
+Set clusters, int numPartitions) {
+final String shortTopicName = topicName.getPartitionedTopicName();
+Map> tasksForAllClusters = new 
HashMap<>();
+for (String cluster : clusters) {
+if (cluster.equals(pulsar().getConfiguration().getClusterName())) {
+continue;
+}
+ClusterResources clusterResources = 
pulsar().getPulsarResources().getClusterResources();
+CompletableFuture createRemoteTopicFuture = new 
CompletableFuture<>();
+tasksForAllClusters.put(cluster, createRemoteTopicFuture);
+
clusterResources.getClusterAsync(cluster).whenComplete((clusterData, ex1) -> {
+if (ex1 != null) {
+// Unexpected error, such as NPE. Catch all error to avoid 
the "createRemoteTopicFuture" stuck.
+log.error("[{}] An un-expected error occurs when trying to 
create partitioned topic {} in cluster"
++ " {}.", clientAppId(), topicName, 
cluster, ex1);
+createRemoteTopicFuture.completeExceptionally(new 
RestException(ex1));
+return;
+}
+// Get cluster data success.
+TopicsImpl topics =
+(TopicsImpl) 
pulsar().getBrokerService().getClusterPulsarAdmin(cluster, 
clusterData).topics();
+topics.createPartitionedTopicAsync(shortTopicName, 
numPartitions, true, null)
+.whenComplete((ignore, ex2) -> {
+if (ex2 == null) {
+// Create success.
+log.info("[{}] Successfully created partitioned topic 
{} in cluster {}",
+clientAppId(), topicName, cluster);
+createRemoteTopicFuture.complete(null);
+return;
+}
+// Create topic on the remote cluster error.
+Throwable unwrapEx2 = 
FutureUtil.unwrapCompletionException(ex2);
+// The topic has been created before, check 

Re: [PR] [improve] [broker] Create partitioned topics automatically when enable topic level replication [pulsar]

2024-04-21 Thread via GitHub


poorbarcode commented on code in PR #22537:
URL: https://github.com/apache/pulsar/pull/22537#discussion_r1573843227


##
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java:
##
@@ -153,4 +172,90 @@ public void 
testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception
 admin2.topics().delete(topicName);
 });
 }
+
+@Test
+public void testPartitionedTopicLevelReplication() throws Exception {
+final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ nonReplicatedNamespace + "/tp_");
+final String partition0 = 
TopicName.get(topicName).getPartition(0).toString();
+final String partition1 = 
TopicName.get(topicName).getPartition(1).toString();
+admin1.topics().createPartitionedTopic(topicName, 2);
+admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1, cluster2));
+// Check the partitioned topic has been created at the remote cluster.
+PartitionedTopicMetadata topicMetadata2 = 
admin2.topics().getPartitionedTopicMetadata(topicName);
+assertEquals(topicMetadata2.partitions, 2);
+// cleanup.
+admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1));
+waitReplicatorStopped(partition0);
+waitReplicatorStopped(partition1);
+admin1.topics().deletePartitionedTopic(topicName);
+admin2.topics().deletePartitionedTopic(topicName);
+}
+
+@Test
+public void testPartitionedTopicLevelReplicationRemoteTopicExist() throws 
Exception {
+final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ nonReplicatedNamespace + "/tp_");
+final String partition0 = 
TopicName.get(topicName).getPartition(0).toString();
+final String partition1 = 
TopicName.get(topicName).getPartition(1).toString();
+admin1.topics().createPartitionedTopic(topicName, 2);
+admin2.topics().createPartitionedTopic(topicName, 2);
+admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1, cluster2));
+// Check the partitioned topic has been created at the remote cluster.
+PartitionedTopicMetadata topicMetadata2 = 
admin2.topics().getPartitionedTopicMetadata(topicName);
+assertEquals(topicMetadata2.partitions, 2);
+// cleanup.
+admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1));
+waitReplicatorStopped(partition0);
+waitReplicatorStopped(partition1);
+admin1.topics().deletePartitionedTopic(topicName);
+admin2.topics().deletePartitionedTopic(topicName);
+}
+
+@Test
+public void testPartitionedTopicLevelReplicationRemoteConflictTopicExist() 
throws Exception {

Review Comment:
   No, it breaks the order of consumption on the remote cluster if they are 
using Key_Shared mode, letting users determine how to fix it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve] [broker] Create partitioned topics automatically when enable topic level replication [pulsar]

2024-04-21 Thread via GitHub


poorbarcode commented on code in PR #22537:
URL: https://github.com/apache/pulsar/pull/22537#discussion_r1573843227


##
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java:
##
@@ -153,4 +172,90 @@ public void 
testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception
 admin2.topics().delete(topicName);
 });
 }
+
+@Test
+public void testPartitionedTopicLevelReplication() throws Exception {
+final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ nonReplicatedNamespace + "/tp_");
+final String partition0 = 
TopicName.get(topicName).getPartition(0).toString();
+final String partition1 = 
TopicName.get(topicName).getPartition(1).toString();
+admin1.topics().createPartitionedTopic(topicName, 2);
+admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1, cluster2));
+// Check the partitioned topic has been created at the remote cluster.
+PartitionedTopicMetadata topicMetadata2 = 
admin2.topics().getPartitionedTopicMetadata(topicName);
+assertEquals(topicMetadata2.partitions, 2);
+// cleanup.
+admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1));
+waitReplicatorStopped(partition0);
+waitReplicatorStopped(partition1);
+admin1.topics().deletePartitionedTopic(topicName);
+admin2.topics().deletePartitionedTopic(topicName);
+}
+
+@Test
+public void testPartitionedTopicLevelReplicationRemoteTopicExist() throws 
Exception {
+final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ nonReplicatedNamespace + "/tp_");
+final String partition0 = 
TopicName.get(topicName).getPartition(0).toString();
+final String partition1 = 
TopicName.get(topicName).getPartition(1).toString();
+admin1.topics().createPartitionedTopic(topicName, 2);
+admin2.topics().createPartitionedTopic(topicName, 2);
+admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1, cluster2));
+// Check the partitioned topic has been created at the remote cluster.
+PartitionedTopicMetadata topicMetadata2 = 
admin2.topics().getPartitionedTopicMetadata(topicName);
+assertEquals(topicMetadata2.partitions, 2);
+// cleanup.
+admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1));
+waitReplicatorStopped(partition0);
+waitReplicatorStopped(partition1);
+admin1.topics().deletePartitionedTopic(topicName);
+admin2.topics().deletePartitionedTopic(topicName);
+}
+
+@Test
+public void testPartitionedTopicLevelReplicationRemoteConflictTopicExist() 
throws Exception {

Review Comment:
   No, it breaks the order of consumption on the remote cluster if they are 
using Key_Shared mode, just ask users to determine how to fix it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve] [broker] Create partitioned topics automatically when enable topic level replication [pulsar]

2024-04-21 Thread via GitHub


poorbarcode commented on code in PR #22537:
URL: https://github.com/apache/pulsar/pull/22537#discussion_r1573842742


##
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java:
##
@@ -153,4 +172,90 @@ public void 
testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception
 admin2.topics().delete(topicName);
 });
 }
+
+@Test
+public void testPartitionedTopicLevelReplication() throws Exception {
+final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ nonReplicatedNamespace + "/tp_");
+final String partition0 = 
TopicName.get(topicName).getPartition(0).toString();
+final String partition1 = 
TopicName.get(topicName).getPartition(1).toString();
+admin1.topics().createPartitionedTopic(topicName, 2);
+admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1, cluster2));
+// Check the partitioned topic has been created at the remote cluster.
+PartitionedTopicMetadata topicMetadata2 = 
admin2.topics().getPartitionedTopicMetadata(topicName);
+assertEquals(topicMetadata2.partitions, 2);
+// cleanup.
+admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1));
+waitReplicatorStopped(partition0);
+waitReplicatorStopped(partition1);
+admin1.topics().deletePartitionedTopic(topicName);
+admin2.topics().deletePartitionedTopic(topicName);
+}
+
+@Test
+public void testPartitionedTopicLevelReplicationRemoteTopicExist() throws 
Exception {
+final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ nonReplicatedNamespace + "/tp_");
+final String partition0 = 
TopicName.get(topicName).getPartition(0).toString();
+final String partition1 = 
TopicName.get(topicName).getPartition(1).toString();
+admin1.topics().createPartitionedTopic(topicName, 2);
+admin2.topics().createPartitionedTopic(topicName, 2);
+admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1, cluster2));
+// Check the partitioned topic has been created at the remote cluster.
+PartitionedTopicMetadata topicMetadata2 = 
admin2.topics().getPartitionedTopicMetadata(topicName);
+assertEquals(topicMetadata2.partitions, 2);
+// cleanup.
+admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1));
+waitReplicatorStopped(partition0);
+waitReplicatorStopped(partition1);
+admin1.topics().deletePartitionedTopic(topicName);
+admin2.topics().deletePartitionedTopic(topicName);
+}
+
+@Test
+public void testPartitionedTopicLevelReplicationRemoteConflictTopicExist() 
throws Exception {
+final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ nonReplicatedNamespace + "/tp_");
+admin2.topics().createPartitionedTopic(topicName, 3);
+admin1.topics().createPartitionedTopic(topicName, 2);
+try {
+admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1, cluster2));
+fail("Expected error due to a conflict partitioned topic already 
exists.");
+} catch (Exception ex) {
+Throwable unWrapEx = FutureUtil.unwrapCompletionException(ex);
+assertTrue(unWrapEx.getMessage().contains("with different 
partitions"));
+}
+// Check nothing changed.
+PartitionedTopicMetadata topicMetadata2 = 
admin2.topics().getPartitionedTopicMetadata(topicName);
+assertEquals(topicMetadata2.partitions, 3);
+assertEquals(admin1.topics().getReplicationClusters(topicName, 
true).size(), 1);
+// cleanup.
+admin1.topics().deletePartitionedTopic(topicName);
+admin2.topics().deletePartitionedTopic(topicName);
+}
+
+/**
+ * TODO next PR will correct the behavior below, just left this test here.
+ */
+// @Test
+private void testNamespaceLevelReplicationRemoteConflictTopicExist() 
throws Exception {

Review Comment:
   Removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [improve] [broker] Create partitioned topics automatically when enable topic level replication [pulsar]

2024-04-19 Thread via GitHub


Demogorgon314 commented on code in PR #22537:
URL: https://github.com/apache/pulsar/pull/22537#discussion_r1572038896


##
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java:
##
@@ -153,4 +172,90 @@ public void 
testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception
 admin2.topics().delete(topicName);
 });
 }
+
+@Test
+public void testPartitionedTopicLevelReplication() throws Exception {
+final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ nonReplicatedNamespace + "/tp_");
+final String partition0 = 
TopicName.get(topicName).getPartition(0).toString();
+final String partition1 = 
TopicName.get(topicName).getPartition(1).toString();
+admin1.topics().createPartitionedTopic(topicName, 2);
+admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1, cluster2));
+// Check the partitioned topic has been created at the remote cluster.
+PartitionedTopicMetadata topicMetadata2 = 
admin2.topics().getPartitionedTopicMetadata(topicName);
+assertEquals(topicMetadata2.partitions, 2);
+// cleanup.
+admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1));
+waitReplicatorStopped(partition0);
+waitReplicatorStopped(partition1);
+admin1.topics().deletePartitionedTopic(topicName);
+admin2.topics().deletePartitionedTopic(topicName);
+}
+
+@Test
+public void testPartitionedTopicLevelReplicationRemoteTopicExist() throws 
Exception {
+final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ nonReplicatedNamespace + "/tp_");
+final String partition0 = 
TopicName.get(topicName).getPartition(0).toString();
+final String partition1 = 
TopicName.get(topicName).getPartition(1).toString();
+admin1.topics().createPartitionedTopic(topicName, 2);
+admin2.topics().createPartitionedTopic(topicName, 2);
+admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1, cluster2));
+// Check the partitioned topic has been created at the remote cluster.
+PartitionedTopicMetadata topicMetadata2 = 
admin2.topics().getPartitionedTopicMetadata(topicName);
+assertEquals(topicMetadata2.partitions, 2);
+// cleanup.
+admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1));
+waitReplicatorStopped(partition0);
+waitReplicatorStopped(partition1);
+admin1.topics().deletePartitionedTopic(topicName);
+admin2.topics().deletePartitionedTopic(topicName);
+}
+
+@Test
+public void testPartitionedTopicLevelReplicationRemoteConflictTopicExist() 
throws Exception {

Review Comment:
   When the remote cluster has 1 partition for the topic, and then we try to 
create the topic with 2 partition on the source cluster, should we update the 
remote cluster's topic partition to 2?



##
pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java:
##
@@ -153,4 +172,90 @@ public void 
testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception
 admin2.topics().delete(topicName);
 });
 }
+
+@Test
+public void testPartitionedTopicLevelReplication() throws Exception {
+final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ nonReplicatedNamespace + "/tp_");
+final String partition0 = 
TopicName.get(topicName).getPartition(0).toString();
+final String partition1 = 
TopicName.get(topicName).getPartition(1).toString();
+admin1.topics().createPartitionedTopic(topicName, 2);
+admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1, cluster2));
+// Check the partitioned topic has been created at the remote cluster.
+PartitionedTopicMetadata topicMetadata2 = 
admin2.topics().getPartitionedTopicMetadata(topicName);
+assertEquals(topicMetadata2.partitions, 2);
+// cleanup.
+admin1.topics().setReplicationClusters(topicName, 
Arrays.asList(cluster1));
+waitReplicatorStopped(partition0);
+waitReplicatorStopped(partition1);
+admin1.topics().deletePartitionedTopic(topicName);
+admin2.topics().deletePartitionedTopic(topicName);
+}
+
+@Test
+public void testPartitionedTopicLevelReplicationRemoteTopicExist() throws 
Exception {
+final String topicName = BrokerTestUtil.newUniqueName("persistent://" 
+ nonReplicatedNamespace + "/tp_");
+final String partition0 = 
TopicName.get(topicName).getPartition(0).toString();
+final String partition1 = 
TopicName.get(topicName).getPartition(1).toString();
+admin1.topics().createPartitionedTopic(topicName, 2);
+admin2.topics().createPartitionedTopic(topicName, 2);
+admin1.topics().setReplicationClusters(topicName, 
Arrays.asLi

Re: [PR] [improve] [broker] Create partitioned topics automatically when enable topic level replication [pulsar]

2024-04-18 Thread via GitHub


poorbarcode closed pull request #22537: [improve] [broker] Create partitioned 
topics automatically when enable topic level replication
URL: https://github.com/apache/pulsar/pull/22537


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org