This is an automated email from the ASF dual-hosted git repository. mmerli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 9a081ad Added end to end encryption in C++ client (#1129) 9a081ad is described below commit 9a081ad9d69aa3f250d5d821ab1d8169b728eeda Author: Andrews <sahaya.andr...@gmail.com> AuthorDate: Thu Feb 1 17:17:10 2018 -0800 Added end to end encryption in C++ client (#1129) --- .../include/pulsar/ConsumerConfiguration.h | 10 + .../include/pulsar/ConsumerCryptoFailureAction.h | 40 ++ pulsar-client-cpp/include/pulsar/CryptoKeyReader.h | 70 ++++ .../include/pulsar/EncryptionKeyInfo.h | 68 +++ .../include/pulsar/ProducerConfiguration.h | 15 + .../include/pulsar/ProducerCryptoFailureAction.h | 36 ++ pulsar-client-cpp/include/pulsar/Result.h | 4 +- pulsar-client-cpp/lib/BatchMessageContainer.cc | 4 + pulsar-client-cpp/lib/ConsumerConfiguration.cc | 19 + pulsar-client-cpp/lib/ConsumerConfigurationImpl.h | 6 +- pulsar-client-cpp/lib/ConsumerImpl.cc | 55 ++- pulsar-client-cpp/lib/ConsumerImpl.h | 7 + pulsar-client-cpp/lib/EncryptionKeyInfo.cc | 38 ++ pulsar-client-cpp/lib/EncryptionKeyInfoImpl.cc | 39 ++ pulsar-client-cpp/lib/EncryptionKeyInfoImpl.h | 55 +++ pulsar-client-cpp/lib/MessageCrypto.cc | 458 +++++++++++++++++++++ pulsar-client-cpp/lib/MessageCrypto.h | 142 +++++++ pulsar-client-cpp/lib/ProducerConfiguration.cc | 28 ++ pulsar-client-cpp/lib/ProducerConfigurationImpl.h | 9 +- pulsar-client-cpp/lib/ProducerImpl.cc | 59 ++- pulsar-client-cpp/lib/ProducerImpl.h | 10 + pulsar-client-cpp/lib/Result.cc | 3 + pulsar-client-cpp/perf/PerfConsumer.cc | 44 +- pulsar-client-cpp/perf/PerfProducer.cc | 49 ++- pulsar-client-cpp/tests/BasicEndToEndTest.cc | 206 +++++++++ 25 files changed, 1461 insertions(+), 13 deletions(-) diff --git a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h index c73f942..274abd5 100644 --- a/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h +++ b/pulsar-client-cpp/include/pulsar/ConsumerConfiguration.h @@ -24,6 +24,8 @@ #include <pulsar/Result.h> #include <pulsar/ConsumerType.h> #include <pulsar/Message.h> +#include <pulsar/ConsumerCryptoFailureAction.h> +#include <pulsar/CryptoKeyReader.h> #pragma GCC visibility push(default) namespace pulsar { @@ -139,6 +141,14 @@ class ConsumerConfiguration { * @return the configured timeout in milliseconds caching BrokerConsumerStats. */ long getBrokerConsumerStatsCacheTimeInMs() const; + + bool isEncryptionEnabled() const; + const CryptoKeyReaderPtr getCryptoKeyReader() const; + ConsumerConfiguration& setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader); + + ConsumerCryptoFailureAction getCryptoFailureAction() const; + ConsumerConfiguration& setCryptoFailureAction(ConsumerCryptoFailureAction action); + friend class PulsarWrapper; private: diff --git a/pulsar-client-cpp/include/pulsar/ConsumerCryptoFailureAction.h b/pulsar-client-cpp/include/pulsar/ConsumerCryptoFailureAction.h new file mode 100644 index 0000000..d9d845c --- /dev/null +++ b/pulsar-client-cpp/include/pulsar/ConsumerCryptoFailureAction.h @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef CONSUMERCRYPTOFAILUREACTION_H_ +#define CONSUMERCRYPTOFAILUREACTION_H_ + +#pragma GCC visibility push(default) + +namespace pulsar { + +enum class ConsumerCryptoFailureAction +{ + FAIL, // This is the default option to fail consume until crypto succeeds + DISCARD, // Message is silently acknowledged and not delivered to the application + CONSUME // Deliver the encrypted message to the application. It's the application's + // responsibility to decrypt the message. If message is also compressed, + // decompression will fail. If message contain batch messages, client will + // not be able to retrieve individual messages in the batch +}; + +} /* namespace pulsar */ + +#pragma GCC visibility pop + +#endif /* CONSUMERCRYPTOFAILUREACTION_H_ */ diff --git a/pulsar-client-cpp/include/pulsar/CryptoKeyReader.h b/pulsar-client-cpp/include/pulsar/CryptoKeyReader.h new file mode 100644 index 0000000..cb4cf15 --- /dev/null +++ b/pulsar-client-cpp/include/pulsar/CryptoKeyReader.h @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef CRYPTOKEYREADER_H_ +#define CRYPTOKEYREADER_H_ + +#include <pulsar/Result.h> +#include <pulsar/EncryptionKeyInfo.h> + +#pragma GCC visibility push(default) + +namespace pulsar { + +class CryptoKeyReader { + public: + CryptoKeyReader() {} + + /* + * Return the encryption key corresponding to the key name in the argument + * <p> + * This method should be implemented to return the EncryptionKeyInfo. This method will be + * called at the time of producer creation as well as consumer receiving messages. + * Hence, application should not make any blocking calls within the implementation. + * <p> + * + * @param keyName + * Unique name to identify the key + * @param metadata + * Additional information needed to identify the key + * @param encKeyInfo updated with details about the public key + * @return Result ResultOk is returned for success + * + */ + virtual Result getPublicKey(const std::string& keyName, std::map<std::string, std::string>& metadata, + EncryptionKeyInfo& encKeyInfo) const = 0; + + /* + * @param keyName + * Unique name to identify the key + * @param metadata + * Additional information needed to identify the key + * @param encKeyInfo updated with details about the private key + * @return Result ResultOk is returned for success + */ + virtual Result getPrivateKey(const std::string& keyName, std::map<std::string, std::string>& metadata, + EncryptionKeyInfo& encKeyInfo) const = 0; + +}; /* namespace pulsar */ + +typedef boost::shared_ptr<CryptoKeyReader> CryptoKeyReaderPtr; +} // namespace pulsar + +#pragma GCC visibility pop + +#endif /* CRYPTOKEYREADER_H_ */ diff --git a/pulsar-client-cpp/include/pulsar/EncryptionKeyInfo.h b/pulsar-client-cpp/include/pulsar/EncryptionKeyInfo.h new file mode 100644 index 0000000..9461b14 --- /dev/null +++ b/pulsar-client-cpp/include/pulsar/EncryptionKeyInfo.h @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef ENCRYPTIONKEYINFO_H_ +#define ENCRYPTIONKEYINFO_H_ + +#include <boost/shared_ptr.hpp> +#include <iostream> +#include <map> + +#pragma GCC visibility push(default) + +namespace pulsar { + +class EncryptionKeyInfoImpl; +class PulsarWrapper; + +typedef boost::shared_ptr<EncryptionKeyInfoImpl> EncryptionKeyInfoImplPtr; + +class EncryptionKeyInfo { + /* + * This object contains the encryption key and corresponding metadata which contains + * additional information about the key such as version, timestammp + */ + + public: + typedef std::map<std::string, std::string> StringMap; + + EncryptionKeyInfo(); + + EncryptionKeyInfo(std::string key, StringMap& metadata); + + std::string& getKey(); + + void setKey(std::string key); + + StringMap& getMetadata(void); + + void setMetadata(StringMap& metadata); + + private: + explicit EncryptionKeyInfo(EncryptionKeyInfoImplPtr); + + EncryptionKeyInfoImplPtr impl_; + + friend class PulsarWrapper; +}; + +} /* namespace pulsar */ + +#pragma GCC visibility pop + +#endif /* ENCRYPTIONKEYINFO_H_ */ diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h index d12b65d..9f7bf1f 100644 --- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h +++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h @@ -23,6 +23,11 @@ #include <pulsar/Result.h> #include <pulsar/Message.h> #include <boost/function.hpp> +#include <pulsar/ProducerCryptoFailureAction.h> +#include <pulsar/CryptoKeyReader.h> + +#include <set> + #pragma GCC visibility push(default) namespace pulsar { @@ -112,6 +117,16 @@ class ProducerConfiguration { ProducerConfiguration& setBatchingMaxPublishDelayMs(const unsigned long& batchingMaxPublishDelayMs); const unsigned long& getBatchingMaxPublishDelayMs() const; + const CryptoKeyReaderPtr getCryptoKeyReader() const; + ProducerConfiguration& setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader); + + ProducerCryptoFailureAction getCryptoFailureAction() const; + ProducerConfiguration& setCryptoFailureAction(ProducerCryptoFailureAction action); + + std::set<std::string>& getEncryptionKeys(); + bool isEncryptionEnabled() const; + ProducerConfiguration& addEncryptionKey(std::string key); + friend class PulsarWrapper; private: diff --git a/pulsar-client-cpp/include/pulsar/ProducerCryptoFailureAction.h b/pulsar-client-cpp/include/pulsar/ProducerCryptoFailureAction.h new file mode 100644 index 0000000..0e9a5d9 --- /dev/null +++ b/pulsar-client-cpp/include/pulsar/ProducerCryptoFailureAction.h @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef PRODUCERCRYPTOFAILUREACTION_H_ +#define PRODUCERCRYPTOFAILUREACTION_H_ + +#pragma GCC visibility push(default) + +namespace pulsar { + +enum class ProducerCryptoFailureAction +{ + FAIL, // This is the default option to fail send if crypto operation fails + SEND // Ignore crypto failure and proceed with sending unencrypted messages +}; + +} /* namespace pulsar */ + +#pragma GCC visibility pop + +#endif /* PRODUCERCRYPTOFAILUREACTION_H_ */ diff --git a/pulsar-client-cpp/include/pulsar/Result.h b/pulsar-client-cpp/include/pulsar/Result.h index 8085d48..68480e8 100644 --- a/pulsar-client-cpp/include/pulsar/Result.h +++ b/pulsar-client-cpp/include/pulsar/Result.h @@ -72,8 +72,8 @@ enum Result ResultSubscriptionNotFound, /// Subscription not found ResultConsumerNotFound, /// Consumer not found ResultUnsupportedVersionError, /// Error when an older client/version doesn't support a required feature - - ResultTopicTerminated /// Topic was already terminated + ResultTopicTerminated, /// Topic was already terminated + ResultCryptoError /// Error when crypto operation fails }; // Return string representation of result code diff --git a/pulsar-client-cpp/lib/BatchMessageContainer.cc b/pulsar-client-cpp/lib/BatchMessageContainer.cc index 01be503..0986887 100644 --- a/pulsar-client-cpp/lib/BatchMessageContainer.cc +++ b/pulsar-client-cpp/lib/BatchMessageContainer.cc @@ -93,6 +93,10 @@ void BatchMessageContainer::sendMessage() { impl_->metadata.set_num_messages_in_batch(messagesContainerListPtr_->size()); compressPayLoad(); + SharedBuffer encryptedPayload; + producer_.encryptMessage(impl_->metadata, impl_->payload, encryptedPayload); + impl_->payload = encryptedPayload; + Message msg; msg.impl_ = impl_; diff --git a/pulsar-client-cpp/lib/ConsumerConfiguration.cc b/pulsar-client-cpp/lib/ConsumerConfiguration.cc index 17d2c4b..1b99f58 100644 --- a/pulsar-client-cpp/lib/ConsumerConfiguration.cc +++ b/pulsar-client-cpp/lib/ConsumerConfiguration.cc @@ -82,4 +82,23 @@ void ConsumerConfiguration::setUnAckedMessagesTimeoutMs(const uint64_t milliSeco } impl_->unAckedMessagesTimeoutMs = milliSeconds; } + +bool ConsumerConfiguration::isEncryptionEnabled() const { return (impl_->cryptoKeyReader != NULL); } + +const CryptoKeyReaderPtr ConsumerConfiguration::getCryptoKeyReader() const { return impl_->cryptoKeyReader; } + +ConsumerConfiguration& ConsumerConfiguration::setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader) { + impl_->cryptoKeyReader = cryptoKeyReader; + return *this; +} + +ConsumerCryptoFailureAction ConsumerConfiguration::getCryptoFailureAction() const { + return impl_->cryptoFailureAction; +} + +ConsumerConfiguration& ConsumerConfiguration::setCryptoFailureAction(ConsumerCryptoFailureAction action) { + impl_->cryptoFailureAction = action; + return *this; +} + } // namespace pulsar diff --git a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h index 1fafc9a..91434a2 100644 --- a/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h +++ b/pulsar-client-cpp/lib/ConsumerConfigurationImpl.h @@ -32,6 +32,8 @@ struct ConsumerConfigurationImpl { int maxTotalReceiverQueueSizeAcrossPartitions; std::string consumerName; long brokerConsumerStatsCacheTimeInMs; + CryptoKeyReaderPtr cryptoKeyReader; + ConsumerCryptoFailureAction cryptoFailureAction; ConsumerConfigurationImpl() : unAckedMessagesTimeoutMs(0), consumerType(ConsumerExclusive), @@ -39,7 +41,9 @@ struct ConsumerConfigurationImpl { hasMessageListener(false), brokerConsumerStatsCacheTimeInMs(30 * 1000), // 30 seconds receiverQueueSize(1000), - maxTotalReceiverQueueSizeAcrossPartitions(50000) {} + maxTotalReceiverQueueSizeAcrossPartitions(50000), + cryptoKeyReader(), + cryptoFailureAction(ConsumerCryptoFailureAction::FAIL) {} }; } // namespace pulsar #endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */ diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc index ce20747..1c62023 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.cc +++ b/pulsar-client-cpp/lib/ConsumerImpl.cc @@ -57,7 +57,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, messageListenerRunning_(true), batchAcknowledgementTracker_(topic_, subscription, (long)consumerId_), brokerConsumerStats_(), - consumerStatsBasePtr_() { + consumerStatsBasePtr_(), + msgCrypto_() { std::stringstream consumerStrStream; consumerStrStream << "[" << topic_ << ", " << subscription_ << ", " << consumerId_ << "] "; consumerStr_ = consumerStrStream.str(); @@ -81,6 +82,11 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const std::string& topic, } else { consumerStatsBasePtr_ = boost::make_shared<ConsumerStatsDisabled>(); } + + // Create msgCrypto + if (conf.isEncryptionEnabled()) { + msgCrypto_ = boost::make_shared<MessageCrypto>(consumerStr_, false); + } } ConsumerImpl::~ConsumerImpl() { @@ -249,6 +255,11 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto:: SharedBuffer& payload) { LOG_DEBUG(getName() << "Received Message -- Size: " << payload.readableBytes()); + if (!decryptMessageIfNeeded(cnx, msg, metadata, payload)) { + // Message was discarded or not consumed due to decryption failure + return; + } + if (!uncompressMessageIfNeeded(cnx, msg, metadata, payload)) { // Message was discarded on decompression error return; @@ -339,6 +350,48 @@ uint32_t ConsumerImpl::receiveIndividualMessagesFromBatch(const ClientConnection return batchSize - skippedMessages; } +bool ConsumerImpl::decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg, + const proto::MessageMetadata& metadata, SharedBuffer& payload) { + if (!metadata.encryption_keys_size()) { + return true; + } + + // If KeyReader is not configured throw exception based on config param + if (!config_.isEncryptionEnabled()) { + if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::CONSUME) { + LOG_WARN(getName() << "CryptoKeyReader is not implemented. Consuming encrypted message."); + return true; + } else if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::DISCARD) { + LOG_WARN(getName() << "Skipping decryption since CryptoKeyReader is not implemented and config " + "is set to discard"); + discardCorruptedMessage(cnx, msg.message_id(), proto::CommandAck::DecryptionError); + } else { + LOG_ERROR(getName() << "Message delivery failed since CryptoKeyReader is not implemented to " + "consume encrypted message"); + } + return false; + } + + SharedBuffer decryptedPayload; + if (msgCrypto_->decrypt(metadata, payload, config_.getCryptoKeyReader(), decryptedPayload)) { + payload = decryptedPayload; + return true; + } + + if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::CONSUME) { + // Note, batch message will fail to consume even if config is set to consume + LOG_WARN( + getName() << "Decryption failed. Consuming encrypted message since config is set to consume."); + return true; + } else if (config_.getCryptoFailureAction() == ConsumerCryptoFailureAction::DISCARD) { + LOG_WARN(getName() << "Discarding message since decryption failed and config is set to discard"); + discardCorruptedMessage(cnx, msg.message_id(), proto::CommandAck::DecryptionError); + } else { + LOG_ERROR(getName() << "Message delivery failed since unable to decrypt incoming message"); + } + return false; +} + bool ConsumerImpl::uncompressMessageIfNeeded(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg, const proto::MessageMetadata& metadata, SharedBuffer& payload) { if (!metadata.has_compression()) { diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h index c68abf4..d115ed8 100644 --- a/pulsar-client-cpp/lib/ConsumerImpl.h +++ b/pulsar-client-cpp/lib/ConsumerImpl.h @@ -32,6 +32,7 @@ #include "ExecutorService.h" #include "ConsumerImplBase.h" #include "lib/UnAckedMessageTrackerDisabled.h" +#include "MessageCrypto.h" #include "CompressionCodec.h" #include <boost/dynamic_bitset.hpp> @@ -51,6 +52,7 @@ class ConsumerImpl; class BatchAcknowledgementTracker; typedef boost::shared_ptr<ConsumerImpl> ConsumerImplPtr; typedef boost::weak_ptr<ConsumerImpl> ConsumerImplWeakPtr; +typedef boost::shared_ptr<MessageCrypto> MessageCryptoPtr; enum ConsumerTopicType { @@ -124,6 +126,9 @@ class ConsumerImpl : public ConsumerImplBase, uint32_t receiveIndividualMessagesFromBatch(const ClientConnectionPtr& cnx, Message& batchedMessage); void brokerConsumerStatsListener(Result, BrokerConsumerStatsImpl, BrokerConsumerStatsCallback); + bool decryptMessageIfNeeded(const ClientConnectionPtr& cnx, const proto::CommandMessage& msg, + const proto::MessageMetadata& metadata, SharedBuffer& payload); + // TODO - Convert these functions to lambda when we move to C++11 Result receiveHelper(Message& msg); Result receiveHelper(Message& msg, int timeout); @@ -157,6 +162,8 @@ class ConsumerImpl : public ConsumerImplBase, BatchAcknowledgementTracker batchAcknowledgementTracker_; BrokerConsumerStatsImpl brokerConsumerStats_; + MessageCryptoPtr msgCrypto_; + friend class PulsarFriend; }; diff --git a/pulsar-client-cpp/lib/EncryptionKeyInfo.cc b/pulsar-client-cpp/lib/EncryptionKeyInfo.cc new file mode 100644 index 0000000..68c6c0a --- /dev/null +++ b/pulsar-client-cpp/lib/EncryptionKeyInfo.cc @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <pulsar/EncryptionKeyInfo.h> + +#include "EncryptionKeyInfoImpl.h" + +namespace pulsar { + +EncryptionKeyInfo::EncryptionKeyInfo() : impl_(new EncryptionKeyInfoImpl()) {} + +EncryptionKeyInfo::EncryptionKeyInfo(EncryptionKeyInfoImplPtr impl) : impl_(impl) {} + +std::string& EncryptionKeyInfo::getKey() { return impl_->getKey(); } + +void EncryptionKeyInfo::setKey(std::string key) { impl_->setKey(key); } + +EncryptionKeyInfo::StringMap& EncryptionKeyInfo::getMetadata() { return impl_->getMetadata(); } + +void EncryptionKeyInfo::setMetadata(StringMap& metadata) { impl_->setMetadata(metadata); } + +}; /* namespace pulsar */ diff --git a/pulsar-client-cpp/lib/EncryptionKeyInfoImpl.cc b/pulsar-client-cpp/lib/EncryptionKeyInfoImpl.cc new file mode 100644 index 0000000..1409c50 --- /dev/null +++ b/pulsar-client-cpp/lib/EncryptionKeyInfoImpl.cc @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "EncryptionKeyInfoImpl.h" + +namespace pulsar { + +EncryptionKeyInfoImpl::EncryptionKeyInfoImpl() : key_(), metadata_() {} + +EncryptionKeyInfoImpl::EncryptionKeyInfoImpl(std::string key, StringMap& metadata) { + key_ = key; + metadata_ = metadata; +} + +std::string& EncryptionKeyInfoImpl::getKey() { return key_; } + +void EncryptionKeyInfoImpl::setKey(std::string key) { key_ = key; } + +EncryptionKeyInfoImpl::StringMap& EncryptionKeyInfoImpl::getMetadata() { return metadata_; } + +void EncryptionKeyInfoImpl::setMetadata(StringMap& metadata) { metadata_ = metadata; } + +}; /* namespace pulsar */ diff --git a/pulsar-client-cpp/lib/EncryptionKeyInfoImpl.h b/pulsar-client-cpp/lib/EncryptionKeyInfoImpl.h new file mode 100644 index 0000000..80bfae1 --- /dev/null +++ b/pulsar-client-cpp/lib/EncryptionKeyInfoImpl.h @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef LIB_ENCRYPTIONKEYINFOIMPL_H_ +#define LIB_ENCRYPTIONKEYINFOIMPL_H_ + +#include <boost/shared_ptr.hpp> +#include <iostream> +#include <map> + +#pragma GCC visibility push(default) + +namespace pulsar { + +class EncryptionKeyInfoImpl { + public: + typedef std::map<std::string, std::string> StringMap; + + EncryptionKeyInfoImpl(); + + EncryptionKeyInfoImpl(std::string key, StringMap& metadata); + + std::string& getKey(); + + void setKey(std::string key); + + StringMap& getMetadata(void); + + void setMetadata(StringMap& metadata); + + private: + StringMap metadata_; + std::string key_; +}; + +} /* namespace pulsar */ + +#pragma GCC visibility pop + +#endif /* LIB_ENCRYPTIONKEYINFOIMPL_H_ */ diff --git a/pulsar-client-cpp/lib/MessageCrypto.cc b/pulsar-client-cpp/lib/MessageCrypto.cc new file mode 100644 index 0000000..5e3bd3c --- /dev/null +++ b/pulsar-client-cpp/lib/MessageCrypto.cc @@ -0,0 +1,458 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "LogUtils.h" +#include "MessageCrypto.h" + +namespace pulsar { + +DECLARE_LOG_OBJECT() + +MessageCrypto::MessageCrypto(std::string& logCtx, bool keyGenNeeded) + : dataKeyLen_(32), + dataKey_(new unsigned char[dataKeyLen_]), + tagLen_(16), + ivLen_(12), + iv_(new unsigned char[ivLen_]), + logCtx_(logCtx) { + SSL_library_init(); + SSL_load_error_strings(); + + if (!keyGenNeeded) { + mdCtx_ = EVP_MD_CTX_create(); + EVP_MD_CTX_init(mdCtx_); + return; + } + + RAND_bytes(dataKey_.get(), dataKeyLen_); + RAND_bytes(iv_.get(), ivLen_); +} + +MessageCrypto::~MessageCrypto() {} + +RSA* MessageCrypto::loadPublicKey(std::string& pubKeyStr) { + BIO* pubBio = NULL; + RSA* rsaPub = NULL; + + pubBio = BIO_new_mem_buf((char*)pubKeyStr.c_str(), -1); + if (pubBio == NULL) { + LOG_ERROR(logCtx_ << " Failed to get memory for public key"); + return rsaPub; + } + + rsaPub = PEM_read_bio_RSA_PUBKEY(pubBio, NULL, NULL, NULL); + if (rsaPub == NULL) { + LOG_ERROR(logCtx_ << " Failed to load public key"); + } + + BIO_free(pubBio); + return rsaPub; +} + +RSA* MessageCrypto::loadPrivateKey(std::string& privateKeyStr) { + BIO* privBio = NULL; + RSA* rsaPriv = NULL; + + privBio = BIO_new_mem_buf((char*)privateKeyStr.c_str(), -1); + if (privBio == NULL) { + LOG_ERROR(logCtx_ << " Failed to get memory for private key"); + return rsaPriv; + } + + rsaPriv = PEM_read_bio_RSAPrivateKey(privBio, NULL, NULL, NULL); + if (rsaPriv == NULL) { + LOG_ERROR(logCtx_ << " Failed to load private key"); + } + + BIO_free(privBio); + return rsaPriv; +} + +bool MessageCrypto::getDigest(const std::string& keyName, const void* input, unsigned int inputLen, + unsigned char keyDigest[], unsigned int& digestLen) { + if (EVP_DigestInit_ex(mdCtx_, EVP_md5(), NULL) != 1) { + LOG_ERROR(logCtx_ + "Failed to initialize md5 digest for key " + keyName); + return false; + } + + digestLen = 0; + if (EVP_DigestUpdate(mdCtx_, input, inputLen) != 1) { + LOG_ERROR(logCtx_ + "Failed to get md5 hash for data key " + keyName); + return false; + } + + if (EVP_DigestFinal_ex(mdCtx_, keyDigest, &digestLen) != 1) { + LOG_ERROR(logCtx_ + "Failed to finalize md hash for data key " + keyName); + return false; + } + + return true; +} + +void MessageCrypto::removeExpiredDataKey() { + boost::posix_time::ptime now = boost::posix_time::second_clock::universal_time(); + boost::posix_time::time_duration expireTime = boost::posix_time::hours(4); + + auto dataKeyCacheIter = dataKeyCache_.begin(); + while (dataKeyCacheIter != dataKeyCache_.end()) { + auto dataKeyEntry = dataKeyCacheIter->second; + boost::posix_time::time_duration td = now - dataKeyEntry.second; + + if ((now - dataKeyEntry.second) > expireTime) { + dataKeyCache_.erase(dataKeyCacheIter++); + } else { + dataKeyCacheIter++; + } + } +} + +Result MessageCrypto::addPublicKeyCipher(std::set<std::string>& keyNames, + const CryptoKeyReaderPtr keyReader) { + Lock lock(mutex_); + + // Generate data key + RAND_bytes(dataKey_.get(), dataKeyLen_); + + Result result = ResultOk; + for (auto it = keyNames.begin(); it != keyNames.end(); it++) { + result = addPublicKeyCipher(*it, keyReader); + if (result != ResultOk) { + return result; + } + } + return result; +} + +Result MessageCrypto::addPublicKeyCipher(const std::string& keyName, const CryptoKeyReaderPtr keyReader) { + if (keyName.empty()) { + LOG_ERROR(logCtx_ + "Keyname is empty "); + return ResultCryptoError; + } + + // Read the public key and its info using callback + StringMap keyMeta; + EncryptionKeyInfo keyInfo; + Result result = keyReader->getPublicKey(keyName, keyMeta, keyInfo); + if (result != ResultOk) { + LOG_ERROR(logCtx_ + "Failed to get public key from KeyReader for key " + keyName); + return result; + } + + RSA* pubKey = loadPublicKey(keyInfo.getKey()); + if (pubKey == NULL) { + LOG_ERROR(logCtx_ + "Failed to load public key " + keyName); + return ResultCryptoError; + } + + int inSize = RSA_size(pubKey); + boost::scoped_array<unsigned char> encryptedKey(new unsigned char[inSize]); + + int outSize = + RSA_public_encrypt(dataKeyLen_, dataKey_.get(), encryptedKey.get(), pubKey, RSA_PKCS1_OAEP_PADDING); + + if (inSize != outSize) { + LOG_ERROR(logCtx_ + "Ciphertext is length not matching input key length for key " + keyName); + return ResultCryptoError; + } + std::string encryptedKeyStr(reinterpret_cast<char*>(encryptedKey.get()), inSize); + std::shared_ptr<EncryptionKeyInfo> eki(new EncryptionKeyInfo()); + eki->setKey(encryptedKeyStr); + eki->setMetadata(keyInfo.getMetadata()); + + encryptedDataKeyMap_.insert(std::make_pair(keyName, eki)); + return ResultOk; +} + +bool MessageCrypto::removeKeyCipher(std::string& keyName) { + if (!keyName.size()) { + return false; + } + encryptedDataKeyMap_.erase(keyName); + return true; +} + +bool MessageCrypto::encrypt(std::set<std::string>& encKeys, const CryptoKeyReaderPtr keyReader, + proto::MessageMetadata& msgMetadata, SharedBuffer& payload, + SharedBuffer& encryptedPayload) { + if (!encKeys.size()) { + return false; + } + SharedBuffer emptyBuffer; + + Lock lock(mutex_); + + // Update message metadata with encrypted data key + for (auto it = encKeys.begin(); it != encKeys.end(); it++) { + const std::string& keyName = *it; + auto keyInfoIter = encryptedDataKeyMap_.find(keyName); + + if (keyInfoIter == encryptedDataKeyMap_.end()) { + // Attempt to load the key. This will allow us to load keys as soon as + // a new key is added to producer config + Result result = addPublicKeyCipher(keyName, keyReader); + if (result != ResultOk) { + return false; + } + + keyInfoIter = encryptedDataKeyMap_.find(keyName); + + if (keyInfoIter == encryptedDataKeyMap_.end()) { + LOG_ERROR(logCtx_ + "Unable to find encrypted data key for " + keyName); + return false; + } + } + EncryptionKeyInfo* keyInfo = keyInfoIter->second.get(); + + proto::EncryptionKeys* encKeys = proto::EncryptionKeys().New(); + encKeys->set_key(keyName); + encKeys->set_value(keyInfo->getKey()); + + if (keyInfo->getMetadata().size()) { + for (auto metaIter = keyInfo->getMetadata().begin(); metaIter != keyInfo->getMetadata().end(); + metaIter++) { + proto::KeyValue* keyValue = proto::KeyValue().New(); + keyValue->set_key(metaIter->first); + keyValue->set_value(metaIter->second); + encKeys->mutable_metadata()->AddAllocated(keyValue); + } + } + + msgMetadata.mutable_encryption_keys()->AddAllocated(encKeys); + } + + // TODO: Replace random with counter and periodic refreshing based on timer/counter value + RAND_bytes(iv_.get(), ivLen_); + msgMetadata.set_encryption_param(reinterpret_cast<char*>(iv_.get()), ivLen_); + + EVP_CIPHER_CTX* cipherCtx = NULL; + encryptedPayload = SharedBuffer::allocate(payload.readableBytes() + EVP_MAX_BLOCK_LENGTH + tagLen_); + int encLen = 0; + + if (!(cipherCtx = EVP_CIPHER_CTX_new())) { + LOG_ERROR(logCtx_ + " Failed to cipher ctx."); + return false; + } + + if (EVP_EncryptInit_ex(cipherCtx, EVP_aes_256_gcm(), NULL, dataKey_.get(), iv_.get()) != 1) { + LOG_ERROR(logCtx_ + " Failed to init cipher ctx."); + EVP_CIPHER_CTX_free(cipherCtx); + return false; + } + + if (EVP_CIPHER_CTX_set_padding(cipherCtx, EVP_CIPH_NO_PADDING) != 1) { + LOG_ERROR(logCtx_ + " Failed to set cipher padding."); + EVP_CIPHER_CTX_free(cipherCtx); + return false; + } + + if (EVP_EncryptUpdate(cipherCtx, reinterpret_cast<unsigned char*>(encryptedPayload.mutableData()), + &encLen, reinterpret_cast<unsigned const char*>(payload.data()), + payload.readableBytes()) != 1) { + LOG_ERROR(logCtx_ + " Failed to encrypt payload."); + EVP_CIPHER_CTX_free(cipherCtx); + return false; + } + encryptedPayload.bytesWritten(encLen); + encLen = 0; + + if (EVP_EncryptFinal_ex(cipherCtx, reinterpret_cast<unsigned char*>(encryptedPayload.mutableData()), + &encLen) != 1) { + LOG_ERROR(logCtx_ + " Failed to finalize encryption."); + EVP_CIPHER_CTX_free(cipherCtx); + return false; + } + encryptedPayload.bytesWritten(encLen); + + if (EVP_CIPHER_CTX_ctrl(cipherCtx, EVP_CTRL_GCM_GET_TAG, tagLen_, encryptedPayload.mutableData()) != 1) { + LOG_ERROR(logCtx_ + " Failed to get cipher tag info."); + EVP_CIPHER_CTX_free(cipherCtx); + return false; + } + encryptedPayload.bytesWritten(tagLen_); + + EVP_CIPHER_CTX_free(cipherCtx); + + return true; +} + +bool MessageCrypto::decryptDataKey(const std::string& keyName, const std::string& encryptedDataKey, + const google::protobuf::RepeatedPtrField<proto::KeyValue>& encKeyMeta, + const CryptoKeyReaderPtr keyReader) { + StringMap keyMeta; + for (auto iter = encKeyMeta.begin(); iter != encKeyMeta.end(); iter++) { + keyMeta[iter->key()] = iter->value(); + } + + // Read the private key info using callback + EncryptionKeyInfo keyInfo; + keyReader->getPrivateKey(keyName, keyMeta, keyInfo); + + // Convert key from string to RSA key + RSA* privKey = loadPrivateKey(keyInfo.getKey()); + if (privKey == NULL) { + LOG_ERROR(logCtx_ + " Failed to load private key " + keyName); + return false; + } + + // Decrypt data key + int outSize = RSA_private_decrypt(encryptedDataKey.size(), + reinterpret_cast<unsigned const char*>(encryptedDataKey.c_str()), + dataKey_.get(), privKey, RSA_PKCS1_OAEP_PADDING); + + if (outSize == -1) { + LOG_ERROR(logCtx_ + "Failed to decrypt AES key for " + keyName); + return false; + } + + unsigned char keyDigest[EVP_MAX_MD_SIZE]; + unsigned int digestLen = 0; + if (!getDigest(keyName, encryptedDataKey.c_str(), encryptedDataKey.size(), keyDigest, digestLen)) { + LOG_ERROR(logCtx_ + "Failed to get digest for data key " + keyName); + return false; + } + + std::string keyDigestStr(reinterpret_cast<char*>(keyDigest), digestLen); + std::string dataKeyStr(reinterpret_cast<char*>(dataKey_.get()), dataKeyLen_); + dataKeyCache_[keyDigestStr] = make_pair(dataKeyStr, boost::posix_time::second_clock::universal_time()); + + // Remove expired entries from the cache + removeExpiredDataKey(); + return true; +} + +bool MessageCrypto::decryptData(const std::string& dataKeySecret, const proto::MessageMetadata& msgMetadata, + SharedBuffer& payload, SharedBuffer& decryptedPayload) { + // unpack iv and encrypted data + msgMetadata.encryption_param().copy(reinterpret_cast<char*>(iv_.get()), + msgMetadata.encryption_param().size()); + + EVP_CIPHER_CTX* cipherCtx = NULL; + decryptedPayload = SharedBuffer::allocate(payload.readableBytes() + EVP_MAX_BLOCK_LENGTH + tagLen_); + + if (!(cipherCtx = EVP_CIPHER_CTX_new())) { + LOG_ERROR(logCtx_ + " Failed to get cipher ctx"); + return false; + } + + if (!EVP_DecryptInit_ex(cipherCtx, EVP_aes_256_gcm(), NULL, + reinterpret_cast<unsigned const char*>(dataKeySecret.c_str()), + reinterpret_cast<unsigned const char*>(iv_.get()))) { + LOG_ERROR(logCtx_ + " Failed to init decrypt cipher ctx"); + EVP_CIPHER_CTX_free(cipherCtx); + return false; + } + + if (EVP_CIPHER_CTX_set_padding(cipherCtx, EVP_CIPH_NO_PADDING) != 1) { + LOG_ERROR(logCtx_ + " Failed to set cipher padding"); + EVP_CIPHER_CTX_free(cipherCtx); + return false; + } + + int cipherLen = payload.readableBytes() - tagLen_; + int decLen = 0; + if (!EVP_DecryptUpdate(cipherCtx, reinterpret_cast<unsigned char*>(decryptedPayload.mutableData()), + &decLen, reinterpret_cast<unsigned const char*>(payload.data()), cipherLen)) { + LOG_ERROR(logCtx_ + " Failed to decrypt update"); + EVP_CIPHER_CTX_free(cipherCtx); + return false; + }; + decryptedPayload.bytesWritten(decLen); + + if (!EVP_CIPHER_CTX_ctrl(cipherCtx, EVP_CTRL_GCM_SET_TAG, tagLen_, (void*)(payload.data() + cipherLen))) { + LOG_ERROR(logCtx_ + " Failed to set gcm tag"); + EVP_CIPHER_CTX_free(cipherCtx); + return false; + } + + if (!EVP_DecryptFinal_ex(cipherCtx, reinterpret_cast<unsigned char*>(decryptedPayload.mutableData()), + &decLen)) { + LOG_ERROR(logCtx_ + " Failed to finalize encrypted message"); + EVP_CIPHER_CTX_free(cipherCtx); + return false; + } + decryptedPayload.bytesWritten(decLen); + + EVP_CIPHER_CTX_free(cipherCtx); + + return true; +} + +bool MessageCrypto::getKeyAndDecryptData(const proto::MessageMetadata& msgMetadata, SharedBuffer& payload, + SharedBuffer& decryptedPayload) { + SharedBuffer decryptedData; + bool dataDecrypted = false; + + for (auto iter = msgMetadata.encryption_keys().begin(); iter != msgMetadata.encryption_keys().end(); + iter++) { + const std::string& keyName = iter->key(); + const std::string& encDataKey = iter->value(); + unsigned char keyDigest[EVP_MAX_MD_SIZE]; + unsigned int digestLen = 0; + getDigest(keyName, encDataKey.c_str(), encDataKey.size(), keyDigest, digestLen); + + std::string keyDigestStr(reinterpret_cast<char*>(keyDigest), digestLen); + + auto dataKeyCacheIter = dataKeyCache_.find(keyDigestStr); + if (dataKeyCacheIter != dataKeyCache_.end()) { + // Taking a small performance hit here if the hash collides. When it + // retruns a different key, decryption fails. At this point, we would + // call decryptDataKey to refresh the cache and come here again to decrypt. + auto dataKeyEntry = dataKeyCacheIter->second; + if (decryptData(dataKeyEntry.first, msgMetadata, payload, decryptedPayload)) { + dataDecrypted = true; + break; + } + } else { + // First time, entry won't be present in cache + LOG_DEBUG(logCtx_ + " Failed to decrypt data or data key is not in cache for " + keyName + + ". Will attempt to refresh."); + } + } + return dataDecrypted; +} + +bool MessageCrypto::decrypt(const proto::MessageMetadata& msgMetadata, SharedBuffer& payload, + const CryptoKeyReaderPtr keyReader, SharedBuffer& decryptedPayload) { + // Attempt to decrypt using the existing key + if (getKeyAndDecryptData(msgMetadata, payload, decryptedPayload)) { + return true; + } + + // Either first time, or decryption failed. Attempt to regenerate data key + bool isDataKeyDecrypted = false; + for (int index = 0; index < msgMetadata.encryption_keys_size(); index++) { + const proto::EncryptionKeys& encKeys = msgMetadata.encryption_keys(index); + + const std::string& encDataKey = encKeys.value(); + const google::protobuf::RepeatedPtrField<proto::KeyValue>& encKeyMeta = encKeys.metadata(); + if (decryptDataKey(encKeys.key(), encDataKey, encKeyMeta, keyReader)) { + isDataKeyDecrypted = true; + break; + } + } + + if (!isDataKeyDecrypted) { + // Unable to decrypt data key + return false; + } + + return getKeyAndDecryptData(msgMetadata, payload, decryptedPayload); +} + +} /* namespace pulsar */ diff --git a/pulsar-client-cpp/lib/MessageCrypto.h b/pulsar-client-cpp/lib/MessageCrypto.h new file mode 100644 index 0000000..016ce54 --- /dev/null +++ b/pulsar-client-cpp/lib/MessageCrypto.h @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef LIB_MESSAGECRYPTO_H_ +#define LIB_MESSAGECRYPTO_H_ + +#include <iostream> +#include <map> +#include <set> +#include <boost/thread/mutex.hpp> +#include <boost/scoped_array.hpp> + +#include <openssl/ssl.h> +#include <openssl/rand.h> +#include <openssl/bio.h> +#include <openssl/evp.h> +#include <openssl/rsa.h> +#include <openssl/engine.h> + +#include "SharedBuffer.h" +#include "ExecutorService.h" +#include "pulsar/CryptoKeyReader.h" +#include "PulsarApi.pb.h" + +namespace pulsar { + +class MessageCrypto { + public: + typedef std::map<std::string, std::string> StringMap; + typedef std::map<std::string, std::pair<std::string, boost::posix_time::ptime>> DataKeyCacheMap; + + MessageCrypto(std::string& logCtx, bool keyGenNeeded); + ~MessageCrypto(); + + /* + * Encrypt data key using the public key(s) in the argument. <p> If more than one key name is specified, + * data key is encrypted using each of those keys. If the public key is expired or changed, application is + * responsible to remove the old key and add the new key <p> + * + * @param keyNames List of public keys to encrypt data key + * @param keyReader Implementation to read the key values + * @return ResultOk if succeeded + * + */ + Result addPublicKeyCipher(std::set<std::string>& keyNames, const CryptoKeyReaderPtr keyReader); + + /* + * Remove a key <p> Remove the key identified by the keyName from the list of keys.<p> + * + * @param keyName Unique name to identify the key + * @return true if succeeded, false otherwise + */ + bool removeKeyCipher(std::string& keyName); + + /* + * Encrypt the payload using the data key and update message metadata with the keyname & encrypted data + * key + * + * @param encKeys One or more public keys to encrypt data key + * @param keyReader Implementation to read the key values + * @param msgMetadata Message Metadata + * @param payload Message which needs to be encrypted + * @param encryptedPayload Contains encrypted payload if success + * + * @return true if success + */ + bool encrypt(std::set<std::string>& encKeys, const CryptoKeyReaderPtr keyReader, + proto::MessageMetadata& msgMetadata, SharedBuffer& payload, SharedBuffer& encryptedPayload); + + /* + * Decrypt the payload using the data key. Keys used to encrypt data key can be retrieved from msgMetadata + * + * @param msgMetadata Message Metadata + * @param payload Message which needs to be decrypted + * @param keyReader KeyReader implementation to retrieve key value + * @param decryptedPayload Contains decrypted payload if success + * + * @return true if success + */ + bool decrypt(const proto::MessageMetadata& msgMetadata, SharedBuffer& payload, + const CryptoKeyReaderPtr keyReader, SharedBuffer& decryptedPayload); + + private: + typedef boost::unique_lock<boost::mutex> Lock; + boost::mutex mutex_; + + int dataKeyLen_; + boost::scoped_array<unsigned char> dataKey_; + + int tagLen_; + int ivLen_; + boost::scoped_array<unsigned char> iv_; + + std::string logCtx_; + + /* This cache uses the digest of encrypted data key as it's key. It's possible + * for consumers to receive messages with data key encrypted using older or + * newer version of public key. If we use the key name as the key for dataKeyCache, + * we will end up decrypting data key way too often which is costly. + */ + DataKeyCacheMap dataKeyCache_; + + // Map of key name and encrypted gcm key, metadata pair which is sent with encrypted message + std::map<std::string, std::shared_ptr<EncryptionKeyInfo>> encryptedDataKeyMap_; + + EVP_MD_CTX* mdCtx_; + + RSA* loadPublicKey(std::string& pubKeyStr); + RSA* loadPrivateKey(std::string& privateKeyStr); + bool getDigest(const std::string& keyName, const void* input, unsigned int inputLen, + unsigned char keyDigest[], unsigned int& digestLen); + void removeExpiredDataKey(); + + Result addPublicKeyCipher(const std::string& keyName, const CryptoKeyReaderPtr keyReader); + + bool decryptDataKey(const std::string& keyName, const std::string& encryptedDataKey, + const google::protobuf::RepeatedPtrField<proto::KeyValue>& encKeyMeta, + const CryptoKeyReaderPtr keyReader); + bool decryptData(const std::string& dataKeySecret, const proto::MessageMetadata& msgMetadata, + SharedBuffer& payload, SharedBuffer& decPayload); + bool getKeyAndDecryptData(const proto::MessageMetadata& msgMetadata, SharedBuffer& payload, + SharedBuffer& decryptedPayload); +}; + +} /* namespace pulsar */ + +#endif /* LIB_MESSAGECRYPTO_H_ */ diff --git a/pulsar-client-cpp/lib/ProducerConfiguration.cc b/pulsar-client-cpp/lib/ProducerConfiguration.cc index c010a9f..ab70db2 100644 --- a/pulsar-client-cpp/lib/ProducerConfiguration.cc +++ b/pulsar-client-cpp/lib/ProducerConfiguration.cc @@ -157,4 +157,32 @@ ProducerConfiguration& ProducerConfiguration::setBatchingMaxPublishDelayMs( const unsigned long& ProducerConfiguration::getBatchingMaxPublishDelayMs() const { return impl_->batchingMaxPublishDelayMs; } + +const CryptoKeyReaderPtr ProducerConfiguration::getCryptoKeyReader() const { return impl_->cryptoKeyReader; } + +ProducerConfiguration& ProducerConfiguration::setCryptoKeyReader(CryptoKeyReaderPtr cryptoKeyReader) { + impl_->cryptoKeyReader = cryptoKeyReader; + return *this; +} + +ProducerCryptoFailureAction ProducerConfiguration::getCryptoFailureAction() const { + return impl_->cryptoFailureAction; +} + +ProducerConfiguration& ProducerConfiguration::setCryptoFailureAction(ProducerCryptoFailureAction action) { + impl_->cryptoFailureAction = action; + return *this; +} + +std::set<std::string>& ProducerConfiguration::getEncryptionKeys() { return impl_->encryptionKeys; } + +bool ProducerConfiguration::isEncryptionEnabled() const { + return (!impl_->encryptionKeys.empty() && (impl_->cryptoKeyReader != NULL)); +} + +ProducerConfiguration& ProducerConfiguration::addEncryptionKey(std::string key) { + impl_->encryptionKeys.insert(key); + return *this; +} + } // namespace pulsar diff --git a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h index 11fe7cc..3f788a9 100644 --- a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h +++ b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h @@ -41,6 +41,9 @@ struct ProducerConfigurationImpl { unsigned int batchingMaxMessages; unsigned long batchingMaxAllowedSizeInBytes; unsigned long batchingMaxPublishDelayMs; + CryptoKeyReaderPtr cryptoKeyReader; + std::set<std::string> encryptionKeys; + ProducerCryptoFailureAction cryptoFailureAction; ProducerConfigurationImpl() : sendTimeoutMs(30000), compressionType(CompressionNone), @@ -52,8 +55,10 @@ struct ProducerConfigurationImpl { batchingEnabled(false), batchingMaxMessages(1000), batchingMaxAllowedSizeInBytes(128 * 1024), // 128 KB - batchingMaxPublishDelayMs(10) { // 10 milli seconds - } + batchingMaxPublishDelayMs(10), // 10 milli seconds + cryptoKeyReader(), + encryptionKeys(), + cryptoFailureAction(ProducerCryptoFailureAction::FAIL) {} }; } // namespace pulsar diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc index 2d36c7f..b2ed33f 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.cc +++ b/pulsar-client-cpp/lib/ProducerImpl.cc @@ -50,7 +50,9 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const std::string& topic, const producerStr_("[" + topic_ + ", " + producerName_ + "] "), producerId_(client->newProducerId()), msgSequenceGenerator_(0), - sendTimer_() { + sendTimer_(), + msgCrypto_(), + dataKeyGenIntervalSec_(4 * 60 * 60) { LOG_DEBUG("ProducerName - " << producerName_ << " Created producer on topic " << topic_ << " id: " << producerId_); @@ -70,10 +72,26 @@ ProducerImpl::ProducerImpl(ClientImplPtr client, const std::string& topic, const } else { producerStatsBasePtr_ = boost::make_shared<ProducerStatsDisabled>(); } + + if (conf_.isEncryptionEnabled()) { + std::ostringstream logCtxStream; + logCtxStream << "[" << topic_ << ", " << producerName_ << ", " << producerId_ << "]"; + std::string logCtx = logCtxStream.str(); + msgCrypto_ = boost::make_shared<MessageCrypto>(logCtx, true); + msgCrypto_->addPublicKeyCipher(conf_.getEncryptionKeys(), conf_.getCryptoKeyReader()); + + dataKeyGenTImer_ = executor_->createDeadlineTimer(); + dataKeyGenTImer_->expires_from_now(boost::posix_time::seconds(dataKeyGenIntervalSec_)); + dataKeyGenTImer_->async_wait( + boost::bind(&pulsar::ProducerImpl::refreshEncryptionKey, this, boost::asio::placeholders::error)); + } } ProducerImpl::~ProducerImpl() { LOG_DEBUG(getName() << "~ProducerImpl"); + if (dataKeyGenTImer_) { + dataKeyGenTImer_->cancel(); + } closeAsync(ResultCallback()); printStats(); } @@ -84,6 +102,19 @@ const std::string& ProducerImpl::getProducerName() const { return producerName_; int64_t ProducerImpl::getLastSequenceId() const { return lastSequenceIdPublished_; } +void ProducerImpl::refreshEncryptionKey(const boost::system::error_code& ec) { + if (ec) { + LOG_DEBUG("Ignoring timer cancelled event, code[" << ec << "]"); + return; + } + + msgCrypto_->addPublicKeyCipher(conf_.getEncryptionKeys(), conf_.getCryptoKeyReader()); + + dataKeyGenTImer_->expires_from_now(boost::posix_time::seconds(dataKeyGenIntervalSec_)); + dataKeyGenTImer_->async_wait( + boost::bind(&pulsar::ProducerImpl::refreshEncryptionKey, this, boost::asio::placeholders::error)); +} + void ProducerImpl::connectionOpened(const ClientConnectionPtr& cnx) { Lock lock(mutex_); if (state_ == Closed) { @@ -262,14 +293,23 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) { SharedBuffer& payload = msg.impl_->payload; uint32_t uncompressedSize = payload.readableBytes(); + uint32_t payloadSize = uncompressedSize; if (!batchMessageContainer) { // If batching is enabled we compress all the payloads together before sending the batch payload = CompressionCodecProvider::getCodec(conf_.getCompressionType()).encode(payload); + payloadSize = payload.readableBytes(); + + // Encrypt the payload if enabled + SharedBuffer encryptedPayload; + if (!encryptMessage(msg.impl_->metadata, payload, encryptedPayload)) { + cb(ResultCryptoError, msg); + return; + } + payload = encryptedPayload; } - uint32_t compressedSize = payload.readableBytes(); - if (compressedSize > Commands::MaxMessageSize) { - LOG_DEBUG(getName() << " - compressed Message payload size" << compressedSize << "cannot exceed " + if (payloadSize > Commands::MaxMessageSize) { + LOG_DEBUG(getName() << " - compressed Message payload size" << payloadSize << "cannot exceed " << Commands::MaxMessageSize << " bytes"); cb(ResultMessageTooBig, msg); return; @@ -559,6 +599,17 @@ bool ProducerImpl::ackReceived(uint64_t sequenceId) { } } +bool ProducerImpl::encryptMessage(proto::MessageMetadata& metadata, SharedBuffer& payload, + SharedBuffer& encryptedPayload) { + if (!conf_.isEncryptionEnabled() || msgCrypto_ == NULL) { + encryptedPayload = payload; + return true; + } + + return msgCrypto_->encrypt(conf_.getEncryptionKeys(), conf_.getCryptoKeyReader(), metadata, payload, + encryptedPayload); +} + void ProducerImpl::disconnectProducer() { LOG_DEBUG("Broker notification of Closed producer: " << producerId_); Lock lock(mutex_); diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h index f89823f..2907d40 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.h +++ b/pulsar-client-cpp/lib/ProducerImpl.h @@ -27,6 +27,7 @@ #include "HandlerBase.h" #include "SharedBuffer.h" #include "CompressionCodec.h" +#include "MessageCrypto.h" #include "stats/ProducerStatsDisabled.h" #include "stats/ProducerStatsImpl.h" @@ -37,6 +38,7 @@ namespace pulsar { class BatchMessageContainer; typedef boost::shared_ptr<BatchMessageContainer> BatchMessageContainerPtr; +typedef boost::shared_ptr<MessageCrypto> MessageCryptoPtr; class PulsarFriend; @@ -120,6 +122,10 @@ class ProducerImpl : public HandlerBase, void resendMessages(ClientConnectionPtr cnx); + void refreshEncryptionKey(const boost::system::error_code& ec); + bool encryptMessage(proto::MessageMetadata& metadata, SharedBuffer& payload, + SharedBuffer& encryptedPayload); + typedef boost::unique_lock<boost::mutex> Lock; ProducerConfiguration conf_; @@ -144,6 +150,10 @@ class ProducerImpl : public HandlerBase, Promise<Result, ProducerImplBaseWeakPtr> producerCreatedPromise_; void failPendingMessages(Result result); + + MessageCryptoPtr msgCrypto_; + DeadlineTimerPtr dataKeyGenTImer_; + uint32_t dataKeyGenIntervalSec_; }; struct ProducerImplCmp { diff --git a/pulsar-client-cpp/lib/Result.cc b/pulsar-client-cpp/lib/Result.cc index 1deb5f2..64e23e1 100644 --- a/pulsar-client-cpp/lib/Result.cc +++ b/pulsar-client-cpp/lib/Result.cc @@ -122,6 +122,9 @@ const char* pulsar::strResult(Result result) { case ResultTopicTerminated: return "TopicTerminated"; + + case ResultCryptoError: + return "CryptoError"; }; // NOTE : Do not add default case in the switch above. In future if we get new cases for // ServerError and miss them in the switch above we would like to get notified. Adding diff --git a/pulsar-client-cpp/perf/PerfConsumer.cc b/pulsar-client-cpp/perf/PerfConsumer.cc index 0ddef60..97aa8db 100644 --- a/pulsar-client-cpp/perf/PerfConsumer.cc +++ b/pulsar-client-cpp/perf/PerfConsumer.cc @@ -68,6 +68,8 @@ struct Arguments { int receiverQueueSize; int ioThreads; int listenerThreads; + std::string encKeyName; + std::string encKeyValueFile; }; namespace pulsar { @@ -87,6 +89,37 @@ class PulsarFriend { #include <atomic> #endif +class EncKeyReader: public CryptoKeyReader { + + private: + std::string privKeyContents; + + void readFile(std::string fileName, std::string& fileContents) const { + std::ifstream ifs(fileName); + std::stringstream fileStream; + fileStream << ifs.rdbuf(); + fileContents = fileStream.str(); + } + + public: + + EncKeyReader(std::string keyFile) { + if (keyFile.empty()) { + return; + } + readFile(keyFile, privKeyContents); + } + + Result getPublicKey(const std::string &keyName, std::map<std::string, std::string>& metadata, EncryptionKeyInfo& encKeyInfo) const { + return ResultInvalidConfiguration; + } + + Result getPrivateKey(const std::string &keyName, std::map<std::string, std::string>& metadata, EncryptionKeyInfo& encKeyInfo) const { + encKeyInfo.setKey(privKeyContents); + return ResultOk; + } +}; + // Counters std::atomic<uint32_t> messagesReceived; std::atomic<uint32_t> bytesReceived; @@ -149,6 +182,10 @@ void startPerfConsumer(const Arguments& args) { ConsumerConfiguration consumerConf; consumerConf.setMessageListener(messageListener); consumerConf.setReceiverQueueSize(args.receiverQueueSize); + boost::shared_ptr<EncKeyReader> keyReader = boost::make_shared<EncKeyReader>(args.encKeyValueFile); + if (!args.encKeyName.empty()) { + consumerConf.setCryptoKeyReader(keyReader); + } Latch latch(args.numTopics * args.numConsumers); @@ -261,7 +298,12 @@ int main(int argc, char** argv) { "Number of IO threads to use") // ("listener-threads,l", po::value<int>(&args.listenerThreads)->default_value(1), - "Number of listener threads"); + "Number of listener threads") // + + ("encryption-key-name,k", po::value<std::string>(&args.encKeyName)->default_value(""), "The private key name to decrypt payload") // + + ("encryption-key-value-file,f", po::value<std::string>(&args.encKeyValueFile)->default_value(""), + "The file which contains the private key to decrypt payload"); // po::options_description hidden; hidden.add_options()("topic", po::value<std::string>(&args.topic), "Topic name"); diff --git a/pulsar-client-cpp/perf/PerfProducer.cc b/pulsar-client-cpp/perf/PerfProducer.cc index d6cea4c..d591d34 100644 --- a/pulsar-client-cpp/perf/PerfProducer.cc +++ b/pulsar-client-cpp/perf/PerfProducer.cc @@ -62,6 +62,8 @@ struct Arguments { unsigned int batchingMaxMessages; long batchingMaxAllowedSizeInBytes; long batchingMaxPublishDelayMs; + std::string encKeyName; + std::string encKeyValueFile; }; namespace pulsar { @@ -74,11 +76,43 @@ class PulsarFriend { }; } -// Stats unsigned long messagesProduced; unsigned long bytesProduced; using namespace boost::accumulators; +using namespace pulsar; + +class EncKeyReader: public CryptoKeyReader { + + private: + std::string pubKeyContents; + + void readFile(std::string fileName, std::string& fileContents) const { + std::ifstream ifs(fileName); + std::stringstream fileStream; + fileStream << ifs.rdbuf(); + fileContents = fileStream.str(); + } + public: + + EncKeyReader(std::string keyFile) { + if (keyFile.empty()) { + return; + } + readFile(keyFile, pubKeyContents); + } + + Result getPublicKey(const std::string &keyName, std::map<std::string, std::string>& metadata, EncryptionKeyInfo& encKeyInfo) const { + encKeyInfo.setKey(pubKeyContents); + return ResultOk; + } + + Result getPrivateKey(const std::string &keyName, std::map<std::string, std::string>& metadata, EncryptionKeyInfo& encKeyInfo) const { + return ResultInvalidConfiguration; + } +}; + +// Stats typedef accumulator_set<uint64_t, stats<tag::mean, tag::p_square_quantile> > LatencyAccumulator; LatencyAccumulator e2eLatencyAccumulator(quantile_probability = 0.99); std::vector<pulsar::Producer> producerList; @@ -230,7 +264,12 @@ int main(int argc, char** argv) { "Use only is batch-size > 1, Default is 128 KB") // ("max-batch-publish-delay-in-ms", po::value<long>(&args.batchingMaxPublishDelayMs)->default_value(3000), - "Use only is batch-size > 1, Default is 3 seconds"); + "Use only is batch-size > 1, Default is 3 seconds") // + + ("encryption-key-name,k", po::value<std::string>(&args.encKeyName)->default_value(""), "The public key name to encrypt payload") // + + ("encryption-key-value-file,f", po::value<std::string>(&args.encKeyValueFile)->default_value(""), + "The file which contains the public key to encrypt payload"); // po::options_description hidden; hidden.add_options()("topic", po::value<std::string>(&args.topic), "Topic name"); @@ -291,6 +330,12 @@ int main(int argc, char** argv) { // Block if queue is full else we will start seeing errors in sendAsync producerConf.setBlockIfQueueFull(true); + boost::shared_ptr<EncKeyReader> keyReader = boost::make_shared<EncKeyReader>(args.encKeyValueFile); + if (!args.encKeyName.empty()) { + producerConf.addEncryptionKey(args.encKeyName); + producerConf.setCryptoKeyReader(keyReader); + } + pulsar::ClientConfiguration conf; conf.setUseTls(args.isUseTls); conf.setTlsAllowInsecureConnection(args.isTlsAllowInsecureConnection); diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc index 8191d99..0c3f2b4 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -65,6 +65,42 @@ static void sendCallBack(Result r, const Message& msg, std::string prefix, doubl sendCallBack(r, msg, prefix); } +class EncKeyReader : public CryptoKeyReader { + private: + void readFile(std::string fileName, std::string& fileContents) const { + std::ifstream ifs(fileName); + std::stringstream fileStream; + fileStream << ifs.rdbuf(); + + fileContents = fileStream.str(); + } + + public: + EncKeyReader() {} + + Result getPublicKey(const std::string& keyName, std::map<std::string, std::string>& metadata, + EncryptionKeyInfo& encKeyInfo) const { + std::string CERT_FILE_PATH = + "../../pulsar-broker/src/test/resources/certificate/public-key." + keyName; + std::string keyContents; + readFile(CERT_FILE_PATH, keyContents); + + encKeyInfo.setKey(keyContents); + return ResultOk; + } + + Result getPrivateKey(const std::string& keyName, std::map<std::string, std::string>& metadata, + EncryptionKeyInfo& encKeyInfo) const { + std::string CERT_FILE_PATH = + "../../pulsar-broker/src/test/resources/certificate/private-key." + keyName; + std::string keyContents; + readFile(CERT_FILE_PATH, keyContents); + + encKeyInfo.setKey(keyContents); + return ResultOk; + } +}; + TEST(BasicEndToEndTest, testBatchMessages) { ClientConfiguration config; Client client(lookupUrl); @@ -1103,3 +1139,173 @@ TEST(BasicEndToEndTest, testHandlerReconnectionLogic) { ASSERT_TRUE(receivedMsgIndex.find(boost::lexical_cast<std::string>(i)) != receivedMsgIndex.end()); } } + +TEST(BasicEndToEndTest, testRSAEncryption) { + ClientConfiguration config; + Client client(lookupUrl); + std::string topicName = "persistent://prop/unit/ns1/my-rsaenctopic"; + std::string subName = "my-sub-name"; + Producer producer; + + boost::shared_ptr<EncKeyReader> keyReader = boost::make_shared<EncKeyReader>(); + ProducerConfiguration conf; + conf.setCompressionType(CompressionLZ4); + conf.addEncryptionKey("client-rsa.pem"); + conf.setCryptoKeyReader(keyReader); + + Promise<Result, Producer> producerPromise; + client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise)); + Future<Result, Producer> producerFuture = producerPromise.getFuture(); + Result result = producerFuture.get(producer); + ASSERT_EQ(ResultOk, result); + + ConsumerConfiguration consConfig; + consConfig.setCryptoKeyReader(keyReader); + // consConfig.setCryptoFailureAction(ConsumerCryptoFailureAction::CONSUME); + + Consumer consumer; + Promise<Result, Consumer> consumerPromise; + client.subscribeAsync(topicName, subName, consConfig, WaitForCallbackValue<Consumer>(consumerPromise)); + Future<Result, Consumer> consumerFuture = consumerPromise.getFuture(); + result = consumerFuture.get(consumer); + ASSERT_EQ(ResultOk, result); + + // Send 1000 messages synchronously + std::string msgContent = "msg-content"; + LOG_INFO("Publishing 1000 messages synchronously"); + int msgNum = 0; + for (; msgNum < 1000; msgNum++) { + std::stringstream stream; + stream << msgContent << msgNum; + Message msg = MessageBuilder().setContent(stream.str()).build(); + ASSERT_EQ(ResultOk, producer.send(msg)); + } + + LOG_INFO("Trying to receive 1000 messages"); + Message msgReceived; + for (msgNum = 0; msgNum < 1000; msgNum++) { + consumer.receive(msgReceived, 1000); + LOG_DEBUG("Received message :" << msgReceived.getMessageId()); + std::stringstream expected; + expected << msgContent << msgNum; + ASSERT_EQ(expected.str(), msgReceived.getDataAsString()); + ASSERT_EQ(ResultOk, consumer.acknowledge(msgReceived)); + } + + ASSERT_EQ(ResultOk, consumer.unsubscribe()); + ASSERT_EQ(ResultAlreadyClosed, consumer.close()); + ASSERT_EQ(ResultOk, producer.close()); + ASSERT_EQ(ResultOk, client.close()); +} + +TEST(BasicEndToEndTest, testEncryptionFailure) { + ClientConfiguration config; + Client client(lookupUrl); + std::string topicName = "persistent://prop/unit/ns1/my-rsaencfailtopic"; + std::string subName = "my-sub-name"; + Producer producer; + + boost::shared_ptr<EncKeyReader> keyReader = boost::make_shared<EncKeyReader>(); + + ConsumerConfiguration consConfig; + + Consumer consumer; + Promise<Result, Consumer> consumerPromise; + client.subscribeAsync(topicName, subName, consConfig, WaitForCallbackValue<Consumer>(consumerPromise)); + Future<Result, Consumer> consumerFuture = consumerPromise.getFuture(); + Result result = consumerFuture.get(consumer); + ASSERT_EQ(ResultOk, result); + + std::string msgContent = "msg-content"; + int msgNum = 0; + int totalMsgs = 10; + std::stringstream stream; + stream << msgContent << msgNum; + Message msg = MessageBuilder().setContent(msgContent).build(); + + // 1. Non existing key + + { + ProducerConfiguration prodConf; + prodConf.setCryptoKeyReader(keyReader); + prodConf.addEncryptionKey("client-non-existing-rsa.pem"); + + Promise<Result, Producer> producerPromise; + client.createProducerAsync(topicName, prodConf, WaitForCallbackValue<Producer>(producerPromise)); + Future<Result, Producer> producerFuture = producerPromise.getFuture(); + result = producerFuture.get(producer); + ASSERT_EQ(ResultOk, result); + + ASSERT_EQ(ResultCryptoError, producer.send(msg)); + } + + // 2. Add valid key + { + ProducerConfiguration prodConf; + prodConf.setCryptoKeyReader(keyReader); + prodConf.addEncryptionKey("client-rsa.pem"); + + Promise<Result, Producer> producerPromise; + client.createProducerAsync(topicName, prodConf, WaitForCallbackValue<Producer>(producerPromise)); + Future<Result, Producer> producerFuture = producerPromise.getFuture(); + result = producerFuture.get(producer); + ASSERT_EQ(ResultOk, result); + + msgNum++; + for (; msgNum < totalMsgs; msgNum++) { + std::stringstream stream; + stream << msgContent << msgNum; + Message msg = MessageBuilder().setContent(stream.str()).build(); + ASSERT_EQ(ResultOk, producer.send(msg)); + } + } + + // 3. Key reader is not set by consumer + Message msgReceived; + ASSERT_EQ(ResultTimeout, consumer.receive(msgReceived, 5000)); + ASSERT_EQ(ResultOk, consumer.close()); + + // 4. Set consumer config to consume even if decryption fails + consConfig.setCryptoFailureAction(ConsumerCryptoFailureAction::CONSUME); + + Promise<Result, Consumer> consumerPromise2; + client.subscribeAsync(topicName, subName, consConfig, WaitForCallbackValue<Consumer>(consumerPromise2)); + consumerFuture = consumerPromise2.getFuture(); + result = consumerFuture.get(consumer); + ASSERT_EQ(ResultOk, consumer.receive(msgReceived, 1000)); + + // Received message 0. Skip message comparision since its encrypted + ASSERT_EQ(ResultOk, result); + ASSERT_EQ(ResultOk, consumer.close()); + + // 5. Set valid keyreader and consume messages + msgNum = 1; + consConfig.setCryptoKeyReader(keyReader); + consConfig.setCryptoFailureAction(ConsumerCryptoFailureAction::FAIL); + Promise<Result, Consumer> consumerPromise3; + client.subscribeAsync(topicName, subName, consConfig, WaitForCallbackValue<Consumer>(consumerPromise3)); + consumerFuture = consumerPromise3.getFuture(); + result = consumerFuture.get(consumer); + + for (; msgNum < totalMsgs - 1; msgNum++) { + ASSERT_EQ(ResultOk, consumer.receive(msgReceived, 1000)); + LOG_DEBUG("Received message :" << msgReceived.getMessageId()); + std::stringstream expected; + expected << msgContent << msgNum; + ASSERT_EQ(expected.str(), msgReceived.getDataAsString()); + ASSERT_EQ(ResultOk, consumer.acknowledge(msgReceived)); + } + ASSERT_EQ(ResultOk, consumer.close()); + + // 6. Discard message if decryption fails + ConsumerConfiguration consConfig2; + consConfig2.setCryptoFailureAction(ConsumerCryptoFailureAction::DISCARD); + + Promise<Result, Consumer> consumerPromise4; + client.subscribeAsync(topicName, subName, consConfig2, WaitForCallbackValue<Consumer>(consumerPromise4)); + consumerFuture = consumerPromise4.getFuture(); + result = consumerFuture.get(consumer); + + // Since messag is discarded, no message will be received. + ASSERT_EQ(ResultTimeout, consumer.receive(msgReceived, 5000)); +} -- To stop receiving notification emails like this one, please contact mme...@apache.org.