This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 547c421 Add more config for auto-topic-creation (#4963) 547c421 is described below commit 547c4218124a6055a468152304d539d587071e2d Author: Xiaobing Fang <bin...@qq.com> AuthorDate: Tue Sep 17 09:01:12 2019 -0500 Add more config for auto-topic-creation (#4963) Master Issue: #4926 ### Motivation Currently the partitioned topic and non-partitioned topic are a little confusing for users. In PR #3450 we added a config for auto-topic-creation. We could leverage this config to provide some more config for auto-topic-creation. ### Modifications - Add `allowAutoTopicCreationType` and `defaultNumPartitions` to configuration. - Users can use both configurations when they decide to create a topic automatically. - Add test. - Update doc. --- conf/broker.conf | 6 ++ conf/standalone.conf | 9 ++ .../apache/pulsar/broker/ServiceConfiguration.java | 13 ++- .../apache/pulsar/broker/admin/AdminResource.java | 116 +++++++++++++++++++- .../pulsar/broker/admin/impl/BrokersBase.java | 7 ++ .../broker/admin/impl/PersistentTopicsBase.java | 52 +++++---- .../broker/admin/v1/NonPersistentTopics.java | 18 +++- .../pulsar/broker/admin/v1/PersistentTopics.java | 5 +- .../broker/admin/v2/NonPersistentTopics.java | 19 +++- .../pulsar/broker/admin/v2/PersistentTopics.java | 6 +- .../pulsar/broker/admin/PersistentTopicsTest.java | 6 +- .../broker/auth/MockedPulsarServiceBaseTest.java | 1 + .../broker/service/BacklogQuotaManagerTest.java | 1 + .../broker/service/BrokerBkEnsemblesTests.java | 1 + .../broker/service/BrokerBookieIsolationTest.java | 4 + .../BrokerServiceAutoTopicCreationTest.java | 120 +++++++++++++++++++++ .../pulsar/broker/service/ReplicatorTestBase.java | 3 + .../pulsar/client/api/NonPersistentTopicTest.java | 3 + .../client/api/SimpleProducerConsumerTest.java | 4 +- .../client/api/v1/V1_ProducerConsumerTest.java | 4 +- .../worker/PulsarFunctionE2ESecurityTest.java | 3 +- 
.../worker/PulsarFunctionLocalRunTest.java | 3 +- .../worker/PulsarFunctionPublishTest.java | 3 +- .../functions/worker/PulsarFunctionStateTest.java | 3 +- .../worker/PulsarWorkerAssignmentTest.java | 2 +- .../apache/pulsar/io/PulsarFunctionAdminTest.java | 2 +- .../apache/pulsar/io/PulsarFunctionE2ETest.java | 3 +- .../configurations/pulsar_broker_test.conf | 3 + pulsar-client-cpp/test-conf/standalone-ssl.conf | 9 ++ pulsar-client-cpp/test-conf/standalone.conf | 9 ++ pulsar-client-cpp/tests/standalone.conf | 9 ++ .../pulsar/client/impl/HttpLookupService.java | 3 +- .../pulsar/functions/worker/WorkerService.java | 3 + site2/docs/reference-configuration.md | 2 + .../pulsar/tests/integration/cli/CLITest.java | 6 ++ .../integration/functions/PulsarFunctionsTest.java | 27 ++++- 36 files changed, 442 insertions(+), 46 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index b491336..dcd39f3 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -88,6 +88,12 @@ ttlDurationDefaultInSeconds=0 # Enable topic auto creation if new producer or consumer connected (disable auto creation with value false) allowAutoTopicCreation=true +# The type of topic that is allowed to be automatically created.(partitioned/non-partitioned) +allowAutoTopicCreationType=partitioned + +# The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned. +defaultNumPartitions=1 + # Enable the deletion of inactive topics brokerDeleteInactiveTopicsEnabled=true diff --git a/conf/standalone.conf b/conf/standalone.conf index 388c185..b25f21f 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -592,3 +592,12 @@ allowLoopback=true # will heart performance. It is better to give a higher number of gc # interval if there is enough disk capacity. 
gcWaitTime=300000 + +# Enable topic auto creation if new producer or consumer connected (disable auto creation with value false) +allowAutoTopicCreation=true + +# The type of topic that is allowed to be automatically created.(partitioned/non-partitioned) +allowAutoTopicCreationType=non-partitioned + +# The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned. +defaultNumPartitions=1 \ No newline at end of file diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index bff2b46..ffea06d 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -895,10 +895,21 @@ public class ServiceConfiguration implements PulsarConfiguration { private double managedLedgerDefaultMarkDeleteRateLimit = 1.0; @FieldContext( category = CATEGORY_STORAGE_ML, - doc = "Allow automated creation of non-partition topics if set to true (default value)." + doc = "Allow automated creation of topics if set to true (default value)." ) private boolean allowAutoTopicCreation = true; @FieldContext( + category = CATEGORY_STORAGE_ML, + doc = "The type of topic that is allowed to be automatically created.(partitioned/non-partitioned)" + ) + private String allowAutoTopicCreationType = "partitioned"; + @FieldContext( + category = CATEGORY_STORAGE_ML, + doc = "The number of partitioned topics that is allowed to be automatically created" + + "if allowAutoTopicCreationType is partitioned." 
+ ) + private int defaultNumPartitions = 1; + @FieldContext( category = CATEGORY_STORAGE_ML, doc = "Number of threads to be used for managed ledger tasks dispatching" ) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java index 94409e1..ed497e8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java @@ -18,8 +18,10 @@ */ package org.apache.pulsar.broker.admin; +import com.fasterxml.jackson.core.JsonProcessingException; import static com.google.common.base.Preconditions.checkArgument; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; +import org.apache.pulsar.common.api.proto.PulsarApi; import static org.apache.pulsar.common.util.Codec.decode; import java.net.MalformedURLException; @@ -81,6 +83,7 @@ import com.google.common.collect.Lists; public abstract class AdminResource extends PulsarWebResource { private static final Logger log = LoggerFactory.getLogger(AdminResource.class); private static final String POLICIES_READONLY_FLAG_PATH = "/admin/flags/policies-readonly"; + private static final int PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS = 1000; public static final String PARTITIONED_TOPIC_PATH_ZNODE = "partitioned-topics"; protected ZooKeeper globalZk() { @@ -500,7 +503,7 @@ public abstract class AdminResource extends PulsarWebResource { } protected PartitionedTopicMetadata getPartitionedTopicMetadata(TopicName topicName, - boolean authoritative) { + boolean authoritative, boolean checkAllowAutoCreation) { validateClusterOwnership(topicName.getCluster()); // validates global-namespace contains local/peer cluster: if peer/local cluster present then lookup can // serve/redirect request else fail partitioned-metadata-request so, client fails while creating @@ -519,7 +522,12 @@ public abstract class 
AdminResource extends PulsarWebResource { } String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(), topicName.getEncodedLocalName()); - PartitionedTopicMetadata partitionMetadata = fetchPartitionedTopicMetadata(pulsar(), path); + PartitionedTopicMetadata partitionMetadata; + if (checkAllowAutoCreation) { + partitionMetadata = fetchPartitionedTopicMetadataCheckAllowAutoCreation(pulsar(), path, topicName); + } else { + partitionMetadata = fetchPartitionedTopicMetadata(pulsar(), path); + } if (log.isDebugEnabled()) { log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId(), topicName, @@ -566,6 +574,96 @@ public abstract class AdminResource extends PulsarWebResource { return metadataFuture; } + protected static PartitionedTopicMetadata fetchPartitionedTopicMetadataCheckAllowAutoCreation( + PulsarService pulsar, String path, TopicName topicName) { + try { + return fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(pulsar, path, topicName) + .get(); + } catch (Exception e) { + if (e.getCause() instanceof RestException) { + throw (RestException) e; + } + throw new RestException(e); + } + } + + protected static CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync( + PulsarService pulsar, String path, TopicName topicName) { + CompletableFuture<PartitionedTopicMetadata> metadataFuture = new CompletableFuture<>(); + try { + boolean allowAutoTopicCreation = pulsar.getConfiguration().isAllowAutoTopicCreation(); + String topicType = pulsar.getConfiguration().getAllowAutoTopicCreationType(); + boolean topicExist; + try { + topicExist = pulsar.getNamespaceService() + .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL) + .contains(topicName.toString()); + } catch (Exception e) { + log.warn("Unexpected error while getting list of topics. topic={}. 
Error: {}", + topicName, e.getMessage(), e); + throw new RestException(e); + } + fetchPartitionedTopicMetadataAsync(pulsar, path).whenCompleteAsync((metadata, ex) -> { + if (ex != null) { + metadataFuture.completeExceptionally(ex); + // If topic is already exist, creating partitioned topic is not allowed. + } else if (metadata.partitions == 0 && !topicExist && allowAutoTopicCreation && + TopicType.PARTITIONED.toString().equals(topicType)) { + createDefaultPartitionedTopicAsync(pulsar, path).whenComplete((defaultMetadata, e) -> { + if (e == null) { + metadataFuture.complete(defaultMetadata); + } else if (e instanceof KeeperException) { + try { + Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS); + if (!pulsar.getGlobalZkCache().exists(path)){ + metadataFuture.completeExceptionally(e); + return; + } + } catch (InterruptedException | KeeperException exc) { + metadataFuture.completeExceptionally(exc); + return; + } + fetchPartitionedTopicMetadataAsync(pulsar, path).whenComplete((metadata2, ex2) -> { + if (ex2 != null) { + metadataFuture.completeExceptionally(ex2); + } else { + metadataFuture.complete(metadata2); + } + }); + } else { + metadataFuture.completeExceptionally(e); + } + }); + } else { + metadataFuture.complete(metadata); + } + }); + } catch (Exception e) { + metadataFuture.completeExceptionally(e); + } + return metadataFuture; + } + + protected static CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopicAsync( + PulsarService pulsar, String path) { + int defaultNumPartitions = pulsar.getConfiguration().getDefaultNumPartitions(); + checkArgument(defaultNumPartitions > 0, "Default number of partitions should be more than 0"); + PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata(defaultNumPartitions); + CompletableFuture<PartitionedTopicMetadata> partitionedTopicFuture = new CompletableFuture<>(); + try { + byte[] content = jsonMapper().writeValueAsBytes(configMetadata); + 
ZkUtils.createFullPathOptimistic(pulsar.getGlobalZkCache().getZooKeeper(), path, content, + ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + // we wait for the data to be synced in all quorums and the observers + Thread.sleep(PARTITIONED_TOPIC_WAIT_SYNC_TIME_MS); + partitionedTopicFuture.complete(configMetadata); + } catch (JsonProcessingException | KeeperException | InterruptedException e) { + log.error("Failed to create default partitioned topic.", e); + partitionedTopicFuture.completeExceptionally(e); + } + return partitionedTopicFuture; + } + protected void validateClusterExists(String cluster) { try { if (!clustersCache().get(path("clusters", cluster)).isPresent()) { @@ -627,4 +725,18 @@ public abstract class AdminResource extends PulsarWebResource { partitionedTopics.sort(null); return partitionedTopics; } + + enum TopicType { + PARTITIONED("partitioned"), + NON_PARTITIONED("non-partitioned"); + private String type; + + TopicType(String type) { + this.type = type; + } + + public String toString() { + return type; + } + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java index 63e392a..fc529e9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/BrokersBase.java @@ -262,6 +262,13 @@ public class BrokersBase extends AdminResource { PulsarClient client = pulsar().getClient(); String messageStr = UUID.randomUUID().toString(); + // create non-partitioned topic manually + try { + pulsar().getBrokerService().getTopic(topic, true).get(); + } catch (Exception e) { + asyncResponse.resume(new RestException(e)); + return; + } CompletableFuture<Producer<String>> producerFuture = client.newProducer(Schema.STRING).topic(topic).createAsync(); CompletableFuture<Reader<String>> readerFuture = client.newReader(Schema.STRING) diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java index 9150f99..509b41d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java @@ -20,6 +20,7 @@ package org.apache.pulsar.broker.admin.impl; import static com.google.common.base.Preconditions.checkNotNull; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; +import org.apache.pulsar.common.api.proto.PulsarApi; import static org.apache.pulsar.common.util.Codec.decode; import com.github.zafarkhaja.semver.Version; @@ -375,6 +376,18 @@ public class PersistentTopicsBase extends AdminResource { throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0"); } try { + boolean topicExist = pulsar().getNamespaceService() + .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL) + .contains(topicName.toString()); + if (topicExist) { + log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName); + throw new RestException(Status.CONFLICT, "This topic already exists"); + } + } catch (Exception e) { + log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e); + throw new RestException(e); + } + try { String path = ZkAdminPaths.partitionedTopicPath(topicName); byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions)); zkCreateOptimistic(path, data); @@ -444,8 +457,8 @@ public class PersistentTopicsBase extends AdminResource { } } - protected PartitionedTopicMetadata internalGetPartitionedMetadata(boolean authoritative) { - PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(topicName, authoritative); + protected PartitionedTopicMetadata 
internalGetPartitionedMetadata(boolean authoritative, boolean checkAllowAutoCreation) { + PartitionedTopicMetadata metadata = getPartitionedTopicMetadata(topicName, authoritative, checkAllowAutoCreation); if (metadata.partitions > 1) { validateClientVersion(); } @@ -457,7 +470,7 @@ public class PersistentTopicsBase extends AdminResource { final CompletableFuture<Void> future = new CompletableFuture<>(); - PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative); + PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); final int numPartitions = partitionMetadata.partitions; if (numPartitions > 0) { final AtomicInteger count = new AtomicInteger(numPartitions); @@ -590,7 +603,7 @@ public class PersistentTopicsBase extends AdminResource { final List<String> subscriptions = Lists.newArrayList(); - PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative); + PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); if (partitionMetadata.partitions > 0) { try { // get the subscriptions only from the 1st partition since all the other partitions will have the same @@ -685,7 +698,7 @@ public class PersistentTopicsBase extends AdminResource { protected void internalGetPartitionedStats(AsyncResponse asyncResponse, boolean authoritative, boolean perPartition) { - PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative); + PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); if (partitionMetadata.partitions == 0) { throw new RestException(Status.NOT_FOUND, "Partitioned Topic not found"); } @@ -743,7 +756,7 @@ public class PersistentTopicsBase extends AdminResource { } protected void internalGetPartitionedStatsInternal(AsyncResponse asyncResponse, boolean authoritative) { - PartitionedTopicMetadata 
partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative); + PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); if (partitionMetadata.partitions == 0) { throw new RestException(Status.NOT_FOUND, "Partitioned Topic not found"); } @@ -786,7 +799,7 @@ public class PersistentTopicsBase extends AdminResource { if (topicName.isGlobal()) { validateGlobalNamespaceOwnership(namespaceName); } - PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative); + PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); if (partitionMetadata.partitions > 0) { final List<CompletableFuture<Void>> futures = Lists.newArrayList(); @@ -855,7 +868,7 @@ public class PersistentTopicsBase extends AdminResource { if (topicName.isGlobal()) { validateGlobalNamespaceOwnership(namespaceName); } - PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative); + PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); if (partitionMetadata.partitions > 0) { final List<CompletableFuture<Void>> futures = Lists.newArrayList(); @@ -920,7 +933,7 @@ public class PersistentTopicsBase extends AdminResource { if (topicName.isGlobal()) { validateGlobalNamespaceOwnership(namespaceName); } - PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative); + PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); if (partitionMetadata.partitions > 0) { throw new RestException(Status.METHOD_NOT_ALLOWED, "Skip messages on a partitioned topic is not allowed"); } @@ -952,7 +965,7 @@ public class PersistentTopicsBase extends AdminResource { if (topicName.isGlobal()) { validateGlobalNamespaceOwnership(namespaceName); } - PartitionedTopicMetadata partitionMetadata = 
getPartitionedTopicMetadata(topicName, authoritative); + PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); if (partitionMetadata.partitions > 0) { final List<CompletableFuture<Void>> futures = Lists.newArrayList(); @@ -1027,7 +1040,7 @@ public class PersistentTopicsBase extends AdminResource { validateGlobalNamespaceOwnership(namespaceName); } - PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative); + PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); final int numPartitions = partitionMetadata.partitions; if (numPartitions > 0) { final CompletableFuture<Void> future = new CompletableFuture<>(); @@ -1141,7 +1154,7 @@ public class PersistentTopicsBase extends AdminResource { log.info("[{}][{}] Creating subscription {} at message id {}", clientAppId(), topicName, subscriptionName, targetMessageId); - PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative); + PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); final int numPartitions = partitionMetadata.partitions; if (numPartitions > 0) { final CompletableFuture<Void> future = new CompletableFuture<>(); @@ -1249,7 +1262,7 @@ public class PersistentTopicsBase extends AdminResource { log.info("[{}][{}] received reset cursor on subscription {} to position {}", clientAppId(), topicName, subName, messageId); - PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative); + PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); if (partitionMetadata.partitions > 0) { log.warn("[{}] Not supported operation on partitioned-topic {} {}", clientAppId(), topicName, @@ -1288,7 +1301,7 @@ public class PersistentTopicsBase extends AdminResource { if (topicName.isGlobal()) { 
validateGlobalNamespaceOwnership(namespaceName); } - PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative); + PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); if (partitionMetadata.partitions > 0) { throw new RestException(Status.METHOD_NOT_ALLOWED, "Peek messages on a partitioned topic is not allowed"); } @@ -1413,7 +1426,7 @@ public class PersistentTopicsBase extends AdminResource { if (topicName.isGlobal()) { validateGlobalNamespaceOwnership(namespaceName); } - PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative); + PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); if (partitionMetadata.partitions > 0) { throw new RestException(Status.METHOD_NOT_ALLOWED, "Termination of a partitioned topic is not allowed"); } @@ -1433,7 +1446,7 @@ public class PersistentTopicsBase extends AdminResource { validateGlobalNamespaceOwnership(namespaceName); } - PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative); + PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); if (partitionMetadata.partitions > 0) { final List<CompletableFuture<Void>> futures = Lists.newArrayList(); @@ -1489,7 +1502,7 @@ public class PersistentTopicsBase extends AdminResource { validateGlobalNamespaceOwnership(namespaceName); } - PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative); + PartitionedTopicMetadata partitionMetadata = getPartitionedTopicMetadata(topicName, authoritative, false); if (partitionMetadata.partitions > 0) { String msg = "This method should not be called for partitioned topic"; log.error("[{}] {} {} {}", clientAppId(), msg, topicName, subName); @@ -1595,7 +1608,8 @@ public class PersistentTopicsBase extends AdminResource { // serve/redirect 
request else fail partitioned-metadata-request so, client fails while creating // producer/consumer checkLocalOrGetPeerReplicationCluster(pulsar, topicName.getNamespaceObject()) - .thenCompose(res -> fetchPartitionedTopicMetadataAsync(pulsar, path)).thenAccept(metadata -> { + .thenCompose(res -> fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(pulsar, path, topicName)) + .thenAccept(metadata -> { if (log.isDebugEnabled()) { log.debug("[{}] Total number of partitions for topic {} is {}", clientAppId, topicName, metadata.partitions); @@ -1632,7 +1646,7 @@ public class PersistentTopicsBase extends AdminResource { } PartitionedTopicMetadata partitionedTopicMetadata = getPartitionedTopicMetadata( - TopicName.get(topicName.getPartitionedTopicName()), false); + TopicName.get(topicName.getPartitionedTopicName()), false, false); if (partitionedTopicMetadata == null || partitionedTopicMetadata.partitions == 0) { final String topicErrorType = partitionedTopicMetadata == null ? "has no metadata" : "has zero partitions"; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java index f1347e3..96bc084 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java @@ -47,6 +47,7 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.naming.Constants; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; @@ -76,9 +77,10 @@ public class NonPersistentTopics extends PersistentTopics { public 
PartitionedTopicMetadata getPartitionedMetadata(@PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, - @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) { validateTopicName(property, cluster, namespace, encodedTopic); - return getPartitionedTopicMetadata(topicName, authoritative); + return getPartitionedTopicMetadata(topicName, authoritative, checkAllowAutoCreation); } @GET @@ -125,6 +127,18 @@ public class NonPersistentTopics extends PersistentTopics { throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0"); } try { + boolean topicExist = pulsar().getNamespaceService() + .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL) + .contains(topicName.toString()); + if (topicExist) { + log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName); + throw new RestException(Status.CONFLICT, "This topic already exists"); + } + } catch (Exception e) { + log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e); + throw new RestException(e); + } + try { String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(), topicName.getEncodedLocalName()); byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java index f895ce1..ebece5e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java +++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/PersistentTopics.java @@ -181,9 +181,10 @@ public class PersistentTopics extends PersistentTopicsBase { public PartitionedTopicMetadata getPartitionedMetadata(@PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @PathParam("topic") @Encoded String encodedTopic, - @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) { validateTopicName(property, cluster, namespace, encodedTopic); - return internalGetPartitionedMetadata(authoritative); + return internalGetPartitionedMetadata(authoritative, checkAllowAutoCreation); } @DELETE diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java index 8125f5b..10dc5ee 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java @@ -48,6 +48,7 @@ import org.apache.pulsar.broker.PulsarServerException; import org.apache.pulsar.broker.service.Topic; import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; import org.apache.pulsar.broker.web.RestException; +import org.apache.pulsar.common.api.proto.PulsarApi; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; @@ -86,9 +87,11 @@ public class NonPersistentTopics extends PersistentTopics { @ApiParam(value = "Specify topic name", required = true) @PathParam("topic") @Encoded String encodedTopic, @ApiParam(value = "Is authentication required to perform this operation") - 
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "Is check configuration required to automatically create topic") + @QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) { validateTopicName(tenant, namespace, encodedTopic); - return getPartitionedTopicMetadata(topicName, authoritative); + return getPartitionedTopicMetadata(topicName, authoritative, checkAllowAutoCreation); } @GET @@ -169,6 +172,18 @@ public class NonPersistentTopics extends PersistentTopics { throw new RestException(Status.NOT_ACCEPTABLE, "Number of partitions should be more than 0"); } try { + boolean topicExist = pulsar().getNamespaceService() + .getListOfTopics(topicName.getNamespaceObject(), PulsarApi.CommandGetTopicsOfNamespace.Mode.ALL) + .contains(topicName.toString()); + if (topicExist) { + log.warn("[{}] Failed to create already existing topic {}", clientAppId(), topicName); + throw new RestException(Status.CONFLICT, "This topic already exists"); + } + } catch (Exception e) { + log.error("[{}] Failed to create partitioned topic {}", clientAppId(), topicName, e); + throw new RestException(e); + } + try { String path = path(PARTITIONED_TOPIC_PATH_ZNODE, namespaceName.toString(), domain(), topicName.getEncodedLocalName()); byte[] data = jsonMapper().writeValueAsBytes(new PartitionedTopicMetadata(numPartitions)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index e09c012..70e0624 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -288,9 +288,11 @@ public class PersistentTopics extends PersistentTopicsBase { @ApiParam(value = "Specify topic name", required = 
true) @PathParam("topic") @Encoded String encodedTopic, @ApiParam(value = "Is authentication required to perform this operation") - @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { + @QueryParam("authoritative") @DefaultValue("false") boolean authoritative, + @ApiParam(value = "Is check configuration required to automatically create topic") + @QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) { validateTopicName(tenant, namespace, encodedTopic); - return internalGetPartitionedMetadata(authoritative); + return internalGetPartitionedMetadata(authoritative, checkAllowAutoCreation); } @DELETE diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java index fc89acc..3d8a29a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java @@ -211,8 +211,8 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { final String nonPartitionTopic2 = "secondary-non-partitioned-topic"; persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, nonPartitionTopic2, true); - Assert.assertEquals( - persistentTopics.getPartitionedMetadata(testTenant, testNamespace, nonPartitionTopic, true) .partitions, + Assert.assertEquals(persistentTopics + .getPartitionedMetadata(testTenant, testNamespace, nonPartitionTopic, true, false).partitions, 0); } @@ -221,7 +221,7 @@ public class PersistentTopicsTest extends MockedPulsarServiceBaseTest { final String topicName = "standard-topic"; persistentTopics.createNonPartitionedTopic(testTenant, testNamespace, topicName, true); PartitionedTopicMetadata pMetadata = persistentTopics.getPartitionedMetadata( - testTenant, testNamespace, topicName, true); + testTenant, testNamespace, topicName, true, false); 
Assert.assertEquals(pMetadata.partitions, 0); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java index 341b96a..868d591 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java @@ -104,6 +104,7 @@ public abstract class MockedPulsarServiceBaseTest { this.conf.setDefaultNumberOfNamespaceBundles(1); this.conf.setZookeeperServers("localhost:2181"); this.conf.setConfigurationStoreServers("localhost:3181"); + this.conf.setAllowAutoTopicCreationType("non-persistent"); } protected final void internalSetup() throws Exception { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index c4a387c..88a6603 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -88,6 +88,7 @@ public class BacklogQuotaManagerTest { config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA); config.setManagedLedgerMaxEntriesPerLedger(5); config.setManagedLedgerMinLedgerRolloverTimeMinutes(0); + config.setAllowAutoTopicCreationType("non-partitioned"); pulsar = new PulsarService(config); pulsar.start(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java index 940002d..f880734 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBkEnsemblesTests.java @@ 
-104,6 +104,7 @@ public class BrokerBkEnsemblesTests { config.setManagedLedgerMaxEntriesPerLedger(5); config.setManagedLedgerMinLedgerRolloverTimeMinutes(0); config.setAdvertisedAddress("127.0.0.1"); + config.setAllowAutoTopicCreationType("non-partitioned"); pulsar = new PulsarService(config); pulsar.start(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java index 3b9edc9..68f72b9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerBookieIsolationTest.java @@ -158,6 +158,8 @@ public class BrokerBookieIsolationTest { config.setManagedLedgerDefaultWriteQuorum(2); config.setManagedLedgerDefaultAckQuorum(2); + config.setAllowAutoTopicCreationType("non-partitioned"); + int totalEntriesPerLedger = 20; int totalLedgers = totalPublish / totalEntriesPerLedger; config.setManagedLedgerMaxEntriesPerLedger(totalEntriesPerLedger); @@ -288,6 +290,7 @@ public class BrokerBookieIsolationTest { config.setManagedLedgerDefaultEnsembleSize(2); config.setManagedLedgerDefaultWriteQuorum(2); config.setManagedLedgerDefaultAckQuorum(2); + config.setAllowAutoTopicCreationType("non-partitioned"); int totalEntriesPerLedger = 20; int totalLedgers = totalPublish / totalEntriesPerLedger; @@ -410,6 +413,7 @@ public class BrokerBookieIsolationTest { config.setManagedLedgerDefaultEnsembleSize(2); config.setManagedLedgerDefaultWriteQuorum(2); config.setManagedLedgerDefaultAckQuorum(2); + config.setAllowAutoTopicCreationType("non-partitioned"); config.setManagedLedgerMinLedgerRolloverTimeMinutes(0); pulsarService = new PulsarService(config); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java new file mode 100644 index 0000000..cd1aec9 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.broker.service; + +import org.apache.pulsar.client.api.PulsarClientException; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{ + + @BeforeClass + @Override + protected void setup() throws Exception { + super.baseSetup(); + } + + @AfterClass + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Test + public void testAutoNonPartitionedTopicCreation() throws Exception{ + pulsar.getConfiguration().setAllowAutoTopicCreation(true); + pulsar.getConfiguration().setAllowAutoTopicCreationType("non-partitioned"); + + final String topicName = "persistent://prop/ns-abc/non-partitioned-topic"; + final String subscriptionName = "non-partitioned-topic-sub"; + pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe(); + + assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicName)); + assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicName)); + } + + @Test + public void testAutoPartitionedTopicCreation() throws Exception{ + pulsar.getConfiguration().setAllowAutoTopicCreation(true); + pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned"); + pulsar.getConfiguration().setDefaultNumPartitions(3); + + final String topicName = "persistent://prop/ns-abc/partitioned-topic"; + final String subscriptionName = "partitioned-topic-sub"; + pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe(); + + assertTrue(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicName)); + for (int i = 0; i < 3; i++) { + assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicName + "-partition-" + i)); + 
} + } + + @Test + public void testAutoTopicCreationDisable() throws Exception{ + pulsar.getConfiguration().setAllowAutoTopicCreation(false); + + final String topicName = "persistent://prop/ns-abc/test-topic"; + final String subscriptionName = "test-topic-sub"; + try { + pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe(); + } catch (Exception e) { + assertTrue(e instanceof PulsarClientException); + } + assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicName)); + } + + @Test + public void testAutoTopicCreationDisableIfNonPartitionedTopicAlreadyExist() throws Exception{ + pulsar.getConfiguration().setAllowAutoTopicCreation(true); + pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned"); + pulsar.getConfiguration().setDefaultNumPartitions(3); + + final String topicName = "persistent://prop/ns-abc/test-topic-2"; + final String subscriptionName = "partitioned-topic-sub"; + admin.topics().createNonPartitionedTopic(topicName); + pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe(); + + assertFalse(admin.topics().getPartitionedTopicList("prop/ns-abc").contains(topicName)); + for (int i = 0; i < 3; i++) { + assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicName + "-partition-" + i)); + } + assertTrue(admin.namespaces().getTopics("prop/ns-abc").contains(topicName)); + } + + /** + * CheckAllowAutoCreation's default value is false. 
+ * So using getPartitionedTopicMetadata() directly will not produce partitioned topic + * even if the option to automatically create partitioned topic is configured + */ + @Test + public void testGetPartitionedMetadataWithoutCheckAllowAutoCreation() throws Exception{ + pulsar.getConfiguration().setAllowAutoTopicCreation(true); + pulsar.getConfiguration().setAllowAutoTopicCreationType("partitioned"); + pulsar.getConfiguration().setDefaultNumPartitions(3); + + final String topicName = "persistent://prop/ns-abc/test-topic-3"; + int partitions = admin.topics().getPartitionedTopicMetadata(topicName).partitions; + assertEquals(partitions, 0); + assertFalse(admin.namespaces().getTopics("prop/ns-abc").contains(topicName)); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java index ca5ba68..5864f48 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTestBase.java @@ -132,6 +132,7 @@ public class ReplicatorTestBase { config1.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH); config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA); config1.setDefaultNumberOfNamespaceBundles(1); + config1.setAllowAutoTopicCreationType("non-partitioned"); pulsar1 = new PulsarService(config1); pulsar1.start(); ns1 = pulsar1.getBrokerService(); @@ -165,6 +166,7 @@ public class ReplicatorTestBase { config2.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH); config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA); config2.setDefaultNumberOfNamespaceBundles(1); + config2.setAllowAutoTopicCreationType("non-partitioned"); pulsar2 = new PulsarService(config2); pulsar2.start(); ns2 = pulsar2.getBrokerService(); @@ -198,6 +200,7 @@ public class ReplicatorTestBase { 
config3.setTlsKeyFilePath(TLS_SERVER_KEY_FILE_PATH); config3.setTlsTrustCertsFilePath(TLS_SERVER_CERT_FILE_PATH); config3.setDefaultNumberOfNamespaceBundles(1); + config3.setAllowAutoTopicCreationType("non-partitioned"); pulsar3 = new PulsarService(config3); pulsar3.start(); ns3 = pulsar3.getBrokerService(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java index f70e6a1..b54265a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/NonPersistentTopicTest.java @@ -891,6 +891,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase { inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); config1.setBrokerServicePort(Optional.ofNullable(PortManager.nextFreePort())); config1.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA); + config1.setAllowAutoTopicCreationType("non-partitioned"); pulsar1 = new PulsarService(config1); pulsar1.start(); ns1 = pulsar1.getBrokerService(); @@ -917,6 +918,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase { inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); config2.setBrokerServicePort(Optional.ofNullable(PortManager.nextFreePort())); config2.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA); + config2.setAllowAutoTopicCreationType("non-partitioned"); pulsar2 = new PulsarService(config2); pulsar2.start(); ns2 = pulsar2.getBrokerService(); @@ -942,6 +944,7 @@ public class NonPersistentTopicTest extends ProducerConsumerBase { config3.setBrokerServicePurgeInactiveFrequencyInSeconds( inSec(getBrokerServicePurgeInactiveFrequency(), TimeUnit.SECONDS)); config3.setBrokerServicePort(Optional.ofNullable(PortManager.nextFreePort())); + config3.setAllowAutoTopicCreationType("non-partitioned"); pulsar3 = new 
PulsarService(config3); pulsar3.start(); ns3 = pulsar3.getBrokerService(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 35d4c77..c2fad94 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -2309,7 +2309,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { // (1) simple consumers Consumer<byte[]> consumer = pulsarClient.newConsumer() - .topic("persistent://my-property/my-ns/failAsyncReceive").subscriptionName("my-subscriber-name") + .topic("persistent://my-property/my-ns/failAsyncReceive-1").subscriptionName("my-subscriber-name") .subscribe(); consumer.close(); // receive messages @@ -2322,7 +2322,7 @@ public class SimpleProducerConsumerTest extends ProducerConsumerBase { // (2) Partitioned-consumer int numPartitions = 4; - TopicName topicName = TopicName.get("persistent://my-property/my-ns/failAsyncReceive"); + TopicName topicName = TopicName.get("persistent://my-property/my-ns/failAsyncReceive-2"); admin.topics().createPartitionedTopic(topicName.toString(), numPartitions); Consumer<byte[]> partitionedConsumer = pulsarClient.newConsumer().topic(topicName.toString()) .subscriptionName("my-partitioned-subscriber").subscribe(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java index 24fb5a7..7d9a8cd 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v1/V1_ProducerConsumerTest.java @@ -2040,7 +2040,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase { // (1) simple consumers 
Consumer<byte[]> consumer = pulsarClient.newConsumer() - .topic("persistent://my-property/use/my-ns/failAsyncReceive") + .topic("persistent://my-property/use/my-ns/failAsyncReceive-1") .subscriptionName("my-subscriber-name") .subscribe(); consumer.close(); @@ -2054,7 +2054,7 @@ public class V1_ProducerConsumerTest extends V1_ProducerConsumerBase { // (2) Partitioned-consumer int numPartitions = 4; - TopicName topicName = TopicName.get("persistent://my-property/use/my-ns/failAsyncReceive"); + TopicName topicName = TopicName.get("persistent://my-property/use/my-ns/failAsyncReceive-2"); admin.topics().createPartitionedTopic(topicName.toString(), numPartitions); Consumer<byte[]> partitionedConsumer = pulsarClient.newConsumer().topic(topicName.toString()) .subscriptionName("my-partitioned-subscriber") diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java index 54f05d8..3782286 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java @@ -89,7 +89,7 @@ public class PulsarFunctionE2ESecurityTest { final String TENANT2 = "tenant2"; final String NAMESPACE = "test-ns"; - String pulsarFunctionsNamespace = TENANT + "/use/pulsar-function-admin"; + String pulsarFunctionsNamespace = TENANT + "/pulsar-function-admin"; String primaryHost; String workerId; @@ -132,6 +132,7 @@ public class PulsarFunctionE2ESecurityTest { config.setBrokerServicePort(Optional.of(brokerServicePort)); config.setLoadManagerClassName(SimpleLoadManagerImpl.class.getName()); config.setAdvertisedAddress("localhost"); + config.setAllowAutoTopicCreationType("non-partitioned"); Set<String> providers = new HashSet<>(); providers.add(AuthenticationProviderToken.class.getName()); diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java index 97cabc3..8c2525e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java @@ -102,7 +102,7 @@ public class PulsarFunctionLocalRunTest { BrokerStats brokerStatsClient; WorkerService functionsWorkerService; final String tenant = "external-repl-prop"; - String pulsarFunctionsNamespace = tenant + "/" + CLUSTER + "/pulsar-function-admin"; + String pulsarFunctionsNamespace = tenant + "/pulsar-function-admin"; String primaryHost; String workerId; @@ -180,6 +180,7 @@ public class PulsarFunctionLocalRunTest { "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH); config.setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH); config.setBrokerClientTlsEnabled(true); + config.setAllowAutoTopicCreationType("non-partitioned"); functionsWorkerService = createPulsarFunctionWorker(config); urlTls = new URL(brokerServiceUrl); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java index 9c2b7b7..efa0695 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java @@ -91,7 +91,7 @@ public class PulsarFunctionPublishTest { BrokerStats brokerStatsClient; WorkerService functionsWorkerService; final String tenant = "external-repl-prop"; - String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin"; + String pulsarFunctionsNamespace = tenant + "/pulsar-function-admin"; String primaryHost; String 
workerId; @@ -169,6 +169,7 @@ public class PulsarFunctionPublishTest { "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH); config.setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH); config.setBrokerClientTlsEnabled(true); + config.setAllowAutoTopicCreationType("non-partitioned"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java index 809fca7..d0b85c3 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionStateTest.java @@ -90,7 +90,7 @@ public class PulsarFunctionStateTest { BrokerStats brokerStatsClient; WorkerService functionsWorkerService; final String tenant = "external-repl-prop"; - String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin"; + String pulsarFunctionsNamespace = tenant + "/pulsar-function-admin"; String primaryHost; String workerId; @@ -168,6 +168,7 @@ public class PulsarFunctionStateTest { "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH); config.setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH); config.setBrokerClientTlsEnabled(true); + config.setAllowAutoTopicCreationType("non-partitioned"); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java index 55e4a70..27a7d69 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java @@ -70,7 +70,7 @@ public class PulsarWorkerAssignmentTest { BrokerStats brokerStatsClient; WorkerService functionsWorkerService; 
final String tenant = "external-repl-prop"; - final String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin"; + final String pulsarFunctionsNamespace = tenant + "/pulsar-function-admin"; String primaryHost; String workerId; diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java index ecfce1c..0c5db19 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java @@ -72,7 +72,7 @@ public class PulsarFunctionAdminTest { WorkerServer functionsWorkerServer; WorkerService functionsWorkerService; final String tenant = "external-repl-prop"; - String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin"; + String pulsarFunctionsNamespace = tenant + "/pulsar-function-admin"; String primaryHost; private final int ZOOKEEPER_PORT = PortManager.nextFreePort(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java index 847958f..d8f9cec 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java @@ -118,7 +118,7 @@ public class PulsarFunctionE2ETest { BrokerStats brokerStatsClient; WorkerService functionsWorkerService; final String tenant = "external-repl-prop"; - String pulsarFunctionsNamespace = tenant + "/use/pulsar-function-admin"; + String pulsarFunctionsNamespace = tenant + "/pulsar-function-admin"; String primaryHost; String workerId; @@ -194,6 +194,7 @@ public class PulsarFunctionE2ETest { "tlsCertFile:" + TLS_CLIENT_CERT_FILE_PATH + "," + "tlsKeyFile:" + TLS_CLIENT_KEY_FILE_PATH); config.setBrokerClientTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH); config.setBrokerClientTlsEnabled(true); + 
config.setAllowAutoTopicCreationType("non-partitioned"); diff --git a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf index c55955f..75b6d03 100644 --- a/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf +++ b/pulsar-broker/src/test/resources/configurations/pulsar_broker_test.conf @@ -33,6 +33,9 @@ backlogQuotaCheckIntervalInSeconds=60 backlogQuotaDefaultLimitGB=50 brokerDeleteInactiveTopicsEnabled=true brokerDeleteInactiveTopicsFrequencySeconds=60 +allowAutoTopicCreation=true +allowAutoTopicCreationType=non-partitioned +defaultNumPartitions=1 messageExpiryCheckIntervalInMinutes=5 clientLibraryVersionCheckEnabled=false clientLibraryVersionCheckAllowUnversioned=true diff --git a/pulsar-client-cpp/test-conf/standalone-ssl.conf b/pulsar-client-cpp/test-conf/standalone-ssl.conf index 426ba43..6ab4406 100644 --- a/pulsar-client-cpp/test-conf/standalone-ssl.conf +++ b/pulsar-client-cpp/test-conf/standalone-ssl.conf @@ -278,3 +278,12 @@ keepAliveIntervalSeconds=30 # How often broker checks for inactive topics to be deleted (topics with no subscriptions and no one connected) brokerServicePurgeInactiveFrequencyInSeconds=60 + +# Enable topic auto creation if new producer or consumer connected (disable auto creation with value false) +allowAutoTopicCreation=true + +# The type of topic that is allowed to be automatically created.(partitioned/non-partitioned) +allowAutoTopicCreationType=non-partitioned + +# The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned. 
+defaultNumPartitions=1 \ No newline at end of file diff --git a/pulsar-client-cpp/test-conf/standalone.conf b/pulsar-client-cpp/test-conf/standalone.conf index 6f799c1..2de6a37 100644 --- a/pulsar-client-cpp/test-conf/standalone.conf +++ b/pulsar-client-cpp/test-conf/standalone.conf @@ -263,3 +263,12 @@ keepAliveIntervalSeconds=30 # How often broker checks for inactive topics to be deleted (topics with no subscriptions and no one connected) brokerServicePurgeInactiveFrequencyInSeconds=60 + +# Enable topic auto creation if new producer or consumer connected (disable auto creation with value false) +allowAutoTopicCreation=true + +# The type of topic that is allowed to be automatically created.(partitioned/non-partitioned) +allowAutoTopicCreationType=non-partitioned + +# The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned. +defaultNumPartitions=1 \ No newline at end of file diff --git a/pulsar-client-cpp/tests/standalone.conf b/pulsar-client-cpp/tests/standalone.conf index 8a01642..857285a 100644 --- a/pulsar-client-cpp/tests/standalone.conf +++ b/pulsar-client-cpp/tests/standalone.conf @@ -266,3 +266,12 @@ keepAliveIntervalSeconds=30 # How often broker checks for inactive topics to be deleted (topics with no subscriptions and no one connected) brokerServicePurgeInactiveFrequencyInSeconds=60 + +# Enable topic auto creation if new producer or consumer connected (disable auto creation with value false) +allowAutoTopicCreation=true + +# The type of topic that is allowed to be automatically created.(partitioned/non-partitioned) +allowAutoTopicCreationType=non-partitioned + +# The number of partitioned topics that is allowed to be automatically created if allowAutoTopicCreationType is partitioned. 
+defaultNumPartitions=1 diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java index 4fe030d..e5334df 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java @@ -103,7 +103,8 @@ class HttpLookupService implements LookupService { public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName) { String format = topicName.isV2() ? "admin/v2/%s/partitions" : "admin/%s/partitions"; - return httpClient.get(String.format(format, topicName.getLookupName()), PartitionedTopicMetadata.class); + return httpClient.get(String.format(format, topicName.getLookupName()) + "?checkAllowAutoCreation=true", + PartitionedTopicMetadata.class); } public String getServiceUrl() { diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java index 9ec9688..0d73915 100644 --- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java +++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerService.java @@ -148,6 +148,9 @@ public class WorkerService { } log.info("Created Pulsar client"); + brokerAdmin.topics().createNonPartitionedTopic(workerConfig.getFunctionAssignmentTopic()); + brokerAdmin.topics().createNonPartitionedTopic(workerConfig.getClusterCoordinationTopic()); + brokerAdmin.topics().createNonPartitionedTopic(workerConfig.getFunctionMetadataTopic()); //create scheduler manager this.schedulerManager = new SchedulerManager(this.workerConfig, this.client, this.brokerAdmin, this.executor); diff --git a/site2/docs/reference-configuration.md b/site2/docs/reference-configuration.md index bb549ae..5261bae 100644 --- 
a/site2/docs/reference-configuration.md +++ b/site2/docs/reference-configuration.md @@ -127,6 +127,8 @@ Pulsar brokers are responsible for handling incoming messages from producers, di |backlogQuotaCheckIntervalInSeconds| How often to check for topics that have reached the quota |60| |backlogQuotaDefaultLimitGB| Default per-topic backlog quota limit |10| |allowAutoTopicCreation| Enable topic auto creation if new producer or consumer connected |true| +|allowAutoTopicCreationType| The type of topic that is allowed to be automatically created (partitioned/non-partitioned) |non-partitioned| +|defaultNumPartitions| The number of partitions an automatically created topic gets when allowAutoTopicCreationType is partitioned |1| |brokerDeleteInactiveTopicsEnabled| Enable the deletion of inactive topics |true| |brokerDeleteInactiveTopicsFrequencySeconds| How often to check for inactive topics |60| |messageExpiryCheckIntervalInMinutes| How frequently to proactively check and purge expired messages |5| diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java index 3af34d7..ca39cd4 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/cli/CLITest.java @@ -93,6 +93,12 @@ public class CLITest extends PulsarTestSuite { public void testTopicTerminationOnTopicsWithoutConnectedConsumers() throws Exception { String topicName = "persistent://public/default/test-topic-termination"; BrokerContainer container = pulsarCluster.getAnyBroker(); + container.execCmd( + PulsarCluster.ADMIN_SCRIPT, + "topics", + "create", + topicName); + ContainerExecResult result = container.execCmd( PulsarCluster.CLIENT_SCRIPT, "produce", diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index 23beec8..f1e2d1a 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -547,6 +547,9 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { .build(); @Cleanup + PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build(); + admin.topics().createNonPartitionedTopic(outputTopicName); + @Cleanup Consumer<String> consumer = client.newConsumer(Schema.STRING) .topic(outputTopicName) .subscriptionName("source-tester") @@ -869,6 +872,8 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) { + admin.topics().createNonPartitionedTopic(inputTopicName); + admin.topics().createNonPartitionedTopic(outputTopicName); retryStrategically((test) -> { try { return admin.topics().getStats(inputTopicName).subscriptions.size() == 1; @@ -957,7 +962,10 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { String inputTopicName = "test-" + type + "-count-window-" + functionRuntimeType + "-input-" + randomName(8); String outputTopicName = "test-" + type + "-count-window-" + functionRuntimeType + "-output-" + randomName(8); - + try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) { + admin.topics().createNonPartitionedTopic(inputTopicName); + admin.topics().createNonPartitionedTopic(outputTopicName); + } CommandGenerator generator = CommandGenerator.createDefaultGenerator( inputTopicName, @@ -1114,6 +1122,10 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { String inputTopicName = 
"persistent://public/default/test-neg-ack-" + runtime + "-input-" + randomName(8); String outputTopicName = "test-neg-ack-" + runtime + "-output-" + randomName(8); + try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) { + admin.topics().createNonPartitionedTopic(inputTopicName); + admin.topics().createNonPartitionedTopic(outputTopicName); + } String functionName = "test-neg-ack-fn-" + randomName(8); final int numMessages = 20; @@ -1290,6 +1302,10 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { String inputTopicName = "persistent://public/default/test-publish-" + runtime + "-input-" + randomName(8); String outputTopicName = "test-publish-" + runtime + "-output-" + randomName(8); + try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) { + admin.topics().createNonPartitionedTopic(inputTopicName); + admin.topics().createNonPartitionedTopic(outputTopicName); + } String functionName = "test-publish-fn-" + randomName(8); final int numMessages = 10; @@ -1416,6 +1432,10 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { String inputTopicName = "persistent://public/default/test-exclamation-" + runtime + "-input-" + randomName(8); String outputTopicName = "test-exclamation-" + runtime + "-output-" + randomName(8); + try (PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build()) { + admin.topics().createNonPartitionedTopic(inputTopicName); + admin.topics().createNonPartitionedTopic(outputTopicName); + } if (isTopicPattern) { @Cleanup PulsarClient client = PulsarClient.builder() .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) @@ -1928,6 +1948,11 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase { .build(); @Cleanup + PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(pulsarCluster.getHttpServiceUrl()).build(); + 
admin.topics().createNonPartitionedTopic(consumeTopicName); + admin.topics().createNonPartitionedTopic(outputTopicName); + + @Cleanup Consumer<KeyValue<byte[], byte[]>> consumer = client.newConsumer(KeyValueSchema.kvBytes()) .topic(consumeTopicName) .subscriptionName("debezium-source-tester")