This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new ffde1bc  [client] add properties to consumer for cpp & python client 
(#2423)
ffde1bc is described below

commit ffde1bcd45c73e94d5f41a05fc6fb2c6f31d3764
Author: Sijie Guo <guosi...@gmail.com>
AuthorDate: Wed Aug 22 02:41:00 2018 -0700

    [client] add properties to consumer for cpp & python client (#2423)
    
    * [client] add properties to consumer for cpp & python client
    
     ### Motivation
    
    This is a catch-up change to enable properties on the consumer, matching the Java 
client.
    
     ### Changes
    
    Enable properties on consumer for both cpp & python client
    
     ### Results
    
    Properties are added as metadata for CommandSubscribe. However, there is no 
way
    to verify the consumer properties, so I didn't add any specific tests; I just
    added properties for both cpp and python clients in the tests, which should
    exercise the corresponding code path.
    
    * remove "make format"
---
 .../include/pulsar/ConsumerConfiguration.h         | 34 ++++++++++++++++++++++
 pulsar-client-cpp/lib/Commands.cc                  | 12 +++++++-
 pulsar-client-cpp/lib/Commands.h                   |  2 +-
 pulsar-client-cpp/lib/ConsumerConfiguration.cc     | 32 ++++++++++++++++++++
 pulsar-client-cpp/lib/ConsumerConfigurationImpl.h  |  4 ++-
 pulsar-client-cpp/lib/ConsumerImpl.cc              |  2 +-
 pulsar-client-cpp/python/pulsar/__init__.py        | 11 ++++++-
 pulsar-client-cpp/python/src/config.cc             |  1 +
 pulsar-client-cpp/python/test_consumer.py          |  6 +++-
 pulsar-client-cpp/tests/BasicEndToEndTest.cc       |  6 +++-
 10 files changed, 103 insertions(+), 7 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h 
b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index 36e5808..0687166 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -162,6 +162,40 @@ class ConsumerConfiguration {
     void setPatternAutoDiscoveryPeriod(int periodInSeconds);
     int getPatternAutoDiscoveryPeriod() const;
 
+    /**
+     * Check whether the consumer has a specific property attached.
+     *
+     * @param name the name of the property to check
+     * @return true if the consumer has the specified property
+     * @return false if the property is not defined
+     */
+    bool hasProperty(const std::string& name) const;
+
+    /**
+     * Get the value of a specific property
+     *
+     * @param name the name of the property
+     * @return the value of the property or an empty string if the property was not 
defined
+     */
+    const std::string& getProperty(const std::string& name) const;
+
+    /**
+     * Get all the properties attached to this consumer.
+     */
+    std::map<std::string, std::string>& getProperties() const;
+
+    /**
+     * Sets a new property on the consumer.
+     * @param name   the name of the property
+     * @param value  the associated value
+     */
+    ConsumerConfiguration& setProperty(const std::string& name, const 
std::string& value);
+
+    /**
+     * Add all the properties in the provided map
+     */
+    ConsumerConfiguration& setProperties(const std::map<std::string, 
std::string>& properties);
+
     friend class PulsarWrapper;
 
    private:
diff --git a/pulsar-client-cpp/lib/Commands.cc 
b/pulsar-client-cpp/lib/Commands.cc
index 8a1933b..8bd0128 100644
--- a/pulsar-client-cpp/lib/Commands.cc
+++ b/pulsar-client-cpp/lib/Commands.cc
@@ -20,6 +20,7 @@
 #include "MessageImpl.h"
 #include "Version.h"
 #include "pulsar/MessageBuilder.h"
+#include "PulsarApi.pb.h"
 #include "LogUtils.h"
 #include "Utils.h"
 #include "Url.h"
@@ -27,6 +28,7 @@
 #include <algorithm>
 #include <boost/thread/mutex.hpp>
 
+using namespace pulsar;
 namespace pulsar {
 
 using namespace pulsar::proto;
@@ -185,7 +187,8 @@ SharedBuffer Commands::newConnect(const AuthenticationPtr& 
authentication, const
 SharedBuffer Commands::newSubscribe(const std::string& topic, const 
std::string& subscription,
                                     uint64_t consumerId, uint64_t requestId, 
CommandSubscribe_SubType subType,
                                     const std::string& consumerName, 
SubscriptionMode subscriptionMode,
-                                    Optional<MessageId> startMessageId, bool 
readCompacted) {
+                                    Optional<MessageId> startMessageId, bool 
readCompacted,
+                                    const std::map<std::string, std::string>& 
metadata) {
     BaseCommand cmd;
     cmd.set_type(BaseCommand::SUBSCRIBE);
     CommandSubscribe* subscribe = cmd.mutable_subscribe();
@@ -206,6 +209,13 @@ SharedBuffer Commands::newSubscribe(const std::string& 
topic, const std::string&
             messageIdData.set_batch_index(startMessageId.value().batchIndex());
         }
     }
+    for (std::map<std::string, std::string>::const_iterator it = 
metadata.begin(); it != metadata.end();
+         it++) {
+        proto::KeyValue* keyValue = proto::KeyValue().New();
+        keyValue->set_key(it->first);
+        keyValue->set_value(it->second);
+        subscribe->mutable_metadata()->AddAllocated(keyValue);
+    }
 
     return writeMessageWithSize(cmd);
 }
diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h
index d9b8589..ef1e280 100644
--- a/pulsar-client-cpp/lib/Commands.h
+++ b/pulsar-client-cpp/lib/Commands.h
@@ -78,7 +78,7 @@ class Commands {
                                      uint64_t consumerId, uint64_t requestId,
                                      proto::CommandSubscribe_SubType subType, 
const std::string& consumerName,
                                      SubscriptionMode subscriptionMode, 
Optional<MessageId> startMessageId,
-                                     bool readCompacted);
+                                     bool readCompacted, const 
std::map<std::string, std::string>& metadata);
 
     static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t 
requestId);
 
diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc 
b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
index 058ca57..4014ad2 100644
--- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
@@ -20,6 +20,8 @@
 
 namespace pulsar {
 
+const static std::string emptyString;
+
 ConsumerConfiguration::ConsumerConfiguration() : 
impl_(boost::make_shared<ConsumerConfigurationImpl>()) {}
 
 ConsumerConfiguration::~ConsumerConfiguration() {}
@@ -111,4 +113,34 @@ void 
ConsumerConfiguration::setPatternAutoDiscoveryPeriod(int periodInSeconds) {
 
 int ConsumerConfiguration::getPatternAutoDiscoveryPeriod() const { return 
impl_->patternAutoDiscoveryPeriod; }
 
+bool ConsumerConfiguration::hasProperty(const std::string& name) const {
+    const std::map<std::string, std::string>& m = impl_->properties;
+    return m.find(name) != m.end();
+}
+
+const std::string& ConsumerConfiguration::getProperty(const std::string& name) 
const {
+    if (hasProperty(name)) {
+        const std::map<std::string, std::string>& m = impl_->properties;
+        return m.at(name);
+    } else {
+        return emptyString;
+    }
+}
+
+std::map<std::string, std::string>& ConsumerConfiguration::getProperties() 
const { return impl_->properties; }
+
+ConsumerConfiguration& ConsumerConfiguration::setProperty(const std::string& 
name, const std::string& value) {
+    impl_->properties.insert(std::make_pair(name, value));
+    return *this;
+}
+
+ConsumerConfiguration& ConsumerConfiguration::setProperties(
+    const std::map<std::string, std::string>& properties) {
+    for (std::map<std::string, std::string>::const_iterator it = 
properties.begin(); it != properties.end();
+         it++) {
+        setProperty(it->first, it->second);
+    }
+    return *this;
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h 
b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
index 0cc0c72..16e91c8 100644
--- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
@@ -36,6 +36,7 @@ struct ConsumerConfigurationImpl {
     ConsumerCryptoFailureAction cryptoFailureAction;
     bool readCompacted;
     int patternAutoDiscoveryPeriod;
+    std::map<std::string, std::string> properties;
     ConsumerConfigurationImpl()
         : unAckedMessagesTimeoutMs(0),
           consumerType(ConsumerExclusive),
@@ -47,7 +48,8 @@ struct ConsumerConfigurationImpl {
           cryptoKeyReader(),
           cryptoFailureAction(ConsumerCryptoFailureAction::FAIL),
           readCompacted(false),
-          patternAutoDiscoveryPeriod(60) {}
+          patternAutoDiscoveryPeriod(60),
+          properties() {}
 };
 }  // namespace pulsar
 #endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc 
b/pulsar-client-cpp/lib/ConsumerImpl.cc
index f0a9cca..06eb43f 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -139,7 +139,7 @@ void ConsumerImpl::connectionOpened(const 
ClientConnectionPtr& cnx) {
     uint64_t requestId = client->newRequestId();
     SharedBuffer cmd =
         Commands::newSubscribe(topic_, subscription_, consumerId_, requestId, 
getSubType(), consumerName_,
-                               subscriptionMode_, startMessageId_, 
readCompacted_);
+                               subscriptionMode_, startMessageId_, 
readCompacted_, config_.getProperties());
     cnx->sendRequestWithId(cmd, requestId)
         .addListener(boost::bind(&ConsumerImpl::handleCreateConsumer, 
shared_from_this(), cnx, _1));
 }
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py 
b/pulsar-client-cpp/python/pulsar/__init__.py
index f3b560b..222c29f 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -402,7 +402,8 @@ class Client:
                   consumer_name=None,
                   unacked_messages_timeout_ms=None,
                   broker_consumer_stats_cache_time_ms=30000,
-                  is_read_compacted=False
+                  is_read_compacted=False,
+                  properties=None
                   ):
         """
         Subscribe to the given topic and subscription combination.
@@ -455,6 +456,9 @@ class Client:
         * `broker_consumer_stats_cache_time_ms`:
           Sets the time duration for which the broker-side consumer stats will
           be cached in the client.
+        * `properties`:
+          Sets the properties for the consumer. The properties associated with 
a consumer
+          can be used to identify a consumer at the broker side.
         """
         _check_type(str, topic, 'topic')
         _check_type(str, subscription_name, 'subscription_name')
@@ -466,6 +470,7 @@ class Client:
         _check_type_or_none(int, unacked_messages_timeout_ms, 
'unacked_messages_timeout_ms')
         _check_type(int, broker_consumer_stats_cache_time_ms, 
'broker_consumer_stats_cache_time_ms')
         _check_type(bool, is_read_compacted, 'is_read_compacted')
+        _check_type_or_none(dict, properties, 'properties')
 
         conf = _pulsar.ConsumerConfiguration()
         conf.consumer_type(consumer_type)
@@ -479,6 +484,10 @@ class Client:
         if unacked_messages_timeout_ms:
             conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms)
         
conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms)
+        if properties:
+            for k, v in properties.items():
+                conf.property(k, v)
+
         c = Consumer()
         c._consumer = self._client.subscribe(topic, subscription_name, conf)
         c._client = self
diff --git a/pulsar-client-cpp/python/src/config.cc 
b/pulsar-client-cpp/python/src/config.cc
index 9deee9a..7b4459a 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -140,6 +140,7 @@ void export_config() {
             .def("broker_consumer_stats_cache_time_ms", 
&ConsumerConfiguration::setBrokerConsumerStatsCacheTimeInMs)
             .def("read_compacted", &ConsumerConfiguration::isReadCompacted)
             .def("read_compacted", &ConsumerConfiguration::setReadCompacted)
+            .def("property", &ConsumerConfiguration::setProperty, 
return_self<>())
             ;
 
     class_<ReaderConfiguration>("ReaderConfiguration")
diff --git a/pulsar-client-cpp/python/test_consumer.py 
b/pulsar-client-cpp/python/test_consumer.py
index dd0f937..495dfc0 100755
--- a/pulsar-client-cpp/python/test_consumer.py
+++ b/pulsar-client-cpp/python/test_consumer.py
@@ -22,7 +22,11 @@
 import pulsar
 
 client = pulsar.Client('pulsar://localhost:6650')
-consumer = client.subscribe('my-topic', "my-subscription")
+consumer = client.subscribe('my-topic', "my-subscription",
+                            properties={
+                                "consumer-name": "test-consumer-name",
+                                "consumer-id": "test-consumer-id"
+                            })
 
 while True:
     msg = consumer.receive()
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc 
b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index d4c1df8..b1b05ef 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -134,8 +134,12 @@ TEST(BasicEndToEndTest, testBatchMessages) {
     ASSERT_EQ(ResultOk, result);
 
     Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setProperty("consumer-name", "test-consumer-name");
+    consumerConfig.setProperty("consumer-id", "test-consumer-id");
     Promise<Result, Consumer> consumerPromise;
-    client.subscribeAsync(topicName, subName, 
WaitForCallbackValue<Consumer>(consumerPromise));
+    client.subscribeAsync(topicName, subName, consumerConfig,
+                          WaitForCallbackValue<Consumer>(consumerPromise));
     Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
     result = consumerFuture.get(consumer);
     ASSERT_EQ(ResultOk, result);

Reply via email to