This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git

commit e1dddcf21608b76f57e3d4830d45fbf87f8d2e99
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Mon Aug 27 16:34:00 2018 -0700

    Add multi-topic and regex consumer in Go client (#2448)
---
 pulsar-client-cpp/include/pulsar/c/client.h |  10 +++
 pulsar-client-cpp/lib/c/c_Client.cc         |  21 +++++
 pulsar-client-go/pulsar/c_consumer.go       |  38 +++++++--
 pulsar-client-go/pulsar/c_go_pulsar.h       |  13 +++
 pulsar-client-go/pulsar/consumer.go         |  10 ++-
 pulsar-client-go/pulsar/consumer_test.go    | 124 +++++++++++++++++++++++++++-
 6 files changed, 207 insertions(+), 9 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/c/client.h 
b/pulsar-client-cpp/include/pulsar/c/client.h
index 2da7c6d..4b603bb 100644
--- a/pulsar-client-cpp/include/pulsar/c/client.h
+++ b/pulsar-client-cpp/include/pulsar/c/client.h
@@ -86,6 +86,16 @@ void pulsar_client_subscribe_async(pulsar_client_t *client, 
const char *topic, c
                                    const pulsar_consumer_configuration_t *conf,
                                    pulsar_subscribe_callback callback, void 
*ctx);
 
+void pulsar_client_subscribe_multi_topics_async(pulsar_client_t *client, const 
char **topics, int topicsCount,
+                                                const char *subscriptionName,
+                                                const 
pulsar_consumer_configuration_t *conf,
+                                                pulsar_subscribe_callback 
callback, void *ctx);
+
+void pulsar_client_subscribe_pattern_async(pulsar_client_t *client, const char 
*topicPattern,
+                                           const char *subscriptionName,
+                                           const 
pulsar_consumer_configuration_t *conf,
+                                           pulsar_subscribe_callback callback, 
void *ctx);
+
 /**
  * Create a topic reader with given {@code ReaderConfiguration} for reading 
messages from the specified
  * topic.
diff --git a/pulsar-client-cpp/lib/c/c_Client.cc 
b/pulsar-client-cpp/lib/c/c_Client.cc
index 905e410..ecefe20 100644
--- a/pulsar-client-cpp/lib/c/c_Client.cc
+++ b/pulsar-client-cpp/lib/c/c_Client.cc
@@ -98,6 +98,27 @@ void pulsar_client_subscribe_async(pulsar_client_t *client, 
const char *topic, c
                                    boost::bind(&handle_subscribe_callback, _1, 
_2, callback, ctx));
 }
 
+void pulsar_client_subscribe_multi_topics_async(pulsar_client_t *client, const 
char **topics, int topicsCount,
+                                                const char *subscriptionName,
+                                                const 
pulsar_consumer_configuration_t *conf,
+                                                pulsar_subscribe_callback 
callback, void *ctx) {
+    std::vector<std::string> topicsList;
+    for (int i = 0; i < topicsCount; i++) {
+        topicsList.push_back(topics[i]);
+    }
+
+    client->client->subscribeAsync(topicsList, subscriptionName, 
conf->consumerConfiguration,
+                                   boost::bind(&handle_subscribe_callback, _1, 
_2, callback, ctx));
+}
+
+void pulsar_client_subscribe_pattern_async(pulsar_client_t *client, const char 
*topicPattern,
+                                           const char *subscriptionName,
+                                           const 
pulsar_consumer_configuration_t *conf,
+                                           pulsar_subscribe_callback callback, 
void *ctx) {
+    client->client->subscribeWithRegexAsync(topicPattern, subscriptionName, 
conf->consumerConfiguration,
+                                            
boost::bind(&handle_subscribe_callback, _1, _2, callback, ctx));
+}
+
 pulsar_result pulsar_client_create_reader(pulsar_client_t *client, const char 
*topic,
                                           const pulsar_message_id_t 
*startMessageId,
                                           pulsar_reader_configuration_t *conf, 
pulsar_reader_t **c_reader) {
diff --git a/pulsar-client-go/pulsar/c_consumer.go 
b/pulsar-client-go/pulsar/c_consumer.go
index aebabc8..093dd9d 100644
--- a/pulsar-client-go/pulsar/c_consumer.go
+++ b/pulsar-client-go/pulsar/c_consumer.go
@@ -25,10 +25,10 @@ package pulsar
 import "C"
 
 import (
+       "context"
        "runtime"
        "time"
        "unsafe"
-       "context"
 )
 
 type consumer struct {
@@ -64,7 +64,7 @@ type subscribeContext struct {
 }
 
 func subscribeAsync(client *client, options ConsumerOptions, callback 
func(Consumer, error)) {
-       if options.Topic == "" {
+       if options.Topic == "" && options.Topics == nil && 
options.TopicsPattern == "" {
                go callback(nil, newError(C.pulsar_result_InvalidConfiguration, 
"topic is required"))
                return
        }
@@ -120,12 +120,38 @@ func subscribeAsync(client *client, options 
ConsumerOptions, callback func(Consu
                C.pulsar_consumer_set_consumer_name(conf, name)
        }
 
-       topic := C.CString(options.Topic)
        subName := C.CString(options.SubscriptionName)
-       defer C.free(unsafe.Pointer(topic))
        defer C.free(unsafe.Pointer(subName))
-       C._pulsar_client_subscribe_async(client.ptr, topic, subName,
-               conf, savePointer(&subscribeContext{conf: conf, consumer: 
consumer, callback: callback}))
+
+       callbackPtr := savePointer(&subscribeContext{conf: conf, consumer: 
consumer, callback: callback})
+
+       if options.Topic != "" {
+               topic := C.CString(options.Topic)
+               defer C.free(unsafe.Pointer(topic))
+               C._pulsar_client_subscribe_async(client.ptr, topic, subName, 
conf, callbackPtr)
+       } else if options.Topics != nil {
+               cArray := C.malloc(C.size_t(len(options.Topics)) * 
C.size_t(unsafe.Sizeof(uintptr(0))))
+
+               // convert the C array to a Go Array so we can index it
+               a := (*[1<<30 - 1]*C.char)(cArray)
+
+               for idx, topic := range options.Topics {
+                       a[idx] = C.CString(topic)
+               }
+
+               C._pulsar_client_subscribe_multi_topics_async(client.ptr, 
(**C.char)(cArray), C.int(len(options.Topics)),
+                       subName, conf, callbackPtr)
+
+               for idx, _ := range options.Topics {
+                       C.free(unsafe.Pointer(a[idx]))
+               }
+
+               C.free(cArray)
+       } else if options.TopicsPattern != "" {
+               topicsPattern := C.CString(options.TopicsPattern)
+               defer C.free(unsafe.Pointer(topicsPattern))
+               C._pulsar_client_subscribe_pattern_async(client.ptr, 
topicsPattern, subName, conf, callbackPtr)
+       }
 }
 
 type consumerCallback struct {
diff --git a/pulsar-client-go/pulsar/c_go_pulsar.h 
b/pulsar-client-go/pulsar/c_go_pulsar.h
index 045a4a2..8814276 100644
--- a/pulsar-client-go/pulsar/c_go_pulsar.h
+++ b/pulsar-client-go/pulsar/c_go_pulsar.h
@@ -73,6 +73,19 @@ static inline void 
_pulsar_client_subscribe_async(pulsar_client_t *client, const
     pulsar_client_subscribe_async(client, topic, subscriptionName, conf, 
pulsarSubscribeCallbackProxy, ctx);
 }
 
+static inline void _pulsar_client_subscribe_multi_topics_async(pulsar_client_t 
*client, const char ** topics,
+                                                  int topicsCount,  const char 
*subscriptionName,
+                                                  const 
pulsar_consumer_configuration_t *conf, void *ctx) {
+    pulsar_client_subscribe_multi_topics_async(client, topics, topicsCount, 
subscriptionName, conf,
+                                               pulsarSubscribeCallbackProxy, 
ctx);
+}
+
+static inline void _pulsar_client_subscribe_pattern_async(pulsar_client_t 
*client, const char *topicPattern,
+                                                  const char *subscriptionName,
+                                                  const 
pulsar_consumer_configuration_t *conf, void *ctx) {
+    pulsar_client_subscribe_pattern_async(client, topicPattern, 
subscriptionName, conf, pulsarSubscribeCallbackProxy, ctx);
+}
+
 void pulsarMessageListenerProxy(pulsar_consumer_t *consumer, pulsar_message_t 
*message, void *ctx);
 
 static inline void _pulsar_consumer_configuration_set_message_listener(
diff --git a/pulsar-client-go/pulsar/consumer.go 
b/pulsar-client-go/pulsar/consumer.go
index 4ce0857..ed56d9e 100644
--- a/pulsar-client-go/pulsar/consumer.go
+++ b/pulsar-client-go/pulsar/consumer.go
@@ -49,9 +49,17 @@ const (
 // ConsumerBuilder is used to configure and create instances of Consumer
 type ConsumerOptions struct {
        // Specify the topic this consumer will subscribe on.
-       // This argument is required when subscribing
+       // Either a topic, a list of topics or a topics pattern are required 
when subscribing
        Topic string
 
+       // Specify a list of topics this consumer will subscribe on.
+       // Either a topic, a list of topics or a topics pattern are required 
when subscribing
+       Topics []string
+
+       // Specify a regular expression to subscribe to multiple topics under 
the same namespace.
+       // Either a topic, a list of topics or a topics pattern are required 
when subscribing
+       TopicsPattern string
+
        // Specify the subscription name for this consumer
        // This argument is required when subscribing
        SubscriptionName string
diff --git a/pulsar-client-go/pulsar/consumer_test.go 
b/pulsar-client-go/pulsar/consumer_test.go
index 2930f19..75a454b 100644
--- a/pulsar-client-go/pulsar/consumer_test.go
+++ b/pulsar-client-go/pulsar/consumer_test.go
@@ -20,9 +20,9 @@
 package pulsar
 
 import (
-       "testing"
-       "fmt"
        "context"
+       "fmt"
+       "testing"
        "time"
 )
 
@@ -131,3 +131,123 @@ func TestConsumerWithInvalidConf(t *testing.T) {
 
        assertEqual(t, err.(*Error).Result(), InvalidConfiguration)
 }
+
+
+func TestConsumerMultiTopics(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: "pulsar://localhost:6650",
+       })
+
+       assertNil(t, err)
+       defer client.Close()
+
+       producer1, err := client.CreateProducer(ProducerOptions{
+               Topic: "multi-topic-1",
+       })
+
+       assertNil(t, err)
+
+       producer2, err := client.CreateProducer(ProducerOptions{
+               Topic: "multi-topic-2",
+       })
+
+       assertNil(t, err)
+       defer producer1.Close()
+       defer producer2.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topics:           []string{"multi-topic-1", "multi-topic-2"},
+               SubscriptionName: "my-sub",
+       })
+
+       assertNil(t, err)
+       defer consumer.Close()
+
+       assertEqual(t, consumer.Subscription(), "my-sub")
+
+       ctx := context.Background()
+
+       for i := 0; i < 10; i++ {
+               if err := producer1.Send(ctx, ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               }); err != nil {
+                       t.Fatal(err)
+               }
+
+               if err := producer2.Send(ctx, ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               }); err != nil {
+                       t.Fatal(err)
+               }
+       }
+
+       for i := 0; i < 20; i++ {
+               msg, err := consumer.Receive(ctx)
+               assertNil(t, err)
+               assertNotNil(t, msg)
+
+               consumer.Ack(msg)
+       }
+
+       consumer.Unsubscribe()
+}
+
+
+func TestConsumerRegex(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: "pulsar://localhost:6650",
+       })
+
+       assertNil(t, err)
+       defer client.Close()
+
+       producer1, err := client.CreateProducer(ProducerOptions{
+               Topic: "topic-1",
+       })
+
+       assertNil(t, err)
+
+       producer2, err := client.CreateProducer(ProducerOptions{
+               Topic: "topic-2",
+       })
+
+       assertNil(t, err)
+       defer producer1.Close()
+       defer producer2.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               TopicsPattern: "topic-\\d+",
+               SubscriptionName: "my-sub",
+       })
+
+       assertNil(t, err)
+       defer consumer.Close()
+
+       assertEqual(t, consumer.Subscription(), "my-sub")
+
+       ctx := context.Background()
+
+       for i := 0; i < 10; i++ {
+               if err := producer1.Send(ctx, ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               }); err != nil {
+                       t.Fatal(err)
+               }
+
+               if err := producer2.Send(ctx, ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               }); err != nil {
+                       t.Fatal(err)
+               }
+       }
+
+       for i := 0; i < 20; i++ {
+               msg, err := consumer.Receive(ctx)
+               assertNil(t, err)
+               assertNotNil(t, msg)
+
+               consumer.Ack(msg)
+       }
+
+       consumer.Unsubscribe()
+}
\ No newline at end of file

Reply via email to