This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new b37d7b7 Support short topic name in cpp client (#1564) b37d7b7 is described below commit b37d7b7a64340659e1994b568d7f4ff562865505 Author: Sijie Guo <guosi...@gmail.com> AuthorDate: Sun Apr 15 18:27:10 2018 -0700 Support short topic name in cpp client (#1564) * Support short topic name in cpp client * Fix format issue * fix test * fix test failures * Fix format issue --- pulsar-client-cpp/lib/TopicName.cc | 86 ++++++++++++++++++++++------ pulsar-client-cpp/lib/TopicName.h | 4 +- pulsar-client-cpp/tests/BasicEndToEndTest.cc | 4 +- pulsar-client-cpp/tests/TopicNameTest.cc | 36 ++++++++++++ 4 files changed, 111 insertions(+), 19 deletions(-) diff --git a/pulsar-client-cpp/lib/TopicName.cc b/pulsar-client-cpp/lib/TopicName.cc index 1a79a5d..0ad1313 100644 --- a/pulsar-client-cpp/lib/TopicName.cc +++ b/pulsar-client-cpp/lib/TopicName.cc @@ -52,10 +52,30 @@ TopicName::TopicName() {} bool TopicName::init(const std::string& topicName) { topicName_ = topicName; if (topicName.find("://") == std::string::npos) { - LOG_ERROR("Topic name is not valid, domain not present - " << topicName); + std::string topicNameCopy_ = topicName; + std::vector<std::string> pathTokens; + boost::algorithm::split(pathTokens, topicNameCopy_, boost::algorithm::is_any_of("/")); + if (pathTokens.size() == 3) { + topicName_ = "persistent://" + pathTokens[0] + "/" + pathTokens[1] + "/" + pathTokens[2]; + } else if (pathTokens.size() == 1) { + topicName_ = "persistent://public/default/" + pathTokens[0]; + } else { + LOG_ERROR( + "Topic name is not valid, short topic name should be in the format of '<topic>' or " + "'<property>/<namespace>/<topic>' - " + << topicName); + return false; + } + } + isV2Topic_ = parse(topicName_, domain_, property_, cluster_, namespacePortion_, localName_); + if (isV2Topic_ && !cluster_.empty()) { + LOG_ERROR("V2 Topic name is not valid, cluster is not empty - " << topicName_ << " : cluster " + << cluster_); + return false; + } else if (!isV2Topic_ && cluster_.empty()) { + LOG_ERROR("V1 Topic name is not valid, cluster is empty - " << topicName_); return false; } - parse(topicName_, domain_, property_, cluster_, namespacePortion_, localName_); if (localName_.empty()) { LOG_ERROR("Topic name is not valid, topic name is empty - " << topicName_); return false; @@ -63,28 +83,45 @@ bool TopicName::init(const std::string& topicName) { namespaceName_ = NamespaceName::get(property_, cluster_, namespacePortion_); return true; } -void TopicName::parse(const std::string& topicName, std::string& domain, std::string& property, +bool TopicName::parse(const std::string& topicName, std::string& domain, std::string& property, std::string& cluster, std::string& namespacePortion, std::string& localName) { std::string topicNameCopy = topicName; boost::replace_first(topicNameCopy, "://", "/"); std::vector<std::string> pathTokens; boost::algorithm::split(pathTokens, topicNameCopy, boost::algorithm::is_any_of("/")); - if (pathTokens.size() < 5) { + if (pathTokens.size() < 4) { LOG_ERROR("Topic name is not valid, does not have enough parts - " << topicName); - return; + return false; } domain = pathTokens[0]; - property = pathTokens[1]; - cluster = pathTokens[2]; - namespacePortion = pathTokens[3]; + size_t numSlashIndexes; + bool isV2Topic; + if (pathTokens.size() == 4) { + // New topic name without cluster name + property = pathTokens[1]; + cluster = ""; + namespacePortion = pathTokens[2]; + localName = pathTokens[3]; + numSlashIndexes = 3; + isV2Topic = true; + } else { + // Legacy topic name that includes cluster name + property = pathTokens[1]; + cluster = pathTokens[2]; + namespacePortion = pathTokens[3]; + localName = pathTokens[4]; + numSlashIndexes = 4; + isV2Topic = false; + } size_t slashIndex = -1; - // find four '/', whatever is left is topic local name - for (int i = 0; i < 4; i++) { + // find `numSlashIndexes` '/', whatever is left is topic local name + for (int i = 0; i < numSlashIndexes; i++) { slashIndex = topicNameCopy.find('/', slashIndex + 1); } // get index to next char to '/' slashIndex++; localName = topicNameCopy.substr(slashIndex, (topicNameCopy.size() - slashIndex)); + return isV2Topic; } std::string TopicName::getEncodedName(const std::string& nameBeforeEncoding) { Lock lock(curlHandleMutex); @@ -104,6 +141,8 @@ std::string TopicName::getEncodedName(const std::string& nameBeforeEncoding) { return nameAfterEncoding; } +bool TopicName::isV2Topic() { return isV2Topic_; } + std::string TopicName::getDomain() { return domain_; } std::string TopicName::getProperty() { return property_; } @@ -125,9 +164,15 @@ bool TopicName::validate() { if (domain_.compare("persistent") != 0) { return false; } - if (!property_.empty() && !cluster_.empty() && !namespacePortion_.empty() && !localName_.empty()) { + // cluster_ can be empty + if (!isV2Topic_ && !property_.empty() && !cluster_.empty() && !namespacePortion_.empty() && + !localName_.empty()) { + // v1 topic format return NamedEntity::checkName(property_) && NamedEntity::checkName(cluster_) && NamedEntity::checkName(namespacePortion_); + } else if (isV2Topic_ && !property_.empty() && !namespacePortion_.empty() && !localName_.empty()) { + // v2 topic format + return NamedEntity::checkName(property_) && NamedEntity::checkName(namespacePortion_); } else { return false; } @@ -142,7 +187,7 @@ boost::shared_ptr<TopicName> TopicName::get(const std::string& topicName) { if (ptr->validate()) { return ptr; } else { - LOG_ERROR("Topic name validation Failed"); + LOG_ERROR("Topic name validation Failed - " << topicName); return boost::shared_ptr<TopicName>(); } } @@ -151,16 +196,25 @@ boost::shared_ptr<TopicName> TopicName::get(const std::string& topicName) { std::string TopicName::getLookupName() { std::stringstream ss; std::string seperator("/"); - ss << domain_ << seperator << property_ << seperator << cluster_ << seperator << namespacePortion_ - << seperator << getEncodedLocalName(); + if (isV2Topic_ && cluster_.empty()) { + ss << domain_ << seperator << property_ << seperator << namespacePortion_ << seperator + << getEncodedLocalName(); + } else { + ss << domain_ << seperator << property_ << seperator << cluster_ << seperator << namespacePortion_ + << seperator << getEncodedLocalName(); + } return ss.str(); } std::string TopicName::toString() { std::stringstream ss; std::string seperator("/"); - ss << domain_ << "://" << property_ << seperator << cluster_ << seperator << namespacePortion_ - << seperator << localName_; + if (isV2Topic_ && cluster_.empty()) { + ss << domain_ << "://" << property_ << seperator << namespacePortion_ << seperator << localName_; + } else { + ss << domain_ << "://" << property_ << seperator << cluster_ << seperator << namespacePortion_ + << seperator << localName_; + } return ss.str(); } diff --git a/pulsar-client-cpp/lib/TopicName.h b/pulsar-client-cpp/lib/TopicName.h index b8d610c..91b7994 100644 --- a/pulsar-client-cpp/lib/TopicName.h +++ b/pulsar-client-cpp/lib/TopicName.h @@ -38,9 +38,11 @@ class TopicName : public ServiceUnitId { std::string cluster_; std::string namespacePortion_; std::string localName_; + bool isV2Topic_; boost::shared_ptr<NamespaceName> namespaceName_; public: + bool isV2Topic(); std::string getLookupName(); std::string getDomain(); std::string getProperty(); @@ -58,7 +60,7 @@ class TopicName : public ServiceUnitId { static CURL* getCurlHandle(); static CURL* curl; static boost::mutex curlHandleMutex; - static void parse(const std::string& topicName, std::string& domain, std::string& property, + static bool parse(const std::string& topicName, std::string& domain, std::string& property, std::string& cluster, std::string& namespacePortion, std::string& localName); TopicName(); bool validate(); diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc index dfbf44e..f89096c 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -240,11 +240,11 @@ TEST(BasicEndToEndTest, testLookupThrottling) { TEST(BasicEndToEndTest, testNonExistingTopic) { Client client(lookupUrl); Producer producer; - Result result = client.createProducer("persistent://prop/unit/ns1", producer); + Result result = client.createProducer("persistent://prop//unit/ns1/testNonExistingTopic", producer); ASSERT_EQ(ResultInvalidTopicName, result); Consumer consumer; - result = client.subscribe("persistent://prop/unit/ns1", "my-sub-name", consumer); + result = client.subscribe("persistent://prop//unit/ns1/testNonExistingTopic", "my-sub-name", consumer); ASSERT_EQ(ResultInvalidTopicName, result); } diff --git a/pulsar-client-cpp/tests/TopicNameTest.cc b/pulsar-client-cpp/tests/TopicNameTest.cc index 373bbef..5c9410a 100644 --- a/pulsar-client-cpp/tests/TopicNameTest.cc +++ b/pulsar-client-cpp/tests/TopicNameTest.cc @@ -42,6 +42,42 @@ TEST(TopicNameTest, testTopicName) { ASSERT_TRUE(*topicName1 == *topicName2); } +TEST(TopicNameTest, testShortTopicName) { + // "short-topic" + boost::shared_ptr<TopicName> tn1 = TopicName::get("short-topic"); + ASSERT_EQ("public", tn1->getProperty()); + ASSERT_EQ("", tn1->getCluster()); + ASSERT_EQ("default", tn1->getNamespacePortion()); + ASSERT_EQ("persistent", tn1->getDomain()); + ASSERT_EQ(TopicName::getEncodedName("short-topic"), tn1->getLocalName()); + + // tenant/namespace/topic + boost::shared_ptr<TopicName> tn2 = TopicName::get("tenant/namespace/short-topic"); + ASSERT_EQ("tenant", tn2->getProperty()); + ASSERT_EQ("", tn2->getCluster()); + ASSERT_EQ("namespace", tn2->getNamespacePortion()); + ASSERT_EQ("persistent", tn2->getDomain()); + ASSERT_EQ(TopicName::getEncodedName("short-topic"), tn2->getLocalName()); + + // tenant/cluster/namespace/topic + boost::shared_ptr<TopicName> tn3 = TopicName::get("tenant/cluster/namespace/short-topic"); + ASSERT_FALSE(tn3); + + // tenant/cluster + boost::shared_ptr<TopicName> tn4 = TopicName::get("tenant/cluster"); + ASSERT_FALSE(tn4); +} + +TEST(TopicNameTest, testTopicNameV2) { + // v2 topic names doesn't have "cluster" + boost::shared_ptr<TopicName> tn1 = TopicName::get("persistent://tenant/namespace/short-topic"); + ASSERT_EQ("tenant", tn1->getProperty()); + ASSERT_EQ("", tn1->getCluster()); + ASSERT_EQ("namespace", tn1->getNamespacePortion()); + ASSERT_EQ("persistent", tn1->getDomain()); + ASSERT_EQ(TopicName::getEncodedName("short-topic"), tn1->getLocalName()); +} + TEST(TopicNameTest, testTopicNameWithSlashes) { // Compare getters and setters boost::shared_ptr<TopicName> topicName = -- To stop receiving notification emails like this one, please contact mme...@apache.org.