This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 73bc19d  Added Reader.HasNext in Go client (#2450)
73bc19d is described below

commit 73bc19dc41b6a06bd7051216682cc5d2fc5d5592
Author: Matteo Merli <mme...@apache.org>
AuthorDate: Mon Aug 27 11:41:24 2018 -0700

    Added Reader.HasNext in Go client (#2450)
    
    ### Motivation
    
    Added `Reader.HasNext()` in Go client library
---
 pulsar-client-go/pulsar/c_reader.go    | 13 +++++++++++++
 pulsar-client-go/pulsar/reader.go      |  3 +++
 pulsar-client-go/pulsar/reader_test.go | 12 ++++++++++--
 3 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/pulsar-client-go/pulsar/c_reader.go b/pulsar-client-go/pulsar/c_reader.go
index 730f9b8..12c1103 100644
--- a/pulsar-client-go/pulsar/c_reader.go
+++ b/pulsar-client-go/pulsar/c_reader.go
@@ -146,6 +146,19 @@ func (r *reader) Next(ctx context.Context) (Message, error) {
        }
 }
 
+func (r *reader) HasNext() (bool, error) {
+       value := C.int(0)
+       res := C.pulsar_reader_has_message_available(r.ptr, &value)
+
+       if res != C.pulsar_result_Ok {
+               return false, newError(res, "Failed to check if next message is available")
+       } else if value == C.int(1) {
+               return true, nil
+       } else {
+               return false, nil
+       }
+}
+
 func (r *reader) Close() error {
        channel := make(chan error)
        r.CloseAsync(func(err error) { channel <- err; close(channel) })
diff --git a/pulsar-client-go/pulsar/reader.go b/pulsar-client-go/pulsar/reader.go
index f61ebd7..7015c9c 100644
--- a/pulsar-client-go/pulsar/reader.go
+++ b/pulsar-client-go/pulsar/reader.go
@@ -67,6 +67,9 @@ type Reader interface {
        // Read the next message in the topic, blocking until a message is available
        Next(context.Context) (Message, error)
 
+       // Check if there is any message available to read from the current position
+       HasNext() (bool, error)
+
        // Close the reader and stop the broker to push more messages
        Close() error
 }
diff --git a/pulsar-client-go/pulsar/reader_test.go b/pulsar-client-go/pulsar/reader_test.go
index 11d1b36..3b075e1 100644
--- a/pulsar-client-go/pulsar/reader_test.go
+++ b/pulsar-client-go/pulsar/reader_test.go
@@ -20,9 +20,9 @@
 package pulsar
 
 import (
-       "testing"
-       "fmt"
        "context"
+       "fmt"
+       "testing"
 )
 
 func TestReaderConnectError(t *testing.T) {
@@ -80,12 +80,20 @@ func TestReader(t *testing.T) {
                        t.Fatal(err)
                }
 
+               hasNext, err := reader.HasNext()
+               assertNil(t, err)
+               assertEqual(t, hasNext, true)
+
                msg, err := reader.Next(ctx)
                assertNil(t, err)
                assertNotNil(t, msg)
 
                assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i))
        }
+
+       hasNext, err := reader.HasNext()
+       assertNil(t, err)
+       assertEqual(t, hasNext, false)
 }
 
 func TestReaderWithInvalidConf(t *testing.T) {

Reply via email to