This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a081ad  Added end to end encryption in C++ client (#1129)
9a081ad is described below

commit 9a081ad9d69aa3f250d5d821ab1d8169b728eeda
Author: Andrews <sahaya.andr...@gmail.com>
AuthorDate: Thu Feb 1 17:17:10 2018 -0800

    Added end to end encryption in C++ client (#1129)
---
 .../include/pulsar/ConsumerConfiguration.h         |  10 +
 .../include/pulsar/ConsumerCryptoFailureAction.h   |  40 ++
 pulsar-client-cpp/include/pulsar/CryptoKeyReader.h |  70 ++++
 .../include/pulsar/EncryptionKeyInfo.h             |  68 +++
 .../include/pulsar/ProducerConfiguration.h         |  15 +
 .../include/pulsar/ProducerCryptoFailureAction.h   |  36 ++
 pulsar-client-cpp/include/pulsar/Result.h          |   4 +-
 pulsar-client-cpp/lib/BatchMessageContainer.cc     |   4 +
 pulsar-client-cpp/lib/ConsumerConfiguration.cc     |  19 +
 pulsar-client-cpp/lib/ConsumerConfigurationImpl.h  |   6 +-
 pulsar-client-cpp/lib/ConsumerImpl.cc              |  55 ++-
 pulsar-client-cpp/lib/ConsumerImpl.h               |   7 +
 pulsar-client-cpp/lib/EncryptionKeyInfo.cc         |  38 ++
 pulsar-client-cpp/lib/EncryptionKeyInfoImpl.cc     |  39 ++
 pulsar-client-cpp/lib/EncryptionKeyInfoImpl.h      |  55 +++
 pulsar-client-cpp/lib/MessageCrypto.cc             | 458 +++++++++++++++++++++
 pulsar-client-cpp/lib/MessageCrypto.h              | 142 +++++++
 pulsar-client-cpp/lib/ProducerConfiguration.cc     |  28 ++
 pulsar-client-cpp/lib/ProducerConfigurationImpl.h  |   9 +-
 pulsar-client-cpp/lib/ProducerImpl.cc              |  59 ++-
 pulsar-client-cpp/lib/ProducerImpl.h               |  10 +
 pulsar-client-cpp/lib/Result.cc                    |   3 +
 pulsar-client-cpp/perf/PerfConsumer.cc             |  44 +-
 pulsar-client-cpp/perf/PerfProducer.cc             |  49 ++-
 pulsar-client-cpp/tests/BasicEndToEndTest.cc       | 206 +++++++++
 25 files changed, 1461 insertions(+), 13 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h 
b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
index c73f942..274abd5 100644
--- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h
@@ -24,6 +24,8 @@
 #include <pulsar/Result.h>
 #include <pulsar/ConsumerType.h>
 #include <pulsar/Message.h>
+#include <pulsar/ConsumerCryptoFailureAction.h>
+#include <pulsar/CryptoKeyReader.h>
 
 #pragma GCC visibility push(default)
 namespace pulsar {
@@ -139,6 +141,14 @@ class ConsumerConfiguration {
      * @return the configured timeout in milliseconds caching 
BrokerConsumerStats.
      */
     long getBrokerConsumerStatsCacheTimeInMs() const;
+
+    bool isEncryptionEnabled() const;
+    const CryptoKeyReaderPtr getCryptoKeyReader() const;
+    ConsumerConfiguration& setCryptoKeyReader(CryptoKeyReaderPtr 
cryptoKeyReader);
+
+    ConsumerCryptoFailureAction getCryptoFailureAction() const;
+    ConsumerConfiguration& setCryptoFailureAction(ConsumerCryptoFailureAction 
action);
+
     friend class PulsarWrapper;
 
    private:
diff --git a/pulsar-client-cpp/include/pulsar/ConsumerCryptoFailureAction.h 
b/pulsar-client-cpp/include/pulsar/ConsumerCryptoFailureAction.h
new file mode 100644
index 0000000..d9d845c
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/ConsumerCryptoFailureAction.h
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef CONSUMERCRYPTOFAILUREACTION_H_
+#define CONSUMERCRYPTOFAILUREACTION_H_
+
+#pragma GCC visibility push(default)
+
+namespace pulsar {
+
+enum class ConsumerCryptoFailureAction
+{
+    FAIL,     // This is the default option to fail consume until crypto 
succeeds
+    DISCARD,  // Message is silently acknowledged and not delivered to the 
application
+    CONSUME   // Deliver the encrypted message to the application. It's the 
application's
+              // responsibility to decrypt the message. If message is also 
compressed,
+              // decompression will fail. If message contains batch messages, 
client will
+              // not be able to retrieve individual messages in the batch
+};
+
+} /* namespace pulsar */
+
+#pragma GCC visibility pop
+
+#endif /* CONSUMERCRYPTOFAILUREACTION_H_ */
diff --git a/pulsar-client-cpp/include/pulsar/CryptoKeyReader.h 
b/pulsar-client-cpp/include/pulsar/CryptoKeyReader.h
new file mode 100644
index 0000000..cb4cf15
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/CryptoKeyReader.h
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef CRYPTOKEYREADER_H_
+#define CRYPTOKEYREADER_H_
+
+#include <pulsar/Result.h>
+#include <pulsar/EncryptionKeyInfo.h>
+
+#pragma GCC visibility push(default)
+
+namespace pulsar {
+
+class CryptoKeyReader {
+   public:
+    CryptoKeyReader() {}
+
+    /*
+     * Return the encryption key corresponding to the key name in the argument
+     * <p>
+     * This method should be implemented to return the EncryptionKeyInfo. This 
method will be
+     * called at the time of producer creation as well as consumer receiving 
messages.
+     * Hence, application should not make any blocking calls within the 
implementation.
+     * <p>
+     *
+     * @param keyName
+     *            Unique name to identify the key
+     * @param metadata
+     *            Additional information needed to identify the key
+     * @param encKeyInfo updated with details about the public key
+     * @return Result ResultOk is returned for success
+     *
+     */
+    virtual Result getPublicKey(const std::string& keyName, 
std::map<std::string, std::string>& metadata,
+                                EncryptionKeyInfo& encKeyInfo) const = 0;
+
+    /*
+     * @param keyName
+     *            Unique name to identify the key
+     * @param metadata
+     *            Additional information needed to identify the key
+     * @param encKeyInfo updated with details about the private key
+     * @return Result ResultOk is returned for success
+     */
+    virtual Result getPrivateKey(const std::string& keyName, 
std::map<std::string, std::string>& metadata,
+                                 EncryptionKeyInfo& encKeyInfo) const = 0;
+
+}; /* class CryptoKeyReader */
+
+typedef boost::shared_ptr<CryptoKeyReader> CryptoKeyReaderPtr;
+}  // namespace pulsar
+
+#pragma GCC visibility pop
+
+#endif /* CRYPTOKEYREADER_H_ */
diff --git a/pulsar-client-cpp/include/pulsar/EncryptionKeyInfo.h 
b/pulsar-client-cpp/include/pulsar/EncryptionKeyInfo.h
new file mode 100644
index 0000000..9461b14
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/EncryptionKeyInfo.h
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef ENCRYPTIONKEYINFO_H_
+#define ENCRYPTIONKEYINFO_H_
+
+#include <boost/shared_ptr.hpp>
+#include <iostream>
+#include <map>
+
+#pragma GCC visibility push(default)
+
+namespace pulsar {
+
+class EncryptionKeyInfoImpl;
+class PulsarWrapper;
+
+typedef boost::shared_ptr<EncryptionKeyInfoImpl> EncryptionKeyInfoImplPtr;
+
+class EncryptionKeyInfo {
+    /*
+     * This object contains the encryption key and corresponding metadata 
which contains
+     * additional information about the key such as version, timestamp
+     */
+
+   public:
+    typedef std::map<std::string, std::string> StringMap;
+
+    EncryptionKeyInfo();
+
+    EncryptionKeyInfo(std::string key, StringMap& metadata);
+
+    std::string& getKey();
+
+    void setKey(std::string key);
+
+    StringMap& getMetadata(void);
+
+    void setMetadata(StringMap& metadata);
+
+   private:
+    explicit EncryptionKeyInfo(EncryptionKeyInfoImplPtr);
+
+    EncryptionKeyInfoImplPtr impl_;
+
+    friend class PulsarWrapper;
+};
+
+} /* namespace pulsar */
+
+#pragma GCC visibility pop
+
+#endif /* ENCRYPTIONKEYINFO_H_ */
diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h 
b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
index d12b65d..9f7bf1f 100644
--- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
@@ -23,6 +23,11 @@
 #include <pulsar/Result.h>
 #include <pulsar/Message.h>
 #include <boost/function.hpp>
+#include <pulsar/ProducerCryptoFailureAction.h>
+#include <pulsar/CryptoKeyReader.h>
+
+#include <set>
+
 #pragma GCC visibility push(default)
 
 namespace pulsar {
@@ -112,6 +117,16 @@ class ProducerConfiguration {
     ProducerConfiguration& setBatchingMaxPublishDelayMs(const unsigned long& 
batchingMaxPublishDelayMs);
     const unsigned long& getBatchingMaxPublishDelayMs() const;
 
+    const CryptoKeyReaderPtr getCryptoKeyReader() const;
+    ProducerConfiguration& setCryptoKeyReader(CryptoKeyReaderPtr 
cryptoKeyReader);
+
+    ProducerCryptoFailureAction getCryptoFailureAction() const;
+    ProducerConfiguration& setCryptoFailureAction(ProducerCryptoFailureAction 
action);
+
+    std::set<std::string>& getEncryptionKeys();
+    bool isEncryptionEnabled() const;
+    ProducerConfiguration& addEncryptionKey(std::string key);
+
     friend class PulsarWrapper;
 
    private:
diff --git a/pulsar-client-cpp/include/pulsar/ProducerCryptoFailureAction.h 
b/pulsar-client-cpp/include/pulsar/ProducerCryptoFailureAction.h
new file mode 100644
index 0000000..0e9a5d9
--- /dev/null
+++ b/pulsar-client-cpp/include/pulsar/ProducerCryptoFailureAction.h
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef PRODUCERCRYPTOFAILUREACTION_H_
+#define PRODUCERCRYPTOFAILUREACTION_H_
+
+#pragma GCC visibility push(default)
+
+namespace pulsar {
+
+enum class ProducerCryptoFailureAction
+{
+    FAIL,  // This is the default option to fail send if crypto operation fails
+    SEND   // Ignore crypto failure and proceed with sending unencrypted 
messages
+};
+
+} /* namespace pulsar */
+
+#pragma GCC visibility pop
+
+#endif /* PRODUCERCRYPTOFAILUREACTION_H_ */
diff --git a/pulsar-client-cpp/include/pulsar/Result.h 
b/pulsar-client-cpp/include/pulsar/Result.h
index 8085d48..68480e8 100644
--- a/pulsar-client-cpp/include/pulsar/Result.h
+++ b/pulsar-client-cpp/include/pulsar/Result.h
@@ -72,8 +72,8 @@ enum Result
     ResultSubscriptionNotFound,                   /// Subscription not found
     ResultConsumerNotFound,                       /// Consumer not found
     ResultUnsupportedVersionError,  /// Error when an older client/version 
doesn't support a required feature
-
-    ResultTopicTerminated  /// Topic was already terminated
+    ResultTopicTerminated,          /// Topic was already terminated
+    ResultCryptoError               /// Error when crypto operation fails
 };
 
 // Return string representation of result code
diff --git a/pulsar-client-cpp/lib/BatchMessageContainer.cc 
b/pulsar-client-cpp/lib/BatchMessageContainer.cc
index 01be503..0986887 100644
--- a/pulsar-client-cpp/lib/BatchMessageContainer.cc
+++ b/pulsar-client-cpp/lib/BatchMessageContainer.cc
@@ -93,6 +93,10 @@ void BatchMessageContainer::sendMessage() {
     
impl_->metadata.set_num_messages_in_batch(messagesContainerListPtr_->size());
     compressPayLoad();
 
+    SharedBuffer encryptedPayload;
+    producer_.encryptMessage(impl_->metadata, impl_->payload, 
encryptedPayload);
+    impl_->payload = encryptedPayload;
+
     Message msg;
     msg.impl_ = impl_;
 
diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc 
b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
index 17d2c4b..1b99f58 100644
--- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc
@@ -82,4 +82,23 @@ void 
ConsumerConfiguration::setUnAckedMessagesTimeoutMs(const uint64_t milliSeco
     }
     impl_->unAckedMessagesTimeoutMs = milliSeconds;
 }
+
+bool ConsumerConfiguration::isEncryptionEnabled() const { return 
(impl_->cryptoKeyReader != NULL); }
+
+const CryptoKeyReaderPtr ConsumerConfiguration::getCryptoKeyReader() const { 
return impl_->cryptoKeyReader; }
+
+ConsumerConfiguration& 
ConsumerConfiguration::setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader) {
+    impl_->cryptoKeyReader = cryptoKeyReader;
+    return *this;
+}
+
+ConsumerCryptoFailureAction ConsumerConfiguration::getCryptoFailureAction() 
const {
+    return impl_->cryptoFailureAction;
+}
+
+ConsumerConfiguration& 
ConsumerConfiguration::setCryptoFailureAction(ConsumerCryptoFailureAction 
action) {
+    impl_->cryptoFailureAction = action;
+    return *this;
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h 
b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
index 1fafc9a..91434a2 100644
--- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h
@@ -32,6 +32,8 @@ struct ConsumerConfigurationImpl {
     int maxTotalReceiverQueueSizeAcrossPartitions;
     std::string consumerName;
     long brokerConsumerStatsCacheTimeInMs;
+    CryptoKeyReaderPtr cryptoKeyReader;
+    ConsumerCryptoFailureAction cryptoFailureAction;
     ConsumerConfigurationImpl()
         : unAckedMessagesTimeoutMs(0),
           consumerType(ConsumerExclusive),
@@ -39,7 +41,9 @@ struct ConsumerConfigurationImpl {
           hasMessageListener(false),
           brokerConsumerStatsCacheTimeInMs(30 * 1000),  // 30 seconds
           receiverQueueSize(1000),
-          maxTotalReceiverQueueSizeAcrossPartitions(50000) {}
+          maxTotalReceiverQueueSizeAcrossPartitions(50000),
+          cryptoKeyReader(),
+          cryptoFailureAction(ConsumerCryptoFailureAction::FAIL) {}
 };
 }  // namespace pulsar
 #endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc 
b/pulsar-client-cpp/lib/ConsumerImpl.cc
index ce20747..1c62023 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -57,7 +57,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const 
std::string& topic,
       messageListenerRunning_(true),
       batchAcknowledgementTracker_(topic_, subscription, (long)consumerId_),
       brokerConsumerStats_(),
-      consumerStatsBasePtr_() {
+      consumerStatsBasePtr_(),
+      msgCrypto_() {
     std::stringstream consumerStrStream;
     consumerStrStream << "[" << topic_ << ", " << subscription_ << ", " << 
consumerId_ << "] ";
     consumerStr_ = consumerStrStream.str();
@@ -81,6 +82,11 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const 
std::string& topic,
     } else {
         consumerStatsBasePtr_ = boost::make_shared<ConsumerStatsDisabled>();
     }
+
+    // Create msgCrypto
+    if (conf.isEncryptionEnabled()) {
+        msgCrypto_ = boost::make_shared<MessageCrypto>(consumerStr_, false);
+    }
 }
 
 ConsumerImpl::~ConsumerImpl() {
@@ -249,6 +255,11 @@ void ConsumerImpl::messageReceived(const 
ClientConnectionPtr& cnx, const proto::
                                    SharedBuffer& payload) {
     LOG_DEBUG(getName() << "Received Message -- Size: " << 
payload.readableBytes());
 
+    if (!decryptMessageIfNeeded(cnx, msg, metadata, payload)) {
+        // Message was discarded or not consumed due to decryption failure
+        return;
+    }
+
     if (!uncompressMessageIfNeeded(cnx, msg, metadata, payload)) {
         // Message was discarded on decompression error
         return;
@@ -339,6 +350,48 @@ uint32_t 
ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection
     return batchSize - skippedMessages;
 }
 
+bool ConsumerImpl::decryptMessageIfNeeded(const ClientConnectionPtr& cnx, 
const proto::CommandMessage& msg,
+                                          const proto::MessageMetadata& 
metadata, SharedBuffer& payload) {
+    if (!metadata.encryption_keys_size()) {
+        return true;
+    }
+
+    // If no CryptoKeyReader is configured, act according to the crypto failure action
+    if (!config_.isEncryptionEnabled()) {
+        if (config_.getCryptoFailureAction() == 
ConsumerCryptoFailureAction::CONSUME) {
+            LOG_WARN(getName() << "CryptoKeyReader is not implemented. 
Consuming encrypted message.");
+            return true;
+        } else if (config_.getCryptoFailureAction() == 
ConsumerCryptoFailureAction::DISCARD) {
+            LOG_WARN(getName() << "Skipping decryption since CryptoKeyReader 
is not implemented and config "
+                                  "is set to discard");
+            discardCorruptedMessage(cnx, msg.message_id(), 
proto::CommandAck::DecryptionError);
+        } else {
+            LOG_ERROR(getName() << "Message delivery failed since 
CryptoKeyReader is not implemented to "
+                                   "consume encrypted message");
+        }
+        return false;
+    }
+
+    SharedBuffer decryptedPayload;
+    if (msgCrypto_->decrypt(metadata, payload, config_.getCryptoKeyReader(), 
decryptedPayload)) {
+        payload = decryptedPayload;
+        return true;
+    }
+
+    if (config_.getCryptoFailureAction() == 
ConsumerCryptoFailureAction::CONSUME) {
+        // Note, batch message will fail to consume even if config is set to 
consume
+        LOG_WARN(
+            getName() << "Decryption failed. Consuming encrypted message since 
config is set to consume.");
+        return true;
+    } else if (config_.getCryptoFailureAction() == 
ConsumerCryptoFailureAction::DISCARD) {
+        LOG_WARN(getName() << "Discarding message since decryption failed and 
config is set to discard");
+        discardCorruptedMessage(cnx, msg.message_id(), 
proto::CommandAck::DecryptionError);
+    } else {
+        LOG_ERROR(getName() << "Message delivery failed since unable to 
decrypt incoming message");
+    }
+    return false;
+}
+
 bool ConsumerImpl::uncompressMessageIfNeeded(const ClientConnectionPtr& cnx, 
const proto::CommandMessage& msg,
                                              const proto::MessageMetadata& 
metadata, SharedBuffer& payload) {
     if (!metadata.has_compression()) {
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h 
b/pulsar-client-cpp/lib/ConsumerImpl.h
index c68abf4..d115ed8 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -32,6 +32,7 @@
 #include "ExecutorService.h"
 #include "ConsumerImplBase.h"
 #include "lib/UnAckedMessageTrackerDisabled.h"
+#include "MessageCrypto.h"
 
 #include "CompressionCodec.h"
 #include <boost/dynamic_bitset.hpp>
@@ -51,6 +52,7 @@ class ConsumerImpl;
 class BatchAcknowledgementTracker;
 typedef boost::shared_ptr<ConsumerImpl> ConsumerImplPtr;
 typedef boost::weak_ptr<ConsumerImpl> ConsumerImplWeakPtr;
+typedef boost::shared_ptr<MessageCrypto> MessageCryptoPtr;
 
 enum ConsumerTopicType
 {
@@ -124,6 +126,9 @@ class ConsumerImpl : public ConsumerImplBase,
     uint32_t receiveIndividualMessagesFromBatch(const ClientConnectionPtr& 
cnx, Message& batchedMessage);
     void brokerConsumerStatsListener(Result, BrokerConsumerStatsImpl, 
BrokerConsumerStatsCallback);
 
+    bool decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const 
proto::CommandMessage& msg,
+                                const proto::MessageMetadata& metadata, 
SharedBuffer& payload);
+
     // TODO - Convert these functions to lambda when we move to C++11
     Result receiveHelper(Message& msg);
     Result receiveHelper(Message& msg, int timeout);
@@ -157,6 +162,8 @@ class ConsumerImpl : public ConsumerImplBase,
     BatchAcknowledgementTracker batchAcknowledgementTracker_;
     BrokerConsumerStatsImpl brokerConsumerStats_;
 
+    MessageCryptoPtr msgCrypto_;
+
     friend class PulsarFriend;
 };
 
diff --git a/pulsar-client-cpp/lib/EncryptionKeyInfo.cc 
b/pulsar-client-cpp/lib/EncryptionKeyInfo.cc
new file mode 100644
index 0000000..68c6c0a
--- /dev/null
+++ b/pulsar-client-cpp/lib/EncryptionKeyInfo.cc
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pulsar/EncryptionKeyInfo.h>
+
+#include "EncryptionKeyInfoImpl.h"
+
+namespace pulsar {
+
+EncryptionKeyInfo::EncryptionKeyInfo() : impl_(new EncryptionKeyInfoImpl()) {}
+
+EncryptionKeyInfo::EncryptionKeyInfo(EncryptionKeyInfoImplPtr impl) : 
impl_(impl) {}
+
+std::string& EncryptionKeyInfo::getKey() { return impl_->getKey(); }
+
+void EncryptionKeyInfo::setKey(std::string key) { impl_->setKey(key); }
+
+EncryptionKeyInfo::StringMap& EncryptionKeyInfo::getMetadata() { return 
impl_->getMetadata(); }
+
+void EncryptionKeyInfo::setMetadata(StringMap& metadata) { 
impl_->setMetadata(metadata); }
+
+}; /* namespace pulsar */
diff --git a/pulsar-client-cpp/lib/EncryptionKeyInfoImpl.cc 
b/pulsar-client-cpp/lib/EncryptionKeyInfoImpl.cc
new file mode 100644
index 0000000..1409c50
--- /dev/null
+++ b/pulsar-client-cpp/lib/EncryptionKeyInfoImpl.cc
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "EncryptionKeyInfoImpl.h"
+
+namespace pulsar {
+
+EncryptionKeyInfoImpl::EncryptionKeyInfoImpl() : key_(), metadata_() {}
+
+EncryptionKeyInfoImpl::EncryptionKeyInfoImpl(std::string key, StringMap& 
metadata) {
+    key_ = key;
+    metadata_ = metadata;
+}
+
+std::string& EncryptionKeyInfoImpl::getKey() { return key_; }
+
+void EncryptionKeyInfoImpl::setKey(std::string key) { key_ = key; }
+
+EncryptionKeyInfoImpl::StringMap& EncryptionKeyInfoImpl::getMetadata() { 
return metadata_; }
+
+void EncryptionKeyInfoImpl::setMetadata(StringMap& metadata) { metadata_ = 
metadata; }
+
+}; /* namespace pulsar */
diff --git a/pulsar-client-cpp/lib/EncryptionKeyInfoImpl.h 
b/pulsar-client-cpp/lib/EncryptionKeyInfoImpl.h
new file mode 100644
index 0000000..80bfae1
--- /dev/null
+++ b/pulsar-client-cpp/lib/EncryptionKeyInfoImpl.h
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef LIB_ENCRYPTIONKEYINFOIMPL_H_
+#define LIB_ENCRYPTIONKEYINFOIMPL_H_
+
+#include <boost/shared_ptr.hpp>
+#include <iostream>
+#include <map>
+
+#pragma GCC visibility push(default)
+
+namespace pulsar {
+
+class EncryptionKeyInfoImpl {
+   public:
+    typedef std::map<std::string, std::string> StringMap;
+
+    EncryptionKeyInfoImpl();
+
+    EncryptionKeyInfoImpl(std::string key, StringMap& metadata);
+
+    std::string& getKey();
+
+    void setKey(std::string key);
+
+    StringMap& getMetadata(void);
+
+    void setMetadata(StringMap& metadata);
+
+   private:
+    StringMap metadata_;
+    std::string key_;
+};
+
+} /* namespace pulsar */
+
+#pragma GCC visibility pop
+
+#endif /* LIB_ENCRYPTIONKEYINFOIMPL_H_ */
diff --git a/pulsar-client-cpp/lib/MessageCrypto.cc 
b/pulsar-client-cpp/lib/MessageCrypto.cc
new file mode 100644
index 0000000..5e3bd3c
--- /dev/null
+++ b/pulsar-client-cpp/lib/MessageCrypto.cc
@@ -0,0 +1,458 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "LogUtils.h"
+#include "MessageCrypto.h"
+
+namespace pulsar {
+
+DECLARE_LOG_OBJECT()
+
+MessageCrypto::MessageCrypto(std::string& logCtx, bool keyGenNeeded)
+    : dataKeyLen_(32),
+      dataKey_(new unsigned char[dataKeyLen_]),
+      tagLen_(16),
+      ivLen_(12),
+      iv_(new unsigned char[ivLen_]),
+      logCtx_(logCtx) {
+    SSL_library_init();
+    SSL_load_error_strings();
+
+    if (!keyGenNeeded) {
+        mdCtx_ = EVP_MD_CTX_create();
+        EVP_MD_CTX_init(mdCtx_);
+        return;
+    }
+
+    RAND_bytes(dataKey_.get(), dataKeyLen_);
+    RAND_bytes(iv_.get(), ivLen_);
+}
+
+MessageCrypto::~MessageCrypto() {}
+
+RSA* MessageCrypto::loadPublicKey(std::string& pubKeyStr) {
+    BIO* pubBio = NULL;
+    RSA* rsaPub = NULL;
+
+    pubBio = BIO_new_mem_buf((char*)pubKeyStr.c_str(), -1);
+    if (pubBio == NULL) {
+        LOG_ERROR(logCtx_ << " Failed to get memory for public key");
+        return rsaPub;
+    }
+
+    rsaPub = PEM_read_bio_RSA_PUBKEY(pubBio, NULL, NULL, NULL);
+    if (rsaPub == NULL) {
+        LOG_ERROR(logCtx_ << " Failed to load public key");
+    }
+
+    BIO_free(pubBio);
+    return rsaPub;
+}
+
+RSA* MessageCrypto::loadPrivateKey(std::string& privateKeyStr) {
+    BIO* privBio = NULL;
+    RSA* rsaPriv = NULL;
+
+    privBio = BIO_new_mem_buf((char*)privateKeyStr.c_str(), -1);
+    if (privBio == NULL) {
+        LOG_ERROR(logCtx_ << " Failed to get memory for private key");
+        return rsaPriv;
+    }
+
+    rsaPriv = PEM_read_bio_RSAPrivateKey(privBio, NULL, NULL, NULL);
+    if (rsaPriv == NULL) {
+        LOG_ERROR(logCtx_ << " Failed to load private key");
+    }
+
+    BIO_free(privBio);
+    return rsaPriv;
+}
+
+/**
+ * Compute the MD5 digest of a buffer using the member digest context mdCtx_.
+ *
+ * @param keyName key name, used only in error messages
+ * @param input start of the data to hash
+ * @param inputLen number of bytes to hash
+ * @param keyDigest output buffer for the digest
+ * @param digestLen set to the digest length on success
+ * @return true when init, update and finalize all succeed
+ */
+bool MessageCrypto::getDigest(const std::string& keyName, const void* input, unsigned int inputLen,
+                              unsigned char keyDigest[], unsigned int& digestLen) {
+    const bool initOk = (EVP_DigestInit_ex(mdCtx_, EVP_md5(), NULL) == 1);
+    if (!initOk) {
+        LOG_ERROR(logCtx_ + "Failed to initialize md5 digest for key " + keyName);
+        return false;
+    }
+
+    // Reset the reported length only once the context is ready
+    digestLen = 0;
+
+    const bool updateOk = (EVP_DigestUpdate(mdCtx_, input, inputLen) == 1);
+    if (!updateOk) {
+        LOG_ERROR(logCtx_ + "Failed to get md5 hash for data key " + keyName);
+        return false;
+    }
+
+    const bool finalOk = (EVP_DigestFinal_ex(mdCtx_, keyDigest, &digestLen) == 1);
+    if (!finalOk) {
+        LOG_ERROR(logCtx_ + "Failed to finalize md hash for data key " + keyName);
+        return false;
+    }
+
+    return true;
+}
+
+/**
+ * Drop every entry of dataKeyCache_ whose timestamp is more than four hours old.
+ */
+void MessageCrypto::removeExpiredDataKey() {
+    const boost::posix_time::ptime now = boost::posix_time::second_clock::universal_time();
+    const boost::posix_time::time_duration expireTime = boost::posix_time::hours(4);
+
+    auto dataKeyCacheIter = dataKeyCache_.begin();
+    while (dataKeyCacheIter != dataKeyCache_.end()) {
+        // The mapped value is a (data key, timestamp) pair; only the timestamp is needed here,
+        // so bind to it by reference instead of copying the key material.
+        const boost::posix_time::ptime& cachedAt = dataKeyCacheIter->second.second;
+
+        if ((now - cachedAt) > expireTime) {
+            // Post-increment keeps a valid iterator while the current node is erased
+            dataKeyCache_.erase(dataKeyCacheIter++);
+        } else {
+            ++dataKeyCacheIter;
+        }
+    }
+}
+
+/**
+ * Generate a new random data key and encrypt it with each of the named public keys.
+ *
+ * @param keyNames names of the public keys to encrypt the data key with
+ * @param keyReader callback used to load each public key
+ * @return ResultOk when every key was processed; ResultCryptoError when the random data key
+ *         cannot be generated; otherwise the first per-key failure (remaining keys are skipped)
+ */
+Result MessageCrypto::addPublicKeyCipher(std::set<std::string>& keyNames,
+                                         const CryptoKeyReaderPtr keyReader) {
+    Lock lock(mutex_);
+
+    // Generate data key. A failed RAND_bytes must not be ignored: the buffer would not hold
+    // fresh random bytes and would still be used as the encryption key.
+    if (RAND_bytes(dataKey_.get(), dataKeyLen_) != 1) {
+        LOG_ERROR(logCtx_ + " Failed to generate data key");
+        return ResultCryptoError;
+    }
+
+    Result result = ResultOk;
+    for (auto it = keyNames.begin(); it != keyNames.end(); ++it) {
+        result = addPublicKeyCipher(*it, keyReader);
+        if (result != ResultOk) {
+            return result;
+        }
+    }
+    return result;
+}
+
+/**
+ * Encrypt the current data key with one public key and record the result for that key name.
+ *
+ * Does not take mutex_ itself; the callers in this file hold it.
+ *
+ * @param keyName name of the public key; must not be empty
+ * @param keyReader callback used to load the public key
+ * @return ResultOk on success, the key reader's error when the key cannot be fetched, or
+ *         ResultCryptoError when the key cannot be parsed or the RSA encryption fails
+ */
+Result MessageCrypto::addPublicKeyCipher(const std::string& keyName, const CryptoKeyReaderPtr keyReader) {
+    if (keyName.empty()) {
+        LOG_ERROR(logCtx_ + "Keyname is empty ");
+        return ResultCryptoError;
+    }
+
+    // Read the public key and its info using callback
+    StringMap keyMeta;
+    EncryptionKeyInfo keyInfo;
+    Result result = keyReader->getPublicKey(keyName, keyMeta, keyInfo);
+    if (result != ResultOk) {
+        LOG_ERROR(logCtx_ + "Failed to get public key from KeyReader for key " + keyName);
+        return result;
+    }
+
+    RSA* pubKey = loadPublicKey(keyInfo.getKey());
+    if (pubKey == NULL) {
+        LOG_ERROR(logCtx_ + "Failed to load public key " + keyName);
+        return ResultCryptoError;
+    }
+
+    int inSize = RSA_size(pubKey);
+    boost::scoped_array<unsigned char> encryptedKey(new unsigned char[inSize]);
+
+    int outSize =
+        RSA_public_encrypt(dataKeyLen_, dataKey_.get(), encryptedKey.get(), pubKey, RSA_PKCS1_OAEP_PADDING);
+
+    // The parsed key is only needed for this single operation; free it on every path
+    RSA_free(pubKey);
+
+    if (inSize != outSize) {
+        LOG_ERROR(logCtx_ + "Ciphertext is length not matching input key length for key " + keyName);
+        return ResultCryptoError;
+    }
+    std::string encryptedKeyStr(reinterpret_cast<char*>(encryptedKey.get()), inSize);
+    std::shared_ptr<EncryptionKeyInfo> eki(new EncryptionKeyInfo());
+    eki->setKey(encryptedKeyStr);
+    eki->setMetadata(keyInfo.getMetadata());
+
+    // Add a new entry or replace the existing one. map::insert would keep a stale entry, so
+    // after the data key is regenerated the message metadata would still carry the old
+    // encrypted data key and consumers could not decrypt.
+    encryptedDataKeyMap_[keyName] = eki;
+    return ResultOk;
+}
+
+/**
+ * Forget the encrypted data key stored for a key name.
+ *
+ * @param keyName name of the key to remove
+ * @return false when keyName is empty, true otherwise (also when no entry existed)
+ */
+bool MessageCrypto::removeKeyCipher(std::string& keyName) {
+    if (keyName.empty()) {
+        return false;
+    }
+
+    // encryptedDataKeyMap_ is read and written under mutex_ by encrypt() and
+    // addPublicKeyCipher(); take the same lock here.
+    Lock lock(mutex_);
+    encryptedDataKeyMap_.erase(keyName);
+    return true;
+}
+
+/**
+ * Encrypt a payload with AES-256-GCM under the current data key.
+ *
+ * For every name in encKeys the stored encrypted data key and its metadata are appended to
+ * msgMetadata; a key that is not stored yet is loaded through keyReader first. A random IV is
+ * generated on each call and written to msgMetadata's encryption_param. The output buffer holds
+ * the ciphertext followed by the GCM tag.
+ *
+ * @param encKeys names of the keys whose encrypted data key is attached to the message
+ * @param keyReader callback used to load public keys that are not stored yet
+ * @param msgMetadata metadata that receives the encryption keys and the IV
+ * @param payload data to encrypt
+ * @param encryptedPayload receives ciphertext followed by the tag on success
+ * @return true on success; false when encKeys is empty or any step fails
+ */
+bool MessageCrypto::encrypt(std::set<std::string>& encKeys, const CryptoKeyReaderPtr keyReader,
+                            proto::MessageMetadata& msgMetadata, SharedBuffer& payload,
+                            SharedBuffer& encryptedPayload) {
+    if (encKeys.empty()) {
+        return false;
+    }
+
+    Lock lock(mutex_);
+
+    // Update message metadata with encrypted data key
+    for (auto it = encKeys.begin(); it != encKeys.end(); ++it) {
+        const std::string& keyName = *it;
+        auto keyInfoIter = encryptedDataKeyMap_.find(keyName);
+
+        if (keyInfoIter == encryptedDataKeyMap_.end()) {
+            // Attempt to load the key. This will allow us to load keys as soon as
+            // a new key is added to producer config
+            Result result = addPublicKeyCipher(keyName, keyReader);
+            if (result != ResultOk) {
+                return false;
+            }
+
+            keyInfoIter = encryptedDataKeyMap_.find(keyName);
+
+            if (keyInfoIter == encryptedDataKeyMap_.end()) {
+                LOG_ERROR(logCtx_ + "Unable to find encrypted data key for " + keyName);
+                return false;
+            }
+        }
+        EncryptionKeyInfo* keyInfo = keyInfoIter->second.get();
+
+        proto::EncryptionKeys* encKeyEntry = proto::EncryptionKeys().New();
+        encKeyEntry->set_key(keyName);
+        encKeyEntry->set_value(keyInfo->getKey());
+
+        if (keyInfo->getMetadata().size()) {
+            for (auto metaIter = keyInfo->getMetadata().begin(); metaIter != keyInfo->getMetadata().end();
+                 ++metaIter) {
+                proto::KeyValue* keyValue = proto::KeyValue().New();
+                keyValue->set_key(metaIter->first);
+                keyValue->set_value(metaIter->second);
+                encKeyEntry->mutable_metadata()->AddAllocated(keyValue);
+            }
+        }
+
+        msgMetadata.mutable_encryption_keys()->AddAllocated(encKeyEntry);
+    }
+
+    // TODO: Replace random with counter and periodic refreshing based on timer/counter value
+    // GCM must never reuse an IV under the same key, so a failed RAND_bytes aborts the send
+    // instead of silently encrypting with whatever the buffer held before.
+    if (RAND_bytes(iv_.get(), ivLen_) != 1) {
+        LOG_ERROR(logCtx_ + " Failed to generate IV.");
+        return false;
+    }
+    msgMetadata.set_encryption_param(reinterpret_cast<char*>(iv_.get()), ivLen_);
+
+    EVP_CIPHER_CTX* cipherCtx = NULL;
+    encryptedPayload = SharedBuffer::allocate(payload.readableBytes() + EVP_MAX_BLOCK_LENGTH + tagLen_);
+    int encLen = 0;
+
+    if (!(cipherCtx = EVP_CIPHER_CTX_new())) {
+        LOG_ERROR(logCtx_ + " Failed to cipher ctx.");
+        return false;
+    }
+
+    if (EVP_EncryptInit_ex(cipherCtx, EVP_aes_256_gcm(), NULL, dataKey_.get(), iv_.get()) != 1) {
+        LOG_ERROR(logCtx_ + " Failed to init cipher ctx.");
+        EVP_CIPHER_CTX_free(cipherCtx);
+        return false;
+    }
+
+    if (EVP_CIPHER_CTX_set_padding(cipherCtx, EVP_CIPH_NO_PADDING) != 1) {
+        LOG_ERROR(logCtx_ + " Failed to set cipher padding.");
+        EVP_CIPHER_CTX_free(cipherCtx);
+        return false;
+    }
+
+    if (EVP_EncryptUpdate(cipherCtx, reinterpret_cast<unsigned char*>(encryptedPayload.mutableData()),
+                          &encLen, reinterpret_cast<unsigned const char*>(payload.data()),
+                          payload.readableBytes()) != 1) {
+        LOG_ERROR(logCtx_ + " Failed to encrypt payload.");
+        EVP_CIPHER_CTX_free(cipherCtx);
+        return false;
+    }
+    encryptedPayload.bytesWritten(encLen);
+    encLen = 0;
+
+    if (EVP_EncryptFinal_ex(cipherCtx, reinterpret_cast<unsigned char*>(encryptedPayload.mutableData()),
+                            &encLen) != 1) {
+        LOG_ERROR(logCtx_ + " Failed to finalize encryption.");
+        EVP_CIPHER_CTX_free(cipherCtx);
+        return false;
+    }
+    encryptedPayload.bytesWritten(encLen);
+
+    // The authentication tag is written right after the ciphertext
+    if (EVP_CIPHER_CTX_ctrl(cipherCtx, EVP_CTRL_GCM_GET_TAG, tagLen_, encryptedPayload.mutableData()) != 1) {
+        LOG_ERROR(logCtx_ + " Failed to get cipher tag info.");
+        EVP_CIPHER_CTX_free(cipherCtx);
+        return false;
+    }
+    encryptedPayload.bytesWritten(tagLen_);
+
+    EVP_CIPHER_CTX_free(cipherCtx);
+
+    return true;
+}
+
+/**
+ * Recover the data key from its RSA-encrypted form and cache it.
+ *
+ * The cache entry is keyed by the MD5 digest of the encrypted data key and stamped with the
+ * current time; expired entries are purged afterwards.
+ *
+ * @param keyName name of the key pair, passed to keyReader and used in log messages
+ * @param encryptedDataKey RSA-OAEP ciphertext of the data key
+ * @param encKeyMeta metadata that travelled with the encrypted key; handed to keyReader
+ * @param keyReader callback used to load the private key
+ * @return true when the data key was decrypted and cached
+ */
+bool MessageCrypto::decryptDataKey(const std::string& keyName, const std::string& encryptedDataKey,
+                                   const google::protobuf::RepeatedPtrField<proto::KeyValue>& encKeyMeta,
+                                   const CryptoKeyReaderPtr keyReader) {
+    StringMap keyMeta;
+    for (auto iter = encKeyMeta.begin(); iter != encKeyMeta.end(); ++iter) {
+        keyMeta[iter->key()] = iter->value();
+    }
+
+    // Read the private key info using callback
+    EncryptionKeyInfo keyInfo;
+    if (keyReader->getPrivateKey(keyName, keyMeta, keyInfo) != ResultOk) {
+        LOG_ERROR(logCtx_ + " Failed to get private key from KeyReader for key " + keyName);
+        return false;
+    }
+
+    // Convert key from string to RSA key
+    RSA* privKey = loadPrivateKey(keyInfo.getKey());
+    if (privKey == NULL) {
+        LOG_ERROR(logCtx_ + " Failed to load private key " + keyName);
+        return false;
+    }
+
+    // Decrypt data key into a scratch buffer sized for the RSA modulus. The sender controls the
+    // plaintext length, so writing straight into the dataKeyLen_-byte dataKey_ buffer could
+    // overflow it.
+    boost::scoped_array<unsigned char> decryptedKey(new unsigned char[RSA_size(privKey)]);
+    int outSize = RSA_private_decrypt(encryptedDataKey.size(),
+                                      reinterpret_cast<unsigned const char*>(encryptedDataKey.c_str()),
+                                      decryptedKey.get(), privKey, RSA_PKCS1_OAEP_PADDING);
+
+    // The parsed key is only needed for this single operation; free it on every path
+    RSA_free(privKey);
+
+    if (outSize == -1) {
+        LOG_ERROR(logCtx_ + "Failed to decrypt AES key for " + keyName);
+        return false;
+    }
+
+    if (outSize != dataKeyLen_) {
+        LOG_ERROR(logCtx_ + "Decrypted AES key has unexpected length for " + keyName);
+        return false;
+    }
+
+    // Keep dataKey_ in sync with the most recently decrypted key
+    for (int i = 0; i < dataKeyLen_; i++) {
+        dataKey_[i] = decryptedKey[i];
+    }
+
+    unsigned char keyDigest[EVP_MAX_MD_SIZE];
+    unsigned int digestLen = 0;
+    if (!getDigest(keyName, encryptedDataKey.c_str(), encryptedDataKey.size(), keyDigest, digestLen)) {
+        LOG_ERROR(logCtx_ + "Failed to get digest for data key " + keyName);
+        return false;
+    }
+
+    std::string keyDigestStr(reinterpret_cast<char*>(keyDigest), digestLen);
+    std::string dataKeyStr(reinterpret_cast<char*>(dataKey_.get()), dataKeyLen_);
+    dataKeyCache_[keyDigestStr] =
+        std::make_pair(dataKeyStr, boost::posix_time::second_clock::universal_time());
+
+    // Remove expired entries from the cache
+    removeExpiredDataKey();
+    return true;
+}
+
+/**
+ * Decrypt an AES-256-GCM payload with the given data key.
+ *
+ * The IV is read from msgMetadata's encryption_param; the last tagLen_ bytes of the payload
+ * are treated as the GCM tag and the bytes before it as the ciphertext.
+ *
+ * @param dataKeySecret raw data key bytes
+ * @param msgMetadata metadata carrying the IV
+ * @param payload ciphertext followed by the tag
+ * @param decryptedPayload receives the plaintext on success
+ * @return true when decryption and tag verification succeed
+ */
+bool MessageCrypto::decryptData(const std::string& dataKeySecret, const proto::MessageMetadata& msgMetadata,
+                                SharedBuffer& payload, SharedBuffer& decryptedPayload) {
+    // unpack iv and encrypted data. The IV length comes from the received message, so it is
+    // validated before copying into the fixed ivLen_-byte buffer.
+    const std::string& encryptionParam = msgMetadata.encryption_param();
+    if (encryptionParam.size() != static_cast<std::string::size_type>(ivLen_)) {
+        LOG_ERROR(logCtx_ + " Unexpected IV length in message metadata");
+        return false;
+    }
+    encryptionParam.copy(reinterpret_cast<char*>(iv_.get()), ivLen_);
+
+    // The payload must at least contain the tag, otherwise the ciphertext length goes negative
+    const int payloadLen = payload.readableBytes();
+    if (payloadLen < tagLen_) {
+        LOG_ERROR(logCtx_ + " Encrypted payload is shorter than the gcm tag");
+        return false;
+    }
+
+    EVP_CIPHER_CTX* cipherCtx = NULL;
+    decryptedPayload = SharedBuffer::allocate(payload.readableBytes() + EVP_MAX_BLOCK_LENGTH + tagLen_);
+
+    if (!(cipherCtx = EVP_CIPHER_CTX_new())) {
+        LOG_ERROR(logCtx_ + " Failed to get cipher ctx");
+        return false;
+    }
+
+    if (!EVP_DecryptInit_ex(cipherCtx, EVP_aes_256_gcm(), NULL,
+                            reinterpret_cast<unsigned const char*>(dataKeySecret.c_str()),
+                            reinterpret_cast<unsigned const char*>(iv_.get()))) {
+        LOG_ERROR(logCtx_ + " Failed to init decrypt cipher ctx");
+        EVP_CIPHER_CTX_free(cipherCtx);
+        return false;
+    }
+
+    if (EVP_CIPHER_CTX_set_padding(cipherCtx, EVP_CIPH_NO_PADDING) != 1) {
+        LOG_ERROR(logCtx_ + " Failed to set cipher padding");
+        EVP_CIPHER_CTX_free(cipherCtx);
+        return false;
+    }
+
+    const int cipherLen = payloadLen - tagLen_;
+    int decLen = 0;
+    if (!EVP_DecryptUpdate(cipherCtx, reinterpret_cast<unsigned char*>(decryptedPayload.mutableData()),
+                           &decLen, reinterpret_cast<unsigned const char*>(payload.data()), cipherLen)) {
+        LOG_ERROR(logCtx_ + " Failed to decrypt update");
+        EVP_CIPHER_CTX_free(cipherCtx);
+        return false;
+    }
+    decryptedPayload.bytesWritten(decLen);
+
+    // The tag sits directly after the ciphertext; the ctrl call takes a non-const pointer
+    if (!EVP_CIPHER_CTX_ctrl(cipherCtx, EVP_CTRL_GCM_SET_TAG, tagLen_,
+                             const_cast<char*>(payload.data() + cipherLen))) {
+        LOG_ERROR(logCtx_ + " Failed to set gcm tag");
+        EVP_CIPHER_CTX_free(cipherCtx);
+        return false;
+    }
+
+    if (!EVP_DecryptFinal_ex(cipherCtx, reinterpret_cast<unsigned char*>(decryptedPayload.mutableData()),
+                             &decLen)) {
+        LOG_ERROR(logCtx_ + " Failed to finalize encrypted message");
+        EVP_CIPHER_CTX_free(cipherCtx);
+        return false;
+    }
+    decryptedPayload.bytesWritten(decLen);
+
+    EVP_CIPHER_CTX_free(cipherCtx);
+
+    return true;
+}
+
+/**
+ * Try to decrypt the payload with a data key that is already in the cache.
+ *
+ * Each encryption key attached to the message is looked up in dataKeyCache_ by the MD5 digest
+ * of its encrypted data key; the first cached key that decrypts the payload wins.
+ *
+ * @param msgMetadata metadata carrying the encrypted data keys and the IV
+ * @param payload ciphertext followed by the tag
+ * @param decryptedPayload receives the plaintext on success
+ * @return true when some cached data key decrypted the payload
+ */
+bool MessageCrypto::getKeyAndDecryptData(const proto::MessageMetadata& msgMetadata, SharedBuffer& payload,
+                                         SharedBuffer& decryptedPayload) {
+    bool dataDecrypted = false;
+
+    for (auto iter = msgMetadata.encryption_keys().begin(); iter != msgMetadata.encryption_keys().end();
+         ++iter) {
+        const std::string& keyName = iter->key();
+        const std::string& encDataKey = iter->value();
+        unsigned char keyDigest[EVP_MAX_MD_SIZE];
+        unsigned int digestLen = 0;
+        if (!getDigest(keyName, encDataKey.c_str(), encDataKey.size(), keyDigest, digestLen)) {
+            // Without a digest there is no cache key to look up; try the next entry
+            continue;
+        }
+
+        std::string keyDigestStr(reinterpret_cast<char*>(keyDigest), digestLen);
+
+        auto dataKeyCacheIter = dataKeyCache_.find(keyDigestStr);
+        if (dataKeyCacheIter != dataKeyCache_.end()) {
+            // Taking a small performance hit here if the hash collides. When it
+            // returns a different key, decryption fails. At this point, we would
+            // call decryptDataKey to refresh the cache and come here again to decrypt.
+            const auto& dataKeyEntry = dataKeyCacheIter->second;
+            if (decryptData(dataKeyEntry.first, msgMetadata, payload, decryptedPayload)) {
+                dataDecrypted = true;
+                break;
+            }
+        } else {
+            // First time, entry won't be present in cache
+            LOG_DEBUG(logCtx_ + " Failed to decrypt data or data key is not in cache for " + keyName +
+                      ". Will attempt to refresh.");
+        }
+    }
+    return dataDecrypted;
+}
+
+/**
+ * Decrypt a message payload, refreshing the cached data key when needed.
+ *
+ * A cached data key is tried first. If that fails, the encryption keys attached to the message
+ * are walked in order until one data key can be decrypted via keyReader, and the payload
+ * decryption is attempted once more.
+ *
+ * @param msgMetadata metadata carrying the encrypted data keys and the IV
+ * @param payload encrypted payload
+ * @param keyReader callback used to load private keys
+ * @param decryptedPayload receives the plaintext on success
+ * @return true when the payload was decrypted
+ */
+bool MessageCrypto::decrypt(const proto::MessageMetadata& msgMetadata, SharedBuffer& payload,
+                            const CryptoKeyReaderPtr keyReader, SharedBuffer& decryptedPayload) {
+    // Fast path: a data key from the cache already works
+    if (getKeyAndDecryptData(msgMetadata, payload, decryptedPayload)) {
+        return true;
+    }
+
+    // Either first time, or decryption failed. Attempt to regenerate data key
+    const int keyCount = msgMetadata.encryption_keys_size();
+    for (int keyIdx = 0; keyIdx < keyCount; keyIdx++) {
+        const proto::EncryptionKeys& candidate = msgMetadata.encryption_keys(keyIdx);
+
+        if (decryptDataKey(candidate.key(), candidate.value(), candidate.metadata(), keyReader)) {
+            // The first data key that can be recovered is enough; retry with the refreshed cache
+            return getKeyAndDecryptData(msgMetadata, payload, decryptedPayload);
+        }
+    }
+
+    // Unable to decrypt data key
+    return false;
+}
+
+} /* namespace pulsar */
diff --git a/pulsar-client-cpp/lib/MessageCrypto.h 
b/pulsar-client-cpp/lib/MessageCrypto.h
new file mode 100644
index 0000000..016ce54
--- /dev/null
+++ b/pulsar-client-cpp/lib/MessageCrypto.h
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef LIB_MESSAGECRYPTO_H_
+#define LIB_MESSAGECRYPTO_H_
+
+#include <iostream>
+#include <map>
+#include <set>
+#include <boost/thread/mutex.hpp>
+#include <boost/scoped_array.hpp>
+
+#include <openssl/ssl.h>
+#include <openssl/rand.h>
+#include <openssl/bio.h>
+#include <openssl/evp.h>
+#include <openssl/rsa.h>
+#include <openssl/engine.h>
+
+#include "SharedBuffer.h"
+#include "ExecutorService.h"
+#include "pulsar/CryptoKeyReader.h"
+#include "PulsarApi.pb.h"
+
+namespace pulsar {
+
+/*
+ * Encrypts and decrypts message payloads.
+ *
+ * Payloads are protected with AES-256-GCM under a symmetric data key; the data key itself is
+ * RSA-encrypted with one or more public keys and shipped in the message metadata.
+ */
+class MessageCrypto {
+   public:
+    typedef std::map<std::string, std::string> StringMap;
+    typedef std::map<std::string, std::pair<std::string, 
boost::posix_time::ptime>> DataKeyCacheMap;
+
+    /*
+     * @param logCtx Prefix used in log messages
+     * @param keyGenNeeded If true a random data key and IV are generated, otherwise only the
+     * digest context is created
+     */
+    MessageCrypto(std::string& logCtx, bool keyGenNeeded);
+    ~MessageCrypto();
+
+    /*
+     * Encrypt data key using the public key(s) in the argument. <p> If more than one key name
+     * is specified, data key is encrypted using each of those keys. If the public key is
+     * expired or changed, application is responsible to remove the old key and add the new
+     * key <p>
+     *
+     * @param keyNames List of public keys to encrypt data key
+     * @param keyReader Implementation to read the key values
+     * @return ResultOk if succeeded
+     *
+     */
+    Result addPublicKeyCipher(std::set<std::string>& keyNames, const 
CryptoKeyReaderPtr keyReader);
+
+    /*
+     * Remove a key <p> Remove the key identified by the keyName from the list of keys.<p>
+     *
+     * @param keyName Unique name to identify the key
+     * @return true if succeeded, false otherwise
+     */
+    bool removeKeyCipher(std::string& keyName);
+
+    /*
+     * Encrypt the payload using the data key and update message metadata with the keyname &
+     * encrypted data key
+     *
+     * @param encKeys One or more public keys to encrypt data key
+     * @param keyReader Implementation to read the key values
+     * @param msgMetadata Message Metadata
+     * @param payload Message which needs to be encrypted
+     * @param encryptedPayload Contains encrypted payload if success
+     *
+     * @return true if success
+     */
+    bool encrypt(std::set<std::string>& encKeys, const CryptoKeyReaderPtr 
keyReader,
+                 proto::MessageMetadata& msgMetadata, SharedBuffer& payload, 
SharedBuffer& encryptedPayload);
+
+    /*
+     * Decrypt the payload using the data key. Keys used to encrypt data key can be retrieved
+     * from msgMetadata
+     *
+     * @param msgMetadata Message Metadata
+     * @param payload Message which needs to be decrypted
+     * @param keyReader KeyReader implementation to retrieve key value
+     * @param decryptedPayload Contains decrypted payload if success
+     *
+     * @return true if success
+     */
+    bool decrypt(const proto::MessageMetadata& msgMetadata, SharedBuffer& 
payload,
+                 const CryptoKeyReaderPtr keyReader, SharedBuffer& 
decryptedPayload);
+
+   private:
+    typedef boost::unique_lock<boost::mutex> Lock;
+    // Taken by encrypt() and by the public addPublicKeyCipher() overload
+    boost::mutex mutex_;
+
+    // Length in bytes of the symmetric data key and the buffer holding it
+    int dataKeyLen_;
+    boost::scoped_array<unsigned char> dataKey_;
+
+    // Lengths in bytes of the GCM tag and of the IV, and the buffer holding the IV
+    int tagLen_;
+    int ivLen_;
+    boost::scoped_array<unsigned char> iv_;
+
+    // Prefix used in log messages
+    std::string logCtx_;
+
+    /* This cache uses the digest of encrypted data key as its key. It's possible
+     * for consumers to receive messages with data key encrypted using older or
+     * newer version of public key. If we use the key name as the key for dataKeyCache,
+     * we will end up decrypting data key way too often which is costly.
+     */
+    DataKeyCacheMap dataKeyCache_;
+
+    // Map of key name and encrypted gcm key, metadata pair which is sent with
+    // encrypted message
+    std::map<std::string, std::shared_ptr<EncryptionKeyInfo>> 
encryptedDataKeyMap_;
+
+    // Digest context used by getDigest(); created by the constructor only when keyGenNeeded
+    // is false
+    EVP_MD_CTX* mdCtx_;
+
+    // Parse a PEM string into an RSA key; both return NULL on failure
+    RSA* loadPublicKey(std::string& pubKeyStr);
+    RSA* loadPrivateKey(std::string& privateKeyStr);
+    // Compute the MD5 digest of input into keyDigest; keyName is used for log messages only
+    bool getDigest(const std::string& keyName, const void* input, unsigned int 
inputLen,
+                   unsigned char keyDigest[], unsigned int& digestLen);
+    // Drop dataKeyCache_ entries older than four hours
+    void removeExpiredDataKey();
+
+    // Encrypt the current data key with a single public key and store it under keyName
+    Result addPublicKeyCipher(const std::string& keyName, const 
CryptoKeyReaderPtr keyReader);
+
+    // Recover the data key with the private key obtained from keyReader and cache it
+    bool decryptDataKey(const std::string& keyName, const std::string& 
encryptedDataKey,
+                        const 
google::protobuf::RepeatedPtrField<proto::KeyValue>& encKeyMeta,
+                        const CryptoKeyReaderPtr keyReader);
+    // Decrypt payload with the given raw data key; the IV is read from msgMetadata
+    bool decryptData(const std::string& dataKeySecret, const 
proto::MessageMetadata& msgMetadata,
+                     SharedBuffer& payload, SharedBuffer& decPayload);
+    // Try to decrypt payload using data keys that are already in dataKeyCache_
+    bool getKeyAndDecryptData(const proto::MessageMetadata& msgMetadata, 
SharedBuffer& payload,
+                              SharedBuffer& decryptedPayload);
+};
+
+} /* namespace pulsar */
+
+#endif /* LIB_MESSAGECRYPTO_H_ */
diff --git a/pulsar-client-cpp/lib/ProducerConfiguration.cc 
b/pulsar-client-cpp/lib/ProducerConfiguration.cc
index c010a9f..ab70db2 100644
--- a/pulsar-client-cpp/lib/ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ProducerConfiguration.cc
@@ -157,4 +157,32 @@ ProducerConfiguration& 
ProducerConfiguration::setBatchingMaxPublishDelayMs(
 const unsigned long& ProducerConfiguration::getBatchingMaxPublishDelayMs() 
const {
     return impl_->batchingMaxPublishDelayMs;
 }
+
+// Key reader used to look up encryption keys.
+const CryptoKeyReaderPtr ProducerConfiguration::getCryptoKeyReader() const {
+    return impl_->cryptoKeyReader;
+}
+
+// Install the key reader; returns *this so calls can be chained.
+ProducerConfiguration& ProducerConfiguration::setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader) {
+    impl_->cryptoKeyReader = cryptoKeyReader;
+    return *this;
+}
+
+// Configured crypto failure action.
+ProducerCryptoFailureAction ProducerConfiguration::getCryptoFailureAction() const {
+    return impl_->cryptoFailureAction;
+}
+
+// Set the crypto failure action; returns *this so calls can be chained.
+ProducerConfiguration& ProducerConfiguration::setCryptoFailureAction(ProducerCryptoFailureAction action) {
+    impl_->cryptoFailureAction = action;
+    return *this;
+}
+
+// Mutable access to the set of encryption key names.
+std::set<std::string>& ProducerConfiguration::getEncryptionKeys() {
+    return impl_->encryptionKeys;
+}
+
+// Encryption is on only when at least one key name AND a key reader are configured.
+bool ProducerConfiguration::isEncryptionEnabled() const {
+    if (impl_->encryptionKeys.empty()) {
+        return false;
+    }
+    return impl_->cryptoKeyReader != NULL;
+}
+
+// Add one encryption key name; returns *this so calls can be chained.
+ProducerConfiguration& ProducerConfiguration::addEncryptionKey(std::string key) {
+    impl_->encryptionKeys.insert(key);
+    return *this;
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h 
b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
index 11fe7cc..3f788a9 100644
--- a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
@@ -41,6 +41,9 @@ struct ProducerConfigurationImpl {
     unsigned int batchingMaxMessages;
     unsigned long batchingMaxAllowedSizeInBytes;
     unsigned long batchingMaxPublishDelayMs;
+    CryptoKeyReaderPtr cryptoKeyReader;
+    std::set<std::string> encryptionKeys;
+    ProducerCryptoFailureAction cryptoFailureAction;
     ProducerConfigurationImpl()
         : sendTimeoutMs(30000),
           compressionType(CompressionNone),
@@ -52,8 +55,10 @@ struct ProducerConfigurationImpl {
           batchingEnabled(false),
           batchingMaxMessages(1000),
           batchingMaxAllowedSizeInBytes(128 * 1024),  // 128 KB
-          batchingMaxPublishDelayMs(10) {             // 10 milli seconds
-    }
+          batchingMaxPublishDelayMs(10),              // 10 milli seconds
+          cryptoKeyReader(),
+          encryptionKeys(),
+          cryptoFailureAction(ProducerCryptoFailureAction::FAIL) {}
 };
 }  // namespace pulsar
 
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc 
b/pulsar-client-cpp/lib/ProducerImpl.cc
index 2d36c7f..b2ed33f 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -50,7 +50,9 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const 
std::string& topic, const
       producerStr_("[" + topic_ + ", " + producerName_ + "] "),
       producerId_(client->newProducerId()),
       msgSequenceGenerator_(0),
-      sendTimer_() {
+      sendTimer_(),
+      msgCrypto_(),
+      dataKeyGenIntervalSec_(4 * 60 * 60) {
     LOG_DEBUG("ProducerName - " << producerName_ << " Created producer on 
topic " << topic_
                                 << " id: " << producerId_);
 
@@ -70,10 +72,26 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const 
std::string& topic, const
     } else {
         producerStatsBasePtr_ = boost::make_shared<ProducerStatsDisabled>();
     }
+
+    if (conf_.isEncryptionEnabled()) {
+        std::ostringstream logCtxStream;
+        logCtxStream << "[" << topic_ << ", " << producerName_ << ", " << 
producerId_ << "]";
+        std::string logCtx = logCtxStream.str();
+        msgCrypto_ = boost::make_shared<MessageCrypto>(logCtx, true);
+        msgCrypto_->addPublicKeyCipher(conf_.getEncryptionKeys(), 
conf_.getCryptoKeyReader());
+
+        dataKeyGenTImer_ = executor_->createDeadlineTimer();
+        
dataKeyGenTImer_->expires_from_now(boost::posix_time::seconds(dataKeyGenIntervalSec_));
+        dataKeyGenTImer_->async_wait(
+            boost::bind(&pulsar::ProducerImpl::refreshEncryptionKey, this, 
boost::asio::placeholders::error));
+    }
 }
 
 ProducerImpl::~ProducerImpl() {
     LOG_DEBUG(getName() << "~ProducerImpl");
+    if (dataKeyGenTImer_) {
+        dataKeyGenTImer_->cancel();
+    }
     closeAsync(ResultCallback());
     printStats();
 }
@@ -84,6 +102,19 @@ const std::string& ProducerImpl::getProducerName() const { 
return producerName_;
 
 int64_t ProducerImpl::getLastSequenceId() const { return 
lastSequenceIdPublished_; }
 
+/**
+ * Timer callback that regenerates the data key and re-arms the timer.
+ *
+ * @param ec error code from the deadline timer; any error (e.g. cancellation) stops the cycle
+ */
+void ProducerImpl::refreshEncryptionKey(const boost::system::error_code& ec) {
+    if (ec) {
+        LOG_DEBUG("Ignoring timer cancelled event, code[" << ec << "]");
+        return;
+    }
+
+    // A failed refresh must be visible in the logs; the timer is still re-armed below so the
+    // refresh is retried at the next interval.
+    Result result = msgCrypto_->addPublicKeyCipher(conf_.getEncryptionKeys(), conf_.getCryptoKeyReader());
+    if (result != ResultOk) {
+        LOG_ERROR(getName() << "Failed to refresh encryption data key: " << strResult(result));
+    }
+
+    dataKeyGenTImer_->expires_from_now(boost::posix_time::seconds(dataKeyGenIntervalSec_));
+    dataKeyGenTImer_->async_wait(
+        boost::bind(&pulsar::ProducerImpl::refreshEncryptionKey, this, boost::asio::placeholders::error));
+}
+
 void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) {
     Lock lock(mutex_);
     if (state_ == Closed) {
@@ -262,14 +293,23 @@ void ProducerImpl::sendAsync(const Message& msg, 
SendCallback callback) {
     SharedBuffer& payload = msg.impl_->payload;
 
     uint32_t uncompressedSize = payload.readableBytes();
+    uint32_t payloadSize = uncompressedSize;
 
     if (!batchMessageContainer) {
         // If batching is enabled we compress all the payloads together before 
sending the batch
         payload = 
CompressionCodecProvider::getCodec(conf_.getCompressionType()).encode(payload);
+        payloadSize = payload.readableBytes();
+
+        // Encrypt the payload if enabled
+        SharedBuffer encryptedPayload;
+        if (!encryptMessage(msg.impl_->metadata, payload, encryptedPayload)) {
+            cb(ResultCryptoError, msg);
+            return;
+        }
+        payload = encryptedPayload;
     }
-    uint32_t compressedSize = payload.readableBytes();
-    if (compressedSize > Commands::MaxMessageSize) {
-        LOG_DEBUG(getName() << " - compressed Message payload size" << 
compressedSize << "cannot exceed "
+    if (payloadSize > Commands::MaxMessageSize) {
+        LOG_DEBUG(getName() << " - compressed Message payload size" << 
payloadSize << "cannot exceed "
                             << Commands::MaxMessageSize << " bytes");
         cb(ResultMessageTooBig, msg);
         return;
@@ -559,6 +599,17 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId) {
     }
 }
 
+/**
+ * Encrypt the payload when encryption is configured, otherwise pass it through unchanged.
+ *
+ * @param metadata message metadata, updated by the crypto helper when encrypting
+ * @param payload payload to send
+ * @param encryptedPayload receives the encrypted payload, or the original payload when
+ *        encryption is not active
+ * @return true on success or pass-through; false when encryption fails
+ */
+bool ProducerImpl::encryptMessage(proto::MessageMetadata& metadata, SharedBuffer& payload,
+                                  SharedBuffer& encryptedPayload) {
+    const bool encryptionActive = conf_.isEncryptionEnabled() && msgCrypto_ != NULL;
+    if (encryptionActive) {
+        return msgCrypto_->encrypt(conf_.getEncryptionKeys(), conf_.getCryptoKeyReader(), metadata, payload,
+                                   encryptedPayload);
+    }
+
+    // Nothing to do: hand the payload back as-is
+    encryptedPayload = payload;
+    return true;
+}
+
 void ProducerImpl::disconnectProducer() {
     LOG_DEBUG("Broker notification of Closed producer: " << producerId_);
     Lock lock(mutex_);
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h 
b/pulsar-client-cpp/lib/ProducerImpl.h
index f89823f..2907d40 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -27,6 +27,7 @@
 #include "HandlerBase.h"
 #include "SharedBuffer.h"
 #include "CompressionCodec.h"
+#include "MessageCrypto.h"
 #include "stats/ProducerStatsDisabled.h"
 #include "stats/ProducerStatsImpl.h"
 
@@ -37,6 +38,7 @@ namespace pulsar {
 class BatchMessageContainer;
 
 typedef boost::shared_ptr<BatchMessageContainer> BatchMessageContainerPtr;
+typedef boost::shared_ptr<MessageCrypto> MessageCryptoPtr;
 
 class PulsarFriend;
 
@@ -120,6 +122,10 @@ class ProducerImpl : public HandlerBase,
 
     void resendMessages(ClientConnectionPtr cnx);
 
+    void refreshEncryptionKey(const boost::system::error_code& ec);
+    bool encryptMessage(proto::MessageMetadata& metadata, SharedBuffer& 
payload,
+                        SharedBuffer& encryptedPayload);
+
     typedef boost::unique_lock<boost::mutex> Lock;
 
     ProducerConfiguration conf_;
@@ -144,6 +150,10 @@ class ProducerImpl : public HandlerBase,
     Promise<Result, ProducerImplBaseWeakPtr> producerCreatedPromise_;
 
     void failPendingMessages(Result result);
+
+    MessageCryptoPtr msgCrypto_;
+    DeadlineTimerPtr dataKeyGenTImer_;
+    uint32_t dataKeyGenIntervalSec_;
 };
 
 struct ProducerImplCmp {
diff --git a/pulsar-client-cpp/lib/Result.cc b/pulsar-client-cpp/lib/Result.cc
index 1deb5f2..64e23e1 100644
--- a/pulsar-client-cpp/lib/Result.cc
+++ b/pulsar-client-cpp/lib/Result.cc
@@ -122,6 +122,9 @@ const char* pulsar::strResult(Result result) {
 
         case ResultTopicTerminated:
             return "TopicTerminated";
+
+        case ResultCryptoError:
+            return "CryptoError";
     };
     // NOTE : Do not add default case in the switch above. In future if we get 
new cases for
     // ServerError and miss them in the switch above we would like to get 
notified. Adding
diff --git a/pulsar-client-cpp/perf/PerfConsumer.cc 
b/pulsar-client-cpp/perf/PerfConsumer.cc
index 0ddef60..97aa8db 100644
--- a/pulsar-client-cpp/perf/PerfConsumer.cc
+++ b/pulsar-client-cpp/perf/PerfConsumer.cc
@@ -68,6 +68,8 @@ struct Arguments {
     int receiverQueueSize;
     int ioThreads;
     int listenerThreads;
+    std::string encKeyName;
+    std::string encKeyValueFile;
 };
 
 namespace pulsar {
@@ -87,6 +89,37 @@ class PulsarFriend {
 #include <atomic>
 #endif
 
+// Key reader for the perf consumer: serves one private key loaded from a file at construction.
+class EncKeyReader: public CryptoKeyReader {
+
+  private:
+    // Contents of the private key file; stays empty when no file name was given
+    std::string privKeyContents;
+
+    // Read the whole file into fileContents
+    void readFile(const std::string& fileName, std::string& fileContents) const {
+        std::ifstream ifs(fileName);
+        std::stringstream fileStream;
+        fileStream << ifs.rdbuf();
+        fileContents = fileStream.str();
+    }
+
+  public:
+
+    // explicit: a plain string must not silently convert into a key reader
+    explicit EncKeyReader(const std::string& keyFile) {
+        if (keyFile.empty()) {
+            return;
+        }
+        readFile(keyFile, privKeyContents);
+    }
+
+    // This reader holds no public key, so every lookup is rejected
+    Result getPublicKey(const std::string &keyName, std::map<std::string, std::string>& metadata, EncryptionKeyInfo& encKeyInfo) const {
+        return ResultInvalidConfiguration;
+    }
+
+    // Returns the same private key regardless of keyName and metadata
+    Result getPrivateKey(const std::string &keyName, std::map<std::string, std::string>& metadata, EncryptionKeyInfo& encKeyInfo) const {
+        encKeyInfo.setKey(privKeyContents);
+        return ResultOk;
+    }
+};
+
 // Counters
 std::atomic<uint32_t> messagesReceived;
 std::atomic<uint32_t> bytesReceived;
@@ -149,6 +182,10 @@ void startPerfConsumer(const Arguments& args) {
     ConsumerConfiguration consumerConf;
     consumerConf.setMessageListener(messageListener);
     consumerConf.setReceiverQueueSize(args.receiverQueueSize);
+    boost::shared_ptr<EncKeyReader> keyReader = 
boost::make_shared<EncKeyReader>(args.encKeyValueFile);
+    if (!args.encKeyName.empty()) {
+        consumerConf.setCryptoKeyReader(keyReader);
+    }
 
     Latch latch(args.numTopics * args.numConsumers);
 
@@ -261,7 +298,12 @@ int main(int argc, char** argv) {
      "Number of IO threads to use")  //
 
     ("listener-threads,l", 
po::value<int>(&args.listenerThreads)->default_value(1),
-     "Number of listener threads");
+     "Number of listener threads") //
+
+    ("encryption-key-name,k", 
po::value<std::string>(&args.encKeyName)->default_value(""), "The private key 
name to decrypt payload") //
+
+    ("encryption-key-value-file,f", 
po::value<std::string>(&args.encKeyValueFile)->default_value(""),
+            "The file which contains the private key to decrypt payload"); //
 
     po::options_description hidden;
     hidden.add_options()("topic", po::value<std::string>(&args.topic), "Topic 
name");
diff --git a/pulsar-client-cpp/perf/PerfProducer.cc 
b/pulsar-client-cpp/perf/PerfProducer.cc
index d6cea4c..d591d34 100644
--- a/pulsar-client-cpp/perf/PerfProducer.cc
+++ b/pulsar-client-cpp/perf/PerfProducer.cc
@@ -62,6 +62,8 @@ struct Arguments {
     unsigned int batchingMaxMessages;
     long batchingMaxAllowedSizeInBytes;
     long batchingMaxPublishDelayMs;
+    std::string encKeyName;
+    std::string encKeyValueFile;
 };
 
 namespace pulsar {
@@ -74,11 +76,43 @@ class PulsarFriend {
 };
 }
 
-// Stats
 unsigned long messagesProduced;
 unsigned long bytesProduced;
 using namespace boost::accumulators;
+using namespace pulsar;
+
+class EncKeyReader: public CryptoKeyReader {
+    // CryptoKeyReader for the perf producer: serves one public key whose contents are read from a file.
+  private:
+    std::string pubKeyContents;  // Contents of the key file; stays empty when no file name is given.
+    // Copies the full contents of fileName into fileContents. A file that cannot be read is not reported.
+    void readFile(const std::string& fileName, std::string& fileContents) const {
+        std::ifstream ifs(fileName);
+        std::stringstream fileStream;
+        fileStream << ifs.rdbuf();
+        fileContents = fileStream.str();
+    }
 
+  public:
+    // Loads the public key from keyFile; an empty keyFile leaves the stored key empty.
+    explicit EncKeyReader(const std::string& keyFile) {
+        if (keyFile.empty()) {
+            return;
+        }
+        readFile(keyFile, pubKeyContents);
+    }
+    // Sets the stored key on encKeyInfo and returns ResultOk; keyName and metadata are not consulted.
+    Result getPublicKey(const std::string &keyName, std::map<std::string, std::string>& metadata, EncryptionKeyInfo& encKeyInfo) const {
+        encKeyInfo.setKey(pubKeyContents);
+        return ResultOk;
+    }
+    // Always returns ResultInvalidConfiguration: this reader does not supply a private key.
+    Result getPrivateKey(const std::string &keyName, std::map<std::string, std::string>& metadata, EncryptionKeyInfo& encKeyInfo) const {
+        return ResultInvalidConfiguration;
+    }
+};
+
+// Stats
 typedef accumulator_set<uint64_t, stats<tag::mean, tag::p_square_quantile> > 
LatencyAccumulator;
 LatencyAccumulator e2eLatencyAccumulator(quantile_probability = 0.99);
 std::vector<pulsar::Producer> producerList;
@@ -230,7 +264,12 @@ int main(int argc, char** argv) {
             "Use only is batch-size > 1, Default is 128 KB") //
 
     ("max-batch-publish-delay-in-ms", 
po::value<long>(&args.batchingMaxPublishDelayMs)->default_value(3000),
-            "Use only is batch-size > 1, Default is 3 seconds");
+            "Use only is batch-size > 1, Default is 3 seconds") //
+
+    ("encryption-key-name,k", 
po::value<std::string>(&args.encKeyName)->default_value(""), "The public key 
name to encrypt payload") //
+
+    ("encryption-key-value-file,f", 
po::value<std::string>(&args.encKeyValueFile)->default_value(""),
+            "The file which contains the public key to encrypt payload"); //
 
     po::options_description hidden;
     hidden.add_options()("topic", po::value<std::string>(&args.topic), "Topic 
name");
@@ -291,6 +330,12 @@ int main(int argc, char** argv) {
 
     // Block if queue is full else we will start seeing errors in sendAsync
     producerConf.setBlockIfQueueFull(true);
+    boost::shared_ptr<EncKeyReader> keyReader = 
boost::make_shared<EncKeyReader>(args.encKeyValueFile);
+    if (!args.encKeyName.empty()) {
+        producerConf.addEncryptionKey(args.encKeyName);
+        producerConf.setCryptoKeyReader(keyReader);
+    }
+
     pulsar::ClientConfiguration conf;
     conf.setUseTls(args.isUseTls);
     conf.setTlsAllowInsecureConnection(args.isTlsAllowInsecureConnection);
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc 
b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 8191d99..0c3f2b4 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -65,6 +65,42 @@ static void sendCallBack(Result r, const Message& msg, 
std::string prefix, doubl
     sendCallBack(r, msg, prefix);
 }
 
+class EncKeyReader : public CryptoKeyReader {
+    // Test key reader: maps a key name to the public-key.<keyName> / private-key.<keyName> test resource.
+   private:
+    void readFile(const std::string& fileName, std::string& fileContents) const {
+        std::ifstream ifs(fileName);
+        std::stringstream fileStream;
+        fileStream << ifs.rdbuf();
+        fileContents = fileStream.str();
+    }
+
+   public:
+    EncKeyReader() {}
+    // Sets the contents of public-key.<keyName> on encKeyInfo. No check is made that the file was read.
+    Result getPublicKey(const std::string& keyName, std::map<std::string, std::string>& metadata,
+                        EncryptionKeyInfo& encKeyInfo) const {
+        std::string CERT_FILE_PATH =
+            "../../pulsar-broker/src/test/resources/certificate/public-key." + keyName;
+        std::string keyContents;
+        readFile(CERT_FILE_PATH, keyContents);
+
+        encKeyInfo.setKey(keyContents);
+        return ResultOk;
+    }
+    // Sets the contents of private-key.<keyName> on encKeyInfo. No check is made that the file was read.
+    Result getPrivateKey(const std::string& keyName, std::map<std::string, std::string>& metadata,
+                         EncryptionKeyInfo& encKeyInfo) const {
+        std::string CERT_FILE_PATH =
+            "../../pulsar-broker/src/test/resources/certificate/private-key." + keyName;
+        std::string keyContents;
+        readFile(CERT_FILE_PATH, keyContents);
+
+        encKeyInfo.setKey(keyContents);
+        return ResultOk;
+    }
+};
+
 TEST(BasicEndToEndTest, testBatchMessages) {
     ClientConfiguration config;
     Client client(lookupUrl);
@@ -1103,3 +1139,173 @@ TEST(BasicEndToEndTest, testHandlerReconnectionLogic) {
         ASSERT_TRUE(receivedMsgIndex.find(boost::lexical_cast<std::string>(i)) 
!= receivedMsgIndex.end());
     }
 }
+
+TEST(BasicEndToEndTest, testRSAEncryption) {
+    // Round trip: publish LZ4-compressed messages encrypted with client-rsa.pem and read them back in order.
+    Client client(lookupUrl);
+    std::string topicName = "persistent://prop/unit/ns1/my-rsaenctopic";
+    std::string subName = "my-sub-name";
+    Producer producer;
+
+    boost::shared_ptr<EncKeyReader> keyReader = boost::make_shared<EncKeyReader>();
+    ProducerConfiguration conf;
+    conf.setCompressionType(CompressionLZ4);
+    conf.addEncryptionKey("client-rsa.pem");
+    conf.setCryptoKeyReader(keyReader);
+
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);
+    ASSERT_EQ(ResultOk, result);
+
+    ConsumerConfiguration consConfig;
+    consConfig.setCryptoKeyReader(keyReader);
+    // consConfig.setCryptoFailureAction(ConsumerCryptoFailureAction::CONSUME);
+
+    Consumer consumer;
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, consConfig, WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    // Send 1000 messages synchronously
+    std::string msgContent = "msg-content";
+    LOG_INFO("Publishing 1000 messages synchronously");
+    int msgNum = 0;
+    for (; msgNum < 1000; msgNum++) {
+        std::stringstream stream;
+        stream << msgContent << msgNum;
+        Message msg = MessageBuilder().setContent(stream.str()).build();
+        ASSERT_EQ(ResultOk, producer.send(msg));
+    }
+
+    LOG_INFO("Trying to receive 1000 messages");
+    Message msgReceived;
+    for (msgNum = 0; msgNum < 1000; msgNum++) {
+        ASSERT_EQ(ResultOk, consumer.receive(msgReceived, 1000));  // stop here rather than compare a stale message
+        LOG_DEBUG("Received message :" << msgReceived.getMessageId());
+        std::stringstream expected;
+        expected << msgContent << msgNum;
+        ASSERT_EQ(expected.str(), msgReceived.getDataAsString());
+        ASSERT_EQ(ResultOk, consumer.acknowledge(msgReceived));
+    }
+
+    ASSERT_EQ(ResultOk, consumer.unsubscribe());
+    ASSERT_EQ(ResultAlreadyClosed, consumer.close());
+    ASSERT_EQ(ResultOk, producer.close());
+    ASSERT_EQ(ResultOk, client.close());
+}
+
+TEST(BasicEndToEndTest, testEncryptionFailure) {
+    // Covers a missing producer key, a consumer without a key reader, and the CONSUME / FAIL / DISCARD actions.
+    Client client(lookupUrl);
+    std::string topicName = "persistent://prop/unit/ns1/my-rsaencfailtopic";
+    std::string subName = "my-sub-name";
+    Producer producer;
+
+    boost::shared_ptr<EncKeyReader> keyReader = boost::make_shared<EncKeyReader>();
+
+    ConsumerConfiguration consConfig;
+
+    Consumer consumer;
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, consConfig, WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    Result result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    std::string msgContent = "msg-content";
+    int msgNum = 0;
+    int totalMsgs = 10;
+    std::stringstream stream;
+    stream << msgContent << msgNum;
+    Message msg = MessageBuilder().setContent(stream.str()).build();
+
+    // 1. Non existing key
+
+    {
+        ProducerConfiguration prodConf;
+        prodConf.setCryptoKeyReader(keyReader);
+        prodConf.addEncryptionKey("client-non-existing-rsa.pem");
+
+        Promise<Result, Producer> producerPromise;
+        client.createProducerAsync(topicName, prodConf, WaitForCallbackValue<Producer>(producerPromise));
+        Future<Result, Producer> producerFuture = producerPromise.getFuture();
+        result = producerFuture.get(producer);
+        ASSERT_EQ(ResultOk, result);
+
+        ASSERT_EQ(ResultCryptoError, producer.send(msg));
+    }
+
+    // 2. Add valid key
+    {
+        ProducerConfiguration prodConf;
+        prodConf.setCryptoKeyReader(keyReader);
+        prodConf.addEncryptionKey("client-rsa.pem");
+
+        Promise<Result, Producer> producerPromise;
+        client.createProducerAsync(topicName, prodConf, WaitForCallbackValue<Producer>(producerPromise));
+        Future<Result, Producer> producerFuture = producerPromise.getFuture();
+        result = producerFuture.get(producer);
+        ASSERT_EQ(ResultOk, result);
+
+        msgNum++;
+        for (; msgNum < totalMsgs; msgNum++) {
+            std::stringstream stream;
+            stream << msgContent << msgNum;
+            Message msg = MessageBuilder().setContent(stream.str()).build();
+            ASSERT_EQ(ResultOk, producer.send(msg));
+        }
+    }
+
+    // 3. Key reader is not set by consumer
+    Message msgReceived;
+    ASSERT_EQ(ResultTimeout, consumer.receive(msgReceived, 5000));
+    ASSERT_EQ(ResultOk, consumer.close());
+
+    // 4. Set consumer config to consume even if decryption fails
+    consConfig.setCryptoFailureAction(ConsumerCryptoFailureAction::CONSUME);
+
+    Promise<Result, Consumer> consumerPromise2;
+    client.subscribeAsync(topicName, subName, consConfig, WaitForCallbackValue<Consumer>(consumerPromise2));
+    consumerFuture = consumerPromise2.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    // The first published message is delivered. Skip message comparison since it's encrypted
+    ASSERT_EQ(ResultOk, consumer.receive(msgReceived, 1000));
+    ASSERT_EQ(ResultOk, consumer.close());
+
+    // 5. Set valid keyreader and consume messages
+    msgNum = 1;
+    consConfig.setCryptoKeyReader(keyReader);
+    consConfig.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL);
+    Promise<Result, Consumer> consumerPromise3;
+    client.subscribeAsync(topicName, subName, consConfig, WaitForCallbackValue<Consumer>(consumerPromise3));
+    consumerFuture = consumerPromise3.getFuture();
+    ASSERT_EQ(ResultOk, consumerFuture.get(consumer));
+
+    for (; msgNum < totalMsgs - 1; msgNum++) {  // stops one short: the last message stays unacknowledged for step 6
+        ASSERT_EQ(ResultOk, consumer.receive(msgReceived, 1000));
+        LOG_DEBUG("Received message :" << msgReceived.getMessageId());
+        std::stringstream expected;
+        expected << msgContent << msgNum;
+        ASSERT_EQ(expected.str(), msgReceived.getDataAsString());
+        ASSERT_EQ(ResultOk, consumer.acknowledge(msgReceived));
+    }
+    ASSERT_EQ(ResultOk, consumer.close());
+
+    // 6. Discard message if decryption fails
+    ConsumerConfiguration consConfig2;
+    consConfig2.setCryptoFailureAction(ConsumerCryptoFailureAction::DISCARD);
+
+    Promise<Result, Consumer> consumerPromise4;
+    client.subscribeAsync(topicName, subName, consConfig2, WaitForCallbackValue<Consumer>(consumerPromise4));
+    consumerFuture = consumerPromise4.getFuture();
+    ASSERT_EQ(ResultOk, consumerFuture.get(consumer));
+
+    // Since the message is discarded, no message will be received.
+    ASSERT_EQ(ResultTimeout, consumer.receive(msgReceived, 5000));
+}

-- 
To stop receiving notification emails like this one, please contact
mme...@apache.org.

Reply via email to