sijie closed pull request #1535: Issue #1536: Introduce Short Topic Name
URL: https://github.com/apache/incubator-pulsar/pull/1535
 
 
   

This is a PR merged from a forked repository.
Because this is a foreign pull request (from a fork), GitHub hides the
original diff once it is merged, so the diff is reproduced below for
the sake of provenance:

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
index acb5b6db7f..80e1a5c398 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
@@ -20,13 +20,19 @@
 
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES_ROOT;
 
+import com.google.common.collect.Lists;
 import java.io.IOException;
 
+import java.util.List;
 import org.apache.bookkeeper.client.BookKeeperAdmin;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory;
 import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
@@ -143,8 +149,54 @@ public static void main(String[] args) throws Exception {
             // Ignore
         }
 
+        // Create public tenant
+        PropertyAdmin publicProperty = new PropertyAdmin();
+        byte[] publicPropertyDataJson = 
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicProperty);
+        try {
+            ZkUtils.createFullPathOptimistic(
+                globalZk,
+                POLICIES_ROOT + "/" + TopicName.PUBLIC_PROPERTY,
+                publicPropertyDataJson,
+                ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        } catch (NodeExistsException e) {
+            // Ignore
+        }
+
+        // Create default namespace
+        Policies policies = new Policies();
+        policies.bundles = getBundles(4);
+        byte[] defaultNamespaceDataJson = 
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies);
+        try {
+            ZkUtils.createFullPathOptimistic(
+                globalZk,
+                POLICIES_ROOT + "/" + TopicName.PUBLIC_PROPERTY + "/" + 
TopicName.DEFAULT_NAMESPACE,
+                defaultNamespaceDataJson,
+                ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        } catch (NodeExistsException e) {
+            // Ignore
+        }
+
         log.info("Cluster metadata for '{}' setup correctly", 
arguments.cluster);
     }
 
+    private static BundlesData getBundles(int numBundles) {
+        Long maxVal = ((long) 1) << 32;
+        Long segSize = maxVal / numBundles;
+        List<String> partitions = Lists.newArrayList();
+        partitions.add(String.format("0x%08x", 0l));
+        Long curPartition = segSize;
+        for (int i = 0; i < numBundles; i++) {
+            if (i != numBundles - 1) {
+                partitions.add(String.format("0x%08x", curPartition));
+            } else {
+                partitions.add(String.format("0x%08x", maxVal - 1));
+            }
+            curPartition += segSize;
+        }
+        return new BundlesData(partitions);
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(PulsarClusterMetadataSetup.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
index 68c307aa33..a5d0a43e4b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
@@ -31,6 +31,7 @@
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
 import org.apache.pulsar.functions.worker.WorkerConfig;
@@ -203,13 +204,14 @@ void start() throws Exception {
         broker = new PulsarService(config, 
Optional.ofNullable(fnWorkerService));
         broker.start();
 
-        // Create a sample namespace
         URL webServiceUrl = new URL(
                 String.format("http://%s:%d", config.getAdvertisedAddress(), 
config.getWebServicePort()));
         final String brokerServiceUrl = String.format("pulsar://%s:%d", 
config.getAdvertisedAddress(),
                 config.getBrokerServicePort());
         admin = 
PulsarAdmin.builder().serviceHttpUrl(webServiceUrl.toString()).authentication(
                 config.getBrokerClientAuthenticationPlugin(), 
config.getBrokerClientAuthenticationParameters()).build();
+
+        // Create a sample namespace
         final String property = "sample";
         final String cluster = config.getClusterName();
         final String globalCluster = "global";
@@ -240,6 +242,23 @@ void start() throws Exception {
             log.info(e.getMessage());
         }
 
+        // Create a public tenant and default namespace
+        final String publicTenant = TopicName.PUBLIC_PROPERTY;
+        final String defaultNamespace = TopicName.PUBLIC_PROPERTY + "/" + 
TopicName.DEFAULT_NAMESPACE;
+        try {
+            if (!admin.properties().getProperties().contains(publicTenant)) {
+                admin.properties().createProperty(
+                    publicTenant,
+                    new 
PropertyAdmin(Sets.newHashSet(config.getSuperUserRoles()), 
Sets.newHashSet(cluster)));
+            }
+            if 
(!admin.namespaces().getNamespaces(publicTenant).contains(defaultNamespace)) {
+                admin.namespaces().createNamespace(defaultNamespace);
+                
admin.namespaces().setNamespaceReplicationClusters(defaultNamespace, 
Sets.newHashSet(config.getClusterName()));
+            }
+        } catch (PulsarAdminException e) {
+            log.info(e.getMessage());
+        }
+
         if (null != fnWorkerService) {
             fnWorkerService.start();
         }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
index aa7dd4ad86..e0a8d2e55b 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
@@ -22,6 +22,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.util.Codec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +41,9 @@
 
     private static final Logger log = LoggerFactory.getLogger(TopicName.class);
 
+    public static final String PUBLIC_PROPERTY = "public";
+    public static final String DEFAULT_NAMESPACE = "default";
+
     private static final String PARTITIONED_TOPIC_SUFFIX = "-partition-";
 
     private final String completeTopicName;
@@ -98,15 +102,29 @@ public static boolean isValid(String topic) {
     }
 
     private TopicName(String completeTopicName) {
-        this.completeTopicName = completeTopicName;
         try {
-            // The topic name can be in two different forms:
-            // new:    persistent://property/namespace/topic
-            // legacy: persistent://property/cluster/namespace/topic
+            // The topic name can be in two different forms, one is fully 
qualified topic name,
+            // the other one is short topic name
             if (!completeTopicName.contains("://")) {
-                throw new IllegalArgumentException(
-                        "Invalid topic name: " + completeTopicName + " -- 
Domain is missing");
+                // The short topic name can be:
+                // - <topic>
+                // - <property>/<namespace>/<topic>
+                String[] parts = StringUtils.split(completeTopicName, '/');
+                if (parts.length == 3) {
+                    completeTopicName = TopicDomain.persistent.name() + "://" 
+ completeTopicName;
+                } else if (parts.length == 1) {
+                    completeTopicName = TopicDomain.persistent.name() + "://" 
+ PUBLIC_PROPERTY + "/" + DEFAULT_NAMESPACE + "/" + parts[0];
+                } else {
+                    throw new IllegalArgumentException(
+                        "Invalid short topic name '" + completeTopicName + "', 
it should be in the format of "
+                        + "<tenant>/<namespace>/<topic> or <topic>");
+                }
             }
+            this.completeTopicName = completeTopicName;
+
+            // The fully qualified topic name can be in two different forms:
+            // new:    persistent://property/namespace/topic
+            // legacy: persistent://property/cluster/namespace/topic
 
             List<String> parts = 
Splitter.on("://").limit(2).splitToList(completeTopicName);
             this.domain = TopicDomain.getEnum(parts.get(0));
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
index 70ba953e88..138f2dcf67 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
@@ -22,8 +22,6 @@
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.fail;
 
-import org.apache.pulsar.common.naming.TopicDomain;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.Codec;
 import org.testng.annotations.Test;
 
@@ -33,7 +31,7 @@
     @Test
     void topic() {
         try {
-            
assertEquals(TopicName.get("property.namespace:topic").getNamespace(), 
"property.namespace");
+            TopicName.get("://property.namespace:topic").getNamespace();
             fail("Should have thrown exception");
         } catch (IllegalArgumentException e) {
             // Expected
@@ -70,42 +68,42 @@ void topic() {
                 "topic");
 
         try {
-            TopicName.get("property.namespace:my-topic").getDomain();
+            TopicName.get("://property.namespace:my-topic").getDomain();
             fail("Should have raised exception");
         } catch (IllegalArgumentException e) {
             // Ok
         }
 
         try {
-            TopicName.get("property.namespace:my-topic").getProperty();
+            TopicName.get("://property.namespace:my-topic").getProperty();
             fail("Should have raised exception");
         } catch (IllegalArgumentException e) {
             // Ok
         }
 
         try {
-            TopicName.get("property.namespace:my-topic").getCluster();
+            TopicName.get("://property.namespace:my-topic").getCluster();
             fail("Should have raised exception");
         } catch (IllegalArgumentException e) {
             // Ok
         }
 
         try {
-            TopicName.get("property.namespace:my-topic").getNamespacePortion();
+            
TopicName.get("://property.namespace:my-topic").getNamespacePortion();
             fail("Should have raised exception");
         } catch (IllegalArgumentException e) {
             // Ok
         }
 
         try {
-            TopicName.get("property.namespace:my-topic").getLocalName();
+            TopicName.get("://property.namespace:my-topic").getLocalName();
             fail("Should have raised exception");
         } catch (IllegalArgumentException e) {
             // Ok
         }
 
         try {
-            TopicName.get("property.namespace");
+            TopicName.get("://property.namespace");
             fail("Should have raised exception");
         } catch (IllegalArgumentException e) {
             // Ok
@@ -164,14 +162,14 @@ void topic() {
                 .getPersistenceNamingEncoding(), 
"property/cluster/namespace/persistent/topic");
 
         try {
-            TopicName.get("property.namespace");
+            TopicName.get("://property.namespace");
             fail("Should have raied exception");
         } catch (IllegalArgumentException e) {
             // Ok
         }
 
         try {
-            TopicName.get("property/cluster/namespace");
+            TopicName.get("://property/cluster/namespace");
             fail("Should have raied exception");
         } catch (IllegalArgumentException e) {
             // Ok
@@ -240,4 +238,33 @@ public void testTopicNameWithoutCluster() throws Exception 
{
         assertEquals(topicName.getPartitionedTopicName(), 
"persistent://property/namespace/topic");
         assertEquals(topicName.getPersistenceNamingEncoding(), 
"property/namespace/persistent/topic");
     }
+
+    @Test
+    public void testShortTopicName() throws Exception {
+        TopicName tn = TopicName.get("short-topic");
+        assertEquals(TopicDomain.persistent, tn.getDomain());
+        assertEquals(TopicName.PUBLIC_PROPERTY, tn.getProperty());
+        assertEquals(TopicName.DEFAULT_NAMESPACE, tn.getNamespacePortion());
+        assertEquals("short-topic", tn.getLocalName());
+
+        tn = TopicName.get("test-tenant/test-namespace/test-short-topic");
+        assertEquals(TopicDomain.persistent, tn.getDomain());
+        assertEquals("test-tenant", tn.getProperty());
+        assertEquals("test-namespace", tn.getNamespacePortion());
+        assertEquals("test-short-topic", tn.getLocalName());
+
+        try {
+            TopicName.get("pulsar/cluster/namespace/test");
+            fail("Should have raised exception");
+        } catch (IllegalArgumentException e) {
+            // Ok
+        }
+
+        try {
+            TopicName.get("pulsar/cluster");
+            fail("Should have raised exception");
+        } catch (IllegalArgumentException e) {
+            // Ok
+        }
+    }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to