sijie closed pull request #1535: Issue #1536: Introduce Short Topic Name URL: https://github.com/apache/incubator-pulsar/pull/1535
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java index acb5b6db7f..80e1a5c398 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java @@ -20,13 +20,19 @@ import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES_ROOT; +import com.google.common.collect.Lists; import java.io.IOException; +import java.util.List; import org.apache.bookkeeper.client.BookKeeperAdmin; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory; import org.apache.bookkeeper.util.ZkUtils; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.policies.data.PropertyAdmin; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.zookeeper.ZooKeeperClientFactory; import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType; @@ -143,8 +149,54 @@ public static void main(String[] args) throws Exception { // Ignore } + // Create public tenant + PropertyAdmin publicProperty = new PropertyAdmin(); + byte[] publicPropertyDataJson = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicProperty); + try { + ZkUtils.createFullPathOptimistic( + globalZk, + POLICIES_ROOT + "/" + TopicName.PUBLIC_PROPERTY, + publicPropertyDataJson, + ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + } catch (NodeExistsException e) { + // Ignore + } + + // Create default namespace + Policies policies = new Policies(); + policies.bundles = getBundles(4); + byte[] defaultNamespaceDataJson = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies); + try { + ZkUtils.createFullPathOptimistic( + globalZk, + POLICIES_ROOT + "/" + TopicName.PUBLIC_PROPERTY + "/" + TopicName.DEFAULT_NAMESPACE, + defaultNamespaceDataJson, + ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + } catch (NodeExistsException e) { + // Ignore + } + log.info("Cluster metadata for '{}' setup correctly", arguments.cluster); } + private static BundlesData getBundles(int numBundles) { + Long maxVal = ((long) 1) << 32; + Long segSize = maxVal / numBundles; + List<String> partitions = Lists.newArrayList(); + partitions.add(String.format("0x%08x", 0l)); + Long curPartition = segSize; + for (int i = 0; i < numBundles; i++) { + if (i != numBundles - 1) { + partitions.add(String.format("0x%08x", curPartition)); + } else { + partitions.add(String.format("0x%08x", maxVal - 1)); + } + curPartition += segSize; + } + return new BundlesData(partitions); + } + private static final Logger log = LoggerFactory.getLogger(PulsarClusterMetadataSetup.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java index 68c307aa33..a5d0a43e4b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java @@ -31,6 +31,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.PropertyAdmin; import org.apache.pulsar.functions.worker.WorkerConfig; @@ -203,13 +204,14 @@ void start() throws Exception { broker = new PulsarService(config, Optional.ofNullable(fnWorkerService)); broker.start(); - // Create a sample namespace URL webServiceUrl = new URL( String.format("http://%s:%d", config.getAdvertisedAddress(), config.getWebServicePort())); final String brokerServiceUrl = String.format("pulsar://%s:%d", config.getAdvertisedAddress(), config.getBrokerServicePort()); admin = PulsarAdmin.builder().serviceHttpUrl(webServiceUrl.toString()).authentication( config.getBrokerClientAuthenticationPlugin(), config.getBrokerClientAuthenticationParameters()).build(); + + // Create a sample namespace final String property = "sample"; final String cluster = config.getClusterName(); final String globalCluster = "global"; @@ -240,6 +242,23 @@ void start() throws Exception { log.info(e.getMessage()); } + // Create a public tenant and default namespace + final String publicTenant = TopicName.PUBLIC_PROPERTY; + final String defaultNamespace = TopicName.PUBLIC_PROPERTY + "/" + TopicName.DEFAULT_NAMESPACE; + try { + if (!admin.properties().getProperties().contains(publicTenant)) { + admin.properties().createProperty( + publicTenant, + new PropertyAdmin(Sets.newHashSet(config.getSuperUserRoles()), Sets.newHashSet(cluster))); + } + if (!admin.namespaces().getNamespaces(publicTenant).contains(defaultNamespace)) { + admin.namespaces().createNamespace(defaultNamespace); + admin.namespaces().setNamespaceReplicationClusters(defaultNamespace, Sets.newHashSet(config.getClusterName())); + } + } catch (PulsarAdminException e) { + log.info(e.getMessage()); + } + if (null != fnWorkerService) { fnWorkerService.start(); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index aa7dd4ad86..e0a8d2e55b 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -22,6 +22,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.util.Codec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +41,9 @@ private static final Logger log = LoggerFactory.getLogger(TopicName.class); + public static final String PUBLIC_PROPERTY = "public"; + public static final String DEFAULT_NAMESPACE = "default"; + private static final String PARTITIONED_TOPIC_SUFFIX = "-partition-"; private final String completeTopicName; @@ -98,15 +102,29 @@ public static boolean isValid(String topic) { } private TopicName(String completeTopicName) { - this.completeTopicName = completeTopicName; try { - // The topic name can be in two different forms: - // new: persistent://property/namespace/topic - // legacy: persistent://property/cluster/namespace/topic + // The topic name can be in two different forms, one is fully qualified topic name, + // the other one is short topic name if (!completeTopicName.contains("://")) { - throw new IllegalArgumentException( - "Invalid topic name: " + completeTopicName + " -- Domain is missing"); + // The short topic name can be: + // - <topic> + // - <property>/<namespace>/<topic> + String[] parts = StringUtils.split(completeTopicName, '/'); + if (parts.length == 3) { + completeTopicName = TopicDomain.persistent.name() + "://" + completeTopicName; + } else if (parts.length == 1) { + completeTopicName = TopicDomain.persistent.name() + "://" + PUBLIC_PROPERTY + "/" + DEFAULT_NAMESPACE + "/" + parts[0]; + } else { + throw new IllegalArgumentException( + "Invalid short topic name '" + completeTopicName + "', it should be in the format of " + + "<tenant>/<namespace>/<topic> or <topic>"); + } } + this.completeTopicName = completeTopicName; + + // The fully qualified topic name can be in two different forms: + // new: persistent://property/namespace/topic + // legacy: persistent://property/cluster/namespace/topic List<String> parts = Splitter.on("://").limit(2).splitToList(completeTopicName); this.domain = TopicDomain.getEnum(parts.get(0)); diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java index 70ba953e88..138f2dcf67 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java @@ -22,8 +22,6 @@ import static org.testng.Assert.assertFalse; import static org.testng.Assert.fail; -import org.apache.pulsar.common.naming.TopicDomain; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.Codec; import org.testng.annotations.Test; @@ -33,7 +31,7 @@ @Test void topic() { try { - assertEquals(TopicName.get("property.namespace:topic").getNamespace(), "property.namespace"); + TopicName.get("://property.namespace:topic").getNamespace(); fail("Should have thrown exception"); } catch (IllegalArgumentException e) { // Expected @@ -70,42 +68,42 @@ void topic() { "topic"); try { - TopicName.get("property.namespace:my-topic").getDomain(); + TopicName.get("://property.namespace:my-topic").getDomain(); fail("Should have raised exception"); } catch (IllegalArgumentException e) { // Ok } try { - TopicName.get("property.namespace:my-topic").getProperty(); + TopicName.get("://property.namespace:my-topic").getProperty(); fail("Should have raised exception"); } catch (IllegalArgumentException e) { // Ok } try { - TopicName.get("property.namespace:my-topic").getCluster(); + TopicName.get("://property.namespace:my-topic").getCluster(); fail("Should have raised exception"); } catch (IllegalArgumentException e) { // Ok } try { - TopicName.get("property.namespace:my-topic").getNamespacePortion(); + TopicName.get("://property.namespace:my-topic").getNamespacePortion(); fail("Should have raised exception"); } catch (IllegalArgumentException e) { // Ok } try { - TopicName.get("property.namespace:my-topic").getLocalName(); + TopicName.get("://property.namespace:my-topic").getLocalName(); fail("Should have raised exception"); } catch (IllegalArgumentException e) { // Ok } try { - TopicName.get("property.namespace"); + TopicName.get("://property.namespace"); fail("Should have raised exception"); } catch (IllegalArgumentException e) { // Ok @@ -164,14 +162,14 @@ void topic() { .getPersistenceNamingEncoding(), "property/cluster/namespace/persistent/topic"); try { - TopicName.get("property.namespace"); + TopicName.get("://property.namespace"); fail("Should have raied exception"); } catch (IllegalArgumentException e) { // Ok } try { - TopicName.get("property/cluster/namespace"); + TopicName.get("://property/cluster/namespace"); fail("Should have raied exception"); } catch (IllegalArgumentException e) { // Ok @@ -240,4 +238,33 @@ public void testTopicNameWithoutCluster() throws Exception { assertEquals(topicName.getPartitionedTopicName(), "persistent://property/namespace/topic"); assertEquals(topicName.getPersistenceNamingEncoding(), "property/namespace/persistent/topic"); } + + @Test + public void testShortTopicName() throws Exception { + TopicName tn = TopicName.get("short-topic"); + assertEquals(TopicDomain.persistent, tn.getDomain()); + assertEquals(TopicName.PUBLIC_PROPERTY, tn.getProperty()); + assertEquals(TopicName.DEFAULT_NAMESPACE, tn.getNamespacePortion()); + assertEquals("short-topic", tn.getLocalName()); + + tn = TopicName.get("test-tenant/test-namespace/test-short-topic"); + assertEquals(TopicDomain.persistent, tn.getDomain()); + assertEquals("test-tenant", tn.getProperty()); + assertEquals("test-namespace", tn.getNamespacePortion()); + assertEquals("test-short-topic", tn.getLocalName()); + + try { + TopicName.get("pulsar/cluster/namespace/test"); + fail("Should have raised exception"); + } catch (IllegalArgumentException e) { + // Ok + } + + try { + TopicName.get("pulsar/cluster"); + fail("Should have raised exception"); + } catch (IllegalArgumentException e) { + // Ok + } + } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services