This is an automated email from the ASF dual-hosted git repository. echobravo pushed a commit to branch fakeDev2 in repository https://gitbox.apache.org/repos/asf/geode-native.git
The following commit(s) were added to refs/heads/fakeDev2 by this push: new df1d211 YABB test df1d211 is described below commit df1d211db0c19b048d8db47b69491772bf179c33 Author: Ernest Burghardt <eburgha...@pivotal.io> AuthorDate: Wed Feb 14 13:39:37 2018 -0700 YABB test --- cppcache/src/TcrEndpoint.cpp | 1353 ------------------------------------------ 1 file changed, 1353 deletions(-) diff --git a/cppcache/src/TcrEndpoint.cpp b/cppcache/src/TcrEndpoint.cpp deleted file mode 100644 index c6503fb..0000000 --- a/cppcache/src/TcrEndpoint.cpp +++ /dev/null @@ -1,1353 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -#include <thread> -#include <chrono> -#include <ace/OS.h> - -#include <geode/SystemProperties.hpp> -#include <geode/AuthInitialize.hpp> - -#include "TcrEndpoint.hpp" -#include "ThinClientRegion.hpp" -#include "ThinClientPoolHADM.hpp" -#include "StackTrace.hpp" -#include "CacheImpl.hpp" -#include "Utils.hpp" -#include "DistributedSystemImpl.hpp" -#include "util/exception.hpp" - -namespace apache { -namespace geode { -namespace client { - -/* throwException(ex): passes ex.getName() and ex.what() to LOGFINEST and then throws ex. The macro expands to a brace-enclosed block and names its argument three times, so pass an already-constructed exception object rather than an expression with side effects. */#define throwException(ex) \ - { \ - LOGFINEST("%s: %s", ex.getName(), ex.what()); \ - throw ex; \ - } -/* -This is replaced by the connect-timeout (times 3) system property for SR # 6525. -#define DEFAULT_CALLBACK_CONNECTION_TIMEOUT_SECONDS 180 -*/ -const char* TcrEndpoint::NC_Notification = "NC Notification"; - -/* Constructs an endpoint in its initial, unconnected state. The constructor body contains no executable statements (only a commented-out host-name canonicalization), so no connection is opened here. Parameters: name is copied into m_name. cacheImpl is stored in m_cacheImpl and is dereferenced in the initializer list to read getDistributedSystem().getSystemProperties().connectionPoolSize() into m_maxConnections, so it must not be null. failoverSema, cleanupSema and redundancySema are stored in m_failoverSema, m_cleanupSema and m_redundancySema (NOTE(review): presumably held by reference and therefore required to outlive the endpoint; confirm against the member declarations in TcrEndpoint.hpp). DM is stored in m_baseDM. isMultiUserMode is stored in m_isMultiUserMode. Initial values: m_connected, m_isActiveEndpoint, m_isAuthenticated, m_isQueueHosted, m_msgSent, m_pingSent, m_needToConnectInLock and m_isServerQueueStatusSet are false; the counters and ids are 0; m_notifyConnection and m_notifyReceiver are 0; m_serverQueueStatus is NON_REDUNDANT_SERVER; m_notificationCleanupSema is constructed with the argument 0; m_connectLockCond is constructed from m_connectLock. */TcrEndpoint::TcrEndpoint(const std::string& name, CacheImpl* cacheImpl, - ACE_Semaphore& failoverSema, - ACE_Semaphore& cleanupSema, - ACE_Semaphore& redundancySema, ThinClientBaseDM* DM, - bool isMultiUserMode) - : m_needToConnectInLock(false), - m_connectLockCond(m_connectLock), - m_maxConnections(cacheImpl->getDistributedSystem() - .getSystemProperties() - .connectionPoolSize()), - m_notifyConnection(0), - m_notifyReceiver(0), - m_numRegionListener(0), - m_isQueueHosted(false), - m_uniqueId(0), - m_isAuthenticated(false), - m_msgSent(false), - m_pingSent(false), - m_numberOfTimesFailed(0), - m_isMultiUserMode(isMultiUserMode), - m_name(name), - m_connected(false), - m_isActiveEndpoint(false), - m_numRegions(0), - m_pingTimeouts(0), - m_notifyCount(0), - m_cacheImpl(cacheImpl), - m_failoverSema(failoverSema), - m_cleanupSema(cleanupSema), - m_notificationCleanupSema(0), - m_redundancySema(redundancySema), - m_dupCount(0), - m_serverQueueStatus(NON_REDUNDANT_SERVER), - m_isServerQueueStatusSet(false), - m_queueSize(0), - // m_poolHADM( poolHADM ), - m_baseDM(DM), - m_noOfConnRefs(0), - m_distributedMemId(0) { - /* - m_name = Utils::convertHostToCanonicalForm(m_name.c_str() ); - */ -} - 
-TcrEndpoint::~TcrEndpoint() { - m_connected = false; - m_isActiveEndpoint = false; - closeConnections(); - { - // force close the notification channel -- see bug #295 - ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_notifyReceiverLock); - if (m_numRegionListener > 0) { - LOGFINE( - "Connection to %s still has references " - "to subscription channel while closing", - m_name.c_str()); - // fail in dev build to track #295 better in regressions - GF_DEV_ASSERT(m_numRegionListener == 0); - - m_numRegionListener = 0; - closeNotification(); - } - } - while (m_notifyCount > 0) { - LOGDEBUG("TcrEndpoint::~TcrEndpoint(): reducing notify count at %d", - m_notifyCount); - m_notificationCleanupSema.acquire(); - m_notifyCount--; - } - LOGFINE("Connection to %s deleted", m_name.c_str()); -} - -inline bool TcrEndpoint::needtoTakeConnectLock() { -#ifdef __linux - if (m_cacheImpl->getDistributedSystem() - .getSystemProperties() - .connectWaitTimeout() > std::chrono::seconds::zero()) { - return m_needToConnectInLock; // once pipe or other socket error will take - // lock to connect. - } - return false; // once pipe or other socket error will take lock to connect. 
-#else - return false; -#endif -} - -GfErrType TcrEndpoint::createNewConnectionWL( - TcrConnection*& newConn, bool isClientNotification, bool isSecondary, - std::chrono::microseconds connectTimeout) { - LOGFINE("TcrEndpoint::createNewConnectionWL"); - auto connectWaitTimeout = m_cacheImpl->getDistributedSystem() - .getSystemProperties() - .connectWaitTimeout(); - ACE_Time_Value interval(connectWaitTimeout); - ACE_Time_Value stopAt(ACE_OS::gettimeofday()); - stopAt += interval; - bool connCreated = false; - - while (ACE_OS::gettimeofday() < stopAt) { - int32_t ret = m_connectLock.acquire(&stopAt); - - LOGFINE( - "TcrEndpoint::createNewConnectionWL ret = %d interval = %ld error =%s", - ret, interval.get_msec(), ACE_OS::strerror(ACE_OS::last_error())); - - if (ret != -1) { // got lock - try { - LOGFINE("TcrEndpoint::createNewConnectionWL got lock"); - newConn = - new TcrConnection(m_cacheImpl->tcrConnectionManager(), m_connected); - newConn->InitTcrConnection(this, m_name.c_str(), m_ports, - isClientNotification, isSecondary, - connectTimeout); - - connCreated = true; // to break while loop - - m_needToConnectInLock = false; // no need to take lock - - m_connectLock.release(); - LOGFINE("New Connection Created"); - break; - } catch (const TimeoutException&) { - LOGINFO("Timeout1 in handshake with endpoint[%s]", m_name.c_str()); - m_connectLock.release(); - // throw te; - return GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA; - } catch (std::exception& ex) { - m_connectLock.release(); - LOGWARN("Failed1 in handshake with endpoint[%s]: %s", m_name.c_str(), - ex.what()); - // throw ex; - return GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA; - } catch (...) 
{ - LOGWARN("Unknown1 failure in handshake with endpoint[%s]", - m_name.c_str()); - m_connectLock.release(); - // throw; - return GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA; - } - } - } - - if (!connCreated) { - LOGFINE("TcrEndpoint::createNewConnectionWL timeout"); - // throwException(TimeoutException("Thread is hanged in connect call")); - return GF_CLIENT_WAIT_TIMEOUT; - } - - return GF_NOERR; -} - -GfErrType TcrEndpoint::createNewConnection( - TcrConnection*& newConn, bool isClientNotification, bool isSecondary, - std::chrono::microseconds connectTimeout, int32_t timeoutRetries, - bool sendUpdateNotification, bool appThreadRequest) { - LOGFINE( - "TcrEndpoint::createNewConnection: connectTimeout =%d " - "m_needToConnectInLock=%d appThreadRequest =%d", - connectTimeout.count(), m_needToConnectInLock, appThreadRequest); - GfErrType err = GF_NOERR; - newConn = nullptr; - while (timeoutRetries-- >= 0) { - try { - if (newConn == nullptr) { - if (!needtoTakeConnectLock() || !appThreadRequest) { - newConn = new TcrConnection(m_cacheImpl->tcrConnectionManager(), - m_connected); - bool authenticate = newConn->InitTcrConnection( - this, m_name.c_str(), m_ports, isClientNotification, isSecondary, - connectTimeout); - if (authenticate) { - authenticateEndpoint(newConn); - } - } else { - err = createNewConnectionWL(newConn, isClientNotification, - isSecondary, connectTimeout); - if (err == GF_CLIENT_WAIT_TIMEOUT || - err == GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA) { - break; - } - } - // m_connected = true; - } - if (!isClientNotification && sendUpdateNotification) { - bool notificationStarted; - { - ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_notifyReceiverLock); - notificationStarted = (m_numRegionListener > 0) || m_isQueueHosted; - } - if (notificationStarted) { - LOGFINE("Sending update notification message to endpoint %s", - m_name.c_str()); - TcrMessageUpdateClientNotification updateNotificationMsg( - newConn->getConnectionManager() - .getCacheImpl() - ->getCache() 
- ->createDataOutput(), - static_cast<int32_t>(newConn->getPort())); - newConn->send(updateNotificationMsg.getMsgData(), - updateNotificationMsg.getMsgLength()); - } - } - err = GF_NOERR; - break; - } catch (const TimeoutException&) { - LOGINFO("Timeout in handshake with endpoint[%s]", m_name.c_str()); - err = GF_TIMOUT; - m_needToConnectInLock = true; // while creating the connection - std::this_thread::sleep_for(std::chrono::milliseconds(50)); - } catch (const GeodeIOException& ex) { - LOGINFO("IO error[%d] in handshake with endpoint[%s]: %s", - ACE_OS::last_error(), m_name.c_str(), ex.what()); - err = GF_IOERR; - m_needToConnectInLock = true; // while creating the connection - break; - } catch (const AuthenticationFailedException& ex) { - LOGWARN("Authentication failed in handshake with endpoint[%s]: %s", - m_name.c_str(), ex.what()); - err = GF_AUTHENTICATION_FAILED_EXCEPTION; - break; - } catch (const AuthenticationRequiredException& ex) { - LOGWARN("Authentication required in handshake with endpoint[%s]: %s", - m_name.c_str(), ex.what()); - err = GF_AUTHENTICATION_REQUIRED_EXCEPTION; - break; - } catch (const CacheServerException& ex) { - LOGWARN("Exception in handshake on server[%s]: %s", m_name.c_str(), - ex.what()); - err = GF_CACHESERVER_EXCEPTION; - break; - } catch (const Exception& ex) { - LOGWARN("Failed in handshake with endpoint[%s]: %s", m_name.c_str(), - ex.what()); - err = GF_MSG; - break; - } catch (std::exception& ex) { - LOGWARN("Failed in handshake with endpoint[%s]: %s", m_name.c_str(), - ex.what()); - err = GF_MSG; - break; - } catch (...) 
{ - LOGWARN("Unknown failure in handshake with endpoint[%s]", m_name.c_str()); - err = GF_MSG; - break; - } - } - if (err != GF_NOERR && newConn != nullptr) { - _GEODE_SAFE_DELETE(newConn); - } - return err; -} - -void TcrEndpoint::authenticateEndpoint(TcrConnection*& conn) { - LOGDEBUG( - "TcrEndpoint::authenticateEndpoint m_isAuthenticated = %d " - "this->m_baseDM = %d", - m_isAuthenticated, m_baseDM); - if (!m_isAuthenticated && m_baseDM) { - this->setConnected(); - ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_endpointAuthenticationLock); - GfErrType err = GF_NOERR; - auto creds = getCredentials(); - - if (creds != nullptr) { - LOGDEBUG("TcrEndpoint::authenticateEndpoint got creds from app = %d", - creds->getSize()); - } else { - LOGDEBUG("TcrEndpoint::authenticateEndpoint no creds from app "); - } - - TcrMessageUserCredential request( - m_cacheImpl->getCache()->createDataOutput(), creds, m_baseDM); - - LOGDEBUG("request is created"); - TcrMessageReply reply(true, this->m_baseDM); - // err = this->sendRequestToEP(request, reply, ( *it ).int_id_); - err = this->sendRequestConnWithRetry(request, reply, conn); - LOGDEBUG("authenticateEndpoint error = %d", err); - if (err == GF_NOERR) { - // put the object into local region - switch (reply.getMessageType()) { - case TcrMessage::RESPONSE: { - // nothing to be done; - break; - } - case TcrMessage::EXCEPTION: { - err = ThinClientRegion::handleServerException("AuthException", - reply.getException()); - break; - } - default: { - LOGERROR("Unknown message type %d while sending credentials", - reply.getMessageType()); - err = GF_MSG; - break; - } - } - } - // throw exception if it is not authenticated - GfErrTypeToException("TcrEndpoint::authenticateEndpoint", err); - - m_isAuthenticated = true; - } -} -std::shared_ptr<Properties> TcrEndpoint::getCredentials() { - const auto& distributedSystem = m_cacheImpl->getDistributedSystem(); - const auto& tmpSecurityProperties = - 
distributedSystem.getSystemProperties().getSecurityProperties(); - - if (const auto& authInitialize = m_cacheImpl->getAuthInitialize()) { - LOGFINER( - "Acquired handle to AuthInitialize plugin, " - "getting credentials for %s", - m_name.c_str()); - const auto& tmpAuthIniSecurityProperties = - authInitialize->getCredentials(tmpSecurityProperties, m_name.c_str()); - LOGFINER("Done getting credentials"); - return tmpAuthIniSecurityProperties; - } - return nullptr; -} - -ServerQueueStatus TcrEndpoint::getFreshServerQueueStatus( - int32_t& queueSize, bool addToQueue, TcrConnection*& statusConn) { - GfErrType err = GF_NOERR; - TcrConnection* newConn; - ServerQueueStatus status = NON_REDUNDANT_SERVER; - - err = createNewConnection(newConn, false, false, - m_cacheImpl->getDistributedSystem() - .getSystemProperties() - .connectTimeout()); - if (err == GF_NOERR) { - status = newConn->getServerQueueStatus(queueSize); - - if (status == REDUNDANT_SERVER || status == PRIMARY_SERVER) { - if (addToQueue) { - m_opConnections.put(newConn, true); - } else { - statusConn = newConn; - } - m_connected = true; - return status; - } else { - // remove port from ports list (which is sent to server in notification - // handshake). - closeConnection(newConn); - return status; - } - } - - return status; -} - -GfErrType TcrEndpoint::registerDM(bool clientNotification, bool isSecondary, - bool isActiveEndpoint, - ThinClientBaseDM* distMgr) { - // Pre-conditions: - // 1. If this is a secondary server then clientNotification must be true - GF_DEV_ASSERT(!isSecondary || clientNotification); - - bool connected = false; - GfErrType err = GF_NOERR; - - ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_connectionLock); - // Three cases here: - // 1. m_connected is false, m_isActiveEndpoint is false and then - // if isActiveEndpoint is true, then create 'max' connections - // 2. 
m_connected is false, m_isActiveEndpoint is false and then - // if isActiveEndpoint is false, then create just one connection - // to ping the server - // 3. m_connected is true, m_isActiveEndpoint is false (i.e. server was - // previously not an active endpoint) then if isSecondary is false then - // create 'max-1' connections else do nothing - m_opConnections.reset(); - if (m_maxConnections <= 0) { - connected = true; - } else if (!m_isActiveEndpoint) { - int maxConnections = 0; - if (isActiveEndpoint) { - if (m_connected) { - maxConnections = m_maxConnections - 1; - } else { - maxConnections = m_maxConnections; - } - } else if (!m_connected) { - maxConnections = 1; - } - if (maxConnections > 0) { - LOGINFO("Starting Handshake with %s%s", - (isSecondary ? "secondary server " - : (isActiveEndpoint ? "" : "primary server ")), - m_name.c_str()); - for (int connNum = 0; connNum < maxConnections; ++connNum) { - TcrConnection* newConn; - if ((err = createNewConnection(newConn, false, false, - m_cacheImpl->getDistributedSystem() - .getSystemProperties() - .connectTimeout(), - 0, m_connected)) != GF_NOERR) { - m_connected = false; - m_isActiveEndpoint = false; - closeConnections(); - return err; - } - m_opConnections.put(newConn, true); - } - LOGINFO("Handshake with %s%s success", - (isSecondary ? "secondary server " - : (isActiveEndpoint ? 
"" : "primary server ")), - m_name.c_str()); - m_connected = true; - m_isActiveEndpoint = isActiveEndpoint; - } - } - - if (m_connected || connected) { - if (clientNotification) { - if (distMgr != nullptr) { - ACE_Guard<ACE_Recursive_Thread_Mutex> guardDistMgrs(m_distMgrsLock); - m_distMgrs.push_back(distMgr); - } - LOGFINEST( - "Registering subscription " - "channel for endpoint %s", - m_name.c_str()); - // setup notification channel for the first region - ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_notifyReceiverLock); - if (m_numRegionListener == 0) { - if ((err = createNewConnection(m_notifyConnection, true, isSecondary, - m_cacheImpl->getDistributedSystem() - .getSystemProperties() - .connectTimeout() * - 3, - 0)) != GF_NOERR) { - m_connected = false; - m_isActiveEndpoint = false; - closeConnections(); - LOGWARN("Failed to start subscription channel for endpoint %s", - m_name.c_str()); - return err; - } - m_notifyReceiver = new Task<TcrEndpoint>( - this, &TcrEndpoint::receiveNotification, NC_Notification); - m_notifyReceiver->start(); - } - ++m_numRegionListener; - LOGFINEST("Incremented notification region count for endpoint %s to %d", - m_name.c_str(), m_numRegionListener); - m_connected = true; - } - } - - // Post-conditions: - // 1. The endpoint should be marked as active, only if m_connected is true - // 2. If this is not an active endpoint and it is connected then only one - // connection + notify channel - GF_DEV_ASSERT(!m_isActiveEndpoint || m_connected); -#if GF_DEVEL_ASSERTS == 1 - int numConnections = m_opConnections.size(); - if (!m_isActiveEndpoint && !isActiveEndpoint && m_connected && - (numConnections != 1 || m_numRegionListener <= 0 || - m_notifyReceiver == nullptr)) { - LOGWARN( - "Inactive connected endpoint does not have exactly one " - "connection. 
Number of connections: %d, number of region listeners: " - "%d", - numConnections, m_numRegionListener); - } -#endif - - return err; -} - -void TcrEndpoint::unregisterDM(bool clientNotification, - ThinClientBaseDM* distMgr, - bool checkQueueHosted) { - if (clientNotification) { - LOGFINEST( - "Closing subscription " - "channel for endpoint %s", - m_name.c_str()); - // close notification channel if there is no region - ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_notifyReceiverLock); - if (m_numRegionListener > 0 && --m_numRegionListener == 0) { - closeNotification(); - } - LOGFINEST("Decremented subscription region count for endpoint %s to %d", - m_name.c_str(), m_numRegionListener); - if (distMgr != nullptr) { - ACE_Guard<ACE_Recursive_Thread_Mutex> guardDistMgrs(m_distMgrsLock); - m_distMgrs.remove(distMgr); - } - LOGFINEST("Done unsubscribe for endpoint %s", m_name.c_str()); - } -} - -void TcrEndpoint::pingServer(ThinClientPoolDM* poolDM) { - LOGDEBUG("Sending ping message to endpoint %s", m_name.c_str()); - if (!m_connected || m_noOfConnRefs == 0) { - LOGFINER("Skipping ping task for disconnected endpoint %s", m_name.c_str()); - return; - } - - if (!m_msgSent && !m_pingSent) { - TcrMessagePing* pingMsg = - TcrMessage::getPingMessage(m_cacheImpl->getCache()); - TcrMessageReply reply(true, nullptr); - LOGFINEST("Sending ping message to endpoint %s", m_name.c_str()); - GfErrType error; - if (poolDM != nullptr) { - error = poolDM->sendRequestToEP(*pingMsg, reply, this); - } else { - error = send(*pingMsg, reply); - } - LOGFINEST("Sent ping message to endpoint %s with error code %d%s", - m_name.c_str(), error, error == GF_NOERR ? 
" (no error)" : ""); - if (error == GF_NOERR) { - m_pingSent = true; - } - if (error == GF_TIMOUT && m_pingTimeouts < 2) { - ++m_pingTimeouts; - } else { - m_pingTimeouts = 0; - // Only call setConnectionStatus if the status has changed (non thread - // safe check) - // This is to avoid blocking the ping thread if notification channel takes - // a long time to - // complete causing the server to drop the client in the midst of - // connection establishment. - bool connected = (error == GF_NOERR) - ? (reply.getMessageType() == TcrMessage::REPLY) - : false; - if (m_connected != connected) { - setConnectionStatus(connected); - } - } - LOGFINEST("Completed sending ping message to endpoint %s", m_name.c_str()); - } else { - m_msgSent = false; - m_pingSent = false; - } -} - -bool TcrEndpoint::checkDupAndAdd(std::shared_ptr<EventId> eventid) { - return m_cacheImpl->tcrConnectionManager().checkDupAndAdd(eventid); -} - -int TcrEndpoint::receiveNotification(volatile bool& isRunning) { - LOGFINE("Started subscription channel for endpoint %s", m_name.c_str()); - while (isRunning) { - TcrMessageReply* msg = nullptr; - try { - size_t dataLen; - ConnErrType opErr = CONN_NOERR; - auto data = m_notifyConnection->receive(&dataLen, &opErr, - std::chrono::seconds(5)); - - if (opErr == CONN_IOERR) { - // Endpoint is disconnected, this exception is expected - LOGFINER( - "IO exception while receiving subscription event for endpoint %d", - opErr); - if (isRunning) { - setConnectionStatus(false); - // close notification channel - ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_notifyReceiverLock); - if (m_numRegionListener > 0) { - m_numRegionListener = 0; - closeNotification(); - } - } - break; - } - - if (data) { - msg = new TcrMessageReply(true, m_baseDM); - msg->initCqMap(); - msg->setData(data, static_cast<int32_t>(dataLen), - this->getDistributedMemberID(), - *(m_cacheImpl->getSerializationRegistry()), - *(m_cacheImpl->getMemberListForVersionStamp())); - 
handleNotificationStats(static_cast<int64_t>(dataLen)); - LOGDEBUG("receive notification %d", msg->getMessageType()); - - if (!isRunning) { - _GEODE_SAFE_DELETE(msg); - break; - } - - if (msg->getMessageType() == TcrMessage::SERVER_TO_CLIENT_PING) { - LOGFINE("Received ping from server subscription channel."); - } - - // ignore some message types like REGISTER_INSTANTIATORS - if (msg->shouldIgnore()) { - _GEODE_SAFE_DELETE(msg); - continue; - } - - bool isMarker = (msg->getMessageType() == TcrMessage::CLIENT_MARKER); - if (!msg->hasCqPart()) { - if (msg->getMessageType() != TcrMessage::CLIENT_MARKER) { - const std::string& regionFullPath1 = msg->getRegionName(); - std::shared_ptr<Region> region1; - m_cacheImpl->getRegion(regionFullPath1.c_str(), region1); - if (region1 != nullptr && - !static_cast<ThinClientRegion*>(region1.get()) - ->getDistMgr() - ->isEndpointAttached(this)) { - // drop event before even processing the eventid for duplicate - // checking - LOGFINER("Endpoint %s dropping event for region %s", - m_name.c_str(), regionFullPath1.c_str()); - _GEODE_SAFE_DELETE(msg); - continue; - } - } - } - - if (!checkDupAndAdd(msg->getEventId())) { - m_dupCount++; - if (m_dupCount % 100 == 1) { - LOGFINE("Dropped %dst duplicate notification message", m_dupCount); - } - _GEODE_SAFE_DELETE(msg); - continue; - } - - if (isMarker) { - LOGFINE("Got a marker message on endpont %s", m_name.c_str()); - m_cacheImpl->processMarker(); - processMarker(); - _GEODE_SAFE_DELETE(msg); - } else { - if (!msg->hasCqPart()) // || msg->isInterestListPassed()) - { - const std::string& regionFullPath = msg->getRegionName(); - std::shared_ptr<Region> region; - m_cacheImpl->getRegion(regionFullPath.c_str(), region); - if (region != nullptr) { - static_cast<ThinClientRegion*>(region.get()) - ->receiveNotification(msg); - } else { - LOGWARN( - "Notification for region %s that does not exist in " - "client cacheImpl.", - regionFullPath.c_str()); - } - } else { - LOGDEBUG("receive cq 
notification %d", msg->getMessageType()); - auto queryService = getQueryService(); - if (queryService != nullptr) { - static_cast<RemoteQueryService*>(queryService.get()) - ->receiveNotification(msg); - } - } - } - } - } catch (const TimeoutException&) { - // If there is no notification, this exception is expected - // But this is valid only when *no* data has been received - // otherwise if data has been read then TcrConnection will throw - // a GeodeIOException which will cause the channel to close. - LOGDEBUG( - "receiveNotification timed out: no data received from " - "endpoint %s", - m_name.c_str()); - } catch (const GeodeIOException& e) { - // Endpoint is disconnected, this exception is expected - LOGFINER( - "IO exception while receiving subscription event for endpoint %s: %s", - m_name.c_str(), e.what()); - if (m_connected) { - setConnectionStatus(false); - // close notification channel - ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_notifyReceiverLock); - if (m_numRegionListener > 0) { - m_numRegionListener = 0; - closeNotification(); - } - } - break; - } catch (const Exception& ex) { - _GEODE_SAFE_DELETE(msg); - LOGERROR( - "Exception while receiving subscription event for endpoint %s:: %s: " - "%s", - m_name.c_str(), ex.getName().c_str(), ex.what()); - } catch (...) { - _GEODE_SAFE_DELETE(msg); - LOGERROR( - "Unexpected exception while " - "receiving subscription event from endpoint %s", - m_name.c_str()); - } - } - LOGFINE("Ended subscription channel for endpoint %s", m_name.c_str()); - return 0; -} - -inline bool TcrEndpoint::compareTransactionIds(int32_t reqTransId, - int32_t replyTransId, - std::string& failReason, - TcrConnection* conn) { - LOGDEBUG("TcrEndpoint::compareTransactionIds requested id = %d ,replied = %d", - reqTransId, replyTransId); - if (replyTransId != reqTransId) { - LOGERROR( - "Transaction ids do not match on endpoint %s for " - "send operation: %d, %d. 
Possible serialization mismatch", - m_name.c_str(), reqTransId, replyTransId); - closeConnection(conn); - failReason = "mismatch of transaction IDs in operation"; - return false; - } - return true; -} - -inline bool TcrEndpoint::handleIOException(const std::string& message, - TcrConnection*& conn, - bool isBgThread) { - int32_t lastError = ACE_OS::last_error(); - if (lastError == ECONNRESET || lastError == EPIPE) { - _GEODE_SAFE_DELETE(conn); - } else { - closeConnection(conn); - } - LOGFINE( - "IO error during send for endpoint %s " - "[errno: %d: %s]: %s", - m_name.c_str(), lastError, ACE_OS::strerror(lastError), message.c_str()); - // EAGAIN =11, EWOULDBLOCK = 10035L, EPIPE = 32, ECONNRESET =10054L(An - // existing connection was forcibly closed by the remote host.) - if (!(lastError == EAGAIN || lastError == EWOULDBLOCK /*|| - lastError == ECONNRESET */ /*|| lastError == EPIPE*/)) { - // break from enclosing loop without retries - // something wrong try connect in lock - m_needToConnectInLock = true; - return false; - } - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - return true; -} - -GfErrType TcrEndpoint::sendRequestConn(const TcrMessage& request, - TcrMessageReply& reply, - TcrConnection* conn, - std::string& failReason) { - int32_t type = request.getMessageType(); - GfErrType error = GF_NOERR; - - LOGFINER("Sending request type %d to endpoint [%s] via connection [%p]", type, - m_name.c_str(), conn); - // TcrMessage * req = const_cast<TcrMessage *>(&request); - LOGDEBUG("TcrEndpoint::sendRequestConn = %d", m_baseDM); - if (m_baseDM != nullptr) m_baseDM->beforeSendingRequest(request, conn); - if (((type == TcrMessage::EXECUTE_FUNCTION || - type == TcrMessage::EXECUTE_REGION_FUNCTION) && - (request.hasResult() & 2))) { - sendRequestForChunkedResponse(request, reply, conn); - } else if (type == TcrMessage::REGISTER_INTEREST_LIST || - type == TcrMessage::REGISTER_INTEREST || - type == TcrMessage::QUERY || - type == 
TcrMessage::QUERY_WITH_PARAMETERS || - type == TcrMessage::GET_ALL_70 || - type == TcrMessage::GET_ALL_WITH_CALLBACK || - type == TcrMessage::PUTALL || - type == TcrMessage::PUT_ALL_WITH_CALLBACK || - type == TcrMessage::REMOVE_ALL || - ((type == TcrMessage::EXECUTE_FUNCTION || - type == TcrMessage::EXECUTE_REGION_FUNCTION) && - (request.hasResult() & 2)) || - type == - TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP || // This is - // kept - // aside as - // server - // always - // sends - // chunked - // response. - type == TcrMessage::EXECUTECQ_MSG_TYPE || - type == TcrMessage::STOPCQ_MSG_TYPE || - type == TcrMessage::CLOSECQ_MSG_TYPE || - type == TcrMessage::KEY_SET || - type == TcrMessage::CLOSECLIENTCQS_MSG_TYPE || - type == TcrMessage::GETCQSTATS_MSG_TYPE || - type == TcrMessage::MONITORCQ_MSG_TYPE || - type == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE || - type == TcrMessage::GETDURABLECQS_MSG_TYPE) { - sendRequestForChunkedResponse(request, reply, conn); - LOGDEBUG("sendRequestConn: calling sendRequestForChunkedResponse DONE"); - } else { - // Chk request type to request if so request.getCallBackArg flag & setCall - // back arg flag to true, and in response chk for this flag. - if (request.getMessageType() == TcrMessage::REQUEST) { - if (request.isCallBackArguement()) { - reply.setCallBackArguement(true); - } - } - size_t dataLen; - LOGDEBUG("sendRequestConn: calling sendRequest"); - auto data = conn->sendRequest(request.getMsgData(), request.getMsgLength(), - &dataLen, request.getTimeout(), - reply.getTimeout(), request.getMessageType()); - reply.setMessageTypeRequest(type); - reply.setData( - data, static_cast<int32_t>(dataLen), this->getDistributedMemberID(), - *(m_cacheImpl->getSerializationRegistry()), - *(m_cacheImpl - ->getMemberListForVersionStamp())); // memory is released by - // TcrMessage setData(). 
- } - - // reset idle timeout of the connection for pool connection manager - if (type != TcrMessage::PING) { - conn->touch(); - } - - if (reply.getMessageType() == TcrMessage::INVALID) { - if (type == TcrMessage::EXECUTE_FUNCTION || - type == TcrMessage::EXECUTE_REGION_FUNCTION || - type == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP) { - ChunkedFunctionExecutionResponse* resultCollector = - dynamic_cast<ChunkedFunctionExecutionResponse*>( - reply.getChunkedResultHandler()); - if (resultCollector->getResult() == false) { - LOGDEBUG("TcrEndpoint::send: function execution, no response desired"); - // m_opConnections.put( conn, false ); - // return GF_NOERR; - error = GF_NOERR; - } - } else { - // Treat INVALID messages like IO exceptions - error = GF_IOERR; - } - } - // do we need to consider case where compareTransactionIds return true? - // I think we will not have issue here - else if (!compareTransactionIds(request.getTransId(), reply.getTransId(), - failReason, conn)) { - error = GF_NOTCON; - } - if (error == GF_NOERR) { - if (m_baseDM != nullptr) - m_baseDM->afterSendingRequest(request, reply, conn); - } - - return error; -} - -bool TcrEndpoint::isMultiUserMode() { - LOGDEBUG("TcrEndpoint::isMultiUserMode %d", m_isMultiUserMode); - return m_isMultiUserMode; -} - -GfErrType TcrEndpoint::sendRequestWithRetry( - const TcrMessage& request, TcrMessageReply& reply, TcrConnection*& conn, - bool& epFailure, std::string& failReason, int maxSendRetries, - bool useEPPool, std::chrono::microseconds requestedTimeout, - bool isBgThread) { - GfErrType error = GF_NOTCON; - bool createNewConn = false; - // int32_t type = request.getMessageType(); - int sendRetryCount = 0; - - // Retry on the following send errors: - // Timeout: 1 retry - // EAGAIN, ECONNRESET, EWOULDBLOCK: 1 retry - // Connection pool is empty (too many threads or no connections available): 1 - // retry - - do { - if (sendRetryCount > 0) { - // this is a retry. 
set the retry bit in the early Ack - (const_cast<TcrMessage&>(request)).updateHeaderForRetry(); - } - - auto timeout = requestedTimeout; - epFailure = false; - if (useEPPool) { - if (m_maxConnections == 0) { - ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_connectionLock); - if (m_maxConnections == 0) { - LOGFINE( - "Creating a new connection when connection-pool-size system " - "property set to 0"); - if ((error = createNewConnection(conn, false, false, - m_cacheImpl->getDistributedSystem() - .getSystemProperties() - .connectTimeout())) != - GF_NOERR) { - epFailure = true; - continue; - } - m_maxConnections = 1; - } - } - } - LOGDEBUG("TcrEndpoint::send() getting a connection for endpoint %s", - m_name.c_str()); - if (createNewConn) { - createNewConn = false; - if (!m_connected) { - return GF_NOTCON; - } else if ((error = - createNewConnection(conn, false, false, - m_cacheImpl->getDistributedSystem() - .getSystemProperties() - .connectTimeout(), - 0, true)) != GF_NOERR) { - epFailure = true; - continue; - } - } else if (conn == nullptr && useEPPool) { - LOGFINER( - "sendRequestWithRetry:: looking for connection in queue timeout = " - "%d ", - timeout.count()); - // max wait time to get a connection - conn = m_opConnections.getUntil(timeout); - } - if (!m_connected) { - return GF_NOTCON; - } - if (conn != nullptr) { - LOGDEBUG("TcrEndpoint::send() obtained a connection for endpoint %s", - m_name.c_str()); - int reqTransId = request.getTransId(); - - try { - LOGDEBUG("Calling sendRequestConn"); - error = sendRequestConn(request, reply, conn, failReason); - if (error == GF_IOERR) { - epFailure = true; - failReason = "received INVALID reply from server"; - if (!handleIOException(failReason, conn, isBgThread)) { - break; - } - createNewConn = true; - } else if (error == GF_NOTCON) { - epFailure = true; - createNewConn = true; - } else { - if (useEPPool) { - m_opConnections.put(conn, false); - } - return GF_NOERR; - } - } catch (const TimeoutException&) { - error = 
GF_TIMOUT; - LOGFINE( - "Send timed out for endpoint %s. " - "Message txid = %d", - m_name.c_str(), reqTransId); - closeFailedConnection(conn); - /* - if ( !(m_poolHADM && m_poolHADM->getThreadLocalConnections()) ){ //close - connection only when not a sticky connection. - closeConnection( conn ); - }*/ - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - int32_t type = request.getMessageType(); - epFailure = (type != TcrMessage::QUERY && type != TcrMessage::PUTALL && - type != TcrMessage::PUT_ALL_WITH_CALLBACK && - type != TcrMessage::EXECUTE_FUNCTION && - type != TcrMessage::EXECUTE_REGION_FUNCTION && - type != TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP && - type != TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE); - - // epFailure = true; - failReason = "timed out waiting for endpoint"; - createNewConn = true; - } catch (const GeodeIOException& ex) { - error = GF_IOERR; - epFailure = true; - failReason = "IO error for endpoint"; - if (!handleIOException(ex.what(), conn, - isBgThread)) { // change here - break; - } - createNewConn = true; - } catch (const Exception& ex) { - failReason = ex.getName(); - failReason.append(": "); - failReason.append(ex.what()); - LOGWARN("Error during send for endpoint %s due to %s", m_name.c_str(), - failReason.c_str()); - if (compareTransactionIds(reqTransId, reply.getTransId(), failReason, - conn)) { - if (Log::warningEnabled()) { - LOGWARN("Stack trace: %s", ex.getStackTrace().c_str()); - } - error = GF_MSG; - if (useEPPool) { - m_opConnections.put(conn, false); - } else { - // we are here its better to close the connection as - // "compareTransactionIds" - // will not close the connection - closeConnection(conn); - } - break; - } else { - error = GF_NOTCON; - epFailure = true; - createNewConn = true; - } - } catch (...) 
{ - failReason = "unexpected exception"; - LOGERROR( - "Unexpected exception while sending request to " - "endpoint %s", - m_name.c_str()); - if (compareTransactionIds(reqTransId, reply.getTransId(), failReason, - conn)) { - error = GF_MSG; - if (useEPPool) { - m_opConnections.put(conn, false); - } else { - // we are here its better to close the connection as - // "compareTransactionIds" - // will not close the connection - closeConnection(conn); - } - break; - } else { - error = GF_NOTCON; - epFailure = true; - createNewConn = true; - } - } - } else { - if (useEPPool) { - epFailure = true; - failReason = "server connection could not be obtained"; - if (timeout <= std::chrono::microseconds::zero()) { - error = GF_TIMOUT; - LOGWARN( - "No connection available for %ld seconds " - "for endpoint %s.", - requestedTimeout.count(), m_name.c_str()); - } else { - error = GF_NOTCON; - LOGFINE( - "Returning without connection with %d seconds remaining " - "for endpoint %s.", - timeout.count(), m_name.c_str()); - } - } else { - LOGERROR("Unexpected failure while sending request to server."); - GF_DEV_ASSERT("Bug in TcrEndpoint::sendRequestWithRetry()?" ? 
false - : true); - } - } - } while (++sendRetryCount <= maxSendRetries); - return error; -} - -void TcrEndpoint::setRetry(const TcrMessage& request, int& maxSendRetries) { - int32_t type = request.getMessageType(); - if (type == TcrMessage::QUERY || type == TcrMessage::QUERY_WITH_PARAMETERS || - type == TcrMessage::PUTALL || type == TcrMessage::PUT_ALL_WITH_CALLBACK || - type == TcrMessage::EXECUTE_FUNCTION || - type == TcrMessage::EXECUTE_REGION_FUNCTION || - type == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP || - type == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE) { - maxSendRetries = 0; - } -} - -GfErrType TcrEndpoint::send(const TcrMessage& request, TcrMessageReply& reply) { - GfErrType error = GF_NOTCON; - - int maxSendRetries = 1; - setRetry(request, maxSendRetries); - - TcrConnection* conn = nullptr; - bool epFailure; - std::string failReason; - // TODO: remove sendRetryCount as parameter. - error = sendRequestWithRetry(request, reply, conn, epFailure, failReason, - maxSendRetries, true, reply.getTimeout()); - - if (error == GF_NOERR) { - m_msgSent = true; - } - - if (error != GF_NOERR && epFailure) { - LOGFINE("Send Giving up for endpoint %s; reason: %s.", m_name.c_str(), - failReason.c_str()); - setConnectionStatus(false); - } - -// Postconditions: -#if GF_DEVEL_ASSERTS == 1 - int opConnectionsSize = m_opConnections.size(); - if (!m_isActiveEndpoint && (opConnectionsSize > 1)) { - LOGWARN("Connections size = %d, expected maximum %d", opConnectionsSize, 1); - } else if (opConnectionsSize > m_maxConnections) { - LOGWARN("Connections size = %d, expected maximum %d", opConnectionsSize, - m_maxConnections); - } -#endif - - return error; -} - -GfErrType TcrEndpoint::sendRequestConnWithRetry(const TcrMessage& request, - TcrMessageReply& reply, - TcrConnection*& conn, - bool isBgThread) { - GfErrType error = GF_NOTCON; - - int maxSendRetries = 1; - setRetry(request, maxSendRetries); - - // Retry on the following send errors: - // Timeout: 1 retry - // EAGAIN, 
ECONNRESET, EWOULDBLOCK: 1 retry - // Connection pool is empty (too many threads or no connections available): 1 - // retry - bool epFailure; - std::string failReason; - LOGFINE("sendRequestConnWithRetry:: maxSendRetries = %d ", maxSendRetries); - error = sendRequestWithRetry(request, reply, conn, epFailure, failReason, - maxSendRetries, false, reply.getTimeout(), - isBgThread); - if (error == GF_NOERR) { - m_msgSent = true; - } - - if (error != GF_NOERR && epFailure) { - LOGFINE("sendRequestConnWithRetry: Giving up for endpoint %s; reason: %s.", - m_name.c_str(), failReason.c_str()); - setConnectionStatus(false); - } - - return error; -} - -void TcrEndpoint::setConnectionStatus(bool status) { - // : Store the original value of m_isActiveEndpoint. - // This is to try make failover more resilient for the case when - // a foreground operation thread is connecting to an endpoint while - // the notification thread is disconnecting from the same, or vice versa. - // By comparing the original value with the new value we know if - // someone else has changed the status in that duration, and skip - // the change if that is the case. - // Same logic applies for the ping thread. 
- // Try something like (after the 2.5 patch release): - // bool wasActive = m_isActiveEndpoint; - // Then after taking the lock: - // If ( !wasActive && isActiveEndpoint ) { return; } - ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_connectionLock); - if (m_connected != status) { - bool connected = m_connected; - m_connected = status; - if (connected) { - m_numberOfTimesFailed += 1; - m_isAuthenticated = false; - // disconnected - LOGFINE("Disconnecting from endpoint %s", m_name.c_str()); - closeConnections(); - m_isActiveEndpoint = false; - LOGFINE("Disconnected from endpoint %s", m_name.c_str()); - triggerRedundancyThread(); - } - } -} - -void TcrEndpoint::triggerRedundancyThread() { - m_failoverSema.release(); - m_redundancySema.release(); -} - -void TcrEndpoint::closeConnection(TcrConnection*& conn) { - conn->close(); - m_ports.erase(conn->getPort()); - _GEODE_SAFE_DELETE(conn); -} - -void TcrEndpoint::closeConnections() { - m_opConnections.close(); - m_ports.clear(); - m_maxConnections = m_cacheImpl->getDistributedSystem() - .getSystemProperties() - .connectionPoolSize(); -} - -/* -void TcrEndpoint::sendNotificationCloseMsg() -{ - if (m_notifyConnection != nullptr) { - m_notifyReceiver->stop(); - m_notifyConnection->close(); - } -} -*/ - -void TcrEndpoint::closeNotification() { - LOGFINEST("Closing subscription channel for endpoint %s", m_name.c_str()); - m_notifyConnection->close(); - m_notifyReceiver->stopNoblock(); - TcrConnectionManager& tccm = m_cacheImpl->tcrConnectionManager(); - tccm.addNotificationForDeletion(m_notifyReceiver, m_notifyConnection, - m_notificationCleanupSema); - m_notifyCount++; - m_cleanupSema.release(); - m_isQueueHosted = false; - LOGFINEST( - "Added susbcription channel for deletion and " - "released cleanup semaphore for endpoint %s", - m_name.c_str()); -} - -void TcrEndpoint::stopNoBlock() { - if (m_notifyReceiver != nullptr) { - m_notifyConnection->close(); - m_notifyReceiver->stopNoblock(); - } -} - -void 
TcrEndpoint::stopNotifyReceiverAndCleanup() { - LOGFINER("Stopping subscription receiver and cleaning up"); - ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_notifyReceiverLock); - - if (m_notifyReceiver != nullptr) { - LOGFINER("Waiting for notification thread..."); - // m_notifyReceiver->stopNoblock(); - m_notifyReceiver->wait(); - bool found = false; - for (std::list<Task<TcrEndpoint>*>::iterator it = - m_notifyReceiverList.begin(); - it != m_notifyReceiverList.end(); it++) { - if (*it == m_notifyReceiver) { - found = true; - break; - } - } - - if (!found) { - _GEODE_SAFE_DELETE(m_notifyReceiver); - _GEODE_SAFE_DELETE(m_notifyConnection); - } - } - - m_numRegionListener = 0; - - if (m_notifyReceiverList.size() > 0) { - LOGFINER("TcrEndpoint::stopNotifyReceiverAndCleanup: notifylist size = %d", - m_notifyReceiverList.size()); - for (std::list<Task<TcrEndpoint>*>::iterator it = - m_notifyReceiverList.begin(); - it != m_notifyReceiverList.end(); it++) { - LOGFINER( - "TcrEndpoint::stopNotifyReceiverAndCleanup: deleting old notify " - "recievers."); - _GEODE_SAFE_DELETE(*it); - } - } - - if (m_notifyConnectionList.size() > 0) { - LOGFINER("TcrEndpoint::stopNotifyReceiverAndCleanup: notifylist size = %d", - m_notifyConnectionList.size()); - for (std::list<TcrConnection*>::iterator it = - m_notifyConnectionList.begin(); - it != m_notifyConnectionList.end(); it++) { - LOGFINER( - "TcrEndpoint::stopNotifyReceiverAndCleanup: deleting old notify " - "connections."); - _GEODE_SAFE_DELETE(*it); - } - } -} - -void TcrEndpoint::setServerQueueStatus(ServerQueueStatus queueStatus, - int32_t queueSize) { - if (!m_isServerQueueStatusSet) { - m_isServerQueueStatusSet = true; - m_serverQueueStatus = queueStatus; - m_queueSize = queueSize; - } -} - -bool TcrEndpoint::isQueueHosted() { return m_isQueueHosted; } -void TcrEndpoint::processMarker() { - m_cacheImpl->tcrConnectionManager().processMarker(); -} - std::shared_ptr<QueryService> TcrEndpoint::getQueryService() { - return 
m_cacheImpl->getQueryService(true); - } - void TcrEndpoint::sendRequestForChunkedResponse(const TcrMessage& request, - TcrMessageReply& reply, - TcrConnection* conn) { - conn->sendRequestForChunkedResponse(request, request.getMsgLength(), reply); - } - void TcrEndpoint::closeFailedConnection(TcrConnection*& conn) { - closeConnection(conn); -} - -} // namespace client -} // namespace geode -} // namespace apache -- To stop receiving notification emails like this one, please contact echobr...@apache.org.