This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 61de30d Issue #1536: Introduce Short Topic Name (#1535) 61de30d is described below commit 61de30d09488395e3c19cf9232e755528a52d342 Author: Sijie Guo <guosi...@gmail.com> AuthorDate: Thu Apr 12 14:17:33 2018 -0700 Issue #1536: Introduce Short Topic Name (#1535) This closes #1536 --- .../apache/pulsar/PulsarClusterMetadataSetup.java | 52 ++++++++++++++++++++++ .../org/apache/pulsar/PulsarStandaloneStarter.java | 21 ++++++++- .../org/apache/pulsar/common/naming/TopicName.java | 30 ++++++++++--- .../apache/pulsar/common/naming/TopicNameTest.java | 49 +++++++++++++++----- 4 files changed, 134 insertions(+), 18 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java index acb5b6d..80e1a5c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java @@ -20,13 +20,19 @@ package org.apache.pulsar; import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES_ROOT; +import com.google.common.collect.Lists; import java.io.IOException; +import java.util.List; import org.apache.bookkeeper.client.BookKeeperAdmin; import org.apache.bookkeeper.conf.ServerConfiguration; import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory; import org.apache.bookkeeper.util.ZkUtils; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.policies.data.BundlesData; import org.apache.pulsar.common.policies.data.ClusterData; +import org.apache.pulsar.common.policies.data.Policies; +import org.apache.pulsar.common.policies.data.PropertyAdmin; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.zookeeper.ZooKeeperClientFactory; import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType; @@ -143,8 +149,54 @@ public class PulsarClusterMetadataSetup { // Ignore } + // Create public tenant + PropertyAdmin publicProperty = new PropertyAdmin(); + byte[] publicPropertyDataJson = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicProperty); + try { + ZkUtils.createFullPathOptimistic( + globalZk, + POLICIES_ROOT + "/" + TopicName.PUBLIC_PROPERTY, + publicPropertyDataJson, + ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + } catch (NodeExistsException e) { + // Ignore + } + + // Create default namespace + Policies policies = new Policies(); + policies.bundles = getBundles(4); + byte[] defaultNamespaceDataJson = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies); + try { + ZkUtils.createFullPathOptimistic( + globalZk, + POLICIES_ROOT + "/" + TopicName.PUBLIC_PROPERTY + "/" + TopicName.DEFAULT_NAMESPACE, + defaultNamespaceDataJson, + ZooDefs.Ids.OPEN_ACL_UNSAFE, + CreateMode.PERSISTENT); + } catch (NodeExistsException e) { + // Ignore + } + log.info("Cluster metadata for '{}' setup correctly", arguments.cluster); } + private static BundlesData getBundles(int numBundles) { + Long maxVal = ((long) 1) << 32; + Long segSize = maxVal / numBundles; + List<String> partitions = Lists.newArrayList(); + partitions.add(String.format("0x%08x", 0l)); + Long curPartition = segSize; + for (int i = 0; i < numBundles; i++) { + if (i != numBundles - 1) { + partitions.add(String.format("0x%08x", curPartition)); + } else { + partitions.add(String.format("0x%08x", maxVal - 1)); + } + curPartition += segSize; + } + return new BundlesData(partitions); + } + private static final Logger log = LoggerFactory.getLogger(PulsarClusterMetadataSetup.class); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java index 68c307a..a5d0a43 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java @@ -31,6 +31,7 @@ import org.apache.pulsar.broker.ServiceConfigurationUtils; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.PropertyAdmin; import org.apache.pulsar.functions.worker.WorkerConfig; @@ -203,13 +204,14 @@ public class PulsarStandaloneStarter { broker = new PulsarService(config, Optional.ofNullable(fnWorkerService)); broker.start(); - // Create a sample namespace URL webServiceUrl = new URL( String.format("http://%s:%d", config.getAdvertisedAddress(), config.getWebServicePort())); final String brokerServiceUrl = String.format("pulsar://%s:%d", config.getAdvertisedAddress(), config.getBrokerServicePort()); admin = PulsarAdmin.builder().serviceHttpUrl(webServiceUrl.toString()).authentication( config.getBrokerClientAuthenticationPlugin(), config.getBrokerClientAuthenticationParameters()).build(); + + // Create a sample namespace final String property = "sample"; final String cluster = config.getClusterName(); final String globalCluster = "global"; @@ -240,6 +242,23 @@ public class PulsarStandaloneStarter { log.info(e.getMessage()); } + // Create a public tenant and default namespace + final String publicTenant = TopicName.PUBLIC_PROPERTY; + final String defaultNamespace = TopicName.PUBLIC_PROPERTY + "/" + TopicName.DEFAULT_NAMESPACE; + try { + if (!admin.properties().getProperties().contains(publicTenant)) { + admin.properties().createProperty( + publicTenant, + new PropertyAdmin(Sets.newHashSet(config.getSuperUserRoles()), Sets.newHashSet(cluster))); + } + if (!admin.namespaces().getNamespaces(publicTenant).contains(defaultNamespace)) { + admin.namespaces().createNamespace(defaultNamespace); + admin.namespaces().setNamespaceReplicationClusters(defaultNamespace, Sets.newHashSet(config.getClusterName())); + } + } catch (PulsarAdminException e) { + log.info(e.getMessage()); + } + if (null != fnWorkerService) { fnWorkerService.start(); } diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java index aa7dd4a..e0a8d2e 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.util.Codec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +41,9 @@ public class TopicName implements ServiceUnitId { private static final Logger log = LoggerFactory.getLogger(TopicName.class); + public static final String PUBLIC_PROPERTY = "public"; + public static final String DEFAULT_NAMESPACE = "default"; + private static final String PARTITIONED_TOPIC_SUFFIX = "-partition-"; private final String completeTopicName; @@ -98,15 +102,29 @@ public class TopicName implements ServiceUnitId { } private TopicName(String completeTopicName) { - this.completeTopicName = completeTopicName; try { - // The topic name can be in two different forms: - // new: persistent://property/namespace/topic - // legacy: persistent://property/cluster/namespace/topic + // The topic name can be in two different forms, one is fully qualified topic name, + // the other one is short topic name if (!completeTopicName.contains("://")) { - throw new IllegalArgumentException( - "Invalid topic name: " + completeTopicName + " -- Domain is missing"); + // The short topic name can be: + // - <topic> + // - <property>/<namespace>/<topic> + String[] parts = StringUtils.split(completeTopicName, '/'); + if (parts.length == 3) { + completeTopicName = TopicDomain.persistent.name() + "://" + completeTopicName; + } else if (parts.length == 1) { + completeTopicName = TopicDomain.persistent.name() + "://" + PUBLIC_PROPERTY + "/" + DEFAULT_NAMESPACE + "/" + parts[0]; + } else { + throw new IllegalArgumentException( + "Invalid short topic name '" + completeTopicName + "', it should be in the format of " + + "<tenant>/<namespace>/<topic> or <topic>"); + } } + this.completeTopicName = completeTopicName; + + // The fully qualified topic name can be in two different forms: + // new: persistent://property/namespace/topic + // legacy: persistent://property/cluster/namespace/topic List<String> parts = Splitter.on("://").limit(2).splitToList(completeTopicName); this.domain = TopicDomain.getEnum(parts.get(0)); diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java index 70ba953..138f2dc 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java @@ -22,8 +22,6 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.fail; -import org.apache.pulsar.common.naming.TopicDomain; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.Codec; import org.testng.annotations.Test; @@ -33,7 +31,7 @@ public class TopicNameTest { @Test void topic() { try { - assertEquals(TopicName.get("property.namespace:topic").getNamespace(), "property.namespace"); + TopicName.get("://property.namespace:topic").getNamespace(); fail("Should have thrown exception"); } catch (IllegalArgumentException e) { // Expected @@ -70,42 +68,42 @@ public class TopicNameTest { "topic"); try { - TopicName.get("property.namespace:my-topic").getDomain(); + TopicName.get("://property.namespace:my-topic").getDomain(); fail("Should have raised exception"); } catch (IllegalArgumentException e) { // Ok } try { - TopicName.get("property.namespace:my-topic").getProperty(); + TopicName.get("://property.namespace:my-topic").getProperty(); fail("Should have raised exception"); } catch (IllegalArgumentException e) { // Ok } try { - TopicName.get("property.namespace:my-topic").getCluster(); + TopicName.get("://property.namespace:my-topic").getCluster(); fail("Should have raised exception"); } catch (IllegalArgumentException e) { // Ok } try { - TopicName.get("property.namespace:my-topic").getNamespacePortion(); + TopicName.get("://property.namespace:my-topic").getNamespacePortion(); fail("Should have raised exception"); } catch (IllegalArgumentException e) { // Ok } try { - TopicName.get("property.namespace:my-topic").getLocalName(); + TopicName.get("://property.namespace:my-topic").getLocalName(); fail("Should have raised exception"); } catch (IllegalArgumentException e) { // Ok } try { - TopicName.get("property.namespace"); + TopicName.get("://property.namespace"); fail("Should have raised exception"); } catch (IllegalArgumentException e) { // Ok @@ -164,14 +162,14 @@ public class TopicNameTest { .getPersistenceNamingEncoding(), "property/cluster/namespace/persistent/topic"); try { - TopicName.get("property.namespace"); + TopicName.get("://property.namespace"); fail("Should have raied exception"); } catch (IllegalArgumentException e) { // Ok } try { - TopicName.get("property/cluster/namespace"); + TopicName.get("://property/cluster/namespace"); fail("Should have raied exception"); } catch (IllegalArgumentException e) { // Ok @@ -240,4 +238,33 @@ public class TopicNameTest { assertEquals(topicName.getPartitionedTopicName(), "persistent://property/namespace/topic"); assertEquals(topicName.getPersistenceNamingEncoding(), "property/namespace/persistent/topic"); } + + @Test + public void testShortTopicName() throws Exception { + TopicName tn = TopicName.get("short-topic"); + assertEquals(TopicDomain.persistent, tn.getDomain()); + assertEquals(TopicName.PUBLIC_PROPERTY, tn.getProperty()); + assertEquals(TopicName.DEFAULT_NAMESPACE, tn.getNamespacePortion()); + assertEquals("short-topic", tn.getLocalName()); + + tn = TopicName.get("test-tenant/test-namespace/test-short-topic"); + assertEquals(TopicDomain.persistent, tn.getDomain()); + assertEquals("test-tenant", tn.getProperty()); + assertEquals("test-namespace", tn.getNamespacePortion()); + assertEquals("test-short-topic", tn.getLocalName()); + + try { + TopicName.get("pulsar/cluster/namespace/test"); + fail("Should have raised exception"); + } catch (IllegalArgumentException e) { + // Ok + } + + try { + TopicName.get("pulsar/cluster"); + fail("Should have raised exception"); + } catch (IllegalArgumentException e) { + // Ok + } + } } -- To stop receiving notification emails like this one, please contact si...@apache.org.