http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TcrMessage.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/TcrMessage.hpp b/src/cppcache/src/TcrMessage.hpp index 30e95e0..60dd5a6 100644 --- a/src/cppcache/src/TcrMessage.hpp +++ b/src/cppcache/src/TcrMessage.hpp @@ -1,8 +1,3 @@ -#pragma once - -#ifndef GEODE_TCRMESSAGE_H_ -#define GEODE_TCRMESSAGE_H_ - /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -20,6 +15,13 @@ * limitations under the License. */ +#pragma once + +#ifndef GEODE_TCRMESSAGE_H_ +#define GEODE_TCRMESSAGE_H_ + +#include <ace/OS.h> + #include <geode/geode_globals.hpp> #include <atomic> #include <geode/Cacheable.hpp> @@ -40,6 +42,7 @@ #include "FixedPartitionAttributesImpl.hpp" #include "VersionTag.hpp" #include "VersionedCacheableObjectPartList.hpp" +#include "SerializationRegistry.hpp" #include <string> #include <map> #include <vector> @@ -175,8 +178,6 @@ class CPPCACHE_EXPORT TcrMessage { } MsgType; - static bool init(); - static void cleanup(); static bool isKeepAlive() { return *m_keepalive; } static bool isUserInitiativeOps(const TcrMessage& msg) { int32_t msgType = msg.getMessageType(); @@ -207,11 +208,14 @@ class CPPCACHE_EXPORT TcrMessage { } return false; } - static VersionTagPtr readVersionTagPart(DataInput& input, - uint16_t endpointMemId); + static VersionTagPtr readVersionTagPart( + DataInput& input, uint16_t endpointMemId, + MemberListForVersionStamp& memberListForVersionStamp); /* constructors */ - void setData(const char* bytearray, int32_t len, uint16_t memId); + void setData(const char* bytearray, int32_t len, uint16_t memId, + const SerializationRegistry& serializationRegistry, + MemberListForVersionStamp& memberListForVersionStamp); void startProcessChunk(ACE_Semaphore& finalizeSema); // nullptr chunk means that this is the last chunk @@ -336,11 +340,11 @@ class CPPCACHE_EXPORT TcrMessage { /* we need a static method to generate ping */ /* The caller should not delete the message since it is global. */ - static TcrMessagePing* getPingMessage(); + static TcrMessagePing* getPingMessage(Cache* cache); static TcrMessage* getAllEPDisMess(); /* we need a static method to generate close connection message */ /* The caller should not delete the message since it is global. */ - static TcrMessage* getCloseConnMessage(); + static TcrMessage* getCloseConnMessage(Cache* cache); static void setKeepAlive(bool keepalive); bool isDurable() const { return m_isDurable; } bool receiveValues() const { return m_receiveValues; } @@ -372,7 +376,7 @@ class CPPCACHE_EXPORT TcrMessage { return m_versionObjPartListptr; } - DataInput* getDelta() { return m_delta; } + DataInput* getDelta() { return m_delta.get(); } // getDeltaBytes( ) is called *only* by CqService, returns a CacheableBytes // that @@ -456,7 +460,7 @@ class CPPCACHE_EXPORT TcrMessage { m_securityHeaderLength(0), m_isMetaRegion(false), exceptionMessage(), - m_request(new DataOutput), + m_request(nullptr), m_msgType(TcrMessage::INVALID), m_msgLength(-1), m_msgTypeRequest(0), @@ -514,9 +518,6 @@ class CPPCACHE_EXPORT TcrMessage { SerializablePtr readCacheableString(DataInput& input, int lenObj); static std::atomic<int32_t> m_transactionId; - static TcrMessagePing* m_pingMsg; - static TcrMessage* m_closeConnMsg; - static TcrMessage* m_allEPDisconnected; static uint8_t* m_keepalive; const static int m_flag_empty; const static int m_flag_concurrency_checks; @@ -531,13 +532,14 @@ class CPPCACHE_EXPORT TcrMessage { CacheableStringPtr exceptionMessage; - // Disallow copy constructor and assignment operator. - TcrMessage(const TcrMessage&); - TcrMessage& operator=(const TcrMessage&); + TcrMessage(const TcrMessage&) = delete; + TcrMessage& operator=(const TcrMessage&) = delete; // some private methods to handle things internally. - void handleByteArrayResponse(const char* bytearray, int32_t len, - uint16_t endpointMemId); + void handleByteArrayResponse( + const char* bytearray, int32_t len, uint16_t endpointMemId, + const SerializationRegistry& serializationRegistry, + MemberListForVersionStamp& memberListForVersionStamp); void readObjectPart(DataInput& input, bool defaultString = false); void readFailedNodePart(DataInput& input, bool defaultString = false); void readCallbackObjectPart(DataInput& input, bool defaultString = false); @@ -547,7 +549,8 @@ class CPPCACHE_EXPORT TcrMessage { void readLongPart(DataInput& input, uint64_t* intValue); bool readExceptionPart(DataInput& input, uint8_t isLastChunk, bool skipFirstPart = true); - void readVersionTag(DataInput& input, uint16_t endpointMemId); + void readVersionTag(DataInput& input, uint16_t endpointMemId, + MemberListForVersionStamp& memberListForVersionStamp); void readOldValue(DataInput& input); void readPrMetaData(DataInput& input); void writeObjectPart(const SerializablePtr& se, bool isDelta = false, @@ -578,7 +581,7 @@ class CPPCACHE_EXPORT TcrMessage { CacheableHashSetPtr& value); DSMemberForVersionStampPtr readDSMember( apache::geode::client::DataInput& input); - DataOutput* m_request; + std::unique_ptr<DataOutput> m_request; int32_t m_msgType; int32_t m_msgLength; int32_t m_msgTypeRequest; // the msgType of the request if this TcrMessage is @@ -622,7 +625,7 @@ class CPPCACHE_EXPORT TcrMessage { std::map<std::string, int>* m_cqs; int32_t m_messageResponseTimeout; bool m_boolValue; - DataInput* m_delta; + std::unique_ptr<DataInput> m_delta; uint8_t* m_deltaBytes; int32_t m_deltaBytesLen; bool m_isCallBackArguement; @@ -641,425 +644,568 @@ class CPPCACHE_EXPORT TcrMessage { class TcrMessageDestroyRegion : public TcrMessage { public: - TcrMessageDestroyRegion(const Region* region, + TcrMessageDestroyRegion(std::unique_ptr<DataOutput> dataOutput, + const Region* region, const UserDataPtr& aCallbackArgument, int messageResponsetimeout, ThinClientBaseDM* connectionDM); virtual ~TcrMessageDestroyRegion() {} + + private: }; class TcrMessageClearRegion : public TcrMessage { public: - TcrMessageClearRegion(const Region* region, + TcrMessageClearRegion(std::unique_ptr<DataOutput> dataOutput, + const Region* region, const UserDataPtr& aCallbackArgument, int messageResponsetimeout, ThinClientBaseDM* connectionDM); virtual ~TcrMessageClearRegion() {} + + private: }; class TcrMessageQuery : public TcrMessage { public: - TcrMessageQuery(const std::string& regionName, int messageResponsetimeout, + TcrMessageQuery(std::unique_ptr<DataOutput> dataOutput, + const std::string& regionName, int messageResponsetimeout, ThinClientBaseDM* connectionDM); virtual ~TcrMessageQuery() {} + + private: }; class TcrMessageStopCQ : public TcrMessage { public: - TcrMessageStopCQ(const std::string& regionName, int messageResponsetimeout, + TcrMessageStopCQ(std::unique_ptr<DataOutput> dataOutput, + const std::string& regionName, int messageResponsetimeout, ThinClientBaseDM* connectionDM); virtual ~TcrMessageStopCQ() {} + + private: }; class TcrMessageCloseCQ : public TcrMessage { public: - TcrMessageCloseCQ(const std::string& regionName, int messageResponsetimeout, + TcrMessageCloseCQ(std::unique_ptr<DataOutput> dataOutput, + const std::string& regionName, int messageResponsetimeout, ThinClientBaseDM* connectionDM); virtual ~TcrMessageCloseCQ() {} + + private: }; class TcrMessageQueryWithParameters : public TcrMessage { public: - TcrMessageQueryWithParameters(const std::string& regionName, + TcrMessageQueryWithParameters(std::unique_ptr<DataOutput> dataOutput, + const std::string& regionName, const UserDataPtr& aCallbackArgument, CacheableVectorPtr paramList, int messageResponsetimeout, ThinClientBaseDM* connectionDM); virtual ~TcrMessageQueryWithParameters() {} + + private: }; class TcrMessageContainsKey : public TcrMessage { public: - TcrMessageContainsKey(const Region* region, const CacheableKeyPtr& key, + TcrMessageContainsKey(std::unique_ptr<DataOutput> dataOutput, + const Region* region, const CacheableKeyPtr& key, const UserDataPtr& aCallbackArgument, bool isContainsKey, ThinClientBaseDM* connectionDM); virtual ~TcrMessageContainsKey() {} + + private: }; class TcrMessageGetDurableCqs : public TcrMessage { public: - TcrMessageGetDurableCqs(ThinClientBaseDM* connectionDM); + TcrMessageGetDurableCqs(std::unique_ptr<DataOutput> dataOutput, + ThinClientBaseDM* connectionDM); virtual ~TcrMessageGetDurableCqs() {} + + private: }; class TcrMessageRequest : public TcrMessage { public: - TcrMessageRequest(const Region* region, const CacheableKeyPtr& key, + TcrMessageRequest(std::unique_ptr<DataOutput> dataOutput, + const Region* region, const CacheableKeyPtr& key, const UserDataPtr& aCallbackArgument, ThinClientBaseDM* connectionDM = nullptr); virtual ~TcrMessageRequest() {} + + private: }; class TcrMessageInvalidate : public TcrMessage { public: - TcrMessageInvalidate(const Region* region, const CacheableKeyPtr& key, + TcrMessageInvalidate(std::unique_ptr<DataOutput> dataOutput, + const Region* region, const CacheableKeyPtr& key, const UserDataPtr& aCallbackArgument, ThinClientBaseDM* connectionDM = nullptr); + + private: }; class TcrMessageDestroy : public TcrMessage { public: - TcrMessageDestroy(const Region* region, const CacheableKeyPtr& key, + TcrMessageDestroy(std::unique_ptr<DataOutput> dataOutput, + const Region* region, const CacheableKeyPtr& key, const CacheablePtr& value, const UserDataPtr& aCallbackArgument, ThinClientBaseDM* connectionDM = nullptr); + + private: }; class TcrMessageRegisterInterestList : public TcrMessage { public: TcrMessageRegisterInterestList( - const Region* region, const VectorOfCacheableKey& keys, - bool isDurable = false, bool isCachingEnabled = false, - bool receiveValues = true, + std::unique_ptr<DataOutput> dataOutput, const Region* region, + const VectorOfCacheableKey& keys, bool isDurable = false, + bool isCachingEnabled = false, bool receiveValues = true, InterestResultPolicy interestPolicy = InterestResultPolicy::NONE, ThinClientBaseDM* connectionDM = nullptr); virtual ~TcrMessageRegisterInterestList() {} + + private: }; class TcrMessageUnregisterInterestList : public TcrMessage { public: TcrMessageUnregisterInterestList( - const Region* region, const VectorOfCacheableKey& keys, - bool isDurable = false, bool isCachingEnabled = false, - bool receiveValues = true, + std::unique_ptr<DataOutput> dataOutput, const Region* region, + const VectorOfCacheableKey& keys, bool isDurable = false, + bool isCachingEnabled = false, bool receiveValues = true, InterestResultPolicy interestPolicy = InterestResultPolicy::NONE, ThinClientBaseDM* connectionDM = nullptr); virtual ~TcrMessageUnregisterInterestList() {} + + private: }; class TcrMessagePut : public TcrMessage { public: - TcrMessagePut(const Region* region, const CacheableKeyPtr& key, - const CacheablePtr& value, const UserDataPtr& aCallbackArgument, - bool isDelta = false, ThinClientBaseDM* connectionDM = nullptr, + TcrMessagePut(std::unique_ptr<DataOutput> dataOutput, const Region* region, + const CacheableKeyPtr& key, const CacheablePtr& value, + const UserDataPtr& aCallbackArgument, bool isDelta = false, + ThinClientBaseDM* connectionDM = nullptr, bool isMetaRegion = false, bool fullValueAfterDeltaFail = false, const char* regionName = nullptr); virtual ~TcrMessagePut() {} + + private: }; class TcrMessageCreateRegion : public TcrMessage { public: TcrMessageCreateRegion( - const std::string& str1, const std::string& str2, + std::unique_ptr<DataOutput> dataOutput, const std::string& str1, + const std::string& str2, InterestResultPolicy interestPolicy = InterestResultPolicy::NONE, bool isDurable = false, bool isCachingEnabled = false, bool receiveValues = true, ThinClientBaseDM* connectionDM = nullptr); virtual ~TcrMessageCreateRegion() {} + + private: }; class TcrMessageRegisterInterest : public TcrMessage { public: TcrMessageRegisterInterest( - const std::string& str1, const std::string& str2, + std::unique_ptr<DataOutput> dataOutput, const std::string& str1, + const std::string& str2, InterestResultPolicy interestPolicy = InterestResultPolicy::NONE, bool isDurable = false, bool isCachingEnabled = false, bool receiveValues = true, ThinClientBaseDM* connectionDM = nullptr); virtual ~TcrMessageRegisterInterest() {} + + private: }; class TcrMessageUnregisterInterest : public TcrMessage { public: TcrMessageUnregisterInterest( - const std::string& str1, const std::string& str2, + std::unique_ptr<DataOutput> dataOutput, const std::string& str1, + const std::string& str2, InterestResultPolicy interestPolicy = InterestResultPolicy::NONE, bool isDurable = false, bool isCachingEnabled = false, bool receiveValues = true, ThinClientBaseDM* connectionDM = nullptr); virtual ~TcrMessageUnregisterInterest() {} + + private: }; class TcrMessageTxSynchronization : public TcrMessage { public: - TcrMessageTxSynchronization(int ordinal, int txid, int status); + TcrMessageTxSynchronization(std::unique_ptr<DataOutput> dataOutput, + int ordinal, int txid, int status); virtual ~TcrMessageTxSynchronization() {} + + private: }; class TcrMessageClientReady : public TcrMessage { public: - TcrMessageClientReady(); + TcrMessageClientReady(std::unique_ptr<DataOutput> dataOutput); virtual ~TcrMessageClientReady() {} + + private: }; class TcrMessageCommit : public TcrMessage { public: - TcrMessageCommit(); + TcrMessageCommit(std::unique_ptr<DataOutput> dataOutput); virtual ~TcrMessageCommit() {} + + private: }; class TcrMessageRollback : public TcrMessage { public: - TcrMessageRollback(); + TcrMessageRollback(std::unique_ptr<DataOutput> dataOutput); virtual ~TcrMessageRollback() {} + + private: }; class TcrMessageTxFailover : public TcrMessage { public: - TcrMessageTxFailover(); + TcrMessageTxFailover(std::unique_ptr<DataOutput> dataOutput); virtual ~TcrMessageTxFailover() {} + + private: }; class TcrMessageMakePrimary : public TcrMessage { public: - TcrMessageMakePrimary(bool processedMarker); + TcrMessageMakePrimary(std::unique_ptr<DataOutput> dataOutput, + bool processedMarker); virtual ~TcrMessageMakePrimary() {} + + private: }; class TcrMessagePutAll : public TcrMessage { public: - TcrMessagePutAll(const Region* region, const HashMapOfCacheable& map, - int messageResponsetimeout, ThinClientBaseDM* connectionDM, + TcrMessagePutAll(std::unique_ptr<DataOutput> dataOutput, const Region* region, + const HashMapOfCacheable& map, int messageResponsetimeout, + ThinClientBaseDM* connectionDM, const UserDataPtr& aCallbackArgument); virtual ~TcrMessagePutAll() {} + + private: }; class TcrMessageRemoveAll : public TcrMessage { public: - TcrMessageRemoveAll(const Region* region, const VectorOfCacheableKey& keys, + TcrMessageRemoveAll(std::unique_ptr<DataOutput> dataOutput, + const Region* region, const VectorOfCacheableKey& keys, const UserDataPtr& aCallbackArgument, ThinClientBaseDM* connectionDM = nullptr); virtual ~TcrMessageRemoveAll() {} + + private: }; class TcrMessageExecuteCq : public TcrMessage { public: - TcrMessageExecuteCq(const std::string& str1, const std::string& str2, + TcrMessageExecuteCq(std::unique_ptr<DataOutput> dataOutput, + const std::string& str1, const std::string& str2, int state, bool isDurable, ThinClientBaseDM* connectionDM); virtual ~TcrMessageExecuteCq() {} + + private: }; class TcrMessageExecuteCqWithIr : public TcrMessage { public: - TcrMessageExecuteCqWithIr(const std::string& str1, const std::string& str2, + TcrMessageExecuteCqWithIr(std::unique_ptr<DataOutput> dataOutput, + const std::string& str1, const std::string& str2, int state, bool isDurable, ThinClientBaseDM* connectionDM); virtual ~TcrMessageExecuteCqWithIr() {} + + private: }; class TcrMessageExecuteRegionFunction : public TcrMessage { public: TcrMessageExecuteRegionFunction( - const std::string& funcName, const Region* region, - const CacheablePtr& args, CacheableVectorPtr routingObj, - uint8_t getResult, CacheableHashSetPtr failedNodes, int32_t timeout, + std::unique_ptr<DataOutput> dataOutput, const std::string& funcName, + const Region* region, const CacheablePtr& args, + CacheableVectorPtr routingObj, uint8_t getResult, + CacheableHashSetPtr failedNodes, int32_t timeout, ThinClientBaseDM* connectionDM = nullptr, int8_t reExecute = 0); virtual ~TcrMessageExecuteRegionFunction() {} + + private: }; class TcrMessageExecuteRegionFunctionSingleHop : public TcrMessage { public: TcrMessageExecuteRegionFunctionSingleHop( - const std::string& funcName, const Region* region, - const CacheablePtr& args, CacheableHashSetPtr routingObj, - uint8_t getResult, CacheableHashSetPtr failedNodes, bool allBuckets, - int32_t timeout, ThinClientBaseDM* connectionDM); + std::unique_ptr<DataOutput> dataOutput, const std::string& funcName, + const Region* region, const CacheablePtr& args, + CacheableHashSetPtr routingObj, uint8_t getResult, + CacheableHashSetPtr failedNodes, bool allBuckets, int32_t timeout, + ThinClientBaseDM* connectionDM); virtual ~TcrMessageExecuteRegionFunctionSingleHop() {} + + private: }; class TcrMessageGetClientPartitionAttributes : public TcrMessage { public: - TcrMessageGetClientPartitionAttributes(const char* regionName); + TcrMessageGetClientPartitionAttributes(std::unique_ptr<DataOutput> dataOutput, + const char* regionName); virtual ~TcrMessageGetClientPartitionAttributes() {} + + private: }; class TcrMessageGetClientPrMetadata : public TcrMessage { public: - TcrMessageGetClientPrMetadata(const char* regionName); + TcrMessageGetClientPrMetadata(std::unique_ptr<DataOutput> dataOutput, + const char* regionName); virtual ~TcrMessageGetClientPrMetadata() {} + + private: }; class TcrMessageSize : public TcrMessage { public: - TcrMessageSize(const char* regionName); + TcrMessageSize(std::unique_ptr<DataOutput> dataOutput, + const char* regionName); virtual ~TcrMessageSize() {} + + private: }; class TcrMessageUserCredential : public TcrMessage { public: - TcrMessageUserCredential(PropertiesPtr creds, + TcrMessageUserCredential(std::unique_ptr<DataOutput> dataOutput, + PropertiesPtr creds, ThinClientBaseDM* connectionDM = nullptr); virtual ~TcrMessageUserCredential() {} + + private: }; class TcrMessageRemoveUserAuth : public TcrMessage { public: - TcrMessageRemoveUserAuth(bool keepAlive, ThinClientBaseDM* connectionDM); + TcrMessageRemoveUserAuth(std::unique_ptr<DataOutput> dataOutput, + bool keepAlive, ThinClientBaseDM* connectionDM); virtual ~TcrMessageRemoveUserAuth() {} + + private: }; class TcrMessageGetPdxIdForType : public TcrMessage { public: - TcrMessageGetPdxIdForType(const CacheablePtr& pdxType, + TcrMessageGetPdxIdForType(std::unique_ptr<DataOutput> dataOutput, + const CacheablePtr& pdxType, ThinClientBaseDM* connectionDM, int32_t pdxTypeId = 0); virtual ~TcrMessageGetPdxIdForType() {} + + private: }; class TcrMessageAddPdxType : public TcrMessage { public: - TcrMessageAddPdxType(const CacheablePtr& pdxType, + TcrMessageAddPdxType(std::unique_ptr<DataOutput> dataOutput, + const CacheablePtr& pdxType, ThinClientBaseDM* connectionDM, int32_t pdxTypeId = 0); virtual ~TcrMessageAddPdxType() {} + + private: }; class TcrMessageGetPdxIdForEnum : public TcrMessage { public: - TcrMessageGetPdxIdForEnum(const CacheablePtr& pdxType, + TcrMessageGetPdxIdForEnum(std::unique_ptr<DataOutput> dataOutput, + const CacheablePtr& pdxType, ThinClientBaseDM* connectionDM, int32_t pdxTypeId = 0); virtual ~TcrMessageGetPdxIdForEnum() {} + + private: }; class TcrMessageAddPdxEnum : public TcrMessage { public: - TcrMessageAddPdxEnum(const CacheablePtr& pdxType, + TcrMessageAddPdxEnum(std::unique_ptr<DataOutput> dataOutput, + const CacheablePtr& pdxType, ThinClientBaseDM* connectionDM, int32_t pdxTypeId = 0); virtual ~TcrMessageAddPdxEnum() {} + + private: }; class TcrMessageGetPdxTypeById : public TcrMessage { public: - TcrMessageGetPdxTypeById(int32_t typeId, ThinClientBaseDM* connectionDM); + TcrMessageGetPdxTypeById(std::unique_ptr<DataOutput> dataOutput, + int32_t typeId, ThinClientBaseDM* connectionDM); virtual ~TcrMessageGetPdxTypeById() {} + + private: }; class TcrMessageGetPdxEnumById : public TcrMessage { public: - TcrMessageGetPdxEnumById(int32_t typeId, ThinClientBaseDM* connectionDM); + TcrMessageGetPdxEnumById(std::unique_ptr<DataOutput> dataOutput, + int32_t typeId, ThinClientBaseDM* connectionDM); virtual ~TcrMessageGetPdxEnumById() {} + + private: }; class TcrMessageGetFunctionAttributes : public TcrMessage { public: - TcrMessageGetFunctionAttributes(const std::string& funcName, + TcrMessageGetFunctionAttributes(std::unique_ptr<DataOutput> dataOutput, + const std::string& funcName, ThinClientBaseDM* connectionDM = nullptr); virtual ~TcrMessageGetFunctionAttributes() {} + + private: }; class TcrMessageKeySet : public TcrMessage { public: - TcrMessageKeySet(const std::string& funcName, + TcrMessageKeySet(std::unique_ptr<DataOutput> dataOutput, + const std::string& funcName, ThinClientBaseDM* connectionDM = nullptr); virtual ~TcrMessageKeySet() {} + + private: }; class TcrMessageRequestEventValue : public TcrMessage { public: - TcrMessageRequestEventValue(EventIdPtr eventId); + TcrMessageRequestEventValue(std::unique_ptr<DataOutput> dataOutput, + EventIdPtr eventId); virtual ~TcrMessageRequestEventValue() {} + + private: }; class TcrMessagePeriodicAck : public TcrMessage { public: - TcrMessagePeriodicAck(const EventIdMapEntryList& entries); + TcrMessagePeriodicAck(std::unique_ptr<DataOutput> dataOutput, + const EventIdMapEntryList& entries); virtual ~TcrMessagePeriodicAck() {} + + private: }; class TcrMessageUpdateClientNotification : public TcrMessage { public: - TcrMessageUpdateClientNotification(int32_t port); + TcrMessageUpdateClientNotification(std::unique_ptr<DataOutput> dataOutput, + int32_t port); virtual ~TcrMessageUpdateClientNotification() {} + + private: }; class TcrMessageGetAll : public TcrMessage { public: - TcrMessageGetAll(const Region* region, const VectorOfCacheableKey* keys, + TcrMessageGetAll(std::unique_ptr<DataOutput> dataOutput, const Region* region, + const VectorOfCacheableKey* keys, ThinClientBaseDM* connectionDM = nullptr, const UserDataPtr& aCallbackArgument = nullptr); virtual ~TcrMessageGetAll() {} + + private: }; class TcrMessageExecuteFunction : public TcrMessage { public: - TcrMessageExecuteFunction(const std::string& funcName, + TcrMessageExecuteFunction(std::unique_ptr<DataOutput> dataOutput, + const std::string& funcName, const CacheablePtr& args, uint8_t getResult, ThinClientBaseDM* connectionDM, int32_t timeout); virtual ~TcrMessageExecuteFunction() {} + + private: }; class TcrMessagePing : public TcrMessage { public: - TcrMessagePing(bool decodeAll); + TcrMessagePing(std::unique_ptr<DataOutput> dataOutput, bool decodeAll); virtual ~TcrMessagePing() {} + + private: }; class TcrMessageCloseConnection : public TcrMessage { public: - TcrMessageCloseConnection(bool decodeAll); + TcrMessageCloseConnection(std::unique_ptr<DataOutput> dataOutput, + bool decodeAll); virtual ~TcrMessageCloseConnection() {} + + private: }; class TcrMessageClientMarker : public TcrMessage { public: - TcrMessageClientMarker(bool decodeAll); + TcrMessageClientMarker(std::unique_ptr<DataOutput> dataOutput, + bool decodeAll); virtual ~TcrMessageClientMarker() {} + + private: }; class TcrMessageReply : public TcrMessage {
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TcrPoolEndPoint.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/TcrPoolEndPoint.cpp b/src/cppcache/src/TcrPoolEndPoint.cpp index c88554d..9021fcd 100644 --- a/src/cppcache/src/TcrPoolEndPoint.cpp +++ b/src/cppcache/src/TcrPoolEndPoint.cpp @@ -64,13 +64,12 @@ GfErrType TcrPoolEndPoint::registerDM(bool clientNotification, bool isSecondary, GfErrType err = GF_NOERR; ACE_Guard<ACE_Recursive_Thread_Mutex> _guard(m_dm->getPoolLock()); ACE_Guard<ACE_Recursive_Thread_Mutex> guardQueueHosted(getQueueHostedMutex()); - + auto& sysProp = m_cacheImpl->getDistributedSystem().getSystemProperties(); if (!connected()) { TcrConnection* newConn; - if ((err = createNewConnection( - newConn, false, false, - DistributedSystem::getSystemProperties()->connectTimeout(), 0, - connected())) != GF_NOERR) { + if ((err = createNewConnection(newConn, false, false, + sysProp.connectTimeout(), 0, connected())) != + GF_NOERR) { setConnected(false); return err; } @@ -85,10 +84,9 @@ GfErrType TcrPoolEndPoint::registerDM(bool clientNotification, bool isSecondary, name().c_str()); if (m_numRegionListener == 0) { - if ((err = createNewConnection( - m_notifyConnection, true, isSecondary, - DistributedSystem::getSystemProperties()->connectTimeout() * 3, - 0)) != GF_NOERR) { + if ((err = createNewConnection(m_notifyConnection, true, isSecondary, + sysProp.connectTimeout() * 3, 0)) != + GF_NOERR) { setConnected(false); LOGWARN("Failed to start subscription channel for endpoint %s", name().c_str()); http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThinClientBaseDM.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/ThinClientBaseDM.cpp b/src/cppcache/src/ThinClientBaseDM.cpp index 0291c5a..adf882e 100644 --- a/src/cppcache/src/ThinClientBaseDM.cpp +++ b/src/cppcache/src/ThinClientBaseDM.cpp @@ -43,19 +43,21 @@ ThinClientBaseDM::ThinClientBaseDM(TcrConnectionManager& connManager, ThinClientBaseDM::~ThinClientBaseDM() {} void ThinClientBaseDM::init() { - if (!DistributedSystem::getSystemProperties()->isGridClient()) { - // start the chunk processing thread - if (!DistributedSystem::getSystemProperties() - ->disableChunkHandlerThread()) { - startChunkProcessor(); - } + const auto& systemProperties = m_connManager.getCacheImpl() + ->getDistributedSystem() + .getSystemProperties(); + if (!(systemProperties.isGridClient() && + systemProperties.disableChunkHandlerThread())) { + startChunkProcessor(); } m_initDone = true; } bool ThinClientBaseDM::isSecurityOn() { - SystemProperties* sysProp = DistributedSystem::getSystemProperties(); - return sysProp->isSecurityOn(); + return m_connManager.getCacheImpl() + ->getDistributedSystem() + .getSystemProperties() + .isSecurityOn(); } void ThinClientBaseDM::destroy(bool keepalive) { http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThinClientBaseDM.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/ThinClientBaseDM.hpp b/src/cppcache/src/ThinClientBaseDM.hpp index ad10981..b3d3f74 100644 --- a/src/cppcache/src/ThinClientBaseDM.hpp +++ b/src/cppcache/src/ThinClientBaseDM.hpp @@ -124,8 +124,10 @@ class ThinClientBaseDM { LOGFINE("Delta enabled on server: %s", s_isDeltaEnabledOnServer ? "true" : "false"); } - TcrConnectionManager& getConnectionManager() { return m_connManager; } + TcrConnectionManager& getConnectionManager() const { return m_connManager; } + virtual size_t getNumberOfEndPoints() const { return 0; } + bool isNotAuthorizedException(const char* exceptionMsg) { if (exceptionMsg != nullptr && strstr(exceptionMsg, @@ -138,6 +140,7 @@ class ThinClientBaseDM { } return false; } + bool isPutAllPartialResultException(const char* exceptionMsg) { if (exceptionMsg != nullptr && strstr( http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThinClientCacheDistributionManager.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/ThinClientCacheDistributionManager.hpp b/src/cppcache/src/ThinClientCacheDistributionManager.hpp index 1d859a9..20a5fb8 100644 --- a/src/cppcache/src/ThinClientCacheDistributionManager.hpp +++ b/src/cppcache/src/ThinClientCacheDistributionManager.hpp @@ -48,8 +48,8 @@ class CPPCACHE_EXPORT ThinClientCacheDistributionManager GfErrType sendRequestToPrimary(TcrMessage& request, TcrMessageReply& reply); protected: - bool preFailoverAction(); - bool postFailoverAction(TcrEndpoint* endpoint); + virtual bool preFailoverAction(); + virtual bool postFailoverAction(TcrEndpoint* endpoint); private: // Disallow default/copy constructor and assignment operator. http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThinClientDistributionManager.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/ThinClientDistributionManager.cpp b/src/cppcache/src/ThinClientDistributionManager.cpp index bc97b96..5fd092d 100644 --- a/src/cppcache/src/ThinClientDistributionManager.cpp +++ b/src/cppcache/src/ThinClientDistributionManager.cpp @@ -306,31 +306,23 @@ bool ThinClientDistributionManager::postFailoverAction(TcrEndpoint* endpoint) { } PropertiesPtr ThinClientDistributionManager::getCredentials(TcrEndpoint* ep) { - PropertiesPtr tmpSecurityProperties = - DistributedSystem::getSystemProperties()->getSecurityProperties(); + const auto& distributedSystem = + m_connManager.getCacheImpl()->getDistributedSystem(); + const auto& tmpSecurityProperties = + distributedSystem.getSystemProperties().getSecurityProperties(); - AuthInitializePtr authInitialize = DistributedSystem::m_impl->getAuthLoader(); - - if (authInitialize != nullptr) { + if (const auto& authInitialize = distributedSystem.m_impl->getAuthLoader()) { LOGFINER( "ThinClientDistributionManager::getCredentials: acquired handle to " "authLoader, " "invoking getCredentials %s", ep->name().c_str()); - /* adongre - * CID 28900: Copy into fixed size buffer (STRING_OVERFLOW) - * You might overrun the 100 byte fixed-size string "tmpEndpoint" by copying - * the return - * value of "stlp_std::basic_string<char, stlp_std::char_traits<char>, - * stlp_std::allocator<char> >::c_str() const" without checking the - * length. - */ - // char tmpEndpoint[100] = { '\0' } ; - // strcpy(tmpEndpoint, ep->name().c_str()); - PropertiesPtr tmpAuthIniSecurityProperties = authInitialize->getCredentials( - tmpSecurityProperties, /*tmpEndpoint*/ ep->name().c_str()); + const auto& tmpAuthIniSecurityProperties = authInitialize->getCredentials( + tmpSecurityProperties, ep->name().c_str()); + LOGFINER("Done getting credentials"); return tmpAuthIniSecurityProperties; } + return nullptr; } @@ -340,7 +332,9 @@ GfErrType ThinClientDistributionManager::sendUserCredentials( GfErrType err = GF_NOERR; - TcrMessageUserCredential request(credentials, this); + TcrMessageUserCredential request( + m_connManager.getCacheImpl()->getCache()->createDataOutput(), credentials, + this); TcrMessageReply reply(true, this); http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThinClientHARegion.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/ThinClientHARegion.cpp b/src/cppcache/src/ThinClientHARegion.cpp index d59366e..5fd63fb 100644 --- a/src/cppcache/src/ThinClientHARegion.cpp +++ b/src/cppcache/src/ThinClientHARegion.cpp @@ -43,7 +43,9 @@ void ThinClientHARegion::initTCR() { try { bool isPool = m_attribute->getPoolName() != nullptr && strlen(m_attribute->getPoolName()) > 0; - if (DistributedSystem::getSystemProperties()->isGridClient()) { + if (m_cacheImpl->getDistributedSystem() + .getSystemProperties() + .isGridClient()) { LOGWARN( "Region: HA region having notification channel created for grid " "client; force starting required notification, cleanup and " @@ -61,7 +63,10 @@ void ThinClientHARegion::initTCR() { m_tcrdm->init(); } else { m_tcrdm = dynamic_cast<ThinClientPoolHADM*>( - PoolManager::find(m_attribute->getPoolName()).get()); + m_cacheImpl->getCache() + ->getPoolManager() + .find(m_attribute->getPoolName()) + .get()); if (m_tcrdm) { m_poolDM = true; // Pool DM should only be inited once and it @@ -109,7 +114,7 @@ void ThinClientHARegion::handleMarker() { if (m_listener != nullptr && !m_processedMarker) { RegionEvent event(shared_from_this(), nullptr, false); - int64_t sampleStartNanos = Utils::startStatOpTime(); + int64_t sampleStartNanos = startStatOpTime(); try { m_listener->afterRegionLive(event); } catch (const Exception& ex) { @@ -118,11 +123,9 @@ void ThinClientHARegion::handleMarker() { } catch (...) { LOGERROR("Unknown exception in CacheListener::afterRegionLive"); } - m_cacheImpl->m_cacheStats->incListenerCalls(); - Utils::updateStatOpTime( - m_regionStats->getStat(), - RegionStatType::getInstance()->getListenerCallTimeId(), - sampleStartNanos); + m_cacheImpl->getCachePerfStats().incListenerCalls(); + updateStatOpTime(m_regionStats->getStat(), + m_regionStats->getListenerCallTimeId(), sampleStartNanos); m_regionStats->incListenerCallsCompleted(); } m_processedMarker = true; @@ -155,7 +158,8 @@ void ThinClientHARegion::addDisMessToQueue() { if (poolDM->m_redundancyManager->m_globalProcessedMarker && !m_processedMarker) { - TcrMessage* regionMsg = new TcrMessageClientMarker(true); + TcrMessage* regionMsg = + new TcrMessageClientMarker(m_cache->createDataOutput(), true); receiveNotification(regionMsg); } } @@ -164,7 +168,8 @@ void ThinClientHARegion::addDisMessToQueue() { GfErrType ThinClientHARegion::getNoThrow_FullObject(EventIdPtr eventId, CacheablePtr& fullObject, VersionTagPtr& versionTag) { - TcrMessageRequestEventValue fullObjectMsg(eventId); + TcrMessageRequestEventValue fullObjectMsg(m_cache->createDataOutput(), + eventId); TcrMessageReply reply(true, nullptr); ThinClientPoolHADM* poolHADM = dynamic_cast<ThinClientPoolHADM*>(m_tcrdm); http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThinClientLocatorHelper.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/ThinClientLocatorHelper.cpp b/src/cppcache/src/ThinClientLocatorHelper.cpp index bb67290..59075f5 100644 --- a/src/cppcache/src/ThinClientLocatorHelper.cpp +++ b/src/cppcache/src/ThinClientLocatorHelper.cpp @@ -65,8 +65,15 @@ Connector* ThinClientLocatorHelper::createConnection(Connector*& conn, uint32_t waitSeconds, int32_t maxBuffSizePool) { Connector* socket = nullptr; - if (DistributedSystem::getSystemProperties()->sslEnabled()) { - socket = new TcpSslConn(hostname, port, waitSeconds, maxBuffSizePool); + auto& systemProperties = m_poolDM->getConnectionManager() + .getCacheImpl() + ->getDistributedSystem() + .getSystemProperties(); + if (m_poolDM && systemProperties.sslEnabled()) { + socket = new TcpSslConn(hostname, port, waitSeconds, maxBuffSizePool, + systemProperties.sslTrustStore(), + systemProperties.sslKeyStore(), + systemProperties.sslKeystorePassword()); } else { socket = new TcpConn(hostname, port, waitSeconds, maxBuffSizePool); } @@ -78,6 +85,11 @@ Connector* ThinClientLocatorHelper::createConnection(Connector*& conn, GfErrType ThinClientLocatorHelper::getAllServers( std::vector<ServerLocation>& servers, const std::string& serverGrp) { ACE_Guard<ACE_Thread_Mutex> guard(m_locatorLock); + + auto& sysProps = m_poolDM->getConnectionManager() + .getCacheImpl() + ->getDistributedSystem() + .getSystemProperties(); for (unsigned i = 0; i < m_locHostPort.size(); i++) { ServerLocation loc = m_locHostPort[i]; try { @@ -89,15 +101,14 @@ GfErrType ThinClientLocatorHelper::getAllServers( } Connector* conn = nullptr; ConnectionWrapper cw(conn); - createConnection( - conn, loc.getServerName().c_str(), loc.getPort(), - DistributedSystem::getSystemProperties()->connectTimeout(), buffSize); + createConnection(conn, loc.getServerName().c_str(), loc.getPort(), + sysProps.connectTimeout(), buffSize); GetAllServersRequest request(serverGrp); - DataOutput data; - data.writeInt((int32_t)1001); // GOSSIPVERSION - data.writeObject(&request); + auto data = m_poolDM->getConnectionManager().getCacheImpl()->getCache()->createDataOutput(); + data->writeInt((int32_t)1001); // GOSSIPVERSION + data->writeObject(&request); int sentLength = conn->send( - (char*)(data.getBuffer()), data.getBufferLength(), + (char*)(data->getBuffer()), data->getBufferLength(), m_poolDM ? (m_poolDM->getReadTimeout() / 1000) * 1000 * 1000 : 10 * 1000 * 1000, 0); @@ -117,23 +128,23 @@ GfErrType ThinClientLocatorHelper::getAllServers( continue; } - DataInput di(reinterpret_cast<uint8_t*>(buff), receivedLength); + auto di = m_poolDM->getConnectionManager().getCacheImpl()->getCache()->createDataInput( + reinterpret_cast<uint8_t*>(buff), receivedLength); GetAllServersResponsePtr response(nullptr); /* adongre * SSL Enabled on Location and not in the client */ int8_t acceptanceCode; - di.read(&acceptanceCode); - if (acceptanceCode == REPLY_SSL_ENABLED && - !DistributedSystem::getSystemProperties()->sslEnabled()) { + di->read(&acceptanceCode); + if (acceptanceCode == REPLY_SSL_ENABLED && !sysProps.sslEnabled()) { LOGERROR("SSL is enabled on locator, enable SSL in client as well"); throw AuthenticationRequiredException( "SSL is enabled on locator, enable SSL in client as well"); } - di.rewindCursor(1); + di->rewindCursor(1); - di.readObject(response); + di->readObject(response); servers = response->getServers(); return GF_NOERR; } catch (const AuthenticationRequiredException&) { @@ -154,6 +165,10 @@ GfErrType ThinClientLocatorHelper::getEndpointForNewCallBackConn( /*const std::set<TcrEndpoint*>& exclEndPts,*/ const std::string& serverGrp) { ACE_Guard<ACE_Thread_Mutex> guard(m_locatorLock); + auto& sysProps = m_poolDM->getConnectionManager() + .getCacheImpl() + ->getDistributedSystem() + .getSystemProperties(); int locatorsRetry = 3; if (m_poolDM) { int poolRetry = m_poolDM->getRetryAttempts(); @@ -183,20 +198,17 @@ GfErrType ThinClientLocatorHelper::getEndpointForNewCallBackConn( } Connector* conn = nullptr; ConnectionWrapper cw(conn); - createConnection( - conn, loc.getServerName().c_str(), loc.getPort(), - DistributedSystem::getSystemProperties()->connectTimeout(), buffSize); + createConnection(conn, loc.getServerName().c_str(), loc.getPort(), + sysProps.connectTimeout(), buffSize); QueueConnectionRequest request(memId, exclEndPts, redundancy, false, serverGrp); - DataOutput data; - data.writeInt((int32_t)1001); // GOSSIPVERSION - data.writeObject(&request); + auto data = m_poolDM->getConnectionManager().getCacheImpl()->getCache()->createDataOutput(); + data->writeInt((int32_t)1001); // GOSSIPVERSION + data->writeObject(&request); int sentLength = conn->send( - (char*)(data.getBuffer()), data.getBufferLength(), - m_poolDM - ? (m_poolDM->getReadTimeout() / 1000) * 1000 * 1000 - : DistributedSystem::getSystemProperties()->connectTimeout() * - 1000 * 1000, + (char*)(data->getBuffer()), data->getBufferLength(), + m_poolDM ? (m_poolDM->getReadTimeout() / 1000) * 1000 * 1000 + : sysProps.connectTimeout() * 1000 * 1000, 0); if (sentLength <= 0) { // conn->close(); delete conn; conn = nullptr; @@ -205,32 +217,30 @@ GfErrType ThinClientLocatorHelper::getEndpointForNewCallBackConn( char buff[BUFF_SIZE]; int receivedLength = conn->receive( buff, BUFF_SIZE, - m_poolDM - ? (m_poolDM->getReadTimeout() / 1000) * 1000 * 1000 - : DistributedSystem::getSystemProperties()->connectTimeout() * - 1000 * 1000, + m_poolDM ? (m_poolDM->getReadTimeout() / 1000) * 1000 * 1000 + : sysProps.connectTimeout() * 1000 * 1000, 0); // conn->close(); // delete conn; conn = nullptr; if (receivedLength <= 0) { continue; } - DataInput di(reinterpret_cast<uint8_t*>(buff), receivedLength); + auto di = m_poolDM->getConnectionManager().getCacheImpl()->getCache()->createDataInput( + reinterpret_cast<uint8_t*>(buff), receivedLength); QueueConnectionResponsePtr response(nullptr); /* adongre * ssl defect */ int8_t acceptanceCode; - di.read(&acceptanceCode); - if (acceptanceCode == REPLY_SSL_ENABLED && - !DistributedSystem::getSystemProperties()->sslEnabled()) { + di->read(&acceptanceCode); + if (acceptanceCode == REPLY_SSL_ENABLED && !sysProps.sslEnabled()) { LOGERROR("SSL is enabled on locator, enable SSL in client as well"); throw AuthenticationRequiredException( "SSL is enabled on locator, enable SSL in client as well"); } - di.rewindCursor(1); - di.readObject(response); + di->rewindCursor(1); + di->readObject(response); outEndpoint = response->getServers(); return GF_NOERR; } catch (const AuthenticationRequiredException& excp) { @@ -251,6 +261,11 @@ GfErrType ThinClientLocatorHelper::getEndpointForNewFwdConn( bool locatorFound = false; int locatorsRetry = 3; ACE_Guard<ACE_Thread_Mutex> guard(m_locatorLock); + auto& sysProps = m_poolDM->getConnectionManager() + .getCacheImpl() + ->getDistributedSystem() + .getSystemProperties(); + if (m_poolDM) { int poolRetry = m_poolDM->getRetryAttempts(); locatorsRetry = poolRetry <= 0 ? locatorsRetry : poolRetry; @@ -278,28 +293,25 @@ GfErrType ThinClientLocatorHelper::getEndpointForNewFwdConn( } Connector* conn = nullptr; ConnectionWrapper cw(conn); - createConnection( - conn, serLoc.getServerName().c_str(), serLoc.getPort(), - DistributedSystem::getSystemProperties()->connectTimeout(), buffSize); - DataOutput data; - data.writeInt(1001); // GOSSIPVERSION + createConnection(conn, serLoc.getServerName().c_str(), serLoc.getPort(), + sysProps.connectTimeout(), buffSize); + auto data = m_poolDM->getConnectionManager().getCacheImpl()->getCache()->createDataOutput(); + data->writeInt(1001); // GOSSIPVERSION if (currentServer == nullptr) { LOGDEBUG("Creating ClientConnectionRequest"); ClientConnectionRequest request(exclEndPts, serverGrp); - data.writeObject(&request); + data->writeObject(&request); } else { LOGDEBUG("Creating ClientReplacementRequest for connection: ", currentServer->getEndpointObject()->name().c_str()); ClientReplacementRequest request( currentServer->getEndpointObject()->name(), exclEndPts, serverGrp); - data.writeObject(&request); + data->writeObject(&request); } int sentLength = conn->send( - (char*)(data.getBuffer()), data.getBufferLength(), - m_poolDM - ? (m_poolDM->getReadTimeout() / 1000) * 1000 * 1000 - : DistributedSystem::getSystemProperties()->connectTimeout() * - 1000 * 1000, + (char*)(data->getBuffer()), data->getBufferLength(), + m_poolDM ? (m_poolDM->getReadTimeout() / 1000) * 1000 * 1000 + : sysProps.connectTimeout() * 1000 * 1000, 0); if (sentLength <= 0) { // conn->close(); @@ -309,33 +321,31 @@ GfErrType ThinClientLocatorHelper::getEndpointForNewFwdConn( char buff[BUFF_SIZE]; int receivedLength = conn->receive( buff, BUFF_SIZE, - m_poolDM - ? (m_poolDM->getReadTimeout() / 1000) * 1000 * 1000 - : DistributedSystem::getSystemProperties()->connectTimeout() * - 1000 * 1000, + m_poolDM ? (m_poolDM->getReadTimeout() / 1000) * 1000 * 1000 + : sysProps.connectTimeout() * 1000 * 1000, 0); // conn->close(); // delete conn; if (receivedLength <= 0) { continue; // return GF_EUNDEF; } - DataInput di(reinterpret_cast<uint8_t*>(buff), receivedLength); + auto di = m_poolDM->getConnectionManager().getCacheImpl()->getCache()->createDataInput( + reinterpret_cast<uint8_t*>(buff), receivedLength); ClientConnectionResponsePtr response; /* adongre * SSL is enabled on locator and not in the client */ int8_t acceptanceCode; - di.read(&acceptanceCode); - if (acceptanceCode == REPLY_SSL_ENABLED && - !DistributedSystem::getSystemProperties()->sslEnabled()) { + di->read(&acceptanceCode); + if (acceptanceCode == REPLY_SSL_ENABLED && !sysProps.sslEnabled()) { LOGERROR("SSL is enabled on locator, enable SSL in client as well"); throw AuthenticationRequiredException( "SSL is enabled on locator, enable SSL in client as well"); } - di.rewindCursor(1); + di->rewindCursor(1); - di.readObject(response); + di->readObject(response); response->printInfo(); if (!response->serverFound()) { LOGFINE("Server not found"); @@ -366,6 +376,11 @@ GfErrType ThinClientLocatorHelper::getEndpointForNewFwdConn( GfErrType ThinClientLocatorHelper::updateLocators( const std::string& serverGrp) { ACE_Guard<ACE_Thread_Mutex> guard(m_locatorLock); + auto& sysProps = m_poolDM->getConnectionManager() + .getCacheImpl() + ->getDistributedSystem() + .getSystemProperties(); + for (unsigned attempts = 0; attempts < m_locHostPort.size(); attempts++) { ServerLocation serLoc = m_locHostPort[attempts]; Connector* conn = nullptr; @@ -378,19 +393,16 @@ GfErrType ThinClientLocatorHelper::updateLocators( serLoc.getServerName().c_str(), serLoc.getPort(), serverGrp.c_str()); ConnectionWrapper cw(conn); - createConnection( - conn, serLoc.getServerName().c_str(), serLoc.getPort(), - DistributedSystem::getSystemProperties()->connectTimeout(), buffSize); + createConnection(conn, serLoc.getServerName().c_str(), serLoc.getPort(), + sysProps.connectTimeout(), buffSize); LocatorListRequest request(serverGrp); - DataOutput data; - data.writeInt((int32_t)1001); // GOSSIPVERSION - data.writeObject(&request); + auto data = m_poolDM->getConnectionManager().getCacheImpl()->getCache()->createDataOutput(); + data->writeInt((int32_t)1001); // GOSSIPVERSION + data->writeObject(&request); int sentLength = conn->send( - (char*)(data.getBuffer()), data.getBufferLength(), - m_poolDM - ? (m_poolDM->getReadTimeout() / 1000) * 1000 * 1000 - : DistributedSystem::getSystemProperties()->connectTimeout() * - 1000 * 1000, + (char*)(data->getBuffer()), data->getBufferLength(), + m_poolDM ? (m_poolDM->getReadTimeout() / 1000) * 1000 * 1000 + : sysProps.connectTimeout() * 1000 * 1000, 0); if (sentLength <= 0) { // conn->close(); @@ -401,33 +413,31 @@ GfErrType ThinClientLocatorHelper::updateLocators( char buff[BUFF_SIZE]; int receivedLength = conn->receive( buff, BUFF_SIZE, - m_poolDM - ? (m_poolDM->getReadTimeout() / 1000) * 1000 * 1000 - : DistributedSystem::getSystemProperties()->connectTimeout() * - 1000 * 1000, + m_poolDM ? (m_poolDM->getReadTimeout() / 1000) * 1000 * 1000 + : sysProps.connectTimeout() * 1000 * 1000, 0); // conn->close(); // delete conn; conn = nullptr; if (receivedLength <= 0) { continue; } - DataInput di(reinterpret_cast<uint8_t*>(buff), receivedLength); + auto di = m_poolDM->getConnectionManager().getCacheImpl()->getCache()->createDataInput( + reinterpret_cast<uint8_t*>(buff), receivedLength); auto response = std::make_shared<LocatorListResponse>(); /* adongre * SSL Enabled on Location and not in the client */ int8_t acceptanceCode; - di.read(&acceptanceCode); - if (acceptanceCode == REPLY_SSL_ENABLED && - !DistributedSystem::getSystemProperties()->sslEnabled()) { + di->read(&acceptanceCode); + if (acceptanceCode == REPLY_SSL_ENABLED && !sysProps.sslEnabled()) { LOGERROR("SSL is enabled on locator, enable SSL in client as well"); throw AuthenticationRequiredException( "SSL is enabled on locator, enable SSL in client as well"); } - di.rewindCursor(1); + di->rewindCursor(1); - di.readObject(response); + di->readObject(response); std::vector<ServerLocation> locators = response->getLocators(); if (locators.size() > 0) { RandGen randGen; http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThinClientPoolDM.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/ThinClientPoolDM.cpp b/src/cppcache/src/ThinClientPoolDM.cpp index 8205ba6..3950e5c 100644 --- a/src/cppcache/src/ThinClientPoolDM.cpp +++ b/src/cppcache/src/ThinClientPoolDM.cpp @@ -35,9 +35,6 @@ using namespace apache::geode::client; using namespace apache::geode::statistics; -ExpiryTaskManager* getCacheImplExpiryTaskManager(); -void removePool(const char*); - /* adongre * CID 28730: Other violation (MISSING_COPY) * Class "GetAllWork" owns resources that are managed in its constructor and @@ -81,7 +78,8 @@ class GetAllWork : public PooledWork<GfErrType>, m_keys(keys), m_region(region), m_aCallbackArgument(aCallbackArgument) { - m_request = new TcrMessageGetAll(region.get(), m_keys.get(), m_poolDM, + m_request = new TcrMessageGetAll(region->getCache()->createDataOutput(), + region.get(), m_keys.get(), m_poolDM, m_aCallbackArgument); m_reply = new TcrMessageReply(true, m_poolDM); if (m_poolDM->isMultiUserMode()) { @@ -159,17 +157,19 @@ ThinClientPoolDM::ThinClientPoolDM(const char* name, if (firstGurd) ClientProxyMembershipID::increaseSynchCounter(); firstGurd = true; - SystemProperties* sysProp = DistributedSystem::getSystemProperties(); + auto& distributedSystem = + m_connManager.getCacheImpl()->getDistributedSystem(); + + auto& sysProp = distributedSystem.getSystemProperties(); // to set security flag at pool level - this->m_isSecurityOn = sysProp->isSecurityOn(); + this->m_isSecurityOn = sysProp.isSecurityOn(); ACE_TCHAR hostName[256]; ACE_OS::hostname(hostName, sizeof(hostName) - 1); ACE_INET_Addr driver(hostName); uint32_t hostAddr = driver.get_ip_address(); uint16_t hostPort = 0; - const char* durableId = - (sysProp != nullptr) ? sysProp->durableClientId() : nullptr; + const char* durableId = sysProp.durableClientId(); std::string poolSeparator = "_gem_"; std::string clientDurableId = @@ -179,10 +179,11 @@ ThinClientPoolDM::ThinClientPoolDM(const char* name, ? (poolSeparator + m_poolName) : ""); - const uint32_t durableTimeOut = - (sysProp != nullptr) ? sysProp->durableTimeout() : 0; - m_memId = new ClientProxyMembershipID( - hostName, hostAddr, hostPort, clientDurableId.c_str(), durableTimeOut); + const uint32_t durableTimeOut = sysProp.durableTimeout(); + m_memId = + m_connManager.getCacheImpl()->getClientProxyMembershipIDFactory().create( + hostName, hostAddr, hostPort, clientDurableId.c_str(), + durableTimeOut); if (m_attrs->m_initLocList.size() == 0 && m_attrs->m_initServList.size() == 0) { @@ -193,9 +194,12 @@ ThinClientPoolDM::ThinClientPoolDM(const char* name, reset(); m_locHelper = new ThinClientLocatorHelper(m_attrs->m_initLocList, this); - m_stats = new PoolStats(m_poolName.c_str()); + auto statisticsManager = distributedSystem.getStatisticsManager(); + m_stats = + new PoolStats(statisticsManager->getStatisticsFactory(), m_poolName); + statisticsManager->forceSample(); - if (!sysProp->isEndpointShufflingDisabled()) { + if (!sysProp.isEndpointShufflingDisabled()) { if (m_attrs->m_initServList.size() > 0) { RandGen randgen; m_server = randgen(static_cast<uint32_t>(m_attrs->m_initServList.size())); @@ -210,26 +214,26 @@ ThinClientPoolDM::ThinClientPoolDM(const char* name, void ThinClientPoolDM::init() { LOGDEBUG("ThinClientPoolDM::init: Starting pool initialization"); - SystemProperties* sysProp = DistributedSystem::getSystemProperties(); + auto& sysProp = m_connManager.getCacheImpl() + ->getDistributedSystem() + .getSystemProperties(); m_isMultiUserMode = this->getMultiuserAuthentication(); if (m_isMultiUserMode) { LOGINFO("Multiuser authentication is enabled for pool %s", m_poolName.c_str()); } // to set security flag at pool level - this->m_isSecurityOn = sysProp->isSecurityOn(); + this->m_isSecurityOn = sysProp.isSecurityOn(); LOGDEBUG("ThinClientPoolDM::init: security in on/off = %d ", this->m_isSecurityOn); m_connManager.init(true); - SystemProperties* props = DistributedSystem::getSystemProperties(); - LOGDEBUG("ThinClientPoolDM::init: is grid client = %d ", - props->isGridClient()); + sysProp.isGridClient()); - if (!props->isGridClient()) { + if (!sysProp.isGridClient()) { ThinClientPoolDM::startBackgroundThreads(); } @@ -237,20 +241,22 @@ void ThinClientPoolDM::init() { } PropertiesPtr ThinClientPoolDM::getCredentials(TcrEndpoint* ep) { - PropertiesPtr tmpSecurityProperties = - DistributedSystem::getSystemProperties()->getSecurityProperties(); + const auto& distributedSystem = + m_connManager.getCacheImpl()->getDistributedSystem(); + const auto& tmpSecurityProperties = + distributedSystem.getSystemProperties().getSecurityProperties(); - AuthInitializePtr authInitialize = DistributedSystem::m_impl->getAuthLoader(); - - if (authInitialize != nullptr) { + if (const auto& authInitialize = distributedSystem.m_impl->getAuthLoader()) { LOGFINER( "ThinClientPoolDM::getCredentials: acquired handle to authLoader, " "invoking getCredentials %s", ep->name().c_str()); - PropertiesPtr tmpAuthIniSecurityProperties = authInitialize->getCredentials( + const auto& tmpAuthIniSecurityProperties = authInitialize->getCredentials( tmpSecurityProperties, ep->name().c_str()); + LOGFINER("Done getting credentials"); return tmpAuthIniSecurityProperties; } + return nullptr; } @@ -260,9 +266,11 @@ void ThinClientPoolDM::startBackgroundThreads() { NC_Ping_Thread); m_pingTask->start(); - SystemProperties* props = DistributedSystem::getSystemProperties(); + auto& props = m_connManager.getCacheImpl() + ->getDistributedSystem() + .getSystemProperties(); - if (props->onClientDisconnectClearPdxTypeIds() == true) { + if (props.onClientDisconnectClearPdxTypeIds() == true) { m_cliCallbackTask = new Task<ThinClientPoolDM>(this, &ThinClientPoolDM::cliCallback); m_cliCallbackTask->start(); @@ -277,8 +285,9 @@ void ThinClientPoolDM::startBackgroundThreads() { LOGDEBUG( "ThinClientPoolDM::startBackgroundThreads: Scheduling ping task at %ld", pingInterval); - m_pingTaskId = getCacheImplExpiryTaskManager()->scheduleExpiryTask( - pingHandler, 1, pingInterval, false); + m_pingTaskId = + m_connManager.getCacheImpl()->getExpiryTaskManager().scheduleExpiryTask( + pingHandler, 1, pingInterval, false); } else { LOGDEBUG( "ThinClientPoolDM::startBackgroundThreads: Not Scheduling ping task as " @@ -306,7 +315,7 @@ void ThinClientPoolDM::startBackgroundThreads() { "task at %ld", updateLocatorListInterval); m_updateLocatorListTaskId = - getCacheImplExpiryTaskManager()->scheduleExpiryTask( + m_connManager.getCacheImpl()->getExpiryTaskManager().scheduleExpiryTask( updateLocatorListHandler, 1, updateLocatorListInterval, false); } @@ -337,8 +346,9 @@ void ThinClientPoolDM::startBackgroundThreads() { LOGDEBUG( "ThinClientPoolDM::startBackgroundThreads: Scheduling " "manageConnections task"); - m_connManageTaskId = getCacheImplExpiryTaskManager()->scheduleExpiryTask( - connHandler, 1, idle / 1000 + 1, false); + m_connManageTaskId = + m_connManager.getCacheImpl()->getExpiryTaskManager().scheduleExpiryTask( + connHandler, 1, idle / 1000 + 1, false); } LOGDEBUG( @@ -352,7 +362,7 @@ void ThinClientPoolDM::startBackgroundThreads() { LOGDEBUG( "ThinClientPoolDM::startBackgroundThreads: Starting pool stat sampler"); if (m_PoolStatsSampler == nullptr && getStatisticInterval() > -1 && - DistributedSystem::getSystemProperties()->statisticsEnabled()) { + props.statisticsEnabled()) { m_PoolStatsSampler = new PoolStatsSampler( getStatisticInterval() / 1000 + 1, m_connManager.getCacheImpl(), this); m_PoolStatsSampler->start(); @@ -470,7 +480,7 @@ void ThinClientPoolDM::cleanStaleConnections(volatile bool& isRunning) { } } if (m_connManageTaskId >= 0 && isRunning && - getCacheImplExpiryTaskManager()->resetTask( + m_connManager.getCacheImpl()->getExpiryTaskManager().resetTask( m_connManageTaskId, static_cast<uint32_t>(_nextIdle.sec() + 1))) { LOGERROR("Failed to reschedule connection manager"); } else { @@ -619,7 +629,7 @@ GfErrType ThinClientPoolDM::sendRequestToAllServers( int feIndex = 0; FunctionExecution* fePtrList = new FunctionExecution[csArray->length()]; - ThreadPool* threadPool = TPSingleton::instance(); + auto* threadPool = m_connManager.getCacheImpl()->getThreadPool(); UserAttributesPtr userAttr = TSSUserAttributesWrapper::s_geodeTSSUserAttributes->getUserAttributes(); for (int i = 0; i < csArray->length(); i++) { @@ -746,7 +756,8 @@ void ThinClientPoolDM::stopPingThread() { m_pingTask->wait(); GF_SAFE_DELETE(m_pingTask); if (m_pingTaskId >= 0) { - getCacheImplExpiryTaskManager()->cancelTask(m_pingTaskId); + m_connManager.getCacheImpl()->getExpiryTaskManager().cancelTask( + m_pingTaskId); } } } @@ -759,7 +770,8 @@ void ThinClientPoolDM::stopUpdateLocatorListThread() { m_updateLocatorListTask->wait(); GF_SAFE_DELETE(m_updateLocatorListTask); if (m_updateLocatorListTaskId >= 0) { - getCacheImplExpiryTaskManager()->cancelTask(m_updateLocatorListTaskId); + m_connManager.getCacheImpl()->getExpiryTaskManager().cancelTask( + m_updateLocatorListTaskId); } } } @@ -798,7 +810,8 @@ void ThinClientPoolDM::destroy(bool keepAlive) { m_connManageTask->wait(); GF_SAFE_DELETE(m_connManageTask); if (m_connManageTaskId >= 0) { - getCacheImplExpiryTaskManager()->cancelTask(m_connManageTaskId); + m_connManager.getCacheImpl()->getExpiryTaskManager().cancelTask( + m_connManageTaskId); } } @@ -827,12 +840,17 @@ void ThinClientPoolDM::destroy(bool keepAlive) { // Close Stats getStats().close(); + m_connManager.getCacheImpl() + ->getDistributedSystem() + .getStatisticsManager() + ->forceSample(); if (m_clientMetadataService != nullptr) { GF_SAFE_DELETE(m_clientMetadataService); } - removePool(m_poolName.c_str()); + m_connManager.getCacheImpl()->getCache()->getPoolManager().removePool( + m_poolName.c_str()); stopChunkProcessor(); m_manager->closeAllStickyConnections(); @@ -868,9 +886,11 @@ QueryServicePtr ThinClientPoolDM::getQueryServiceWithoutCheck() { if (!(m_remoteQueryServicePtr == nullptr)) { return m_remoteQueryServicePtr; } - SystemProperties* props = DistributedSystem::getSystemProperties(); + auto& props = m_connManager.getCacheImpl() + ->getDistributedSystem() + .getSystemProperties(); - if (props->isGridClient()) { + if (props.isGridClient()) { LOGWARN("Initializing query service while grid-client setting is enabled."); // Init Query Service m_remoteQueryServicePtr = std::make_shared<RemoteQueryService>( @@ -895,7 +915,9 @@ void ThinClientPoolDM::sendUserCacheCloseMessage(bool keepAlive) { for (it = uca.begin(); it != uca.end(); it++) { UserConnectionAttributes* uca = (*it).second; if (uca->isAuthenticated() && uca->getEndpoint()->connected()) { - TcrMessageRemoveUserAuth request(keepAlive, this); + TcrMessageRemoveUserAuth request( + m_connManager.getCacheImpl()->getCache()->createDataOutput(), + keepAlive, this); TcrMessageReply reply(true, this); sendRequestToEP(request, reply, uca->getEndpoint()); @@ -927,7 +949,9 @@ int32_t ThinClientPoolDM::GetPDXIdForType(SerializablePtr pdxType) { GfErrType err = GF_NOERR; - TcrMessageGetPdxIdForType request(pdxType, this); + TcrMessageGetPdxIdForType request( + m_connManager.getCacheImpl()->getCache()->createDataOutput(), pdxType, + this); TcrMessageReply reply(true, this); @@ -946,7 +970,9 @@ int32_t ThinClientPoolDM::GetPDXIdForType(SerializablePtr pdxType) { // need to broadcast this id to all other pool { - for (const auto& iter : PoolManager::getAll()) { + auto& poolManager = + m_connManager.getCacheImpl()->getCache()->getPoolManager(); + for (const auto& iter : poolManager.getAll()) { auto currPool = static_cast<ThinClientPoolDM*>(iter.second.get()); if (currPool != this) { @@ -963,7 +989,9 @@ void ThinClientPoolDM::AddPdxType(SerializablePtr pdxType, int32_t pdxTypeId) { GfErrType err = GF_NOERR; - TcrMessageAddPdxType request(pdxType, this, pdxTypeId); + TcrMessageAddPdxType request( + m_connManager.getCacheImpl()->getCache()->createDataOutput(), pdxType, + this, pdxTypeId); TcrMessageReply reply(true, this); @@ -983,7 +1011,9 @@ SerializablePtr ThinClientPoolDM::GetPDXTypeById(int32_t typeId) { GfErrType err = GF_NOERR; - TcrMessageGetPdxTypeById request(typeId, this); + TcrMessageGetPdxTypeById request( + m_connManager.getCacheImpl()->getCache()->createDataOutput(), typeId, + this); TcrMessageReply reply(true, this); @@ -1005,7 +1035,9 @@ int32_t ThinClientPoolDM::GetEnumValue(SerializablePtr enumInfo) { GfErrType err = GF_NOERR; - TcrMessageGetPdxIdForEnum request(enumInfo, this); + TcrMessageGetPdxIdForEnum request( + m_connManager.getCacheImpl()->getCache()->createDataOutput(), enumInfo, + this); TcrMessageReply reply(true, this); @@ -1024,7 +1056,9 @@ int32_t ThinClientPoolDM::GetEnumValue(SerializablePtr enumInfo) { // need to broadcast this id to all other pool { - for (const auto& iter : PoolManager::getAll()) { + auto& poolManager = + m_connManager.getCacheImpl()->getCache()->getPoolManager(); + for (const auto& iter : poolManager.getAll()) { const auto& currPool = std::dynamic_pointer_cast<ThinClientPoolDM>(iter.second); @@ -1042,7 +1076,8 @@ SerializablePtr ThinClientPoolDM::GetEnum(int32_t val) { GfErrType err = GF_NOERR; - TcrMessageGetPdxEnumById request(val, this); + TcrMessageGetPdxEnumById request( + m_connManager.getCacheImpl()->getCache()->createDataOutput(), val, this); TcrMessageReply reply(true, this); @@ -1064,7 +1099,9 @@ void ThinClientPoolDM::AddEnum(SerializablePtr enumInfo, int enumVal) { GfErrType err = GF_NOERR; - TcrMessageAddPdxEnum request(enumInfo, this, enumVal); + TcrMessageAddPdxEnum request( + m_connManager.getCacheImpl()->getCache()->createDataOutput(), enumInfo, + this, enumVal); TcrMessageReply reply(true, this); @@ -1087,7 +1124,9 @@ GfErrType ThinClientPoolDM::sendUserCredentials(PropertiesPtr credentials, GfErrType err = GF_NOERR; - TcrMessageUserCredential request(credentials, this); + TcrMessageUserCredential request( + m_connManager.getCacheImpl()->getCache()->createDataOutput(), credentials, + this); TcrMessageReply reply(true, this); @@ -1257,7 +1296,7 @@ GfErrType ThinClientPoolDM::sendSyncRequest(TcrMessage& request, nullptr); } std::vector<GetAllWork*> getAllWorkers; - ThreadPool* threadPool = TPSingleton::instance(); + auto* threadPool = m_connManager.getCacheImpl()->getThreadPool(); ChunkedGetAllResponse* responseHandler = static_cast<ChunkedGetAllResponse*>(reply.getChunkedResultHandler()); @@ -1719,10 +1758,12 @@ GfErrType ThinClientPoolDM::createPoolConnectionToAEndPoint( "connection to the endpoint %s", theEP->name().c_str()); // if the pool size is within limits, create a new connection. - error = theEP->createNewConnection( - conn, false, false, - DistributedSystem::getSystemProperties()->connectTimeout(), false, true, - appThreadrequest); + error = theEP->createNewConnection(conn, false, false, + m_connManager.getCacheImpl() + ->getDistributedSystem() + .getSystemProperties() + .connectTimeout(), + false, true, appThreadrequest); if (conn == nullptr || error != GF_NOERR) { LOGFINE("2Failed to connect to %s", theEP->name().c_str()); if (conn != nullptr) GF_SAFE_DELETE(conn); @@ -1801,9 +1842,12 @@ GfErrType ThinClientPoolDM::createPoolConnection( conn->updateCreationTime(); break; } else { - error = ep->createNewConnection( - conn, false, false, - DistributedSystem::getSystemProperties()->connectTimeout(), false); + error = ep->createNewConnection(conn, false, false, + m_connManager.getCacheImpl() + ->getDistributedSystem() + .getSystemProperties() + .connectTimeout(), + false); } if (conn == nullptr || error != GF_NOERR) { @@ -1854,14 +1898,20 @@ TcrConnection* ThinClientPoolDM::getConnectionFromQueue( getStats().incWaitingConnections(); /*get the start time for connectionWaitTime stat*/ - int64_t sampleStartNanos = Utils::startStatOpTime(); + bool enableTimeStatistics = m_connManager.getCacheImpl() + ->getDistributedSystem() + .getSystemProperties() + .getEnableTimeStatistics(); + int64_t sampleStartNanos = + enableTimeStatistics ? Utils::startStatOpTime() : 0; TcrConnection* mp = getUntil(timeoutTime, error, excludeServers, maxConnLimit); /*Update the time stat for clientOpsTime */ - Utils::updateStatOpTime( - getStats().getStats(), - PoolStatType::getInstance()->getTotalWaitingConnTimeId(), - sampleStartNanos); + if (enableTimeStatistics) { + Utils::updateStatOpTime(getStats().getStats(), + getStats().getTotalWaitingConnTimeId(), + sampleStartNanos); + } return mp; } @@ -1892,9 +1942,13 @@ GfErrType ThinClientPoolDM::sendRequestToEP(const TcrMessage& request, LOGDEBUG( "ThinClientPoolDM::sendRequestToEP(): couldnt create a pool " "connection, creating a temporary connection."); - error = currentEndpoint->createNewConnection( - conn, false, false, - DistributedSystem::getSystemProperties()->connectTimeout(), false); + error = + currentEndpoint->createNewConnection(conn, false, false, + m_connManager.getCacheImpl() + ->getDistributedSystem() + .getSystemProperties() + .connectTimeout(), + false); putConnInPool = false; currentEndpoint->setConnectionStatus(true); } @@ -2076,7 +2130,7 @@ int ThinClientPoolDM::updateLocatorList(volatile bool& isRunning) { LOGFINE("Starting updateLocatorList thread for pool %s", m_poolName.c_str()); while (isRunning) { m_updateLocatorListSema.acquire(); - if (isRunning && !TcrConnectionManager::isNetDown) { + if (isRunning && !m_connManager.isNetDown()) { ((ThinClientLocatorHelper*)m_locHelper) ->updateLocators(this->getServerGroup()); } @@ -2089,7 +2143,7 @@ int ThinClientPoolDM::pingServer(volatile bool& isRunning) { LOGFINE("Starting ping thread for pool %s", m_poolName.c_str()); while (isRunning) { m_pingSema.acquire(); - if (isRunning && !TcrConnectionManager::isNetDown) { + if (isRunning && !m_connManager.isNetDown()) { pingServerLocal(); while (m_pingSema.tryacquire() != -1) { ; @@ -2107,9 +2161,10 @@ int ThinClientPoolDM::cliCallback(volatile bool& isRunning) { if (isRunning) { LOGFINE("Clearing Pdx Type Registry"); // this call for csharp client - DistributedSystemImpl::CallCliCallBack(); + DistributedSystemImpl::CallCliCallBack( + *(m_connManager.getCacheImpl()->getCache())); // this call for cpp client - PdxTypeRegistry::clear(); + m_connManager.getCacheImpl()->getPdxTypeRegistry()->clear(); while (m_cliCallbackSema.tryacquire() != -1) { ; } @@ -2317,7 +2372,8 @@ void ThinClientPoolDM::updateNotificationStats(bool isDeltaSuccess, GfErrType ThinClientPoolDM::doFailover(TcrConnection* conn) { m_manager->setStickyConnection(conn, true); - TcrMessageTxFailover request; + TcrMessageTxFailover request( + m_connManager.getCacheImpl()->getCache()->createDataOutput()); TcrMessageReply reply(true, nullptr); GfErrType err = this->sendSyncRequest(request, reply); http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThinClientPoolDM.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/ThinClientPoolDM.hpp b/src/cppcache/src/ThinClientPoolDM.hpp index 73802f7..1da32d8 100644 --- a/src/cppcache/src/ThinClientPoolDM.hpp +++ b/src/cppcache/src/ThinClientPoolDM.hpp @@ -119,16 +119,13 @@ class ThinClientPoolDM virtual ~ThinClientPoolDM() { destroy(); - GF_SAFE_DELETE(m_memId); GF_SAFE_DELETE(m_locHelper); GF_SAFE_DELETE(m_stats); GF_SAFE_DELETE(m_clientMetadataService); GF_SAFE_DELETE(m_manager); } // void updateQueue(const char* regionPath) ; - ClientProxyMembershipID* getMembershipId() { - return (ClientProxyMembershipID*)m_memId; - } + ClientProxyMembershipID* getMembershipId() { return m_memId.get(); } virtual void processMarker(){}; virtual bool checkDupAndAdd(EventIdPtr eventid) { return m_connManager.checkDupAndAdd(eventid); @@ -386,7 +383,8 @@ class ThinClientPoolDM std::string selectEndpoint(std::set<ServerLocation>&, const TcrConnection* currentServer = nullptr); - volatile ClientProxyMembershipID* m_memId; + // TODO global - m_memId was volatile + std::unique_ptr<ClientProxyMembershipID> m_memId; virtual TcrEndpoint* createEP(const char* endpointName) { return new TcrPoolEndPoint(endpointName, m_connManager.getCacheImpl(), m_connManager.m_failoverSema, @@ -491,7 +489,11 @@ class FunctionExecution : public PooledWork<GfErrType> { if (m_userAttr != nullptr) gua.setProxyCache(m_userAttr->getProxyCache()); std::string funcName(m_func); - TcrMessageExecuteFunction request(funcName, m_args, m_getResult, m_poolDM, + TcrMessageExecuteFunction request(m_poolDM->getConnectionManager() + .getCacheImpl() + ->getCache() + ->createDataOutput(), + funcName, m_args, m_getResult, m_poolDM, m_timeout); TcrMessageReply reply(true, m_poolDM); ChunkedFunctionExecutionResponse* resultProcessor( @@ -509,29 +511,7 @@ class FunctionExecution : public PooledWork<GfErrType> { m_error = m_poolDM->handleEPError(m_ep, reply, m_error); if (m_error != GF_NOERR) { if (m_error == GF_NOTCON || m_error == GF_IOERR) { - /* - ==25848== 650 (72 direct, 578 indirect) bytes in 2 blocks are definitely - lost in loss record 184 of 218 - ==25848== at 0x4007D75: operator new(unsigned int) - (vg_replace_malloc.c:313) - ==25848== by 0x439BD41: - apache::geode::client::FunctionExecution::execute() - (ThinClientPoolDM.hpp:417) - ==25848== by 0x439A5A1: - apache::geode::client::PooledWork<GfErrType>::call() - (ThreadPool.hpp:25) - ==25848== by 0x43C335F: - apache::geode::client::ThreadPoolWorker::svc() - (ThreadPool.cpp:43) - ==25848== by 0x440521D: ACE_6_1_0::ACE_Task_Base::svc_run(void*) (in - /export/pnq-gst-dev01a/users/adongre/cedar_dev_Nov12/build-artifacts/linux/product/lib/libgfcppcache.so) - ==25848== by 0x441E16A: ACE_6_1_0::ACE_Thread_Adapter::invoke_i() (in - /export/pnq-gst-dev01a/users/adongre/cedar_dev_Nov12/build-artifacts/linux/product/lib/libgfcppcache.so) - ==25848== by 0x441E307: ACE_6_1_0::ACE_Thread_Adapter::invoke() (in - /export/pnq-gst-dev01a/users/adongre/cedar_dev_Nov12/build-artifacts/linux/product/lib/libgfcppcache.so) - ==25848== by 0x8CFA48: start_thread (in /lib/libpthread-2.12.so) - ==25848== by 0x34BE1D: clone (in /lib/libc-2.12.so) - */ + delete resultProcessor; resultProcessor = nullptr; return GF_NOERR; // if server is unavailable its not an error for @@ -543,26 +523,7 @@ class FunctionExecution : public PooledWork<GfErrType> { if (reply.getMessageType() == TcrMessage::EXCEPTION) { exceptionPtr = CacheableString::create(reply.getException()); } - /** - * ==13294== 48,342 (1,656 direct, 46,686 indirect) bytes in 46 blocks are -definitely lost in loss record 241 of 244 -==13294== at 0x4007D75: operator new(unsigned int) (vg_replace_malloc.c:313) -==13294== by 0x439BE11: apache::geode::client::FunctionExecution::execute() -(ThinClientPoolDM.hpp:417) -==13294== by 0x439A671: apache::geode::client::PooledWork<GfErrType>::call() -(ThreadPool.hpp:25) -==13294== by 0x43C33FF: apache::geode::client::ThreadPoolWorker::svc() -(ThreadPool.cpp:43) -==13294== by 0x44052BD: ACE_6_1_0::ACE_Task_Base::svc_run(void*) (in -/export/pnq-gst-dev01a/users/adongre/cedar_dev_Nov12/build-artifacts/linux/product/lib/libgfcppcache.so) -==13294== by 0x441E20A: ACE_6_1_0::ACE_Thread_Adapter::invoke_i() (in -/export/pnq-gst-dev01a/users/adongre/cedar_dev_Nov12/build-artifacts/linux/product/lib/libgfcppcache.so) -==13294== by 0x441E3A7: ACE_6_1_0::ACE_Thread_Adapter::invoke() (in -/export/pnq-gst-dev01a/users/adongre/cedar_dev_Nov12/build-artifacts/linux/product/lib/libgfcppcache.so) -==13294== by 0x8CFA48: start_thread (in /lib/libpthread-2.12.so) -==13294== by 0x34BE1D: clone (in /lib/libc-2.12.so) - * - */ + delete resultProcessor; resultProcessor = nullptr; return m_error; @@ -573,13 +534,7 @@ definitely lost in loss record 241 of 244 exceptionPtr = CacheableString::create(reply.getException()); } if (resultProcessor->getResult() == true) { - // CacheableVectorPtr values = - // resultProcessor->getFunctionExecutionResults(); - // ACE_Guard< ACE_Recursive_Thread_Mutex > guard( - // *m_resultCollectorLock ); - // //(*m_rc)->addResult(values); - // ExecutionImpl::addResults(*m_rc,values); - // resultProcessor->reset(); + } delete resultProcessor; resultProcessor = nullptr; @@ -629,6 +584,10 @@ class OnRegionFunctionExecution : public PooledWork<GfErrType> { std::string funcName(m_func); m_request = new TcrMessageExecuteRegionFunctionSingleHop( + m_poolDM->getConnectionManager() + .getCacheImpl() + ->getCache() + ->createDataOutput(), funcName, m_region, m_args, m_routingObj, m_getResult, nullptr, m_allBuckets, timeout, m_poolDM); m_reply = new TcrMessageReply(true, m_poolDM); http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThinClientPoolHADM.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/ThinClientPoolHADM.cpp b/src/cppcache/src/ThinClientPoolHADM.cpp index b18aeeb..6c44296 100644 --- a/src/cppcache/src/ThinClientPoolHADM.cpp +++ b/src/cppcache/src/ThinClientPoolHADM.cpp @@ -40,9 +40,11 @@ void ThinClientPoolHADM::init() { } void ThinClientPoolHADM::startBackgroundThreads() { - SystemProperties* props = DistributedSystem::getSystemProperties(); + auto& props = m_connManager.getCacheImpl() + ->getDistributedSystem() + .getSystemProperties(); - if (props->isGridClient()) { + if (props.isGridClient()) { LOGWARN("Starting background threads and ignoring grid-client setting"); ThinClientPoolDM::startBackgroundThreads(); } @@ -55,10 +57,11 @@ void ThinClientPoolHADM::startBackgroundThreads() { ACE_Event_Handler* redundancyChecker = new ExpiryHandler_T<ThinClientPoolHADM>( this, &ThinClientPoolHADM::checkRedundancy); - int32_t redundancyMonitorInterval = props->redundancyMonitorInterval(); + int32_t redundancyMonitorInterval = props.redundancyMonitorInterval(); - m_servermonitorTaskId = CacheImpl::expiryTaskManager->scheduleExpiryTask( - redundancyChecker, 1, redundancyMonitorInterval, false); + m_servermonitorTaskId = + m_connManager.getCacheImpl()->getExpiryTaskManager().scheduleExpiryTask( + redundancyChecker, 1, redundancyMonitorInterval, false); LOGFINE( "ThinClientPoolHADM::ThinClientPoolHADM Registered server " "monitor task with id = %ld, interval = %ld", @@ -133,7 +136,7 @@ GfErrType ThinClientPoolHADM::sendSyncRequestCq(TcrMessage& request, bool ThinClientPoolHADM::preFailoverAction() { return true; } bool ThinClientPoolHADM::postFailoverAction(TcrEndpoint* endpoint) { - m_theTcrConnManager.triggerRedundancyThread(); + m_connManager.triggerRedundancyThread(); return true; } @@ -141,7 +144,7 @@ int ThinClientPoolHADM::redundancy(volatile bool& isRunning) { LOGFINE("ThinClientPoolHADM: Starting maintain redundancy thread."); while (isRunning) { m_redundancySema.acquire(); - if (isRunning && !TcrConnectionManager::isNetDown) { + if (isRunning && !m_connManager.isNetDown()) { m_redundancyManager->maintainRedundancyLevel(); while (m_redundancySema.tryacquire() != -1) { ; @@ -183,7 +186,8 @@ void ThinClientPoolHADM::destroy(bool keepAlive) { void ThinClientPoolHADM::sendNotificationCloseMsgs() { if (m_redundancyTask) { if (m_servermonitorTaskId >= 0) { - CacheImpl::expiryTaskManager->cancelTask(m_servermonitorTaskId); + m_connManager.getCacheImpl()->getExpiryTaskManager().cancelTask( + m_servermonitorTaskId); } m_redundancyTask->stopNoblock(); m_redundancySema.release(); @@ -193,21 +197,6 @@ void ThinClientPoolHADM::sendNotificationCloseMsgs() { } } -/* -void ThinClientPoolHADM::stopNotificationThreads() -{ - ACE_Guard< ACE_Recursive_Thread_Mutex > guard( m_endpointsLock ); - for( ACE_Map_Manager< std::string, TcrEndpoint *, ACE_Recursive_Thread_Mutex ->::iterator it = m_endpoints.begin(); it != m_endpoints.end(); it++){ - ((*it).int_id_)->stopNoBlock(); - } - for( ACE_Map_Manager< std::string, TcrEndpoint *, ACE_Recursive_Thread_Mutex ->::iterator it = m_endpoints.begin(); it != m_endpoints.end(); it++){ - ((*it).int_id_)->stopNotifyReceiverAndCleanup(); - } -} -*/ - GfErrType ThinClientPoolHADM::registerInterestAllRegions( TcrEndpoint* ep, const TcrMessage* request, TcrMessageReply* reply) { GfErrType err = GF_NOERR; @@ -248,12 +237,14 @@ void ThinClientPoolHADM::removeRegion(ThinClientRegion* theTCR) { } void ThinClientPoolHADM::readyForEvents() { - if (!DistributedSystem::getSystemProperties()->autoReadyForEvents()) { + auto& sysProp = m_connManager.getCacheImpl() + ->getDistributedSystem() + .getSystemProperties(); + if (!sysProp.autoReadyForEvents()) { init(); } - const char* durable = - DistributedSystem::getSystemProperties()->durableClientId(); + const char* durable = sysProp.durableClientId(); if (durable != nullptr && strlen(durable) > 0) { m_redundancyManager->readyForEvents(); http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThinClientPoolHADM.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/ThinClientPoolHADM.hpp b/src/cppcache/src/ThinClientPoolHADM.hpp index 9783293..ab5dc78 100644 --- a/src/cppcache/src/ThinClientPoolHADM.hpp +++ b/src/cppcache/src/ThinClientPoolHADM.hpp @@ -110,8 +110,8 @@ class ThinClientPoolHADM : public ThinClientPoolDM { // Disallow copy constructor and assignment operator. ThinClientRedundancyManager* m_redundancyManager; ThinClientPoolHADM(const ThinClientPoolHADM&); - ThinClientPoolHADM& operator=(const ThinClientPoolHADM&); - // const char* m_name; // COVERITY -> 30305 Uninitialized pointer field + ThinClientPoolHADM& operator=(const ThinClientPoolHADM&) = delete; + TcrConnectionManager& m_theTcrConnManager; ACE_Semaphore m_redundancySema; Task<ThinClientPoolHADM>* m_redundancyTask; http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThinClientPoolRegion.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/ThinClientPoolRegion.cpp b/src/cppcache/src/ThinClientPoolRegion.cpp index 834520c..b888e1c 100644 --- a/src/cppcache/src/ThinClientPoolRegion.cpp +++ b/src/cppcache/src/ThinClientPoolRegion.cpp @@ -40,7 +40,10 @@ ThinClientPoolRegion::~ThinClientPoolRegion() { m_tcrdm = nullptr; } void ThinClientPoolRegion::initTCR() { try { ThinClientPoolDM* poolDM = dynamic_cast<ThinClientPoolDM*>( - PoolManager::find(m_regionAttributes->getPoolName()).get()); + getCache() + ->getPoolManager() + .find(m_regionAttributes->getPoolName()) + .get()); m_tcrdm = dynamic_cast<ThinClientBaseDM*>(poolDM); if (!m_tcrdm) { // TODO: create a PoolNotFound exception.