rdhabalia closed pull request #1233: Added debug logs in MessageCrypto URL: https://github.com/apache/incubator-pulsar/pull/1233
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-client-cpp/lib/MessageCrypto.cc b/pulsar-client-cpp/lib/MessageCrypto.cc index 26f1b484e..0cc5decac 100644 --- a/pulsar-client-cpp/lib/MessageCrypto.cc +++ b/pulsar-client-cpp/lib/MessageCrypto.cc @@ -87,18 +87,18 @@ RSA* MessageCrypto::loadPrivateKey(std::string& privateKeyStr) { bool MessageCrypto::getDigest(const std::string& keyName, const void* input, unsigned int inputLen, unsigned char keyDigest[], unsigned int& digestLen) { if (EVP_DigestInit_ex(mdCtx_, EVP_md5(), NULL) != 1) { - LOG_ERROR(logCtx_ + "Failed to initialize md5 digest for key " + keyName); + LOG_ERROR(logCtx_ << "Failed to initialize md5 digest for key " << keyName); return false; } digestLen = 0; if (EVP_DigestUpdate(mdCtx_, input, inputLen) != 1) { - LOG_ERROR(logCtx_ + "Failed to get md5 hash for data key " + keyName); + LOG_ERROR(logCtx_ << "Failed to get md5 hash for data key " << keyName); return false; } if (EVP_DigestFinal_ex(mdCtx_, keyDigest, &digestLen) != 1) { - LOG_ERROR(logCtx_ + "Failed to finalize md hash for data key " + keyName); + LOG_ERROR(logCtx_ << "Failed to finalize md hash for data key " << keyName); return false; } @@ -122,6 +122,21 @@ void MessageCrypto::removeExpiredDataKey() { } } +std::string MessageCrypto::stringToHex(const std::string& inputStr, size_t len) { + static const char* hexVals = "0123456789ABCDEF"; + + std::string outHex; + outHex.reserve(2 * len + 2); + outHex.push_back('0'); + outHex.push_back('x'); + for (size_t i = 0; i < len; ++i) { + const unsigned char c = inputStr[i]; + outHex.push_back(hexVals[c >> 4]); + outHex.push_back(hexVals[c & 15]); + } + return outHex; +} + Result MessageCrypto::addPublicKeyCipher(std::set<std::string>& keyNames, const CryptoKeyReaderPtr 
keyReader) { Lock lock(mutex_); @@ -141,7 +156,7 @@ Result MessageCrypto::addPublicKeyCipher(std::set<std::string>& keyNames, Result MessageCrypto::addPublicKeyCipher(const std::string& keyName, const CryptoKeyReaderPtr keyReader) { if (keyName.empty()) { - LOG_ERROR(logCtx_ + "Keyname is empty "); + LOG_ERROR(logCtx_ << "Keyname is empty "); return ResultCryptoError; } @@ -150,15 +165,16 @@ Result MessageCrypto::addPublicKeyCipher(const std::string& keyName, const Crypt EncryptionKeyInfo keyInfo; Result result = keyReader->getPublicKey(keyName, keyMeta, keyInfo); if (result != ResultOk) { - LOG_ERROR(logCtx_ + "Failed to get public key from KeyReader for key " + keyName); + LOG_ERROR(logCtx_ << "Failed to get public key from KeyReader for key " << keyName); return result; } RSA* pubKey = loadPublicKey(keyInfo.getKey()); if (pubKey == NULL) { - LOG_ERROR(logCtx_ + "Failed to load public key " + keyName); + LOG_ERROR(logCtx_ << "Failed to load public key " << keyName); return ResultCryptoError; } + LOG_DEBUG(logCtx_ << " Public key " << keyName << " loaded successfully."); int inSize = RSA_size(pubKey); boost::scoped_array<unsigned char> encryptedKey(new unsigned char[inSize]); @@ -167,7 +183,7 @@ Result MessageCrypto::addPublicKeyCipher(const std::string& keyName, const Crypt RSA_public_encrypt(dataKeyLen_, dataKey_.get(), encryptedKey.get(), pubKey, RSA_PKCS1_OAEP_PADDING); if (inSize != outSize) { - LOG_ERROR(logCtx_ + "Ciphertext is length not matching input key length for key " + keyName); + LOG_ERROR(logCtx_ << "Ciphertext is length not matching input key length for key " << keyName); return ResultCryptoError; } std::string encryptedKeyStr(reinterpret_cast<char*>(encryptedKey.get()), inSize); @@ -176,6 +192,11 @@ Result MessageCrypto::addPublicKeyCipher(const std::string& keyName, const Crypt eki->setMetadata(keyInfo.getMetadata()); encryptedDataKeyMap_.insert(std::make_pair(keyName, eki)); + if (LOG4CXX_UNLIKELY(logger()->isDebugEnabled())) { + std::string 
strHex = stringToHex(encryptedKeyStr, encryptedKeyStr.size()); + LOG_DEBUG(logCtx_ << " Data key encrypted for key " << keyName + << ". Encrypted key size = " << encryptedKeyStr.size() << ", value = " << strHex); + } return ResultOk; } @@ -212,7 +233,7 @@ bool MessageCrypto::encrypt(std::set<std::string>& encKeys, const CryptoKeyReade keyInfoIter = encryptedDataKeyMap_.find(keyName); if (keyInfoIter == encryptedDataKeyMap_.end()) { - LOG_ERROR(logCtx_ + "Unable to find encrypted data key for " + keyName); + LOG_ERROR(logCtx_ << "Unable to find encrypted data key for " << keyName); return false; } } @@ -221,6 +242,11 @@ bool MessageCrypto::encrypt(std::set<std::string>& encKeys, const CryptoKeyReade proto::EncryptionKeys* encKeys = proto::EncryptionKeys().New(); encKeys->set_key(keyName); encKeys->set_value(keyInfo->getKey()); + if (LOG4CXX_UNLIKELY(logger()->isDebugEnabled())) { + std::string strHex = stringToHex(keyInfo->getKey(), keyInfo->getKey().size()); + LOG_DEBUG(logCtx_ << " Encrypted data key added for key " << keyName << ". Encrypted key size = " + << keyInfo->getKey().size() << ", value = " << strHex); + } if (keyInfo->getMetadata().size()) { for (auto metaIter = keyInfo->getMetadata().begin(); metaIter != keyInfo->getMetadata().end(); @@ -229,6 +255,8 @@ bool MessageCrypto::encrypt(std::set<std::string>& encKeys, const CryptoKeyReade keyValue->set_key(metaIter->first); keyValue->set_value(metaIter->second); encKeys->mutable_metadata()->AddAllocated(keyValue); + LOG_DEBUG(logCtx_ << " Adding metadata for key " << keyName << ". 
Metadata key = " + << metaIter->first << ", value =" << metaIter->second); } } @@ -244,18 +272,18 @@ bool MessageCrypto::encrypt(std::set<std::string>& encKeys, const CryptoKeyReade int encLen = 0; if (!(cipherCtx = EVP_CIPHER_CTX_new())) { - LOG_ERROR(logCtx_ + " Failed to cipher ctx."); + LOG_ERROR(logCtx_ << " Failed to cipher ctx."); return false; } if (EVP_EncryptInit_ex(cipherCtx, EVP_aes_256_gcm(), NULL, dataKey_.get(), iv_.get()) != 1) { - LOG_ERROR(logCtx_ + " Failed to init cipher ctx."); + LOG_ERROR(logCtx_ << " Failed to init cipher ctx."); EVP_CIPHER_CTX_free(cipherCtx); return false; } if (EVP_CIPHER_CTX_set_padding(cipherCtx, EVP_CIPH_NO_PADDING) != 1) { - LOG_ERROR(logCtx_ + " Failed to set cipher padding."); + LOG_ERROR(logCtx_ << " Failed to set cipher padding."); EVP_CIPHER_CTX_free(cipherCtx); return false; } @@ -263,7 +291,7 @@ bool MessageCrypto::encrypt(std::set<std::string>& encKeys, const CryptoKeyReade if (EVP_EncryptUpdate(cipherCtx, reinterpret_cast<unsigned char*>(encryptedPayload.mutableData()), &encLen, reinterpret_cast<unsigned const char*>(payload.data()), payload.readableBytes()) != 1) { - LOG_ERROR(logCtx_ + " Failed to encrypt payload."); + LOG_ERROR(logCtx_ << " Failed to encrypt payload."); EVP_CIPHER_CTX_free(cipherCtx); return false; } @@ -272,18 +300,25 @@ bool MessageCrypto::encrypt(std::set<std::string>& encKeys, const CryptoKeyReade if (EVP_EncryptFinal_ex(cipherCtx, reinterpret_cast<unsigned char*>(encryptedPayload.mutableData()), &encLen) != 1) { - LOG_ERROR(logCtx_ + " Failed to finalize encryption."); + LOG_ERROR(logCtx_ << " Failed to finalize encryption."); EVP_CIPHER_CTX_free(cipherCtx); return false; } encryptedPayload.bytesWritten(encLen); if (EVP_CIPHER_CTX_ctrl(cipherCtx, EVP_CTRL_GCM_GET_TAG, tagLen_, encryptedPayload.mutableData()) != 1) { - LOG_ERROR(logCtx_ + " Failed to get cipher tag info."); + LOG_ERROR(logCtx_ << " Failed to get cipher tag info."); EVP_CIPHER_CTX_free(cipherCtx); return false; } 
encryptedPayload.bytesWritten(tagLen_); + if (LOG4CXX_UNLIKELY(logger()->isDebugEnabled())) { + std::string strPayloadHex = stringToHex(payload.data(), payload.readableBytes()); + std::string strHex = stringToHex(encryptedPayload.data(), encryptedPayload.readableBytes()); + LOG_DEBUG(logCtx_ << " Original size = " << payload.readableBytes() << ", value = " << strPayloadHex + << ". Encrypted size " << encryptedPayload.readableBytes() + << ", value =" << strHex); + } EVP_CIPHER_CTX_free(cipherCtx); @@ -305,9 +340,10 @@ bool MessageCrypto::decryptDataKey(const std::string& keyName, const std::string // Convert key from string to RSA key RSA* privKey = loadPrivateKey(keyInfo.getKey()); if (privKey == NULL) { - LOG_ERROR(logCtx_ + " Failed to load private key " + keyName); + LOG_ERROR(logCtx_ << " Failed to load private key " << keyName); return false; } + LOG_DEBUG(logCtx_ << " Private key " << keyName << " loaded successfully."); // Decrypt data key int outSize = RSA_private_decrypt(encryptedDataKey.size(), @@ -315,20 +351,25 @@ bool MessageCrypto::decryptDataKey(const std::string& keyName, const std::string dataKey_.get(), privKey, RSA_PKCS1_OAEP_PADDING); if (outSize == -1) { - LOG_ERROR(logCtx_ + "Failed to decrypt AES key for " + keyName); + LOG_ERROR(logCtx_ << "Failed to decrypt AES key for " << keyName); return false; } unsigned char keyDigest[EVP_MAX_MD_SIZE]; unsigned int digestLen = 0; if (!getDigest(keyName, encryptedDataKey.c_str(), encryptedDataKey.size(), keyDigest, digestLen)) { - LOG_ERROR(logCtx_ + "Failed to get digest for data key " + keyName); + LOG_ERROR(logCtx_ << "Failed to get digest for data key " << keyName); return false; } std::string keyDigestStr(reinterpret_cast<char*>(keyDigest), digestLen); std::string dataKeyStr(reinterpret_cast<char*>(dataKey_.get()), dataKeyLen_); dataKeyCache_[keyDigestStr] = make_pair(dataKeyStr, boost::posix_time::second_clock::universal_time()); + if (LOG4CXX_UNLIKELY(logger()->isDebugEnabled())) { + std::string 
strHex = stringToHex(dataKeyStr, dataKeyStr.size()); + LOG_DEBUG(logCtx_ << "Data key for key " << keyName << " decrypted. Decrypted data key is " + << strHex); + } // Remove expired entries from the cache removeExpiredDataKey(); @@ -343,22 +384,27 @@ bool MessageCrypto::decryptData(const std::string& dataKeySecret, const proto::M EVP_CIPHER_CTX* cipherCtx = NULL; decryptedPayload = SharedBuffer::allocate(payload.readableBytes() + EVP_MAX_BLOCK_LENGTH + tagLen_); + if (LOG4CXX_UNLIKELY(logger()->isDebugEnabled())) { + std::string strHex = stringToHex(payload.data(), payload.readableBytes()); + LOG_DEBUG(logCtx_ << "Attempting to decrypt data with encrypted size " << payload.readableBytes() + << ", data = " << strHex); + } if (!(cipherCtx = EVP_CIPHER_CTX_new())) { - LOG_ERROR(logCtx_ + " Failed to get cipher ctx"); + LOG_ERROR(logCtx_ << " Failed to get cipher ctx"); return false; } if (!EVP_DecryptInit_ex(cipherCtx, EVP_aes_256_gcm(), NULL, reinterpret_cast<unsigned const char*>(dataKeySecret.c_str()), reinterpret_cast<unsigned const char*>(iv_.get()))) { - LOG_ERROR(logCtx_ + " Failed to init decrypt cipher ctx"); + LOG_ERROR(logCtx_ << " Failed to init decrypt cipher ctx"); EVP_CIPHER_CTX_free(cipherCtx); return false; } if (EVP_CIPHER_CTX_set_padding(cipherCtx, EVP_CIPH_NO_PADDING) != 1) { - LOG_ERROR(logCtx_ + " Failed to set cipher padding"); + LOG_ERROR(logCtx_ << " Failed to set cipher padding"); EVP_CIPHER_CTX_free(cipherCtx); return false; } @@ -367,25 +413,30 @@ bool MessageCrypto::decryptData(const std::string& dataKeySecret, const proto::M int decLen = 0; if (!EVP_DecryptUpdate(cipherCtx, reinterpret_cast<unsigned char*>(decryptedPayload.mutableData()), &decLen, reinterpret_cast<unsigned const char*>(payload.data()), cipherLen)) { - LOG_ERROR(logCtx_ + " Failed to decrypt update"); + LOG_ERROR(logCtx_ << " Failed to decrypt update"); EVP_CIPHER_CTX_free(cipherCtx); return false; }; decryptedPayload.bytesWritten(decLen); if 
(!EVP_CIPHER_CTX_ctrl(cipherCtx, EVP_CTRL_GCM_SET_TAG, tagLen_, (void*)(payload.data() + cipherLen))) { - LOG_ERROR(logCtx_ + " Failed to set gcm tag"); + LOG_ERROR(logCtx_ << " Failed to set gcm tag"); EVP_CIPHER_CTX_free(cipherCtx); return false; } if (!EVP_DecryptFinal_ex(cipherCtx, reinterpret_cast<unsigned char*>(decryptedPayload.mutableData()), &decLen)) { - LOG_ERROR(logCtx_ + " Failed to finalize encrypted message"); + LOG_ERROR(logCtx_ << " Failed to finalize encrypted message"); EVP_CIPHER_CTX_free(cipherCtx); return false; } decryptedPayload.bytesWritten(decLen); + if (LOG4CXX_UNLIKELY(logger()->isDebugEnabled())) { + std::string strHex = stringToHex(decryptedPayload.data(), decryptedPayload.readableBytes()); + LOG_DEBUG(logCtx_ << "Data decrypted. Decrypted size = " << decryptedPayload.readableBytes() + << ", data = " << strHex); + } EVP_CIPHER_CTX_free(cipherCtx); @@ -419,8 +470,8 @@ bool MessageCrypto::getKeyAndDecryptData(const proto::MessageMetadata& msgMetada } } else { // First time, entry won't be present in cache - LOG_DEBUG(logCtx_ + " Failed to decrypt data or data key is not in cache for " + keyName + - ". Will attempt to refresh."); + LOG_DEBUG(logCtx_ << " Failed to decrypt data or data key is not in cache for " + << keyName + ". Will attempt to refresh."); } } return dataDecrypted; diff --git a/pulsar-client-cpp/lib/MessageCrypto.h b/pulsar-client-cpp/lib/MessageCrypto.h index 016ce54f9..a4b77f08d 100644 --- a/pulsar-client-cpp/lib/MessageCrypto.h +++ b/pulsar-client-cpp/lib/MessageCrypto.h @@ -135,6 +135,7 @@ class MessageCrypto { SharedBuffer& payload, SharedBuffer& decPayload); bool getKeyAndDecryptData(const proto::MessageMetadata& msgMetadata, SharedBuffer& payload, SharedBuffer& decryptedPayload); + std::string stringToHex(const std::string& inputStr, size_t len); }; } /* namespace pulsar */ ---------------------------------------------------------------- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services