This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new ffde1bc [client] add properties to consumer for cpp & python client (#2423) ffde1bc is described below commit ffde1bcd45c73e94d5f41a05fc6fb2c6f31d3764 Author: Sijie Guo <guosi...@gmail.com> AuthorDate: Wed Aug 22 02:41:00 2018 -0700 [client] add properties to consumer for cpp & python client (#2423) * [client] add properties to consumer for cpp & python client ### Motivation This is a caught-up change to enable properties for consumer as java clients. ### Changes Enable properties on consumer for both cpp & python client ### Results Properties are added as metadata for CommandSubscribe. However there is no way to verify the consumer properties. so I didn't add any specific tests, just adding properties for both cpp and python clients in the tests, that should excerise the corresponding code path. * remove "make format" --- .../include/pulsar/ConsumerConfiguration.h | 34 ++++++++++++++++++++++ pulsar-client-cpp/lib/Commands.cc | 12 +++++++- pulsar-client-cpp/lib/Commands.h | 2 +- pulsar-client-cpp/lib/ConsumerConfiguration.cc | 32 ++++++++++++++++++++ pulsar-client-cpp/lib/ConsumerConfigurationImpl.h | 4 ++- pulsar-client-cpp/lib/ConsumerImpl.cc | 2 +- pulsar-client-cpp/python/pulsar/__init__.py | 11 ++++++- pulsar-client-cpp/python/src/config.cc | 1 + pulsar-client-cpp/python/test_consumer.py | 6 +++- pulsar-client-cpp/tests/BasicEndToEndTest.cc | 6 +++- 10 files changed, 103 insertions(+), 7 deletions(-) diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h index 36e5808..0687166 100644 --- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h +++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h @@ -162,6 +162,40 @@ class ConsumerConfiguration { void setPatternAutoDiscoveryPeriod(int periodInSeconds); int getPatternAutoDiscoveryPeriod() const; + /** + * Check whether the message has a specific property attached. + * + * @param name the name of the property to check + * @return true if the message has the specified property + * @return false if the property is not defined + */ + bool hasProperty(const std::string& name) const; + + /** + * Get the value of a specific property + * + * @param name the name of the property + * @return the value of the property or null if the property was not defined + */ + const std::string& getProperty(const std::string& name) const; + + /** + * Get all the properties attached to this producer. + */ + std::map<std::string, std::string>& getProperties() const; + + /** + * Sets a new property on a message. + * @param name the name of the property + * @param value the associated value + */ + ConsumerConfiguration& setProperty(const std::string& name, const std::string& value); + + /** + * Add all the properties in the provided map + */ + ConsumerConfiguration& setProperties(const std::map<std::string, std::string>& properties); + friend class PulsarWrapper; private: diff --git a/pulsar-client-cpp/lib/Commands.cc b/pulsar-client-cpp/lib/Commands.cc index 8a1933b..8bd0128 100644 --- a/pulsar-client-cpp/lib/Commands.cc +++ b/pulsar-client-cpp/lib/Commands.cc @@ -20,6 +20,7 @@ #include "MessageImpl.h" #include "Version.h" #include "pulsar/MessageBuilder.h" +#include "PulsarApi.pb.h" #include "LogUtils.h" #include "Utils.h" #include "Url.h" @@ -27,6 +28,7 @@ #include <algorithm> #include <boost/thread/mutex.hpp> +using namespace pulsar; namespace pulsar { using namespace pulsar::proto; @@ -185,7 +187,8 @@ SharedBuffer Commands::newConnect(const AuthenticationPtr& authentication, const SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string& subscription, uint64_t consumerId, uint64_t requestId, CommandSubscribe_SubType subType, const std::string& consumerName, SubscriptionMode subscriptionMode, - Optional<MessageId> startMessageId, bool readCompacted) { + Optional<MessageId> startMessageId, bool readCompacted, + const std::map<std::string, std::string>& metadata) { BaseCommand cmd; cmd.set_type(BaseCommand::SUBSCRIBE); CommandSubscribe* subscribe = cmd.mutable_subscribe(); @@ -206,6 +209,13 @@ SharedBuffer Commands::newSubscribe(const std::string& topic, const std::string& messageIdData.set_batch_index(startMessageId.value().batchIndex()); } } + for (std::map<std::string, std::string>::const_iterator it = metadata.begin(); it != metadata.end(); + it++) { + proto::KeyValue* keyValue = proto::KeyValue().New(); + keyValue->set_key(it->first); + keyValue->set_value(it->second); + subscribe->mutable_metadata()->AddAllocated(keyValue); + } return writeMessageWithSize(cmd); } diff --git a/pulsar-client-cpp/lib/Commands.h b/pulsar-client-cpp/lib/Commands.h index d9b8589..ef1e280 100644 --- a/pulsar-client-cpp/lib/Commands.h +++ b/pulsar-client-cpp/lib/Commands.h @@ -78,7 +78,7 @@ class Commands { uint64_t consumerId, uint64_t requestId, proto::CommandSubscribe_SubType subType, const std::string& consumerName, SubscriptionMode subscriptionMode, Optional<MessageId> startMessageId, - bool readCompacted); + bool readCompacted, const std::map<std::string, std::string>& metadata); static SharedBuffer newUnsubscribe(uint64_t consumerId, uint64_t requestId); diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc b/pulsar-client-cpp/lib/ConsumerConfiguration.cc index 058ca57..4014ad2 100644 --- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc +++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc @@ -20,6 +20,8 @@ namespace pulsar { +const static std::string emptyString; + ConsumerConfiguration::ConsumerConfiguration() : impl_(boost::make_shared<ConsumerConfigurationImpl>()) {} ConsumerConfiguration::~ConsumerConfiguration() {} @@ -111,4 +113,34 @@ void ConsumerConfiguration::setPatternAutoDiscoveryPeriod(int periodInSeconds) { int ConsumerConfiguration::getPatternAutoDiscoveryPeriod() const { return impl_->patternAutoDiscoveryPeriod; } +bool ConsumerConfiguration::hasProperty(const std::string& name) const { + const std::map<std::string, std::string>& m = impl_->properties; + return m.find(name) != m.end(); +} + +const std::string& ConsumerConfiguration::getProperty(const std::string& name) const { + if (hasProperty(name)) { + const std::map<std::string, std::string>& m = impl_->properties; + return m.at(name); + } else { + return emptyString; + } +} + +std::map<std::string, std::string>& ConsumerConfiguration::getProperties() const { return impl_->properties; } + +ConsumerConfiguration& ConsumerConfiguration::setProperty(const std::string& name, const std::string& value) { + impl_->properties.insert(std::make_pair(name, value)); + return *this; +} + +ConsumerConfiguration& ConsumerConfiguration::setProperties( + const std::map<std::string, std::string>& properties) { + for (std::map<std::string, std::string>::const_iterator it = properties.begin(); it != properties.end(); + it++) { + setProperty(it->first, it->second); + } + return *this; +} + } // namespace pulsar diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h index 0cc0c72..16e91c8 100644 --- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h +++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h @@ -36,6 +36,7 @@ struct ConsumerConfigurationImpl { ConsumerCryptoFailureAction cryptoFailureAction; bool readCompacted; int patternAutoDiscoveryPeriod; + std::map<std::string, std::string> properties; ConsumerConfigurationImpl() : unAckedMessagesTimeoutMs(0), consumerType(ConsumerExclusive), @@ -47,7 +48,8 @@ struct ConsumerConfigurationImpl { cryptoKeyReader(), cryptoFailureAction(ConsumerCryptoFailureAction::FAIL), readCompacted(false), - patternAutoDiscoveryPeriod(60) {} + patternAutoDiscoveryPeriod(60), + properties() {} }; } // namespace pulsar #endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */ diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc index f0a9cca..06eb43f 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.cc +++ b/pulsar-client-cpp/lib/ConsumerImpl.cc @@ -139,7 +139,7 @@ void ConsumerImpl::connectionOpened(const ClientConnectionPtr& cnx) { uint64_t requestId = client->newRequestId(); SharedBuffer cmd = Commands::newSubscribe(topic_, subscription_, consumerId_, requestId, getSubType(), consumerName_, - subscriptionMode_, startMessageId_, readCompacted_); + subscriptionMode_, startMessageId_, readCompacted_, config_.getProperties()); cnx->sendRequestWithId(cmd, requestId) .addListener(boost::bind(&ConsumerImpl::handleCreateConsumer, shared_from_this(), cnx, _1)); } diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py index f3b560b..222c29f 100644 --- a/pulsar-client-cpp/python/pulsar/__init__.py +++ b/pulsar-client-cpp/python/pulsar/__init__.py @@ -402,7 +402,8 @@ class Client: consumer_name=None, unacked_messages_timeout_ms=None, broker_consumer_stats_cache_time_ms=30000, - is_read_compacted=False + is_read_compacted=False, + properties=None ): """ Subscribe to the given topic and subscription combination. @@ -455,6 +456,9 @@ class Client: * `broker_consumer_stats_cache_time_ms`: Sets the time duration for which the broker-side consumer stats will be cached in the client. + * `properties`: + Sets the properties for the consumer. The properties associated with a consumer + can be used for identify a consumer at broker side. """ _check_type(str, topic, 'topic') _check_type(str, subscription_name, 'subscription_name') @@ -466,6 +470,7 @@ class Client: _check_type_or_none(int, unacked_messages_timeout_ms, 'unacked_messages_timeout_ms') _check_type(int, broker_consumer_stats_cache_time_ms, 'broker_consumer_stats_cache_time_ms') _check_type(bool, is_read_compacted, 'is_read_compacted') + _check_type_or_none(dict, properties, 'properties') conf = _pulsar.ConsumerConfiguration() conf.consumer_type(consumer_type) @@ -479,6 +484,10 @@ class Client: if unacked_messages_timeout_ms: conf.unacked_messages_timeout_ms(unacked_messages_timeout_ms) conf.broker_consumer_stats_cache_time_ms(broker_consumer_stats_cache_time_ms) + if properties: + for k, v in properties.items(): + conf.property(k, v) + c = Consumer() c._consumer = self._client.subscribe(topic, subscription_name, conf) c._client = self diff --git a/pulsar-client-cpp/python/src/config.cc b/pulsar-client-cpp/python/src/config.cc index 9deee9a..7b4459a 100644 --- a/pulsar-client-cpp/python/src/config.cc +++ b/pulsar-client-cpp/python/src/config.cc @@ -140,6 +140,7 @@ void export_config() { .def("broker_consumer_stats_cache_time_ms", &ConsumerConfiguration::setBrokerConsumerStatsCacheTimeInMs) .def("read_compacted", &ConsumerConfiguration::isReadCompacted) .def("read_compacted", &ConsumerConfiguration::setReadCompacted) + .def("property", &ConsumerConfiguration::setProperty, return_self<>()) ; class_<ReaderConfiguration>("ReaderConfiguration") diff --git a/pulsar-client-cpp/python/test_consumer.py b/pulsar-client-cpp/python/test_consumer.py index dd0f937..495dfc0 100755 --- a/pulsar-client-cpp/python/test_consumer.py +++ b/pulsar-client-cpp/python/test_consumer.py @@ -22,7 +22,11 @@ import pulsar client = pulsar.Client('pulsar://localhost:6650') -consumer = client.subscribe('my-topic', "my-subscription") +consumer = client.subscribe('my-topic', "my-subscription", + properties={ + "consumer-name": "test-consumer-name", + "consumer-id": "test-consumer-id" + }) while True: msg = consumer.receive() diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc index d4c1df8..b1b05ef 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -134,8 +134,12 @@ TEST(BasicEndToEndTest, testBatchMessages) { ASSERT_EQ(ResultOk, result); Consumer consumer; + ConsumerConfiguration consumerConfig; + consumerConfig.setProperty("consumer-name", "test-consumer-name"); + consumerConfig.setProperty("consumer-id", "test-consumer-id"); Promise<Result, Consumer> consumerPromise; - client.subscribeAsync(topicName, subName, WaitForCallbackValue<Consumer>(consumerPromise)); + client.subscribeAsync(topicName, subName, consumerConfig, + WaitForCallbackValue<Consumer>(consumerPromise)); Future<Result, Consumer> consumerFuture = consumerPromise.getFuture(); result = consumerFuture.get(consumer); ASSERT_EQ(ResultOk, result);