This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new ab96ad7  Encryption support ext consumer (#612)
ab96ad7 is described below

commit ab96ad7d84b7c53e3e6d241f68c24eb7d6fa0037
Author: Garule Prabhudas <prabhudas...@gmail.com>
AuthorDate: Sat Oct 9 14:14:53 2021 +0530

    Encryption support ext consumer (#612)
    
    * add ability to encrypt messages
    - use base crypto package for encryption
    
    * fix typo
    
    * lint fixes
    
    * address review suggestions
    
    * revert go mod
    
    * remove encryption context
     - move it to Consumer MR
    
    * try to fix check issues
    
    * remove unused code
    
    * consumer encryption/decryption changes
    
    * remove embedded crypto struct
    
    * remove embedded struct
    
    * add comments
    
    * merge conflict issues fix
    
    * add noop decryptor
    
    * lint fixes
    
    * fix test case
    
    * refactor and reader encryption changes
    
    * refactor
    - move decryptor creation to partition_producer.go
    - update reader_impl
    - update consumer_impl
    
    * address review feedback
    
    * Nack on decryption failure
    
    * add comment on test case
    
    Co-authored-by: PGarule <pgar...@fanatics.com>
---
 pulsar/consumer.go                                 |   3 +
 pulsar/consumer_impl.go                            |   1 +
 pulsar/consumer_partition.go                       | 108 ++-
 pulsar/consumer_partition_test.go                  |   4 +
 pulsar/consumer_test.go                            | 847 ++++++++++++++++++++-
 pulsar/encryption.go                               |  12 +
 pulsar/impl_message.go                             |  23 +
 pulsar/internal/crypto/consumer_decryptor.go       |  60 ++
 .../crypto/decryptor.go}                           |  23 +-
 .../crypto/noop_decryptor.go}                      |  30 +-
 .../pulsartracing/message_carrier_util_test.go     |   4 +
 pulsar/message.go                                  |   4 +
 pulsar/reader.go                                   |   3 +
 pulsar/reader_impl.go                              |   1 +
 pulsar/reader_test.go                              |  56 ++
 15 files changed, 1148 insertions(+), 31 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 1c52b29..c9fbc0d 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -155,6 +155,9 @@ type ConsumerOptions struct {
 
        // MaxReconnectToBroker set the maximum retry number of 
reconnectToBroker. (default: ultimate)
        MaxReconnectToBroker *uint
+
+       // Decryption holds the decryption-related settings used to decrypt encrypted messages
+       Decryption *MessageDecryptionInfo
 }
 
 // Consumer is an interface that abstracts behavior of Pulsar's consumer
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 6677062..232079b 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -335,6 +335,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() 
error {
                                maxReconnectToBroker:       
c.options.MaxReconnectToBroker,
                                keySharedPolicy:            
c.options.KeySharedPolicy,
                                schema:                     c.options.Schema,
+                               decryption:                 
c.options.Decryption,
                        }
                        cons, err := newPartitionConsumer(c, c.client, opts, 
c.messageCh, c.dlq, c.metrics)
                        ch <- ConsumerError{
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 5f74bcf..e691d14 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -26,8 +26,10 @@ import (
 
        "github.com/gogo/protobuf/proto"
 
+       "github.com/apache/pulsar-client-go/pulsar/crypto"
        "github.com/apache/pulsar-client-go/pulsar/internal"
        "github.com/apache/pulsar-client-go/pulsar/internal/compression"
+       cryptointernal 
"github.com/apache/pulsar-client-go/pulsar/internal/crypto"
        pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
        "github.com/apache/pulsar-client-go/pulsar/log"
 
@@ -98,6 +100,7 @@ type partitionConsumerOpts struct {
        maxReconnectToBroker       *uint
        keySharedPolicy            *KeySharedPolicy
        schema                     Schema
+       decryption                 *MessageDecryptionInfo
 }
 
 type partitionConsumer struct {
@@ -142,6 +145,7 @@ type partitionConsumer struct {
        providersMutex       sync.RWMutex
        compressionProviders map[pb.CompressionType]compression.Provider
        metrics              *internal.TopicMetrics
+       decryptor            cryptointernal.Decryptor
 }
 
 func newPartitionConsumer(parent Consumer, client *client, options 
*partitionConsumerOpts,
@@ -176,6 +180,27 @@ func newPartitionConsumer(parent Consumer, client *client, 
options *partitionCon
                "subscription": options.subscription,
                "consumerID":   pc.consumerID,
        })
+
+       var decryptor cryptointernal.Decryptor
+       if pc.options.decryption == nil {
+               decryptor = cryptointernal.NewNoopDecryptor() // default to 
noopDecryptor
+       } else {
+               if options.decryption.MessageCrypto == nil {
+                       messageCrypto, err := 
crypto.NewDefaultMessageCrypto("decrypt", false, pc.log)
+                       if err != nil {
+                               return nil, err
+                       }
+                       options.decryption.MessageCrypto = messageCrypto
+               }
+               decryptor = cryptointernal.NewConsumerDecryptor(
+                       options.decryption.KeyReader,
+                       options.decryption.MessageCrypto,
+                       pc.log,
+               )
+       }
+
+       pc.decryptor = decryptor
+
        pc.nackTracker = newNegativeAcksTracker(pc, 
options.nackRedeliveryDelay, pc.log)
 
        err := pc.grabConn()
@@ -480,7 +505,54 @@ func (pc *partitionConsumer) MessageReceived(response 
*pb.CommandMessage, header
                return err
        }
 
-       uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, 
headersAndPayload)
+       decryptedPayload, err := 
pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta)
+       // error decrypting the payload
+       if err != nil {
+               // default crypto failure action
+               crypToFailureAction := crypto.ConsumerCryptoFailureActionFail
+               if pc.options.decryption != nil {
+                       crypToFailureAction = 
pc.options.decryption.ConsumerCryptoFailureAction
+               }
+
+               switch crypToFailureAction {
+               case crypto.ConsumerCryptoFailureActionFail:
+                       pc.log.Errorf("consuming message failed due to 
decryption err :%v", err)
+                       
pc.NackID(newTrackingMessageID(int64(pbMsgID.GetLedgerId()), 
int64(pbMsgID.GetEntryId()), 0, 0, nil))
+                       return err
+               case crypto.ConsumerCryptoFailureActionDiscard:
+                       pc.discardCorruptedMessage(pbMsgID, 
pb.CommandAck_DecryptionError)
+                       return fmt.Errorf("discarding message on decryption 
error :%v", err)
+               case crypto.ConsumerCryptoFailureActionConsume:
+                       pc.log.Warnf("consuming encrypted message due to error 
in decryption :%v", err)
+                       messages := []*message{
+                               {
+                                       publishTime:  
timeFromUnixTimestampMillis(msgMeta.GetPublishTime()),
+                                       eventTime:    
timeFromUnixTimestampMillis(msgMeta.GetEventTime()),
+                                       key:          msgMeta.GetPartitionKey(),
+                                       producerName: msgMeta.GetProducerName(),
+                                       properties:   
internal.ConvertToStringMap(msgMeta.GetProperties()),
+                                       topic:        pc.topic,
+                                       msgID: newMessageID(
+                                               int64(pbMsgID.GetLedgerId()),
+                                               int64(pbMsgID.GetEntryId()),
+                                               pbMsgID.GetBatchIndex(),
+                                               pc.partitionIdx,
+                                       ),
+                                       payLoad:             
headersAndPayload.ReadableSlice(),
+                                       schema:              pc.options.schema,
+                                       replicationClusters: 
msgMeta.GetReplicateTo(),
+                                       replicatedFrom:      
msgMeta.GetReplicatedFrom(),
+                                       redeliveryCount:     
response.GetRedeliveryCount(),
+                                       encryptionContext:   
createEncryptionContext(msgMeta),
+                               },
+                       }
+                       pc.queueCh <- messages
+                       return nil
+               }
+       }
+
+       // decryption succeeded, decompress the payload
+       uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, 
internal.NewBufferWrapper(decryptedPayload))
        if err != nil {
                pc.discardCorruptedMessage(pbMsgID, 
pb.CommandAck_DecompressionError)
                return err
@@ -493,6 +565,7 @@ func (pc *partitionConsumer) MessageReceived(response 
*pb.CommandMessage, header
        if msgMeta.NumMessagesInBatch != nil {
                numMsgs = int(msgMeta.GetNumMessagesInBatch())
        }
+
        messages := make([]*message, 0)
        var ackTracker *ackTracker
        // are there multiple messages in this batch?
@@ -590,6 +663,39 @@ func (pc *partitionConsumer) 
messageShouldBeDiscarded(msgID trackingMessageID) b
        return pc.startMessageID.greaterEqual(msgID.messageID)
 }
 
+// createEncryptionContext builds an EncryptionContext from the message metadata.
+// The context lets the end user decrypt the payload outside of this client; it is only
+// attached when the failure action is crypto.ConsumerCryptoFailureActionConsume.
+// Generated getters are used for key and metadata entries so unset fields cannot panic.
+func createEncryptionContext(msgMeta *pb.MessageMetadata) *EncryptionContext {
+       encCtx := EncryptionContext{
+               Algorithm:        msgMeta.GetEncryptionAlgo(),
+               Param:            msgMeta.GetEncryptionParam(),
+               UncompressedSize: int(msgMeta.GetUncompressedSize()),
+               BatchSize:        int(msgMeta.GetNumMessagesInBatch()),
+       }
+
+       if msgMeta.Compression != nil {
+               encCtx.CompressionType = CompressionType(*msgMeta.Compression)
+       }
+
+       kMap := map[string]EncryptionKey{}
+       for _, k := range msgMeta.GetEncryptionKeys() {
+               metaMap := map[string]string{}
+               for _, m := range k.GetMetadata() {
+                       metaMap[m.GetKey()] = m.GetValue()
+               }
+
+               kMap[k.GetKey()] = EncryptionKey{
+                       KeyValue: k.GetValue(),
+                       Metadata: metaMap,
+               }
+       }
+
+       encCtx.Keys = kMap
+       return &encCtx
+}
+
 func (pc *partitionConsumer) ConnectionClosed() {
        // Trigger reconnection in the consumer goroutine
        pc.log.Debug("connection closed and send to connectClosedCh")
diff --git a/pulsar/consumer_partition_test.go 
b/pulsar/consumer_partition_test.go
index b1322e3..560afb6 100644
--- a/pulsar/consumer_partition_test.go
+++ b/pulsar/consumer_partition_test.go
@@ -21,6 +21,7 @@ import (
        "testing"
 
        "github.com/apache/pulsar-client-go/pulsar/internal/compression"
+       "github.com/apache/pulsar-client-go/pulsar/internal/crypto"
        pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
 
        "github.com/stretchr/testify/assert"
@@ -36,6 +37,7 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) {
                compressionProviders: 
make(map[pb.CompressionType]compression.Provider),
                options:              &partitionConsumerOpts{},
                metrics:              
internal.NewMetricsProvider(map[string]string{}).GetTopicMetrics("topic"),
+               decryptor:            crypto.NewNoopDecryptor(),
        }
 
        headersAndPayload := internal.NewBufferWrapper(rawCompatSingleMessage)
@@ -67,6 +69,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) {
                compressionProviders: 
make(map[pb.CompressionType]compression.Provider),
                options:              &partitionConsumerOpts{},
                metrics:              
internal.NewMetricsProvider(map[string]string{}).GetTopicMetrics("topic"),
+               decryptor:            crypto.NewNoopDecryptor(),
        }
 
        headersAndPayload := internal.NewBufferWrapper(rawBatchMessage1)
@@ -98,6 +101,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) {
                compressionProviders: 
make(map[pb.CompressionType]compression.Provider),
                options:              &partitionConsumerOpts{},
                metrics:              
internal.NewMetricsProvider(map[string]string{}).GetTopicMetrics("topic"),
+               decryptor:            crypto.NewNoopDecryptor(),
        }
 
        headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10)
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 66587f9..55823e4 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -20,6 +20,7 @@ package pulsar
 import (
        "context"
        "fmt"
+       "io/ioutil"
        "log"
        "net/http"
        "strconv"
@@ -27,8 +28,13 @@ import (
        "testing"
        "time"
 
+       "github.com/apache/pulsar-client-go/pulsar/crypto"
        "github.com/apache/pulsar-client-go/pulsar/internal"
+       pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+       plog "github.com/apache/pulsar-client-go/pulsar/log"
+       "github.com/gogo/protobuf/proto"
        "github.com/google/uuid"
+       "github.com/pierrec/lz4"
        "github.com/stretchr/testify/assert"
 )
 
@@ -92,7 +98,6 @@ func TestProducerConsumer(t *testing.T) {
                assert.Equal(t, []byte(expectMsg), msg.Payload())
                assert.Equal(t, "pulsar", msg.Key())
                assert.Equal(t, expectProperties, msg.Properties())
-
                // ack message
                consumer.Ack(msg)
        }
@@ -2104,3 +2109,843 @@ func TestConsumerKeySharedWithOrderingKey(t *testing.T) 
{
        )
        assert.Equal(t, 100, receivedConsumer1+receivedConsumer2)
 }
+
+// TestProducerConsumerRSAEncryption publishes RSA-encrypted messages and checks that the
+// consumer holding the key pair decodes them while a plain consumer receives nothing.
+func TestProducerConsumerRSAEncryption(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := fmt.Sprintf("my-topic-enc-%v", time.Now().Nanosecond())
+
+       cryptoConsumer, err := client.Subscribe(ConsumerOptions{
+               Topic: topic,
+               Decryption: &MessageDecryptionInfo{
+                       KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+                               "crypto/testdata/pri_key_rsa.pem"),
+                       ConsumerCryptoFailureAction: crypto.ConsumerCryptoFailureActionFail,
+               },
+               SubscriptionName: "crypto-subscription",
+               Schema:           NewStringSchema(nil),
+       })
+       assert.Nil(t, err)
+       defer cryptoConsumer.Close()
+
+       normalConsumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: "normal-subscription",
+               Schema:           NewStringSchema(nil),
+       })
+       assert.Nil(t, err)
+       defer normalConsumer.Close()
+
+       cryptoProducer, err := client.CreateProducer(ProducerOptions{
+               Topic: topic,
+               Encryption: &ProducerEncryptionInfo{
+                       KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+                               "crypto/testdata/pri_key_rsa.pem"),
+                       Keys: []string{"client-rsa.pem"},
+               },
+               Schema: NewStringSchema(nil),
+       })
+       assert.Nil(t, err)
+       defer cryptoProducer.Close()
+
+       msgFormat := "my-message-%v"
+       totalMessages := 10
+       ctx := context.Background()
+
+       for i := 0; i < totalMessages; i++ {
+               _, err := cryptoProducer.Send(ctx, &ProducerMessage{
+                       Value: fmt.Sprintf(msgFormat, i),
+               })
+               assert.Nil(t, err)
+       }
+
+       // the normal consumer cannot decrypt, so Receive must time out without a message
+       normalConsumerCtx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+       defer cancel()
+
+       msg, err := normalConsumer.Receive(normalConsumerCtx)
+       assert.NotNil(t, err)
+       assert.Nil(t, msg)
+
+       // the crypto consumer should be able to read all the messages
+       var actualMessage *string
+       for i := 0; i < totalMessages; i++ {
+               msg, err := cryptoConsumer.Receive(ctx)
+               if err != nil {
+                       // msg is nil on error; stop here instead of dereferencing it below
+                       t.Fatalf("receiving message %d: %v", i, err)
+               }
+               expectedMsg := fmt.Sprintf(msgFormat, i)
+               if err = msg.GetSchemaValue(&actualMessage); err != nil {
+                       t.Fatalf("decoding message %d: %v", i, err)
+               }
+               assert.Equal(t, expectedMsg, *actualMessage)
+               cryptoConsumer.Ack(msg)
+       }
+}
+
+// TestProducerConsumerRSAEncryptionWithCompression repeats the RSA round trip with LZ4
+// compression enabled on the producer, so the consumer must decrypt and then decompress.
+func TestProducerConsumerRSAEncryptionWithCompression(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := fmt.Sprintf("my-topic-enc-%v", time.Now().Nanosecond())
+
+       cryptoConsumer, err := client.Subscribe(ConsumerOptions{
+               Topic: topic,
+               Decryption: &MessageDecryptionInfo{
+                       KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+                               "crypto/testdata/pri_key_rsa.pem"),
+               },
+               SubscriptionName: "crypto-subscription",
+               Schema:           NewStringSchema(nil),
+       })
+       assert.Nil(t, err)
+       defer cryptoConsumer.Close()
+
+       normalConsumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: "normal-subscription",
+               Schema:           NewStringSchema(nil),
+       })
+       assert.Nil(t, err)
+       defer normalConsumer.Close()
+
+       cryptoProducer, err := client.CreateProducer(ProducerOptions{
+               Topic: topic,
+               Encryption: &ProducerEncryptionInfo{
+                       KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+                               "crypto/testdata/pri_key_rsa.pem"),
+                       Keys: []string{"client-rsa.pem"},
+               },
+               Schema:          NewStringSchema(nil),
+               CompressionType: LZ4,
+       })
+       assert.Nil(t, err)
+       defer cryptoProducer.Close()
+
+       msgFormat := "my-message-%v"
+       totalMessages := 10
+       ctx := context.Background()
+
+       for i := 0; i < totalMessages; i++ {
+               _, err := cryptoProducer.Send(ctx, &ProducerMessage{
+                       Value: fmt.Sprintf(msgFormat, i),
+               })
+               assert.Nil(t, err)
+       }
+
+       // the normal consumer cannot decrypt, so Receive must time out without a message
+       normalConsumerCtx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+       defer cancel()
+
+       msg, err := normalConsumer.Receive(normalConsumerCtx)
+       assert.NotNil(t, err)
+       assert.Nil(t, msg)
+
+       // the crypto consumer should be able to read all the messages
+       var actualMessage *string
+       for i := 0; i < totalMessages; i++ {
+               msg, err := cryptoConsumer.Receive(ctx)
+               if err != nil {
+                       t.Fatalf("receiving message %d: %v", i, err)
+               }
+               expectedMsg := fmt.Sprintf(msgFormat, i)
+               if err = msg.GetSchemaValue(&actualMessage); err != nil {
+                       t.Fatalf("decoding message %d: %v", i, err)
+               }
+               assert.Equal(t, expectedMsg, *actualMessage)
+               cryptoConsumer.Ack(msg)
+       }
+}
+
+// TestBatchProducerConsumerRSAEncryptionWithCompression repeats the RSA round trip with
+// batching (two messages per batch) and LZ4 compression enabled on the producer.
+func TestBatchProducerConsumerRSAEncryptionWithCompression(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := fmt.Sprintf("my-topic-enc-%v", time.Now().Nanosecond())
+
+       cryptoConsumer, err := client.Subscribe(ConsumerOptions{
+               Topic: topic,
+               Decryption: &MessageDecryptionInfo{
+                       KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+                               "crypto/testdata/pri_key_rsa.pem"),
+               },
+               SubscriptionName: "crypto-subscription",
+               Schema:           NewStringSchema(nil),
+       })
+       assert.Nil(t, err)
+       defer cryptoConsumer.Close()
+
+       normalConsumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: "normal-subscription",
+               Schema:           NewStringSchema(nil),
+       })
+       assert.Nil(t, err)
+       defer normalConsumer.Close()
+
+       cryptoProducer, err := client.CreateProducer(ProducerOptions{
+               Topic: topic,
+               Encryption: &ProducerEncryptionInfo{
+                       KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+                               "crypto/testdata/pri_key_rsa.pem"),
+                       Keys: []string{"client-rsa.pem"},
+               },
+               Schema:              NewStringSchema(nil),
+               CompressionType:     LZ4,
+               DisableBatching:     false,
+               BatchingMaxMessages: 2,
+       })
+       assert.Nil(t, err)
+       defer cryptoProducer.Close()
+
+       msgFormat := "my-message-%v"
+       totalMessages := 10
+       ctx := context.Background()
+
+       for i := 0; i < totalMessages; i++ {
+               _, err := cryptoProducer.Send(ctx, &ProducerMessage{
+                       Value: fmt.Sprintf(msgFormat, i),
+               })
+               assert.Nil(t, err)
+       }
+
+       // the normal consumer cannot decrypt, so Receive must time out without a message
+       normalConsumerCtx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+       defer cancel()
+
+       msg, err := normalConsumer.Receive(normalConsumerCtx)
+       assert.NotNil(t, err)
+       assert.Nil(t, msg)
+
+       // the crypto consumer should be able to read all the messages
+       var actualMessage *string
+       for i := 0; i < totalMessages; i++ {
+               msg, err := cryptoConsumer.Receive(ctx)
+               if err != nil {
+                       t.Fatalf("receiving message %d: %v", i, err)
+               }
+               expectedMsg := fmt.Sprintf(msgFormat, i)
+               if err = msg.GetSchemaValue(&actualMessage); err != nil {
+                       t.Fatalf("decoding message %d: %v", i, err)
+               }
+               assert.Equal(t, expectedMsg, *actualMessage)
+               cryptoConsumer.Ack(msg)
+       }
+}
+
+// TestProducerConsumerRedeliveryOfFailedEncryptedMessages checks that messages which shared
+// consumers fail to decrypt are still delivered to a consumer holding the right private key.
+func TestProducerConsumerRedeliveryOfFailedEncryptedMessages(t *testing.T) {
+       // create new client instance for each producer and consumer
+       client, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       assert.Nil(t, err)
+       defer client.Close()
+
+       clientCryptoConsumer, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       assert.Nil(t, err)
+       defer clientCryptoConsumer.Close()
+
+       clientCryptoConsumerInvalidKeyReader, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       assert.Nil(t, err)
+       defer clientCryptoConsumerInvalidKeyReader.Close()
+
+       clientcryptoConsumerNoKeyReader, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       assert.Nil(t, err)
+       defer clientcryptoConsumerNoKeyReader.Close()
+
+       topic := fmt.Sprintf("my-topic-enc-%v", time.Now().Nanosecond())
+       sharedSubscription := "crypto-shared-subscription"
+
+       cryptoProducer, err := client.CreateProducer(ProducerOptions{
+               Topic: topic,
+               Encryption: &ProducerEncryptionInfo{
+                       KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+                               "crypto/testdata/pri_key_rsa.pem"),
+                       Keys: []string{"client-rsa.pem"},
+               },
+               CompressionType: LZ4,
+               Schema:          NewStringSchema(nil),
+       })
+       assert.Nil(t, err)
+       defer cryptoProducer.Close()
+
+       cryptoConsumer, err := clientCryptoConsumer.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: sharedSubscription,
+               Decryption: &MessageDecryptionInfo{
+                       KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+                               "crypto/testdata/pri_key_rsa.pem"),
+               },
+               Schema:              NewStringSchema(nil),
+               Type:                Shared,
+               NackRedeliveryDelay: 1 * time.Second,
+       })
+       assert.Nil(t, err)
+       defer cryptoConsumer.Close()
+
+       cryptoConsumerInvalidKeyReader, err := clientCryptoConsumerInvalidKeyReader.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: sharedSubscription,
+               Decryption: &MessageDecryptionInfo{
+                       KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+                               "crypto/testdata/pri_key_rsa_invalid.pem"),
+               },
+               Schema:              NewStringSchema(nil),
+               Type:                Shared,
+               NackRedeliveryDelay: 1 * time.Second,
+       })
+       assert.Nil(t, err)
+
+       cryptoConsumerNoKeyReader, err := clientcryptoConsumerNoKeyReader.Subscribe(ConsumerOptions{
+               Topic:               topic,
+               SubscriptionName:    sharedSubscription,
+               Schema:              NewStringSchema(nil),
+               Type:                Shared,
+               NackRedeliveryDelay: 1 * time.Second,
+       })
+       assert.Nil(t, err)
+
+       totalMessages := 5
+       message := "my-message-%v"
+       // messages can arrive in any order, so a set records which ones were received
+       messageMap := map[string]struct{}{}
+
+       for i := 0; i < totalMessages; i++ {
+               mid, err := cryptoProducer.Send(context.Background(), &ProducerMessage{
+                       Value: fmt.Sprintf(message, i),
+               })
+               assert.Nil(t, err)
+               t.Logf("Sent : %v", mid)
+       }
+
+       // consumers 2 and 3 cannot decrypt, so neither of them may return a message
+       ctxWithTimeOut1, c1 := context.WithTimeout(context.Background(), 2*time.Second)
+       defer c1()
+       msg, err := cryptoConsumerInvalidKeyReader.Receive(ctxWithTimeOut1)
+       assert.NotNil(t, err)
+       assert.Nil(t, msg)
+
+       // start the second timeout only now; created with the first it would run out during the wait above
+       ctxWithTimeOut2, c2 := context.WithTimeout(context.Background(), 2*time.Second)
+       defer c2()
+       msg, err = cryptoConsumerNoKeyReader.Receive(ctxWithTimeOut2)
+       assert.NotNil(t, err)
+       assert.Nil(t, msg)
+
+       cryptoConsumerInvalidKeyReader.Close()
+       cryptoConsumerNoKeyReader.Close()
+
+       // consumer 1 holds the valid key and should receive every message
+       var receivedMsg *string
+       for i := 0; i < totalMessages; i++ {
+               m, err := cryptoConsumer.Receive(context.Background())
+               if err != nil {
+                       t.Fatalf("receiving message %d: %v", i, err)
+               }
+               if err = m.GetSchemaValue(&receivedMsg); err != nil {
+                       t.Fatalf("decoding message %d: %v", i, err)
+               }
+               messageMap[*receivedMsg] = struct{}{}
+               cryptoConsumer.Ack(m)
+               t.Logf("Received : %v", m.ID())
+       }
+
+       // check if all messages were received
+       for i := 0; i < totalMessages; i++ {
+               key := fmt.Sprintf(message, i)
+               _, ok := messageMap[key]
+               assert.True(t, ok)
+       }
+}
+
+func TestRSAEncryptionFailure(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       assert.Nil(t, err)
+       client.Close()
+
+       topic := fmt.Sprintf("my-topic-enc-%v", time.Now().Nanosecond())
+
+       // 1. invalid key name
+       // create producer with invalid key
+       // producer creation succeeds but message sending should fail with an 
error
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: topic,
+               Encryption: &ProducerEncryptionInfo{
+                       KeyReader: 
crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa_invalid.pem",
+                               "crypto/testdata/pri_key_rsa.pem"),
+                       Keys: []string{"client-rsa.pem"},
+               },
+               Schema:          NewStringSchema(nil),
+               DisableBatching: true,
+       })
+       assert.Nil(t, err)
+       assert.NotNil(t, producer)
+
+       // sending of message should fail with an error, since invalid rsa keys 
are configured
+       mid, err := producer.Send(context.Background(), &ProducerMessage{
+               Value: "some-message",
+       })
+
+       assert.Nil(t, mid)
+       assert.NotNil(t, err)
+       producer.Close()
+
+       // 2. Producer with valid key name
+       producer, err = client.CreateProducer(ProducerOptions{
+               Topic: topic,
+               Encryption: &ProducerEncryptionInfo{
+                       KeyReader: 
crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+                               "crypto/testdata/pri_key_rsa.pem"),
+                       Keys: []string{"client-rsa.pem"},
+               },
+               Schema:          NewStringSchema(nil),
+               DisableBatching: true,
+       })
+       assert.Nil(t, err)
+       assert.NotNil(t, producer)
+
+       subscriptionName := "enc-failure-subcription"
+       totalMessages := 10
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: subscriptionName,
+       })
+       assert.Nil(t, err)
+
+       messageFormat := "my-message-%v"
+       for i := 0; i < totalMessages; i++ {
+               _, err := producer.Send(context.Background(), &ProducerMessage{
+                       Value: fmt.Sprintf(messageFormat, i),
+               })
+               assert.Nil(t, err)
+       }
+
+       // 3. KeyReader is not set by the consumer
+       // Receive should fail since KeyReader is not setup
+       // because default behaviour of consumer is fail receiving message if 
error in decryption
+       ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+       defer cancel()
+
+       msg, err := consumer.Receive(ctx)
+       assert.NotNil(t, err)
+       assert.Nil(t, msg, "Receive should have failed with no keyreader")
+
+       // 4. Set consumer config to consume even if decryption fails
+       consumer.Close()
+       consumer, err = client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: subscriptionName,
+               Decryption: &MessageDecryptionInfo{
+                       ConsumerCryptoFailureAction: 
crypto.ConsumerCryptoFailureActionConsume,
+               },
+               Schema: NewStringSchema(nil),
+       })
+       assert.Nil(t, err)
+       assert.NotNil(t, consumer)
+
+       ctx2, cancel2 := context.WithTimeout(context.Background(), 
2*time.Second)
+       defer cancel2()
+
+       for i := 0; i < totalMessages-1; i++ {
+               expectedMessage := fmt.Sprintf(messageFormat, i)
+               msg, err = consumer.Receive(ctx2)
+               assert.Nil(t, err)
+               assert.NotNil(t, msg)
+
+               receivedMsg := string(msg.Payload())
+               assert.NotEqual(t, expectedMessage, receivedMsg, 
fmt.Sprintf(`Received encrypted message [%v]
+       should not match the expected message  [%v]`, expectedMessage, 
receivedMsg))
+               // verify the message contains Encryption context
+               assert.NotEmpty(t, msg.GetEncryptionContext(),
+                       "Encrypted message which is failed to decrypt must 
contain EncryptionContext")
+               consumer.Ack(msg)
+       }
+
+       // 5. discard action on decryption failure
+       consumer.Close()
+       consumer, err = client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: subscriptionName,
+               Decryption: &MessageDecryptionInfo{
+                       ConsumerCryptoFailureAction: 
crypto.ConsumerCryptoFailureActionDiscard,
+               },
+               Schema: NewStringSchema(nil),
+       })
+       assert.Nil(t, err)
+       assert.NotNil(t, consumer)
+
+       ctx3, cancel3 := context.WithTimeout(context.Background(), 
3*time.Second)
+       defer cancel3()
+
+       msg, err = consumer.Receive(ctx3)
+       assert.NotNil(t, err)
+       assert.Nil(t, msg, "Message received even aftet 
ConsumerCryptoFailureAction.Discard is set.")
+}
+
+// TestConsumerCompressionWithRSAEncryption publishes LZ4-compressed, RSA-encrypted
+// messages and verifies the consumer decrypts and receives them in order.
+func TestConsumerCompressionWithRSAEncryption(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topicName := newTopicName()
+       ctx := context.Background()
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:           topicName,
+               CompressionType: LZ4,
+               Encryption: &ProducerEncryptionInfo{
+                       KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+                               "crypto/testdata/pri_key_rsa.pem"),
+                       Keys: []string{"enc-compress-app.key"},
+               },
+       })
+
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topicName,
+               SubscriptionName: "sub-1",
+               Decryption: &MessageDecryptionInfo{
+                       KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+                               "crypto/testdata/pri_key_rsa.pem"),
+               },
+       })
+
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       const N = 100
+
+       for i := 0; i < N; i++ {
+               if _, err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
+               }); err != nil {
+                       t.Fatal(err)
+               }
+       }
+
+       for i := 0; i < N; i++ {
+               msg, err := consumer.Receive(ctx)
+               // stop here on failure: msg may be nil and must not be dereferenced
+               if err != nil {
+                       t.Fatal(err)
+               }
+               assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
+               consumer.Ack(msg)
+       }
+}
+
+// TestBatchMessageReceiveWithCompressionAndRSAEcnryption publishes batched,
+// LZ4-compressed, RSA-encrypted messages and verifies that every one of them
+// is received by a consumer configured with the matching key reader.
+func TestBatchMessageReceiveWithCompressionAndRSAEcnryption(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topicName := "persistent://public/default/receive-batch-comp-enc"
+       subName := "subscription-name"
+       prefix := "msg-batch-"
+       ctx := context.Background()
+
+       // Enable batching on producer side
+       batchSize, numOfMessages := 2, 100
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic:               topicName,
+               BatchingMaxMessages: uint(batchSize),
+               DisableBatching:     false,
+               CompressionType:     LZ4,
+               Encryption: &ProducerEncryptionInfo{
+                       KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+                               "crypto/testdata/pri_key_rsa.pem"),
+                       Keys: []string{"batch-encryption-app.key"},
+               },
+       })
+       assert.Nil(t, err)
+       assert.Equal(t, topicName, producer.Topic())
+       defer producer.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topicName,
+               SubscriptionName: subName,
+               Decryption: &MessageDecryptionInfo{
+                       KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+                               "crypto/testdata/pri_key_rsa.pem"),
+               },
+       })
+
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       count := 0
+       for i := 0; i < numOfMessages; i++ {
+               messageContent := prefix + fmt.Sprintf("%d", i)
+               msg := &ProducerMessage{
+                       Payload: []byte(messageContent),
+               }
+               _, err := producer.Send(ctx, msg)
+               assert.Nil(t, err)
+       }
+
+       for i := 0; i < numOfMessages; i++ {
+               msg, err := consumer.Receive(ctx)
+               // check the error before touching msg: it may be nil when Receive fails
+               if err != nil {
+                       t.Fatal(err)
+               }
+               fmt.Printf("received : %v\n", string(msg.Payload()))
+               consumer.Ack(msg)
+               count++
+       }
+
+       // testify expects (expected, actual)
+       assert.Equal(t, numOfMessages, count)
+}
+
+type EncKeyReader struct {
+       publicKeyPath  string
+       privateKeyPath string
+       metaMap        map[string]string
+}
+
+func NewEncKeyReader(publicKeyPath, privateKeyPath string) *EncKeyReader {
+       metaMap := map[string]string{
+               "version": "1.0",
+       }
+
+       return &EncKeyReader{
+               publicKeyPath:  publicKeyPath,
+               privateKeyPath: privateKeyPath,
+               metaMap:        metaMap,
+       }
+}
+
+// PublicKey reads the public key from the given path
+func (d *EncKeyReader) PublicKey(keyName string, keyMeta map[string]string) 
(*crypto.EncryptionKeyInfo, error) {
+       return readKey(keyName, d.publicKeyPath, d.metaMap)
+}
+
+// PrivateKey reads the private key from the given path
+func (d *EncKeyReader) PrivateKey(keyName string, keyMeta map[string]string) 
(*crypto.EncryptionKeyInfo, error) {
+       return readKey(keyName, d.privateKeyPath, d.metaMap)
+}
+
+func readKey(keyName, path string, keyMeta map[string]string) 
(*crypto.EncryptionKeyInfo, error) {
+       key, err := ioutil.ReadFile(path)
+       if err != nil {
+               return nil, err
+       }
+       return crypto.NewEncryptionKeyInfo(keyName, key, keyMeta), nil
+}
+
+// TestConsumerEncryptionWithoutKeyReader consumes an encrypted message with a
+// consumer that has no KeyReader (failure action: consume), then uses the
+// message's EncryptionContext to decrypt and uncompress the payload manually.
+func TestConsumerEncryptionWithoutKeyReader(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+       encryptionKeyName := "client-rsa.pem"
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: topic,
+               Encryption: &ProducerEncryptionInfo{
+                       KeyReader: NewEncKeyReader("crypto/testdata/pub_key_rsa.pem",
+                               "crypto/testdata/pri_key_rsa.pem"),
+                       Keys: []string{encryptionKeyName},
+               },
+               CompressionType: LZ4,
+               Schema:          NewStringSchema(nil),
+       })
+
+       assert.Nil(t, err)
+       assert.NotNil(t, producer)
+       defer producer.Close()
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: "my-subscription-name",
+               Decryption: &MessageDecryptionInfo{
+                       ConsumerCryptoFailureAction: crypto.ConsumerCryptoFailureActionConsume,
+               },
+               Schema: NewStringSchema(nil),
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       message := "my-message"
+
+       _, err = producer.Send(context.Background(), &ProducerMessage{
+               Value: message,
+       })
+       assert.Nil(t, err)
+
+       // consume encrypted message
+       msg, err := consumer.Receive(context.Background())
+       assert.Nil(t, err)
+       assert.NotNil(t, msg)
+
+       // try to decrypt message
+       encCtx := msg.GetEncryptionContext()
+       assert.NotEmpty(t, encCtx)
+
+       keys := encCtx.Keys
+       assert.Equal(t, 1, len(keys))
+
+       encryptionKey, ok := keys[encryptionKeyName]
+       assert.True(t, ok)
+
+       encDataKey := encryptionKey.KeyValue
+       assert.NotNil(t, encDataKey)
+
+       metadata := encryptionKey.Metadata
+       assert.NotNil(t, metadata)
+
+       version := metadata["version"]
+       assert.Equal(t, "1.0", version)
+
+       compressionType := encCtx.CompressionType
+       uncompressedSize := uint32(encCtx.UncompressedSize)
+       encParam := encCtx.Param
+       encAlgo := encCtx.Algorithm
+       batchSize := encCtx.BatchSize
+
+       // try to decrypt using default MessageCrypto
+       msgCrypto, err := crypto.NewDefaultMessageCrypto("testing", false, plog.DefaultNopLogger())
+       assert.Nil(t, err)
+
+       producerName := "test"
+       sequenceID := uint64(123)
+       publishTime := uint64(12333453454)
+
+       messageMetaData := pb.MessageMetadata{
+               EncryptionParam:  encParam,
+               ProducerName:     &producerName,
+               SequenceId:       &sequenceID,
+               PublishTime:      &publishTime,
+               UncompressedSize: &uncompressedSize,
+               EncryptionAlgo:   &encAlgo,
+       }
+
+       if compressionType == LZ4 {
+               messageMetaData.Compression = pb.CompressionType_LZ4.Enum()
+       }
+
+       messageMetaData.EncryptionKeys = []*pb.EncryptionKeys{{
+               Key:   &encryptionKeyName,
+               Value: encDataKey,
+       }}
+
+       decryptedPayload, err := msgCrypto.Decrypt(crypto.NewMessageMetadataSupplier(&messageMetaData),
+               msg.Payload(),
+               NewEncKeyReader("crypto/testdata/pub_key_rsa.pem",
+                       "crypto/testdata/pri_key_rsa.pem"))
+       assert.Nil(t, err)
+       assert.NotNil(t, decryptedPayload)
+
+       // try to uncompress payload
+       uncompressedPayload := make([]byte, uncompressedSize)
+       s, err := lz4.UncompressBlock(decryptedPayload, uncompressedPayload)
+       assert.Nil(t, err)
+       assert.Equal(t, uncompressedSize, uint32(s))
+
+       buffer := internal.NewBufferWrapper(uncompressedPayload)
+
+       if batchSize > 0 {
+               size := buffer.ReadUint32()
+               var meta pb.SingleMessageMetadata
+               // a malformed batch entry must fail the test instead of being printed and ignored
+               assert.Nil(t, proto.Unmarshal(buffer.Read(size), &meta))
+               d := buffer.Read(uint32(meta.GetPayloadSize()))
+               assert.Equal(t, message, string(d))
+       }
+}
+
+// TestEncryptDecryptRedeliveryOnFailure verifies that a message which could not
+// be decrypted (consumer configured with an invalid private key) is redelivered
+// to a later consumer on the same subscription that has the valid key pair.
+func TestEncryptDecryptRedeliveryOnFailure(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: serviceURL,
+       })
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+       subcription := "test-subscription-redelivery"
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: subcription,
+               Decryption: &MessageDecryptionInfo{
+                       KeyReader: NewEncKeyReader("crypto/testdata/pub_key_rsa.pem",
+                               "crypto/testdata/pri_key_invalid_rsa.pem"),
+               },
+       })
+       assert.Nil(t, err)
+
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: topic,
+               Encryption: &ProducerEncryptionInfo{
+                       KeyReader: NewEncKeyReader("crypto/testdata/pub_key_rsa.pem",
+                               "crypto/testdata/pri_key_rsa.pem"),
+                       Keys: []string{"new-enc-key"},
+               },
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       // the publish must succeed, otherwise the redelivery check below is meaningless
+       _, err = producer.Send(context.Background(), &ProducerMessage{
+               Payload: []byte("new-test-message"),
+       })
+       assert.Nil(t, err)
+
+       ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond)
+       defer cancel()
+
+       // message receive should fail due to decryption error
+       msg, err := consumer.Receive(ctx)
+       assert.Nil(t, msg)
+       assert.NotNil(t, err)
+
+       consumer.Close()
+
+       // create consumer with same subscription and proper rsa key pairs
+       consumer, err = client.Subscribe(ConsumerOptions{
+               Topic:            topic,
+               SubscriptionName: subcription,
+               Decryption: &MessageDecryptionInfo{
+                       KeyReader: NewEncKeyReader("crypto/testdata/pub_key_rsa.pem",
+                               "crypto/testdata/pri_key_rsa.pem"),
+               },
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       // previous message should be redelivered
+       msg, err = consumer.Receive(context.Background())
+       assert.Nil(t, err)
+       assert.NotNil(t, msg)
+       consumer.Ack(msg)
+}
diff --git a/pulsar/encryption.go b/pulsar/encryption.go
index aade2ca..3ab2527 100644
--- a/pulsar/encryption.go
+++ b/pulsar/encryption.go
@@ -34,3 +34,15 @@ type ProducerEncryptionInfo struct {
        // default is ProducerCryptoFailureActionFail
        ProducerCryptoFailureAction int
 }
+
+// MessageDecryptionInfo encryption related fields required by the consumer to 
decrypt the message
+type MessageDecryptionInfo struct {
+       // KeyReader read RSA public/private key pairs
+       KeyReader crypto.KeyReader
+
+       // MessageCrypto used to encrypt and decrypt the data and session keys
+       MessageCrypto crypto.MessageCrypto
+
+       // ConsumerCryptoFailureAction action to be taken on failure of message 
decryption
+       ConsumerCryptoFailureAction int
+}
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index c4f215c..19fa6d8 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -201,6 +201,24 @@ func timeFromUnixTimestampMillis(timestamp uint64) 
time.Time {
        return time.Unix(seconds, nanos)
 }
 
+// EncryptionContext holds the encryption details of a received message.
+// It can be used to decrypt the message outside of this client
+type EncryptionContext struct {
+       Keys             map[string]EncryptionKey
+       Param            []byte
+       Algorithm        string
+       CompressionType  CompressionType
+       UncompressedSize int
+       BatchSize        int
+}
+
+// EncryptionKey holds a key value together with its metadata.
+// Encryption key used to encrypt the message payload
+type EncryptionKey struct {
+       KeyValue []byte
+       Metadata map[string]string
+}
+
 type message struct {
        publishTime         time.Time
        eventTime           time.Time
@@ -215,6 +233,7 @@ type message struct {
        replicatedFrom      string
        redeliveryCount     uint32
        schema              Schema
+       encryptionContext   *EncryptionContext
 }
 
 func (msg *message) Topic() string {
@@ -269,6 +288,10 @@ func (msg *message) ProducerName() string {
        return msg.producerName
 }
 
+func (msg *message) GetEncryptionContext() *EncryptionContext {
+       return msg.encryptionContext
+}
+
 func newAckTracker(size int) *ackTracker {
        var batchIDs *big.Int
        if size <= 64 {
diff --git a/pulsar/internal/crypto/consumer_decryptor.go 
b/pulsar/internal/crypto/consumer_decryptor.go
new file mode 100644
index 0000000..bbc1f9b
--- /dev/null
+++ b/pulsar/internal/crypto/consumer_decryptor.go
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+import (
+       "fmt"
+
+       "github.com/apache/pulsar-client-go/pulsar/crypto"
+       pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+       "github.com/apache/pulsar-client-go/pulsar/log"
+)
+
+type consumerDecryptor struct {
+       keyReader     crypto.KeyReader
+       messageCrypto crypto.MessageCrypto
+       logger        log.Logger
+}
+
+func NewConsumerDecryptor(keyReader crypto.KeyReader,
+       messageCrypto crypto.MessageCrypto,
+       logger log.Logger) Decryptor {
+       return &consumerDecryptor{
+               keyReader:     keyReader,
+               messageCrypto: messageCrypto,
+               logger:        logger,
+       }
+}
+
+func (d *consumerDecryptor) Decrypt(payload []byte,
+       msgID *pb.MessageIdData,
+       msgMetadata *pb.MessageMetadata) ([]byte, error) {
+       // encryption keys are not present in message metadata, no need to decrypt the payload
+       if len(msgMetadata.GetEncryptionKeys()) == 0 {
+               return payload, nil
+       }
+
+       // no KeyReader was configured, so the encrypted payload cannot be decrypted
+       if d.keyReader == nil {
+               return payload, fmt.Errorf("KeyReader interface is not 
implemented")
+       }
+
+       return 
d.messageCrypto.Decrypt(crypto.NewMessageMetadataSupplier(msgMetadata),
+               payload,
+               d.keyReader)
+}
diff --git a/pulsar/encryption.go b/pulsar/internal/crypto/decryptor.go
similarity index 56%
copy from pulsar/encryption.go
copy to pulsar/internal/crypto/decryptor.go
index aade2ca..da67d5c 100644
--- a/pulsar/encryption.go
+++ b/pulsar/internal/crypto/decryptor.go
@@ -15,22 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package pulsar
+package crypto
 
-import "github.com/apache/pulsar-client-go/pulsar/crypto"
+import (
+       pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+)
 
-// ProducerEncryptionInfo encryption related fields required by the producer
-type ProducerEncryptionInfo struct {
-       // KeyReader read RSA public/private key pairs
-       KeyReader crypto.KeyReader
-
-       // MessageCrypto used to encrypt and decrypt the data and session keys
-       MessageCrypto crypto.MessageCrypto
-
-       // Keys list of encryption key names to encrypt session key
-       Keys []string
-
-       // ProducerCryptoFailureAction action to be taken on failure of message 
encryption
-       // default is ProducerCryptoFailureActionFail
-       ProducerCryptoFailureAction int
+// Decryptor supports decrypting of messages
+type Decryptor interface {
+       Decrypt(payload []byte, msgID *pb.MessageIdData, msgMetadata 
*pb.MessageMetadata) ([]byte, error)
 }
diff --git a/pulsar/encryption.go b/pulsar/internal/crypto/noop_decryptor.go
similarity index 56%
copy from pulsar/encryption.go
copy to pulsar/internal/crypto/noop_decryptor.go
index aade2ca..c049c47 100644
--- a/pulsar/encryption.go
+++ b/pulsar/internal/crypto/noop_decryptor.go
@@ -15,22 +15,26 @@
 // specific language governing permissions and limitations
 // under the License.
 
-package pulsar
+package crypto
 
-import "github.com/apache/pulsar-client-go/pulsar/crypto"
+import (
+       "fmt"
 
-// ProducerEncryptionInfo encryption related fields required by the producer
-type ProducerEncryptionInfo struct {
-       // KeyReader read RSA public/private key pairs
-       KeyReader crypto.KeyReader
+       pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+)
 
-       // MessageCrypto used to encrypt and decrypt the data and session keys
-       MessageCrypto crypto.MessageCrypto
+type noopDecryptor struct{}
 
-       // Keys list of encryption key names to encrypt session key
-       Keys []string
+func NewNoopDecryptor() Decryptor {
+       return &noopDecryptor{}
+}
 
-       // ProducerCryptoFailureAction action to be taken on failure of message 
encryption
-       // default is ProducerCryptoFailureActionFail
-       ProducerCryptoFailureAction int
+// Decrypt returns the payload as-is, with an error if the message carries encryption keys
+func (d *noopDecryptor) Decrypt(payload []byte,
+       msgID *pb.MessageIdData,
+       msgMetadata *pb.MessageMetadata) ([]byte, error) {
+       if len(msgMetadata.GetEncryptionKeys()) > 0 {
+               return payload, fmt.Errorf("incoming message payload is 
encrypted, consumer is not configured to decrypt")
+       }
+       return payload, nil
 }
diff --git a/pulsar/internal/pulsartracing/message_carrier_util_test.go 
b/pulsar/internal/pulsartracing/message_carrier_util_test.go
index 9fe608d..677a7ff 100644
--- a/pulsar/internal/pulsartracing/message_carrier_util_test.go
+++ b/pulsar/internal/pulsartracing/message_carrier_util_test.go
@@ -119,3 +119,7 @@ func (msg *mockConsumerMessage) GetSchemaValue(v 
interface{}) error {
 func (msg *mockConsumerMessage) ProducerName() string {
        return ""
 }
+
+func (msg *mockConsumerMessage) GetEncryptionContext() 
*pulsar.EncryptionContext {
+       return &pulsar.EncryptionContext{}
+}
diff --git a/pulsar/message.go b/pulsar/message.go
index 23dfefb..1003bb1 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -116,6 +116,10 @@ type Message interface {
 
        //Get the de-serialized value of the message, according the configured
        GetSchemaValue(v interface{}) error
+
+       // GetEncryptionContext gets the encryption context of the message
+       // It will be used by the application to parse undecrypted message
+       GetEncryptionContext() *EncryptionContext
 }
 
 // MessageID identifier for a particular message
diff --git a/pulsar/reader.go b/pulsar/reader.go
index 40234aa..7af3470 100644
--- a/pulsar/reader.go
+++ b/pulsar/reader.go
@@ -76,6 +76,9 @@ type ReaderOptions struct {
        // ReadCompacted can only be enabled when reading from a persistent 
topic. Attempting to enable it on non-persistent
        // topics will lead to the reader create call throwing a 
PulsarClientException.
        ReadCompacted bool
+
+       // Decryption holds the decryption-related fields used to decrypt encrypted messages
+       Decryption *MessageDecryptionInfo
 }
 
 // Reader can be used to scan through all the messages currently available in 
a topic.
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 9983286..0e63f8b 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -89,6 +89,7 @@ func newReader(client *client, options ReaderOptions) 
(Reader, error) {
                metadata:                   options.Properties,
                nackRedeliveryDelay:        defaultNackRedeliveryDelay,
                replicateSubscriptionState: false,
+               decryption:                 options.Decryption,
        }
 
        reader := &reader{
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index 618f5ab..bdafea0 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -23,6 +23,7 @@ import (
        "testing"
        "time"
 
+       "github.com/apache/pulsar-client-go/pulsar/crypto"
        "github.com/stretchr/testify/assert"
 )
 
@@ -654,3 +655,58 @@ func TestReaderWithMultiHosts(t *testing.T) {
 
        assert.Equal(t, 10, i)
 }
+
+func TestProducerReaderRSAEncryption(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+
+       assert.Nil(t, err)
+       defer client.Close()
+
+       topic := newTopicName()
+       ctx := context.Background()
+
+       // create reader
+       reader, err := client.CreateReader(ReaderOptions{
+               Topic:          topic,
+               StartMessageID: EarliestMessageID(),
+               Decryption: &MessageDecryptionInfo{
+                       KeyReader: 
crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+                               "crypto/testdata/pri_key_rsa.pem"),
+                       ConsumerCryptoFailureAction: 
crypto.ConsumerCryptoFailureActionFail,
+               },
+       })
+       assert.Nil(t, err)
+       defer reader.Close()
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: topic,
+               Encryption: &ProducerEncryptionInfo{
+                       KeyReader: 
crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+                               "crypto/testdata/pri_key_rsa.pem"),
+                       ProducerCryptoFailureAction: 
crypto.ProducerCryptoFailureActionFail,
+                       Keys:                        []string{"client-rsa.pem"},
+               },
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       // send 10 messages
+       for i := 0; i < 10; i++ {
+               _, err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               })
+               assert.NoError(t, err)
+       }
+
+       // receive 10 messages
+       for i := 0; i < 10; i++ {
+               msg, err := reader.Next(context.Background())
+               assert.NoError(t, err)
+
+               expectMsg := fmt.Sprintf("hello-%d", i)
+               assert.Equal(t, []byte(expectMsg), msg.Payload())
+       }
+}

Reply via email to