This is an automated email from the ASF dual-hosted git repository. rdhabalia pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new fbb42e7 Added debug logs in MessageCrypto (#1233) fbb42e7 is described below commit fbb42e711aa9044ec5a1b30f448173432c1d805a Author: Andrews <sahaya.andr...@gmail.com> AuthorDate: Tue Feb 13 18:12:13 2018 -0800 Added debug logs in MessageCrypto (#1233) --- pulsar-client-cpp/lib/MessageCrypto.cc | 101 +++++++++++++++++++++++++-------- pulsar-client-cpp/lib/MessageCrypto.h | 1 + 2 files changed, 77 insertions(+), 25 deletions(-) diff --git a/pulsar-client-cpp/lib/MessageCrypto.cc b/pulsar-client-cpp/lib/MessageCrypto.cc index 26f1b48..0cc5dec 100644 --- a/pulsar-client-cpp/lib/MessageCrypto.cc +++ b/pulsar-client-cpp/lib/MessageCrypto.cc @@ -87,18 +87,18 @@ RSA* MessageCrypto::loadPrivateKey(std::string& privateKeyStr) { bool MessageCrypto::getDigest(const std::string& keyName, const void* input, unsigned int inputLen, unsigned char keyDigest[], unsigned int& digestLen) { if (EVP_DigestInit_ex(mdCtx_, EVP_md5(), NULL) != 1) { - LOG_ERROR(logCtx_ + "Failed to initialize md5 digest for key " + keyName); + LOG_ERROR(logCtx_ << "Failed to initialize md5 digest for key " << keyName); return false; } digestLen = 0; if (EVP_DigestUpdate(mdCtx_, input, inputLen) != 1) { - LOG_ERROR(logCtx_ + "Failed to get md5 hash for data key " + keyName); + LOG_ERROR(logCtx_ << "Failed to get md5 hash for data key " << keyName); return false; } if (EVP_DigestFinal_ex(mdCtx_, keyDigest, &digestLen) != 1) { - LOG_ERROR(logCtx_ + "Failed to finalize md hash for data key " + keyName); + LOG_ERROR(logCtx_ << "Failed to finalize md hash for data key " << keyName); return false; } @@ -122,6 +122,21 @@ void MessageCrypto::removeExpiredDataKey() { } } +std::string MessageCrypto::stringToHex(const std::string& inputStr, size_t len) { + static const char* hexVals = "0123456789ABCDEF"; + + std::string outHex; + outHex.reserve(2 * len + 2); + outHex.push_back('0'); + outHex.push_back('x'); + for (size_t i = 0; i < len; ++i) { + const unsigned char c = inputStr[i]; + outHex.push_back(hexVals[c >> 4]); + outHex.push_back(hexVals[c & 15]); + } + return outHex; +} + Result MessageCrypto::addPublicKeyCipher(std::set<std::string>& keyNames, const CryptoKeyReaderPtr keyReader) { Lock lock(mutex_); @@ -141,7 +156,7 @@ Result MessageCrypto::addPublicKeyCipher(std::set<std::string>& keyNames, Result MessageCrypto::addPublicKeyCipher(const std::string& keyName, const CryptoKeyReaderPtr keyReader) { if (keyName.empty()) { - LOG_ERROR(logCtx_ + "Keyname is empty "); + LOG_ERROR(logCtx_ << "Keyname is empty "); return ResultCryptoError; } @@ -150,15 +165,16 @@ Result MessageCrypto::addPublicKeyCipher(const std::string& keyName, const Crypt EncryptionKeyInfo keyInfo; Result result = keyReader->getPublicKey(keyName, keyMeta, keyInfo); if (result != ResultOk) { - LOG_ERROR(logCtx_ + "Failed to get public key from KeyReader for key " + keyName); + LOG_ERROR(logCtx_ << "Failed to get public key from KeyReader for key " << keyName); return result; } RSA* pubKey = loadPublicKey(keyInfo.getKey()); if (pubKey == NULL) { - LOG_ERROR(logCtx_ + "Failed to load public key " + keyName); + LOG_ERROR(logCtx_ << "Failed to load public key " << keyName); return ResultCryptoError; } + LOG_DEBUG(logCtx_ << " Public key " << keyName << " loaded successfully."); int inSize = RSA_size(pubKey); boost::scoped_array<unsigned char> encryptedKey(new unsigned char[inSize]); @@ -167,7 +183,7 @@ Result MessageCrypto::addPublicKeyCipher(const std::string& keyName, const Crypt RSA_public_encrypt(dataKeyLen_, dataKey_.get(), encryptedKey.get(), pubKey, RSA_PKCS1_OAEP_PADDING); if (inSize != outSize) { - LOG_ERROR(logCtx_ + "Ciphertext is length not matching input key length for key " + keyName); + LOG_ERROR(logCtx_ << "Ciphertext is length not matching input key length for key " << keyName); return ResultCryptoError; } std::string encryptedKeyStr(reinterpret_cast<char*>(encryptedKey.get()), inSize); @@ -176,6 +192,11 @@ Result MessageCrypto::addPublicKeyCipher(const std::string& keyName, const Crypt eki->setMetadata(keyInfo.getMetadata()); encryptedDataKeyMap_.insert(std::make_pair(keyName, eki)); + if (LOG4CXX_UNLIKELY(logger()->isDebugEnabled())) { + std::string strHex = stringToHex(encryptedKeyStr, encryptedKeyStr.size()); + LOG_DEBUG(logCtx_ << " Data key encrypted for key " << keyName + << ". Encrypted key size = " << encryptedKeyStr.size() << ", value = " << strHex); + } return ResultOk; } @@ -212,7 +233,7 @@ bool MessageCrypto::encrypt(std::set<std::string>& encKeys, const CryptoKeyReade keyInfoIter = encryptedDataKeyMap_.find(keyName); if (keyInfoIter == encryptedDataKeyMap_.end()) { - LOG_ERROR(logCtx_ + "Unable to find encrypted data key for " + keyName); + LOG_ERROR(logCtx_ << "Unable to find encrypted data key for " << keyName); return false; } } @@ -221,6 +242,11 @@ bool MessageCrypto::encrypt(std::set<std::string>& encKeys, const CryptoKeyReade proto::EncryptionKeys* encKeys = proto::EncryptionKeys().New(); encKeys->set_key(keyName); encKeys->set_value(keyInfo->getKey()); + if (LOG4CXX_UNLIKELY(logger()->isDebugEnabled())) { + std::string strHex = stringToHex(keyInfo->getKey(), keyInfo->getKey().size()); + LOG_DEBUG(logCtx_ << " Encrypted data key added for key " << keyName << ". Encrypted key size = " + << keyInfo->getKey().size() << ", value = " << strHex); + } if (keyInfo->getMetadata().size()) { for (auto metaIter = keyInfo->getMetadata().begin(); metaIter != keyInfo->getMetadata().end(); @@ -229,6 +255,8 @@ bool MessageCrypto::encrypt(std::set<std::string>& encKeys, const CryptoKeyReade keyValue->set_key(metaIter->first); keyValue->set_value(metaIter->second); encKeys->mutable_metadata()->AddAllocated(keyValue); + LOG_DEBUG(logCtx_ << " Adding metadata for key " << keyName << ". Metadata key = " + << metaIter->first << ", value =" << metaIter->second); } } @@ -244,18 +272,18 @@ bool MessageCrypto::encrypt(std::set<std::string>& encKeys, const CryptoKeyReade int encLen = 0; if (!(cipherCtx = EVP_CIPHER_CTX_new())) { - LOG_ERROR(logCtx_ + " Failed to cipher ctx."); + LOG_ERROR(logCtx_ << " Failed to cipher ctx."); return false; } if (EVP_EncryptInit_ex(cipherCtx, EVP_aes_256_gcm(), NULL, dataKey_.get(), iv_.get()) != 1) { - LOG_ERROR(logCtx_ + " Failed to init cipher ctx."); + LOG_ERROR(logCtx_ << " Failed to init cipher ctx."); EVP_CIPHER_CTX_free(cipherCtx); return false; } if (EVP_CIPHER_CTX_set_padding(cipherCtx, EVP_CIPH_NO_PADDING) != 1) { - LOG_ERROR(logCtx_ + " Failed to set cipher padding."); + LOG_ERROR(logCtx_ << " Failed to set cipher padding."); EVP_CIPHER_CTX_free(cipherCtx); return false; } @@ -263,7 +291,7 @@ bool MessageCrypto::encrypt(std::set<std::string>& encKeys, const CryptoKeyReade if (EVP_EncryptUpdate(cipherCtx, reinterpret_cast<unsigned char*>(encryptedPayload.mutableData()), &encLen, reinterpret_cast<unsigned const char*>(payload.data()), payload.readableBytes()) != 1) { - LOG_ERROR(logCtx_ + " Failed to encrypt payload."); + LOG_ERROR(logCtx_ << " Failed to encrypt payload."); EVP_CIPHER_CTX_free(cipherCtx); return false; } @@ -272,18 +300,25 @@ bool MessageCrypto::encrypt(std::set<std::string>& encKeys, const CryptoKeyReade if (EVP_EncryptFinal_ex(cipherCtx, reinterpret_cast<unsigned char*>(encryptedPayload.mutableData()), &encLen) != 1) { - LOG_ERROR(logCtx_ + " Failed to finalize encryption."); + LOG_ERROR(logCtx_ << " Failed to finalize encryption."); EVP_CIPHER_CTX_free(cipherCtx); return false; } encryptedPayload.bytesWritten(encLen); if (EVP_CIPHER_CTX_ctrl(cipherCtx, EVP_CTRL_GCM_GET_TAG, tagLen_, encryptedPayload.mutableData()) != 1) { - LOG_ERROR(logCtx_ + " Failed to get cipher tag info."); + LOG_ERROR(logCtx_ << " Failed to get cipher tag info."); EVP_CIPHER_CTX_free(cipherCtx); return false; } encryptedPayload.bytesWritten(tagLen_); + if (LOG4CXX_UNLIKELY(logger()->isDebugEnabled())) { + std::string strPayloadHex = stringToHex(payload.data(), payload.readableBytes()); + std::string strHex = stringToHex(encryptedPayload.data(), encryptedPayload.readableBytes()); + LOG_DEBUG(logCtx_ << " Original size = " << payload.readableBytes() << ", value = " << strPayloadHex + << ". Encrypted size " << encryptedPayload.readableBytes() + << ", value =" << strHex); + } EVP_CIPHER_CTX_free(cipherCtx); @@ -305,9 +340,10 @@ bool MessageCrypto::decryptDataKey(const std::string& keyName, const std::string // Convert key from string to RSA key RSA* privKey = loadPrivateKey(keyInfo.getKey()); if (privKey == NULL) { - LOG_ERROR(logCtx_ + " Failed to load private key " + keyName); + LOG_ERROR(logCtx_ << " Failed to load private key " << keyName); return false; } + LOG_DEBUG(logCtx_ << " Private key " << keyName << " loaded successfully."); // Decrypt data key int outSize = RSA_private_decrypt(encryptedDataKey.size(), @@ -315,20 +351,25 @@ bool MessageCrypto::decryptDataKey(const std::string& keyName, const std::string dataKey_.get(), privKey, RSA_PKCS1_OAEP_PADDING); if (outSize == -1) { - LOG_ERROR(logCtx_ + "Failed to decrypt AES key for " + keyName); + LOG_ERROR(logCtx_ << "Failed to decrypt AES key for " << keyName); return false; } unsigned char keyDigest[EVP_MAX_MD_SIZE]; unsigned int digestLen = 0; if (!getDigest(keyName, encryptedDataKey.c_str(), encryptedDataKey.size(), keyDigest, digestLen)) { - LOG_ERROR(logCtx_ + "Failed to get digest for data key " + keyName); + LOG_ERROR(logCtx_ << "Failed to get digest for data key " << keyName); return false; } std::string keyDigestStr(reinterpret_cast<char*>(keyDigest), digestLen); std::string dataKeyStr(reinterpret_cast<char*>(dataKey_.get()), dataKeyLen_); dataKeyCache_[keyDigestStr] = make_pair(dataKeyStr, boost::posix_time::second_clock::universal_time()); + if (LOG4CXX_UNLIKELY(logger()->isDebugEnabled())) { + std::string strHex = stringToHex(dataKeyStr, dataKeyStr.size()); + LOG_DEBUG(logCtx_ << "Data key for key " << keyName << " decrypted. Decrypted data key is " + << strHex); + } // Remove expired entries from the cache removeExpiredDataKey(); @@ -343,22 +384,27 @@ bool MessageCrypto::decryptData(const std::string& dataKeySecret, const proto::M EVP_CIPHER_CTX* cipherCtx = NULL; decryptedPayload = SharedBuffer::allocate(payload.readableBytes() + EVP_MAX_BLOCK_LENGTH + tagLen_); + if (LOG4CXX_UNLIKELY(logger()->isDebugEnabled())) { + std::string strHex = stringToHex(payload.data(), payload.readableBytes()); + LOG_DEBUG(logCtx_ << "Attempting to decrypt data with encrypted size " << payload.readableBytes() + << ", data = " << strHex); + } if (!(cipherCtx = EVP_CIPHER_CTX_new())) { - LOG_ERROR(logCtx_ + " Failed to get cipher ctx"); + LOG_ERROR(logCtx_ << " Failed to get cipher ctx"); return false; } if (!EVP_DecryptInit_ex(cipherCtx, EVP_aes_256_gcm(), NULL, reinterpret_cast<unsigned const char*>(dataKeySecret.c_str()), reinterpret_cast<unsigned const char*>(iv_.get()))) { - LOG_ERROR(logCtx_ + " Failed to init decrypt cipher ctx"); + LOG_ERROR(logCtx_ << " Failed to init decrypt cipher ctx"); EVP_CIPHER_CTX_free(cipherCtx); return false; } if (EVP_CIPHER_CTX_set_padding(cipherCtx, EVP_CIPH_NO_PADDING) != 1) { - LOG_ERROR(logCtx_ + " Failed to set cipher padding"); + LOG_ERROR(logCtx_ << " Failed to set cipher padding"); EVP_CIPHER_CTX_free(cipherCtx); return false; } @@ -367,25 +413,30 @@ bool MessageCrypto::decryptData(const std::string& dataKeySecret, const proto::M int decLen = 0; if (!EVP_DecryptUpdate(cipherCtx, reinterpret_cast<unsigned char*>(decryptedPayload.mutableData()), &decLen, reinterpret_cast<unsigned const char*>(payload.data()), cipherLen)) { - LOG_ERROR(logCtx_ + " Failed to decrypt update"); + LOG_ERROR(logCtx_ << " Failed to decrypt update"); EVP_CIPHER_CTX_free(cipherCtx); return false; }; decryptedPayload.bytesWritten(decLen); if (!EVP_CIPHER_CTX_ctrl(cipherCtx, EVP_CTRL_GCM_SET_TAG, tagLen_, (void*)(payload.data() + cipherLen))) { - LOG_ERROR(logCtx_ + " Failed to set gcm tag"); + LOG_ERROR(logCtx_ << " Failed to set gcm tag"); EVP_CIPHER_CTX_free(cipherCtx); return false; } if (!EVP_DecryptFinal_ex(cipherCtx, reinterpret_cast<unsigned char*>(decryptedPayload.mutableData()), &decLen)) { - LOG_ERROR(logCtx_ + " Failed to finalize encrypted message"); + LOG_ERROR(logCtx_ << " Failed to finalize encrypted message"); EVP_CIPHER_CTX_free(cipherCtx); return false; } decryptedPayload.bytesWritten(decLen); + if (LOG4CXX_UNLIKELY(logger()->isDebugEnabled())) { + std::string strHex = stringToHex(decryptedPayload.data(), decryptedPayload.readableBytes()); + LOG_DEBUG(logCtx_ << "Data decrypted. Decrypted size = " << decryptedPayload.readableBytes() + << ", data = " << strHex); + } EVP_CIPHER_CTX_free(cipherCtx); @@ -419,8 +470,8 @@ bool MessageCrypto::getKeyAndDecryptData(const proto::MessageMetadata& msgMetada } } else { // First time, entry won't be present in cache - LOG_DEBUG(logCtx_ + " Failed to decrypt data or data key is not in cache for " + keyName + - ". Will attempt to refresh."); + LOG_DEBUG(logCtx_ << " Failed to decrypt data or data key is not in cache for " + << keyName + ". Will attempt to refresh."); } } return dataDecrypted; diff --git a/pulsar-client-cpp/lib/MessageCrypto.h b/pulsar-client-cpp/lib/MessageCrypto.h index 016ce54..a4b77f0 100644 --- a/pulsar-client-cpp/lib/MessageCrypto.h +++ b/pulsar-client-cpp/lib/MessageCrypto.h @@ -135,6 +135,7 @@ class MessageCrypto { SharedBuffer& payload, SharedBuffer& decPayload); bool getKeyAndDecryptData(const proto::MessageMetadata& msgMetadata, SharedBuffer& payload, SharedBuffer& decryptedPayload); + std::string stringToHex(const std::string& inputStr, size_t len); }; } /* namespace pulsar */ -- To stop receiving notification emails like this one, please contact rdhaba...@apache.org.