sijie closed pull request #2450: Added Reader.HasNext in Go client
URL: https://github.com/apache/incubator-pulsar/pull/2450
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-go/pulsar/c_reader.go b/pulsar-client-go/pulsar/c_reader.go
index 730f9b86fe..12c11034aa 100644
--- a/pulsar-client-go/pulsar/c_reader.go
+++ b/pulsar-client-go/pulsar/c_reader.go
@@ -146,6 +146,19 @@ func (r *reader) Next(ctx context.Context) (Message, error) {
        }
 }
 
+func (r *reader) HasNext() (bool, error) {
+       value := C.int(0)
+       res := C.pulsar_reader_has_message_available(r.ptr, &value)
+
+       if res != C.pulsar_result_Ok {
+               return false, newError(res, "Failed to check if next message is available")
+       } else if value == C.int(1) {
+               return true, nil
+       } else {
+               return false, nil
+       }
+}
+
 func (r *reader) Close() error {
        channel := make(chan error)
        r.CloseAsync(func(err error) { channel <- err; close(channel) })
diff --git a/pulsar-client-go/pulsar/reader.go b/pulsar-client-go/pulsar/reader.go
index f61ebd7410..7015c9ca4d 100644
--- a/pulsar-client-go/pulsar/reader.go
+++ b/pulsar-client-go/pulsar/reader.go
@@ -67,6 +67,9 @@ type Reader interface {
        // Read the next message in the topic, blocking until a message is available
        Next(context.Context) (Message, error)
 
+       // Check if there is any message available to read from the current position
+       HasNext() (bool, error)
+
        // Close the reader and stop the broker to push more messages
        Close() error
 }
diff --git a/pulsar-client-go/pulsar/reader_test.go b/pulsar-client-go/pulsar/reader_test.go
index 11d1b3620c..3b075e1d50 100644
--- a/pulsar-client-go/pulsar/reader_test.go
+++ b/pulsar-client-go/pulsar/reader_test.go
@@ -20,9 +20,9 @@
 package pulsar
 
 import (
-       "testing"
-       "fmt"
        "context"
+       "fmt"
+       "testing"
 )
 
 func TestReaderConnectError(t *testing.T) {
@@ -80,12 +80,20 @@ func TestReader(t *testing.T) {
                        t.Fatal(err)
                }
 
+               hasNext, err := reader.HasNext()
+               assertNil(t, err)
+               assertEqual(t, hasNext, true)
+
                msg, err := reader.Next(ctx)
                assertNil(t, err)
                assertNotNil(t, msg)
 
                assertEqual(t, string(msg.Payload()), fmt.Sprintf("hello-%d", i))
        }
+
+       hasNext, err := reader.HasNext()
+       assertNil(t, err)
+       assertEqual(t, hasNext, false)
 }
 
 func TestReaderWithInvalidConf(t *testing.T) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to