This is an automated email from the ASF dual-hosted git repository. rxl pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push: new ab96ad7 Encryption support ext consumer (#612) ab96ad7 is described below commit ab96ad7d84b7c53e3e6d241f68c24eb7d6fa0037 Author: Garule Prabhudas <prabhudas...@gmail.com> AuthorDate: Sat Oct 9 14:14:53 2021 +0530 Encryption support ext consumer (#612) * add ability to encrypt messages - use base crypto package for encryption * fix typo * lint fixes * address review suggestions * revert go mod * remove encryption context - move it to Consumer MR * try to fix check issues * remove unused code * consumer encryption/decryption changes * remove embedded crypto struct * remove embedded struct * add comments * merge conflict issues fix * add noop decryptor * lint fixes * fix test case * refactor and reader encryption changes * refactor - move decryptor creation to partition_producer.go - update reader_impl - update consumer_impl * address review feedback * Nack on decryption failure * add comment on test case Co-authored-by: PGarule <pgar...@fanatics.com> --- pulsar/consumer.go | 3 + pulsar/consumer_impl.go | 1 + pulsar/consumer_partition.go | 108 ++- pulsar/consumer_partition_test.go | 4 + pulsar/consumer_test.go | 847 ++++++++++++++++++++- pulsar/encryption.go | 12 + pulsar/impl_message.go | 23 + pulsar/internal/crypto/consumer_decryptor.go | 60 ++ .../crypto/decryptor.go} | 23 +- .../crypto/noop_decryptor.go} | 30 +- .../pulsartracing/message_carrier_util_test.go | 4 + pulsar/message.go | 4 + pulsar/reader.go | 3 + pulsar/reader_impl.go | 1 + pulsar/reader_test.go | 56 ++ 15 files changed, 1148 insertions(+), 31 deletions(-) diff --git a/pulsar/consumer.go b/pulsar/consumer.go index 1c52b29..c9fbc0d 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -155,6 +155,9 @@ type ConsumerOptions struct { // MaxReconnectToBroker set the maximum retry number of reconnectToBroker. 
(default: ultimate) MaxReconnectToBroker *uint + + // Decryption decryption related fields to decrypt the encrypted message + Decryption *MessageDecryptionInfo } // Consumer is an interface that abstracts behavior of Pulsar's consumer diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index 6677062..232079b 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -335,6 +335,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error { maxReconnectToBroker: c.options.MaxReconnectToBroker, keySharedPolicy: c.options.KeySharedPolicy, schema: c.options.Schema, + decryption: c.options.Decryption, } cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq, c.metrics) ch <- ConsumerError{ diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go index 5f74bcf..e691d14 100644 --- a/pulsar/consumer_partition.go +++ b/pulsar/consumer_partition.go @@ -26,8 +26,10 @@ import ( "github.com/gogo/protobuf/proto" + "github.com/apache/pulsar-client-go/pulsar/crypto" "github.com/apache/pulsar-client-go/pulsar/internal" "github.com/apache/pulsar-client-go/pulsar/internal/compression" + cryptointernal "github.com/apache/pulsar-client-go/pulsar/internal/crypto" pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" "github.com/apache/pulsar-client-go/pulsar/log" @@ -98,6 +100,7 @@ type partitionConsumerOpts struct { maxReconnectToBroker *uint keySharedPolicy *KeySharedPolicy schema Schema + decryption *MessageDecryptionInfo } type partitionConsumer struct { @@ -142,6 +145,7 @@ type partitionConsumer struct { providersMutex sync.RWMutex compressionProviders map[pb.CompressionType]compression.Provider metrics *internal.TopicMetrics + decryptor cryptointernal.Decryptor } func newPartitionConsumer(parent Consumer, client *client, options *partitionConsumerOpts, @@ -176,6 +180,27 @@ func newPartitionConsumer(parent Consumer, client *client, options *partitionCon "subscription": options.subscription, "consumerID": 
pc.consumerID, }) + + var decryptor cryptointernal.Decryptor + if pc.options.decryption == nil { + decryptor = cryptointernal.NewNoopDecryptor() // default to noopDecryptor + } else { + if options.decryption.MessageCrypto == nil { + messageCrypto, err := crypto.NewDefaultMessageCrypto("decrypt", false, pc.log) + if err != nil { + return nil, err + } + options.decryption.MessageCrypto = messageCrypto + } + decryptor = cryptointernal.NewConsumerDecryptor( + options.decryption.KeyReader, + options.decryption.MessageCrypto, + pc.log, + ) + } + + pc.decryptor = decryptor + pc.nackTracker = newNegativeAcksTracker(pc, options.nackRedeliveryDelay, pc.log) err := pc.grabConn() @@ -480,7 +505,54 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header return err } - uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, headersAndPayload) + decryptedPayload, err := pc.decryptor.Decrypt(headersAndPayload.ReadableSlice(), pbMsgID, msgMeta) + // error decrypting the payload + if err != nil { + // default crypto failure action + crypToFailureAction := crypto.ConsumerCryptoFailureActionFail + if pc.options.decryption != nil { + crypToFailureAction = pc.options.decryption.ConsumerCryptoFailureAction + } + + switch crypToFailureAction { + case crypto.ConsumerCryptoFailureActionFail: + pc.log.Errorf("consuming message failed due to decryption err :%v", err) + pc.NackID(newTrackingMessageID(int64(pbMsgID.GetLedgerId()), int64(pbMsgID.GetEntryId()), 0, 0, nil)) + return err + case crypto.ConsumerCryptoFailureActionDiscard: + pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecryptionError) + return fmt.Errorf("discarding message on decryption error :%v", err) + case crypto.ConsumerCryptoFailureActionConsume: + pc.log.Warnf("consuming encrypted message due to error in decryption :%v", err) + messages := []*message{ + { + publishTime: timeFromUnixTimestampMillis(msgMeta.GetPublishTime()), + eventTime: timeFromUnixTimestampMillis(msgMeta.GetEventTime()), 
+ key: msgMeta.GetPartitionKey(), + producerName: msgMeta.GetProducerName(), + properties: internal.ConvertToStringMap(msgMeta.GetProperties()), + topic: pc.topic, + msgID: newMessageID( + int64(pbMsgID.GetLedgerId()), + int64(pbMsgID.GetEntryId()), + pbMsgID.GetBatchIndex(), + pc.partitionIdx, + ), + payLoad: headersAndPayload.ReadableSlice(), + schema: pc.options.schema, + replicationClusters: msgMeta.GetReplicateTo(), + replicatedFrom: msgMeta.GetReplicatedFrom(), + redeliveryCount: response.GetRedeliveryCount(), + encryptionContext: createEncryptionContext(msgMeta), + }, + } + pc.queueCh <- messages + return nil + } + } + + // decryption is success, decompress the payload + uncompressedHeadersAndPayload, err := pc.Decompress(msgMeta, internal.NewBufferWrapper(decryptedPayload)) if err != nil { pc.discardCorruptedMessage(pbMsgID, pb.CommandAck_DecompressionError) return err @@ -493,6 +565,7 @@ func (pc *partitionConsumer) MessageReceived(response *pb.CommandMessage, header if msgMeta.NumMessagesInBatch != nil { numMsgs = int(msgMeta.GetNumMessagesInBatch()) } + messages := make([]*message, 0) var ackTracker *ackTracker // are there multiple messages in this batch? 
@@ -590,6 +663,39 @@ func (pc *partitionConsumer) messageShouldBeDiscarded(msgID trackingMessageID) b return pc.startMessageID.greaterEqual(msgID.messageID) } +// create EncryptionContext from message metadata +// this will be used to decrypt the message payload outside of this client +// it is the responsibility of end user to decrypt the payload +// It will be used only when crypto failure action is set to consume i.e crypto.ConsumerCryptoFailureActionConsume +func createEncryptionContext(msgMeta *pb.MessageMetadata) *EncryptionContext { + encCtx := EncryptionContext{ + Algorithm: msgMeta.GetEncryptionAlgo(), + Param: msgMeta.GetEncryptionParam(), + UncompressedSize: int(msgMeta.GetUncompressedSize()), + BatchSize: int(msgMeta.GetNumMessagesInBatch()), + } + + if msgMeta.Compression != nil { + encCtx.CompressionType = CompressionType(*msgMeta.Compression) + } + + kMap := map[string]EncryptionKey{} + for _, k := range msgMeta.GetEncryptionKeys() { + metaMap := map[string]string{} + for _, m := range k.GetMetadata() { + metaMap[*m.Key] = *m.Value + } + + kMap[*k.Key] = EncryptionKey{ + KeyValue: k.GetValue(), + Metadata: metaMap, + } + } + + encCtx.Keys = kMap + return &encCtx +} + func (pc *partitionConsumer) ConnectionClosed() { // Trigger reconnection in the consumer goroutine pc.log.Debug("connection closed and send to connectClosedCh") diff --git a/pulsar/consumer_partition_test.go b/pulsar/consumer_partition_test.go index b1322e3..560afb6 100644 --- a/pulsar/consumer_partition_test.go +++ b/pulsar/consumer_partition_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/apache/pulsar-client-go/pulsar/internal/compression" + "github.com/apache/pulsar-client-go/pulsar/internal/crypto" pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" "github.com/stretchr/testify/assert" @@ -36,6 +37,7 @@ func TestSingleMessageIDNoAckTracker(t *testing.T) { compressionProviders: make(map[pb.CompressionType]compression.Provider), options: 
&partitionConsumerOpts{}, metrics: internal.NewMetricsProvider(map[string]string{}).GetTopicMetrics("topic"), + decryptor: crypto.NewNoopDecryptor(), } headersAndPayload := internal.NewBufferWrapper(rawCompatSingleMessage) @@ -67,6 +69,7 @@ func TestBatchMessageIDNoAckTracker(t *testing.T) { compressionProviders: make(map[pb.CompressionType]compression.Provider), options: &partitionConsumerOpts{}, metrics: internal.NewMetricsProvider(map[string]string{}).GetTopicMetrics("topic"), + decryptor: crypto.NewNoopDecryptor(), } headersAndPayload := internal.NewBufferWrapper(rawBatchMessage1) @@ -98,6 +101,7 @@ func TestBatchMessageIDWithAckTracker(t *testing.T) { compressionProviders: make(map[pb.CompressionType]compression.Provider), options: &partitionConsumerOpts{}, metrics: internal.NewMetricsProvider(map[string]string{}).GetTopicMetrics("topic"), + decryptor: crypto.NewNoopDecryptor(), } headersAndPayload := internal.NewBufferWrapper(rawBatchMessage10) diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 66587f9..55823e4 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -20,6 +20,7 @@ package pulsar import ( "context" "fmt" + "io/ioutil" "log" "net/http" "strconv" @@ -27,8 +28,13 @@ import ( "testing" "time" + "github.com/apache/pulsar-client-go/pulsar/crypto" "github.com/apache/pulsar-client-go/pulsar/internal" + pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto" + plog "github.com/apache/pulsar-client-go/pulsar/log" + "github.com/gogo/protobuf/proto" "github.com/google/uuid" + "github.com/pierrec/lz4" "github.com/stretchr/testify/assert" ) @@ -92,7 +98,6 @@ func TestProducerConsumer(t *testing.T) { assert.Equal(t, []byte(expectMsg), msg.Payload()) assert.Equal(t, "pulsar", msg.Key()) assert.Equal(t, expectProperties, msg.Properties()) - // ack message consumer.Ack(msg) } @@ -2104,3 +2109,843 @@ func TestConsumerKeySharedWithOrderingKey(t *testing.T) { ) assert.Equal(t, 100, receivedConsumer1+receivedConsumer2) } 
+ +func TestProducerConsumerRSAEncryption(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + assert.Nil(t, err) + defer client.Close() + + topic := fmt.Sprintf("my-topic-enc-%v", time.Now().Nanosecond()) + + cryptoConsumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + Decryption: &MessageDecryptionInfo{ + KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem", + "crypto/testdata/pri_key_rsa.pem"), + ConsumerCryptoFailureAction: crypto.ConsumerCryptoFailureActionFail, + }, + SubscriptionName: "crypto-subscription", + Schema: NewStringSchema(nil), + }) + + assert.Nil(t, err) + + normalConsumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "normal-subscription", + Schema: NewStringSchema(nil), + }) + + assert.Nil(t, err) + + cryptoProducer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + Encryption: &ProducerEncryptionInfo{ + KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem", + "crypto/testdata/pri_key_rsa.pem"), + Keys: []string{"client-rsa.pem"}, + }, + Schema: NewStringSchema(nil), + }) + + assert.Nil(t, err) + + msgFormat := "my-message-%v" + + totalMessages := 10 + + ctx := context.Background() + + for i := 0; i < totalMessages; i++ { + _, err := cryptoProducer.Send(ctx, &ProducerMessage{ + Value: fmt.Sprintf(msgFormat, i), + }) + + assert.Nil(t, err) + } + + // try to consume with normal consumer + normalConsumerCtx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + msg, err := normalConsumer.Receive(normalConsumerCtx) + // msg should be null as the consumer will not be able to decrypt + assert.NotNil(t, err) + assert.Nil(t, msg) + + // try to consume the message by crypto consumer + // consumer should be able to read all the messages + var actualMessage *string + for i := 0; i < totalMessages; i++ { + msg, err := cryptoConsumer.Receive(ctx) + fmt.Println(msg) + assert.Nil(t, err) + 
expectedMsg := fmt.Sprintf(msgFormat, i) + err = msg.GetSchemaValue(&actualMessage) + assert.Nil(t, err) + assert.Equal(t, expectedMsg, *actualMessage) + cryptoConsumer.Ack(msg) + } +} + +func TestProducerConsumerRSAEncryptionWithCompression(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + assert.Nil(t, err) + defer client.Close() + + topic := fmt.Sprintf("my-topic-enc-%v", time.Now().Nanosecond()) + + cryptoConsumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + Decryption: &MessageDecryptionInfo{ + KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem", + "crypto/testdata/pri_key_rsa.pem"), + }, + SubscriptionName: "crypto-subscription", + Schema: NewStringSchema(nil), + }) + + assert.Nil(t, err) + + normalConsumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "normal-subscription", + Schema: NewStringSchema(nil), + }) + + assert.Nil(t, err) + + cryptoProducer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + Encryption: &ProducerEncryptionInfo{ + KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem", + "crypto/testdata/pri_key_rsa.pem"), + Keys: []string{"client-rsa.pem"}, + }, + Schema: NewStringSchema(nil), + CompressionType: LZ4, + }) + + assert.Nil(t, err) + + msgFormat := "my-message-%v" + + totalMessages := 10 + + ctx := context.Background() + + for i := 0; i < totalMessages; i++ { + _, err := cryptoProducer.Send(ctx, &ProducerMessage{ + Value: fmt.Sprintf(msgFormat, i), + }) + + assert.Nil(t, err) + } + + // try to consume with normal consumer + normalConsumerCtx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + msg, err := normalConsumer.Receive(normalConsumerCtx) + // msg should be null as the consumer will not be able to decrypt + assert.NotNil(t, err) + assert.Nil(t, msg) + + // try to consume the message by crypto consumer + // consumer should be able to read all the messages + var 
actualMessage *string + for i := 0; i < totalMessages; i++ { + msg, err := cryptoConsumer.Receive(ctx) + assert.Nil(t, err) + expectedMsg := fmt.Sprintf(msgFormat, i) + err = msg.GetSchemaValue(&actualMessage) + assert.Nil(t, err) + assert.Equal(t, expectedMsg, *actualMessage) + cryptoConsumer.Ack(msg) + } +} + +func TestBatchProducerConsumerRSAEncryptionWithCompression(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + assert.Nil(t, err) + defer client.Close() + + topic := fmt.Sprintf("my-topic-enc-%v", time.Now().Nanosecond()) + + cryptoConsumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + Decryption: &MessageDecryptionInfo{ + KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem", + "crypto/testdata/pri_key_rsa.pem"), + }, + SubscriptionName: "crypto-subscription", + Schema: NewStringSchema(nil), + }) + + assert.Nil(t, err) + + normalConsumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "normal-subscription", + Schema: NewStringSchema(nil), + }) + + assert.Nil(t, err) + batchSize := 2 + cryptoProducer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + Encryption: &ProducerEncryptionInfo{ + KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem", + "crypto/testdata/pri_key_rsa.pem"), + Keys: []string{"client-rsa.pem"}, + }, + Schema: NewStringSchema(nil), + CompressionType: LZ4, + DisableBatching: false, + BatchingMaxMessages: uint(batchSize), + }) + + assert.Nil(t, err) + + msgFormat := "my-message-%v" + + totalMessages := 10 + + ctx := context.Background() + + for i := 0; i < totalMessages; i++ { + _, err := cryptoProducer.Send(ctx, &ProducerMessage{ + Value: fmt.Sprintf(msgFormat, i), + }) + + assert.Nil(t, err) + } + + // try to consume with normal consumer + normalConsumerCtx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + msg, err := normalConsumer.Receive(normalConsumerCtx) + // msg should 
be null as the consumer will not be able to decrypt + assert.NotNil(t, err) + assert.Nil(t, msg) + + // try to consume the message by crypto consumer + // consumer should be able to read all the messages + var actualMessage *string + for i := 0; i < totalMessages; i++ { + msg, err := cryptoConsumer.Receive(ctx) + assert.Nil(t, err) + expectedMsg := fmt.Sprintf(msgFormat, i) + err = msg.GetSchemaValue(&actualMessage) + assert.Nil(t, err) + assert.Equal(t, expectedMsg, *actualMessage) + cryptoConsumer.Ack(msg) + } +} + +func TestProducerConsumerRedeliveryOfFailedEncryptedMessages(t *testing.T) { + // create new client instance for each producer and consumer + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.Nil(t, err) + defer client.Close() + + clientCryptoConsumer, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.Nil(t, err) + defer clientCryptoConsumer.Close() + + clientCryptoConsumerInvalidKeyReader, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.Nil(t, err) + defer clientCryptoConsumerInvalidKeyReader.Close() + + clientcryptoConsumerNoKeyReader, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.Nil(t, err) + defer clientcryptoConsumerNoKeyReader.Close() + + topic := fmt.Sprintf("my-topic-enc-%v", time.Now().Nanosecond()) + + cryptoProducer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + Encryption: &ProducerEncryptionInfo{ + KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem", + "crypto/testdata/pri_key_rsa.pem"), + Keys: []string{"client-rsa.pem"}, + }, + CompressionType: LZ4, + Schema: NewStringSchema(nil), + }) + assert.Nil(t, err) + + sharedSubscription := "crypto-shared-subscription" + + cryptoConsumer, err := clientCryptoConsumer.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: sharedSubscription, + Decryption: &MessageDecryptionInfo{ + KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem", + 
"crypto/testdata/pri_key_rsa.pem"), + }, + Schema: NewStringSchema(nil), + Type: Shared, + NackRedeliveryDelay: 1 * time.Second, + }) + assert.Nil(t, err) + + cryptoConsumerInvalidKeyReader, err := clientCryptoConsumerInvalidKeyReader.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: sharedSubscription, + Decryption: &MessageDecryptionInfo{ + KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem", + "crypto/testdata/pri_key_rsa_invalid.pem"), + }, + Schema: NewStringSchema(nil), + Type: Shared, + NackRedeliveryDelay: 1 * time.Second, + }) + assert.Nil(t, err) + + cryptoConsumerNoKeyReader, err := clientcryptoConsumerNoKeyReader.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: sharedSubscription, + Schema: NewStringSchema(nil), + Type: Shared, + NackRedeliveryDelay: 1 * time.Second, + }) + assert.Nil(t, err) + + totalMessages := 5 + message := "my-message-%v" + // since messages can be in random order + // map can be used to check if all the messages are received + messageMap := map[string]struct{}{} + + // producer messages + for i := 0; i < totalMessages; i++ { + mid, err := cryptoProducer.Send(context.Background(), &ProducerMessage{ + Value: fmt.Sprintf(message, i), + }) + assert.Nil(t, err) + fmt.Printf("Sent : %v\n", mid) + } + + // Consuming from consumer 2 and 3 + // no message should be returned since they can't decrypt the message + ctxWithTimeOut1, c1 := context.WithTimeout(context.Background(), 2*time.Second) + defer c1() + + ctxWithTimeOut2, c2 := context.WithTimeout(context.Background(), 2*time.Second) + defer c2() + + // try to consume messages + msg, err := cryptoConsumerInvalidKeyReader.Receive(ctxWithTimeOut1) + assert.NotNil(t, err) + assert.Nil(t, msg) + + msg, err = cryptoConsumerNoKeyReader.Receive(ctxWithTimeOut2) + assert.NotNil(t, err) + assert.Nil(t, msg) + + cryptoConsumerInvalidKeyReader.Close() + cryptoConsumerNoKeyReader.Close() + + // try to consume by consumer1 + // all the messages would by 
received by it + var receivedMsg *string + for i := 0; i < totalMessages; i++ { + m, err := cryptoConsumer.Receive(context.Background()) + assert.Nil(t, err) + err = m.GetSchemaValue(&receivedMsg) + assert.Nil(t, err) + messageMap[*receivedMsg] = struct{}{} + cryptoConsumer.Ack(m) + fmt.Printf("Received : %v\n", m.ID()) + } + + // check if all messages were received + for i := 0; i < totalMessages; i++ { + key := fmt.Sprintf(message, i) + _, ok := messageMap[key] + assert.True(t, ok) + } +} + +func TestRSAEncryptionFailure(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.Nil(t, err) + client.Close() + + topic := fmt.Sprintf("my-topic-enc-%v", time.Now().Nanosecond()) + + // 1. invalid key name + // create producer with invalid key + // producer creation succeeds but message sending should fail with an error + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + Encryption: &ProducerEncryptionInfo{ + KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa_invalid.pem", + "crypto/testdata/pri_key_rsa.pem"), + Keys: []string{"client-rsa.pem"}, + }, + Schema: NewStringSchema(nil), + DisableBatching: true, + }) + assert.Nil(t, err) + assert.NotNil(t, producer) + + // sending of message should fail with an error, since invalid rsa keys are configured + mid, err := producer.Send(context.Background(), &ProducerMessage{ + Value: "some-message", + }) + + assert.Nil(t, mid) + assert.NotNil(t, err) + producer.Close() + + // 2. 
Producer with valid key name + producer, err = client.CreateProducer(ProducerOptions{ + Topic: topic, + Encryption: &ProducerEncryptionInfo{ + KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem", + "crypto/testdata/pri_key_rsa.pem"), + Keys: []string{"client-rsa.pem"}, + }, + Schema: NewStringSchema(nil), + DisableBatching: true, + }) + assert.Nil(t, err) + assert.NotNil(t, producer) + + subscriptionName := "enc-failure-subcription" + totalMessages := 10 + + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: subscriptionName, + }) + assert.Nil(t, err) + + messageFormat := "my-message-%v" + for i := 0; i < totalMessages; i++ { + _, err := producer.Send(context.Background(), &ProducerMessage{ + Value: fmt.Sprintf(messageFormat, i), + }) + assert.Nil(t, err) + } + + // 3. KeyReader is not set by the consumer + // Receive should fail since KeyReader is not setup + // because default behaviour of consumer is fail receiving message if error in decryption + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + msg, err := consumer.Receive(ctx) + assert.NotNil(t, err) + assert.Nil(t, msg, "Receive should have failed with no keyreader") + + // 4. 
Set consumer config to consume even if decryption fails + consumer.Close() + consumer, err = client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: subscriptionName, + Decryption: &MessageDecryptionInfo{ + ConsumerCryptoFailureAction: crypto.ConsumerCryptoFailureActionConsume, + }, + Schema: NewStringSchema(nil), + }) + assert.Nil(t, err) + assert.NotNil(t, consumer) + + ctx2, cancel2 := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel2() + + for i := 0; i < totalMessages-1; i++ { + expectedMessage := fmt.Sprintf(messageFormat, i) + msg, err = consumer.Receive(ctx2) + assert.Nil(t, err) + assert.NotNil(t, msg) + + receivedMsg := string(msg.Payload()) + assert.NotEqual(t, expectedMessage, receivedMsg, fmt.Sprintf(`Received encrypted message [%v] + should not match the expected message [%v]`, expectedMessage, receivedMsg)) + // verify the message contains Encryption context + assert.NotEmpty(t, msg.GetEncryptionContext(), + "Encrypted message which is failed to decrypt must contain EncryptionContext") + consumer.Ack(msg) + } + + // 5. 
discard action on decryption failure + consumer.Close() + consumer, err = client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: subscriptionName, + Decryption: &MessageDecryptionInfo{ + ConsumerCryptoFailureAction: crypto.ConsumerCryptoFailureActionDiscard, + }, + Schema: NewStringSchema(nil), + }) + assert.Nil(t, err) + assert.NotNil(t, consumer) + + ctx3, cancel3 := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel3() + + msg, err = consumer.Receive(ctx3) + assert.NotNil(t, err) + assert.Nil(t, msg, "Message received even aftet ConsumerCryptoFailureAction.Discard is set.") +} + +func TestConsumerCompressionWithRSAEncryption(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topicName := newTopicName() + ctx := context.Background() + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topicName, + CompressionType: LZ4, + Encryption: &ProducerEncryptionInfo{ + KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem", + "crypto/testdata/pri_key_rsa.pem"), + Keys: []string{"enc-compress-app.key"}, + }, + }) + + assert.Nil(t, err) + defer producer.Close() + + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topicName, + SubscriptionName: "sub-1", + Decryption: &MessageDecryptionInfo{ + KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem", + "crypto/testdata/pri_key_rsa.pem"), + }, + }) + + assert.Nil(t, err) + defer consumer.Close() + + const N = 100 + + for i := 0; i < N; i++ { + if _, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("msg-content-%d", i)), + }); err != nil { + t.Fatal(err) + } + } + + for i := 0; i < N; i++ { + msg, err := consumer.Receive(ctx) + assert.Nil(t, err) + assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload())) + consumer.Ack(msg) + } +} + +func TestBatchMessageReceiveWithCompressionAndRSAEcnryption(t *testing.T) { + client, err 
:= NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topicName := "persistent://public/default/receive-batch-comp-enc" + subName := "subscription-name" + prefix := "msg-batch-" + ctx := context.Background() + + // Enable batching on producer side + batchSize, numOfMessages := 2, 100 + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topicName, + BatchingMaxMessages: uint(batchSize), + DisableBatching: false, + CompressionType: LZ4, + Encryption: &ProducerEncryptionInfo{ + KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem", + "crypto/testdata/pri_key_rsa.pem"), + Keys: []string{"batch-encryption-app.key"}, + }, + }) + assert.Nil(t, err) + assert.Equal(t, topicName, producer.Topic()) + defer producer.Close() + + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topicName, + SubscriptionName: subName, + Decryption: &MessageDecryptionInfo{ + KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem", + "crypto/testdata/pri_key_rsa.pem"), + }, + }) + + assert.Nil(t, err) + defer consumer.Close() + + count := 0 + for i := 0; i < numOfMessages; i++ { + messageContent := prefix + fmt.Sprintf("%d", i) + msg := &ProducerMessage{ + Payload: []byte(messageContent), + } + _, err := producer.Send(ctx, msg) + assert.Nil(t, err) + } + + for i := 0; i < numOfMessages; i++ { + msg, err := consumer.Receive(ctx) + fmt.Printf("received : %v\n", string(msg.Payload())) + assert.Nil(t, err) + consumer.Ack(msg) + count++ + } + + assert.Equal(t, count, numOfMessages) +} + +type EncKeyReader struct { + publicKeyPath string + privateKeyPath string + metaMap map[string]string +} + +func NewEncKeyReader(publicKeyPath, privateKeyPath string) *EncKeyReader { + metaMap := map[string]string{ + "version": "1.0", + } + + return &EncKeyReader{ + publicKeyPath: publicKeyPath, + privateKeyPath: privateKeyPath, + metaMap: metaMap, + } +} + +// GetPublicKey read public key from the 
given path +func (d *EncKeyReader) PublicKey(keyName string, keyMeta map[string]string) (*crypto.EncryptionKeyInfo, error) { + return readKey(keyName, d.publicKeyPath, d.metaMap) +} + +// GetPrivateKey read private key from the given path +func (d *EncKeyReader) PrivateKey(keyName string, keyMeta map[string]string) (*crypto.EncryptionKeyInfo, error) { + return readKey(keyName, d.privateKeyPath, d.metaMap) +} + +func readKey(keyName, path string, keyMeta map[string]string) (*crypto.EncryptionKeyInfo, error) { + key, err := ioutil.ReadFile(path) + if err != nil { + return nil, err + } + return crypto.NewEncryptionKeyInfo(keyName, key, keyMeta), nil +} + +func TestConsumerEncryptionWithoutKeyReader(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.Nil(t, err) + defer client.Close() + + topic := newTopicName() + encryptionKeyName := "client-rsa.pem" + + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + Encryption: &ProducerEncryptionInfo{ + KeyReader: NewEncKeyReader("crypto/testdata/pub_key_rsa.pem", + "crypto/testdata/pri_key_rsa.pem"), + Keys: []string{encryptionKeyName}, + }, + CompressionType: LZ4, + Schema: NewStringSchema(nil), + }) + + assert.Nil(t, err) + assert.NotNil(t, producer) + + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "my-subscription-name", + Decryption: &MessageDecryptionInfo{ + ConsumerCryptoFailureAction: crypto.ConsumerCryptoFailureActionConsume, + }, + Schema: NewStringSchema(nil), + }) + assert.Nil(t, err) + + message := "my-message" + + _, err = producer.Send(context.Background(), &ProducerMessage{ + Value: message, + }) + assert.Nil(t, err) + + // consume encrypted message + msg, err := consumer.Receive(context.Background()) + assert.Nil(t, err) + assert.NotNil(t, msg) + + // try to decrypt message + encCtx := msg.GetEncryptionContext() + assert.NotEmpty(t, encCtx) + + keys := encCtx.Keys + assert.Equal(t, 1, len(keys)) + + 
encryptionKey, ok := keys[encryptionKeyName] + assert.True(t, ok) + + encDataKey := encryptionKey.KeyValue + assert.NotNil(t, encDataKey) + + metadata := encryptionKey.Metadata + assert.NotNil(t, metadata) + + version := metadata["version"] + assert.Equal(t, "1.0", version) + + compressionType := encCtx.CompressionType + uncompressedSize := uint32(encCtx.UncompressedSize) + encParam := encCtx.Param + encAlgo := encCtx.Algorithm + batchSize := encCtx.BatchSize + + // try to decrypt using default MessageCrypto + msgCrypto, err := crypto.NewDefaultMessageCrypto("testing", false, plog.DefaultNopLogger()) + assert.Nil(t, err) + + producerName := "test" + sequenceID := uint64(123) + publishTime := uint64(12333453454) + + messageMetaData := pb.MessageMetadata{ + EncryptionParam: encParam, + ProducerName: &producerName, + SequenceId: &sequenceID, + PublishTime: &publishTime, + UncompressedSize: &uncompressedSize, + EncryptionAlgo: &encAlgo, + } + + if compressionType == LZ4 { + messageMetaData.Compression = pb.CompressionType_LZ4.Enum() + } + + messageMetaData.EncryptionKeys = []*pb.EncryptionKeys{{ + Key: &encryptionKeyName, + Value: encDataKey, + }} + + decryptedPayload, err := msgCrypto.Decrypt(crypto.NewMessageMetadataSupplier(&messageMetaData), + msg.Payload(), + NewEncKeyReader("crypto/testdata/pub_key_rsa.pem", + "crypto/testdata/pri_key_rsa.pem")) + assert.Nil(t, err) + assert.NotNil(t, decryptedPayload) + + // try to uncompress payload + uncompressedPayload := make([]byte, uncompressedSize) + s, err := lz4.UncompressBlock(decryptedPayload, uncompressedPayload) + assert.Nil(t, err) + assert.Equal(t, uncompressedSize, uint32(s)) + + buffer := internal.NewBufferWrapper(uncompressedPayload) + + if batchSize > 0 { + size := buffer.ReadUint32() + var meta pb.SingleMessageMetadata + if err := proto.Unmarshal(buffer.Read(size), &meta); err != nil { + fmt.Println(err) + } + d := buffer.Read(uint32(meta.GetPayloadSize())) + assert.Equal(t, message, string(d)) + } +} + +// 
TestEncryptDecryptRedeliveryOnFailure checks that a message which could not be
+// decrypted by one consumer is redelivered to a later consumer on the same
+// subscription that is configured with the matching private key.
+func TestEncryptDecryptRedeliveryOnFailure(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: serviceURL,
+	})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+	subscription := "test-subscription-redelivery"
+
+	// first consumer is given an invalid private key so decryption fails
+	consumer, err := client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: subscription,
+		Decryption: &MessageDecryptionInfo{
+			KeyReader: NewEncKeyReader("crypto/testdata/pub_key_rsa.pem",
+				"crypto/testdata/pri_key_invalid_rsa.pem"),
+		},
+	})
+	assert.Nil(t, err)
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topic,
+		Encryption: &ProducerEncryptionInfo{
+			KeyReader: NewEncKeyReader("crypto/testdata/pub_key_rsa.pem",
+				"crypto/testdata/pri_key_rsa.pem"),
+			Keys: []string{"new-enc-key"},
+		},
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	// a failed publish would otherwise surface only as a confusing receive failure below
+	_, err = producer.Send(context.Background(), &ProducerMessage{
+		Payload: []byte("new-test-message"),
+	})
+	assert.Nil(t, err)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 1000*time.Millisecond)
+	defer cancel()
+
+	// message receive should fail due to decryption error
+	msg, err := consumer.Receive(ctx)
+	assert.Nil(t, msg)
+	assert.NotNil(t, err)
+
+	consumer.Close()
+
+	// create consumer with same subscription and proper rsa key pairs
+	consumer, err = client.Subscribe(ConsumerOptions{
+		Topic:            topic,
+		SubscriptionName: subscription,
+		Decryption: &MessageDecryptionInfo{
+			KeyReader: NewEncKeyReader("crypto/testdata/pub_key_rsa.pem",
+				"crypto/testdata/pri_key_rsa.pem"),
+		},
+	})
+	assert.Nil(t, err)
+	defer consumer.Close()
+
+	// previous message should be redelivered
+	msg, err = consumer.Receive(context.Background())
+	assert.Nil(t, err)
+	assert.NotNil(t, msg)
+	consumer.Ack(msg)
+}
diff --git a/pulsar/encryption.go b/pulsar/encryption.go
index aade2ca..3ab2527 100644
--- a/pulsar/encryption.go
+++ b/pulsar/encryption.go
@@ -34,3 +34,15 @@ type ProducerEncryptionInfo struct {
 	// default is ProducerCryptoFailureActionFail
ProducerCryptoFailureAction int
 }
+
+// MessageDecryptionInfo decryption related fields required by the consumer to decrypt the message
+type MessageDecryptionInfo struct {
+	// KeyReader reads the RSA public/private key pairs
+	KeyReader crypto.KeyReader
+
+	// MessageCrypto used to encrypt and decrypt the data and session keys
+	MessageCrypto crypto.MessageCrypto
+
+	// ConsumerCryptoFailureAction action to be taken on failure of message decryption
+	ConsumerCryptoFailureAction int
+}
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index c4f215c..19fa6d8 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -201,6 +201,24 @@ func timeFromUnixTimestampMillis(timestamp uint64) time.Time {
 	return time.Unix(seconds, nanos)
 }

+// EncryptionContext describes how a message payload was encrypted.
+// It will be used to decrypt the message outside of this client.
+type EncryptionContext struct {
+	Keys             map[string]EncryptionKey // presumably keyed by encryption key name - confirm
+	Param            []byte                   // presumably the MessageMetadata encryption param - confirm
+	Algorithm        string                   // presumably the MessageMetadata encryption algo - confirm
+	CompressionType  CompressionType          // compression type recorded for the payload
+	UncompressedSize int                      // presumably the payload size after decompression - confirm
+	BatchSize        int                      // presumably the number of messages in the batch - confirm
+}
+
+// EncryptionKey is one entry of EncryptionContext.Keys.
+// Encryption key used to encrypt the message payload.
+type EncryptionKey struct {
+	KeyValue []byte            // presumably the encrypted data key - confirm
+	Metadata map[string]string // key/value metadata attached to the key
+}
+
 type message struct {
 	publishTime time.Time
 	eventTime   time.Time
@@ -215,6 +233,7 @@ type message struct {
 	replicatedFrom  string
 	redeliveryCount uint32
 	schema          Schema
+	encryptionContext *EncryptionContext
 }

 func (msg *message) Topic() string {
@@ -269,6 +288,10 @@ func (msg *message) ProducerName() string {
 	return msg.producerName
 }

+// GetEncryptionContext returns the encryption context stored on the message;
+// the result is nil when none was set.
+func (msg *message) GetEncryptionContext() *EncryptionContext {
+	return msg.encryptionContext
+}
+
 func newAckTracker(size int) *ackTracker {
 	var batchIDs *big.Int
 	if size <= 64 {
diff --git a/pulsar/internal/crypto/consumer_decryptor.go b/pulsar/internal/crypto/consumer_decryptor.go
new file mode 100644
index 0000000..bbc1f9b
--- /dev/null
+++ b/pulsar/internal/crypto/consumer_decryptor.go
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more 
contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package crypto
+
+import (
+	"fmt"
+
+	"github.com/apache/pulsar-client-go/pulsar/crypto"
+	pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+	"github.com/apache/pulsar-client-go/pulsar/log"
+)
+
+// consumerDecryptor is a Decryptor that delegates to a crypto.MessageCrypto,
+// resolving keys through a crypto.KeyReader.
+type consumerDecryptor struct {
+	keyReader     crypto.KeyReader
+	messageCrypto crypto.MessageCrypto
+	logger        log.Logger
+}
+
+// NewConsumerDecryptor returns a Decryptor backed by the given key reader and
+// message crypto. The arguments are stored as passed; nil values are only
+// detected when an encrypted message is handed to Decrypt.
+func NewConsumerDecryptor(keyReader crypto.KeyReader,
+	messageCrypto crypto.MessageCrypto,
+	logger log.Logger) Decryptor {
+	return &consumerDecryptor{
+		keyReader:     keyReader,
+		messageCrypto: messageCrypto,
+		logger:        logger,
+	}
+}
+
+// Decrypt returns payload unchanged when msgMetadata carries no encryption
+// keys. Otherwise it decrypts payload with the configured message crypto and
+// key reader. When either of them is nil the original payload is returned
+// together with an error. msgID is not used by this implementation.
+func (d *consumerDecryptor) Decrypt(payload []byte,
+	msgID *pb.MessageIdData,
+	msgMetadata *pb.MessageMetadata) ([]byte, error) {
+	// encryption keys are not present in message metadata, no need to decrypt the payload
+	if len(msgMetadata.GetEncryptionKeys()) == 0 {
+		return payload, nil
+	}
+
+	// no KeyReader was supplied, so the keys needed for decryption cannot be looked up
+	if d.keyReader == nil {
+		return payload, fmt.Errorf("KeyReader interface is not implemented")
+	}
+
+	// guard against a nil MessageCrypto, which would otherwise panic on the call below
+	if d.messageCrypto == nil {
+		return payload, fmt.Errorf("MessageCrypto interface is not implemented")
+	}
+
+	return d.messageCrypto.Decrypt(crypto.NewMessageMetadataSupplier(msgMetadata),
+		payload,
+		d.keyReader)
+}
diff --git a/pulsar/encryption.go b/pulsar/internal/crypto/decryptor.go
similarity index 56%
copy from pulsar/encryption.go
copy 
to pulsar/internal/crypto/decryptor.go
index aade2ca..da67d5c 100644
--- a/pulsar/encryption.go
+++ b/pulsar/internal/crypto/decryptor.go
@@ -15,22 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.

-package pulsar
+package crypto

-import "github.com/apache/pulsar-client-go/pulsar/crypto"
+import (
+	pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+)

-// ProducerEncryptionInfo encryption related fields required by the producer
-type ProducerEncryptionInfo struct {
-	// KeyReader read RSA public/private key pairs
-	KeyReader crypto.KeyReader
-
-	// MessageCrypto used to encrypt and decrypt the data and session keys
-	MessageCrypto crypto.MessageCrypto
-
-	// Keys list of encryption key names to encrypt session key
-	Keys []string
-
-	// ProducerCryptoFailureAction action to be taken on failure of message encryption
-	// default is ProducerCryptoFailureActionFail
-	ProducerCryptoFailureAction int
+// Decryptor supports decrypting a message payload. Decrypt receives the
+// payload together with the message ID and the message metadata and returns
+// the resulting payload bytes or an error.
+type Decryptor interface {
+	Decrypt(payload []byte, msgID *pb.MessageIdData, msgMetadata *pb.MessageMetadata) ([]byte, error)
 }
diff --git a/pulsar/encryption.go b/pulsar/internal/crypto/noop_decryptor.go
similarity index 56%
copy from pulsar/encryption.go
copy to pulsar/internal/crypto/noop_decryptor.go
index aade2ca..c049c47 100644
--- a/pulsar/encryption.go
+++ b/pulsar/internal/crypto/noop_decryptor.go
@@ -15,22 +15,26 @@
 // specific language governing permissions and limitations
 // under the License.
-package pulsar
+package crypto

-import "github.com/apache/pulsar-client-go/pulsar/crypto"
+import (
+	"fmt"

-// ProducerEncryptionInfo encryption related fields required by the producer
-type ProducerEncryptionInfo struct {
-	// KeyReader read RSA public/private key pairs
-	KeyReader crypto.KeyReader
+	pb "github.com/apache/pulsar-client-go/pulsar/internal/pulsar_proto"
+)

-	// MessageCrypto used to encrypt and decrypt the data and session keys
-	MessageCrypto crypto.MessageCrypto
+// noopDecryptor is a Decryptor that never decrypts anything.
+type noopDecryptor struct{}

-	// Keys list of encryption key names to encrypt session key
-	Keys []string
+// NewNoopDecryptor returns a Decryptor that passes payloads through untouched.
+func NewNoopDecryptor() Decryptor {
+	return &noopDecryptor{}
+}

-	// ProducerCryptoFailureAction action to be taken on failure of message encryption
-	// default is ProducerCryptoFailureActionFail
-	ProducerCryptoFailureAction int
+// Decrypt returns payload unchanged. When msgMetadata lists encryption keys
+// it also returns an error, since this decryptor cannot decrypt the message.
+// msgID is not used.
+func (d *noopDecryptor) Decrypt(payload []byte,
+	msgID *pb.MessageIdData,
+	msgMetadata *pb.MessageMetadata) ([]byte, error) {
+	if len(msgMetadata.GetEncryptionKeys()) > 0 {
+		return payload, fmt.Errorf("incoming message payload is encrypted, consumer is not configured to decrypt")
+	}
+	return payload, nil
 }
diff --git a/pulsar/internal/pulsartracing/message_carrier_util_test.go b/pulsar/internal/pulsartracing/message_carrier_util_test.go
index 9fe608d..677a7ff 100644
--- a/pulsar/internal/pulsartracing/message_carrier_util_test.go
+++ b/pulsar/internal/pulsartracing/message_carrier_util_test.go
@@ -119,3 +119,7 @@ func (msg *mockConsumerMessage) GetSchemaValue(v interface{}) error {
 func (msg *mockConsumerMessage) ProducerName() string {
 	return ""
 }
+
+// GetEncryptionContext returns an empty EncryptionContext for the mock message.
+func (msg *mockConsumerMessage) GetEncryptionContext() *pulsar.EncryptionContext {
+	return &pulsar.EncryptionContext{}
+}
diff --git a/pulsar/message.go b/pulsar/message.go
index 23dfefb..1003bb1 100644
--- a/pulsar/message.go
+++ b/pulsar/message.go
@@ -116,6 +116,10 @@ type Message interface {
 	//Get the de-serialized value of the message, according the configured
GetSchemaValue(v interface{}) error
+
+	// GetEncryptionContext returns the encryption context of the message.
+	// It will be used by the application to parse an undecrypted message.
+	GetEncryptionContext() *EncryptionContext
 }

 // MessageID identifier for a particular message
diff --git a/pulsar/reader.go b/pulsar/reader.go
index 40234aa..7af3470 100644
--- a/pulsar/reader.go
+++ b/pulsar/reader.go
@@ -76,6 +76,9 @@ type ReaderOptions struct {
 	// ReadCompacted can only be enabled when reading from a persistent topic. Attempting to enable it on non-persistent
 	// topics will lead to the reader create call throwing a PulsarClientException.
 	ReadCompacted bool
+
+	// Decryption holds the decryption related fields used to decrypt encrypted messages
+	Decryption *MessageDecryptionInfo
 }

 // Reader can be used to scan through all the messages currently available in a topic.
diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go
index 9983286..0e63f8b 100644
--- a/pulsar/reader_impl.go
+++ b/pulsar/reader_impl.go
@@ -89,6 +89,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
 		metadata:                   options.Properties,
 		nackRedeliveryDelay:        defaultNackRedeliveryDelay,
 		replicateSubscriptionState: false,
+		decryption:                 options.Decryption,
 	}

 	reader := &reader{
diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go
index 618f5ab..bdafea0 100644
--- a/pulsar/reader_test.go
+++ b/pulsar/reader_test.go
@@ -23,6 +23,7 @@ import (
 	"testing"
 	"time"

+	"github.com/apache/pulsar-client-go/pulsar/crypto"
 	"github.com/stretchr/testify/assert"
 )

@@ -654,3 +655,58 @@ func TestReaderWithMultiHosts(t *testing.T) {

 	assert.Equal(t, 10, i)
 }
+
+// TestProducerReaderRSAEncryption publishes ten RSA-encrypted messages and
+// checks that a reader configured with the matching key files returns the
+// original payloads in order.
+func TestProducerReaderRSAEncryption(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: lookupURL,
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	topic := newTopicName()
+	ctx := context.Background()
+
+	// create reader
+	reader, err := client.CreateReader(ReaderOptions{
+		Topic:          topic,
+		StartMessageID: EarliestMessageID(),
+		Decryption: &MessageDecryptionInfo{
+			KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+				"crypto/testdata/pri_key_rsa.pem"),
+			ConsumerCryptoFailureAction: crypto.ConsumerCryptoFailureActionFail,
+		},
+	})
+	assert.Nil(t, err)
+	defer reader.Close()
+
+	// create producer
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: topic,
+		Encryption: &ProducerEncryptionInfo{
+			KeyReader: crypto.NewFileKeyReader("crypto/testdata/pub_key_rsa.pem",
+				"crypto/testdata/pri_key_rsa.pem"),
+			ProducerCryptoFailureAction: crypto.ProducerCryptoFailureActionFail,
+			Keys:                        []string{"client-rsa.pem"},
+		},
+	})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	// send 10 messages
+	for i := 0; i < 10; i++ {
+		_, err := producer.Send(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+		})
+		assert.NoError(t, err)
+	}
+
+	// receive 10 messages
+	for i := 0; i < 10; i++ {
+		msg, err := reader.Next(ctx)
+		if !assert.NoError(t, err) {
+			// msg must not be used after a failed read; stop instead of panicking on it
+			return
+		}
+
+		expectMsg := fmt.Sprintf("hello-%d", i)
+		assert.Equal(t, []byte(expectMsg), msg.Payload())
+	}
+}