This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
commit af1ff3a789bec223b102bc7e27e33cf7bdc17104 Author: Matteo Merli <mme...@apache.org> AuthorDate: Fri Aug 31 15:39:11 2018 -0700 Added producer/consumer properties in Go client (#2447) --- pulsar-client-cpp/include/pulsar/ProducerConfiguration.h | 5 +++-- pulsar-client-cpp/include/pulsar/c/consumer_configuration.h | 3 +++ pulsar-client-cpp/include/pulsar/c/producer_configuration.h | 3 +++ pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc | 5 +++++ pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc | 5 +++++ pulsar-client-go/pulsar/c_consumer.go | 12 ++++++++++++ pulsar-client-go/pulsar/c_producer.go | 12 ++++++++++++ pulsar-client-go/pulsar/consumer.go | 4 ++++ pulsar-client-go/pulsar/producer.go | 4 ++++ pulsar-client-go/pulsar/producer_test.go | 8 ++++++-- 10 files changed, 57 insertions(+), 4 deletions(-) diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h index 45154c5..565a6ab 100644 --- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h +++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h @@ -128,7 +128,7 @@ class ProducerConfiguration { ProducerConfiguration& addEncryptionKey(std::string key); /** - * Check whether the message has a specific property attached. + * Check whether the producer has a specific property attached. * * @param name the name of the property to check * @return true if the message has the specified property @@ -150,7 +150,8 @@ class ProducerConfiguration { std::map<std::string, std::string>& getProperties() const; /** - * Sets a new property on a message. + * Sets a new property on the producer + * . * @param name the name of the property * @param value the associated value */ diff --git a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h index 7299867..fca47eb 100644 --- a/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h +++ b/pulsar-client-cpp/include/pulsar/c/consumer_configuration.h @@ -152,6 +152,9 @@ int pulsar_consumer_is_read_compacted(pulsar_consumer_configuration_t *consumer_ void pulsar_consumer_set_read_compacted(pulsar_consumer_configuration_t *consumer_configuration, int compacted); +void pulsar_consumer_configuration_set_property(pulsar_consumer_configuration_t *conf, const char *name, + const char *value); + // const CryptoKeyReaderPtr getCryptoKeyReader() // // const; diff --git a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h index ae88198..670bf50 100644 --- a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h +++ b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h @@ -134,6 +134,9 @@ void pulsar_producer_configuration_set_batching_max_publish_delay_ms(pulsar_prod unsigned long pulsar_producer_configuration_get_batching_max_publish_delay_ms( pulsar_producer_configuration_t *conf); +void pulsar_producer_configuration_set_property(pulsar_producer_configuration_t *conf, const char *name, + const char *value); + // const CryptoKeyReaderPtr getCryptoKeyReader() const; // ProducerConfiguration &setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader); // diff --git a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc index c8d5453..75cdc47 100644 --- a/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc +++ b/pulsar-client-cpp/lib/c/c_ConsumerConfiguration.cc @@ -113,3 +113,8 @@ void pulsar_consumer_set_read_compacted(pulsar_consumer_configuration_t *consume int compacted) { consumer_configuration->consumerConfiguration.setReadCompacted(compacted); } + +void pulsar_consumer_configuration_set_property(pulsar_consumer_configuration_t *conf, const char *name, + const char *value) { + conf->consumerConfiguration.setProperty(name, value); +} diff --git a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc index 914fc0a..a8eb5be 100644 --- a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc +++ b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc @@ -174,3 +174,8 @@ unsigned long pulsar_producer_configuration_get_batching_max_publish_delay_ms( pulsar_producer_configuration_t *conf) { return conf->conf.getBatchingMaxPublishDelayMs(); } + +void pulsar_producer_configuration_set_property(pulsar_producer_configuration_t *conf, const char *name, + const char *value) { + conf->conf.setProperty(name, value); +} diff --git a/pulsar-client-go/pulsar/c_consumer.go b/pulsar-client-go/pulsar/c_consumer.go index 7f613b2..1b41a71 100644 --- a/pulsar-client-go/pulsar/c_consumer.go +++ b/pulsar-client-go/pulsar/c_consumer.go @@ -120,6 +120,18 @@ func subscribeAsync(client *client, options ConsumerOptions, callback func(Consu C.pulsar_consumer_set_consumer_name(conf, name) } + if options.Properties != nil { + for key, value := range options.Properties { + cKey := C.CString(key) + cValue := C.CString(value) + + C.pulsar_consumer_configuration_set_property(conf, cKey, cValue) + + C.free(unsafe.Pointer(cKey)) + C.free(unsafe.Pointer(cValue)) + } + } + C.pulsar_consumer_set_read_compacted(conf, cBool(options.ReadCompacted)) subName := C.CString(options.SubscriptionName) diff --git a/pulsar-client-go/pulsar/c_producer.go b/pulsar-client-go/pulsar/c_producer.go index b4cd2c5..284315d 100644 --- a/pulsar-client-go/pulsar/c_producer.go +++ b/pulsar-client-go/pulsar/c_producer.go @@ -124,6 +124,18 @@ func createProducerAsync(client *client, options ProducerOptions, callback func( C.pulsar_producer_configuration_set_batching_max_messages(conf, C.uint(options.BatchingMaxMessages)) } + if options.Properties != nil { + for key, value := range options.Properties { + cKey := C.CString(key) + cValue := C.CString(value) + + C.pulsar_producer_configuration_set_property(conf, cKey, cValue) + + C.free(unsafe.Pointer(cKey)) + C.free(unsafe.Pointer(cValue)) + } + } + topicName := C.CString(options.Topic) defer C.free(unsafe.Pointer(topicName)) diff --git a/pulsar-client-go/pulsar/consumer.go b/pulsar-client-go/pulsar/consumer.go index b9f2616..030ba1b 100644 --- a/pulsar-client-go/pulsar/consumer.go +++ b/pulsar-client-go/pulsar/consumer.go @@ -64,6 +64,10 @@ type ConsumerOptions struct { // This argument is required when subscribing SubscriptionName string + // Attach a set of application defined properties to the consumer + // This properties will be visible in the topic stats + Properties map[string]string + // Set the timeout for unacked messages // Message not acknowledged within the give time, will be replayed by the broker to the same or a different consumer // Default is 0, which means message are not being replayed based on ack time diff --git a/pulsar-client-go/pulsar/producer.go b/pulsar-client-go/pulsar/producer.go index 2cfd141..46d6dd6 100644 --- a/pulsar-client-go/pulsar/producer.go +++ b/pulsar-client-go/pulsar/producer.go @@ -71,6 +71,10 @@ type ProducerOptions struct { // a topic. Name string + // Attach a set of application defined properties to the producer + // This properties will be visible in the topic stats + Properties map[string]string + // Set the send timeout (default: 30 seconds) // If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported. // Setting the timeout to -1, will set the timeout to infinity, which can be useful when using Pulsar's message diff --git a/pulsar-client-go/pulsar/producer_test.go b/pulsar-client-go/pulsar/producer_test.go index 940be85..cfa0bcb 100644 --- a/pulsar-client-go/pulsar/producer_test.go +++ b/pulsar-client-go/pulsar/producer_test.go @@ -20,9 +20,9 @@ package pulsar import ( - "testing" - "fmt" "context" + "fmt" + "testing" "time" ) @@ -77,6 +77,10 @@ func TestProducer(t *testing.T) { MaxPendingMessages: 100, BlockIfQueueFull: true, CompressionType: LZ4, + Properties: map[string]string{ + "my-name": "test", + "key": "value", + }, }) assertNil(t, err)