This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new b23134c  Cpp client: add PatternMultiTopicsConsumerImpl to support 
regex subscribe (#2219)
b23134c is described below

commit b23134c422456f45ed14c3c49de53eb8889e45fc
Author: Jia Zhai <zhaiji...@gmail.com>
AuthorDate: Sat Aug 4 14:56:56 2018 +0800

    Cpp client: add PatternMultiTopicsConsumerImpl to support regex subscribe 
(#2219)
    
    In PRs #1279 and #1298 we added regex-based subscription. This is catch-up 
work to add `PatternMultiTopicsConsumerImpl` to the cpp client.
---
 pulsar-client-cpp/include/pulsar/Client.h          |  16 ++
 .../include/pulsar/ConsumerConfiguration.h         |  10 +
 pulsar-client-cpp/lib/BinaryProtoLookupService.cc  |  42 ++++
 pulsar-client-cpp/lib/BinaryProtoLookupService.h   |   9 +
 pulsar-client-cpp/lib/Client.cc                    |  24 ++
 pulsar-client-cpp/lib/ClientConnection.cc          |  86 ++++++-
 pulsar-client-cpp/lib/ClientConnection.h           |  13 +-
 pulsar-client-cpp/lib/ClientImpl.cc                |  58 +++++
 pulsar-client-cpp/lib/ClientImpl.h                 |   8 +
 pulsar-client-cpp/lib/Commands.cc                  |  18 ++
 pulsar-client-cpp/lib/Commands.h                   |   1 +
 pulsar-client-cpp/lib/ConsumerConfiguration.cc     |   6 +
 pulsar-client-cpp/lib/ConsumerConfigurationImpl.h  |   4 +-
 pulsar-client-cpp/lib/HTTPLookupService.cc         | 112 +++++++--
 pulsar-client-cpp/lib/HTTPLookupService.h          |   9 +-
 pulsar-client-cpp/lib/LookupService.h              |  11 +
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h    |   2 +-
 .../lib/PatternMultiTopicsConsumerImpl.cc          | 235 +++++++++++++++++++
 .../lib/PatternMultiTopicsConsumerImpl.h           |  76 ++++++
 pulsar-client-cpp/tests/BasicEndToEndTest.cc       | 257 +++++++++++++++++++++
 pulsar-client-cpp/tests/BinaryLookupServiceTest.cc |  67 ++++++
 21 files changed, 1032 insertions(+), 32 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/Client.h 
b/pulsar-client-cpp/include/pulsar/Client.h
index 6a9e487..1913851 100644
--- a/pulsar-client-cpp/include/pulsar/Client.h
+++ b/pulsar-client-cpp/include/pulsar/Client.h
@@ -99,6 +99,9 @@ class Client {
     void subscribeAsync(const std::string& topic, const std::string& 
consumerName,
                         const ConsumerConfiguration& conf, SubscribeCallback 
callback);
 
+    /**
+     * Subscribe to multiple topics under the same namespace.
+     */
     Result subscribe(const std::vector<std::string>& topics, const 
std::string& subscriptionName,
                      Consumer& consumer);
     Result subscribe(const std::vector<std::string>& topics, const 
std::string& subscriptionName,
@@ -109,6 +112,19 @@ class Client {
                         const ConsumerConfiguration& conf, SubscribeCallback 
callback);
 
     /**
+     * Subscribe to all topics under the same namespace that match the 
given regexPattern.
+     */
+    Result subscribeWithRegex(const std::string& regexPattern, const 
std::string& consumerName,
+                              Consumer& consumer);
+    Result subscribeWithRegex(const std::string& regexPattern, const 
std::string& consumerName,
+                              const ConsumerConfiguration& conf, Consumer& 
consumer);
+
+    void subscribeWithRegexAsync(const std::string& regexPattern, const 
std::string& consumerName,
+                                 SubscribeCallback callback);
+    void subscribeWithRegexAsync(const std::string& regexPattern, const 
std::string& consumerName,
+                                 const ConsumerConfiguration& conf, 
SubscribeCallback callback);
+
+    /**
      * Create a topic reader with given {@code ReaderConfiguration} for 
reading messages from the specified
      * topic.
      * <p>
diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h 
b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index c9584e3..36e5808 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -152,6 +152,16 @@ class ConsumerConfiguration {
     bool isReadCompacted() const;
     void setReadCompacted(bool compacted);
 
+    /**
+     * Set the period, in seconds, at which the 
PatternMultiTopicsConsumer performs pattern auto
+     * discovery.
+     * The default value is 60 seconds. A value less than 0 disables auto 
discovery.
+     *
+     * @param periodInSeconds       period in seconds to do an auto discovery
+     */
+    void setPatternAutoDiscoveryPeriod(int periodInSeconds);
+    int getPatternAutoDiscoveryPeriod() const;
+
     friend class PulsarWrapper;
 
    private:
diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc 
b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
index c4bef30..296faee 100644
--- a/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
+++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.cc
@@ -152,4 +152,46 @@ uint64_t BinaryProtoLookupService::newRequestId() {
     Lock lock(mutex_);
     return ++requestIdGenerator_;
 }
+
+Future<Result, NamespaceTopicsPtr> 
BinaryProtoLookupService::getTopicsOfNamespaceAsync(
+    const NamespaceNamePtr& nsName) {
+    NamespaceTopicsPromisePtr promise = boost::make_shared<Promise<Result, 
NamespaceTopicsPtr>>();
+    if (!nsName) {
+        promise->setFailed(ResultInvalidTopicName);
+        return promise->getFuture();
+    }
+    std::string namespaceName = nsName->toString();
+    Future<Result, ClientConnectionWeakPtr> future = 
cnxPool_.getConnectionAsync(serviceUrl_, serviceUrl_);
+    
future.addListener(boost::bind(&BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest,
 this,
+                                   namespaceName, _1, _2, promise));
+    return promise->getFuture();
+}
+
+void BinaryProtoLookupService::sendGetTopicsOfNamespaceRequest(const 
std::string& nsName, Result result,
+                                                               const 
ClientConnectionWeakPtr& clientCnx,
+                                                               
NamespaceTopicsPromisePtr promise) {
+    if (result != ResultOk) {
+        promise->setFailed(ResultConnectError);
+        return;
+    }
+
+    ClientConnectionPtr conn = clientCnx.lock();
+    uint64_t requestId = newRequestId();
+    LOG_DEBUG("sendGetTopicsOfNamespaceRequest. requestId: " << requestId << " 
nsName: " << nsName);
+
+    conn->newGetTopicsOfNamespace(nsName, requestId)
+        .addListener(
+            
boost::bind(&BinaryProtoLookupService::getTopicsOfNamespaceListener, this, _1, 
_2, promise));
+}
+
+void BinaryProtoLookupService::getTopicsOfNamespaceListener(Result result, 
NamespaceTopicsPtr topicsPtr,
+                                                            
NamespaceTopicsPromisePtr promise) {
+    if (result != ResultOk) {
+        promise->setFailed(ResultLookupError);
+        return;
+    }
+
+    promise->setValue(topicsPtr);
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/BinaryProtoLookupService.h 
b/pulsar-client-cpp/lib/BinaryProtoLookupService.h
index f647c62..36fb5e8 100644
--- a/pulsar-client-cpp/lib/BinaryProtoLookupService.h
+++ b/pulsar-client-cpp/lib/BinaryProtoLookupService.h
@@ -40,6 +40,8 @@ class BinaryProtoLookupService : public LookupService {
 
     Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const 
TopicNamePtr& topicName);
 
+    Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const 
NamespaceNamePtr& nsName);
+
    private:
     boost::mutex mutex_;
     uint64_t requestIdGenerator_;
@@ -61,6 +63,13 @@ class BinaryProtoLookupService : public LookupService {
                                        const ClientConnectionWeakPtr& 
clientCnx,
                                        LookupDataResultPromisePtr promise);
 
+    void sendGetTopicsOfNamespaceRequest(const std::string& nsName, Result 
result,
+                                         const ClientConnectionWeakPtr& 
clientCnx,
+                                         NamespaceTopicsPromisePtr promise);
+
+    void getTopicsOfNamespaceListener(Result result, NamespaceTopicsPtr 
topicsPtr,
+                                      NamespaceTopicsPromisePtr promise);
+
     uint64_t newRequestId();
 };
 typedef boost::shared_ptr<BinaryProtoLookupService> 
BinaryProtoLookupServicePtr;
diff --git a/pulsar-client-cpp/lib/Client.cc b/pulsar-client-cpp/lib/Client.cc
index bba3520..5cfe01f 100644
--- a/pulsar-client-cpp/lib/Client.cc
+++ b/pulsar-client-cpp/lib/Client.cc
@@ -114,6 +114,30 @@ void Client::subscribeAsync(const 
std::vector<std::string>& topics, const std::s
     impl_->subscribeAsync(topics, subscriptionName, conf, callback);
 }
 
+Result Client::subscribeWithRegex(const std::string& regexPattern, const 
std::string& subscriptionName,
+                                  Consumer& consumer) {
+    return subscribeWithRegex(regexPattern, subscriptionName, 
ConsumerConfiguration(), consumer);
+}
+
+Result Client::subscribeWithRegex(const std::string& regexPattern, const 
std::string& subscriptionName,
+                                  const ConsumerConfiguration& conf, Consumer& 
consumer) {
+    Promise<Result, Consumer> promise;
+    subscribeWithRegexAsync(regexPattern, subscriptionName, conf, 
WaitForCallbackValue<Consumer>(promise));
+    Future<Result, Consumer> future = promise.getFuture();
+
+    return future.get(consumer);
+}
+
+void Client::subscribeWithRegexAsync(const std::string& regexPattern, const 
std::string& subscriptionName,
+                                     SubscribeCallback callback) {
+    subscribeWithRegexAsync(regexPattern, subscriptionName, 
ConsumerConfiguration(), callback);
+}
+
+void Client::subscribeWithRegexAsync(const std::string& regexPattern, const 
std::string& subscriptionName,
+                                     const ConsumerConfiguration& conf, 
SubscribeCallback callback) {
+    impl_->subscribeWithRegexAsync(regexPattern, subscriptionName, conf, 
callback);
+}
+
 Result Client::createReader(const std::string& topic, const MessageId& 
startMessageId,
                             const ReaderConfiguration& conf, Reader& reader) {
     Promise<Result, Reader> promise;
diff --git a/pulsar-client-cpp/lib/ClientConnection.cc 
b/pulsar-client-cpp/lib/ClientConnection.cc
index 4e6d0f2..da30707 100644
--- a/pulsar-client-cpp/lib/ClientConnection.cc
+++ b/pulsar-client-cpp/lib/ClientConnection.cc
@@ -219,7 +219,7 @@ void ClientConnection::handlePulsarConnected(const 
CommandConnected& cmdConnecte
 }
 
 void ClientConnection::startConsumerStatsTimer(std::vector<uint64_t> 
consumerStatsRequests) {
-    std::vector<Promise<Result, BrokerConsumerStatsImpl> > 
consumerStatsPromises;
+    std::vector<Promise<Result, BrokerConsumerStatsImpl>> 
consumerStatsPromises;
     Lock lock(mutex_);
 
     for (int i = 0; i < consumerStatsRequests.size(); i++) {
@@ -856,6 +856,7 @@ void ClientConnection::handleIncomingCommand() {
                                         << " -- req_id: " << 
error.request_id());
 
                     Lock lock(mutex_);
+
                     PendingRequestsMap::iterator it = 
pendingRequests_.find(error.request_id());
                     if (it != pendingRequests_.end()) {
                         PendingRequestData requestData = it->second;
@@ -865,19 +866,28 @@ void ClientConnection::handleIncomingCommand() {
                         
requestData.promise.setFailed(getResult(error.error()));
                         requestData.timer->cancel();
                     } else {
-                        PendingGetLastMessageIdRequestsMap::iterator it2 =
+                        PendingGetLastMessageIdRequestsMap::iterator it =
                             
pendingGetLastMessageIdRequests_.find(error.request_id());
-                        if (it2 != pendingGetLastMessageIdRequests_.end()) {
-                            Promise<Result, MessageId> getLastMessageIdPromise 
= it2->second;
-                            pendingGetLastMessageIdRequests_.erase(it2);
+                        if (it != pendingGetLastMessageIdRequests_.end()) {
+                            Promise<Result, MessageId> getLastMessageIdPromise 
= it->second;
+                            pendingGetLastMessageIdRequests_.erase(it);
                             lock.unlock();
 
                             
getLastMessageIdPromise.setFailed(getResult(error.error()));
                         } else {
-                            lock.unlock();
+                            PendingGetNamespaceTopicsMap::iterator it =
+                                
pendingGetNamespaceTopicsRequests_.find(error.request_id());
+                            if (it != 
pendingGetNamespaceTopicsRequests_.end()) {
+                                Promise<Result, NamespaceTopicsPtr> 
getNamespaceTopicsPromise = it->second;
+                                pendingGetNamespaceTopicsRequests_.erase(it);
+                                lock.unlock();
+
+                                
getNamespaceTopicsPromise.setFailed(getResult(error.error()));
+                            } else {
+                                lock.unlock();
+                            }
                         }
                     }
-
                     break;
                 }
 
@@ -978,6 +988,51 @@ void ClientConnection::handleIncomingCommand() {
                     break;
                 }
 
+                case BaseCommand::GET_TOPICS_OF_NAMESPACE_RESPONSE: {
+                    const CommandGetTopicsOfNamespaceResponse& response =
+                        incomingCmd_.gettopicsofnamespaceresponse();
+
+                    LOG_DEBUG(cnxString_ << "Received 
GetTopicsOfNamespaceResponse from server. req_id: "
+                                         << response.request_id() << " 
topicsSize" << response.topics_size());
+
+                    Lock lock(mutex_);
+                    PendingGetNamespaceTopicsMap::iterator it =
+                        
pendingGetNamespaceTopicsRequests_.find(response.request_id());
+
+                    if (it != pendingGetNamespaceTopicsRequests_.end()) {
+                        Promise<Result, NamespaceTopicsPtr> getTopicsPromise = 
it->second;
+                        pendingGetNamespaceTopicsRequests_.erase(it);
+                        lock.unlock();
+
+                        int numTopics = response.topics_size();
+                        std::set<std::string> topicSet;
+                        // get all topics
+                        for (int i = 0; i < numTopics; i++) {
+                            // remove partition part
+                            const std::string& topicName = response.topics(i);
+                            int pos = topicName.find("-partition-");
+                            std::string filteredName = topicName.substr(0, 
pos);
+
+                            // filter duped topic name
+                            if (topicSet.find(filteredName) == topicSet.end()) 
{
+                                topicSet.insert(filteredName);
+                            }
+                        }
+
+                        NamespaceTopicsPtr topicsPtr =
+                            
boost::make_shared<std::vector<std::string>>(topicSet.begin(), topicSet.end());
+
+                        getTopicsPromise.setValue(topicsPtr);
+                    } else {
+                        lock.unlock();
+                        LOG_WARN(
+                            "GetTopicsOfNamespaceResponse command - Received 
unknown request id from "
+                            "server: "
+                            << response.request_id());
+                    }
+                    break;
+                }
+
                 default: {
                     LOG_WARN(cnxString_ << "Received invalid message from 
server");
                     close();
@@ -1281,4 +1336,21 @@ Future<Result, MessageId> 
ClientConnection::newGetLastMessageId(uint64_t consume
     return promise.getFuture();
 }
 
+Future<Result, NamespaceTopicsPtr> 
ClientConnection::newGetTopicsOfNamespace(const std::string& nsName,
+                                                                             
uint64_t requestId) {
+    Lock lock(mutex_);
+    Promise<Result, NamespaceTopicsPtr> promise;
+    if (isClosed()) {
+        lock.unlock();
+        LOG_ERROR(cnxString_ << "Client is not connected to the broker");
+        promise.setFailed(ResultNotConnected);
+        return promise.getFuture();
+    }
+
+    pendingGetNamespaceTopicsRequests_.insert(std::make_pair(requestId, 
promise));
+    lock.unlock();
+    sendCommand(Commands::newGetTopicsOfNamespace(nsName, requestId));
+    return promise.getFuture();
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ClientConnection.h 
b/pulsar-client-cpp/lib/ClientConnection.h
index 860ca6a..bdbc8e5 100644
--- a/pulsar-client-cpp/lib/ClientConnection.h
+++ b/pulsar-client-cpp/lib/ClientConnection.h
@@ -70,6 +70,8 @@ struct OpSendMsg;
 
 typedef std::pair<std::string, int64_t> ResponseData;
 
+typedef boost::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;
+
 class ClientConnection : public 
boost::enable_shared_from_this<ClientConnection> {
     enum State
     {
@@ -81,7 +83,7 @@ class ClientConnection : public 
boost::enable_shared_from_this<ClientConnection>
 
    public:
     typedef boost::shared_ptr<boost::asio::ip::tcp::socket> SocketPtr;
-    typedef 
boost::shared_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket&> > 
TlsSocketPtr;
+    typedef 
boost::shared_ptr<boost::asio::ssl::stream<boost::asio::ip::tcp::socket&>> 
TlsSocketPtr;
     typedef boost::shared_ptr<ClientConnection> ConnectionPtr;
     typedef boost::function<void(const boost::system::error_code&, 
ConnectionPtr)> ConnectionListener;
     typedef std::vector<ConnectionListener>::iterator ListenerIterator;
@@ -144,6 +146,8 @@ class ClientConnection : public 
boost::enable_shared_from_this<ClientConnection>
 
     Future<Result, MessageId> newGetLastMessageId(uint64_t consumerId, 
uint64_t requestId);
 
+    Future<Result, NamespaceTopicsPtr> newGetTopicsOfNamespace(const 
std::string& nsName, uint64_t requestId);
+
    private:
     struct PendingRequestData {
         Promise<Result, ResponseData> promise;
@@ -264,12 +268,15 @@ class ClientConnection : public 
boost::enable_shared_from_this<ClientConnection>
     typedef std::map<long, ConsumerImplWeakPtr> ConsumersMap;
     ConsumersMap consumers_;
 
-    typedef std::map<uint64_t, Promise<Result, BrokerConsumerStatsImpl> > 
PendingConsumerStatsMap;
+    typedef std::map<uint64_t, Promise<Result, BrokerConsumerStatsImpl>> 
PendingConsumerStatsMap;
     PendingConsumerStatsMap pendingConsumerStatsMap_;
 
-    typedef std::map<long, Promise<Result, MessageId> > 
PendingGetLastMessageIdRequestsMap;
+    typedef std::map<long, Promise<Result, MessageId>> 
PendingGetLastMessageIdRequestsMap;
     PendingGetLastMessageIdRequestsMap pendingGetLastMessageIdRequests_;
 
+    typedef std::map<long, Promise<Result, NamespaceTopicsPtr>> 
PendingGetNamespaceTopicsMap;
+    PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_;
+
     boost::mutex mutex_;
     typedef boost::unique_lock<boost::mutex> Lock;
 
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc 
b/pulsar-client-cpp/lib/ClientImpl.cc
index 3768926..ec113fc 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -25,6 +25,7 @@
 #include "PartitionedProducerImpl.h"
 #include "PartitionedConsumerImpl.h"
 #include "MultiTopicsConsumerImpl.h"
+#include "PatternMultiTopicsConsumerImpl.h"
 #include "SimpleLoggerImpl.h"
 #include "Log4CxxLogger.h"
 #include <boost/bind.hpp>
@@ -35,6 +36,7 @@
 #include <lib/HTTPLookupService.h>
 #include <lib/TopicName.h>
 #include <algorithm>
+#include <regex>
 
 DECLARE_LOG_OBJECT()
 
@@ -119,6 +121,9 @@ ExecutorServiceProviderPtr 
ClientImpl::getListenerExecutorProvider() { return li
 ExecutorServiceProviderPtr ClientImpl::getPartitionListenerExecutorProvider() {
     return partitionListenerExecutorProvider_;
 }
+
+LookupServicePtr ClientImpl::getLookup() { return lookupServicePtr_; }
+
 void ClientImpl::createProducerAsync(const std::string& topic, 
ProducerConfiguration conf,
                                      CreateProducerCallback callback) {
     TopicNamePtr topicName;
@@ -212,6 +217,59 @@ void ClientImpl::handleReaderMetadataLookup(const Result 
result, const LookupDat
     consumers_.push_back(reader->getConsumer());
 }
 
+void ClientImpl::subscribeWithRegexAsync(const std::string& regexPattern, 
const std::string& consumerName,
+                                         const ConsumerConfiguration& conf, 
SubscribeCallback callback) {
+    TopicNamePtr topicNamePtr = TopicName::get(regexPattern);
+
+    Lock lock(mutex_);
+    if (state_ != Open) {
+        lock.unlock();
+        callback(ResultAlreadyClosed, Consumer());
+        return;
+    } else {
+        lock.unlock();
+        if (!topicNamePtr) {
+            LOG_ERROR("Topic pattern not valid: " << regexPattern);
+            callback(ResultInvalidTopicName, Consumer());
+            return;
+        }
+    }
+
+    NamespaceNamePtr nsName = topicNamePtr->getNamespaceName();
+
+    lookupServicePtr_->getTopicsOfNamespaceAsync(nsName).addListener(
+        boost::bind(&ClientImpl::createPatternMultiTopicsConsumer, 
shared_from_this(), _1, _2, regexPattern,
+                    consumerName, conf, callback));
+}
+
+void ClientImpl::createPatternMultiTopicsConsumer(const Result result, const 
NamespaceTopicsPtr topics,
+                                                  const std::string& 
regexPattern,
+                                                  const std::string& 
consumerName,
+                                                  const ConsumerConfiguration& 
conf,
+                                                  SubscribeCallback callback) {
+    if (result == ResultOk) {
+        ConsumerImplBasePtr consumer;
+
+        std::regex pattern(regexPattern);
+
+        NamespaceTopicsPtr matchTopics =
+            PatternMultiTopicsConsumerImpl::topicsPatternFilter(*topics, 
pattern);
+
+        consumer = boost::make_shared<PatternMultiTopicsConsumerImpl>(
+            shared_from_this(), regexPattern, *matchTopics, consumerName, 
conf, lookupServicePtr_);
+
+        consumer->getConsumerCreatedFuture().addListener(
+            boost::bind(&ClientImpl::handleConsumerCreated, 
shared_from_this(), _1, _2, callback, consumer));
+        Lock lock(mutex_);
+        consumers_.push_back(consumer);
+        lock.unlock();
+        consumer->start();
+    } else {
+        LOG_ERROR("Error Getting topicsOfNameSpace while 
createPatternMultiTopicsConsumer:  " << result);
+        callback(result, Consumer());
+    }
+}
+
 void ClientImpl::subscribeAsync(const std::vector<std::string>& topics, const 
std::string& consumerName,
                                 const ConsumerConfiguration& conf, 
SubscribeCallback callback) {
     TopicNamePtr topicNamePtr;
diff --git a/pulsar-client-cpp/lib/ClientImpl.h 
b/pulsar-client-cpp/lib/ClientImpl.h
index 550298b..54d459d 100644
--- a/pulsar-client-cpp/lib/ClientImpl.h
+++ b/pulsar-client-cpp/lib/ClientImpl.h
@@ -60,6 +60,9 @@ class ClientImpl : public 
boost::enable_shared_from_this<ClientImpl> {
     void subscribeAsync(const std::vector<std::string>& topics, const 
std::string& consumerName,
                         const ConsumerConfiguration& conf, SubscribeCallback 
callback);
 
+    void subscribeWithRegexAsync(const std::string& regexPattern, const 
std::string& consumerName,
+                                 const ConsumerConfiguration& conf, 
SubscribeCallback callback);
+
     void createReaderAsync(const std::string& topic, const MessageId& 
startMessageId,
                            const ReaderConfiguration& conf, ReaderCallback 
callback);
 
@@ -82,6 +85,7 @@ class ClientImpl : public 
boost::enable_shared_from_this<ClientImpl> {
     ExecutorServiceProviderPtr getIOExecutorProvider();
     ExecutorServiceProviderPtr getListenerExecutorProvider();
     ExecutorServiceProviderPtr getPartitionListenerExecutorProvider();
+    LookupServicePtr getLookup();
     friend class PulsarFriend;
 
    private:
@@ -106,6 +110,10 @@ class ClientImpl : public 
boost::enable_shared_from_this<ClientImpl> {
 
     void handleClose(Result result, SharedInt remaining, ResultCallback 
callback);
 
+    void createPatternMultiTopicsConsumer(const Result result, const 
NamespaceTopicsPtr topics,
+                                          const std::string& regexPattern, 
const std::string& consumerName,
+                                          const ConsumerConfiguration& conf, 
SubscribeCallback callback);
+
     enum State
     {
         Open,
diff --git a/pulsar-client-cpp/lib/Commands.cc 
b/pulsar-client-cpp/lib/Commands.cc
index 13bf99a..8a1933b 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -324,6 +324,18 @@ SharedBuffer Commands::newGetLastMessageId(uint64_t 
consumerId, uint64_t request
     return buffer;
 }
 
+SharedBuffer Commands::newGetTopicsOfNamespace(const std::string& nsName, 
uint64_t requestId) {
+    BaseCommand cmd;
+    cmd.set_type(BaseCommand::GET_TOPICS_OF_NAMESPACE);
+    CommandGetTopicsOfNamespace* getTopics = 
cmd.mutable_gettopicsofnamespace();
+    getTopics->set_request_id(requestId);
+    getTopics->set_namespace_(nsName);
+
+    const SharedBuffer buffer = writeMessageWithSize(cmd);
+    cmd.clear_gettopicsofnamespace();
+    return buffer;
+}
+
 std::string Commands::messageType(BaseCommand_Type type) {
     switch (type) {
         case BaseCommand::CONNECT:
@@ -416,6 +428,12 @@ std::string Commands::messageType(BaseCommand_Type type) {
         case BaseCommand::GET_LAST_MESSAGE_ID_RESPONSE:
             return "GET_LAST_MESSAGE_ID_RESPONSE";
             break;
+        case BaseCommand::GET_TOPICS_OF_NAMESPACE:
+            return "GET_TOPICS_OF_NAMESPACE";
+            break;
+        case BaseCommand::GET_TOPICS_OF_NAMESPACE_RESPONSE:
+            return "GET_TOPICS_OF_NAMESPACE_RESPONSE";
+            break;
     };
 }
 
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index 53fb1bb..d9b8589 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -112,6 +112,7 @@ class Commands {
 
     static SharedBuffer newSeek(uint64_t consumerId, uint64_t requestId, const 
MessageId& messageId);
     static SharedBuffer newGetLastMessageId(uint64_t consumerId, uint64_t 
requestId);
+    static SharedBuffer newGetTopicsOfNamespace(const std::string& nsName, 
uint64_t requestId);
 
    private:
     Commands();
diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc 
b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
index 0c145c1..058ca57 100644
--- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
@@ -105,4 +105,10 @@ bool ConsumerConfiguration::isReadCompacted() const { 
return impl_->readCompacte
 
 void ConsumerConfiguration::setReadCompacted(bool compacted) { 
impl_->readCompacted = compacted; }
 
+void ConsumerConfiguration::setPatternAutoDiscoveryPeriod(int periodInSeconds) 
{
+    impl_->patternAutoDiscoveryPeriod = periodInSeconds;
+}
+
+int ConsumerConfiguration::getPatternAutoDiscoveryPeriod() const { return 
impl_->patternAutoDiscoveryPeriod; }
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h 
b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
index eb0c374..0cc0c72 100644
--- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
@@ -35,6 +35,7 @@ struct ConsumerConfigurationImpl {
     CryptoKeyReaderPtr cryptoKeyReader;
     ConsumerCryptoFailureAction cryptoFailureAction;
     bool readCompacted;
+    int patternAutoDiscoveryPeriod;
     ConsumerConfigurationImpl()
         : unAckedMessagesTimeoutMs(0),
           consumerType(ConsumerExclusive),
@@ -45,7 +46,8 @@ struct ConsumerConfigurationImpl {
           maxTotalReceiverQueueSizeAcrossPartitions(50000),
           cryptoKeyReader(),
           cryptoFailureAction(ConsumerCryptoFailureAction::FAIL),
-          readCompacted(false) {}
+          readCompacted(false),
+          patternAutoDiscoveryPeriod(60) {}
 };
 }  // namespace pulsar
 #endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */
diff --git a/pulsar-client-cpp/lib/HTTPLookupService.cc 
b/pulsar-client-cpp/lib/HTTPLookupService.cc
index 36d11f5..fe27e8d 100644
--- a/pulsar-client-cpp/lib/HTTPLookupService.cc
+++ b/pulsar-client-cpp/lib/HTTPLookupService.cc
@@ -68,8 +68,9 @@ Future<Result, LookupDataResultPtr> 
HTTPLookupService::lookupAsync(const std::st
                           << topicName->getEncodedLocalName();
     }
 
-    
executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::sendHTTPRequest,
 shared_from_this(),
-                                                   promise, 
completeUrlStream.str(), Lookup));
+    
executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::handleLookupHTTPRequest,
+                                                   shared_from_this(), 
promise, completeUrlStream.str(),
+                                                   Lookup));
     return promise.getFuture();
 }
 
@@ -89,8 +90,27 @@ Future<Result, LookupDataResultPtr> 
HTTPLookupService::getPartitionMetadataAsync
                           << '/' << PARTITION_METHOD_NAME;
     }
 
-    
executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::sendHTTPRequest,
 shared_from_this(),
-                                                   promise, 
completeUrlStream.str(), PartitionMetaData));
+    
executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::handleLookupHTTPRequest,
+                                                   shared_from_this(), 
promise, completeUrlStream.str(),
+                                                   PartitionMetaData));
+    return promise.getFuture();
+}
+
+Future<Result, NamespaceTopicsPtr> 
HTTPLookupService::getTopicsOfNamespaceAsync(
+    const NamespaceNamePtr &nsName) {
+    NamespaceTopicsPromise promise;
+    std::stringstream completeUrlStream;
+
+    if (nsName->isV2()) {
+        completeUrlStream << adminUrl_ << ADMIN_PATH_V2 << "namespaces" << '/' 
<< nsName->toString() << '/'
+                          << "topics";
+    } else {
+        completeUrlStream << adminUrl_ << ADMIN_PATH_V1 << "namespaces" << '/' 
<< nsName->toString() << '/'
+                          << "destinations";
+    }
+
+    
executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::handleNamespaceTopicsHTTPRequest,
+                                                   shared_from_this(), 
promise, completeUrlStream.str()));
     return promise.getFuture();
 }
 
@@ -99,19 +119,28 @@ static size_t curlWriteCallback(void *contents, size_t 
size, size_t nmemb, void
     return size * nmemb;
 }
 
-void HTTPLookupService::sendHTTPRequest(LookupPromise promise, const 
std::string completeUrl,
-                                        RequestType requestType) {
+void 
HTTPLookupService::handleNamespaceTopicsHTTPRequest(NamespaceTopicsPromise 
promise,
+                                                         const std::string 
completeUrl) {
+    std::string responseData;
+    Result result = sendHTTPRequest(completeUrl, responseData);
+
+    if (result != ResultOk) {
+        promise.setFailed(result);
+    } else {
+        promise.setValue(parseNamespaceTopicsData(responseData));
+    }
+}
+
+Result HTTPLookupService::sendHTTPRequest(const std::string completeUrl, 
std::string &responseData) {
     CURL *handle;
     CURLcode res;
-    std::string responseData;
     std::string version = std::string("Pulsar-CPP-v") + _PULSAR_VERSION_;
     handle = curl_easy_init();
 
     if (!handle) {
         LOG_ERROR("Unable to curl_easy_init for url " << completeUrl);
-        promise.setFailed(ResultLookupError);
         // No curl_easy_cleanup required since handle not initialized
-        return;
+        return ResultLookupError;
     }
     // set URL
     curl_easy_setopt(handle, CURLOPT_URL, completeUrl.c_str());
@@ -148,9 +177,8 @@ void HTTPLookupService::sendHTTPRequest(LookupPromise 
promise, const std::string
             "All Authentication methods should have AuthenticationData and 
return true on getAuthData for "
             "url "
             << completeUrl);
-        promise.setFailed(authResult);
         curl_easy_cleanup(handle);
-        return;
+        return authResult;
     }
     struct curl_slist *list = NULL;
     if (authDataContent->hasDataForHttp()) {
@@ -158,7 +186,7 @@ void HTTPLookupService::sendHTTPRequest(LookupPromise 
promise, const std::string
     }
     curl_easy_setopt(handle, CURLOPT_HTTPHEADER, list);
 
-    LOG_INFO("Curl Lookup Request sent for" << completeUrl);
+    LOG_INFO("Curl Lookup Request sent for " << completeUrl);
 
     // Make get call to server
     res = curl_easy_perform(handle);
@@ -166,16 +194,17 @@ void HTTPLookupService::sendHTTPRequest(LookupPromise 
promise, const std::string
     // Free header list
     curl_slist_free_all(list);
 
+    Result retResult = ResultOk;
+
     switch (res) {
         case CURLE_OK:
             long response_code;
             curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &response_code);
             LOG_INFO("Response received for url " << completeUrl << " code " 
<< response_code);
             if (response_code == 200) {
-                promise.setValue((requestType == PartitionMetaData) ? 
parsePartitionData(responseData)
-                                                                    : 
parseLookupData(responseData));
+                retResult = ResultOk;
             } else {
-                promise.setFailed(ResultLookupError);
+                retResult = ResultLookupError;
             }
             break;
         case CURLE_COULDNT_CONNECT:
@@ -183,22 +212,23 @@ void HTTPLookupService::sendHTTPRequest(LookupPromise 
promise, const std::string
         case CURLE_COULDNT_RESOLVE_HOST:
         case CURLE_HTTP_RETURNED_ERROR:
             LOG_ERROR("Response failed for url " << completeUrl << ". Error 
Code " << res);
-            promise.setFailed(ResultConnectError);
+            retResult = ResultConnectError;
             break;
         case CURLE_READ_ERROR:
             LOG_ERROR("Response failed for url " << completeUrl << ". Error 
Code " << res);
-            promise.setFailed(ResultReadError);
+            retResult = ResultReadError;
             break;
         case CURLE_OPERATION_TIMEDOUT:
             LOG_ERROR("Response failed for url " << completeUrl << ". Error 
Code " << res);
-            promise.setFailed(ResultTimeout);
+            retResult = ResultTimeout;
             break;
         default:
             LOG_ERROR("Response failed for url " << completeUrl << ". Error 
Code " << res);
-            promise.setFailed(ResultLookupError);
+            retResult = ResultLookupError;
             break;
     }
     curl_easy_cleanup(handle);
+    return retResult;
 }
 
 LookupDataResultPtr HTTPLookupService::parsePartitionData(const std::string 
&json) {
@@ -243,4 +273,48 @@ LookupDataResultPtr 
HTTPLookupService::parseLookupData(const std::string &json)
     LOG_INFO("parseLookupData = " << *lookupDataResultPtr);
     return lookupDataResultPtr;
 }
+
+/**
+ * Parses the JSON body returned for a namespace topics request.
+ *
+ * Accepts either a bare JSON array of topic names or an object holding the array under
+ * the "topics" key. The "-partition-N" suffix is stripped from each name and duplicates
+ * are collapsed, so a partitioned topic is reported once.
+ *
+ * @param json  raw response body
+ * @return the de-duplicated topic names (sorted, since they pass through a std::set),
+ *         or an empty pointer when the body is not valid JSON or has an unexpected root type
+ */
+NamespaceTopicsPtr HTTPLookupService::parseNamespaceTopicsData(const std::string &json) {
+    Json::Value root;
+    Json::Reader reader;
+    if (!reader.parse(json, root, false)) {
+        LOG_ERROR("Failed to parse json of Topics of Namespace: " << reader.getFormatedErrorMessages()
+                                                                  << "\nInput Json = " << json);
+        return NamespaceTopicsPtr();
+    }
+
+    // Indexing a non-object value by key is invalid in jsoncpp, so pick the array by root type.
+    Json::Value topicsArray;
+    if (root.isArray()) {
+        topicsArray = root;
+    } else if (root.isObject() || root.isNull()) {
+        topicsArray = root["topics"];
+    } else {
+        LOG_ERROR("Unexpected json type for Topics of Namespace. Input Json = " << json);
+        return NamespaceTopicsPtr();
+    }
+
+    // std::set both sorts and removes duplicated names.
+    std::set<std::string> topicSet;
+    if (topicsArray.isArray()) {
+        for (Json::Value::const_iterator it = topicsArray.begin(); it != topicsArray.end(); ++it) {
+            const std::string topicName = (*it).asString();
+            // find() returns npos when there is no partition suffix; substr(0, npos) keeps the whole name.
+            const std::string::size_type pos = topicName.find("-partition-");
+            topicSet.insert(topicName.substr(0, pos));
+        }
+    }
+
+    NamespaceTopicsPtr topicsResultPtr =
+        boost::make_shared<std::vector<std::string>>(topicSet.begin(), topicSet.end());
+
+    return topicsResultPtr;
+}
+
+/**
+ * Runs a lookup / partition-metadata HTTP request and completes `promise` with its outcome.
+ *
+ * @param promise      failed with the request result, or set to the parsed response
+ * @param completeUrl  full URL to fetch
+ * @param requestType  selects the parser: PartitionMetaData uses parsePartitionData(),
+ *                     anything else uses parseLookupData()
+ */
+void HTTPLookupService::handleLookupHTTPRequest(LookupPromise promise, const std::string completeUrl,
+                                                RequestType requestType) {
+    std::string body;
+    const Result status = sendHTTPRequest(completeUrl, body);
+
+    // request failed: propagate the error code unchanged
+    if (status != ResultOk) {
+        promise.setFailed(status);
+        return;
+    }
+
+    if (requestType == PartitionMetaData) {
+        promise.setValue(parsePartitionData(body));
+    } else {
+        promise.setValue(parseLookupData(body));
+    }
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/HTTPLookupService.h 
b/pulsar-client-cpp/lib/HTTPLookupService.h
index d7fff33..66cd251 100644
--- a/pulsar-client-cpp/lib/HTTPLookupService.h
+++ b/pulsar-client-cpp/lib/HTTPLookupService.h
@@ -52,7 +52,12 @@ class HTTPLookupService : public LookupService, public 
boost::enable_shared_from
 
     static LookupDataResultPtr parsePartitionData(const std::string&);
     static LookupDataResultPtr parseLookupData(const std::string&);
-    void sendHTTPRequest(LookupPromise, const std::string, RequestType);
+    static NamespaceTopicsPtr parseNamespaceTopicsData(const std::string&);
+
+    void handleLookupHTTPRequest(LookupPromise, const std::string, 
RequestType);
+    void handleNamespaceTopicsHTTPRequest(NamespaceTopicsPromise promise, 
const std::string completeUrl);
+
+    Result sendHTTPRequest(const std::string completeUrl, std::string& 
responseData);
 
    public:
     HTTPLookupService(const std::string&, const ClientConfiguration&, const 
AuthenticationPtr&);
@@ -60,6 +65,8 @@ class HTTPLookupService : public LookupService, public 
boost::enable_shared_from
     Future<Result, LookupDataResultPtr> lookupAsync(const std::string&);
 
     Future<Result, LookupDataResultPtr> getPartitionMetadataAsync(const 
TopicNamePtr&);
+
+    Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const 
NamespaceNamePtr& nsName);
 };
 }  // namespace pulsar
 
diff --git a/pulsar-client-cpp/lib/LookupService.h 
b/pulsar-client-cpp/lib/LookupService.h
index 48d5595..1221263 100644
--- a/pulsar-client-cpp/lib/LookupService.h
+++ b/pulsar-client-cpp/lib/LookupService.h
@@ -26,6 +26,10 @@
 #include <lib/TopicName.h>
 
 namespace pulsar {
+typedef boost::shared_ptr<std::vector<std::string>> NamespaceTopicsPtr;
+typedef Promise<Result, NamespaceTopicsPtr> NamespaceTopicsPromise;
+typedef boost::shared_ptr<Promise<Result, NamespaceTopicsPtr>> 
NamespaceTopicsPromisePtr;
+
 class LookupService {
    public:
     /*
@@ -42,6 +46,13 @@ class LookupService {
      */
     virtual Future<Result, LookupDataResultPtr> 
getPartitionMetadataAsync(const TopicNamePtr& topicName) = 0;
 
+    /**
+     * @param   nsName - name of the namespace to query
+     *
+     * Returns the names of all topics in the given namespace.
+     */
+    virtual Future<Result, NamespaceTopicsPtr> getTopicsOfNamespaceAsync(const 
NamespaceNamePtr& nsName) = 0;
+
     virtual ~LookupService() {}
 };
 
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h 
b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index 6425687..3b1d985 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -80,7 +80,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     // not supported
     virtual void seekAsync(const MessageId& msgId, ResultCallback callback);
 
-   private:
+   protected:
     const ClientImplPtr client_;
     const std::string subscriptionName_;
     std::string consumerStr_;
diff --git a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc 
b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
new file mode 100644
index 0000000..95f8a36
--- /dev/null
+++ b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.cc
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "PatternMultiTopicsConsumerImpl.h"
+
+DECLARE_LOG_OBJECT()
+
+using namespace pulsar;
+
+/**
+ * Builds a multi-topics consumer whose topic set is driven by a regular expression.
+ *
+ * `topics` is forwarded to MultiTopicsConsumerImpl as the initial topic list, and
+ * `pattern` is passed through TopicName::get() for the base class as well as compiled
+ * into pattern_. The discovery timer is left empty here; it is created in start().
+ *
+ * NOTE(review): std::regex construction throws std::regex_error for an invalid
+ * expression - confirm callers validate `pattern` or handle the exception.
+ */
+PatternMultiTopicsConsumerImpl::PatternMultiTopicsConsumerImpl(ClientImplPtr 
client,
+                                                               const 
std::string pattern,
+                                                               const 
std::vector<std::string>& topics,
+                                                               const 
std::string& subscriptionName,
+                                                               const 
ConsumerConfiguration& conf,
+                                                               const 
LookupServicePtr lookupServicePtr_)
+    : MultiTopicsConsumerImpl(client, topics, subscriptionName, 
TopicName::get(pattern), conf,
+                              lookupServicePtr_),
+      patternString_(pattern),
+      pattern_(std::regex(pattern)),
+      autoDiscoveryTimer_(),
+      autoDiscoveryRunning_(false) {}
+
+// Returns the compiled topic pattern by value (a copy of pattern_).
+const std::regex PatternMultiTopicsConsumerImpl::getPattern() { return 
pattern_; }
+
+// Marks the current discovery round as finished and re-arms the timer so that
+// autoDiscoveryTimerTask() runs again after the configured auto-discovery period.
+// The running flag is cleared before the timer is re-armed.
+// NOTE(review): the handler is bound to the raw `this` pointer - confirm the consumer
+// outlives any pending timer wait.
+void PatternMultiTopicsConsumerImpl::resetAutoDiscoveryTimer() {
+    autoDiscoveryRunning_ = false;
+    
autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod()));
+    autoDiscoveryTimer_->async_wait(
+        boost::bind(&PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask, 
this, _1));
+}
+
+/**
+ * Timer handler that starts one topic auto-discovery round.
+ *
+ * A cancelled or failed wait is only logged. If the consumer is not Ready the timer is
+ * re-armed and nothing else happens. If a previous round is still in flight this tick
+ * is dropped. Otherwise the topics of the namespace are requested and handed to
+ * timerGetTopicsOfNamespace().
+ *
+ * @param err  completion status of the timer wait
+ */
+void PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask(const boost::system::error_code& err) {
+    if (err) {
+        // cancellation is expected on close; anything else is a real timer failure
+        if (err == boost::asio::error::operation_aborted) {
+            LOG_DEBUG(getName() << "Timer cancelled: " << err.message());
+        } else {
+            LOG_ERROR(getName() << "Timer error: " << err.message());
+        }
+        return;
+    }
+
+    if (state_ != Ready) {
+        LOG_ERROR("Error in autoDiscoveryTimerTask consumer state not ready: " << state_);
+        resetAutoDiscoveryTimer();
+        return;
+    }
+
+    if (autoDiscoveryRunning_) {
+        LOG_DEBUG("autoDiscoveryTimerTask still running, cancel this running. ");
+        return;
+    }
+    autoDiscoveryRunning_ = true;
+
+    // the namespace was already derived from the pattern
+    assert(namespaceName_);
+
+    lookupServicePtr_->getTopicsOfNamespaceAsync(namespaceName_)
+        .addListener(
+            boost::bind(&PatternMultiTopicsConsumerImpl::timerGetTopicsOfNamespace, this, _1, _2));
+}
+
+/**
+ * Listener for the getTopicsOfNamespaceAsync() call issued by autoDiscoveryTimerTask().
+ *
+ * Filters the reported topics through pattern_, compares them with the keys of
+ * topicsPartitions_, subscribes to newly matching topics and, once that succeeded,
+ * unsubscribes from topics that no longer match. The completion callbacks re-arm the
+ * discovery timer through resetAutoDiscoveryTimer().
+ *
+ * @param result  outcome of the lookup
+ * @param topics  topic names reported for the namespace; an empty pointer is treated
+ *                like a failed lookup
+ */
+void PatternMultiTopicsConsumerImpl::timerGetTopicsOfNamespace(const Result result,
+                                                               const NamespaceTopicsPtr topics) {
+    // Guard against an empty pointer as well: it is dereferenced below.
+    if (result != ResultOk || !topics) {
+        LOG_ERROR("Error in Getting topicsOfNameSpace. result: " << result);
+        resetAutoDiscoveryTimer();
+        return;
+    }
+
+    NamespaceTopicsPtr newTopics = PatternMultiTopicsConsumerImpl::topicsPatternFilter(*topics, pattern_);
+    // get old topics in consumer:
+    NamespaceTopicsPtr oldTopics = boost::make_shared<std::vector<std::string>>();
+    for (std::map<std::string, int>::iterator it = topicsPartitions_.begin(); it != topicsPartitions_.end();
+         it++) {
+        oldTopics->push_back(it->first);
+    }
+    NamespaceTopicsPtr topicsAdded = topicsListsMinus(*newTopics, *oldTopics);
+    NamespaceTopicsPtr topicsRemoved = topicsListsMinus(*oldTopics, *newTopics);
+
+    // callback method when removed topics all un-subscribed.
+    ResultCallback topicsRemovedCallback = [this](Result result) {
+        if (result != ResultOk) {
+            LOG_ERROR("Failed to unsubscribe topics: " << result);
+        }
+        resetAutoDiscoveryTimer();
+    };
+
+    // callback method when added topics all subscribed.
+    ResultCallback topicsAddedCallback = [this, topicsRemoved, topicsRemovedCallback](Result result) {
+        if (result == ResultOk) {
+            // call to unsubscribe all removed topics.
+            onTopicsRemoved(topicsRemoved, topicsRemovedCallback);
+        } else {
+            resetAutoDiscoveryTimer();
+        }
+    };
+
+    // call to subscribe new added topics, then in its callback do unsubscribe
+    onTopicsAdded(topicsAdded, topicsAddedCallback);
+}
+
+/**
+ * Subscribes to every topic in `addedTopics`.
+ *
+ * @param addedTopics  topics to subscribe to; when empty, `callback` is invoked with ResultOk
+ * @param callback     handed to handleOneTopicAdded() for each per-topic completion
+ */
+void PatternMultiTopicsConsumerImpl::onTopicsAdded(NamespaceTopicsPtr addedTopics, ResultCallback callback) {
+    // nothing to do: report success right away
+    if (addedTopics->empty()) {
+        LOG_DEBUG("no topics need subscribe");
+        callback(ResultOk);
+        return;
+    }
+
+    // countdown of subscriptions still outstanding, shared by all per-topic listeners
+    const int pendingCount = addedTopics->size();
+    boost::shared_ptr<std::atomic<int>> pending = boost::make_shared<std::atomic<int>>(pendingCount);
+
+    for (const std::string& topic : *addedTopics) {
+        MultiTopicsConsumerImpl::subscribeOneTopicAsync(topic).addListener(
+            boost::bind(&PatternMultiTopicsConsumerImpl::handleOneTopicAdded, this, _1, topic, pending,
+                        callback));
+    }
+}
+
+/**
+ * Completion handler for one subscribeOneTopicAsync() started by onTopicsAdded().
+ *
+ * @param result            outcome of subscribing to `topic`
+ * @param topic             topic the subscription was for (used for logging)
+ * @param topicsNeedCreate  shared countdown of subscriptions still outstanding
+ * @param callback          invoked with the error on failure, or with ResultOk by the
+ *                          handler that completes the last outstanding subscription
+ */
+void PatternMultiTopicsConsumerImpl::handleOneTopicAdded(const Result result, const std::string& topic,
+                                                         boost::shared_ptr<std::atomic<int>> topicsNeedCreate,
+                                                         ResultCallback callback) {
+    const int previous = topicsNeedCreate->fetch_sub(1);
+    assert(previous > 0);
+
+    if (result != ResultOk) {
+        LOG_ERROR("Failed when subscribed to topic " << topic << "  Error - " << result);
+        callback(result);
+        return;
+    }
+
+    // Decide from the value returned by fetch_sub: exactly one handler observes 1, whereas a
+    // separate load() could read 0 in several handlers that finish concurrently.
+    if (previous == 1) {
+        LOG_DEBUG("Subscribed all new added topics");
+        callback(result);
+    }
+}
+
+/**
+ * Unsubscribes from every topic in `removedTopics`.
+ *
+ * @param removedTopics  topics to unsubscribe from; when empty, `callback` is invoked
+ *                       with ResultOk
+ * @param callback       invoked with the error for a failed unsubscribe, or with ResultOk
+ *                       by the completion that finishes the last outstanding unsubscribe
+ */
+void PatternMultiTopicsConsumerImpl::onTopicsRemoved(NamespaceTopicsPtr removedTopics,
+                                                     ResultCallback callback) {
+    // start call unsubscribeOneTopicAsync for each single topic
+    if (removedTopics->empty()) {
+        LOG_DEBUG("no topics need unsubscribe");
+        callback(ResultOk);
+        return;
+    }
+    int topicsNumber = removedTopics->size();
+
+    boost::shared_ptr<std::atomic<int>> topicsNeedUnsub = boost::make_shared<std::atomic<int>>(topicsNumber);
+    ResultCallback oneTopicUnsubscribedCallback = [this, topicsNeedUnsub, callback](Result result) {
+        const int previous = topicsNeedUnsub->fetch_sub(1);
+        assert(previous > 0);
+
+        if (result != ResultOk) {
+            LOG_ERROR("Failed when unsubscribe to one topic.  Error - " << result);
+            callback(result);
+            return;
+        }
+
+        // Use the value returned by fetch_sub so that exactly one completion reports success;
+        // a separate load() could read 0 in several completions that finish concurrently.
+        if (previous == 1) {
+            LOG_DEBUG("unSubscribed all needed topics");
+            callback(result);
+        }
+    };
+
+    // unsubscribe for each passed in topic
+    for (std::vector<std::string>::const_iterator itr = removedTopics->begin(); itr != removedTopics->end();
+         itr++) {
+        MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(*itr, oneTopicUnsubscribedCallback);
+    }
+}
+
+/**
+ * Selects the topics whose full name matches `pattern`.
+ *
+ * @param topics   candidate topic names
+ * @param pattern  expression each name must match entirely (std::regex_match)
+ * @return a new list holding the matching names in their input order
+ */
+NamespaceTopicsPtr PatternMultiTopicsConsumerImpl::topicsPatternFilter(const std::vector<std::string>& topics,
+                                                                       const std::regex& pattern) {
+    NamespaceTopicsPtr matched = boost::make_shared<std::vector<std::string>>();
+
+    for (const std::string& name : topics) {
+        if (!std::regex_match(name, pattern)) {
+            continue;
+        }
+        matched->push_back(name);
+    }
+    return matched;
+}
+
+/**
+ * Computes the list difference `list1 - list2`.
+ *
+ * @param list1  names to keep unless they also occur in `list2`
+ * @param list2  names to exclude
+ * @return a new list with the entries of `list1` that are absent from `list2`,
+ *         in the order they appear in `list1`
+ */
+NamespaceTopicsPtr PatternMultiTopicsConsumerImpl::topicsListsMinus(std::vector<std::string>& list1,
+                                                                    std::vector<std::string>& list2) {
+    NamespaceTopicsPtr difference = boost::make_shared<std::vector<std::string>>();
+
+    for (const std::string& candidate : list1) {
+        // keep the entry only when a linear search of list2 does not find it
+        if (std::find(list2.begin(), list2.end(), candidate) == list2.end()) {
+            difference->push_back(candidate);
+        }
+    }
+
+    return difference;
+}
+
+/**
+ * Starts the underlying multi-topics consumer and, when auto discovery is enabled
+ * (period > 0), creates and arms the discovery timer. The timer is created at most once.
+ */
+void PatternMultiTopicsConsumerImpl::start() {
+    MultiTopicsConsumerImpl::start();
+
+    LOG_DEBUG("PatternMultiTopicsConsumerImpl start autoDiscoveryTimer_.");
+
+    // timer already created, or auto discovery disabled: nothing more to set up
+    if (autoDiscoveryTimer_ || conf_.getPatternAutoDiscoveryPeriod() <= 0) {
+        return;
+    }
+
+    // first start: create the timer and wait for the first timeout
+    autoDiscoveryTimer_ = client_->getIOExecutorProvider()->get()->createDeadlineTimer();
+    autoDiscoveryTimer_->expires_from_now(seconds(conf_.getPatternAutoDiscoveryPeriod()));
+    autoDiscoveryTimer_->async_wait(
+        boost::bind(&PatternMultiTopicsConsumerImpl::autoDiscoveryTimerTask, this, _1));
+}
+
+/**
+ * Marks the consumer Closed, cancels a pending auto-discovery wait and fails the
+ * consumer-created promise with ResultAlreadyClosed.
+ */
+void PatternMultiTopicsConsumerImpl::shutdown() {
+    Lock lock(mutex_);
+    state_ = Closed;
+    // The timer only exists after start() ran with a positive auto-discovery period.
+    if (autoDiscoveryTimer_) {
+        autoDiscoveryTimer_->cancel();
+    }
+    multiTopicsConsumerCreatedPromise_.setFailed(ResultAlreadyClosed);
+}
+
+/**
+ * Closes the underlying multi-topics consumer and cancels a pending auto-discovery wait.
+ *
+ * @param callback  forwarded to MultiTopicsConsumerImpl::closeAsync()
+ */
+void PatternMultiTopicsConsumerImpl::closeAsync(ResultCallback callback) {
+    MultiTopicsConsumerImpl::closeAsync(callback);
+    // The timer only exists after start() ran with a positive auto-discovery period.
+    if (autoDiscoveryTimer_) {
+        autoDiscoveryTimer_->cancel();
+    }
+}
diff --git a/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.h 
b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.h
new file mode 100644
index 0000000..503dc99
--- /dev/null
+++ b/pulsar-client-cpp/lib/PatternMultiTopicsConsumerImpl.h
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef PULSAR_PATTERN_MULTI_TOPICS_CONSUMER_HEADER
+#define PULSAR_PATTERN_MULTI_TOPICS_CONSUMER_HEADER
+#include "ConsumerImpl.h"
+#include "ClientImpl.h"
+#include <regex>
+#include "boost/enable_shared_from_this.hpp"
+#include <lib/TopicName.h>
+#include <lib/NamespaceName.h>
+#include "MultiTopicsConsumerImpl.h"
+
+namespace pulsar {
+
+class PatternMultiTopicsConsumerImpl;
+
+// Multi-topics consumer whose set of topics is selected by a regular expression and,
+// when an auto-discovery period is configured, refreshed periodically by a timer.
+class PatternMultiTopicsConsumerImpl : public MultiTopicsConsumerImpl {
+   public:
+    // Currently only topics under the same namespace are supported.
+    // `patternString` is the regex used to select topics.
+    // NOTE(review): an earlier comment said the pattern only contains the part after the
+    // namespace, but the constructor passes it to TopicName::get() - confirm expected format.
+    // When subscribing, the client first gets all topics that match the given pattern;
+    // `topics` contains the topics that match `patternString`.
+    PatternMultiTopicsConsumerImpl(ClientImplPtr client, const std::string 
patternString,
+                                   const std::vector<std::string>& topics,
+                                   const std::string& subscriptionName, const 
ConsumerConfiguration& conf,
+                                   const LookupServicePtr lookupServicePtr_);
+
+    // Returns a copy of the compiled pattern.
+    const std::regex getPattern();
+
+    // Timer handler that starts one auto-discovery round.
+    void autoDiscoveryTimerTask(const boost::system::error_code& err);
+
+    // filter input `topics` with given `pattern`, return matched topics
+    static NamespaceTopicsPtr topicsPatternFilter(const 
std::vector<std::string>& topics,
+                                                  const std::regex& pattern);
+
+    // Find out topics, which are in `list1` but not in `list2`.
+    static NamespaceTopicsPtr topicsListsMinus(std::vector<std::string>& list1,
+                                               std::vector<std::string>& 
list2);
+
+    virtual void closeAsync(ResultCallback callback);
+    virtual void start();
+    virtual void shutdown();
+
+   private:
+    // pattern text as passed to the constructor
+    const std::string patternString_;
+    // compiled form of the pattern, used to filter topic names
+    const std::regex pattern_;
+    typedef boost::shared_ptr<boost::asio::deadline_timer> TimerPtr;
+    // empty until start() creates it (only when the auto-discovery period is > 0)
+    TimerPtr autoDiscoveryTimer_;
+    // true while a discovery round is in flight
+    bool autoDiscoveryRunning_;
+
+    // clears autoDiscoveryRunning_ and re-arms the timer
+    void resetAutoDiscoveryTimer();
+    // listener for the namespace topics lookup started by autoDiscoveryTimerTask()
+    void timerGetTopicsOfNamespace(const Result result, const 
NamespaceTopicsPtr topics);
+    // subscribe to / unsubscribe from the given topics, then invoke `callback`
+    void onTopicsAdded(NamespaceTopicsPtr addedTopics, ResultCallback 
callback);
+    void onTopicsRemoved(NamespaceTopicsPtr removedTopics, ResultCallback 
callback);
+    // per-topic completion handler used by onTopicsAdded()
+    void handleOneTopicAdded(const Result result, const std::string& topic,
+                             boost::shared_ptr<std::atomic<int>> 
topicsNeedCreate, ResultCallback callback);
+};
+
+}  // namespace pulsar
+#endif  // PULSAR_PATTERN_MULTI_TOPICS_CONSUMER_HEADER
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc 
b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index cf28b34..d4c1df8 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -34,6 +34,7 @@
 #include <set>
 #include <vector>
 #include <lib/MultiTopicsConsumerImpl.h>
+#include <lib/PatternMultiTopicsConsumerImpl.h>
 #include "lib/Future.h"
 #include "lib/Utils.h"
 DECLARE_LOG_OBJECT()
@@ -1679,3 +1680,259 @@ TEST(BasicEndToEndTest, testMultiTopicsConsumerPubSub) {
 
     client.shutdown();
 }
+
+// Subscribing with a pattern that is not a valid topic name must fail with
+// ResultInvalidTopicName.
+TEST(BasicEndToEndTest, testPatternTopicsConsumerInvalid) {
+    Client client(lookupUrl);
+
+    // the pattern uses an invalid domain, so it cannot be parsed as a topic name
+    const std::string pattern = "invalidDomain://prop/unit/ns/patternMultiTopicsConsumerInvalid.*";
+    const std::string subName = "testPatternMultiTopicsConsumerInvalid";
+
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeWithRegexAsync(pattern, subName, WaitForCallbackValue<Consumer>(consumerPromise));
+
+    // wait for the subscribe callback and check the reported result
+    Consumer consumer;
+    Result result = consumerPromise.getFuture().get(consumer);
+    ASSERT_EQ(ResultInvalidTopicName, result);
+
+    client.shutdown();
+}
+
+// Creates 4 partitioned topics, 3 of which match the pattern, then verifies that the
+// pattern consumer is subscribed to the matching topics and receives messages
+// only from them.
+TEST(BasicEndToEndTest, testPatternMultiTopicsConsumerPubSub) {
+    Client client(lookupUrl);
+    std::string pattern = "persistent://prop/unit/ns1/patternMultiTopicsConsumer.*";
+    std::string subName = "testPatternMultiTopicsConsumer";
+
+    const int topicCount = 4;
+    // the last topic does not match the pattern
+    const std::string topicNames[topicCount] = {
+        "persistent://prop/unit/ns1/patternMultiTopicsConsumerPubSub1",
+        "persistent://prop/unit/ns1/patternMultiTopicsConsumerPubSub2",
+        "persistent://prop/unit/ns1/patternMultiTopicsConsumerPubSub3",
+        "persistent://prop/unit/ns1/patternMultiTopicsNotMatchPubSub4"};
+
+    // admin api endpoints used to make the topics partitioned
+    const std::string partitionUrls[topicCount] = {
+        adminUrl + "admin/persistent/prop/unit/ns1/patternMultiTopicsConsumerPubSub1/partitions",
+        adminUrl + "admin/persistent/prop/unit/ns1/patternMultiTopicsConsumerPubSub2/partitions",
+        adminUrl + "admin/persistent/prop/unit/ns1/patternMultiTopicsConsumerPubSub3/partitions",
+        adminUrl + "admin/persistent/prop/unit/ns1/patternMultiTopicsNotMatchPubSub4/partitions"};
+    const char* partitionCounts[topicCount] = {"2", "3", "4", "4"};
+
+    // 204: created, 409: already exists
+    for (int t = 0; t < topicCount; t++) {
+        int res = makePutRequest(partitionUrls[t], partitionCounts[t]);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    Result result;
+    Producer producers[topicCount];
+    for (int t = 0; t < topicCount; t++) {
+        result = client.createProducer(topicNames[t], producers[t]);
+        ASSERT_EQ(ResultOk, result);
+    }
+
+    LOG_INFO("created 3 producers that match, with partitions: 2, 3, 4, and 1 producer not match");
+
+    int messageNumber = 100;
+    ConsumerConfiguration consConfig;
+    consConfig.setConsumerType(ConsumerShared);
+    consConfig.setReceiverQueueSize(10);  // size for each sub-consumer
+    Consumer consumer;
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeWithRegexAsync(pattern, subName, consConfig,
+                                   WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+    ASSERT_EQ(consumer.getSubscriptionName(), subName);
+    LOG_INFO("created topics consumer on a pattern that match 3 topics");
+
+    // every producer publishes `messageNumber` messages with its own content prefix
+    const char* contentPrefixes[topicCount] = {"msg-content", "msg-content2", "msg-content3",
+                                               "msg-content4"};
+    const char* publishLogs[topicCount] = {"Publishing 100 messages by producer 1 synchronously",
+                                           "Publishing 100 messages by producer 2 synchronously",
+                                           "Publishing 100 messages by producer 3 synchronously",
+                                           "Publishing 100 messages by producer 4 synchronously"};
+    for (int t = 0; t < topicCount; t++) {
+        LOG_INFO(publishLogs[t]);
+        for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
+            std::stringstream stream;
+            stream << contentPrefixes[t] << msgNum;
+            Message msg = MessageBuilder().setContent(stream.str()).build();
+            ASSERT_EQ(ResultOk, producers[t].send(msg));
+        }
+    }
+
+    LOG_INFO("Consuming and acking 300 messages by multiTopicsConsumer");
+    for (int i = 0; i < 3 * messageNumber; i++) {
+        Message m;
+        ASSERT_EQ(ResultOk, consumer.receive(m, 1000));
+        ASSERT_EQ(ResultOk, consumer.acknowledge(m));
+    }
+    LOG_INFO("Consumed and acked 300 messages by multiTopicsConsumer");
+
+    // verify no more to receive, because producer4 not match pattern
+    Message m;
+    ASSERT_EQ(ResultTimeout, consumer.receive(m, 1000));
+
+    ASSERT_EQ(ResultOk, consumer.unsubscribe());
+
+    client.shutdown();
+}
+
+// Creates a pattern consumer that matches no topic at first, then creates 4 partitioned
+// topics, 3 of which match the pattern. After waiting for auto discovery, verifies that
+// the consumer receives messages only from the matching topics.
+TEST(BasicEndToEndTest, testPatternMultiTopicsConsumerAutoDiscovery) {
+    Client client(lookupUrl);
+    std::string pattern = "persistent://prop/unit/ns2/patternTopicsAutoConsumer.*";
+    Result result;
+    std::string subName = "testPatternTopicsAutoConsumer";
+
+    // 1.  create a pattern consumer, which contains no match topics at beginning.
+    ConsumerConfiguration consConfig;
+    consConfig.setConsumerType(ConsumerShared);
+    consConfig.setReceiverQueueSize(10);          // size for each sub-consumer
+    consConfig.setPatternAutoDiscoveryPeriod(1);  // set waiting time for auto discovery
+    Consumer consumer;
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeWithRegexAsync(pattern, subName, consConfig,
+                                   WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+    ASSERT_EQ(consumer.getSubscriptionName(), subName);
+    LOG_INFO("created pattern consumer with not match topics at beginning");
+
+    // 2. create 4 topics, in which 3 match the pattern.
+    const int topicCount = 4;
+    // the last topic does not match the pattern
+    const std::string topicNames[topicCount] = {
+        "persistent://prop/unit/ns2/patternTopicsAutoConsumerPubSub1",
+        "persistent://prop/unit/ns2/patternTopicsAutoConsumerPubSub2",
+        "persistent://prop/unit/ns2/patternTopicsAutoConsumerPubSub3",
+        "persistent://prop/unit/ns2/patternMultiTopicsNotMatchPubSub4"};
+
+    // admin api endpoints used to make the topics partitioned
+    const std::string partitionUrls[topicCount] = {
+        adminUrl + "admin/persistent/prop/unit/ns2/patternTopicsAutoConsumerPubSub1/partitions",
+        adminUrl + "admin/persistent/prop/unit/ns2/patternTopicsAutoConsumerPubSub2/partitions",
+        adminUrl + "admin/persistent/prop/unit/ns2/patternTopicsAutoConsumerPubSub3/partitions",
+        adminUrl + "admin/persistent/prop/unit/ns2/patternMultiTopicsNotMatchPubSub4/partitions"};
+    const char* partitionCounts[topicCount] = {"2", "3", "4", "4"};
+
+    // 204: created, 409: already exists
+    for (int t = 0; t < topicCount; t++) {
+        int res = makePutRequest(partitionUrls[t], partitionCounts[t]);
+        ASSERT_FALSE(res != 204 && res != 409);
+    }
+
+    Producer producers[topicCount];
+    for (int t = 0; t < topicCount; t++) {
+        result = client.createProducer(topicNames[t], producers[t]);
+        ASSERT_EQ(ResultOk, result);
+    }
+    LOG_INFO("created 3 producers that match, with partitions: 2, 3, 4, and 1 producer not match");
+
+    // 3. wait enough time to trigger auto discovery
+    usleep(2 * 1000 * 1000);
+
+    // 4. produce data: every producer publishes `messageNumber` messages with its own prefix
+    int messageNumber = 100;
+    const char* contentPrefixes[topicCount] = {"msg-content", "msg-content2", "msg-content3",
+                                               "msg-content4"};
+    const char* publishLogs[topicCount] = {"Publishing 100 messages by producer 1 synchronously",
+                                           "Publishing 100 messages by producer 2 synchronously",
+                                           "Publishing 100 messages by producer 3 synchronously",
+                                           "Publishing 100 messages by producer 4 synchronously"};
+    for (int t = 0; t < topicCount; t++) {
+        LOG_INFO(publishLogs[t]);
+        for (int msgNum = 0; msgNum < messageNumber; msgNum++) {
+            std::stringstream stream;
+            stream << contentPrefixes[t] << msgNum;
+            Message msg = MessageBuilder().setContent(stream.str()).build();
+            ASSERT_EQ(ResultOk, producers[t].send(msg));
+        }
+    }
+
+    // 5. pattern consumer already subscribed 3 topics
+    LOG_INFO("Consuming and acking 300 messages by pattern topics consumer");
+    for (int i = 0; i < 3 * messageNumber; i++) {
+        Message m;
+        ASSERT_EQ(ResultOk, consumer.receive(m, 1000));
+        ASSERT_EQ(ResultOk, consumer.acknowledge(m));
+    }
+    LOG_INFO("Consumed and acked 300 messages by pattern topics consumer");
+
+    // verify no more to receive, because producer4 not match pattern
+    Message m;
+    ASSERT_EQ(ResultTimeout, consumer.receive(m, 1000));
+
+    ASSERT_EQ(ResultOk, consumer.unsubscribe());
+
+    client.shutdown();
+}
\ No newline at end of file
diff --git a/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc 
b/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc
index 67e6af5..f706eb8 100644
--- a/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc
+++ b/pulsar-client-cpp/tests/BinaryLookupServiceTest.cc
@@ -25,9 +25,12 @@
 #include <Future.h>
 #include <Utils.h>
 #include "ConnectionPool.h"
+#include "HttpHelper.h"
 #include <pulsar/Authentication.h>
 #include <boost/exception/all.hpp>
 
+DECLARE_LOG_OBJECT()
+
 using namespace pulsar;
 
 TEST(BinaryLookupServiceTest, basicLookup) {
@@ -56,3 +59,67 @@ TEST(BinaryLookupServiceTest, basicLookup) {
     ASSERT_TRUE(lookupData != NULL);
     ASSERT_EQ(url, lookupData->getBrokerUrl());
 }
+
+// Checks that BinaryProtoLookupService::getTopicsOfNamespaceAsync returns the topics of one
+// namespace only: the three topics created under prop/unit/ns4, not the one under prop/unit/ns2.
+TEST(BinaryLookupServiceTest, basicGetNamespaceTopics) {
+    std::string url = "pulsar://localhost:8885";
+    std::string adminUrl = "http://localhost:8765/";
+    // 1. create some topics under the same namespace
+    Client client(url);
+
+    std::string topicName1 = "persistent://prop/unit/ns4/basicGetNamespaceTopics1";
+    std::string topicName2 = "persistent://prop/unit/ns4/basicGetNamespaceTopics2";
+    std::string topicName3 = "persistent://prop/unit/ns4/basicGetNamespaceTopics3";
+    // This one is not in the same namespace, so it must not show up in the lookup result.
+    std::string topicName4 = "persistent://prop/unit/ns2/basicGetNamespaceTopics4";
+
+    // call admin api to make topics partitioned (409 is tolerated; presumably the topic already exists)
+    std::string url1 = adminUrl + "admin/persistent/prop/unit/ns4/basicGetNamespaceTopics1/partitions";
+    std::string url2 = adminUrl + "admin/persistent/prop/unit/ns4/basicGetNamespaceTopics2/partitions";
+    std::string url3 = adminUrl + "admin/persistent/prop/unit/ns4/basicGetNamespaceTopics3/partitions";
+
+    int res = makePutRequest(url1, "2");
+    ASSERT_FALSE(res != 204 && res != 409);
+    res = makePutRequest(url2, "3");
+    ASSERT_FALSE(res != 204 && res != 409);
+    res = makePutRequest(url3, "4");
+    ASSERT_FALSE(res != 204 && res != 409);
+
+    Producer producer1;
+    Result result = client.createProducer(topicName1, producer1);
+    ASSERT_EQ(ResultOk, result);
+    Producer producer2;
+    result = client.createProducer(topicName2, producer2);
+    ASSERT_EQ(ResultOk, result);
+    Producer producer3;
+    result = client.createProducer(topicName3, producer3);
+    ASSERT_EQ(ResultOk, result);
+    Producer producer4;
+    result = client.createProducer(topicName4, producer4);
+    ASSERT_EQ(ResultOk, result);
+
+    // 2. call getTopicsOfNamespaceAsync through a lookup service with its own connection pool
+    AuthenticationPtr authData = AuthFactory::Disabled();
+    ClientConfiguration conf;
+    ExecutorServiceProviderPtr ioExecutorProvider_(boost::make_shared<ExecutorServiceProvider>(1));
+    ConnectionPool pool_(conf, ioExecutorProvider_, authData, true);
+    BinaryProtoLookupService lookupService(pool_, url);
+
+    TopicNamePtr topicName = TopicName::get(topicName1);
+    NamespaceNamePtr nsName = topicName->getNamespaceName();
+
+    Future<Result, NamespaceTopicsPtr> getTopicsFuture = lookupService.getTopicsOfNamespaceAsync(nsName);
+    NamespaceTopicsPtr topicsData;
+    result = getTopicsFuture.get(topicsData);
+    ASSERT_EQ(ResultOk, result);
+    ASSERT_TRUE(topicsData != NULL);
+
+    // 3. verify the result contains exactly the first 3 topics (expected value first, as elsewhere)
+    ASSERT_EQ(3, topicsData->size());
+    ASSERT_TRUE(std::find(topicsData->begin(), topicsData->end(), topicName1) != topicsData->end());
+    ASSERT_TRUE(std::find(topicsData->begin(), topicsData->end(), topicName2) != topicsData->end());
+    ASSERT_TRUE(std::find(topicsData->begin(), topicsData->end(), topicName3) != topicsData->end());
+
+    client.shutdown();
+}

Reply via email to