http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThinClientPoolStickyHADM.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/ThinClientPoolStickyHADM.cpp b/src/cppcache/src/ThinClientPoolStickyHADM.cpp deleted file mode 100644 index 9989328..0000000 --- a/src/cppcache/src/ThinClientPoolStickyHADM.cpp +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -#include "ThinClientPoolStickyHADM.hpp" -#include "TssConnectionWrapper.hpp" -#include <algorithm> -using namespace apache::geode::client; -/*TcrConnection* ThinClientPoolStickyHADM::getConnectionFromQueueW( GfErrType* -error, - std::set< ServerLocation >& excludeServers, bool isBGThread, TcrMessage & -request, int8_t& version, bool & dummy, const BucketServerLocationPtr& -serverLocation ) -{ - TcrConnection* conn = nullptr; - if( isBGThread ){ - conn = ThinClientPoolDM::getConnectionFromQueueW( error, excludeServers, -isBGThread, request, version, dummy, serverLocation); - return conn; - } - - m_manager->getStickyConnection(conn , error, excludeServers, -request.forTransaction()); - return conn; -} -void ThinClientPoolStickyHADM::putInQueue(TcrConnection* conn, bool isBGThread, -bool isTransaction ) -{ - if( !isBGThread ) - m_manager->setStickyConnection( conn, isTransaction ); - else - ThinClientPoolDM::putInQueue( conn, isBGThread, isTransaction); -} -void ThinClientPoolStickyHADM::setStickyNull( bool isBGThread ) -{ - if( !isBGThread ) m_manager->setStickyConnection( nullptr, false ); -} - -void ThinClientPoolStickyHADM::cleanStickyConnections(volatile bool& isRunning) -{ - if (!isRunning) { - return; - } - m_manager->cleanStaleStickyConnection(); -} - -bool ThinClientPoolStickyHADM::canItBeDeleted(TcrConnection* conn) -{ - return m_manager->canThisConnBeDeleted( conn ); -} -void ThinClientPoolStickyHADM::releaseThreadLocalConnection() -{ - m_manager->releaseThreadLocalConnection(); -} -void ThinClientPoolStickyHADM::setThreadLocalConnection(TcrConnection* conn) -{ - m_manager->addStickyConnection(conn); -} -bool ThinClientPoolStickyHADM::canItBeDeletedNoImpl(TcrConnection* conn ) -{ - return ThinClientPoolDM::canItBeDeleted( conn ); -}*/
http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThinClientPoolStickyHADM.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/ThinClientPoolStickyHADM.hpp b/src/cppcache/src/ThinClientPoolStickyHADM.hpp index 40c1694..07fa4ed 100644 --- a/src/cppcache/src/ThinClientPoolStickyHADM.hpp +++ b/src/cppcache/src/ThinClientPoolStickyHADM.hpp @@ -29,29 +29,9 @@ class ThinClientPoolStickyHADM : public ThinClientPoolHADM { ThinClientPoolStickyHADM(const char* name, PoolAttributesPtr poolAttrs, TcrConnectionManager& connManager) : ThinClientPoolHADM(name, poolAttrs, connManager) { - // m_manager = new ThinClientStickyManager( this ); m_sticky = true; } - virtual ~ThinClientPoolStickyHADM() { - /*m_manager->closeAllStickyConnections(); - delete m_manager; m_manager = nullptr;*/ - } - /*bool canItBeDeletedNoImpl(TcrConnection* conn ); -protected: - virtual void cleanStickyConnections(volatile bool& isRunning); - virtual TcrConnection* getConnectionFromQueueW( GfErrType* error, - std::set< ServerLocation >&, bool isBGThread, TcrMessage & request, int8_t& -version, bool & dummy, const BucketServerLocationPtr& serverLocation = nullptr -); - virtual void putInQueue(TcrConnection* conn, bool isBGThread, bool -isTransaction = false ); - virtual void setStickyNull( bool isBGThread ); - virtual bool canItBeDeleted(TcrConnection* conn); - virtual void releaseThreadLocalConnection(); - virtual void setThreadLocalConnection(TcrConnection* conn); -*/ - // virtual void cleanStickyConnections(volatile bool& isRunning); - // ThinClientStickyManager* m_manager; + virtual ~ThinClientPoolStickyHADM() {} }; typedef std::shared_ptr<ThinClientPoolStickyHADM> ThinClientPoolStickyHADMPtr; } // namespace client http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThinClientRedundancyManager.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/ThinClientRedundancyManager.cpp b/src/cppcache/src/ThinClientRedundancyManager.cpp index d356782..6ab3248 100644 --- a/src/cppcache/src/ThinClientRedundancyManager.cpp +++ b/src/cppcache/src/ThinClientRedundancyManager.cpp @@ -642,11 +642,13 @@ void ThinClientRedundancyManager::initialize(int redundancyLevel) { m_redundancyLevel = redundancyLevel; m_HAenabled = (redundancyLevel > 0 || m_theTcrConnManager->isDurable() || ThinClientBaseDM::isDeltaEnabledOnServer()); - SystemProperties* sysProp = DistributedSystem::getSystemProperties(); + auto& sysProp = m_theTcrConnManager->getCacheImpl() + ->getDistributedSystem() + .getSystemProperties(); if (m_poolHADM) { m_eventidmap.init(m_poolHADM->getSubscriptionMessageTrackingTimeout()); } else { - m_eventidmap.init(sysProp->notifyDupCheckLife()); + m_eventidmap.init(sysProp.notifyDupCheckLife()); } int millis = 100; if (m_HAenabled) { @@ -655,7 +657,7 @@ void ThinClientRedundancyManager::initialize(int redundancyLevel) { millis = m_poolHADM->getSubscriptionAckInterval(); } else { - millis = sysProp->notifyAckInterval(); + millis = sysProp.notifyAckInterval(); } if (millis < 100) millis = 100; { @@ -719,7 +721,8 @@ void ThinClientRedundancyManager::close() { if (m_periodicAckTask) { if (m_processEventIdMapTaskId >= 0) { - CacheImpl::expiryTaskManager->cancelTask(m_processEventIdMapTaskId); + m_theTcrConnManager->getCacheImpl()->getExpiryTaskManager().cancelTask( + m_processEventIdMapTaskId); } m_periodicAckTask->stopNoblock(); m_periodicAckSema.release(); @@ -750,7 +753,8 @@ bool ThinClientRedundancyManager::readyForEvents( return true; } - TcrMessageClientReady request; + TcrMessageClientReady request( + m_theTcrConnManager->getCacheImpl()->getCache()->createDataOutput()); TcrMessageReply reply(true, nullptr); GfErrType err = GF_NOTCON; @@ -791,6 +795,7 @@ bool ThinClientRedundancyManager::sendMakePrimaryMesg( } TcrMessageReply reply(false, nullptr); const TcrMessageMakePrimary makePrimaryRequest( + m_theTcrConnManager->getCacheImpl()->getCache()->createDataOutput(), ThinClientRedundancyManager::m_sentReadyForEvents); LOGFINE("Making primary subscription endpoint %s", ep->name().c_str()); @@ -1109,7 +1114,8 @@ bool ThinClientRedundancyManager::isDurable() { } void ThinClientRedundancyManager::readyForEvents() { - TcrMessageClientReady request; + TcrMessageClientReady request( + m_theTcrConnManager->getCacheImpl()->getCache()->createDataOutput()); TcrMessageReply reply(true, nullptr); GfErrType result = GF_NOTCON; unsigned int epCount = 0; @@ -1194,7 +1200,9 @@ void ThinClientRedundancyManager::doPeriodicAck() { m_redundantEndpoints.begin(); if (endpoint != m_redundantEndpoints.end()) { - TcrMessagePeriodicAck request(entries); + TcrMessagePeriodicAck request( + m_theTcrConnManager->getCacheImpl()->getCache()->createDataOutput(), + entries); TcrMessageReply reply(true, nullptr); GfErrType result = GF_NOERR; @@ -1243,24 +1251,26 @@ void ThinClientRedundancyManager::startPeriodicAck() { m_periodicAckTask = new Task<ThinClientRedundancyManager>( this, &ThinClientRedundancyManager::periodicAck, NC_PerodicACK); m_periodicAckTask->start(); - SystemProperties* props = DistributedSystem::getSystemProperties(); + const auto& props = m_theTcrConnManager->getCacheImpl() + ->getDistributedSystem() + .getSystemProperties(); // start the periodic ACK task handler - ACE_Event_Handler* periodicAckTask = - new ExpiryHandler_T<ThinClientRedundancyManager>( - this, &ThinClientRedundancyManager::processEventIdMap); - // m_processEventIdMapTaskId = CacheImpl::expiryTaskManager-> - // scheduleExpiryTask(periodicAckTask, 1, 1, false); - m_processEventIdMapTaskId = CacheImpl::expiryTaskManager->scheduleExpiryTask( - periodicAckTask, m_nextAckInc, m_nextAckInc, false); + auto periodicAckTask = new ExpiryHandler_T<ThinClientRedundancyManager>( + this, &ThinClientRedundancyManager::processEventIdMap); + m_processEventIdMapTaskId = + m_theTcrConnManager->getCacheImpl() + ->getExpiryTaskManager() + .scheduleExpiryTask(periodicAckTask, m_nextAckInc, m_nextAckInc, + false); LOGFINE( "Registered subscription event " "periodic ack task with id = %ld, notify-ack-interval = %ld, " "notify-dupcheck-life = %ld, periodic ack is %sabled", m_processEventIdMapTaskId, m_poolHADM ? m_poolHADM->getSubscriptionAckInterval() - : props->notifyAckInterval(), + : props.notifyAckInterval(), m_poolHADM ? m_poolHADM->getSubscriptionMessageTrackingTimeout() - : props->notifyDupCheckLife(), + : props.notifyDupCheckLife(), m_HAenabled ? "en" : "dis"); } http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThinClientRegion.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/ThinClientRegion.cpp b/src/cppcache/src/ThinClientRegion.cpp index b7a2dcb..8c7ddf3 100644 --- a/src/cppcache/src/ThinClientRegion.cpp +++ b/src/cppcache/src/ThinClientRegion.cpp @@ -15,24 +15,26 @@ * limitations under the License. */ +#include <geode/SelectResultsIterator.hpp> +#include <geode/SystemProperties.hpp> +#include <geode/PoolManager.hpp> +#include <geode/UserFunctionExecutionException.hpp> + #include "Utils.hpp" +#include "CacheRegionHelper.hpp" #include "ThinClientRegion.hpp" #include "TcrDistributionManager.hpp" #include "ThinClientPoolDM.hpp" #include "ThinClientBaseDM.hpp" #include "TcrEndpoint.hpp" -#include <geode/SystemProperties.hpp> #include "CacheImpl.hpp" #include "RegionGlobalLocks.hpp" #include "ReadWriteLock.hpp" #include "RemoteQuery.hpp" -#include <geode/SelectResultsIterator.hpp> #include <geode/Struct.hpp> #include "GeodeTypeIdsImpl.hpp" #include "AutoDelete.hpp" -#include <geode/PoolManager.hpp> #include "UserAttributes.hpp" -#include <geode/UserFunctionExecutionException.hpp> #include "PutAllPartialResultServerException.hpp" #include "VersionedCacheableObjectPartList.hpp" //#include "PutAllPartialResult.hpp" @@ -88,9 +90,9 @@ class PutAllWork : public PooledWork<GfErrType>, m_isPapeReceived(false) // UNUSED , m_aCallbackArgument(aCallbackArgument) { - m_request = new TcrMessagePutAll(m_region.get(), *m_map.get(), - static_cast<int>(m_timeout * 1000), - m_poolDM, aCallbackArgument); + m_request = new TcrMessagePutAll( + m_region->getCache()->createDataOutput(), m_region.get(), *m_map.get(), + static_cast<int>(m_timeout * 1000), m_poolDM, aCallbackArgument); m_reply = new TcrMessageReply(true, m_poolDM); // create new instanceof VCOPL @@ -228,8 +230,9 @@ class RemoveAllWork : public PooledWork<GfErrType>, m_keys(keys), m_papException(nullptr), m_isPapeReceived(false) { - m_request = new TcrMessageRemoveAll(m_region.get(), *keys, - m_aCallbackArgument, m_poolDM); + m_request = new TcrMessageRemoveAll( + m_region->getCache()->createDataOutput(), m_region.get(), *keys, + m_aCallbackArgument, m_poolDM); m_reply = new TcrMessageReply(true, m_poolDM); // create new instanceof VCOPL ACE_Recursive_Thread_Mutex responseLock; @@ -327,30 +330,35 @@ class RemoveAllWork : public PooledWork<GfErrType>, } }; -ThinClientRegion::ThinClientRegion(const std::string& name, CacheImpl* cache, +ThinClientRegion::ThinClientRegion(const std::string& name, + CacheImpl* cacheImpl, const RegionInternalPtr& rPtr, const RegionAttributesPtr& attributes, const CacheStatisticsPtr& stats, bool shared) - : LocalRegion(name, cache, rPtr, attributes, stats, shared), + : LocalRegion(name, cacheImpl, rPtr, attributes, stats, shared), m_tcrdm((ThinClientBaseDM*)0), m_notifyRelease(false), m_notificationSema(1), m_isMetaDataRefreshed(false) { m_transactionEnabled = true; - m_isDurableClnt = - strlen(DistributedSystem::getSystemProperties()->durableClientId()) > 0; + m_isDurableClnt = strlen(cacheImpl->getDistributedSystem() + .getSystemProperties() + .durableClientId()) > 0; } void ThinClientRegion::initTCR() { bool subscription = false; - PoolPtr pool = PoolManager::find(getAttributes()->getPoolName()); + PoolPtr pool = m_cacheImpl->getCache()->getPoolManager().find( + getAttributes()->getPoolName()); if (pool != nullptr) { subscription = pool->getSubscriptionEnabled(); } bool notificationEnabled = getAttributes()->getClientNotificationEnabled() || subscription; if (notificationEnabled) { - if (DistributedSystem::getSystemProperties()->isGridClient()) { + if (m_cacheImpl->getDistributedSystem() + .getSystemProperties() + .isGridClient()) { LOGWARN( "Region %s: client subscription channel enabled for a grid " "client; starting required internal subscription, cleanup and " @@ -374,7 +382,8 @@ void ThinClientRegion::initTCR() { void ThinClientRegion::registerKeys(const VectorOfCacheableKey& keys, bool isDurable, bool getInitialValues, bool receiveValues) { - PoolPtr pool = PoolManager::find(getAttributes()->getPoolName()); + PoolPtr pool = m_cacheImpl->getCache()->getPoolManager().find( + getAttributes()->getPoolName()); if (pool != nullptr) { if (!pool->getSubscriptionEnabled()) { LOGERROR( @@ -417,7 +426,8 @@ void ThinClientRegion::registerKeys(const VectorOfCacheableKey& keys, } void ThinClientRegion::unregisterKeys(const VectorOfCacheableKey& keys) { - PoolPtr pool = PoolManager::find(getAttributes()->getPoolName()); + PoolPtr pool = m_cacheImpl->getCache()->getPoolManager().find( + getAttributes()->getPoolName()); if (pool != nullptr) { if (!pool->getSubscriptionEnabled()) { LOGERROR( @@ -452,7 +462,8 @@ void ThinClientRegion::registerAllKeys(bool isDurable, VectorOfCacheableKeyPtr resultKeys, bool getInitialValues, bool receiveValues) { - PoolPtr pool = PoolManager::find(getAttributes()->getPoolName()); + PoolPtr pool = m_cacheImpl->getCache()->getPoolManager().find( + getAttributes()->getPoolName()); if (pool != nullptr) { if (!pool->getSubscriptionEnabled()) { LOGERROR( @@ -510,7 +521,8 @@ void ThinClientRegion::registerRegex(const char* regex, bool isDurable, VectorOfCacheableKeyPtr resultKeys, bool getInitialValues, bool receiveValues) { - PoolPtr pool = PoolManager::find(getAttributes()->getPoolName()); + PoolPtr pool = m_cacheImpl->getCache()->getPoolManager().find( + getAttributes()->getPoolName()); if (pool != nullptr) { if (!pool->getSubscriptionEnabled()) { LOGERROR( @@ -571,7 +583,8 @@ void ThinClientRegion::registerRegex(const char* regex, bool isDurable, } void ThinClientRegion::unregisterRegex(const char* regex) { - PoolPtr pool = PoolManager::find(getAttributes()->getPoolName()); + PoolPtr pool = m_cacheImpl->getCache()->getPoolManager().find( + getAttributes()->getPoolName()); if (pool != nullptr) { if (!pool->getSubscriptionEnabled()) { LOGERROR( @@ -596,7 +609,8 @@ void ThinClientRegion::unregisterRegex(const char* regex) { } void ThinClientRegion::unregisterAllKeys() { - PoolPtr pool = PoolManager::find(getAttributes()->getPoolName()); + PoolPtr pool = m_cacheImpl->getCache()->getPoolManager().find( + getAttributes()->getPoolName()); if (pool != nullptr) { if (!pool->getSubscriptionEnabled()) { LOGERROR( @@ -688,7 +702,8 @@ bool ThinClientRegion::existsValue(const char* predicate, uint32_t timeout) { } GfErrType ThinClientRegion::unregisterKeysBeforeDestroyRegion() { - PoolPtr pool = PoolManager::find(getAttributes()->getPoolName()); + PoolPtr pool = m_cacheImpl->getCache()->getPoolManager().find( + getAttributes()->getPoolName()); if (pool != nullptr) { if (!pool->getSubscriptionEnabled()) { LOGDEBUG( @@ -756,7 +771,8 @@ void ThinClientRegion::serverKeys(VectorOfCacheableKey& v) { CHECK_DESTROY_PENDING(TryReadGuard, Region::serverKeys); TcrMessageReply reply(true, m_tcrdm); - TcrMessageKeySet request(m_fullPath, m_tcrdm); + TcrMessageKeySet request(m_cacheImpl->getCache()->createDataOutput(), + m_fullPath, m_tcrdm); reply.setMessageTypeRequest(TcrMessage::KEY_SET); // need to check ChunkedKeySetResponse* resultCollector( @@ -812,8 +828,9 @@ bool ThinClientRegion::containsKeyOnServer( /** @brief Create message and send to bridge server */ - TcrMessageContainsKey request(this, keyPtr, static_cast<UserDataPtr>(nullptr), - true, m_tcrdm); + TcrMessageContainsKey request(m_cache->createDataOutput(), this, keyPtr, + static_cast<UserDataPtr>(nullptr), true, + m_tcrdm); TcrMessageReply reply(true, m_tcrdm); reply.setMessageTypeRequest(TcrMessage::CONTAINS_KEY); err = m_tcrdm->sendSyncRequest(request, reply); @@ -866,8 +883,9 @@ bool ThinClientRegion::containsValueForKey_remote( /** @brief Create message and send to bridge server */ - TcrMessageContainsKey request(this, keyPtr, static_cast<UserDataPtr>(nullptr), - false, m_tcrdm); + TcrMessageContainsKey request(m_cache->createDataOutput(), this, keyPtr, + static_cast<UserDataPtr>(nullptr), false, + m_tcrdm); TcrMessageReply reply(true, m_tcrdm); reply.setMessageTypeRequest(TcrMessage::CONTAINS_KEY); err = m_tcrdm->sendSyncRequest(request, reply); @@ -912,7 +930,8 @@ void ThinClientRegion::clear(const UserDataPtr& aCallbackArgument) { /** @brief Create message and send to bridge server */ - TcrMessageClearRegion request(this, aCallbackArgument, -1, m_tcrdm); + TcrMessageClearRegion request(m_cache->createDataOutput(), this, + aCallbackArgument, -1, m_tcrdm); TcrMessageReply reply(true, m_tcrdm); err = m_tcrdm->sendSyncRequest(request, reply); if (err != GF_NOERR) GfErrTypeToException("Region::clear", err); @@ -952,7 +971,8 @@ GfErrType ThinClientRegion::getNoThrow_remote( /** @brief Create message and send to bridge server */ - TcrMessageRequest request(this, keyPtr, aCallbackArgument, m_tcrdm); + TcrMessageRequest request(m_cache->createDataOutput(), this, keyPtr, + aCallbackArgument, m_tcrdm); TcrMessageReply reply(true, m_tcrdm); err = m_tcrdm->sendSyncRequest(request, reply); if (err != GF_NOERR) return err; @@ -991,7 +1011,8 @@ GfErrType ThinClientRegion::invalidateNoThrow_remote( /** @brief Create message and send to bridge server */ - TcrMessageInvalidate request(this, keyPtr, aCallbackArgument, m_tcrdm); + TcrMessageInvalidate request(m_cache->createDataOutput(), this, keyPtr, + aCallbackArgument, m_tcrdm); TcrMessageReply reply(true, m_tcrdm); err = m_tcrdm->sendSyncRequest(request, reply); if (err != GF_NOERR) return err; @@ -1030,25 +1051,27 @@ GfErrType ThinClientRegion::putNoThrow_remote( // do TCR put // bool delta = valuePtr->hasDelta(); bool delta = false; - const char* conFlationValue = - DistributedSystem::getSystemProperties()->conflateEvents(); + const char* conFlationValue = getCacheImpl() + ->getDistributedSystem() + .getSystemProperties() + .conflateEvents(); if (checkDelta && valuePtr != nullptr && conFlationValue != nullptr && strcmp(conFlationValue, "true") != 0 && ThinClientBaseDM::isDeltaEnabledOnServer()) { Delta* temp = dynamic_cast<Delta*>(valuePtr.get()); delta = (temp && temp->hasDelta()); } - TcrMessagePut request(this, keyPtr, valuePtr, aCallbackArgument, delta, - m_tcrdm); + TcrMessagePut request(m_cache->createDataOutput(), this, keyPtr, valuePtr, + aCallbackArgument, delta, m_tcrdm); TcrMessageReply* reply = new TcrMessageReply(true, m_tcrdm); err = m_tcrdm->sendSyncRequest(request, *reply); if (delta) { - m_cacheImpl->m_cacheStats - ->incDeltaPut(); // Does not chcek whether success of failure.. + m_cacheImpl->getCachePerfStats() + .incDeltaPut(); // Does not chcek whether success of failure.. if (reply->getMessageType() == TcrMessage::PUT_DELTA_ERROR) { // Try without delta - TcrMessagePut request(this, keyPtr, valuePtr, aCallbackArgument, false, - m_tcrdm, false, true); + TcrMessagePut request(m_cache->createDataOutput(), this, keyPtr, valuePtr, + aCallbackArgument, false, m_tcrdm, false, true); delete reply; reply = new TcrMessageReply(true, m_tcrdm); err = m_tcrdm->sendSyncRequest(request, *reply); @@ -1096,7 +1119,8 @@ GfErrType ThinClientRegion::destroyNoThrow_remote( GfErrType err = GF_NOERR; // do TCR destroy - TcrMessageDestroy request(this, keyPtr, nullptr, aCallbackArgument, m_tcrdm); + TcrMessageDestroy request(m_cache->createDataOutput(), this, keyPtr, nullptr, + aCallbackArgument, m_tcrdm); TcrMessageReply reply(true, m_tcrdm); err = m_tcrdm->sendSyncRequest(request, reply); if (err != GF_NOERR) return err; @@ -1137,7 +1161,8 @@ GfErrType ThinClientRegion::removeNoThrow_remote( GfErrType err = GF_NOERR; // do TCR remove - TcrMessageDestroy request(this, keyPtr, cvalue, aCallbackArgument, m_tcrdm); + TcrMessageDestroy request(m_cache->createDataOutput(), this, keyPtr, cvalue, + aCallbackArgument, m_tcrdm); TcrMessageReply reply(true, m_tcrdm); err = m_tcrdm->sendSyncRequest(request, reply); if (err != GF_NOERR) { @@ -1178,7 +1203,8 @@ GfErrType ThinClientRegion::removeNoThrowEX_remote( GfErrType err = GF_NOERR; // do TCR remove - TcrMessageDestroy request(this, keyPtr, nullptr, aCallbackArgument, m_tcrdm); + TcrMessageDestroy request(m_cache->createDataOutput(), this, keyPtr, nullptr, + aCallbackArgument, m_tcrdm); TcrMessageReply reply(true, m_tcrdm); err = m_tcrdm->sendSyncRequest(request, reply); if (err != GF_NOERR) { @@ -1239,7 +1265,7 @@ GfErrType ThinClientRegion::getAllNoThrow_remote( } // create the GET_ALL request TcrMessageGetAll request( - this, keys, m_tcrdm, + m_cache->createDataOutput(), this, keys, m_tcrdm, aCallbackArgument); // now we need to initialize later TcrMessageReply reply(true, m_tcrdm); @@ -1347,7 +1373,8 @@ GfErrType ThinClientRegion::singleHopPutAllNoThrow_remote( * e. insert the worker into the vector. */ std::vector<PutAllWork*> putAllWorkers; - ThreadPool* threadPool = TPSingleton::instance(); + auto* threadPool = + CacheRegionHelper::getCacheImpl(getCache().get())->getThreadPool(); int locationMapIndex = 0; for (const auto& locationIter : *locationMap) { const auto& serverLocation = locationIter.first; @@ -1616,7 +1643,8 @@ GfErrType ThinClientRegion::multiHopPutAllNoThrow_remote( GfErrType err = GF_NOERR; // Construct request/reply for putAll - TcrMessagePutAll request(this, map, static_cast<int>(timeout * 1000), m_tcrdm, + TcrMessagePutAll request(m_cache->createDataOutput(), this, map, + static_cast<int>(timeout * 1000), m_tcrdm, aCallbackArgument); TcrMessageReply reply(true, m_tcrdm); request.setTimeout(timeout); @@ -1624,7 +1652,7 @@ GfErrType ThinClientRegion::multiHopPutAllNoThrow_remote( ACE_Recursive_Thread_Mutex responseLock; versionedObjPartList = - std::make_shared<VersionedCacheableObjectPartList>(responseLock); + std::make_shared<VersionedCacheableObjectPartList>(this, responseLock); // need to check ChunkedPutAllResponse* resultCollector(new ChunkedPutAllResponse( shared_from_this(), reply, responseLock, versionedObjPartList)); @@ -1731,7 +1759,8 @@ GfErrType ThinClientRegion::singleHopRemoveAllNoThrow_remote( * e. insert the worker into the vector. */ std::vector<RemoveAllWork*> removeAllWorkers; - ThreadPool* threadPool = TPSingleton::instance(); + auto* threadPool = + CacheRegionHelper::getCacheImpl(getCache().get())->getThreadPool(); int locationMapIndex = 0; for (const auto& locationIter : *locationMap) { const auto& serverLocation = locationIter.first; @@ -1947,12 +1976,13 @@ GfErrType ThinClientRegion::multiHopRemoveAllNoThrow_remote( GfErrType err = GF_NOERR; // Construct request/reply for putAll - TcrMessageRemoveAll request(this, keys, aCallbackArgument, m_tcrdm); + TcrMessageRemoveAll request(m_cache->createDataOutput(), this, keys, + aCallbackArgument, m_tcrdm); TcrMessageReply reply(true, m_tcrdm); ACE_Recursive_Thread_Mutex responseLock; versionedObjPartList = - std::make_shared<VersionedCacheableObjectPartList>(responseLock); + std::make_shared<VersionedCacheableObjectPartList>(this, responseLock); // need to check ChunkedRemoveAllResponse* resultCollector(new ChunkedRemoveAllResponse( shared_from_this(), reply, responseLock, versionedObjPartList)); @@ -2023,7 +2053,7 @@ uint32_t ThinClientRegion::size_remote() { GfErrType err = GF_NOERR; // do TCR size - TcrMessageSize request(m_fullPath.c_str()); + TcrMessageSize request(m_cache->createDataOutput(), m_fullPath.c_str()); TcrMessageReply reply(true, m_tcrdm); err = m_tcrdm->sendSyncRequest(request, reply); @@ -2034,8 +2064,7 @@ uint32_t ThinClientRegion::size_remote() { switch (reply.getMessageType()) { case TcrMessage::RESPONSE: { CacheableInt32Ptr size = - std::static_pointer_cast<CacheableInt32>( - reply.getValue()); + std::static_pointer_cast<CacheableInt32>(reply.getValue()); return size->value(); // LOGINFO("Map is written into remote server at region %s", // m_fullPath.c_str()); @@ -2229,7 +2258,8 @@ GfErrType ThinClientRegion::destroyRegionNoThrow_remote( GfErrType err = GF_NOERR; // do TCR destroyRegion - TcrMessageDestroyRegion request(this, aCallbackArgument, -1, m_tcrdm); + TcrMessageDestroyRegion request(m_cache->createDataOutput(), this, + aCallbackArgument, -1, m_tcrdm); TcrMessageReply reply(true, m_tcrdm); err = m_tcrdm->sendSyncRequest(request, reply); if (err != GF_NOERR) return err; @@ -2284,8 +2314,9 @@ GfErrType ThinClientRegion::registerKeysNoThrow( interestPolicy.ordinal); TcrMessageRegisterInterestList request( - this, keys, isDurable, getAttributes()->getCachingEnabled(), - receiveValues, interestPolicy, m_tcrdm); + m_cache->createDataOutput(), this, keys, isDurable, + getAttributes()->getCachingEnabled(), receiveValues, interestPolicy, + m_tcrdm); ACE_Recursive_Thread_Mutex responseLock; TcrChunkedResult* resultCollector = nullptr; if (interestPolicy.ordinal == InterestResultPolicy::KEYS_VALUES.ordinal) { @@ -2348,7 +2379,8 @@ GfErrType ThinClientRegion::unregisterKeysNoThrow( return GF_CACHE_ILLEGAL_STATE_EXCEPTION; } - TcrMessageUnregisterInterestList request(this, keys, false, false, true, + TcrMessageUnregisterInterestList request(m_cache->createDataOutput(), this, + keys, false, false, true, InterestResultPolicy::NONE, m_tcrdm); err = m_tcrdm->sendSyncRequestRegisterInterest(request, reply); if (err == GF_NOERR /*|| err == GF_CACHE_REDUNDANCY_FAILURE*/) { @@ -2382,7 +2414,8 @@ GfErrType ThinClientRegion::unregisterKeysNoThrowLocalDestroy( return GF_CACHE_ILLEGAL_STATE_EXCEPTION; } - TcrMessageUnregisterInterestList request(this, keys, false, false, true, + TcrMessageUnregisterInterestList request(m_cache->createDataOutput(), this, + keys, false, false, true, InterestResultPolicy::NONE, m_tcrdm); err = m_tcrdm->sendSyncRequestRegisterInterest(request, reply); if (err == GF_NOERR) { @@ -2456,8 +2489,8 @@ GfErrType ThinClientRegion::registerRegexNoThrow( // TODO: TcrMessageRegisterInterest request( - m_fullPath, regex.c_str(), interestPolicy, isDurable, - getAttributes()->getCachingEnabled(), receiveValues, m_tcrdm); + m_cache->createDataOutput(), m_fullPath, regex.c_str(), interestPolicy, + isDurable, getAttributes()->getCachingEnabled(), receiveValues, m_tcrdm); ACE_Recursive_Thread_Mutex responseLock; if (reply == nullptr) { reply = &replyLocal; @@ -2527,9 +2560,9 @@ GfErrType ThinClientRegion::unregisterRegexNoThrow(const std::string& regex, if (err == GF_NOERR) { TcrMessageReply reply(false, m_tcrdm); - TcrMessageUnregisterInterest request(m_fullPath, regex, - InterestResultPolicy::NONE, false, - false, true, m_tcrdm); + TcrMessageUnregisterInterest request( + m_cache->createDataOutput(), m_fullPath, regex, + InterestResultPolicy::NONE, false, false, true, m_tcrdm); err = m_tcrdm->sendSyncRequestRegisterInterest(request, reply); if (err == GF_NOERR /*|| err == GF_CACHE_REDUNDANCY_FAILURE*/) { if (attemptFailover) { @@ -2573,9 +2606,9 @@ GfErrType ThinClientRegion::unregisterRegexNoThrowLocalDestroy( if (err == GF_NOERR) { TcrMessageReply reply(false, m_tcrdm); - TcrMessageUnregisterInterest request(m_fullPath, regex, - InterestResultPolicy::NONE, false, - false, true, m_tcrdm); + TcrMessageUnregisterInterest request( + m_cache->createDataOutput(), m_fullPath, regex, + InterestResultPolicy::NONE, false, false, true, m_tcrdm); err = m_tcrdm->sendSyncRequestRegisterInterest(request, reply); if (err == GF_NOERR) { if (attemptFailover) { @@ -3054,12 +3087,12 @@ void ThinClientRegion::executeFunction(const char* func, TcrMessage* msg; if (reExecuteForServ) { msg = new TcrMessageExecuteRegionFunction( - funcName, this, args, routingObj, getResult, failedNodes, timeout, - m_tcrdm, static_cast<int8_t>(1)); + m_cache->createDataOutput(), funcName, this, args, routingObj, + getResult, failedNodes, timeout, m_tcrdm, static_cast<int8_t>(1)); } else { msg = new TcrMessageExecuteRegionFunction( - funcName, this, args, routingObj, getResult, failedNodes, timeout, - m_tcrdm, static_cast<int8_t>(0)); + m_cache->createDataOutput(), funcName, this, args, routingObj, + getResult, failedNodes, timeout, m_tcrdm, static_cast<int8_t>(0)); } TcrMessageReply reply(true, m_tcrdm); // need to check @@ -3151,9 +3184,10 @@ CacheableVectorPtr ThinClientRegion::reExecuteFunction( do { reExecute = false; std::string funcName(func); - TcrMessageExecuteRegionFunction msg( - funcName, this, args, routingObj, getResult, failedNodes, timeout, - m_tcrdm, /*reExecute*/ static_cast<int8_t>(1)); + TcrMessageExecuteRegionFunction msg(m_cache->createDataOutput(), funcName, + this, args, routingObj, getResult, + failedNodes, timeout, m_tcrdm, + /*reExecute*/ static_cast<int8_t>(1)); TcrMessageReply reply(true, m_tcrdm); // need to check ChunkedFunctionExecutionResponse* resultCollector( @@ -3222,7 +3256,8 @@ bool ThinClientRegion::executeFunctionSH( const auto& userAttr = TSSUserAttributesWrapper::s_geodeTSSUserAttributes->getUserAttributes(); std::vector<OnRegionFunctionExecution*> feWorkers; - auto threadPool = TPSingleton::instance(); + auto* threadPool = + CacheRegionHelper::getCacheImpl(getCache().get())->getThreadPool(); for (const auto& locationIter : *locationMap) { const auto& serverLocation = locationIter.first; @@ -3317,7 +3352,8 @@ GfErrType ThinClientRegion::getFuncAttributes(const char* func, // do TCR GET_FUNCTION_ATTRIBUTES LOGDEBUG("Tcrmessage request GET_FUNCTION_ATTRIBUTES "); std::string funcName(func); - TcrMessageGetFunctionAttributes request(funcName, m_tcrdm); + TcrMessageGetFunctionAttributes request(m_cache->createDataOutput(), funcName, + m_tcrdm); TcrMessageReply reply(true, m_tcrdm); err = m_tcrdm->sendSyncRequest(request, reply); if (err != GF_NOERR) { @@ -3345,7 +3381,8 @@ GfErrType ThinClientRegion::getFuncAttributes(const char* func, GfErrType ThinClientRegion::getNoThrow_FullObject(EventIdPtr eventId, CacheablePtr& fullObject, VersionTagPtr& versionTag) { - TcrMessageRequestEventValue fullObjectMsg(eventId); + TcrMessageRequestEventValue fullObjectMsg(m_cache->createDataOutput(), + eventId); TcrMessageReply reply(true, nullptr); GfErrType err = GF_NOTCON; @@ -3378,12 +3415,12 @@ void ThinClientRegion::txPut(const CacheableKeyPtr& key, const UserDataPtr& aCallbackArgument, VersionTagPtr versionTag) { CacheablePtr oldValue; - int64_t sampleStartNanos = Utils::startStatOpTime(); + int64_t sampleStartNanos = startStatOpTime(); GfErrType err = putNoThrowTX(key, value, aCallbackArgument, oldValue, -1, CacheEventFlags::NORMAL, versionTag); - Utils::updateStatOpTime(m_regionStats->getStat(), - RegionStatType::getInstance()->getPutTimeId(), - sampleStartNanos); + + updateStatOpTime(m_regionStats->getStat(), m_regionStats->getPutTimeId(), + sampleStartNanos); GfErrTypeToException("Region::putTX", err); } @@ -3395,18 +3432,19 @@ void ChunkedInterestResponse::reset() { void ChunkedInterestResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen, - uint8_t isLastChunkWithSecurity) { - DataInput input(chunk, chunkLen); + uint8_t isLastChunkWithSecurity, + const Cache* cache) { + auto input = cache->createDataInput(chunk, chunkLen); - input.setPoolName(m_replyMsg.getPoolName()); + input->setPoolName(m_replyMsg.getPoolName()); uint32_t partLen; if (TcrMessageHelper::readChunkPartHeader( - m_msg, input, 0, GeodeTypeIds::CacheableArrayList, + m_msg, *input, 0, GeodeTypeIds::CacheableArrayList, "ChunkedInterestResponse", partLen, isLastChunkWithSecurity) != TcrMessageHelper::OBJECT) { // encountered an exception part, so return without reading more - m_replyMsg.readSecureObjectPart(input, false, true, + m_replyMsg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity); return; } @@ -3414,8 +3452,8 @@ void ChunkedInterestResponse::handleChunk(const uint8_t* chunk, if (m_resultKeys == nullptr) { m_resultKeys = std::make_shared<VectorOfCacheableKey>(); } - serializer::readObject(input, *m_resultKeys); - m_replyMsg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity); + serializer::readObject(*input, *m_resultKeys); + m_replyMsg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity); } void ChunkedKeySetResponse::reset() { @@ -3425,24 +3463,25 @@ void ChunkedKeySetResponse::reset() { } void ChunkedKeySetResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen, - uint8_t isLastChunkWithSecurity) { - DataInput input(chunk, chunkLen); + uint8_t isLastChunkWithSecurity, + const Cache* cache) { + auto input = cache->createDataInput(chunk, chunkLen); - input.setPoolName(m_replyMsg.getPoolName()); + input->setPoolName(m_replyMsg.getPoolName()); uint32_t partLen; if (TcrMessageHelper::readChunkPartHeader( - m_msg, input, 0, GeodeTypeIds::CacheableArrayList, + m_msg, *input, 0, GeodeTypeIds::CacheableArrayList, "ChunkedKeySetResponse", partLen, isLastChunkWithSecurity) != TcrMessageHelper::OBJECT) { // encountered an exception part, so return without reading more - m_replyMsg.readSecureObjectPart(input, false, true, + m_replyMsg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity); return; } - serializer::readObject(input, m_resultKeys); - m_replyMsg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity); + serializer::readObject(*input, m_resultKeys); + m_replyMsg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity); } void ChunkedQueryResponse::reset() { @@ -3510,31 +3549,32 @@ void ChunkedQueryResponse::readObjectPartList(DataInput& input, } void ChunkedQueryResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen, - uint8_t isLastChunkWithSecurity) { + uint8_t isLastChunkWithSecurity, + const Cache* cache) { LOGDEBUG("ChunkedQueryResponse::handleChunk.."); - DataInput input(chunk, chunkLen); - input.setPoolName(m_msg.getPoolName()); + auto input = cache->createDataInput(chunk, chunkLen); + input->setPoolName(m_msg.getPoolName()); uint32_t partLen; int8_t isObj; TcrMessageHelper::ChunkObjectType objType; if ((objType = TcrMessageHelper::readChunkPartHeader( - m_msg, input, GeodeTypeIdsImpl::FixedIDByte, + m_msg, *input, GeodeTypeIdsImpl::FixedIDByte, static_cast<uint8_t>(GeodeTypeIdsImpl::CollectionTypeImpl), "ChunkedQueryResponse", partLen, isLastChunkWithSecurity)) == TcrMessageHelper::EXCEPTION) { // encountered an exception part, so return without reading more - m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity); + m_msg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity); return; } else if (objType == TcrMessageHelper::NULL_OBJECT) { // special case for scalar result - input.readInt(&partLen); - input.read(&isObj); + input->readInt(&partLen); + input->read(&isObj); CacheableInt32Ptr intVal; - input.readObject(intVal, true); + input->readObject(intVal, true); m_queryResults->push_back(intVal); // TODO: - m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity); + m_msg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity); return; } @@ -3550,30 +3590,30 @@ void ChunkedQueryResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen, // If the results on server are in a bag, or the user need to manipulate // the elements, then we have to revisit this issue. // For now, we'll live with duplicate records, hoping they do not cost much. - skipClass(input); + skipClass(*input); // skipping CollectionTypeImpl - // skipClass(input); // no longer, since GFE 5.7 + // skipClass(*input); // no longer, since GFE 5.7 int8_t structType; - input.read(&structType); // this is Fixed ID byte (1) - input.read(&structType); // this is DataSerializable (45) - input.read(&classByte); + input->read(&structType); // this is Fixed ID byte (1) + input->read(&structType); // this is DataSerializable (45) + input->read(&classByte); uint8_t stringType; - input.read(&stringType); // ignore string header - assume 64k string - input.readUTF(&isStructTypeImpl, &stiLen); + input->read(&stringType); // ignore string header - assume 64k string + input->readUTF(&isStructTypeImpl, &stiLen); DeleteArray<char> delSTI(isStructTypeImpl); if (strcmp(isStructTypeImpl, "org.apache.geode.cache.query.Struct") == 0) { int32_t numOfFldNames; - input.readArrayLen(&numOfFldNames); + input->readArrayLen(&numOfFldNames); bool skip = false; if (m_structFieldNames.size() != 0) { skip = true; } for (int i = 0; i < numOfFldNames; i++) { CacheableStringPtr sptr; - // input.readObject(sptr); - input.readNativeString(sptr); + // input->readObject(sptr); + input->readNativeString(sptr); if (!skip) { m_structFieldNames.push_back(sptr); } @@ -3581,12 +3621,12 @@ void ChunkedQueryResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen, } // skip the remaining part - input.reset(); + input->reset(); // skip the whole part including partLen and isObj (4+1) - input.advanceCursor(partLen + 5); + input->advanceCursor(partLen + 5); - input.readInt(&partLen); - input.read(&isObj); + input->readInt(&partLen); + input->read(&isObj); if (!isObj) { LOGERROR( "Query response part is not an object; possible serialization " @@ -3599,30 +3639,30 @@ void ChunkedQueryResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen, bool isResultSet = (m_structFieldNames.size() == 0); int8_t arrayType; - input.read(&arrayType); + input->read(&arrayType); if (arrayType == GeodeTypeIds::CacheableObjectArray) { int32_t arraySize; - input.readArrayLen(&arraySize); - skipClass(input); + input->readArrayLen(&arraySize); + skipClass(*input); for (int32_t arrayItem = 0; arrayItem < arraySize; ++arrayItem) { SerializablePtr value; if (isResultSet) { - input.readObject(value); + input->readObject(value); m_queryResults->push_back(value); } else { - input.read(&isObj); + input->read(&isObj); int32_t arraySize2; - input.readArrayLen(&arraySize2); - skipClass(input); + input->readArrayLen(&arraySize2); + skipClass(*input); for (int32_t index = 0; index < arraySize2; ++index) { - input.readObject(value); + input->readObject(value); m_queryResults->push_back(value); } } } } else if (arrayType == GeodeTypeIdsImpl::FixedIDByte) { - input.read(&arrayType); + input->read(&arrayType); if (arrayType != GeodeTypeIdsImpl::CacheableObjectPartList) { LOGERROR( "Query response got unhandled message format %d while expecting " @@ -3632,7 +3672,7 @@ void ChunkedQueryResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen, "Query response got unhandled message format while expecting object " "part list; possible serialization mismatch"); } - readObjectPartList(input, isResultSet); + readObjectPartList(*input, isResultSet); } else { LOGERROR( "Query response got unhandled message format %d; possible " @@ -3643,7 +3683,7 @@ void ChunkedQueryResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen, "mismatch"); } - m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity); + m_msg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity); } void ChunkedQueryResponse::skipClass(DataInput& input) { @@ -3668,19 +3708,20 @@ void ChunkedFunctionExecutionResponse::reset() { } void ChunkedFunctionExecutionResponse::handleChunk( - const uint8_t* chunk, int32_t chunkLen, uint8_t isLastChunkWithSecurity) { + const uint8_t* chunk, int32_t chunkLen, uint8_t isLastChunkWithSecurity, + const Cache* cache) { LOGDEBUG("ChunkedFunctionExecutionResponse::handleChunk"); - DataInput input(chunk, chunkLen); - input.setPoolName(m_msg.getPoolName()); + auto input = cache->createDataInput(chunk, chunkLen); + input->setPoolName(m_msg.getPoolName()); uint32_t partLen; int8_t arrayType; if ((arrayType = static_cast<TcrMessageHelper::ChunkObjectType>( TcrMessageHelper::readChunkPartHeader( - m_msg, input, "ChunkedFunctionExecutionResponse", partLen, + m_msg, *input, "ChunkedFunctionExecutionResponse", partLen, isLastChunkWithSecurity))) == TcrMessageHelper::EXCEPTION) { // encountered an exception part, so return without reading more - m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity); + m_msg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity); return; } @@ -3692,20 +3733,20 @@ void ChunkedFunctionExecutionResponse::handleChunk( TcrMessageHelper::NULL_OBJECT) { LOGDEBUG("ChunkedFunctionExecutionResponse::handleChunk nullptr object"); // m_functionExecutionResults->push_back(nullptr); - m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity); + m_msg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity); return; } int32_t len; int startLen = - input.getBytesRead() - + input->getBytesRead() - 1; // from here need to look value part + memberid AND -1 for array type - input.readArrayLen(&len); + input->readArrayLen(&len); // read a byte to determine whether to read exception part for sendException // or read objects. uint8_t partType; - input.read(&partType); + input->read(&partType); bool isExceptionPart = false; // See If partType is JavaSerializable const int CHUNK_HDR_LEN = 5; @@ -3718,7 +3759,7 @@ void ChunkedFunctionExecutionResponse::handleChunk( if (partType == GeodeTypeIdsImpl::JavaSerializable) { isExceptionPart = true; // reset the input. - input.reset(); + input->reset(); if (((isLastChunkWithSecurity & 0x02) && (chunkLen - static_cast<int32_t>(partLen) <= @@ -3726,54 +3767,54 @@ void ChunkedFunctionExecutionResponse::handleChunk( (((isLastChunkWithSecurity & 0x02) == 0) && (chunkLen - static_cast<int32_t>(partLen) <= CHUNK_HDR_LEN))) { readPart = false; - input.readInt(&partLen); - input.advanceCursor(1); // skip isObject byte - input.advanceCursor(partLen); + input->readInt(&partLen); + input->advanceCursor(1); // skip isObject byte + input->advanceCursor(partLen); } else { // skip first part i.e JavaSerializable. - TcrMessageHelper::skipParts(m_msg, input, 1); + TcrMessageHelper::skipParts(m_msg, *input, 1); // read the second part which is string in usual manner, first its length. - input.readInt(&partLen); + input->readInt(&partLen); int8_t isObject; // then isObject byte - input.read(&isObject); + input->read(&isObject); - startLen = input.getBytesRead(); // reset from here need to look value + startLen = input->getBytesRead(); // reset from here need to look value // part + memberid AND -1 for array type // Since it is contained as a part of other results, read arrayType which // is arrayList = 65. - input.read(&arrayType); + input->read(&arrayType); // then its len which is 2 - input.readArrayLen(&len); + input->readArrayLen(&len); } } else { // rewind cursor by 1 to what we had read a byte to determine whether to // read exception part or read objects. - input.rewindCursor(1); + input->rewindCursor(1); } // Read either object or exception string from sendException. SerializablePtr value; // CacheablePtr memberId; if (readPart) { - input.readObject(value); + input->readObject(value); // TODO: track this memberId for PrFxHa - // input.readObject(memberId); - int objectlen = input.getBytesRead() - startLen; + // input->readObject(memberId); + int objectlen = input->getBytesRead() - startLen; int memberIdLen = partLen - objectlen; - input.advanceCursor(memberIdLen); + input->advanceCursor(memberIdLen); LOGDEBUG("function partlen = %d , objectlen = %d, memberidlen = %d ", partLen, objectlen, memberIdLen); - LOGDEBUG("function input.getBytesRemaining() = %d ", - input.getBytesRemaining()); + LOGDEBUG("function input->getBytesRemaining() = %d ", + input->getBytesRemaining()); // is there any way to assert it, as after that we need to read security // header - /*if(input.getBytesRemaining() != 0) { + /*if(input->getBytesRemaining() != 0) { LOGERROR("Function response not read all bytes"); throw IllegalStateException("Function Execution didn't read all bytes"); }*/ @@ -3796,7 +3837,7 @@ void ChunkedFunctionExecutionResponse::handleChunk( } } - m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity); + m_msg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity); // m_functionExecutionResults->push_back(value); } @@ -3809,16 +3850,17 @@ void ChunkedGetAllResponse::reset() { // process a GET_ALL response chunk void ChunkedGetAllResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen, - uint8_t isLastChunkWithSecurity) { - DataInput input(chunk, chunkLen); - input.setPoolName(m_msg.getPoolName()); + uint8_t isLastChunkWithSecurity, + const Cache* cache) { + auto input = cache->createDataInput(chunk, chunkLen); + input->setPoolName(m_msg.getPoolName()); uint32_t partLen; if (TcrMessageHelper::readChunkPartHeader( - m_msg, input, GeodeTypeIdsImpl::FixedIDByte, + m_msg, *input, GeodeTypeIdsImpl::FixedIDByte, GeodeTypeIdsImpl::VersionedObjectPartList, "ChunkedGetAllResponse", partLen, isLastChunkWithSecurity) != TcrMessageHelper::OBJECT) { // encountered an exception part, so return without reading more - m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity); + m_msg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity); return; } @@ -3827,9 +3869,9 @@ void ChunkedGetAllResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen, &m_trackerMap, m_destroyTracker, m_addToLocalCache, m_dsmemId, m_responseLock); - objectList.fromData(input); + objectList.fromData(*input); - m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity); + m_msg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity); } void ChunkedGetAllResponse::add(const ChunkedGetAllResponse* other) { @@ -3862,20 +3904,21 @@ void ChunkedPutAllResponse::reset() { // process a PUT_ALL response chunk void ChunkedPutAllResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen, - uint8_t isLastChunkWithSecurity) { - DataInput input(chunk, chunkLen); - input.setPoolName(m_msg.getPoolName()); + uint8_t isLastChunkWithSecurity, + const Cache* cache) { + auto input = cache->createDataInput(chunk, chunkLen); + input->setPoolName(m_msg.getPoolName()); uint32_t partLen; int8_t chunkType; if ((chunkType = (TcrMessageHelper::ChunkObjectType) TcrMessageHelper::readChunkPartHeader( - m_msg, input, GeodeTypeIdsImpl::FixedIDByte, + m_msg, *input, GeodeTypeIdsImpl::FixedIDByte, GeodeTypeIdsImpl::VersionedObjectPartList, "ChunkedPutAllResponse", partLen, isLastChunkWithSecurity)) == TcrMessageHelper::NULL_OBJECT) { LOGDEBUG("ChunkedPutAllResponse::handleChunk nullptr object"); // No issues it will be empty in case of disabled caching. - m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity); + m_msg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity); return; } @@ -3884,21 +3927,22 @@ void ChunkedPutAllResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen, LOGDEBUG("ChunkedPutAllResponse::handleChunk object"); ACE_Recursive_Thread_Mutex responseLock; auto vcObjPart = std::make_shared<VersionedCacheableObjectPartList>( + dynamic_cast<ThinClientRegion*>(m_region.get()), m_msg.getChunkedResultHandler()->getEndpointMemId(), responseLock); - vcObjPart->fromData(input); + vcObjPart->fromData(*input); m_list->addAll(vcObjPart); - m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity); + m_msg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity); } else { LOGDEBUG("ChunkedPutAllResponse::handleChunk BYTES PART"); int8_t byte0; - input.read(&byte0); + input->read(&byte0); LOGDEBUG("ChunkedPutAllResponse::handleChunk single-hop bytes byte0 = %d ", byte0); int8_t byte1; - input.read(&byte1); - m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity); + input->read(&byte1); + m_msg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity); - PoolPtr pool = PoolManager::find(m_msg.getPoolName()); + PoolPtr pool = cache->getPoolManager().find(m_msg.getPoolName()); if (pool != nullptr && !pool->isDestroyed() && pool->getPRSingleHopEnabled()) { ThinClientPoolDM* poolDM = dynamic_cast<ThinClientPoolDM*>(pool.get()); @@ -3924,20 +3968,21 @@ void ChunkedRemoveAllResponse::reset() { // process a REMOVE_ALL response chunk void ChunkedRemoveAllResponse::handleChunk(const uint8_t* chunk, int32_t chunkLen, - uint8_t isLastChunkWithSecurity) { - DataInput input(chunk, chunkLen); - input.setPoolName(m_msg.getPoolName()); + uint8_t isLastChunkWithSecurity, + const Cache* cache) { + auto input = cache->createDataInput(chunk, chunkLen); + input->setPoolName(m_msg.getPoolName()); uint32_t partLen; int8_t chunkType; if ((chunkType = (TcrMessageHelper::ChunkObjectType) TcrMessageHelper::readChunkPartHeader( - m_msg, input, GeodeTypeIdsImpl::FixedIDByte, + m_msg, *input, GeodeTypeIdsImpl::FixedIDByte, GeodeTypeIdsImpl::VersionedObjectPartList, "ChunkedRemoveAllResponse", partLen, isLastChunkWithSecurity)) == TcrMessageHelper::NULL_OBJECT) { LOGDEBUG("ChunkedRemoveAllResponse::handleChunk nullptr object"); // No issues it will be empty in case of disabled caching. - m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity); + m_msg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity); return; } @@ -3946,22 +3991,23 @@ void ChunkedRemoveAllResponse::handleChunk(const uint8_t* chunk, LOGDEBUG("ChunkedRemoveAllResponse::handleChunk object"); ACE_Recursive_Thread_Mutex responseLock; auto vcObjPart = std::make_shared<VersionedCacheableObjectPartList>( + dynamic_cast<ThinClientRegion*>(m_region.get()), m_msg.getChunkedResultHandler()->getEndpointMemId(), responseLock); - vcObjPart->fromData(input); + vcObjPart->fromData(*input); m_list->addAll(vcObjPart); - m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity); + m_msg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity); } else { LOGDEBUG("ChunkedRemoveAllResponse::handleChunk BYTES PART"); int8_t byte0; - input.read(&byte0); + input->read(&byte0); LOGDEBUG( "ChunkedRemoveAllResponse::handleChunk single-hop bytes byte0 = %d ", byte0); int8_t byte1; - input.read(&byte1); - m_msg.readSecureObjectPart(input, false, true, isLastChunkWithSecurity); + input->read(&byte1); + m_msg.readSecureObjectPart(*input, false, true, isLastChunkWithSecurity); - PoolPtr pool = PoolManager::find(m_msg.getPoolName()); + PoolPtr pool = cache->getPoolManager().find(m_msg.getPoolName()); if (pool != nullptr && !pool->isDestroyed() && pool->getPRSingleHopEnabled()) { ThinClientPoolDM* poolDM = dynamic_cast<ThinClientPoolDM*>(pool.get()); @@ -3985,17 +4031,19 @@ void ChunkedDurableCQListResponse::reset() { } // handles the chunk response for GETDURABLECQS_MSG_TYPE -void ChunkedDurableCQListResponse::handleChunk( - const uint8_t* chunk, int32_t chunkLen, uint8_t isLastChunkWithSecurity) { - DataInput input(chunk, chunkLen); - input.setPoolName(m_msg.getPoolName()); +void ChunkedDurableCQListResponse::handleChunk(const uint8_t* chunk, + int32_t chunkLen, + uint8_t isLastChunkWithSecurity, + const Cache* cache) { + auto input = cache->createDataInput(chunk, chunkLen); + input->setPoolName(m_msg.getPoolName()); // read part length uint32_t partLen; - input.readInt(&partLen); + input->readInt(&partLen); bool isObj; - input.readBoolean(&isObj); + input->readBoolean(&isObj); if (!isObj) { // we're currently always expecting an object @@ -4006,16 +4054,16 @@ void ChunkedDurableCQListResponse::handleChunk( throw MessageException(exMsg); } - input.advanceCursor(1); // skip the CacheableArrayList type ID byte + input->advanceCursor(1); // skip the CacheableArrayList type ID byte int8_t stringParts; - input.read(&stringParts); // read the number of strings in the message this + input->read(&stringParts); // read the number of strings in the message this // is one byte CacheableStringPtr strTemp; for (int i = 0; i < stringParts; i++) { - input.readObject(strTemp); + input->readObject(strTemp); m_resultList->push_back(strTemp); } } http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThinClientRegion.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/ThinClientRegion.hpp b/src/cppcache/src/ThinClientRegion.hpp index e52845e..0ed8d9d 100644 --- a/src/cppcache/src/ThinClientRegion.hpp +++ b/src/cppcache/src/ThinClientRegion.hpp @@ -366,7 +366,7 @@ class ChunkedInterestResponse : public TcrChunkedResult { } virtual void handleChunk(const uint8_t* chunk, int32_t chunkLen, - uint8_t isLastChunkWithSecurity); + uint8_t isLastChunkWithSecurity, const Cache* cache); virtual void reset(); }; @@ -404,7 +404,7 @@ class ChunkedQueryResponse : public TcrChunkedResult { } virtual void handleChunk(const uint8_t* chunk, int32_t chunkLen, - uint8_t isLastChunkWithSecurity); + uint8_t isLastChunkWithSecurity, const Cache* cache); virtual void reset(); void readObjectPartList(DataInput& input, bool isResultSet); @@ -455,7 +455,7 @@ class ChunkedFunctionExecutionResponse : public TcrChunkedResult { inline bool getResult() const { return m_getResult; } virtual void handleChunk(const uint8_t* chunk, int32_t chunkLen, - uint8_t isLastChunkWithSecurity); + uint8_t isLastChunkWithSecurity, const Cache* cache); virtual void reset(); }; typedef std::shared_ptr<ChunkedFunctionExecutionResponse> @@ -506,7 +506,7 @@ class ChunkedGetAllResponse : public TcrChunkedResult { m_responseLock(responseLock) {} virtual void handleChunk(const uint8_t* chunk, int32_t chunkLen, - uint8_t isLastChunkWithSecurity); + uint8_t isLastChunkWithSecurity, const Cache* cache); virtual void reset(); void add(const ChunkedGetAllResponse* other); @@ -544,7 +544,7 @@ class ChunkedPutAllResponse : public TcrChunkedResult { m_list(list) {} virtual void handleChunk(const uint8_t* chunk, int32_t chunkLen, - uint8_t isLastChunkWithSecurity); + uint8_t isLastChunkWithSecurity, const Cache* cache); virtual void reset(); VersionedCacheableObjectPartListPtr getList() { return m_list; } ACE_Recursive_Thread_Mutex& getResponseLock() { return m_responseLock; } @@ -576,7 +576,7 @@ class ChunkedRemoveAllResponse : public TcrChunkedResult { m_list(list) {} virtual void handleChunk(const uint8_t* chunk, int32_t chunkLen, - uint8_t isLastChunkWithSecurity); + uint8_t isLastChunkWithSecurity, const Cache* cache); virtual void reset(); VersionedCacheableObjectPartListPtr getList() { return m_list; } ACE_Recursive_Thread_Mutex& getResponseLock() { return m_responseLock; } @@ -609,7 +609,7 @@ class ChunkedKeySetResponse : public TcrChunkedResult { m_resultKeys(resultKeys) {} virtual void handleChunk(const uint8_t* chunk, int32_t chunkLen, - uint8_t isLastChunkWithSecurity); + uint8_t isLastChunkWithSecurity, const Cache* cache); virtual void reset(); }; @@ -632,7 +632,7 @@ class ChunkedDurableCQListResponse : public TcrChunkedResult { inline CacheableArrayListPtr getResults() { return m_resultList; } virtual void handleChunk(const uint8_t* chunk, int32_t chunkLen, - uint8_t isLastChunkWithSecurity); + uint8_t isLastChunkWithSecurity, const Cache* cache); virtual void reset(); }; http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThreadPool.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/ThreadPool.cpp b/src/cppcache/src/ThreadPool.cpp index e66a25a..1efd8f9 100644 --- a/src/cppcache/src/ThreadPool.cpp +++ b/src/cppcache/src/ThreadPool.cpp @@ -25,6 +25,7 @@ #include <geode/DistributedSystem.hpp> #include <geode/SystemProperties.hpp> #include "DistributedSystemImpl.hpp" +#include "CacheImpl.hpp" using namespace apache::geode::client; ThreadPoolWorker::ThreadPoolWorker(IThreadPool* manager) @@ -67,10 +68,11 @@ int ThreadPoolWorker::shutDown(void) { ACE_thread_t ThreadPoolWorker::threadId(void) { return threadId_; } -ThreadPool::ThreadPool() - : shutdown_(0), workersLock_(), workersCond_(workersLock_) { - SystemProperties* sysProp = DistributedSystem::getSystemProperties(); - poolSize_ = sysProp->threadPoolSize(); +ThreadPool::ThreadPool(uint32_t threadPoolSize) + : shutdown_(0), + workersLock_(), + workersCond_(workersLock_), + poolSize_(threadPoolSize) { activate(); } http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/ThreadPool.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/ThreadPool.hpp b/src/cppcache/src/ThreadPool.hpp index ce8257f..91812b2 100644 --- a/src/cppcache/src/ThreadPool.hpp +++ b/src/cppcache/src/ThreadPool.hpp @@ -32,9 +32,9 @@ #include <ace/Activation_Queue.h> #include <ace/Condition_T.h> #include <ace/Singleton.h> -#include <ace/Recursive_Thread_Mutex.h> #include <ace/Guard_T.h> - +#include <mutex> +#include <condition_variable> namespace apache { namespace geode { namespace client { @@ -42,43 +42,35 @@ namespace client { template <class T> class PooledWork : public ACE_Method_Request { private: - // ACE_Future<T> result_; T m_retVal; - ACE_Recursive_Thread_Mutex m_mutex; - ACE_Condition<ACE_Recursive_Thread_Mutex> m_cond; + std::recursive_mutex m_mutex; + std::condition_variable_any m_cond; bool m_done; public: - PooledWork() : m_mutex(), m_cond(m_mutex), m_done(false) {} + PooledWork() : m_mutex(), m_cond(), m_done(false) {} virtual ~PooledWork() {} virtual int call(void) { T res = execute(); - ACE_Guard<ACE_Recursive_Thread_Mutex> sync(m_mutex); + std::lock_guard<decltype(m_mutex)> lock(m_mutex); m_retVal = res; m_done = true; - m_cond.broadcast(); - // result_.set(res); - return 0; - } + m_cond.notify_all(); - /* - void attach(ACE_Future_Observer<T> *cb) { - result_.attach(cb); + return 0; } - */ T getResult(void) { - ACE_Guard<ACE_Recursive_Thread_Mutex> sync(m_mutex); + std::unique_lock<decltype(m_mutex)> lock(m_mutex); while (!m_done) { - m_cond.wait(); + m_cond.wait(lock, [this] { return m_done; }); } - // T res; - // result_.get(res); + return m_retVal; } @@ -131,15 +123,14 @@ class ThreadPool : public ACE_Task_Base, IThreadPool { friend class ACE_Singleton<ThreadPool, ACE_Recursive_Thread_Mutex>; public: + ThreadPool(uint32_t threadPoolSize); + virtual ~ThreadPool(); int perform(ACE_Method_Request* req); int svc(void); int shutDown(void); virtual int returnToWork(ThreadPoolWorker* worker); private: - ThreadPool(); - virtual ~ThreadPool(); - ThreadPoolWorker* chooseWorker(void); int createWorkerPool(void); int done(void); @@ -154,8 +145,6 @@ class ThreadPool : public ACE_Task_Base, IThreadPool { ACE_Activation_Queue queue_; static const char* NC_Pool_Thread; }; - -typedef ACE_Singleton<ThreadPool, ACE_Recursive_Thread_Mutex> TPSingleton; } // namespace client } // namespace geode } // namespace apache http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TombstoneExpiryHandler.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/TombstoneExpiryHandler.cpp b/src/cppcache/src/TombstoneExpiryHandler.cpp index c5f5512..399131e 100644 --- a/src/cppcache/src/TombstoneExpiryHandler.cpp +++ b/src/cppcache/src/TombstoneExpiryHandler.cpp @@ -31,10 +31,12 @@ using namespace apache::geode::client; TombstoneExpiryHandler::TombstoneExpiryHandler(TombstoneEntryPtr entryPtr, TombstoneList* tombstoneList, - uint32_t duration) + uint32_t duration, + CacheImpl* cacheImpl) : m_entryPtr(entryPtr), m_duration(duration), - m_tombstoneList(tombstoneList) {} + m_tombstoneList(tombstoneList), + m_cacheImpl(cacheImpl) {} int TombstoneExpiryHandler::handle_timeout(const ACE_Time_Value& current_time, const void* arg) { @@ -59,7 +61,7 @@ int TombstoneExpiryHandler::handle_timeout(const ACE_Time_Value& current_time, "Resetting expiry task %d secs later for key " "[%s]", -sec / 1000 + 1, Utils::getCacheableKeyString(key)->asChar()); - CacheImpl::expiryTaskManager->resetTask( + m_cacheImpl->getExpiryTaskManager().resetTask( static_cast<long>(m_entryPtr->getExpiryTaskId()), uint32_t(-sec / 1000 + 1)); return 0; @@ -71,7 +73,8 @@ int TombstoneExpiryHandler::handle_timeout(const ACE_Time_Value& current_time, Utils::getCacheableKeyString(key)->asChar()); // we now delete the handler in GF_Timer_Heap_ImmediateReset_T // and always return success. - CacheImpl::expiryTaskManager->resetTask(static_cast<long>(expiryTaskId), 0); + m_cacheImpl->getExpiryTaskManager().resetTask(static_cast<long>(expiryTaskId), + 0); return 0; } http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TombstoneExpiryHandler.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/TombstoneExpiryHandler.hpp b/src/cppcache/src/TombstoneExpiryHandler.hpp index 233b4b5..e4ef730 100644 --- a/src/cppcache/src/TombstoneExpiryHandler.hpp +++ b/src/cppcache/src/TombstoneExpiryHandler.hpp @@ -43,10 +43,11 @@ namespace client { class CPPCACHE_EXPORT TombstoneExpiryHandler : public ACE_Event_Handler { public: /** -* Constructor -*/ + * Constructor + */ TombstoneExpiryHandler(TombstoneEntryPtr entryPtr, - TombstoneList* tombstoneList, uint32_t duration); + TombstoneList* tombstoneList, uint32_t duration, + CacheImpl* cacheImpl); /** This task object will be registered with the Timer Queue. * When the timer expires the handle_timeout is invoked. @@ -65,6 +66,7 @@ class CPPCACHE_EXPORT TombstoneExpiryHandler : public ACE_Event_Handler { // Duration after which the task should be reset in case of // modification. uint32_t m_duration; + CacheImpl* m_cacheImpl; // perform the actual expiration action void DoTheExpirationAction(const CacheableKeyPtr& key); http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TombstoneList.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/TombstoneList.cpp b/src/cppcache/src/TombstoneList.cpp index 31ab22a..d32243a 100644 --- a/src/cppcache/src/TombstoneList.cpp +++ b/src/cppcache/src/TombstoneList.cpp @@ -39,46 +39,39 @@ long TombstoneList::getExpiryTask(TombstoneExpiryHandler** handler) { // This function is not guarded as all functions of this class are called from // MapSegment // read TombstoneTImeout from systemProperties. - uint32_t duration = - DistributedSystem::getSystemProperties()->tombstoneTimeoutInMSec() / 1000; + uint32_t duration = m_cacheImpl->getDistributedSystem() + .getSystemProperties() + .tombstoneTimeoutInMSec() / + 1000; ACE_Time_Value currTime(ACE_OS::gettimeofday()); auto tombstoneEntryPtr = std::make_shared<TombstoneEntry>( nullptr, static_cast<int64_t>(currTime.get_msec())); - *handler = new TombstoneExpiryHandler(tombstoneEntryPtr, this, duration); + *handler = new TombstoneExpiryHandler(tombstoneEntryPtr, this, duration, + m_cacheImpl); tombstoneEntryPtr->setHandler(*handler); - long id = - CacheImpl::expiryTaskManager->scheduleExpiryTask(*handler, duration, 0); + long id = m_cacheImpl->getExpiryTaskManager().scheduleExpiryTask(*handler, + duration, 0); return id; } -void TombstoneList::add(RegionInternal* rptr, const MapEntryImplPtr& entry, +void TombstoneList::add(const MapEntryImplPtr& entry, TombstoneExpiryHandler* handler, long taskid) { // This function is not guarded as all functions of this class are called from // MapSegment // read TombstoneTImeout from systemProperties. - // uint32_t duration = - // DistributedSystem::getSystemProperties()->tombstoneTimeoutInMSec()/1000; ACE_Time_Value currTime(ACE_OS::gettimeofday()); auto tombstoneEntryPtr = std::make_shared<TombstoneEntry>( entry, static_cast<int64_t>(currTime.get_msec())); - // TombstoneExpiryHandler* handler = new - // TombstoneExpiryHandler(tombstoneEntryPtr, this, duration); handler->setTombstoneEntry(tombstoneEntryPtr); tombstoneEntryPtr->setHandler(handler); - // long id = CacheImpl::expiryTaskManager->scheduleExpiryTask( - // handler, duration, 0); CacheableKeyPtr key; entry->getKeyI(key); - /*if (Log::finestEnabled()) { - LOGFINEST("tombstone expiry for key [%s], task id = %d, " - "duration = %d", - Utils::getCacheableKeyString(key)->asChar(), id, duration); - }*/ + tombstoneEntryPtr->setExpiryTaskId(taskid); m_tombstoneMap[key] = tombstoneEntryPtr; - rptr->getCacheImpl()->m_cacheStats->incTombstoneCount(); + m_cacheImpl->getCachePerfStats().incTombstoneCount(); int32_t tombstonesize = key->objectSize() + SIZEOF_TOMBSTONEOVERHEAD; - rptr->getCacheImpl()->m_cacheStats->incTombstoneSize(tombstonesize); + m_cacheImpl->getCachePerfStats().incTombstoneSize(tombstonesize); } // Reaps the tombstones which have been gc'ed on server. @@ -137,36 +130,34 @@ bool TombstoneList::exists(const CacheableKeyPtr& key) const { } void TombstoneList::eraseEntryFromTombstoneList(const CacheableKeyPtr& key, - RegionInternal* region, bool cancelTask) { // This function is not guarded as all functions of this class are called from // MapSegment if (exists(key)) { if (cancelTask) { - CacheImpl::expiryTaskManager->cancelTask( + m_cacheImpl->getExpiryTaskManager().cancelTask( static_cast<long>(m_tombstoneMap[key]->getExpiryTaskId())); delete m_tombstoneMap[key]->getHandler(); } - region->getCacheImpl()->m_cacheStats->decTombstoneCount(); + m_cacheImpl->getCachePerfStats().decTombstoneCount(); int32_t tombstonesize = key->objectSize() + SIZEOF_TOMBSTONEOVERHEAD; - region->getCacheImpl()->m_cacheStats->decTombstoneSize(tombstonesize); + m_cacheImpl->getCachePerfStats().decTombstoneSize(tombstonesize); m_tombstoneMap.erase(key); } } long TombstoneList::eraseEntryFromTombstoneListWithoutCancelTask( - const CacheableKeyPtr& key, RegionInternal* region, - TombstoneExpiryHandler*& handler) { + const CacheableKeyPtr& key, TombstoneExpiryHandler*& handler) { // This function is not guarded as all functions of this class are called from // MapSegment long taskid = -1; if (exists(key)) { taskid = static_cast<long>(m_tombstoneMap[key]->getExpiryTaskId()); handler = m_tombstoneMap[key]->getHandler(); - region->getCacheImpl()->m_cacheStats->decTombstoneCount(); + m_cacheImpl->getCachePerfStats().decTombstoneCount(); int32_t tombstonesize = key->objectSize() + SIZEOF_TOMBSTONEOVERHEAD; - region->getCacheImpl()->m_cacheStats->decTombstoneSize(tombstonesize); + m_cacheImpl->getCachePerfStats().decTombstoneSize(tombstonesize); m_tombstoneMap.erase(key); } return taskid; @@ -175,8 +166,9 @@ long TombstoneList::eraseEntryFromTombstoneListWithoutCancelTask( void TombstoneList::cleanUp() { // This function is not guarded as all functions of this class are called from // MapSegment + auto& expiryTaskManager = m_cacheImpl->getExpiryTaskManager(); for (const auto& queIter : m_tombstoneMap) { - CacheImpl::expiryTaskManager->cancelTask(queIter.second->getExpiryTaskId()); + expiryTaskManager.cancelTask(queIter.second->getExpiryTaskId()); delete queIter.second->getHandler(); } } http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TombstoneList.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/TombstoneList.hpp b/src/cppcache/src/TombstoneList.hpp index 66260c1..3178b29 100644 --- a/src/cppcache/src/TombstoneList.hpp +++ b/src/cppcache/src/TombstoneList.hpp @@ -72,10 +72,11 @@ typedef std::shared_ptr<TombstoneEntry> TombstoneEntryPtr; class TombstoneList { public: - TombstoneList(MapSegment* mapSegment) { m_mapSegment = mapSegment; } + TombstoneList(MapSegment* mapSegment, CacheImpl* cacheImpl) + : m_mapSegment(mapSegment), m_cacheImpl(cacheImpl) {} virtual ~TombstoneList() { cleanUp(); } - void add(RegionInternal* rptr, const MapEntryImplPtr& entry, - TombstoneExpiryHandler* handler, long taskID); + void add(const MapEntryImplPtr& entry, TombstoneExpiryHandler* handler, + long taskID); // Reaps the tombstones which have been gc'ed on server. // A map that has identifier for ClientProxyMembershipID as key @@ -84,11 +85,9 @@ class TombstoneList { void reapTombstones(std::map<uint16_t, int64_t>& gcVersions); void reapTombstones(CacheableHashSetPtr removedKeys); void eraseEntryFromTombstoneList(const CacheableKeyPtr& key, - RegionInternal* region, bool cancelTask = true); long eraseEntryFromTombstoneListWithoutCancelTask( - const CacheableKeyPtr& key, RegionInternal* region, - TombstoneExpiryHandler*& handler); + const CacheableKeyPtr& key, TombstoneExpiryHandler*& handler); void cleanUp(); long getExpiryTask(TombstoneExpiryHandler** handler); bool exists(const CacheableKeyPtr& key) const; @@ -103,6 +102,7 @@ class TombstoneList { TombstoneMap m_tombstoneMap; ACE_Recursive_Thread_Mutex m_queueLock; MapSegment* m_mapSegment; + CacheImpl* m_cacheImpl; friend class TombstoneExpiryHandler; }; typedef std::shared_ptr<TombstoneList> TombstoneListPtr; http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/TypeRegistry.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/TypeRegistry.cpp b/src/cppcache/src/TypeRegistry.cpp new file mode 100644 index 0000000..f9ee8cd --- /dev/null +++ b/src/cppcache/src/TypeRegistry.cpp @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "geode/TypeRegistry.hpp" +#include "CacheRegionHelper.hpp" +#include "CacheImpl.hpp" + +/** + TypeRegistry is the public facing wrapper for the serialization registry. +**/ + +TypeRegistry::TypeRegistry(Cache &cache) : m_cache(cache) {} + +void TypeRegistry::registerType(TypeFactoryMethod creationFunction) { + CacheRegionHelper::getCacheImpl(&m_cache) + ->getSerializationRegistry() + ->addType(creationFunction); +} + +void TypeRegistry::registerPdxType(TypeFactoryMethodPdx creationFunction) { + CacheRegionHelper::getCacheImpl(&m_cache) + ->getSerializationRegistry() + ->addPdxType(creationFunction); +} + +void TypeRegistry::registerPdxSerializer(PdxSerializerPtr pdxSerializer) { + CacheRegionHelper::getCacheImpl(&m_cache) + ->getSerializationRegistry() + ->setPdxSerializer(pdxSerializer); +} http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/Utils.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/Utils.cpp b/src/cppcache/src/Utils.cpp index bc0605d..b64e3db 100644 --- a/src/cppcache/src/Utils.cpp +++ b/src/cppcache/src/Utils.cpp @@ -214,6 +214,17 @@ int32_t Utils::logWideString(char* buf, size_t maxLen, const wchar_t* wStr) { } } +int64_t Utils::startStatOpTime() { + return std::chrono::duration_cast<std::chrono::nanoseconds>( + std::chrono::steady_clock::now().time_since_epoch()) + .count(); +} + +void Utils::updateStatOpTime(statistics::Statistics* m_regionStats, int32_t statId, + int64_t start) { + m_regionStats->incLong(statId, startStatOpTime() - start); +} + } // namespace client } // namespace geode -} // namespace apache +} // namespace apache \ No newline at end of file http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/Utils.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/Utils.hpp b/src/cppcache/src/Utils.hpp index 8dbe76d..3e62fb0 100644 --- a/src/cppcache/src/Utils.hpp +++ b/src/cppcache/src/Utils.hpp @@ -24,6 +24,11 @@ * @file */ +#include <typeinfo> +#include <string> +#include <unordered_set> +#include <memory> + #include <geode/geode_globals.hpp> #include <geode/geode_base.hpp> #include <geode/ExceptionTypes.hpp> @@ -141,18 +146,7 @@ class CPPCACHE_EXPORT Utils { return CacheableString::create("(null)"); } - inline static int64_t startStatOpTime() { - if (DistributedSystem::getSystemProperties() != nullptr) { - return (DistributedSystem::getSystemProperties() - ->getEnableTimeStatistics()) - ? std::chrono::duration_cast<std::chrono::nanoseconds>( - std::chrono::steady_clock::now().time_since_epoch()) - .count() - : 0; - } else { - return 0; - } - } + static int64_t startStatOpTime(); // Check objectSize() implementation return value and log a warning at most // once. @@ -176,14 +170,8 @@ class CPPCACHE_EXPORT Utils { return objectSize; } - inline static void updateStatOpTime(statistics::Statistics* m_regionStats, - int32_t statId, int64_t start) { - if (DistributedSystem::getSystemProperties() != nullptr) { - if (DistributedSystem::getSystemProperties()->getEnableTimeStatistics()) { - m_regionStats->incLong(statId, startStatOpTime() - start); - } - } - } + static void updateStatOpTime(statistics::Statistics* m_regionStats, + int32_t statId, int64_t start); static void parseEndpointNamesString( const char* endpoints, std::unordered_set<std::string>& endpointNames); http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/VersionStamp.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/VersionStamp.cpp b/src/cppcache/src/VersionStamp.cpp index 6c7becc..8a95356 100644 --- a/src/cppcache/src/VersionStamp.cpp +++ b/src/cppcache/src/VersionStamp.cpp @@ -168,7 +168,7 @@ GfErrType VersionStamp::checkForConflict(const RegionInternal* region, } if (!apply) { - region->getCacheImpl()->m_cacheStats->incConflatedEvents(); + region->getCacheImpl()->getCachePerfStats().incConflatedEvents(); return GF_CACHE_CONCURRENT_MODIFICATION_EXCEPTION; } return GF_NOERR; http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/VersionTag.cpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/VersionTag.cpp b/src/cppcache/src/VersionTag.cpp index cfb133e..8d57e81 100644 --- a/src/cppcache/src/VersionTag.cpp +++ b/src/cppcache/src/VersionTag.cpp @@ -22,27 +22,21 @@ using namespace apache::geode::client; -VersionTag::VersionTag() { - m_bits = 0; - m_entryVersion = 0; - m_regionVersionHighBytes = 0; - m_regionVersionLowBytes = 0; - m_timeStamp = 0; - m_internalMemId = 0; - m_previousMemId = 0; -} +VersionTag::VersionTag(MemberListForVersionStamp& memberListForVersionStamp) + : VersionTag(0, 0, 0, 0, 0, memberListForVersionStamp) {} VersionTag::VersionTag(int32_t entryVersion, int16_t regionVersionHighBytes, int32_t regionVersionLowBytes, uint16_t internalMemId, - uint16_t previousMemId) { - m_bits = 0; - m_entryVersion = entryVersion; - m_regionVersionHighBytes = regionVersionHighBytes; - m_regionVersionLowBytes = regionVersionLowBytes; - m_timeStamp = 0; - m_internalMemId = internalMemId; - m_previousMemId = previousMemId; -} + uint16_t previousMemId, + MemberListForVersionStamp& memberListForVersionStamp) + : m_bits(0), + m_entryVersion(entryVersion), + m_regionVersionHighBytes(regionVersionHighBytes), + m_regionVersionLowBytes(regionVersionLowBytes), + m_timeStamp(0), + m_internalMemId(internalMemId), + m_previousMemId(previousMemId), + m_memberListForVersionStamp(memberListForVersionStamp) {} VersionTag::~VersionTag() {} @@ -80,7 +74,11 @@ Serializable* VersionTag::fromData(DataInput& input) { return this; } -Serializable* VersionTag::createDeserializable() { return new VersionTag(); } +Serializable* VersionTag::createDeserializable( + MemberListForVersionStamp& memberListForVersionStamp) { + return new VersionTag(memberListForVersionStamp); +} + void VersionTag::replaceNullMemberId(uint16_t memId) { if (m_previousMemId == 0) { m_previousMemId = memId; @@ -90,24 +88,19 @@ void VersionTag::replaceNullMemberId(uint16_t memId) { } } void VersionTag::readMembers(uint16_t flags, DataInput& input) { - ClientProxyMembershipIDPtr previousMemId, internalMemId; - MemberListForVersionStampPtr memberList = - CacheImpl::getMemberListForVersionStamp(); if ((flags & HAS_MEMBER_ID) != 0) { - internalMemId = ClientProxyMembershipIDPtr(new ClientProxyMembershipID()); - + auto internalMemId = std::make_shared<ClientProxyMembershipID>(); internalMemId->readEssentialData(input); - m_internalMemId = - memberList->add((DSMemberForVersionStampPtr)internalMemId); + m_internalMemId = m_memberListForVersionStamp.add( + (DSMemberForVersionStampPtr)internalMemId); } if ((flags & HAS_PREVIOUS_MEMBER_ID) != 0) { if ((flags & DUPLICATE_MEMBER_IDS) != 0) { m_previousMemId = m_internalMemId; } else { - previousMemId = ClientProxyMembershipIDPtr(new ClientProxyMembershipID()); + auto previousMemId = std::make_shared<ClientProxyMembershipID>(); previousMemId->readEssentialData(input); - m_previousMemId = - memberList->add((DSMemberForVersionStampPtr)previousMemId); + m_previousMemId = m_memberListForVersionStamp.add(previousMemId); } } } http://git-wip-us.apache.org/repos/asf/geode-native/blob/da389793/src/cppcache/src/VersionTag.hpp ---------------------------------------------------------------------- diff --git a/src/cppcache/src/VersionTag.hpp b/src/cppcache/src/VersionTag.hpp index d645784..0489009 100644 --- a/src/cppcache/src/VersionTag.hpp +++ b/src/cppcache/src/VersionTag.hpp @@ -40,6 +40,7 @@ class VersionTag : public Cacheable { uint16_t m_internalMemId; uint16_t m_previousMemId; int64_t m_timeStamp; + MemberListForVersionStamp& m_memberListForVersionStamp; static const uint8_t HAS_MEMBER_ID = 0x01; static const uint8_t HAS_PREVIOUS_MEMBER_ID = 0x02; @@ -53,7 +54,7 @@ class VersionTag : public Cacheable { virtual void readMembers(uint16_t flags, DataInput& input); public: - VersionTag(); + VersionTag(MemberListForVersionStamp& memberListForVersionStamp); virtual ~VersionTag(); @@ -65,7 +66,8 @@ class VersionTag : public Cacheable { virtual Serializable* fromData(DataInput& input); - static Serializable* createDeserializable(); + static Serializable* createDeserializable( + MemberListForVersionStamp& memberListForVersionStamp); int32_t getEntryVersion() const { return m_entryVersion; } int16_t getRegionVersionHighBytes() const { return m_regionVersionHighBytes; } @@ -83,7 +85,8 @@ class VersionTag : public Cacheable { */ VersionTag(int32_t entryVersion, int16_t regionVersionHighBytes, int32_t regionVersionLowBytes, uint16_t internalMemId, - uint16_t previousMemId); + uint16_t previousMemId, + MemberListForVersionStamp& memberListForVersionStamp); }; } // namespace client } // namespace geode