Re: [PR] [improve] [broker] Create partitioned topics automatically when enable topic level replication [pulsar]
poorbarcode merged PR #22537: URL: https://github.com/apache/pulsar/pull/22537 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [improve] [broker] Create partitioned topics automatically when enable topic level replication [pulsar]
poorbarcode commented on PR #22537: URL: https://github.com/apache/pulsar/pull/22537#issuecomment-2074019426 Rebase master -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [improve] [broker] Create partitioned topics automatically when enable topic level replication [pulsar]
gaoran10 commented on code in PR #22537: URL: https://github.com/apache/pulsar/pull/22537#discussion_r1576003194 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java: ## @@ -621,35 +625,82 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n private void internalCreatePartitionedTopicToReplicatedClustersInBackground(int numPartitions) { getNamespaceReplicatedClustersAsync(namespaceName) -.thenAccept(clusters -> { -for (String cluster : clusters) { -if (!cluster.equals(pulsar().getConfiguration().getClusterName())) { -// this call happens in the background without async composition. completion is logged. -pulsar().getPulsarResources().getClusterResources() -.getClusterAsync(cluster) -.thenCompose(clusterDataOp -> -((TopicsImpl) pulsar().getBrokerService() - .getClusterPulsarAdmin(cluster, - clusterDataOp).topics()) - .createPartitionedTopicAsync( - topicName.getPartitionedTopicName(), -numPartitions, -true, null)) -.whenComplete((__, ex) -> { -if (ex != null) { -log.error( -"[{}] Failed to create partitioned topic {} in cluster {}.", -clientAppId(), topicName, cluster, ex); -} else { -log.info( -"[{}] Successfully created partitioned topic {} in " -+ "cluster {}", -clientAppId(), topicName, cluster); -} -}); -} +.thenAccept(clusters -> { +// this call happens in the background without async composition. completion is logged. + internalCreatePartitionedTopicToReplicatedClustersInBackground(clusters, numPartitions); +}); +} + +protected Map> internalCreatePartitionedTopicToReplicatedClustersInBackground( +Set clusters, int numPartitions) { +final String shortTopicName = topicName.getPartitionedTopicName(); +Map> tasksForAllClusters = new HashMap<>(); +for (String cluster : clusters) { +if (cluster.equals(pulsar().getConfiguration().getClusterName())) { +continue; +} +ClusterResources clusterResources = pulsar().getPulsarResources().getClusterResources(); +CompletableFuture createRemoteTopicFuture = new CompletableFuture<>(); +tasksForAllClusters.put(cluster, createRemoteTopicFuture); + clusterResources.getClusterAsync(cluster).whenComplete((clusterData, ex1) -> { +if (ex1 != null) { +// Unexpected error, such as NPE. Catch all error to avoid the "createRemoteTopicFuture" stuck. +log.error("[{}] An un-expected error occurs when trying to create partitioned topic {} in cluster" ++ " {}.", clientAppId(), topicName, cluster, ex1); +createRemoteTopicFuture.completeExceptionally(new RestException(ex1)); +return; +} +// Get cluster data success. +TopicsImpl topics = +(TopicsImpl) pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterData).topics(); +topics.createPartitionedTopicAsync(shortTopicName, numPartitions, true, null) +.whenComplete((ignore, ex2) -> { +if (ex2 == null) { +// Create success. +log.info("[{}] Successfully created partitioned topic {} in cluster {}", +clientAppId(), topicName, cluster); +createRemoteTopicFuture.complete(null); +return; +} +// Create topic on the remote cluster error. +Throwable unwrapEx2 = FutureUtil.unwrapCompletionException(ex2); +// The topic has been created before, check
Re: [PR] [improve] [broker] Create partitioned topics automatically when enable topic level replication [pulsar]
poorbarcode commented on code in PR #22537: URL: https://github.com/apache/pulsar/pull/22537#discussion_r1574733176 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java: ## @@ -621,35 +625,82 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n private void internalCreatePartitionedTopicToReplicatedClustersInBackground(int numPartitions) { getNamespaceReplicatedClustersAsync(namespaceName) -.thenAccept(clusters -> { -for (String cluster : clusters) { -if (!cluster.equals(pulsar().getConfiguration().getClusterName())) { -// this call happens in the background without async composition. completion is logged. -pulsar().getPulsarResources().getClusterResources() -.getClusterAsync(cluster) -.thenCompose(clusterDataOp -> -((TopicsImpl) pulsar().getBrokerService() - .getClusterPulsarAdmin(cluster, - clusterDataOp).topics()) - .createPartitionedTopicAsync( - topicName.getPartitionedTopicName(), -numPartitions, -true, null)) -.whenComplete((__, ex) -> { -if (ex != null) { -log.error( -"[{}] Failed to create partitioned topic {} in cluster {}.", -clientAppId(), topicName, cluster, ex); -} else { -log.info( -"[{}] Successfully created partitioned topic {} in " -+ "cluster {}", -clientAppId(), topicName, cluster); -} -}); -} +.thenAccept(clusters -> { +// this call happens in the background without async composition. completion is logged. + internalCreatePartitionedTopicToReplicatedClustersInBackground(clusters, numPartitions); +}); +} + +protected Map> internalCreatePartitionedTopicToReplicatedClustersInBackground( +Set clusters, int numPartitions) { +final String shortTopicName = topicName.getPartitionedTopicName(); +Map> tasksForAllClusters = new HashMap<>(); +for (String cluster : clusters) { +if (cluster.equals(pulsar().getConfiguration().getClusterName())) { +continue; +} +ClusterResources clusterResources = pulsar().getPulsarResources().getClusterResources(); +CompletableFuture createRemoteTopicFuture = new CompletableFuture<>(); +tasksForAllClusters.put(cluster, createRemoteTopicFuture); + clusterResources.getClusterAsync(cluster).whenComplete((clusterData, ex1) -> { +if (ex1 != null) { +// Unexpected error, such as NPE. Catch all error to avoid the "createRemoteTopicFuture" stuck. +log.error("[{}] An un-expected error occurs when trying to create partitioned topic {} in cluster" ++ " {}.", clientAppId(), topicName, cluster, ex1); +createRemoteTopicFuture.completeExceptionally(new RestException(ex1)); +return; +} +// Get cluster data success. +TopicsImpl topics = +(TopicsImpl) pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterData).topics(); +topics.createPartitionedTopicAsync(shortTopicName, numPartitions, true, null) +.whenComplete((ignore, ex2) -> { +if (ex2 == null) { +// Create success. +log.info("[{}] Successfully created partitioned topic {} in cluster {}", +clientAppId(), topicName, cluster); +createRemoteTopicFuture.complete(null); +return; +} +// Create topic on the remote cluster error. +Throwable unwrapEx2 = FutureUtil.unwrapCompletionException(ex2); +// The topic has been created before, che
Re: [PR] [improve] [broker] Create partitioned topics automatically when enable topic level replication [pulsar]
poorbarcode commented on code in PR #22537: URL: https://github.com/apache/pulsar/pull/22537#discussion_r1574733176 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java: ## @@ -621,35 +625,82 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n private void internalCreatePartitionedTopicToReplicatedClustersInBackground(int numPartitions) { getNamespaceReplicatedClustersAsync(namespaceName) -.thenAccept(clusters -> { -for (String cluster : clusters) { -if (!cluster.equals(pulsar().getConfiguration().getClusterName())) { -// this call happens in the background without async composition. completion is logged. -pulsar().getPulsarResources().getClusterResources() -.getClusterAsync(cluster) -.thenCompose(clusterDataOp -> -((TopicsImpl) pulsar().getBrokerService() - .getClusterPulsarAdmin(cluster, - clusterDataOp).topics()) - .createPartitionedTopicAsync( - topicName.getPartitionedTopicName(), -numPartitions, -true, null)) -.whenComplete((__, ex) -> { -if (ex != null) { -log.error( -"[{}] Failed to create partitioned topic {} in cluster {}.", -clientAppId(), topicName, cluster, ex); -} else { -log.info( -"[{}] Successfully created partitioned topic {} in " -+ "cluster {}", -clientAppId(), topicName, cluster); -} -}); -} +.thenAccept(clusters -> { +// this call happens in the background without async composition. completion is logged. + internalCreatePartitionedTopicToReplicatedClustersInBackground(clusters, numPartitions); +}); +} + +protected Map> internalCreatePartitionedTopicToReplicatedClustersInBackground( +Set clusters, int numPartitions) { +final String shortTopicName = topicName.getPartitionedTopicName(); +Map> tasksForAllClusters = new HashMap<>(); +for (String cluster : clusters) { +if (cluster.equals(pulsar().getConfiguration().getClusterName())) { +continue; +} +ClusterResources clusterResources = pulsar().getPulsarResources().getClusterResources(); +CompletableFuture createRemoteTopicFuture = new CompletableFuture<>(); +tasksForAllClusters.put(cluster, createRemoteTopicFuture); + clusterResources.getClusterAsync(cluster).whenComplete((clusterData, ex1) -> { +if (ex1 != null) { +// Unexpected error, such as NPE. Catch all error to avoid the "createRemoteTopicFuture" stuck. +log.error("[{}] An un-expected error occurs when trying to create partitioned topic {} in cluster" ++ " {}.", clientAppId(), topicName, cluster, ex1); +createRemoteTopicFuture.completeExceptionally(new RestException(ex1)); +return; +} +// Get cluster data success. +TopicsImpl topics = +(TopicsImpl) pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterData).topics(); +topics.createPartitionedTopicAsync(shortTopicName, numPartitions, true, null) +.whenComplete((ignore, ex2) -> { +if (ex2 == null) { +// Create success. +log.info("[{}] Successfully created partitioned topic {} in cluster {}", +clientAppId(), topicName, cluster); +createRemoteTopicFuture.complete(null); +return; +} +// Create topic on the remote cluster error. +Throwable unwrapEx2 = FutureUtil.unwrapCompletionException(ex2); +// The topic has been created before, che
Re: [PR] [improve] [broker] Create partitioned topics automatically when enable topic level replication [pulsar]
gaoran10 commented on code in PR #22537: URL: https://github.com/apache/pulsar/pull/22537#discussion_r1574055874 ## pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java: ## @@ -621,35 +625,82 @@ protected void internalCreatePartitionedTopic(AsyncResponse asyncResponse, int n private void internalCreatePartitionedTopicToReplicatedClustersInBackground(int numPartitions) { getNamespaceReplicatedClustersAsync(namespaceName) -.thenAccept(clusters -> { -for (String cluster : clusters) { -if (!cluster.equals(pulsar().getConfiguration().getClusterName())) { -// this call happens in the background without async composition. completion is logged. -pulsar().getPulsarResources().getClusterResources() -.getClusterAsync(cluster) -.thenCompose(clusterDataOp -> -((TopicsImpl) pulsar().getBrokerService() - .getClusterPulsarAdmin(cluster, - clusterDataOp).topics()) - .createPartitionedTopicAsync( - topicName.getPartitionedTopicName(), -numPartitions, -true, null)) -.whenComplete((__, ex) -> { -if (ex != null) { -log.error( -"[{}] Failed to create partitioned topic {} in cluster {}.", -clientAppId(), topicName, cluster, ex); -} else { -log.info( -"[{}] Successfully created partitioned topic {} in " -+ "cluster {}", -clientAppId(), topicName, cluster); -} -}); -} +.thenAccept(clusters -> { +// this call happens in the background without async composition. completion is logged. + internalCreatePartitionedTopicToReplicatedClustersInBackground(clusters, numPartitions); +}); +} + +protected Map> internalCreatePartitionedTopicToReplicatedClustersInBackground( +Set clusters, int numPartitions) { +final String shortTopicName = topicName.getPartitionedTopicName(); +Map> tasksForAllClusters = new HashMap<>(); +for (String cluster : clusters) { +if (cluster.equals(pulsar().getConfiguration().getClusterName())) { +continue; +} +ClusterResources clusterResources = pulsar().getPulsarResources().getClusterResources(); +CompletableFuture createRemoteTopicFuture = new CompletableFuture<>(); +tasksForAllClusters.put(cluster, createRemoteTopicFuture); + clusterResources.getClusterAsync(cluster).whenComplete((clusterData, ex1) -> { +if (ex1 != null) { +// Unexpected error, such as NPE. Catch all error to avoid the "createRemoteTopicFuture" stuck. +log.error("[{}] An un-expected error occurs when trying to create partitioned topic {} in cluster" ++ " {}.", clientAppId(), topicName, cluster, ex1); +createRemoteTopicFuture.completeExceptionally(new RestException(ex1)); +return; +} +// Get cluster data success. +TopicsImpl topics = +(TopicsImpl) pulsar().getBrokerService().getClusterPulsarAdmin(cluster, clusterData).topics(); +topics.createPartitionedTopicAsync(shortTopicName, numPartitions, true, null) +.whenComplete((ignore, ex2) -> { +if (ex2 == null) { +// Create success. +log.info("[{}] Successfully created partitioned topic {} in cluster {}", +clientAppId(), topicName, cluster); +createRemoteTopicFuture.complete(null); +return; +} +// Create topic on the remote cluster error. +Throwable unwrapEx2 = FutureUtil.unwrapCompletionException(ex2); +// The topic has been created before, check
Re: [PR] [improve] [broker] Create partitioned topics automatically when enable topic level replication [pulsar]
poorbarcode commented on code in PR #22537: URL: https://github.com/apache/pulsar/pull/22537#discussion_r1573843227 ## pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java: ## @@ -153,4 +172,90 @@ public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception admin2.topics().delete(topicName); }); } + +@Test +public void testPartitionedTopicLevelReplication() throws Exception { +final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); +final String partition0 = TopicName.get(topicName).getPartition(0).toString(); +final String partition1 = TopicName.get(topicName).getPartition(1).toString(); +admin1.topics().createPartitionedTopic(topicName, 2); +admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); +// Check the partitioned topic has been created at the remote cluster. +PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName); +assertEquals(topicMetadata2.partitions, 2); +// cleanup. +admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); +waitReplicatorStopped(partition0); +waitReplicatorStopped(partition1); +admin1.topics().deletePartitionedTopic(topicName); +admin2.topics().deletePartitionedTopic(topicName); +} + +@Test +public void testPartitionedTopicLevelReplicationRemoteTopicExist() throws Exception { +final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); +final String partition0 = TopicName.get(topicName).getPartition(0).toString(); +final String partition1 = TopicName.get(topicName).getPartition(1).toString(); +admin1.topics().createPartitionedTopic(topicName, 2); +admin2.topics().createPartitionedTopic(topicName, 2); +admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); +// Check the partitioned topic has been created at the remote cluster. +PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName); +assertEquals(topicMetadata2.partitions, 2); +// cleanup. +admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); +waitReplicatorStopped(partition0); +waitReplicatorStopped(partition1); +admin1.topics().deletePartitionedTopic(topicName); +admin2.topics().deletePartitionedTopic(topicName); +} + +@Test +public void testPartitionedTopicLevelReplicationRemoteConflictTopicExist() throws Exception { Review Comment: No, it breaks the order of consumption on the remote cluster if they are using Key_Shared mode, letting users determine how to fix it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [improve] [broker] Create partitioned topics automatically when enable topic level replication [pulsar]
poorbarcode commented on code in PR #22537: URL: https://github.com/apache/pulsar/pull/22537#discussion_r1573843227 ## pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java: ## @@ -153,4 +172,90 @@ public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception admin2.topics().delete(topicName); }); } + +@Test +public void testPartitionedTopicLevelReplication() throws Exception { +final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); +final String partition0 = TopicName.get(topicName).getPartition(0).toString(); +final String partition1 = TopicName.get(topicName).getPartition(1).toString(); +admin1.topics().createPartitionedTopic(topicName, 2); +admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); +// Check the partitioned topic has been created at the remote cluster. +PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName); +assertEquals(topicMetadata2.partitions, 2); +// cleanup. +admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); +waitReplicatorStopped(partition0); +waitReplicatorStopped(partition1); +admin1.topics().deletePartitionedTopic(topicName); +admin2.topics().deletePartitionedTopic(topicName); +} + +@Test +public void testPartitionedTopicLevelReplicationRemoteTopicExist() throws Exception { +final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); +final String partition0 = TopicName.get(topicName).getPartition(0).toString(); +final String partition1 = TopicName.get(topicName).getPartition(1).toString(); +admin1.topics().createPartitionedTopic(topicName, 2); +admin2.topics().createPartitionedTopic(topicName, 2); +admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); +// Check the partitioned topic has been created at the remote cluster. +PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName); +assertEquals(topicMetadata2.partitions, 2); +// cleanup. +admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); +waitReplicatorStopped(partition0); +waitReplicatorStopped(partition1); +admin1.topics().deletePartitionedTopic(topicName); +admin2.topics().deletePartitionedTopic(topicName); +} + +@Test +public void testPartitionedTopicLevelReplicationRemoteConflictTopicExist() throws Exception { Review Comment: No, it breaks the order of consumption on the remote cluster if they are using Key_Shared mode, just ask users to determine how to fix it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [improve] [broker] Create partitioned topics automatically when enable topic level replication [pulsar]
poorbarcode commented on code in PR #22537: URL: https://github.com/apache/pulsar/pull/22537#discussion_r1573842742 ## pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java: ## @@ -153,4 +172,90 @@ public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception admin2.topics().delete(topicName); }); } + +@Test +public void testPartitionedTopicLevelReplication() throws Exception { +final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); +final String partition0 = TopicName.get(topicName).getPartition(0).toString(); +final String partition1 = TopicName.get(topicName).getPartition(1).toString(); +admin1.topics().createPartitionedTopic(topicName, 2); +admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); +// Check the partitioned topic has been created at the remote cluster. +PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName); +assertEquals(topicMetadata2.partitions, 2); +// cleanup. +admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); +waitReplicatorStopped(partition0); +waitReplicatorStopped(partition1); +admin1.topics().deletePartitionedTopic(topicName); +admin2.topics().deletePartitionedTopic(topicName); +} + +@Test +public void testPartitionedTopicLevelReplicationRemoteTopicExist() throws Exception { +final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); +final String partition0 = TopicName.get(topicName).getPartition(0).toString(); +final String partition1 = TopicName.get(topicName).getPartition(1).toString(); +admin1.topics().createPartitionedTopic(topicName, 2); +admin2.topics().createPartitionedTopic(topicName, 2); +admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); +// Check the partitioned topic has been created at the remote cluster. +PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName); +assertEquals(topicMetadata2.partitions, 2); +// cleanup. +admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); +waitReplicatorStopped(partition0); +waitReplicatorStopped(partition1); +admin1.topics().deletePartitionedTopic(topicName); +admin2.topics().deletePartitionedTopic(topicName); +} + +@Test +public void testPartitionedTopicLevelReplicationRemoteConflictTopicExist() throws Exception { +final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); +admin2.topics().createPartitionedTopic(topicName, 3); +admin1.topics().createPartitionedTopic(topicName, 2); +try { +admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); +fail("Expected error due to a conflict partitioned topic already exists."); +} catch (Exception ex) { +Throwable unWrapEx = FutureUtil.unwrapCompletionException(ex); +assertTrue(unWrapEx.getMessage().contains("with different partitions")); +} +// Check nothing changed. +PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName); +assertEquals(topicMetadata2.partitions, 3); +assertEquals(admin1.topics().getReplicationClusters(topicName, true).size(), 1); +// cleanup. +admin1.topics().deletePartitionedTopic(topicName); +admin2.topics().deletePartitionedTopic(topicName); +} + +/** + * TODO next PR will correct the behavior below, just left this test here. + */ +// @Test +private void testNamespaceLevelReplicationRemoteConflictTopicExist() throws Exception { Review Comment: Removed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [improve] [broker] Create partitioned topics automatically when enable topic level replication [pulsar]
Demogorgon314 commented on code in PR #22537: URL: https://github.com/apache/pulsar/pull/22537#discussion_r1572038896 ## pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java: ## @@ -153,4 +172,90 @@ public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception admin2.topics().delete(topicName); }); } + +@Test +public void testPartitionedTopicLevelReplication() throws Exception { +final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); +final String partition0 = TopicName.get(topicName).getPartition(0).toString(); +final String partition1 = TopicName.get(topicName).getPartition(1).toString(); +admin1.topics().createPartitionedTopic(topicName, 2); +admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); +// Check the partitioned topic has been created at the remote cluster. +PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName); +assertEquals(topicMetadata2.partitions, 2); +// cleanup. +admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); +waitReplicatorStopped(partition0); +waitReplicatorStopped(partition1); +admin1.topics().deletePartitionedTopic(topicName); +admin2.topics().deletePartitionedTopic(topicName); +} + +@Test +public void testPartitionedTopicLevelReplicationRemoteTopicExist() throws Exception { +final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); +final String partition0 = TopicName.get(topicName).getPartition(0).toString(); +final String partition1 = TopicName.get(topicName).getPartition(1).toString(); +admin1.topics().createPartitionedTopic(topicName, 2); +admin2.topics().createPartitionedTopic(topicName, 2); +admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); +// Check the partitioned topic has been created at the remote cluster. +PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName); +assertEquals(topicMetadata2.partitions, 2); +// cleanup. +admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); +waitReplicatorStopped(partition0); +waitReplicatorStopped(partition1); +admin1.topics().deletePartitionedTopic(topicName); +admin2.topics().deletePartitionedTopic(topicName); +} + +@Test +public void testPartitionedTopicLevelReplicationRemoteConflictTopicExist() throws Exception { Review Comment: When the remote cluster has 1 partition for the topic, and then we try to create the topic with 2 partition on the source cluster, should we update the remote cluster's topic partition to 2? ## pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorTest.java: ## @@ -153,4 +172,90 @@ public void testTopicCloseWhenInternalProducerCloseErrorOnce() throws Exception admin2.topics().delete(topicName); }); } + +@Test +public void testPartitionedTopicLevelReplication() throws Exception { +final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); +final String partition0 = TopicName.get(topicName).getPartition(0).toString(); +final String partition1 = TopicName.get(topicName).getPartition(1).toString(); +admin1.topics().createPartitionedTopic(topicName, 2); +admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1, cluster2)); +// Check the partitioned topic has been created at the remote cluster. +PartitionedTopicMetadata topicMetadata2 = admin2.topics().getPartitionedTopicMetadata(topicName); +assertEquals(topicMetadata2.partitions, 2); +// cleanup. +admin1.topics().setReplicationClusters(topicName, Arrays.asList(cluster1)); +waitReplicatorStopped(partition0); +waitReplicatorStopped(partition1); +admin1.topics().deletePartitionedTopic(topicName); +admin2.topics().deletePartitionedTopic(topicName); +} + +@Test +public void testPartitionedTopicLevelReplicationRemoteTopicExist() throws Exception { +final String topicName = BrokerTestUtil.newUniqueName("persistent://" + nonReplicatedNamespace + "/tp_"); +final String partition0 = TopicName.get(topicName).getPartition(0).toString(); +final String partition1 = TopicName.get(topicName).getPartition(1).toString(); +admin1.topics().createPartitionedTopic(topicName, 2); +admin2.topics().createPartitionedTopic(topicName, 2); +admin1.topics().setReplicationClusters(topicName, Arrays.asLi
Re: [PR] [improve] [broker] Create partitioned topics automatically when enable topic level replication [pulsar]
poorbarcode closed pull request #22537: [improve] [broker] Create partitioned topics automatically when enable topic level replication URL: https://github.com/apache/pulsar/pull/22537 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org