This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 56a7b89be5f [fix][broker] Reject create non existent persistent partitions. (#19086) 56a7b89be5f is described below commit 56a7b89be5fecd41fc200379c96b15e3c0ace7c3 Author: Qiang Zhao <mattisonc...@apache.org> AuthorDate: Mon Jan 9 19:34:19 2023 +0800 [fix][broker] Reject create non existent persistent partitions. (#19086) --- .../broker/admin/impl/PersistentTopicsBase.java | 266 ++++++++++----------- .../pulsar/broker/service/BrokerService.java | 19 +- .../apache/pulsar/broker/admin/AdminApi2Test.java | 12 +- .../nonpersistent/NonPersistentTopicTest.java | 23 ++ .../service/persistent/PersistentTopicTest.java | 20 ++ 5 files changed, 193 insertions(+), 147 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 3fb551967b9..81c9638632e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -411,67 +411,85 @@ public class PersistentTopicsBase extends AdminResource { * recreate them at application so, newly created producers and consumers can connect to newly added partitions as * well. Therefore, it can violate partition ordering at producers until all producers are restarted at application. 
* - * @param numPartitions + * @param expectPartitions * @param updateLocalTopicOnly * @param authoritative * @param force */ - protected CompletableFuture<Void> internalUpdatePartitionedTopicAsync(int numPartitions, + protected CompletableFuture<Void> internalUpdatePartitionedTopicAsync(int expectPartitions, boolean updateLocalTopicOnly, boolean authoritative, boolean force) { - if (numPartitions <= 0) { - return FutureUtil.failedFuture(new RestException(Status.NOT_ACCEPTABLE, - "Number of partitions should be more than 0")); + if (expectPartitions <= 0) { + return FutureUtil.failedFuture( + new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0")); } return validateTopicOwnershipAsync(topicName, authoritative) - .thenCompose(__ -> validateTopicPolicyOperationAsync(topicName, PolicyName.PARTITION, - PolicyOperation.WRITE)) + .thenCompose(__ -> + validateTopicPolicyOperationAsync(topicName, PolicyName.PARTITION, PolicyOperation.WRITE)) .thenCompose(__ -> { if (!updateLocalTopicOnly && !force) { - return validatePartitionTopicUpdateAsync(topicName.getLocalName(), numPartitions); + return validatePartitionTopicUpdateAsync(topicName.getLocalName(), expectPartitions); } else { return CompletableFuture.completedFuture(null); } }).thenCompose(__ -> pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName)) .thenCompose(topicMetadata -> { final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic(); - if (maxPartitions > 0 && numPartitions > maxPartitions) { + if (maxPartitions > 0 && expectPartitions > maxPartitions) { throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be less than or equal to " + maxPartitions); } - // Only do the validation if it's the first hop. 
- if (topicName.isGlobal() && isNamespaceReplicated(topicName.getNamespaceObject())) { - return getNamespaceReplicatedClustersAsync(topicName.getNamespaceObject()) - .thenApply(clusters -> { - if (!clusters.contains(pulsar().getConfig().getClusterName())) { - log.error("[{}] local cluster is not part of replicated cluster for namespace {}", - clientAppId(), topicName); - throw new RestException(Status.FORBIDDEN, "Local cluster is not part of replicate" - + " cluster list"); - } - return clusters; - }) - .thenCompose(clusters -> - tryCreateExtendedPartitionsAsync(topicMetadata.partitions, numPartitions) - .thenApply(ignore -> clusters)) - .thenCompose(clusters -> createSubscriptions(topicName, numPartitions, force).thenApply( - ignore -> clusters)) - .thenCompose(clusters -> { - if (!updateLocalTopicOnly) { - return updatePartitionInOtherCluster(numPartitions, clusters) - .thenCompose(v -> namespaceResources().getPartitionedTopicResources() - .updatePartitionedTopicAsync(topicName, p -> - new PartitionedTopicMetadata(numPartitions, - p.properties) - )); - } else { - return CompletableFuture.completedFuture(null); - } - }); - } else { - return tryCreateExtendedPartitionsAsync(topicMetadata.partitions, numPartitions) - .thenCompose(ignore -> updatePartitionedTopic(topicName, numPartitions, force)); + final PulsarAdmin adminClient; + try { + adminClient = pulsar().getAdminClient(); + } catch (PulsarServerException e) { + throw new RuntimeException(e); } + return adminClient.topics().getListAsync(topicName.getNamespace()) + .thenCompose(topics -> { + long existPartitions = topics.stream() + .filter(t -> TopicName.get(t).getPartitionedTopicName() + .equals(topicName.getPartitionedTopicName())) + .count(); + if (existPartitions >= expectPartitions) { + throw new RestException(Status.CONFLICT, + "Number of new partitions must be greater than existing number of partitions"); + } + // Only do the validation if it's the first hop. 
+ if (topicName.isGlobal() && isNamespaceReplicated(topicName.getNamespaceObject())) { + return getNamespaceReplicatedClustersAsync(topicName.getNamespaceObject()) + .thenApply(clusters -> { + if (!clusters.contains(pulsar().getConfig().getClusterName())) { + log.error("[{}] local cluster is not part of" + + " replicated cluster for namespace {}", + clientAppId(), topicName); + throw new RestException(Status.FORBIDDEN, + "Local cluster is not part of replicate cluster list"); + } + return clusters; + }) + .thenCompose(clusters -> + tryCreatePartitionsAsync(expectPartitions) + .thenApply(ignore -> clusters)) + .thenCompose(clusters -> { + if (!updateLocalTopicOnly) { + return namespaceResources().getPartitionedTopicResources() + .updatePartitionedTopicAsync(topicName, p -> + new PartitionedTopicMetadata(expectPartitions, + p.properties)) + .thenCompose(__ -> + updatePartitionInOtherCluster(expectPartitions, + clusters)); + } else { + return CompletableFuture.completedFuture(null); + } + }).thenCompose(clusters -> createSubscriptions(topicName, + expectPartitions)); + } else { + return tryCreatePartitionsAsync(expectPartitions) + .thenCompose(ignore -> updatePartitionedTopic(topicName, expectPartitions)); + } + }); }); } @@ -4363,124 +4381,100 @@ public class PersistentTopicsBase extends AdminResource { } } - private CompletableFuture<Void> updatePartitionedTopic(TopicName topicName, int numPartitions, boolean force) { - CompletableFuture<Void> result = new CompletableFuture<>(); - createSubscriptions(topicName, numPartitions, force).thenCompose(__ -> { - CompletableFuture<Void> future = namespaceResources().getPartitionedTopicResources() - .updatePartitionedTopicAsync(topicName, p -> - new PartitionedTopicMetadata(numPartitions, p.properties)); - future.exceptionally(ex -> { - // If the update operation fails, clean up the partitions that were created - getPartitionedTopicMetadataAsync(topicName, false, false).thenAccept(metadata -> { - int oldPartition = 
metadata.partitions; - for (int i = oldPartition; i < numPartitions; i++) { - topicResources().deletePersistentTopicAsync(topicName.getPartition(i)).exceptionally(ex1 -> { - log.warn("[{}] Failed to clean up managedLedger {}", clientAppId(), topicName, - ex1.getCause()); - return null; - }); - } - }).exceptionally(e -> { - log.warn("[{}] Failed to clean up managedLedger", topicName, e); - return null; - }); + private CompletableFuture<Void> updatePartitionedTopic(TopicName topicName, int expectPartitions) { + CompletableFuture<Void> future = namespaceResources().getPartitionedTopicResources() + .updatePartitionedTopicAsync(topicName, p -> + new PartitionedTopicMetadata(expectPartitions, p.properties)); + future.exceptionally(ex -> { + // If the update operation fails, clean up the partitions that were created + getPartitionedTopicMetadataAsync(topicName, false, false) + .thenAccept(metadata -> { + int oldPartition = metadata.partitions; + for (int i = oldPartition; i < expectPartitions; i++) { + topicResources().deletePersistentTopicAsync(topicName.getPartition(i)).exceptionally(ex1 -> { + log.warn("[{}] Failed to clean up managedLedger {}", clientAppId(), topicName, + ex1.getCause()); + return null; + }); + } + }).exceptionally(e -> { + log.warn("[{}] Failed to clean up managedLedger", topicName, e); return null; }); - return future; - }).thenAccept(__ -> result.complete(null)).exceptionally(ex -> { - result.completeExceptionally(ex); return null; }); - return result; + return future.thenCompose(__ -> createSubscriptions(topicName, expectPartitions)); } /** * It creates subscriptions for new partitions of existing partitioned-topics. 
* * @param topicName : topic-name: persistent://prop/cluster/ns/topic - * @param numPartitions : number partitions for the topics - * @param ignoreConflictException : If true, ignore ConflictException: subscription already exists for topic + * @param expectPartitions : number of expected partitions * */ - private CompletableFuture<Void> createSubscriptions(TopicName topicName, int numPartitions, - boolean ignoreConflictException) { + private CompletableFuture<Void> createSubscriptions(TopicName topicName, int expectPartitions) { CompletableFuture<Void> result = new CompletableFuture<>(); - pulsar().getBrokerService().fetchPartitionedTopicMetadataAsync(topicName).thenAccept(partitionMetadata -> { - if (partitionMetadata.partitions < 1) { - result.completeExceptionally(new RestException(Status.CONFLICT, "Topic is not partitioned topic")); - return; - } - - if (partitionMetadata.partitions >= numPartitions) { - result.completeExceptionally(new RestException(Status.CONFLICT, - "number of partitions must be more than existing " + partitionMetadata.partitions)); - return; - } - - PulsarAdmin admin; - try { - admin = pulsar().getAdminClient(); - } catch (PulsarServerException e1) { - result.completeExceptionally(e1); - return; - } + if (expectPartitions < 1) { + return FutureUtil.failedFuture(new RestException(Status.CONFLICT, "Topic is not partitioned topic")); + } + PulsarAdmin admin; + try { + admin = pulsar().getAdminClient(); + } catch (PulsarServerException e1) { + return FutureUtil.failedFuture(e1); + } - admin.topics().getStatsAsync(topicName.getPartition(0).toString()).thenAccept(stats -> { - List<CompletableFuture<Void>> subscriptionFutures = new ArrayList<>(); + admin.topics().getStatsAsync(topicName.getPartition(0).toString()).thenAccept(stats -> { + List<CompletableFuture<Void>> subscriptionFutures = new ArrayList<>(); - stats.getSubscriptions().entrySet().forEach(e -> { - String subscription = e.getKey(); - SubscriptionStats ss = e.getValue(); - if 
(!ss.isDurable()) { - // We must not re-create non-durable subscriptions on the new partitions - return; - } - boolean replicated = ss.isReplicated(); - - for (int i = partitionMetadata.partitions; i < numPartitions; i++) { - final String topicNamePartition = topicName.getPartition(i).toString(); - CompletableFuture<Void> future = new CompletableFuture<>(); - admin.topics().createSubscriptionAsync(topicNamePartition, - subscription, MessageId.latest, replicated).whenComplete((__, ex) -> { - if (ex == null) { + stats.getSubscriptions().entrySet().forEach(e -> { + String subscription = e.getKey(); + SubscriptionStats ss = e.getValue(); + if (!ss.isDurable()) { + // We must not re-create non-durable subscriptions on the new partitions + return; + } + boolean replicated = ss.isReplicated(); + + for (int i = 0; i < expectPartitions; i++) { + final String topicNamePartition = topicName.getPartition(i).toString(); + CompletableFuture<Void> future = new CompletableFuture<>(); + admin.topics().createSubscriptionAsync(topicNamePartition, + subscription, MessageId.latest, replicated).whenComplete((__, ex) -> { + if (ex == null) { + future.complete(null); + } else { + Throwable realCause = FutureUtil.unwrapCompletionException(ex); + if (realCause instanceof PulsarAdminException.ConflictException) { future.complete(null); } else { - if (ignoreConflictException - && ex instanceof PulsarAdminException.ConflictException) { - future.complete(null); - } else { - future.completeExceptionally(ex); - } + future.completeExceptionally(realCause); } - }); - subscriptionFutures.add(future); - } - }); + } + }); + subscriptionFutures.add(future); + } + }); - FutureUtil.waitForAll(subscriptionFutures).thenRun(() -> { - log.info("[{}] Successfully created subscriptions on new partitions {}", clientAppId(), topicName); - result.complete(null); - }).exceptionally(ex -> { - log.warn("[{}] Failed to create subscriptions on new partitions for {}", - clientAppId(), topicName, ex); - 
result.completeExceptionally(ex); - return null; - }); + FutureUtil.waitForAll(subscriptionFutures).thenRun(() -> { + log.info("[{}] Successfully created subscriptions on new partitions {}", clientAppId(), topicName); + result.complete(null); }).exceptionally(ex -> { - if (ex.getCause() instanceof PulsarAdminException.NotFoundException) { - // The first partition doesn't exist, so there are currently to subscriptions to recreate - result.complete(null); - } else { - log.warn("[{}] Failed to get list of subscriptions of {}", - clientAppId(), topicName.getPartition(0), ex); - result.completeExceptionally(ex); - } + log.warn("[{}] Failed to create subscriptions on new partitions for {}", + clientAppId(), topicName, ex); + result.completeExceptionally(ex); return null; }); }).exceptionally(ex -> { - log.warn("[{}] Failed to get partition metadata for {}", - clientAppId(), topicName.toString()); - result.completeExceptionally(ex); + if (ex.getCause() instanceof PulsarAdminException.NotFoundException) { + // The first partition doesn't exist, so there are currently to subscriptions to recreate + result.complete(null); + } else { + log.warn("[{}] Failed to get list of subscriptions of {}", + clientAppId(), topicName.getPartition(0), ex); + result.completeExceptionally(ex); + } return null; }); return result; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index ee0ad6e103b..65a5795d59e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1017,11 +1017,22 @@ public class BrokerService implements Closeable { } final boolean isPersistentTopic = topicName.getDomain().equals(TopicDomain.persistent); if (isPersistentTopic) { - return topics.computeIfAbsent(topicName.toString(), (k) -> { - return 
this.loadOrCreatePersistentTopic(k, createIfMissing, properties); + return topics.computeIfAbsent(topicName.toString(), (tpName) -> { + if (topicName.isPartitioned()) { + return fetchPartitionedTopicMetadataAsync(TopicName.get(topicName.getPartitionedTopicName())) + .thenCompose((metadata) -> { + // Allow crate non-partitioned persistent topic that name includes `partition` + if (metadata.partitions == 0 + || topicName.getPartitionIndex() < metadata.partitions) { + return loadOrCreatePersistentTopic(tpName, createIfMissing, properties); + } + return CompletableFuture.completedFuture(Optional.empty()); + }); + } + return loadOrCreatePersistentTopic(tpName, createIfMissing, properties); }); } else { - return topics.computeIfAbsent(topicName.toString(), (name) -> { + return topics.computeIfAbsent(topicName.toString(), (name) -> { if (topicName.isPartitioned()) { final TopicName partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName()); return this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata) -> { @@ -1035,7 +1046,7 @@ public class BrokerService implements Closeable { } else { return CompletableFuture.completedFuture(Optional.empty()); } - }); + }); } } catch (IllegalArgumentException e) { log.warn("[{}] Illegalargument exception when loading topic", topicName, e); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java index 11c84d990f6..e35e9311b9f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java @@ -2676,15 +2676,13 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest { assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, startPartitions); // create a subscription for few new partition which can fail - 
admin.topics().createSubscription(partitionedTopicName + "-partition-" + startPartitions, subName1, - MessageId.earliest); - try { - admin.topics().updatePartitionedTopic(partitionedTopicName, newPartitions, false, false); - } catch (PulsarAdminException.PreconditionFailedException e) { - // Ok + admin.topics().createSubscription(partitionedTopicName + "-partition-" + startPartitions, subName1, + MessageId.earliest); + fail("Unexpected behaviour"); + } catch (PulsarAdminException.PreconditionFailedException ex) { + // OK } - assertEquals(admin.topics().getPartitionedTopicMetadata(partitionedTopicName).partitions, startPartitions); admin.topics().updatePartitionedTopic(partitionedTopicName, newPartitions, false, true); // validate subscription is created for new partition. diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java index 71caa1edb52..73a1084f30f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopicTest.java @@ -18,13 +18,18 @@ */ package org.apache.pulsar.broker.service.nonpersistent; +import lombok.Cleanup; import org.apache.pulsar.broker.service.BrokerTestBase; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.TopicStats; +import org.junit.Assert; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; 
import org.testng.annotations.Test; @@ -32,6 +37,7 @@ import org.testng.annotations.Test; import static org.testng.Assert.assertTrue; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.fail; @Test(groups = "broker") public class NonPersistentTopicTest extends BrokerTestBase { @@ -96,4 +102,21 @@ public class NonPersistentTopicTest extends BrokerTestBase { assertEquals(statsAfterUnsubscribe.getBytesOutCounter(), statsBeforeUnsubscribe.getBytesOutCounter()); assertEquals(statsAfterUnsubscribe.getMsgOutCounter(), statsBeforeUnsubscribe.getMsgOutCounter()); } + + @Test + public void testCreateNonExistentPartitions() throws PulsarAdminException, PulsarClientException { + final String topicName = "non-persistent://prop/ns-abc/testCreateNonExistentPartitions"; + admin.topics().createPartitionedTopic(topicName, 4); + TopicName partition = TopicName.get(topicName).getPartition(4); + try { + @Cleanup + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(partition.toString()) + .create(); + fail("unexpected behaviour"); + } catch (PulsarClientException.TopicDoesNotExistException ignored) { + + } + Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions, 4); + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java index aa05624a5b0..19c5bd5c9aa 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java @@ -29,6 +29,7 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; +import static org.testng.Assert.fail; import 
java.io.ByteArrayOutputStream; import java.lang.reflect.Field; import java.nio.charset.StandardCharsets; @@ -46,6 +47,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.pulsar.broker.service.BrokerTestBase; import org.apache.pulsar.broker.stats.PrometheusMetricsTest; import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator; +import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.client.api.*; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; @@ -402,4 +404,22 @@ public class PersistentTopicTest extends BrokerTestBase { assertTrue(persistentSubscription.getCursor().getLastActive() > beforeRemoveConsumerTimestamp); assertTrue(persistentSubscription2.getCursor().getLastActive() > beforeRemoveConsumerTimestamp); } + + + @Test + public void testCreateNonExistentPartitions() throws PulsarAdminException, PulsarClientException { + final String topicName = "persistent://prop/ns-abc/testCreateNonExistentPartitions"; + admin.topics().createPartitionedTopic(topicName, 4); + TopicName partition = TopicName.get(topicName).getPartition(4); + try { + @Cleanup + Producer<byte[]> producer = pulsarClient.newProducer() + .topic(partition.toString()) + .create(); + fail("unexpected behaviour"); + } catch (PulsarClientException.TopicDoesNotExistException ignored) { + + } + Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions, 4); + } }