This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 468bfd6  Encryption failure test case fix (#708)
468bfd6 is described below

commit 468bfd6bdcd45857e64fd499b98c7f30ec6f60f7
Author: Garule Prabhudas <prabhudas...@gmail.com>
AuthorDate: Mon Jan 17 12:02:51 2022 +0530

    Encryption failure test case fix (#708)
    
    * test case to detect race condition in creation of producer/consumer
    
    * fix for race condition in consumer/producer creation
    
    * refactor
    
    * restore test case
    
    Co-authored-by: PGarule <pgar...@fanatics.com>
---
 pulsar/consumer_impl.go      | 30 +++++++++++++++++++
 pulsar/consumer_partition.go |  7 -----
 pulsar/consumer_test.go      | 70 +++++++++++++++++++++++++++++++++++++++++++-
 pulsar/producer_impl.go      | 21 +++++++++++++
 pulsar/producer_partition.go | 21 -------------
 pulsar/reader_impl.go        | 12 ++++++++
 6 files changed, 132 insertions(+), 29 deletions(-)

diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 00e0b76..2bd3ed5 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -25,6 +25,7 @@ import (
        "sync"
        "time"
 
+       "github.com/apache/pulsar-client-go/pulsar/crypto"
        "github.com/apache/pulsar-client-go/pulsar/internal"
        pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
        "github.com/apache/pulsar-client-go/pulsar/log"
@@ -156,6 +157,10 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
                        return nil, err
                }
                topic = tns[0].Name
+               err = addMessageCryptoIfMissing(client, &options, topic)
+               if err != nil {
+                       return nil, err
+               }
                return newInternalConsumer(client, options, topic, messageCh, dlq, rlq, false)
        }
 
@@ -168,6 +173,11 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
                }
                options.Topics = distinct(options.Topics)
 
+               err = addMessageCryptoIfMissing(client, &options, options.Topics)
+               if err != nil {
+                       return nil, err
+               }
+
                return newMultiTopicConsumer(client, options, options.Topics, messageCh, dlq, rlq)
        }
 
@@ -181,6 +191,12 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
                if err != nil {
                        return nil, err
                }
+
+               err = addMessageCryptoIfMissing(client, &options, tn.Name)
+               if err != nil {
+                       return nil, err
+               }
+
                return newRegexConsumer(client, options, tn, pattern, messageCh, dlq, rlq)
        }
 
@@ -654,3 +670,17 @@ func (c *consumer) messageID(msgID MessageID) (trackingMessageID, bool) {
 
        return mid, true
 }
+
+func addMessageCryptoIfMissing(client *client, options *ConsumerOptions, topics interface{}) error {
+       // decryption is enabled, use default messagecrypto if not provided
+       if options.Decryption != nil && options.Decryption.MessageCrypto == nil {
+               messageCrypto, err := crypto.NewDefaultMessageCrypto("decrypt",
+                       false,
+                       client.log.SubLogger(log.Fields{"topic": topics}))
+               if err != nil {
+                       return err
+               }
+               options.Decryption.MessageCrypto = messageCrypto
+       }
+       return nil
+}
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 1d95c42..7679a8c 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -186,13 +186,6 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon
        if pc.options.decryption == nil {
                decryptor = cryptointernal.NewNoopDecryptor() // default to noopDecryptor
        } else {
-               if options.decryption.MessageCrypto == nil {
-                       messageCrypto, err := crypto.NewDefaultMessageCrypto("decrypt", false, pc.log)
-                       if err != nil {
-                               return nil, err
-                       }
-                       options.decryption.MessageCrypto = messageCrypto
-               }
                decryptor = cryptointernal.NewConsumerDecryptor(
                        options.decryption.KeyReader,
                        options.decryption.MessageCrypto,
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 55823e4..cadd8e4 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -332,14 +332,77 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
        assert.Nil(t, err)
        defer client.Close()
 
+       topic := "persistent://public/default/testGetPartitions5"
+       testURL := adminURL + "/" + "admin/v2/persistent/public/default/testGetPartitions5/partitions"
+
+       makeHTTPCall(t, http.MethodPut, testURL, "64")
+
+       // create producer
+       producer, err := client.CreateProducer(ProducerOptions{
+               Topic: topic,
+       })
+       assert.Nil(t, err)
+       defer producer.Close()
+
+       topics, err := client.TopicPartitions(topic)
+       assert.Nil(t, err)
+       assert.Equal(t, topic+"-partition-0", topics[0])
+       assert.Equal(t, topic+"-partition-1", topics[1])
+       assert.Equal(t, topic+"-partition-2", topics[2])
+
+       consumer, err := client.Subscribe(ConsumerOptions{
+               Topic:             topic,
+               SubscriptionName:  "my-sub",
+               Type:              Exclusive,
+               ReceiverQueueSize: 10,
+       })
+       assert.Nil(t, err)
+       defer consumer.Close()
+
+       ctx := context.Background()
+       for i := 0; i < 10; i++ {
+               _, err := producer.Send(ctx, &ProducerMessage{
+                       Payload: []byte(fmt.Sprintf("hello-%d", i)),
+               })
+               assert.Nil(t, err)
+       }
+
+       msgs := make([]string, 0)
+
+       for i := 0; i < 10; i++ {
+               msg, err := consumer.Receive(ctx)
+               assert.Nil(t, err)
+               msgs = append(msgs, string(msg.Payload()))
+
+               fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
+                       msg.ID(), string(msg.Payload()))
+
+               consumer.Ack(msg)
+       }
+
+       assert.Equal(t, len(msgs), 10)
+}
+
+func TestPartitionTopicsConsumerPubSubEncryption(t *testing.T) {
+       client, err := NewClient(ClientOptions{
+               URL: lookupURL,
+       })
+       assert.Nil(t, err)
+       defer client.Close()
+
        topic := "persistent://public/default/testGetPartitions"
        testURL := adminURL + "/" + "admin/v2/persistent/public/default/testGetPartitions/partitions"
 
-       makeHTTPCall(t, http.MethodPut, testURL, "64")
+       makeHTTPCall(t, http.MethodPut, testURL, "6")
 
        // create producer
        producer, err := client.CreateProducer(ProducerOptions{
                Topic: topic,
+               Encryption: &ProducerEncryptionInfo{
+                       KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+                               "crypto/testdata/pri_key_rsa.pem"),
+                       Keys: []string{"client-rsa.pem"},
+               },
        })
        assert.Nil(t, err)
        defer producer.Close()
@@ -355,6 +418,11 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) {
                SubscriptionName:  "my-sub",
                Type:              Exclusive,
                ReceiverQueueSize: 10,
+               Decryption: &MessageDecryptionInfo{
+                       KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+                               "crypto/testdata/pri_key_rsa.pem"),
+                       ConsumerCryptoFailureAction: crypto.ConsumerCryptoFailureActionFail,
+               },
        })
        assert.Nil(t, err)
        defer consumer.Close()
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 48e2aa4..9bbfccb 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -19,11 +19,13 @@ package pulsar
 
 import (
        "context"
+       "fmt"
        "sync"
        "sync/atomic"
        "time"
        "unsafe"
 
+       "github.com/apache/pulsar-client-go/pulsar/crypto"
        "github.com/apache/pulsar-client-go/pulsar/internal"
        "github.com/apache/pulsar-client-go/pulsar/log"
 )
@@ -124,6 +126,25 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) {
                }
        }
 
+       encryption := options.Encryption
+       // add default message crypto if not provided
+       if encryption != nil && len(encryption.Keys) > 0 {
+               if encryption.KeyReader == nil {
+                       return nil, fmt.Errorf("encryption is enabled, KeyReader can not be nil")
+               }
+
+               if encryption.MessageCrypto == nil {
+                       logCtx := fmt.Sprintf("[%v] [%v]", p.topic, p.options.Name)
+                       messageCrypto, err := crypto.NewDefaultMessageCrypto(logCtx,
+                               true,
+                               client.log.SubLogger(log.Fields{"topic": p.topic}))
+                       if err != nil {
+                               return nil, fmt.Errorf("unable to get MessageCrypto instance. Producer creation is abandoned. %v", err)
+                       }
+                       p.options.Encryption.MessageCrypto = messageCrypto
+               }
+       }
+
        err := p.internalCreatePartitionsProducers()
        if err != nil {
                return nil, err
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 8fdcfdb..e318b81 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -19,13 +19,11 @@ package pulsar
 
 import (
        "context"
-       "fmt"
        "strings"
        "sync"
        "sync/atomic"
        "time"
 
-       "github.com/apache/pulsar-client-go/pulsar/crypto"
        "github.com/apache/pulsar-client-go/pulsar/internal/compression"
        internalcrypto "github.com/apache/pulsar-client-go/pulsar/internal/crypto"
 
@@ -142,25 +140,6 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
        } else {
                p.userProvidedProducerName = false
        }
-
-       encryption := options.Encryption
-       // add default message crypto if not provided
-       if encryption != nil && len(encryption.Keys) > 0 {
-               if encryption.KeyReader == nil {
-                       return nil, fmt.Errorf("encryption is enabled, KeyReader can not be nil")
-               }
-
-               if encryption.MessageCrypto == nil {
-                       logCtx := fmt.Sprintf("[%v] [%v] [%v]", p.topic, p.producerName, p.producerID)
-                       messageCrypto, err := crypto.NewDefaultMessageCrypto(logCtx, true, logger)
-                       if err != nil {
-                               logger.WithError(err).Error("Unable to get MessageCrypto instance. Producer creation is abandoned")
-                               return nil, err
-                       }
-                       p.options.Encryption.MessageCrypto = messageCrypto
-               }
-       }
-
        err := p.grabCnx()
        if err != nil {
                logger.WithError(err).Error("Failed to create producer")
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index c854325..0fed80c 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -23,6 +23,7 @@ import (
        "sync"
        "time"
 
+       "github.com/apache/pulsar-client-go/pulsar/crypto"
        "github.com/apache/pulsar-client-go/pulsar/internal"
        "github.com/apache/pulsar-client-go/pulsar/log"
 )
@@ -76,6 +77,17 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
                receiverQueueSize = defaultReceiverQueueSize
        }
 
+       // decryption is enabled, use default message crypto if not provided
+       if options.Decryption != nil && options.Decryption.MessageCrypto == nil {
+               messageCrypto, err := crypto.NewDefaultMessageCrypto("decrypt",
+                       false,
+                       client.log.SubLogger(log.Fields{"topic": options.Topic}))
+               if err != nil {
+                       return nil, err
+               }
+               options.Decryption.MessageCrypto = messageCrypto
+       }
+
        consumerOptions := &partitionConsumerOpts{
                topic:                      options.Topic,
                consumerName:               options.Name,

Reply via email to