This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new b6dc7c1 Added Reader.HasNext in Go client (#2450) b6dc7c1 is described below commit b6dc7c1eae314e65cf782a7b0201408279dc9e8d Author: Matteo Merli <mme...@apache.org> AuthorDate: Mon Aug 27 11:41:24 2018 -0700 Added Reader.HasNext in Go client (#2450) ### Motivation Added `Reader.HasNext()` in Go client library --- pulsar-client-go/pulsar/c_reader.go | 13 +++++++++++++ pulsar-client-go/pulsar/reader.go | 3 +++ pulsar-client-go/pulsar/reader_test.go | 12 ++++++++++-- 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/pulsar-client-go/pulsar/c_reader.go b/pulsar-client-go/pulsar/c_reader.go index 730f9b8..12c1103 100644 --- a/pulsar-client-go/pulsar/c_reader.go +++ b/pulsar-client-go/pulsar/c_reader.go @@ -146,6 +146,19 @@ func (r *reader) Next(ctx context.Context) (Message, error) { } } +func (r *reader) HasNext() (bool, error) { + value := C.int(0) + res := C.pulsar_reader_has_message_available(r.ptr, &value) + + if res != C.pulsar_result_Ok { + return false, newError(res, "Failed to check if next message is available") + } else if value == C.int(1) { + return true, nil + } else { + return false, nil + } +} + func (r *reader) Close() error { channel := make(chan error) r.CloseAsync(func(err error) { channel <- err; close(channel) }) diff --git a/pulsar-client-go/pulsar/reader.go b/pulsar-client-go/pulsar/reader.go index f61ebd7..7015c9c 100644 --- a/pulsar-client-go/pulsar/reader.go +++ b/pulsar-client-go/pulsar/reader.go @@ -67,6 +67,9 @@ type Reader interface { // Read the next message in the topic, blocking until a message is available Next(context.Context) (Message, error) + // Check if there is any message available to read from the current position + HasNext() (bool, error) + // Close the reader and stop the broker to push more messages Close() error } diff --git a/pulsar-client-go/pulsar/reader_test.go b/pulsar-client-go/pulsar/reader_test.go index 11d1b36..3b075e1 100644 --- a/pulsar-client-go/pulsar/reader_test.go +++ b/pulsar-client-go/pulsar/reader_test.go @@ -20,9 +20,9 @@ package pulsar import ( - "testing" - "fmt" "context" + "fmt" + "testing" ) func TestReaderConnectError(t *testing.T) { @@ -80,12 +80,20 @@ func TestReader(t *testing.T) { t.Fatal(err) } + hasNext, err := reader.HasNext() + assertNil(t, err) + assertEqual(t, hasNext, true) + msg, err := reader.Next(ctx) assertNil(t, err) assertNotNil(t, msg) assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i)) } + + hasNext, err := reader.HasNext() + assertNil(t, err) + assertEqual(t, hasNext, false) } func TestReaderWithInvalidConf(t *testing.T) {