http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/CqQueryImpl.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/CqQueryImpl.cpp b/src/cppcache/src/CqQueryImpl.cpp index b221f95..ffb603e 100644 --- a/src/cppcache/src/CqQueryImpl.cpp +++ b/src/cppcache/src/CqQueryImpl.cpp @@ -31,7 +31,7 @@ CqQueryImpl::CqQueryImpl(const CqServicePtr& cqService, const std::string& cqName, const std::string& queryString, const CqAttributesPtr& cqAttributes, - const bool isDurable, + StatisticsFactory* factory, const bool isDurable, const UserAttributesPtr& userAttributesPtr) : m_cqName(cqName), m_queryString(queryString), @@ -39,11 +39,8 @@ CqQueryImpl::CqQueryImpl(const CqServicePtr& cqService, m_serverCqName( cqName), // On Client Side serverCqName and cqName will be same. m_isDurable(isDurable), - m_stats(new CqQueryVsdStats(m_cqName.c_str())), + m_stats(std::make_shared<CqQueryVsdStats>(factory, m_cqName)), m_cqState(CqState::STOPPED), // Initial state is stopped - /* adongre - * CID 28930: Uninitialized scalar field (UNINIT_CTOR) - */ m_cqOperation(CqOperation::OP_TYPE_INVALID), m_tccdm(m_cqService->getDM()) { CqAttributesFactory cqAf(cqAttributes); @@ -259,7 +256,12 @@ GfErrType CqQueryImpl::execute(TcrEndpoint* endpoint) { LOGFINE("Executing CQ [%s]", m_cqName.c_str()); - TcrMessageExecuteCq request(m_cqName, m_queryString, CqState::RUNNING, + TcrMessageExecuteCq request(m_cqService->getDM() + ->getConnectionManager() + .getCacheImpl() + ->getCache() + ->createDataOutput(), + m_cqName, m_queryString, CqState::RUNNING, isDurable(), m_tccdm); TcrMessageReply reply(true, m_tccdm); @@ -324,7 +326,12 @@ bool CqQueryImpl::executeCq(TcrMessage::MsgType requestType) { } LOGDEBUG("CqQueryImpl::executeCq"); - TcrMessageExecuteCq msg(m_cqName, m_queryString, CqState::RUNNING, + TcrMessageExecuteCq msg(m_cqService->getDM() + ->getConnectionManager() + .getCacheImpl() + ->getCache() + ->createDataOutput(), + m_cqName, m_queryString, CqState::RUNNING, isDurable(), m_tccdm); TcrMessageReply reply(true, m_tccdm); @@ -368,7 +375,12 @@ CqResultsPtr CqQueryImpl::executeWithInitialResults(uint32_t timeout) { "CqQuery::executeWithInitialResults: cq is already running"); } // QueryResult values; - TcrMessageExecuteCqWithIr msg(m_cqName, m_queryString, CqState::RUNNING, + TcrMessageExecuteCqWithIr msg(m_cqService->getDM() + ->getConnectionManager() + .getCacheImpl() + ->getCache() + ->createDataOutput(), + m_cqName, m_queryString, CqState::RUNNING, isDurable(), m_tccdm); TcrMessageReply reply(true, m_tccdm); auto resultCollector = (new ChunkedQueryResponse(reply)); @@ -458,10 +470,20 @@ void CqQueryImpl::sendStopOrClose(TcrMessage::MsgType requestType) { TcrMessageReply reply(true, m_tccdm); if (requestType == TcrMessage::STOPCQ_MSG_TYPE) { - TcrMessageStopCQ msg(m_cqName, -1, m_tccdm); + TcrMessageStopCQ msg(m_cqService->getDM() + ->getConnectionManager() + .getCacheImpl() + ->getCache() + ->createDataOutput(), + m_cqName, -1, m_tccdm); err = m_tccdm->sendSyncRequest(msg, reply); } else if (requestType == TcrMessage::CLOSECQ_MSG_TYPE) { - TcrMessageCloseCQ msg(m_cqName, -1, m_tccdm); + TcrMessageCloseCQ msg(m_cqService->getDM() + ->getConnectionManager() + .getCacheImpl() + ->getCache() + ->createDataOutput(), + m_cqName, -1, m_tccdm); err = m_tccdm->sendSyncRequest(msg, reply); }
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/CqQueryImpl.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/CqQueryImpl.hpp b/src/cppcache/src/CqQueryImpl.hpp index a4fcd04..8a7920a 100644 --- a/src/cppcache/src/CqQueryImpl.hpp +++ b/src/cppcache/src/CqQueryImpl.hpp @@ -94,7 +94,8 @@ class CqQueryImpl : public CqQuery, public: CqQueryImpl(const CqServicePtr& cqService, const std::string& cqName, const std::string& queryString, - const CqAttributesPtr& cqAttributes, const bool isDurable = false, + const CqAttributesPtr& cqAttributes, statistics::StatisticsFactory* factory, + const bool isDurable = false, const UserAttributesPtr& userAttributesPtr = nullptr); ~CqQueryImpl(); http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/CqQueryVsdStats.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/CqQueryVsdStats.cpp b/src/cppcache/src/CqQueryVsdStats.cpp index ceb40a6..473165c 100644 --- a/src/cppcache/src/CqQueryVsdStats.cpp +++ b/src/cppcache/src/CqQueryVsdStats.cpp @@ -15,12 +15,6 @@ * limitations under the License. */ -#include <geode/geode_globals.hpp> - -#include "CqQueryVsdStats.hpp" -//#include "StatisticsFactory.hpp" - -#include <ace/Singleton.h> #include <mutex> @@ -28,8 +22,12 @@ const char* cqStatsName = "CqQueryStatistics"; const char* cqStatsDesc = "Statistics for this cq query"; +#include <ace/Thread_Mutex.h> +#include <ace/Singleton.h> -//////////////////////////////////////////////////////////////////////////////// +#include <geode/geode_globals.hpp> + +#include "CqQueryVsdStats.hpp" namespace apache { namespace geode { @@ -39,80 +37,38 @@ using statistics::StatisticsFactory; using util::concurrent::spinlock_mutex; using std::lock_guard; -//////////////////////////////////////////////////////////////////////////////// - -spinlock_mutex CqQueryStatType::m_statTypeLock; +constexpr const char* CqQueryVsdStats::STATS_NAME; +constexpr const char* CqQueryVsdStats::STATS_DESC; -StatisticsType* CqQueryStatType::getStatType() { - const bool largerIsBetter = true; - lock_guard<spinlock_mutex> guard(m_statTypeLock); - StatisticsFactory* factory = StatisticsFactory::getExistingInstance(); - GF_D_ASSERT(!!factory); - - StatisticsType* statsType = factory->findType("CqQueryStatistics"); - - if (statsType == nullptr) { - m_stats[0] = factory->createIntCounter( +CqQueryVsdStats::CqQueryVsdStats(StatisticsFactory* factory, + const std::string& cqqueryName) { + auto statsType = factory->findType(STATS_NAME); + if (!statsType) { + const bool largerIsBetter = true; + auto stats = new StatisticDescriptor*[4]; + stats[0] = factory->createIntCounter( "inserts", "The total number of inserts this cq qurey", "entries", largerIsBetter); - m_stats[1] = factory->createIntCounter( + stats[1] = factory->createIntCounter( "updates", "The total number of updates for this cq query", "entries", largerIsBetter); - m_stats[2] = factory->createIntCounter( + stats[2] = factory->createIntCounter( "deletes", "The total number of deletes for this cq query", "entries", largerIsBetter); - m_stats[3] = factory->createIntCounter( + stats[3] = factory->createIntCounter( "events", "The total number of events for this cq query", "entries", largerIsBetter); - statsType = factory->createType(cqStatsName, cqStatsDesc, m_stats, 4); - - m_numInsertsId = statsType->nameToId("inserts"); - m_numUpdatesId = statsType->nameToId("updates"); - m_numDeletesId = statsType->nameToId("deletes"); - m_numEventsId = statsType->nameToId("events"); + statsType = factory->createType(STATS_NAME, STATS_DESC, stats, 4); } - return statsType; -} - -CqQueryStatType& CqQueryStatType::getInstance() { - // C++11 initializes statics threads safe - static CqQueryStatType instance; - return instance; -} - -CqQueryStatType::CqQueryStatType() - : m_numInsertsId(0), - m_numUpdatesId(0), - m_numDeletesId(0), - m_numEventsId(0) { - memset(m_stats, 0, sizeof(m_stats)); -} - -//////////////////////////////////////////////////////////////////////////////// - -// typedef ACE_Singleton<CqQueryVsdStatsInit, ACE_Thread_Mutex> -// TheCqQueryVsdStatsInit; - -//////////////////////////////////////////////////////////////////////////////// - -CqQueryVsdStats::CqQueryVsdStats(const char* cqqueryName) { - auto& regStatType = CqQueryStatType::getInstance(); - - StatisticsType* statsType = regStatType.getStatType(); - - GF_D_ASSERT(statsType != nullptr); - - StatisticsFactory* factory = StatisticsFactory::getExistingInstance(); - - m_cqQueryVsdStats = factory->createAtomicStatistics( - statsType, const_cast<char*>(cqqueryName)); + m_cqQueryVsdStats = + factory->createAtomicStatistics(statsType, cqqueryName.c_str()); - m_numInsertsId = regStatType.getNumInsertsId(); - m_numUpdatesId = regStatType.getNumUpdatesId(); - m_numDeletesId = regStatType.getNumDeletesId(); - m_numEventsId = regStatType.getNumEventsId(); + m_numInsertsId = statsType->nameToId("inserts"); + m_numUpdatesId = statsType->nameToId("updates"); + m_numDeletesId = statsType->nameToId("deletes"); + m_numEventsId = statsType->nameToId("events"); m_cqQueryVsdStats->setInt(m_numInsertsId, 0); m_cqQueryVsdStats->setInt(m_numUpdatesId, 0); http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/CqQueryVsdStats.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/CqQueryVsdStats.hpp b/src/cppcache/src/CqQueryVsdStats.hpp index 3cc5672..33beb02 100644 --- a/src/cppcache/src/CqQueryVsdStats.hpp +++ b/src/cppcache/src/CqQueryVsdStats.hpp @@ -1,8 +1,3 @@ -#pragma once - -#ifndef GEODE_CQQUERYVSDSTATS_H_ -#define GEODE_CQQUERYVSDSTATS_H_ - /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -20,6 +15,13 @@ * limitations under the License. */ +#pragma once + +#ifndef GEODE_CQQUERYVSDSTATS_H_ +#define GEODE_CQQUERYVSDSTATS_H_ + +#include <string> + #include <geode/geode_globals.hpp> #include <geode/statistics/Statistics.hpp> #include <geode/statistics/StatisticsFactory.hpp> @@ -39,7 +41,7 @@ using util::concurrent::spinlock_mutex; class CPPCACHE_EXPORT CqQueryVsdStats : public CqStatistics { public: /** hold statistics for a cq. */ - CqQueryVsdStats(const char* cqName); + CqQueryVsdStats(statistics::StatisticsFactory* factory, const std::string& cqqueryName); /** disable stat collection for this item. */ virtual ~CqQueryVsdStats(); @@ -74,39 +76,11 @@ class CPPCACHE_EXPORT CqQueryVsdStats : public CqStatistics { int32_t m_numUpdatesId; int32_t m_numDeletesId; int32_t m_numEventsId; -}; - -class CqQueryStatType { - private: - static spinlock_mutex m_statTypeLock; - - public: - static CqQueryStatType& getInstance(); - - StatisticsType* getStatType(); - - private: - CqQueryStatType(); - ~CqQueryStatType() = default; - CqQueryStatType(const CqQueryStatType&) = delete; - CqQueryStatType& operator=(const CqQueryStatType&) = delete; - - StatisticDescriptor* m_stats[4]; - - int32_t m_numInsertsId; - int32_t m_numUpdatesId; - int32_t m_numDeletesId; - int32_t m_numEventsId; - - public: - inline int32_t getNumInsertsId() { return m_numInsertsId; } - inline int32_t getNumUpdatesId() { return m_numUpdatesId; } - - inline int32_t getNumDeletesId() { return m_numDeletesId; } - - inline int32_t getNumEventsId() { return m_numEventsId; } + static constexpr const char* STATS_NAME = "CqQueryStatistics"; + static constexpr const char* STATS_DESC = "Statistics for this cq query"; }; + } // namespace client } // namespace geode } // namespace apache http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/CqService.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/CqService.cpp b/src/cppcache/src/CqService.cpp index d2d5e7f..e5cbeb9 100644 --- a/src/cppcache/src/CqService.cpp +++ b/src/cppcache/src/CqService.cpp @@ -14,6 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + #include "CqService.hpp" #include "ReadWriteLock.hpp" #include <geode/DistributedSystem.hpp> @@ -26,10 +27,12 @@ #include <geode/CqStatusListener.hpp> using namespace apache::geode::client; -CqService::CqService(ThinClientBaseDM* tccdm) +CqService::CqService(ThinClientBaseDM* tccdm, + StatisticsFactory* statisticsFactory) : m_tccdm(tccdm), + m_statisticsFactory(statisticsFactory), m_notificationSema(1), - m_stats(std::make_shared<CqServiceVsdStats>()) { + m_stats(std::make_shared<CqServiceVsdStats>(m_statisticsFactory)) { m_cqQueryMap = new MapOfCqQueryWithLock(); m_running = true; LOGDEBUG("CqService Started"); @@ -100,9 +103,11 @@ CqQueryPtr CqService::newCq(const std::string& cqName, // check for durable client if (isDurable) { - auto sysProps = DistributedSystem::getSystemProperties(); - const auto durableID = - (sysProps != nullptr) ? sysProps->durableClientId() : nullptr; + const auto durableID = m_tccdm->getConnectionManager() + .getCacheImpl() + ->getDistributedSystem() + .getSystemProperties() + .durableClientId(); if (durableID == nullptr || strlen(durableID) == 0) { LOGERROR("Cannot create durable CQ because client is not durable."); throw IllegalStateException( @@ -123,7 +128,8 @@ CqQueryPtr CqService::newCq(const std::string& cqName, } auto cQuery = std::make_shared<CqQueryImpl>( - shared_from_this(), cqName, queryString, cqAttributes, isDurable, ua); + shared_from_this(), cqName, queryString, cqAttributes, + m_statisticsFactory, isDurable, ua); cQuery->initCq(); return cQuery; } @@ -570,7 +576,11 @@ CqOperation::CqOperationType CqService::getOperation(int eventType) { * cqs. */ CacheableArrayListPtr CqService::getAllDurableCqsFromServer() { - TcrMessageGetDurableCqs msg(m_tccdm); + TcrMessageGetDurableCqs msg(m_tccdm->getConnectionManager() + .getCacheImpl() + ->getCache() + ->createDataOutput(), + m_tccdm); TcrMessageReply reply(true, m_tccdm); // intialize the chunked response hadler for durable cqs list http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/CqService.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/CqService.hpp b/src/cppcache/src/CqService.hpp index 554abc7..88ff1a8 100644 --- a/src/cppcache/src/CqService.hpp +++ b/src/cppcache/src/CqService.hpp @@ -65,6 +65,7 @@ class CPPCACHE_EXPORT CqService public std::enable_shared_from_this<CqService> { private: ThinClientBaseDM* m_tccdm; + statistics::StatisticsFactory* m_statisticsFactory; ACE_Recursive_Thread_Mutex m_mutex; std::string m_queryString; ACE_Semaphore m_notificationSema; @@ -85,7 +86,7 @@ class CPPCACHE_EXPORT CqService /** * Constructor. */ - CqService(ThinClientBaseDM* tccdm); + CqService(ThinClientBaseDM* tccdm, statistics::StatisticsFactory* statisticsFactory); ThinClientBaseDM* getDM() { return m_tccdm; } void receiveNotification(TcrMessage* msg); http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/CqServiceVsdStats.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/CqServiceVsdStats.cpp b/src/cppcache/src/CqServiceVsdStats.cpp index 08f331b..c4d5999 100644 --- a/src/cppcache/src/CqServiceVsdStats.cpp +++ b/src/cppcache/src/CqServiceVsdStats.cpp @@ -15,21 +15,13 @@ * limitations under the License. */ -#include <geode/geode_globals.hpp> - -#include "CqServiceVsdStats.hpp" -//#include "StatisticsFactory.hpp" - +#include <ace/Thread_Mutex.h> #include <ace/Singleton.h> - #include <mutex> +#include <geode/geode_globals.hpp> +#include <geode/statistics/StatisticsFactory.hpp> -#include "util/concurrent/spinlock_mutex.hpp" - -const char* cqServiceStatsName = "CqServiceStatistics"; -const char* cqServiceStatsDesc = "Statistics for this cq Service"; - -//////////////////////////////////////////////////////////////////////////////// +#include "CqServiceVsdStats.hpp" namespace apache { namespace geode { @@ -39,90 +31,47 @@ using statistics::StatisticsFactory; using util::concurrent::spinlock_mutex; using std::lock_guard; -//////////////////////////////////////////////////////////////////////////////// - -spinlock_mutex CqServiceStatType::m_statTypeLock; +constexpr const char* CqServiceVsdStats::STATS_NAME; +constexpr const char* CqServiceVsdStats::STATS_DESC; -StatisticsType* CqServiceStatType::getStatType() { - const bool largerIsBetter = true; - lock_guard<spinlock_mutex> guard(m_statTypeLock); - StatisticsFactory* factory = StatisticsFactory::getExistingInstance(); - GF_D_ASSERT(!!factory); - - StatisticsType* statsType = factory->findType("CqServiceStatistics"); - - if (statsType == nullptr) { - m_stats[0] = factory->createIntCounter( +CqServiceVsdStats::CqServiceVsdStats(StatisticsFactory* factory, + const std::string& cqServiceName) { + auto statsType = factory->findType(STATS_NAME); + if (!statsType) { + const bool largerIsBetter = true; + auto stats = new StatisticDescriptor*[5]; + stats[0] = factory->createIntCounter( "CqsActive", "The total number of CqsActive this cq qurey", "entries", largerIsBetter); - m_stats[1] = factory->createIntCounter( + stats[1] = factory->createIntCounter( "CqsCreated", "The total number of CqsCreated for this cq Service", "entries", largerIsBetter); - m_stats[2] = factory->createIntCounter( + stats[2] = factory->createIntCounter( "CqsClosed", "The total number of CqsClosed for this cq Service", "entries", largerIsBetter); - m_stats[3] = factory->createIntCounter( + stats[3] = factory->createIntCounter( "CqsStopped", "The total number of CqsStopped for this cq Service", "entries", largerIsBetter); - m_stats[4] = factory->createIntCounter( + stats[4] = factory->createIntCounter( "CqsOnClient", "The total number of Cqs on the client for this cq Service", "entries", largerIsBetter); - statsType = - factory->createType(cqServiceStatsName, cqServiceStatsDesc, m_stats, 5); - - m_numCqsActiveId = statsType->nameToId("CqsActive"); - m_numCqsCreatedId = statsType->nameToId("CqsCreated"); - m_numCqsOnClientId = statsType->nameToId("CqsOnClient"); - m_numCqsClosedId = statsType->nameToId("CqsClosed"); - m_numCqsStoppedId = statsType->nameToId("CqsStopped"); + statsType = factory->createType(STATS_NAME, STATS_DESC, stats, 5); } - return statsType; -} - -CqServiceStatType& CqServiceStatType::getInstance() { - static CqServiceStatType instance; - return instance; -} - -CqServiceStatType::CqServiceStatType() - : m_numCqsActiveId(0), - m_numCqsCreatedId(0), - m_numCqsOnClientId(0), - m_numCqsClosedId(0), - m_numCqsStoppedId(0) { - memset(m_stats, 0, sizeof(m_stats)); -} - -//////////////////////////////////////////////////////////////////////////////// - -// typedef ACE_Singleton<CqServiceVsdStatsInit, ACE_Thread_Mutex> -// TheCqServiceVsdStatsInit; - -//////////////////////////////////////////////////////////////////////////////// - -CqServiceVsdStats::CqServiceVsdStats(const char* cqServiceName) { - auto& regStatType = CqServiceStatType::getInstance(); - - StatisticsType* statsType = regStatType.getStatType(); - - GF_D_ASSERT(statsType != nullptr); - - StatisticsFactory* factory = StatisticsFactory::getExistingInstance(); - - m_cqServiceVsdStats = factory->createAtomicStatistics( - statsType, const_cast<char*>(cqServiceName)); + m_cqServiceVsdStats = + factory->createAtomicStatistics(statsType, cqServiceName.c_str()); - m_numCqsActiveId = regStatType.getNumCqsActiveId(); - m_numCqsCreatedId = regStatType.getNumCqsCreatedId(); - m_numCqsOnClientId = regStatType.getNumCqsOnClientId(); - m_numCqsClosedId = regStatType.getNumCqsClosedId(); - m_numCqsStoppedId = regStatType.getNumCqsStoppedId(); + m_numCqsActiveId = statsType->nameToId("CqsActive"); + m_numCqsCreatedId = statsType->nameToId("CqsCreated"); + m_numCqsOnClientId = statsType->nameToId("CqsOnClient"); + m_numCqsClosedId = statsType->nameToId("CqsClosed"); + m_numCqsStoppedId = statsType->nameToId("CqsStopped"); m_cqServiceVsdStats->setInt(m_numCqsActiveId, 0); m_cqServiceVsdStats->setInt(m_numCqsCreatedId, 0); + m_cqServiceVsdStats->setInt(m_numCqsOnClientId, 0); m_cqServiceVsdStats->setInt(m_numCqsClosedId, 0); m_cqServiceVsdStats->setInt(m_numCqsStoppedId, 0); } http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/CqServiceVsdStats.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/CqServiceVsdStats.hpp b/src/cppcache/src/CqServiceVsdStats.hpp index 710dfdb..2d48056 100644 --- a/src/cppcache/src/CqServiceVsdStats.hpp +++ b/src/cppcache/src/CqServiceVsdStats.hpp @@ -1,8 +1,3 @@ -#pragma once - -#ifndef GEODE_CQSERVICEVSDSTATS_H_ -#define GEODE_CQSERVICEVSDSTATS_H_ - /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -20,6 +15,13 @@ * limitations under the License. */ +#pragma once + +#ifndef GEODE_CQSERVICEVSDSTATS_H_ +#define GEODE_CQSERVICEVSDSTATS_H_ + +#include <string> + #include <geode/geode_globals.hpp> #include <geode/statistics/Statistics.hpp> #include <geode/statistics/StatisticsFactory.hpp> @@ -39,7 +41,8 @@ using util::concurrent::spinlock_mutex; class CPPCACHE_EXPORT CqServiceVsdStats : public CqServiceStatistics { public: /** hold statistics for a cq. */ - CqServiceVsdStats(const char* cqName = "CqServiceVsdStats"); + CqServiceVsdStats(statistics::StatisticsFactory* factory, + const std::string& cqName = "CqServiceVsdStats"); /** disable stat collection for this item. */ virtual ~CqServiceVsdStats(); @@ -108,42 +111,11 @@ class CPPCACHE_EXPORT CqServiceVsdStats : public CqServiceStatistics { int32_t m_numCqsOnClientId; int32_t m_numCqsClosedId; int32_t m_numCqsStoppedId; -}; - -class CqServiceStatType { - private: - static spinlock_mutex m_statTypeLock; - - public: - static CqServiceStatType& getInstance(); - - StatisticsType* getStatType(); - - private: - CqServiceStatType(); - ~CqServiceStatType() = default; - CqServiceStatType(const CqServiceStatType&) = delete; - CqServiceStatType& operator=(const CqServiceStatType&) = delete; - - StatisticDescriptor* m_stats[5]; - - int32_t m_numCqsActiveId; - int32_t m_numCqsCreatedId; - int32_t m_numCqsOnClientId; - int32_t m_numCqsClosedId; - int32_t m_numCqsStoppedId; - - public: - inline int32_t getNumCqsActiveId() { return m_numCqsActiveId; } - inline int32_t getNumCqsCreatedId() { return m_numCqsCreatedId; } - - inline int32_t getNumCqsOnClientId() { return m_numCqsOnClientId; } - - inline int32_t getNumCqsClosedId() { return m_numCqsClosedId; } - - inline int32_t getNumCqsStoppedId() { return m_numCqsStoppedId; } + static constexpr const char* STATS_NAME = "CqServiceStatistics"; + static constexpr const char* STATS_DESC = "Statistics for this cq Service"; }; + } // namespace client } // namespace geode } // namespace apache http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/DataInput.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/DataInput.cpp b/src/cppcache/src/DataInput.cpp index 402a07a..1de3ae3 100644 --- a/src/cppcache/src/DataInput.cpp +++ b/src/cppcache/src/DataInput.cpp @@ -17,15 +17,23 @@ #include <geode/DataInput.hpp> +#include "CacheRegionHelper.hpp" #include <SerializationRegistry.hpp> +#include "CacheImpl.hpp" namespace apache { namespace geode { namespace client { void DataInput::readObjectInternal(SerializablePtr& ptr, int8_t typeId) { - ptr = SerializationRegistry::deserialize(*this, typeId); + ptr = getSerializationRegistry().deserialize(*this, typeId); } + +const SerializationRegistry& DataInput::getSerializationRegistry() const { + return *CacheRegionHelper::getCacheImpl(m_cache)->getSerializationRegistry(); +} + +const Cache* DataInput::getCache() { return m_cache; } } // namespace client } // namespace geode } // namespace apache http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/DataInputInternal.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/DataInputInternal.hpp b/src/cppcache/src/DataInputInternal.hpp new file mode 100644 index 0000000..c566d82 --- /dev/null +++ b/src/cppcache/src/DataInputInternal.hpp @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#ifndef GEODE_DATAINPUTINTERNAL_H_ +#define GEODE_DATAINPUTINTERNAL_H_ + +#include <geode/DataInput.hpp> + +namespace apache { +namespace geode { +namespace client { + +class DataInputInternal : public DataInput { + public: + DataInputInternal(const uint8_t* m_buffer, int32_t len, const Cache* cache) + : DataInput(m_buffer, len, cache) {} + + virtual const Cache* getCache() override { + throw FatalInternalException("DataInputInternal does not have a Cache"); + } +}; + +} // namespace client +} // namespace geode +} // namespace apache + +#endif // GEODE_DATAINPUTINTERNAL_H_ http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/DataOutput.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/DataOutput.cpp b/src/cppcache/src/DataOutput.cpp index f718d12..061f822 100644 --- a/src/cppcache/src/DataOutput.cpp +++ b/src/cppcache/src/DataOutput.cpp @@ -22,6 +22,8 @@ #include <ace/Recursive_Thread_Mutex.h> #include <vector> +#include "CacheImpl.hpp" +#include "CacheRegionHelper.hpp" namespace apache { namespace geode { @@ -105,8 +107,8 @@ TSSDataOutput::~TSSDataOutput() { ACE_TSS<TSSDataOutput> TSSDataOutput::s_tssDataOutput; -DataOutput::DataOutput() - : m_poolName(nullptr), m_size(0), m_haveBigBuffer(false) { +DataOutput::DataOutput(const Cache* cache) + : m_cache(cache), m_poolName(nullptr), m_size(0), m_haveBigBuffer(false) { m_buf = m_bytes = DataOutput::checkoutBuffer(&m_size); } @@ -119,12 +121,18 @@ void DataOutput::checkinBuffer(uint8_t* buffer, uint32_t size) { } void DataOutput::writeObjectInternal(const Serializable* ptr, bool isDelta) { - SerializationRegistry::serialize(ptr, *this, isDelta); + getSerializationRegistry().serialize(ptr, *this, isDelta); } void DataOutput::acquireLock() { g_bigBufferLock.acquire(); } void DataOutput::releaseLock() { g_bigBufferLock.release(); } + +const SerializationRegistry& DataOutput::getSerializationRegistry() const { + return *CacheRegionHelper::getCacheImpl(m_cache)->getSerializationRegistry(); +} + +const Cache* DataOutput::getCache() { return m_cache; } } // namespace client } // namespace geode } // namespace apache http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/DataOutputInternal.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/DataOutputInternal.hpp b/src/cppcache/src/DataOutputInternal.hpp new file mode 100644 index 0000000..fd0412e --- /dev/null +++ b/src/cppcache/src/DataOutputInternal.hpp @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#ifndef GEODE_DATAOUTPUTINTERNAL_H_ +#define GEODE_DATAOUTPUTINTERNAL_H_ + +#include <geode/DataOutput.hpp> + +namespace apache { +namespace geode { +namespace client { + +class DataOutputInternal : public DataOutput { + public: + DataOutputInternal() : DataOutput() {} + + DataOutputInternal(Cache* cache) : DataOutput(cache) {} + + virtual const Cache* getCache() override { + throw FatalInternalException("DataOutputInternal does not have a Cache"); + } +}; + +} // namespace client +} // namespace geode +} // namespace apache + +#endif // GEODE_DATAOUTPUTINTERNAL_H_ http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/Delta.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/Delta.cpp b/src/cppcache/src/Delta.cpp index 5cae7cd..98f28e0 100644 --- a/src/cppcache/src/Delta.cpp +++ b/src/cppcache/src/Delta.cpp @@ -22,15 +22,18 @@ */ #include <geode/Delta.hpp> +#include <geode/Cache.hpp> using namespace apache::geode::client; +Delta::Delta(Cache* cache) : m_cache(cache) {} + DeltaPtr Delta::clone() { - DataOutput out; - Cacheable* ptr = dynamic_cast<Cacheable*>(this); - out.writeObject(ptr); - DataInput in(out.getBuffer(), out.getBufferLength()); + auto out = m_cache->createDataOutput(); + auto ptr = dynamic_cast<Cacheable*>(this); + out->writeObject(ptr); + auto in = m_cache->createDataInput(out->getBuffer(), out->getBufferLength()); CacheablePtr theClonePtr; - in.readObject(theClonePtr); + in->readObject(theClonePtr); return std::dynamic_pointer_cast<Delta>(theClonePtr); } http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/DiffieHellman.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/DiffieHellman.cpp b/src/cppcache/src/DiffieHellman.cpp index 6f07d71..80748bd 100644 --- a/src/cppcache/src/DiffieHellman.cpp +++ b/src/cppcache/src/DiffieHellman.cpp @@ -25,8 +25,6 @@ namespace geode { namespace client { ACE_DLL DiffieHellman::m_dll; -bool DiffieHellman::m_inited = false; -ACE_Recursive_Thread_Mutex DiffieHellman::s_mutex; #define INIT_DH_FUNC_PTR(OrigName) \ DiffieHellman::OrigName##_Type DiffieHellman::OrigName##_Ptr = nullptr; @@ -53,7 +51,9 @@ void* DiffieHellman::getOpenSSLFuncPtr(const char* function_name) { } void DiffieHellman::initOpenSSLFuncPtrs() { - if (DiffieHellman::m_inited) { + static bool inited = false; + + if (inited) { return; } @@ -78,15 +78,14 @@ void DiffieHellman::initOpenSSLFuncPtrs() { ASSIGN_DH_FUNC_PTR(gf_decryptDH) ASSIGN_DH_FUNC_PTR(gf_verifyDH) - DiffieHellman::m_inited = true; + inited = true; } void DiffieHellman::initDhKeys(const PropertiesPtr& props) { - ACE_Guard<ACE_Recursive_Thread_Mutex> guard(DiffieHellman::s_mutex); m_dhCtx = nullptr; - CacheableStringPtr dhAlgo = props->find(SecurityClientDhAlgo); - CacheableStringPtr ksPath = props->find(SecurityClientKsPath); + const auto& dhAlgo = props->find(SecurityClientDhAlgo); + const auto& ksPath = props->find(SecurityClientKsPath); // Null check only for DH Algo if (dhAlgo == nullptr) { @@ -123,20 +122,6 @@ void DiffieHellman::clearDhKeys(void) { m_dhCtx = nullptr; - /* - //reset all pointers -#define CLEAR_DH_FUNC_PTR(OrigName) \ - OrigName##_Ptr = nullptr; - - CLEAR_DH_FUNC_PTR(gf_initDhKeys) - CLEAR_DH_FUNC_PTR(gf_clearDhKeys) - CLEAR_DH_FUNC_PTR(gf_getPublicKey) - CLEAR_DH_FUNC_PTR(gf_setPublicKeyOther) - CLEAR_DH_FUNC_PTR(gf_computeSharedSecret) - CLEAR_DH_FUNC_PTR(gf_encryptDH) - CLEAR_DH_FUNC_PTR(gf_verifyDH) - */ - return; } http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/DiffieHellman.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/DiffieHellman.hpp b/src/cppcache/src/DiffieHellman.hpp index bab6630..a97c0fe 100644 --- a/src/cppcache/src/DiffieHellman.hpp +++ b/src/cppcache/src/DiffieHellman.hpp @@ -41,8 +41,6 @@ namespace geode { namespace client { class DiffieHellman { - static ACE_Recursive_Thread_Mutex s_mutex; - public: void initDhKeys(const PropertiesPtr& props); void clearDhKeys(void); @@ -59,15 +57,10 @@ class DiffieHellman { static void initOpenSSLFuncPtrs(); - DiffieHellman() - : /* adongre - * CID 28933: Uninitialized pointer field (UNINIT_CTOR) - */ - m_dhCtx((void*)0) {} + DiffieHellman() : m_dhCtx(nullptr) {} private: void* m_dhCtx; - static bool m_inited; static void* getOpenSSLFuncPtr(const char* function_name); // OpenSSL Func Ptrs: Declare Func Ptr type and a static variable of FuncPtr http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/DiskVersionTag.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/DiskVersionTag.hpp b/src/cppcache/src/DiskVersionTag.hpp index 1a34360..850a7c1 100644 --- a/src/cppcache/src/DiskVersionTag.hpp +++ b/src/cppcache/src/DiskVersionTag.hpp @@ -1,8 +1,3 @@ -#pragma once - -#ifndef GEODE_DISKVERSIONTAG_H_ -#define GEODE_DISKVERSIONTAG_H_ - /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -20,6 +15,11 @@ * limitations under the License. */ +#pragma once + +#ifndef GEODE_DISKVERSIONTAG_H_ +#define GEODE_DISKVERSIONTAG_H_ + #include "VersionTag.hpp" #include "GeodeTypeIdsImpl.hpp" #include "DiskStoreId.hpp" @@ -35,30 +35,26 @@ _GF_PTR_DEF_(VersionTag, VersionTagPtr); class DiskVersionTag : public VersionTag { protected: virtual void readMembers(uint16_t flags, DataInput& input) { - DSMemberForVersionStampPtr previousMemId, internalMemId; - MemberListForVersionStampPtr memberList = - CacheImpl::getMemberListForVersionStamp(); if ((flags & HAS_MEMBER_ID) != 0) { - DiskStoreId* temp = new DiskStoreId(); - temp->fromData(input); - internalMemId = DSMemberForVersionStampPtr(temp); - m_internalMemId = memberList->add(internalMemId); + auto internalMemId = std::make_shared<DiskStoreId>(); + internalMemId->fromData(input); + m_internalMemId = m_memberListForVersionStamp.add(internalMemId); } if ((flags & HAS_PREVIOUS_MEMBER_ID) != 0) { if ((flags & DUPLICATE_MEMBER_IDS) != 0) { m_previousMemId = m_internalMemId; } else { - DiskStoreId* temp = new DiskStoreId(); - temp->fromData(input); - previousMemId = DSMemberForVersionStampPtr(temp); - m_previousMemId = memberList->add(previousMemId); + auto previousMemId = std::make_shared<DiskStoreId>(); + previousMemId->fromData(input); + m_previousMemId = m_memberListForVersionStamp.add(previousMemId); } } } public: - DiskVersionTag() : VersionTag() {} + DiskVersionTag(MemberListForVersionStamp& memberListForVersionStamp) + : VersionTag(memberListForVersionStamp) {} virtual int32_t classId() const { return 0; } @@ -66,16 +62,20 @@ class DiskVersionTag : public VersionTag { return static_cast<int8_t>(GeodeTypeIdsImpl::DiskVersionTag); } - static Serializable* createDeserializable() { return new DiskVersionTag(); } + static Serializable* createDeserializable( + MemberListForVersionStamp& memberListForVersionStamp) { + return new DiskVersionTag(memberListForVersionStamp); + } /** * for internal testing */ DiskVersionTag(int32_t entryVersion, int16_t regionVersionHighBytes, int32_t regionVersionLowBytes, uint16_t internalMemId, - uint16_t previousMemId) + uint16_t previousMemId, + MemberListForVersionStamp& memberListForVersionStamp) : VersionTag(entryVersion, regionVersionHighBytes, regionVersionLowBytes, - internalMemId, previousMemId) {} + internalMemId, previousMemId, memberListForVersionStamp) {} }; } // namespace client } // namespace geode http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/DistributedSystem.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/DistributedSystem.cpp b/src/cppcache/src/DistributedSystem.cpp index 72ae401..9c4faf3 100644 --- a/src/cppcache/src/DistributedSystem.cpp +++ b/src/cppcache/src/DistributedSystem.cpp @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - +#include "config.h" #include <geode/geode_globals.hpp> #include <geode/DistributedSystem.hpp> @@ -44,109 +44,84 @@ using namespace apache::geode::client; using namespace apache::geode::statistics; -DistributedSystemPtr* DistributedSystem::m_instance_ptr = nullptr; -bool DistributedSystem::m_connected = false; -DistributedSystemImpl* DistributedSystem::m_impl = nullptr; - ACE_Recursive_Thread_Mutex* g_disconnectLock = new ACE_Recursive_Thread_Mutex(); -namespace { - -StatisticsManager* g_statMngr = nullptr; - -SystemProperties* g_sysProps = nullptr; -} // namespace +namespace {} // namespace -DistributedSystem::DistributedSystem(const char* name) : m_name(nullptr) { - LOGDEBUG("DistributedSystem::DistributedSystem"); - if (name != nullptr) { - size_t len = strlen(name) + 1; - m_name = new char[len]; - ACE_OS::strncpy(m_name, name, len); +namespace apache { +namespace geode { +namespace client { +void setLFH() { +#ifdef _WIN32 + static HINSTANCE kernelMod = nullptr; + if (kernelMod == nullptr) { + kernelMod = GetModuleHandle("kernel32"); + if (kernelMod != nullptr) { + typedef BOOL(WINAPI * PHSI)( + HANDLE HeapHandle, HEAP_INFORMATION_CLASS HeapInformationClass, + PVOID HeapInformation, SIZE_T HeapInformationLength); + typedef HANDLE(WINAPI * PGPH)(); + PHSI pHSI = nullptr; + PGPH pGPH = nullptr; + if ((pHSI = (PHSI)GetProcAddress(kernelMod, "HeapSetInformation")) != + nullptr) { + // The LFH API is available + /* Only set LFH for process heap; causes problems in C++ framework if + set for all heaps + HANDLE hProcessHeapHandles[1024]; + DWORD dwRet; + ULONG heapFragValue = 2; + + dwRet= GetProcessHeaps( 1024, hProcessHeapHandles ); + for (DWORD i = 0; i < dwRet; i++) + { + HeapSetInformation( hProcessHeapHandles[i], + HeapCompatibilityInformation, &heapFragValue, sizeof(heapFragValue) + ); + } + */ + HANDLE hProcessHeapHandle; + ULONG heapFragValue = 2; + if ((pGPH = (PGPH)GetProcAddress(kernelMod, "GetProcessHeap")) != + nullptr) { + hProcessHeapHandle = pGPH(); + LOGCONFIG( + "Setting Microsoft Windows' low-fragmentation heap for use as " + "the main process heap."); + pHSI(hProcessHeapHandle, HeapCompatibilityInformation, &heapFragValue, + sizeof(heapFragValue)); + } + } + } } - if (strlen(g_sysProps->securityClientDhAlgo()) > 0) { +#endif +} +} // namespace client +} // namespace geode +} // namespace apache + +DistributedSystem::DistributedSystem( + const std::string& name, std::unique_ptr<StatisticsManager> statMngr, + std::unique_ptr<SystemProperties> sysProps) + : m_name(name), + m_statisticsManager(std::move(statMngr)), + m_sysProps(std::move(sysProps)), + m_connected(false) { + LOGDEBUG("DistributedSystem::DistributedSystem"); + if (strlen(m_sysProps->securityClientDhAlgo()) > 0) { DiffieHellman::initOpenSSLFuncPtrs(); } } -DistributedSystem::~DistributedSystem() { GF_SAFE_DELETE_ARRAY(m_name); } - -DistributedSystemPtr DistributedSystem::connect( - const char* name, const PropertiesPtr& configPtr) { - ACE_Guard<ACE_Recursive_Thread_Mutex> disconnectGuard(*g_disconnectLock); - if (m_connected == true) { - throw AlreadyConnectedException( - "DistributedSystem::connect: already connected, call getInstance to " - "get it"); - } - - // make sure statics are initialized. - if (m_instance_ptr == nullptr) { - m_instance_ptr = new DistributedSystemPtr(); - } - if (g_sysProps == nullptr) { - g_sysProps = new SystemProperties(configPtr, nullptr); - } - Exception::setStackTraces(g_sysProps->debugStackTraceEnabled()); - - if (name == nullptr) { - delete g_sysProps; - g_sysProps = nullptr; - throw IllegalArgumentException( - "DistributedSystem::connect: " - "name cannot be nullptr"); - } - if (name[0] == '\0') { - name = "NativeDS"; - } - - // Fix for Ticket#866 on NC OR SR#13306117704 - // Set client name via native client API - const char* propName = g_sysProps->name(); - if (propName != nullptr && strlen(propName) > 0) { - name = propName; - } +DistributedSystem::~DistributedSystem() {} - // Trigger other library initialization. - CppCacheLibrary::initLib(); - - if (!TcrMessage::init()) { - TcrMessage::cleanup(); - throw IllegalArgumentException( - "DistributedSystem::connect: preallocate message buffer failed!"); - } +void DistributedSystem::logSystemInformation() { + std::string gfcpp = CppCacheLibrary::getProductDir(); + LOGCONFIG("Using Geode Native Client Product Directory: %s", gfcpp.c_str()); - const char* logFilename = g_sysProps->logFilename(); - if (logFilename != nullptr) { - try { - Log::close(); - Log::init(g_sysProps->logLevel(), logFilename, - g_sysProps->logFileSizeLimit(), - g_sysProps->logDiskSpaceLimit()); - } catch (const GeodeIOException&) { - Log::close(); - TcrMessage::cleanup(); - CppCacheLibrary::closeLib(); - delete g_sysProps; - g_sysProps = nullptr; - *m_instance_ptr = nullptr; - // delete g_disconnectLock; - throw; - } - } else { - Log::setLogLevel(g_sysProps->logLevel()); - } - - try { - std::string gfcpp = CppCacheLibrary::getProductDir(); - LOGCONFIG("Using Geode Native Client Product Directory: %s", gfcpp.c_str()); - } catch (const Exception&) { - LOGERROR( - "Unable to determine Product Directory. Please set the " - "GFCPP environment variable."); - throw; - } // Add version information, source revision, current directory etc. - LOGCONFIG("Product version: %s", CacheFactory::getProductDescription()); + LOGCONFIG("Product version: %s", + PRODUCT_VENDOR " " PRODUCT_NAME " " PRODUCT_VERSION + " (" PRODUCT_BITS ") " PRODUCT_BUILDDATE); LOGCONFIG("Source revision: %s", PRODUCT_SOURCE_REVISION); LOGCONFIG("Source repository: %s", PRODUCT_SOURCE_REPOSITORY); @@ -171,44 +146,90 @@ DistributedSystemPtr DistributedSystem::connect( ld_libpath == nullptr ? "nullptr" : ld_libpath); #endif // Log the Geode system properties - g_sysProps->logSettings(); + m_sysProps->logSettings(); +} + +std::unique_ptr<DistributedSystem> DistributedSystem::create( + const std::string& _name, Cache* cache, const PropertiesPtr& configPtr) { + // TODO global - Refactory out the static initialization + // Trigger other library initialization. + CppCacheLibrary::initLib(); + + auto sysProps = std::unique_ptr<SystemProperties>( + new SystemProperties(configPtr, nullptr)); + + // TODO global - Refactor this to some process helper + Exception::setStackTraces(sysProps->debugStackTraceEnabled()); + + auto name = _name; + if (name.empty()) { + name = "NativeDS"; + } + + // Set client name via native client API + const char* propName = sysProps->name(); + if (propName != nullptr && strlen(propName) > 0) { + name = propName; + } - /* if (strlen(g_sysProps->securityClientDhAlgo())>0) { - DiffieHellman::initDhKeys(g_sysProps->getSecurityProperties()); - }*/ + // TODO global - keep global but setup once. + const char* logFilename = sysProps->logFilename(); + if (logFilename) { + try { + Log::close(); + Log::init(sysProps->logLevel(), logFilename, sysProps->logFileSizeLimit(), + sysProps->logDiskSpaceLimit()); + } catch (const GeodeIOException&) { + Log::close(); + sysProps = nullptr; + throw; + } + } else { + Log::setLogLevel(sysProps->logLevel()); + } + + try { + std::string gfcpp = CppCacheLibrary::getProductDir(); + } catch (const Exception&) { + LOGERROR( + "Unable to determine Product Directory. Please set the " + "GFCPP environment variable."); + throw; + } - DistributedSystemPtr dptr; + std::unique_ptr<StatisticsManager> statMngr; try { - g_statMngr = StatisticsManager::initInstance( - g_sysProps->statisticsArchiveFile(), - g_sysProps->statisticsSampleInterval(), g_sysProps->statisticsEnabled(), - g_sysProps->statsFileSizeLimit(), g_sysProps->statsDiskSpaceLimit()); + statMngr = std::unique_ptr<StatisticsManager>(new StatisticsManager( + sysProps->statisticsArchiveFile(), sysProps->statisticsSampleInterval(), + sysProps->statisticsEnabled(), cache, sysProps->durableClientId(), + sysProps->durableTimeout(), sysProps->statsFileSizeLimit(), + sysProps->statsDiskSpaceLimit())); } catch (const NullPointerException&) { - // close all open handles, delete whatever was newed. - g_statMngr = nullptr; - //:Merge issue - /*if (strlen(g_sysProps->securityClientDhAlgo())>0) { - DiffieHellman::clearDhKeys(); - }*/ Log::close(); - TcrMessage::cleanup(); - CppCacheLibrary::closeLib(); - delete g_sysProps; - g_sysProps = nullptr; - *m_instance_ptr = nullptr; - // delete g_disconnectLock; throw; } - GF_D_ASSERT(g_statMngr != nullptr); - - CacheImpl::expiryTaskManager = new ExpiryTaskManager(); - CacheImpl::expiryTaskManager->begin(); + GF_D_ASSERT(m_statisticsManager != nullptr); - DistributedSystem* dp = new DistributedSystem(name); - if (!dp) { + auto distributedSystem = std::unique_ptr<DistributedSystem>( + new DistributedSystem(name, std::move(statMngr), std::move(sysProps))); + if (!distributedSystem) { throw NullPointerException("DistributedSystem::connect: new failed"); } - m_impl = new DistributedSystemImpl(name, dp); + distributedSystem->m_impl = + new DistributedSystemImpl(name.c_str(), distributedSystem.get()); + + distributedSystem->logSystemInformation(); + LOGCONFIG("Starting the Geode Native Client"); + return distributedSystem; +} + +void DistributedSystem::connect() { + ACE_Guard<ACE_Recursive_Thread_Mutex> disconnectGuard(*g_disconnectLock); + if (m_connected == true) { + throw AlreadyConnectedException( + "DistributedSystem::connect: already connected, call getInstance to " + "get it"); + } try { m_impl->connect(); @@ -230,16 +251,8 @@ DistributedSystemPtr DistributedSystem::connect( } m_connected = true; - dptr.reset(dp); - *m_instance_ptr = dptr; - LOGCONFIG("Starting the Geode Native Client"); - - return dptr; } -/** - *@brief disconnect from the distributed system - */ void DistributedSystem::disconnect() { ACE_Guard<ACE_Recursive_Thread_Mutex> disconnectGuard(*g_disconnectLock); @@ -249,19 +262,6 @@ void DistributedSystem::disconnect() { "not called"); } - try { - CachePtr cache = CacheFactory::getAnyInstance(); - if (cache != nullptr && !cache->isClosed()) { - cache->close(); - } - } catch (const apache::geode::client::Exception& e) { - LOGWARN("Exception while closing: %s: %s", e.getName(), e.getMessage()); - } - - if (CacheImpl::expiryTaskManager != nullptr) { - CacheImpl::expiryTaskManager->stopExpiryTaskManager(); - } - if (m_impl) { m_impl->disconnect(); delete m_impl; @@ -270,73 +270,16 @@ void DistributedSystem::disconnect() { LOGFINEST("Deleted DistributedSystemImpl"); - if (strlen(g_sysProps->securityClientDhAlgo()) > 0) { - // DistributedSystem::getInstance()->m_dh->clearDhKeys(); - } - - // Clear DH Keys - /* if (strlen(g_sysProps->securityClientDhAlgo())>0) { - DiffieHellman::clearDhKeys(); - }*/ - - GF_D_ASSERT(!!g_sysProps); - delete g_sysProps; - g_sysProps = nullptr; - - LOGFINEST("Deleted SystemProperties"); - - if (CacheImpl::expiryTaskManager != nullptr) { - delete CacheImpl::expiryTaskManager; - CacheImpl::expiryTaskManager = nullptr; - } - - LOGFINEST("Deleted ExpiryTaskManager"); - - TcrMessage::cleanup(); - - LOGFINEST("Cleaned TcrMessage"); - - GF_D_ASSERT(!!g_statMngr); - g_statMngr->clean(); - g_statMngr = nullptr; - - LOGFINEST("Cleaned StatisticsManager"); - - RegionStatType::clean(); - - LOGFINEST("Cleaned RegionStatType"); - - PoolStatType::clean(); - - LOGFINEST("Cleaned PoolStatType"); - - *m_instance_ptr = nullptr; - - // Free up library resources - CppCacheLibrary::closeLib(); - LOGCONFIG("Stopped the Geode Native Client"); + // TODO global - log stays global so lets move this Log::close(); m_connected = false; } -SystemProperties* DistributedSystem::getSystemProperties() { - return g_sysProps; +SystemProperties& DistributedSystem::getSystemProperties() const { + return *m_sysProps; } -const char* DistributedSystem::getName() const { return m_name; } - -bool DistributedSystem::isConnected() { - CppCacheLibrary::initLib(); - return m_connected; -} - -DistributedSystemPtr DistributedSystem::getInstance() { - CppCacheLibrary::initLib(); - if (m_instance_ptr == nullptr) { - return nullptr; - } - return *m_instance_ptr; -} +const std::string& DistributedSystem::getName() const { return m_name; } http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/DistributedSystemImpl.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/DistributedSystemImpl.cpp b/src/cppcache/src/DistributedSystemImpl.cpp index 6b5b08d..b562629 100644 --- a/src/cppcache/src/DistributedSystemImpl.cpp +++ b/src/cppcache/src/DistributedSystemImpl.cpp @@ -33,13 +33,13 @@ DistributedSystemImpl::DistributedSystemImpl(const char* name, DistributedSystem* implementee) : m_name(name == 0 ? "" : name), m_implementee(implementee) { g_numInstances = 0; - if (m_implementee->getSystemProperties()->isDhOn()) { + if (m_implementee->getSystemProperties().isDhOn()) { // m_dh.initDhKeys(m_implementee->getSystemProperties()->getSecurityProperties()); } } DistributedSystemImpl::~DistributedSystemImpl() { - if (m_implementee->getSystemProperties()->isDhOn()) { + if (m_implementee->getSystemProperties().isDhOn()) { // m_dh.clearDhKeys(); } g_numInstances = 0; @@ -48,7 +48,7 @@ DistributedSystemImpl::~DistributedSystemImpl() { AuthInitializePtr DistributedSystemImpl::getAuthLoader() { ACE_Guard<ACE_Recursive_Thread_Mutex> authGuard(m_authLock); - return DistributedSystem::getSystemProperties()->getAuthLoader(); + return m_implementee->getSystemProperties().getAuthLoader(); } void DistributedSystemImpl::connect() {} @@ -68,46 +68,28 @@ void DistributedSystemImpl::releaseDisconnectLock() { int DistributedSystemImpl::currentInstances() { ACE_Guard<ACE_Recursive_Thread_Mutex> disconnectGuard(*g_disconnectLock); - if (DistributedSystem::getInstance() != nullptr && - DistributedSystem::getInstance()->getSystemProperties() != nullptr && - !DistributedSystem::getInstance() - ->getSystemProperties() - ->isAppDomainEnabled()) { - return 0; - } - return g_numInstances; } void DistributedSystemImpl::connectInstance() { ACE_Guard<ACE_Recursive_Thread_Mutex> disconnectGuard(*g_disconnectLock); - if (DistributedSystem::getInstance()->getSystemProperties() != nullptr && - DistributedSystem::getInstance() - ->getSystemProperties() - ->isAppDomainEnabled()) { - g_numInstances++; - } + g_numInstances++; } void DistributedSystemImpl::disconnectInstance() { ACE_Guard<ACE_Recursive_Thread_Mutex> disconnectGuard(*g_disconnectLock); - if (DistributedSystem::getInstance()->getSystemProperties() != nullptr && - DistributedSystem::getInstance() - ->getSystemProperties() - ->isAppDomainEnabled()) { - g_numInstances--; - } + g_numInstances--; } -void DistributedSystemImpl::CallCliCallBack() { +void DistributedSystemImpl::CallCliCallBack(Cache& cache) { ACE_Guard<ACE_Recursive_Thread_Mutex> disconnectGuard(m_cliCallbackLock); if (m_isCliCallbackSet == true) { for (std::map<int, CliCallbackMethod>::iterator iter = m_cliCallbackMap.begin(); iter != m_cliCallbackMap.end(); ++iter) { - (*iter).second(); + (*iter).second(cache); } } } http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/DistributedSystemImpl.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/DistributedSystemImpl.hpp b/src/cppcache/src/DistributedSystemImpl.hpp index 404c151..5352263 100644 --- a/src/cppcache/src/DistributedSystemImpl.hpp +++ b/src/cppcache/src/DistributedSystemImpl.hpp @@ -126,7 +126,7 @@ class CPPCACHE_EXPORT DistributedSystemImpl { static void unregisterCliCallback(int appdomainId); - static void CallCliCallBack(); + static void CallCliCallBack(Cache& cache); private: /** http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/EntriesMap.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/EntriesMap.hpp b/src/cppcache/src/EntriesMap.hpp index a6cea61..c057aa5 100644 --- a/src/cppcache/src/EntriesMap.hpp +++ b/src/cppcache/src/EntriesMap.hpp @@ -22,6 +22,8 @@ // This needs to be ace free so that the region can include it. +#include <memory> + #include <geode/geode_globals.hpp> #include "MapEntry.hpp" #include <geode/CacheableKey.hpp> @@ -40,7 +42,7 @@ namespace client { */ class CPPCACHE_EXPORT EntriesMap { public: - EntriesMap(EntryFactory* entryFactory) : m_entryFactory(entryFactory) {} + EntriesMap(std::unique_ptr<EntryFactory> entryFactory) : m_entryFactory(std::move(entryFactory)) {} virtual ~EntriesMap() {} /** @@ -168,10 +170,10 @@ class CPPCACHE_EXPORT EntriesMap { static bool boolVal; protected: - EntryFactory* m_entryFactory; + const std::unique_ptr<EntryFactory> m_entryFactory; /** @brief return the instance of EntryFactory for the segments to use. */ - inline const EntryFactory* getEntryFactory() const { return m_entryFactory; } + inline const EntryFactory* getEntryFactory() const { return m_entryFactory.get(); } }; // class EntriesMap } // namespace client http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/EntriesMapFactory.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/EntriesMapFactory.cpp b/src/cppcache/src/EntriesMapFactory.cpp index 99213dd..b8e3254 100644 --- a/src/cppcache/src/EntriesMapFactory.cpp +++ b/src/cppcache/src/EntriesMapFactory.cpp @@ -44,45 +44,51 @@ EntriesMap* EntriesMapFactory::createMap(RegionInternal* region, bool concurrencyChecksEnabled = attrs->getConcurrencyChecksEnabled(); bool heapLRUEnabled = false; - SystemProperties* prop = DistributedSystem::getSystemProperties(); - if ((lruLimit != 0) || - (prop && prop->heapLRULimitEnabled())) { // create LRU map... + auto cache = region->getCacheImpl(); + auto& prop = cache->getDistributedSystem().getSystemProperties(); + auto& expiryTaskmanager = cache->getExpiryTaskManager(); + + if ((lruLimit != 0) || (prop.heapLRULimitEnabled())) { // create LRU map... LRUAction::Action lruEvictionAction; DiskPolicyType::PolicyType dpType = attrs->getDiskPolicy(); if (dpType == DiskPolicyType::OVERFLOWS) { lruEvictionAction = LRUAction::OVERFLOW_TO_DISK; } else if ((dpType == DiskPolicyType::NONE) || - (prop && prop->heapLRULimitEnabled())) { + (prop.heapLRULimitEnabled())) { lruEvictionAction = LRUAction::LOCAL_DESTROY; - if (prop && prop->heapLRULimitEnabled()) heapLRUEnabled = true; + if (prop.heapLRULimitEnabled()) heapLRUEnabled = true; } else { return nullptr; } if (ttl != 0 || idle != 0) { - EntryFactory* entryFactory = LRUExpEntryFactory::singleton; - entryFactory->setConcurrencyChecksEnabled(concurrencyChecksEnabled); - result = new LRUEntriesMap(entryFactory, region, lruEvictionAction, - lruLimit, concurrencyChecksEnabled, - concurrency, heapLRUEnabled); + result = new LRUEntriesMap( + &expiryTaskmanager, + std::unique_ptr<LRUExpEntryFactory>( + new LRUExpEntryFactory(concurrencyChecksEnabled)), + region, lruEvictionAction, lruLimit, concurrencyChecksEnabled, + concurrency, heapLRUEnabled); } else { - EntryFactory* entryFactory = LRUEntryFactory::singleton; - entryFactory->setConcurrencyChecksEnabled(concurrencyChecksEnabled); - result = new LRUEntriesMap(entryFactory, region, lruEvictionAction, - lruLimit, concurrencyChecksEnabled, - concurrency, heapLRUEnabled); + result = new LRUEntriesMap( + &expiryTaskmanager, + std::unique_ptr<LRUEntryFactory>( + new LRUEntryFactory(concurrencyChecksEnabled)), + region, lruEvictionAction, lruLimit, concurrencyChecksEnabled, + concurrency, heapLRUEnabled); } } else if (ttl != 0 || idle != 0) { // create entries with a ExpEntryFactory. - EntryFactory* entryFactory = ExpEntryFactory::singleton; - entryFactory->setConcurrencyChecksEnabled(concurrencyChecksEnabled); - result = new ConcurrentEntriesMap(entryFactory, concurrencyChecksEnabled, - region, concurrency); + result = new ConcurrentEntriesMap( + &expiryTaskmanager, + std::unique_ptr<ExpEntryFactory>( + new ExpEntryFactory(concurrencyChecksEnabled)), + concurrencyChecksEnabled, region, concurrency); } else { // create plain concurrent map. - EntryFactory* entryFactory = EntryFactory::singleton; - entryFactory->setConcurrencyChecksEnabled(concurrencyChecksEnabled); - result = new ConcurrentEntriesMap(entryFactory, concurrencyChecksEnabled, - region, concurrency); + result = new ConcurrentEntriesMap( + &expiryTaskmanager, + std::unique_ptr<EntryFactory>( + new EntryFactory(concurrencyChecksEnabled)), + concurrencyChecksEnabled, region, concurrency); } result->open(initialCapacity); return result; http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/EntryExpiryHandler.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/EntryExpiryHandler.cpp b/src/cppcache/src/EntryExpiryHandler.cpp index 31ef91c..fec863d 100644 --- a/src/cppcache/src/EntryExpiryHandler.cpp +++ b/src/cppcache/src/EntryExpiryHandler.cpp @@ -68,7 +68,8 @@ int EntryExpiryHandler::handle_timeout(const ACE_Time_Value& current_time, "[%s]", -sec, Utils::getCacheableKeyString(key)->asChar(), m_regionPtr->getFullPath()); - CacheImpl::expiryTaskManager->resetTask(expProps.getExpiryTaskId(), -sec); + m_regionPtr->getCacheImpl()->getExpiryTaskManager().resetTask( + expProps.getExpiryTaskId(), -sec); return 0; } } catch (...) { @@ -77,7 +78,8 @@ int EntryExpiryHandler::handle_timeout(const ACE_Time_Value& current_time, LOGDEBUG("Removing expiry task for key [%s] of region [%s]", Utils::getCacheableKeyString(key)->asChar(), m_regionPtr->getFullPath()); - CacheImpl::expiryTaskManager->resetTask(expProps.getExpiryTaskId(), 0); + m_regionPtr->getCacheImpl()->getExpiryTaskManager().resetTask( + expProps.getExpiryTaskId(), 0); // we now delete the handler in GF_Timer_Heap_ImmediateReset_T // and always return success. http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ExecutionImpl.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/ExecutionImpl.cpp b/src/cppcache/src/ExecutionImpl.cpp index 3a9e1b1..cfe45cd 100644 --- a/src/cppcache/src/ExecutionImpl.cpp +++ b/src/cppcache/src/ExecutionImpl.cpp @@ -371,7 +371,11 @@ GfErrType ExecutionImpl::getFuncAttributes(const char* func, // do TCR GET_FUNCTION_ATTRIBUTES LOGDEBUG("Tcrmessage request GET_FUNCTION_ATTRIBUTES "); std::string funcName(func); - TcrMessageGetFunctionAttributes request(funcName, tcrdm); + TcrMessageGetFunctionAttributes request(tcrdm->getConnectionManager() + .getCacheImpl() + ->getCache() + ->createDataOutput(), + funcName, tcrdm); TcrMessageReply reply(true, tcrdm); err = tcrdm->sendSyncRequest(request, reply); if (err != GF_NOERR) { @@ -483,7 +487,11 @@ CacheableVectorPtr ExecutionImpl::executeOnPool(std::string& func, while (attempt <= retryAttempts) { std::string funcName(func); - TcrMessageExecuteFunction msg(funcName, m_args, getResult, tcrdm, timeout); + TcrMessageExecuteFunction msg(tcrdm->getConnectionManager() + .getCacheImpl() + ->getCache() + ->createDataOutput(), + funcName, m_args, getResult, tcrdm, timeout); TcrMessageReply reply(true, tcrdm); ChunkedFunctionExecutionResponse* resultCollector( new ChunkedFunctionExecutionResponse(reply, (getResult & 2), m_rc)); http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ExpMapEntry.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/ExpMapEntry.cpp b/src/cppcache/src/ExpMapEntry.cpp index 22ef261..f3db017 100644 --- a/src/cppcache/src/ExpMapEntry.cpp +++ b/src/cppcache/src/ExpMapEntry.cpp @@ -14,23 +14,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + #include "ExpMapEntry.hpp" #include "MapEntryT.hpp" -using namespace apache::geode::client; - -ExpEntryFactory* ExpEntryFactory::singleton = nullptr; +namespace apache { +namespace geode { +namespace client { -/** - * @brief called when library is initialized... see CppCacheLibrary. - */ -void ExpEntryFactory::init() { singleton = new ExpEntryFactory(); } - -void ExpEntryFactory::newMapEntry(const CacheableKeyPtr& key, +void ExpEntryFactory::newMapEntry(ExpiryTaskManager* expiryTaskManager, + const CacheableKeyPtr& key, MapEntryImplPtr& result) const { if (m_concurrencyChecksEnabled) { - result = MapEntryT<VersionedExpMapEntry, 0, 0>::create(key); + result = + MapEntryT<VersionedExpMapEntry, 0, 0>::create(expiryTaskManager, key); } else { - result = MapEntryT<ExpMapEntry, 0, 0>::create(key); + result = MapEntryT<ExpMapEntry, 0, 0>::create(expiryTaskManager, key); } } + +} // namespace client +} // namespace geode +} // namespace apache http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ExpMapEntry.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/ExpMapEntry.hpp b/src/cppcache/src/ExpMapEntry.hpp index 823ee39..a307222 100644 --- a/src/cppcache/src/ExpMapEntry.hpp +++ b/src/cppcache/src/ExpMapEntry.hpp @@ -1,8 +1,3 @@ -#pragma once - -#ifndef GEODE_EXPMAPENTRY_H_ -#define GEODE_EXPMAPENTRY_H_ - /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -20,6 +15,11 @@ * limitations under the License. */ +#pragma once + +#ifndef GEODE_EXPMAPENTRY_H_ +#define GEODE_EXPMAPENTRY_H_ + #include <geode/geode_globals.hpp> #include "MapEntry.hpp" #include "VersionStamp.hpp" @@ -49,7 +49,9 @@ class CPPCACHE_EXPORT ExpMapEntry : public MapEntryImpl, inline explicit ExpMapEntry(bool noInit) : MapEntryImpl(true), ExpEntryProperties(true) {} - inline ExpMapEntry(const CacheableKeyPtr& key) : MapEntryImpl(key) {} + inline ExpMapEntry(ExpiryTaskManager* expiryTaskManager, + const CacheableKeyPtr& key) + : MapEntryImpl(key), ExpEntryProperties(expiryTaskManager) {} private: // disabled @@ -69,7 +71,9 @@ class CPPCACHE_EXPORT VersionedExpMapEntry : public ExpMapEntry, protected: inline explicit VersionedExpMapEntry(bool noInit) : ExpMapEntry(true) {} - inline VersionedExpMapEntry(const CacheableKeyPtr& key) : ExpMapEntry(key) {} + inline VersionedExpMapEntry(ExpiryTaskManager* expiryTaskManager, + const CacheableKeyPtr& key) + : ExpMapEntry(expiryTaskManager, key) {} private: // disabled @@ -81,14 +85,12 @@ typedef std::shared_ptr<VersionedExpMapEntry> VersionedExpMapEntryPtr; class CPPCACHE_EXPORT ExpEntryFactory : public EntryFactory { public: - static ExpEntryFactory* singleton; - static void init(); - - ExpEntryFactory() {} + using EntryFactory::EntryFactory; virtual ~ExpEntryFactory() {} - virtual void newMapEntry(const CacheableKeyPtr& key, + virtual void newMapEntry(ExpiryTaskManager* expiryTaskManager, + const CacheableKeyPtr& key, MapEntryImplPtr& result) const; }; } // namespace client http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/FarSideEntryOp.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/FarSideEntryOp.cpp b/src/cppcache/src/FarSideEntryOp.cpp index 6ed18f9..e17c677 100644 --- a/src/cppcache/src/FarSideEntryOp.cpp +++ b/src/cppcache/src/FarSideEntryOp.cpp @@ -30,7 +30,8 @@ namespace apache { namespace geode { namespace client { -FarSideEntryOp::FarSideEntryOp(RegionCommit* region) +FarSideEntryOp::FarSideEntryOp( + RegionCommit* region, MemberListForVersionStamp& memberListForVersionStamp) : // UNUSED m_region(region), /* adongre * @@ -38,7 +39,8 @@ FarSideEntryOp::FarSideEntryOp(RegionCommit* region) m_op(0), m_modSerialNum(0), m_eventOffset(0), - m_didDestroy(false) + m_didDestroy(false), + m_memberListForVersionStamp(memberListForVersionStamp) {} @@ -73,7 +75,8 @@ void FarSideEntryOp::fromData(DataInput& input, bool largeModCount, } skipFilterRoutingInfo(input); - m_versionTag = TcrMessage::readVersionTagPart(input, memId); + m_versionTag = + TcrMessage::readVersionTagPart(input, memId, m_memberListForVersionStamp); // SerializablePtr sPtr; // input.readObject(sPtr); input.readInt(&m_eventOffset); http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/FarSideEntryOp.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/FarSideEntryOp.hpp b/src/cppcache/src/FarSideEntryOp.hpp index 043e905..ab4063a 100644 --- a/src/cppcache/src/FarSideEntryOp.hpp +++ b/src/cppcache/src/FarSideEntryOp.hpp @@ -94,7 +94,8 @@ _GF_PTR_DEF_(FarSideEntryOp, FarSideEntryOpPtr); class FarSideEntryOp { public: - FarSideEntryOp(RegionCommit* region); + FarSideEntryOp(RegionCommit* region, + MemberListForVersionStamp& memberListForVersionStamp); virtual ~FarSideEntryOp(); void fromData(DataInput& input, bool largeModCount, uint16_t memId); @@ -115,6 +116,7 @@ class FarSideEntryOp { bool m_didDestroy; UserDataPtr m_callbackArg; VersionTagPtr m_versionTag; + MemberListForVersionStamp& m_memberListForVersionStamp; // FilterRoutingInfo filterRoutingInfo; bool isDestroy(int8_t op); bool isInvalidate(int8_t op); http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/FunctionService.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/FunctionService.cpp b/src/cppcache/src/FunctionService.cpp index c785836..a392586 100644 --- a/src/cppcache/src/FunctionService.cpp +++ b/src/cppcache/src/FunctionService.cpp @@ -16,12 +16,14 @@ */ #include <geode/FunctionService.hpp> #include <geode/ExceptionTypes.hpp> -#include <ExecutionImpl.hpp> -#include <ProxyRegion.hpp> -#include <UserAttributes.hpp> -#include <ProxyCache.hpp> #include <geode/PoolManager.hpp> -#include <CacheRegionHelper.hpp> + +#include "CacheRegionHelper.hpp" +#include "ExecutionImpl.hpp" +#include "ProxyRegion.hpp" +#include "UserAttributes.hpp" +#include "ProxyCache.hpp" +#include "CacheImpl.hpp" using namespace apache::geode::client; @@ -45,7 +47,8 @@ ExecutionPtr FunctionService::onRegion(RegionPtr region) { // it is in multiuser mode proxyCache = pr->m_proxyCache; PoolPtr userAttachedPool = proxyCache->m_userAttributes->getPool(); - PoolPtr pool = PoolManager::find(userAttachedPool->getName()); + PoolPtr pool = region->getCache()->getPoolManager().find( + userAttachedPool->getName()); if (!(pool != nullptr && pool.get() == userAttachedPool.get() && !pool->isDestroyed())) { throw IllegalStateException( @@ -54,9 +57,9 @@ ExecutionPtr FunctionService::onRegion(RegionPtr region) { RegionPtr tmpRegion; tmpRegion = nullptr; // getting real region to execute function on region - if (!CacheFactory::getAnyInstance()->isClosed()) { - CacheRegionHelper::getCacheImpl(CacheFactory::getAnyInstance().get()) - ->getRegion(region->getName(), tmpRegion); + if (!region->getCache()->isClosed()) { + region->getCache()->m_cacheImpl->getRegion(region->getName(), + tmpRegion); } else { throw IllegalStateException("Cache has been closed"); } @@ -109,7 +112,8 @@ ExecutionPtr FunctionService::onServerWithCache(const RegionServicePtr& cache) { LOGDEBUG("FunctionService::onServer:"); if (pc != nullptr) { PoolPtr userAttachedPool = pc->m_userAttributes->getPool(); - PoolPtr pool = PoolManager::find(userAttachedPool->getName()); + PoolPtr pool = + pc->m_cacheImpl->getPoolManager().find(userAttachedPool->getName()); if (pool != nullptr && pool.get() == userAttachedPool.get() && !pool->isDestroyed()) { return std::make_shared<ExecutionImpl>(pool, false, pc); @@ -118,7 +122,8 @@ ExecutionPtr FunctionService::onServerWithCache(const RegionServicePtr& cache) { "Pool has been close to execute function on server"); } else { CachePtr realcache = std::static_pointer_cast<Cache>(cache); - return FunctionService::onServer(realcache->m_cacheImpl->getDefaultPool()); + return FunctionService::onServer( + realcache->m_cacheImpl->getPoolManager().getDefaultPool()); } } @@ -133,7 +138,8 @@ ExecutionPtr FunctionService::onServersWithCache( LOGDEBUG("FunctionService::onServers:"); if (pc != nullptr && !cache->isClosed()) { auto userAttachedPool = pc->m_userAttributes->getPool(); - auto pool = PoolManager::find(userAttachedPool->getName()); + auto pool = pc->m_cacheImpl->getCache()->getPoolManager().find( + userAttachedPool->getName()); if (pool != nullptr && pool.get() == userAttachedPool.get() && !pool->isDestroyed()) { return std::make_shared<ExecutionImpl>(pool, true, pc); @@ -142,6 +148,7 @@ ExecutionPtr FunctionService::onServersWithCache( "Pool has been close to execute function on server"); } else { auto realcache = std::static_pointer_cast<Cache>(cache); - return FunctionService::onServers(realcache->m_cacheImpl->getDefaultPool()); + return FunctionService::onServers( + realcache->m_cacheImpl->getPoolManager().getDefaultPool()); } } http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/InternalCacheTransactionManager2PCImpl.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/InternalCacheTransactionManager2PCImpl.cpp b/src/cppcache/src/InternalCacheTransactionManager2PCImpl.cpp index 5a01dff..b24c898 100644 --- a/src/cppcache/src/InternalCacheTransactionManager2PCImpl.cpp +++ b/src/cppcache/src/InternalCacheTransactionManager2PCImpl.cpp @@ -66,6 +66,10 @@ void InternalCacheTransactionManager2PCImpl::prepare() { } TcrMessageTxSynchronization requestCommitBefore( + tcr_dm->getConnectionManager() + .getCacheImpl() + ->getCache() + ->createDataOutput(), BEFORE_COMMIT, txState->getTransactionId()->getId(), STATUS_COMMITTED); TcrMessageReply replyCommitBefore(true, nullptr); @@ -161,6 +165,10 @@ void InternalCacheTransactionManager2PCImpl::afterCompletion(int32_t status) { TXCleaner txCleaner(this); TcrMessageTxSynchronization requestCommitAfter( + tcr_dm->getConnectionManager() + .getCacheImpl() + ->getCache() + ->createDataOutput(), AFTER_COMMIT, txState->getTransactionId()->getId(), status); TcrMessageReply replyCommitAfter(true, nullptr); @@ -172,9 +180,8 @@ void InternalCacheTransactionManager2PCImpl::afterCompletion(int32_t status) { } else { switch (replyCommitAfter.getMessageType()) { case TcrMessage::RESPONSE: { - TXCommitMessagePtr commit = - std::static_pointer_cast<TXCommitMessage>( - replyCommitAfter.getValue()); + TXCommitMessagePtr commit = std::static_pointer_cast<TXCommitMessage>( + replyCommitAfter.getValue()); if (commit.get() != nullptr) // e.g. when afterCompletion(STATUS_ROLLEDBACK) called { http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/LRUAction.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/LRUAction.cpp b/src/cppcache/src/LRUAction.cpp index 6abb9ee..8c3e74d 100644 --- a/src/cppcache/src/LRUAction.cpp +++ b/src/cppcache/src/LRUAction.cpp @@ -87,7 +87,7 @@ bool LRUOverFlowToDiskAction::evict(const MapEntryImplPtr& mePtr) { lruProps.setPersistenceInfo(persistenceInfo); } (m_regionPtr->getRegionStats())->incOverflows(); - (m_regionPtr->getCacheImpl())->m_cacheStats->incOverflows(); + (m_regionPtr->getCacheImpl())->getCachePerfStats().incOverflows(); // set value after write on disk to indicate that it is on disk. mePtr->setValueI(CacheableToken::overflowed()); http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/LRUEntriesMap.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/LRUEntriesMap.cpp b/src/cppcache/src/LRUEntriesMap.cpp index 5ecefec..34348db 100644 --- a/src/cppcache/src/LRUEntriesMap.cpp +++ b/src/cppcache/src/LRUEntriesMap.cpp @@ -52,13 +52,15 @@ class CPPCACHE_EXPORT TestMapAction : public virtual LRUAction { friend class LRUAction; }; -LRUEntriesMap::LRUEntriesMap(EntryFactory* entryFactory, RegionInternal* region, +LRUEntriesMap::LRUEntriesMap(ExpiryTaskManager* expiryTaskManager, + std::unique_ptr<EntryFactory> entryFactory, + RegionInternal* region, const LRUAction::Action& lruAction, const uint32_t limit, bool concurrencyChecksEnabled, const uint8_t concurrency, bool heapLRUEnabled) - : ConcurrentEntriesMap(entryFactory, concurrencyChecksEnabled, region, - concurrency), + : ConcurrentEntriesMap(expiryTaskManager, std::move(entryFactory), + concurrencyChecksEnabled, region, concurrency), m_lruList(), m_limit(limit), m_pmPtr(nullptr), @@ -396,7 +398,7 @@ bool LRUEntriesMap::get(const CacheableKeyPtr& key, CacheablePtr& returnPtr, return false; } m_region->getRegionStats()->incRetrieves(); - m_region->getCacheImpl()->m_cacheStats->incRetrieves(); + m_region->getCacheImpl()->getCachePerfStats().incRetrieves(); returnPtr = tmpObj; http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/LRUEntriesMap.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/LRUEntriesMap.hpp b/src/cppcache/src/LRUEntriesMap.hpp index 498bd90..0c86abe 100644 --- a/src/cppcache/src/LRUEntriesMap.hpp +++ b/src/cppcache/src/LRUEntriesMap.hpp @@ -73,10 +73,11 @@ class CPPCACHE_EXPORT LRUEntriesMap : public ConcurrentEntriesMap, bool m_heapLRUEnabled; public: - LRUEntriesMap(EntryFactory* entryFactory, RegionInternal* region, - const LRUAction::Action& lruAction, const uint32_t limit, - bool concurrencyChecksEnabled, const uint8_t concurrency = 16, - bool heapLRUEnabled = false); + LRUEntriesMap(ExpiryTaskManager* expiryTaskManager, + std::unique_ptr<EntryFactory> entryFactory, + RegionInternal* region, const LRUAction::Action& lruAction, + const uint32_t limit, bool concurrencyChecksEnabled, + const uint8_t concurrency = 16, bool heapLRUEnabled = false); virtual ~LRUEntriesMap(); http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/LRUExpMapEntry.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/LRUExpMapEntry.cpp b/src/cppcache/src/LRUExpMapEntry.cpp index e3213cc..66aface 100644 --- a/src/cppcache/src/LRUExpMapEntry.cpp +++ b/src/cppcache/src/LRUExpMapEntry.cpp @@ -14,23 +14,25 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + #include "LRUExpMapEntry.hpp" #include "MapEntryT.hpp" -using namespace apache::geode::client; - -LRUExpEntryFactory* LRUExpEntryFactory::singleton = nullptr; +namespace apache { +namespace geode { +namespace client { -/** - * @brief called when library is initialized... see CppCacheLibrary. - */ -void LRUExpEntryFactory::init() { singleton = new LRUExpEntryFactory(); } - -void LRUExpEntryFactory::newMapEntry(const CacheableKeyPtr& key, +void LRUExpEntryFactory::newMapEntry(ExpiryTaskManager* expiryTaskManager, + const CacheableKeyPtr& key, MapEntryImplPtr& result) const { if (m_concurrencyChecksEnabled) { - result = MapEntryT<VersionedLRUExpMapEntry, 0, 0>::create(key); + result = MapEntryT<VersionedLRUExpMapEntry, 0, 0>::create(expiryTaskManager, + key); } else { - result = MapEntryT<LRUExpMapEntry, 0, 0>::create(key); + result = MapEntryT<LRUExpMapEntry, 0, 0>::create(expiryTaskManager, key); } } + +} // namespace client +} // namespace geode +} // namespace apache http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/LRUExpMapEntry.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/LRUExpMapEntry.hpp b/src/cppcache/src/LRUExpMapEntry.hpp index 98271e2..3617909 100644 --- a/src/cppcache/src/LRUExpMapEntry.hpp +++ b/src/cppcache/src/LRUExpMapEntry.hpp @@ -1,8 +1,3 @@ -#pragma once - -#ifndef GEODE_LRUEXPMAPENTRY_H_ -#define GEODE_LRUEXPMAPENTRY_H_ - /* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with @@ -20,6 +15,11 @@ * limitations under the License. */ +#pragma once + +#ifndef GEODE_LRUEXPMAPENTRY_H_ +#define GEODE_LRUEXPMAPENTRY_H_ + #include <geode/geode_globals.hpp> #include "MapEntry.hpp" #include "LRUList.hpp" @@ -53,7 +53,9 @@ class CPPCACHE_EXPORT LRUExpMapEntry : public MapEntryImpl, LRUEntryProperties(true), ExpEntryProperties(true) {} - inline LRUExpMapEntry(const CacheableKeyPtr& key) : MapEntryImpl(key) {} + inline LRUExpMapEntry(ExpiryTaskManager* expiryTaskManager, + const CacheableKeyPtr& key) + : MapEntryImpl(key), ExpEntryProperties(expiryTaskManager) {} private: // disabled @@ -73,8 +75,9 @@ class CPPCACHE_EXPORT VersionedLRUExpMapEntry : public LRUExpMapEntry, protected: inline explicit VersionedLRUExpMapEntry(bool noInit) : LRUExpMapEntry(true) {} - inline VersionedLRUExpMapEntry(const CacheableKeyPtr& key) - : LRUExpMapEntry(key) {} + inline VersionedLRUExpMapEntry(ExpiryTaskManager* expiryTaskManager, + const CacheableKeyPtr& key) + : LRUExpMapEntry(expiryTaskManager, key) {} private: // disabled @@ -86,14 +89,12 @@ typedef std::shared_ptr<VersionedLRUExpMapEntry> VersionedLRUExpMapEntryPtr; class CPPCACHE_EXPORT LRUExpEntryFactory : public EntryFactory { public: - static LRUExpEntryFactory* singleton; - static void init(); - - LRUExpEntryFactory() {} + using EntryFactory::EntryFactory; virtual ~LRUExpEntryFactory() {} - virtual void newMapEntry(const CacheableKeyPtr& key, + virtual void newMapEntry(ExpiryTaskManager* expiryTaskManager, + const CacheableKeyPtr& key, MapEntryImplPtr& result) const; }; } // namespace client http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/LRUMapEntry.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/LRUMapEntry.cpp b/src/cppcache/src/LRUMapEntry.cpp index 8cae083..ed060e3 100644 --- a/src/cppcache/src/LRUMapEntry.cpp +++ b/src/cppcache/src/LRUMapEntry.cpp @@ -14,19 +14,16 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + #include "LRUMapEntry.hpp" #include "MapEntryT.hpp" -using namespace apache::geode::client; - -LRUEntryFactory* LRUEntryFactory::singleton = nullptr; +namespace apache { +namespace geode { +namespace client { -/** - * @brief called when library is initialized... see CppCacheLibrary. - */ -void LRUEntryFactory::init() { singleton = new LRUEntryFactory(); } - -void LRUEntryFactory::newMapEntry(const CacheableKeyPtr& key, +void LRUEntryFactory::newMapEntry(ExpiryTaskManager* expiryTaskManager, + const CacheableKeyPtr& key, MapEntryImplPtr& result) const { if (m_concurrencyChecksEnabled) { result = MapEntryT<VersionedLRUMapEntry, 0, 0>::create(key); @@ -34,3 +31,7 @@ void LRUEntryFactory::newMapEntry(const CacheableKeyPtr& key, result = MapEntryT<LRUMapEntry, 0, 0>::create(key); } } + +} // namespace client +} // namespace geode +} // namespace apache http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/LRUMapEntry.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/LRUMapEntry.hpp b/src/cppcache/src/LRUMapEntry.hpp index 2ab14d5..3520a4a 100644 --- a/src/cppcache/src/LRUMapEntry.hpp +++ b/src/cppcache/src/LRUMapEntry.hpp @@ -110,14 +110,12 @@ typedef std::shared_ptr<VersionedLRUMapEntry> VersionedLRUMapEntryPtr; class CPPCACHE_EXPORT LRUEntryFactory : public EntryFactory { public: - static LRUEntryFactory* singleton; - static void init(); - - LRUEntryFactory() {} + using EntryFactory::EntryFactory; virtual ~LRUEntryFactory() {} - virtual void newMapEntry(const CacheableKeyPtr& key, + virtual void newMapEntry(ExpiryTaskManager* expiryTaskManager, + const CacheableKeyPtr& key, MapEntryImplPtr& result) const; }; } // namespace client