This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new 468bfd6 Encryption failure test case fix (#708) 468bfd6 is described below commit 468bfd6bdcd45857e64fd499b98c7f30ec6f60f7 Author: Garule Prabhudas <prabhudas...@gmail.com> AuthorDate: Mon Jan 17 12:02:51 2022 +0530 Encryption failure test case fix (#708) * test case to detect race condition in creation of producer/consumer * fix for race condition in consumer/producer creation * refactor * restore test case Co-authored-by: PGarule <pgar...@fanatics.com> --- pulsar/consumer_impl.go | 30 +++++++++++++++++++ pulsar/consumer_partition.go | 7 ----- pulsar/consumer_test.go | 70 +++++++++++++++++++++++++++++++++++++++++++- pulsar/producer_impl.go | 21 +++++++++++++ pulsar/producer_partition.go | 21 ------------- pulsar/reader_impl.go | 12 ++++++++ 6 files changed, 132 insertions(+), 29 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 00e0b76..2bd3ed5 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -25,6 +25,7 @@ import ( "sync" "time" + "github.com/apache/pulsar-client-go/pulsar/crypto" "github.com/apache/pulsar-client-go/pulsar/internal" pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" "github.com/apache/pulsar-client-go/pulsar/log" @@ -156,6 +157,10 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { return nil, err } topic = tns[0].Name + err = addMessageCryptoIfMissing(client, &options, topic) + if err != nil { + return nil, err + } return newInternalConsumer(client, options, topic, messageCh, dlq, rlq, false) } @@ -168,6 +173,11 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { } options.Topics = distinct(options.Topics) + err = addMessageCryptoIfMissing(client, &options, options.Topics) + if err != nil { + return nil, err + } + return newMultiTopicConsumer(client, options, options.Topics, messageCh, dlq, rlq) } @@ -181,6 +191,12 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { if err != nil { return nil, err } + + err = addMessageCryptoIfMissing(client, &options, tn.Name) + if err != nil { + return nil, err + } + return newRegexConsumer(client, options, tn, pattern, messageCh, dlq, rlq) } @@ -654,3 +670,17 @@ func (c *consumer) messageID(msgID MessageID) (trackingMessageID, bool) { return mid, true } + +func addMessageCryptoIfMissing(client *client, options *ConsumerOptions, topics interface{}) error { + // decryption is enabled, use default messagecrypto if not provided + if options.Decryption != nil && options.Decryption.MessageCrypto == nil { + messageCrypto, err := crypto.NewDefaultMessageCrypto("decrypt", + false, + client.log.SubLogger(log.Fields{"topic": topics})) + if err != nil { + return err + } + options.Decryption.MessageCrypto = messageCrypto + } + return nil +} diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 1d95c42..7679a8c 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -186,13 +186,6 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon if pc.options.decryption == nil { decryptor = cryptointernal.NewNoopDecryptor() // default to noopDecryptor } else { - if options.decryption.MessageCrypto == nil { - messageCrypto, err := crypto.NewDefaultMessageCrypto("decrypt", false, pc.log) - if err != nil { - return nil, err - } - options.decryption.MessageCrypto = messageCrypto - } decryptor = cryptointernal.NewConsumerDecryptor( options.decryption.KeyReader, options.decryption.MessageCrypto, diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 55823e4..cadd8e4 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -332,14 +332,77 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) { assert.Nil(t, err) defer client.Close() + topic := "persistent://public/default/testGetPartitions5" + testURL := adminURL + "/" + "admin/v2/persistent/public/default/testGetPartitions5/partitions" + + makeHTTPCall(t, http.MethodPut, testURL, "64") + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + }) + assert.Nil(t, err) + defer producer.Close() + + topics, err := client.TopicPartitions(topic) + assert.Nil(t, err) + assert.Equal(t, topic+"-partition-0", topics[0]) + assert.Equal(t, topic+"-partition-1", topics[1]) + assert.Equal(t, topic+"-partition-2", topics[2]) + + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "my-sub", + Type: Exclusive, + ReceiverQueueSize: 10, + }) + assert.Nil(t, err) + defer consumer.Close() + + ctx := context.Background() + for i := 0; i < 10; i++ { + _, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }) + assert.Nil(t, err) + } + + msgs := make([]string, 0) + + for i := 0; i < 10; i++ { + msg, err := consumer.Receive(ctx) + assert.Nil(t, err) + msgs = append(msgs, string(msg.Payload())) + + fmt.Printf("Received message msgId: %#v -- content: '%s'\n", + msg.ID(), string(msg.Payload())) + + consumer.Ack(msg) + } + + assert.Equal(t, len(msgs), 10) +} + +func TestPartitionTopicsConsumerPubSubEncryption(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + assert.Nil(t, err) + defer client.Close() + topic := "persistent://public/default/testGetPartitions" testURL := adminURL + "/" + "admin/v2/persistent/public/default/testGetPartitions/partitions" - makeHTTPCall(t, http.MethodPut, testURL, "64") + makeHTTPCall(t, http.MethodPut, testURL, "6") // create producer producer, err := client.CreateProducer(ProducerOptions{ Topic: topic, + Encryption: &ProducerEncryptionInfo{ + KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem", + "crypto/testdata/pri_key_rsa.pem"), + Keys: []string{"client-rsa.pem"}, + }, }) assert.Nil(t, err) defer producer.Close() @@ -355,6 +418,11 @@ func TestPartitionTopicsConsumerPubSub(t *testing.T) { SubscriptionName: "my-sub", Type: Exclusive, ReceiverQueueSize: 10, + Decryption: &MessageDecryptionInfo{ + KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem", + "crypto/testdata/pri_key_rsa.pem"), + ConsumerCryptoFailureAction: crypto.ConsumerCryptoFailureActionFail, + }, }) assert.Nil(t, err) defer consumer.Close() diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go index 48e2aa4..9bbfccb 100644 --- a/pulsar/producer_impl.go +++ b/pulsar/producer_impl.go @@ -19,11 +19,13 @@ package pulsar import ( "context" + "fmt" "sync" "sync/atomic" "time" "unsafe" + "github.com/apache/pulsar-client-go/pulsar/crypto" "github.com/apache/pulsar-client-go/pulsar/internal" "github.com/apache/pulsar-client-go/pulsar/log" ) @@ -124,6 +126,25 @@ func newProducer(client *client, options *ProducerOptions) (*producer, error) { } } + encryption := options.Encryption + // add default message crypto if not provided + if encryption != nil && len(encryption.Keys) > 0 { + if encryption.KeyReader == nil { + return nil, fmt.Errorf("encryption is enabled, KeyReader can not be nil") + } + + if encryption.MessageCrypto == nil { + logCtx := fmt.Sprintf("[%v] [%v]", p.topic, p.options.Name) + messageCrypto, err := crypto.NewDefaultMessageCrypto(logCtx, + true, + client.log.SubLogger(log.Fields{"topic": p.topic})) + if err != nil { + return nil, fmt.Errorf("unable to get MessageCrypto instance. Producer creation is abandoned. %v", err) + } + p.options.Encryption.MessageCrypto = messageCrypto + } + } + err := p.internalCreatePartitionsProducers() if err != nil { return nil, err diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go index 8fdcfdb..e318b81 100644 --- a/pulsar/producer_partition.go +++ b/pulsar/producer_partition.go @@ -19,13 +19,11 @@ package pulsar import ( "context" - "fmt" "strings" "sync" "sync/atomic" "time" - "github.com/apache/pulsar-client-go/pulsar/crypto" "github.com/apache/pulsar-client-go/pulsar/internal/compression" internalcrypto "github.com/apache/pulsar-client-go/pulsar/internal/crypto" @@ -142,25 +140,6 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions } else { p.userProvidedProducerName = false } - - encryption := options.Encryption - // add default message crypto if not provided - if encryption != nil && len(encryption.Keys) > 0 { - if encryption.KeyReader == nil { - return nil, fmt.Errorf("encryption is enabled, KeyReader can not be nil") - } - - if encryption.MessageCrypto == nil { - logCtx := fmt.Sprintf("[%v] [%v] [%v]", p.topic, p.producerName, p.producerID) - messageCrypto, err := crypto.NewDefaultMessageCrypto(logCtx, true, logger) - if err != nil { - logger.WithError(err).Error("Unable to get MessageCrypto instance. Producer creation is abandoned") - return nil, err - } - p.options.Encryption.MessageCrypto = messageCrypto - } - } - err := p.grabCnx() if err != nil { logger.WithError(err).Error("Failed to create producer") diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index c854325..0fed80c 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -23,6 +23,7 @@ import ( "sync" "time" + "github.com/apache/pulsar-client-go/pulsar/crypto" "github.com/apache/pulsar-client-go/pulsar/internal" "github.com/apache/pulsar-client-go/pulsar/log" ) @@ -76,6 +77,17 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { receiverQueueSize = defaultReceiverQueueSize } + // decryption is enabled, use default message crypto if not provided + if options.Decryption != nil && options.Decryption.MessageCrypto == nil { + messageCrypto, err := crypto.NewDefaultMessageCrypto("decrypt", + false, + client.log.SubLogger(log.Fields{"topic": options.Topic})) + if err != nil { + return nil, err + } + options.Decryption.MessageCrypto = messageCrypto + } + consumerOptions := &partitionConsumerOpts{ topic: options.Topic, consumerName: options.Name,