This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 61de30d  Issue #1536: Introduce Short Topic Name (#1535)
61de30d is described below

commit 61de30d09488395e3c19cf9232e755528a52d342
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Thu Apr 12 14:17:33 2018 -0700

    Issue #1536: Introduce Short Topic Name (#1535)
    
    This closes #1536
---
 .../apache/pulsar/PulsarClusterMetadataSetup.java  | 52 ++++++++++++++++++++++
 .../org/apache/pulsar/PulsarStandaloneStarter.java | 21 ++++++++-
 .../org/apache/pulsar/common/naming/TopicName.java | 30 ++++++++++---
 .../apache/pulsar/common/naming/TopicNameTest.java | 49 +++++++++++++++-----
 4 files changed, 134 insertions(+), 18 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
index acb5b6d..80e1a5c 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarClusterMetadataSetup.java
@@ -20,13 +20,19 @@ package org.apache.pulsar;
 
 import static 
org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES_ROOT;
 
+import com.google.common.collect.Lists;
 import java.io.IOException;
 
+import java.util.List;
 import org.apache.bookkeeper.client.BookKeeperAdmin;
 import org.apache.bookkeeper.conf.ServerConfiguration;
 import org.apache.bookkeeper.meta.HierarchicalLedgerManagerFactory;
 import org.apache.bookkeeper.util.ZkUtils;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.BundlesData;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory;
 import org.apache.pulsar.zookeeper.ZooKeeperClientFactory.SessionType;
@@ -143,8 +149,54 @@ public class PulsarClusterMetadataSetup {
             // Ignore
         }
 
+        // Create public tenant
+        PropertyAdmin publicProperty = new PropertyAdmin();
+        byte[] publicPropertyDataJson = 
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(publicProperty);
+        try {
+            ZkUtils.createFullPathOptimistic(
+                globalZk,
+                POLICIES_ROOT + "/" + TopicName.PUBLIC_PROPERTY,
+                publicPropertyDataJson,
+                ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        } catch (NodeExistsException e) {
+            // Ignore
+        }
+
+        // Create default namespace
+        Policies policies = new Policies();
+        policies.bundles = getBundles(4);
+        byte[] defaultNamespaceDataJson = 
ObjectMapperFactory.getThreadLocal().writeValueAsBytes(policies);
+        try {
+            ZkUtils.createFullPathOptimistic(
+                globalZk,
+                POLICIES_ROOT + "/" + TopicName.PUBLIC_PROPERTY + "/" + 
TopicName.DEFAULT_NAMESPACE,
+                defaultNamespaceDataJson,
+                ZooDefs.Ids.OPEN_ACL_UNSAFE,
+                CreateMode.PERSISTENT);
+        } catch (NodeExistsException e) {
+            // Ignore
+        }
+
         log.info("Cluster metadata for '{}' setup correctly", 
arguments.cluster);
     }
 
+    private static BundlesData getBundles(int numBundles) {
+        Long maxVal = ((long) 1) << 32;
+        Long segSize = maxVal / numBundles;
+        List<String> partitions = Lists.newArrayList();
+        partitions.add(String.format("0x%08x", 0l));
+        Long curPartition = segSize;
+        for (int i = 0; i < numBundles; i++) {
+            if (i != numBundles - 1) {
+                partitions.add(String.format("0x%08x", curPartition));
+            } else {
+                partitions.add(String.format("0x%08x", maxVal - 1));
+            }
+            curPartition += segSize;
+        }
+        return new BundlesData(partitions);
+    }
+
     private static final Logger log = 
LoggerFactory.getLogger(PulsarClusterMetadataSetup.class);
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
index 68c307a..a5d0a43 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/PulsarStandaloneStarter.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.broker.ServiceConfigurationUtils;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.ClusterData;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
 import org.apache.pulsar.functions.worker.WorkerConfig;
@@ -203,13 +204,14 @@ public class PulsarStandaloneStarter {
         broker = new PulsarService(config, 
Optional.ofNullable(fnWorkerService));
         broker.start();
 
-        // Create a sample namespace
         URL webServiceUrl = new URL(
                 String.format("http://%s:%d";, config.getAdvertisedAddress(), 
config.getWebServicePort()));
         final String brokerServiceUrl = String.format("pulsar://%s:%d", 
config.getAdvertisedAddress(),
                 config.getBrokerServicePort());
         admin = 
PulsarAdmin.builder().serviceHttpUrl(webServiceUrl.toString()).authentication(
                 config.getBrokerClientAuthenticationPlugin(), 
config.getBrokerClientAuthenticationParameters()).build();
+
+        // Create a sample namespace
         final String property = "sample";
         final String cluster = config.getClusterName();
         final String globalCluster = "global";
@@ -240,6 +242,23 @@ public class PulsarStandaloneStarter {
             log.info(e.getMessage());
         }
 
+        // Create a public tenant and default namespace
+        final String publicTenant = TopicName.PUBLIC_PROPERTY;
+        final String defaultNamespace = TopicName.PUBLIC_PROPERTY + "/" + 
TopicName.DEFAULT_NAMESPACE;
+        try {
+            if (!admin.properties().getProperties().contains(publicTenant)) {
+                admin.properties().createProperty(
+                    publicTenant,
+                    new 
PropertyAdmin(Sets.newHashSet(config.getSuperUserRoles()), 
Sets.newHashSet(cluster)));
+            }
+            if 
(!admin.namespaces().getNamespaces(publicTenant).contains(defaultNamespace)) {
+                admin.namespaces().createNamespace(defaultNamespace);
+                
admin.namespaces().setNamespaceReplicationClusters(defaultNamespace, 
Sets.newHashSet(config.getClusterName()));
+            }
+        } catch (PulsarAdminException e) {
+            log.info(e.getMessage());
+        }
+
         if (null != fnWorkerService) {
             fnWorkerService.start();
         }
diff --git 
a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java 
b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
index aa7dd4a..e0a8d2e 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/TopicName.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.util.Codec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -40,6 +41,9 @@ public class TopicName implements ServiceUnitId {
 
     private static final Logger log = LoggerFactory.getLogger(TopicName.class);
 
+    public static final String PUBLIC_PROPERTY = "public";
+    public static final String DEFAULT_NAMESPACE = "default";
+
     private static final String PARTITIONED_TOPIC_SUFFIX = "-partition-";
 
     private final String completeTopicName;
@@ -98,15 +102,29 @@ public class TopicName implements ServiceUnitId {
     }
 
     private TopicName(String completeTopicName) {
-        this.completeTopicName = completeTopicName;
         try {
-            // The topic name can be in two different forms:
-            // new:    persistent://property/namespace/topic
-            // legacy: persistent://property/cluster/namespace/topic
+            // The topic name can be in two different forms, one is fully 
qualified topic name,
+            // the other one is short topic name
             if (!completeTopicName.contains("://")) {
-                throw new IllegalArgumentException(
-                        "Invalid topic name: " + completeTopicName + " -- 
Domain is missing");
+                // The short topic name can be:
+                // - <topic>
+                // - <property>/<namespace>/<topic>
+                String[] parts = StringUtils.split(completeTopicName, '/');
+                if (parts.length == 3) {
+                    completeTopicName = TopicDomain.persistent.name() + "://" 
+ completeTopicName;
+                } else if (parts.length == 1) {
+                    completeTopicName = TopicDomain.persistent.name() + "://" 
+ PUBLIC_PROPERTY + "/" + DEFAULT_NAMESPACE + "/" + parts[0];
+                } else {
+                    throw new IllegalArgumentException(
+                        "Invalid short topic name '" + completeTopicName + "', 
it should be in the format of "
+                        + "<tenant>/<namespace>/<topic> or <topic>");
+                }
             }
+            this.completeTopicName = completeTopicName;
+
+            // The fully qualified topic name can be in two different forms:
+            // new:    persistent://property/namespace/topic
+            // legacy: persistent://property/cluster/namespace/topic
 
             List<String> parts = 
Splitter.on("://").limit(2).splitToList(completeTopicName);
             this.domain = TopicDomain.getEnum(parts.get(0));
diff --git 
a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
 
b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
index 70ba953..138f2dc 100644
--- 
a/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
+++ 
b/pulsar-common/src/test/java/org/apache/pulsar/common/naming/TopicNameTest.java
@@ -22,8 +22,6 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.fail;
 
-import org.apache.pulsar.common.naming.TopicDomain;
-import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.util.Codec;
 import org.testng.annotations.Test;
 
@@ -33,7 +31,7 @@ public class TopicNameTest {
     @Test
     void topic() {
         try {
-            
assertEquals(TopicName.get("property.namespace:topic").getNamespace(), 
"property.namespace");
+            TopicName.get("://property.namespace:topic").getNamespace();
             fail("Should have thrown exception");
         } catch (IllegalArgumentException e) {
             // Expected
@@ -70,42 +68,42 @@ public class TopicNameTest {
                 "topic");
 
         try {
-            TopicName.get("property.namespace:my-topic").getDomain();
+            TopicName.get("://property.namespace:my-topic").getDomain();
             fail("Should have raised exception");
         } catch (IllegalArgumentException e) {
             // Ok
         }
 
         try {
-            TopicName.get("property.namespace:my-topic").getProperty();
+            TopicName.get("://property.namespace:my-topic").getProperty();
             fail("Should have raised exception");
         } catch (IllegalArgumentException e) {
             // Ok
         }
 
         try {
-            TopicName.get("property.namespace:my-topic").getCluster();
+            TopicName.get("://property.namespace:my-topic").getCluster();
             fail("Should have raised exception");
         } catch (IllegalArgumentException e) {
             // Ok
         }
 
         try {
-            TopicName.get("property.namespace:my-topic").getNamespacePortion();
+            
TopicName.get("://property.namespace:my-topic").getNamespacePortion();
             fail("Should have raised exception");
         } catch (IllegalArgumentException e) {
             // Ok
         }
 
         try {
-            TopicName.get("property.namespace:my-topic").getLocalName();
+            TopicName.get("://property.namespace:my-topic").getLocalName();
             fail("Should have raised exception");
         } catch (IllegalArgumentException e) {
             // Ok
         }
 
         try {
-            TopicName.get("property.namespace");
+            TopicName.get("://property.namespace");
             fail("Should have raised exception");
         } catch (IllegalArgumentException e) {
             // Ok
@@ -164,14 +162,14 @@ public class TopicNameTest {
                 .getPersistenceNamingEncoding(), 
"property/cluster/namespace/persistent/topic");
 
         try {
-            TopicName.get("property.namespace");
+            TopicName.get("://property.namespace");
             fail("Should have raied exception");
         } catch (IllegalArgumentException e) {
             // Ok
         }
 
         try {
-            TopicName.get("property/cluster/namespace");
+            TopicName.get("://property/cluster/namespace");
             fail("Should have raied exception");
         } catch (IllegalArgumentException e) {
             // Ok
@@ -240,4 +238,33 @@ public class TopicNameTest {
         assertEquals(topicName.getPartitionedTopicName(), 
"persistent://property/namespace/topic");
         assertEquals(topicName.getPersistenceNamingEncoding(), 
"property/namespace/persistent/topic");
     }
+
+    @Test
+    public void testShortTopicName() throws Exception {
+        TopicName tn = TopicName.get("short-topic");
+        assertEquals(TopicDomain.persistent, tn.getDomain());
+        assertEquals(TopicName.PUBLIC_PROPERTY, tn.getProperty());
+        assertEquals(TopicName.DEFAULT_NAMESPACE, tn.getNamespacePortion());
+        assertEquals("short-topic", tn.getLocalName());
+
+        tn = TopicName.get("test-tenant/test-namespace/test-short-topic");
+        assertEquals(TopicDomain.persistent, tn.getDomain());
+        assertEquals("test-tenant", tn.getProperty());
+        assertEquals("test-namespace", tn.getNamespacePortion());
+        assertEquals("test-short-topic", tn.getLocalName());
+
+        try {
+            TopicName.get("pulsar/cluster/namespace/test");
+            fail("Should have raised exception");
+        } catch (IllegalArgumentException e) {
+            // Ok
+        }
+
+        try {
+            TopicName.get("pulsar/cluster");
+            fail("Should have raised exception");
+        } catch (IllegalArgumentException e) {
+            // Ok
+        }
+    }
 }

-- 
To stop receiving notification emails like this one, please contact
si...@apache.org.

Reply via email to