This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f50a67  Add schema support to Reader (#741)
4f50a67 is described below

commit 4f50a678d9030828933e03c1a79ae310c38893ad
Author: Ziyao Wei <ziyao.wei....@gmail.com>
AuthorDate: Tue Mar 22 02:26:37 2022 -0400

    Add schema support to Reader (#741)
    
    Add schema support to Reader
---
 pulsar/reader.go      |  3 +++
 pulsar/reader_impl.go |  1 +
 pulsar/reader_test.go | 43 +++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 47 insertions(+)

diff --git a/pulsar/reader.go b/pulsar/reader.go
index c45b8ff..f1cb575 100644
--- a/pulsar/reader.go
+++ b/pulsar/reader.go
@@ -79,6 +79,9 @@ type ReaderOptions struct {
 
        // Decryption represents the encryption related fields required by the reader to decrypt a message.
        Decryption *MessageDecryptionInfo
+
+       // Schema represents the schema implementation.
+       Schema Schema
 }
 
 // Reader can be used to scan through all the messages currently available in a topic.
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 0fed80c..596884a 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -102,6 +102,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
                nackRedeliveryDelay:        defaultNackRedeliveryDelay,
                replicateSubscriptionState: false,
                decryption:                 options.Decryption,
+               schema:                     options.Schema,
        }
 
        reader := &reader{
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index bdafea0..aa12078 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -710,3 +710,46 @@ func TestProducerReaderRSAEncryption(t *testing.T) {
                assert.Equal(t, []byte(expectMsg), msg.Payload())
        }
 }
+
+func TestReaderWithSchema(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+       schema := NewStringSchema(nil)
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:  topic,
+               Schema: schema,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       value := "hello pulsar"
+       _, err = producer.Send(context.Background(), &ProducerMessage{
+               Value: value,
+       })
+       assert.Nil(t, err)
+
+       // create reader
+       reader, err := client.CreateReader(ReaderOptions{
+               Topic:          topic,
+               StartMessageID: EarliestMessageID(),
+               Schema:         schema,
+       })
+       assert.Nil(t, err)
+       defer reader.Close()
+
+       msg, err := reader.Next(context.Background())
+       assert.NoError(t, err)
+
+       var res *string
+       err = msg.GetSchemaValue(&res)
+       assert.Nil(t, err)
+       assert.Equal(t, *res, value)
+}

Reply via email to