This is an automated email from the ASF dual-hosted git repository.

echobravo pushed a commit to branch fakeDev2
in repository https://gitbox.apache.org/repos/asf/geode-native.git


The following commit(s) were added to refs/heads/fakeDev2 by this push:
     new df1d211   YABB test
df1d211 is described below

commit df1d211db0c19b048d8db47b69491772bf179c33
Author: Ernest Burghardt <eburgha...@pivotal.io>
AuthorDate: Wed Feb 14 13:39:37 2018 -0700

     YABB test
---
 cppcache/src/TcrEndpoint.cpp | 1353 ------------------------------------------
 1 file changed, 1353 deletions(-)

diff --git a/cppcache/src/TcrEndpoint.cpp b/cppcache/src/TcrEndpoint.cpp
deleted file mode 100644
index c6503fb..0000000
--- a/cppcache/src/TcrEndpoint.cpp
+++ /dev/null
@@ -1,1353 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include <thread>
-#include <chrono>
-#include <ace/OS.h>
-
-#include <geode/SystemProperties.hpp>
-#include <geode/AuthInitialize.hpp>
-
-#include "TcrEndpoint.hpp"
-#include "ThinClientRegion.hpp"
-#include "ThinClientPoolHADM.hpp"
-#include "StackTrace.hpp"
-#include "CacheImpl.hpp"
-#include "Utils.hpp"
-#include "DistributedSystemImpl.hpp"
-#include "util/exception.hpp"
-
-namespace apache {
-namespace geode {
-namespace client {
-
-#define throwException(ex)                              \
-  {                                                     \
-    LOGFINEST("%s: %s", ex.getName(), ex.what()); \
-    throw ex;                                           \
-  }
-/*
-This is replaced by the connect-timeout (times 3) system property for SR # 6525.
-#define DEFAULT_CALLBACK_CONNECTION_TIMEOUT_SECONDS 180
-*/
-const char* TcrEndpoint::NC_Notification = "NC Notification";
-
-TcrEndpoint::TcrEndpoint(const std::string& name, CacheImpl* cacheImpl,
-                         ACE_Semaphore& failoverSema,
-                         ACE_Semaphore& cleanupSema,
-                         ACE_Semaphore& redundancySema, ThinClientBaseDM* DM,
-                         bool isMultiUserMode)
-    : m_needToConnectInLock(false),
-      m_connectLockCond(m_connectLock),
-      m_maxConnections(cacheImpl->getDistributedSystem()
-                           .getSystemProperties()
-                           .connectionPoolSize()),
-      m_notifyConnection(0),
-      m_notifyReceiver(0),
-      m_numRegionListener(0),
-      m_isQueueHosted(false),
-      m_uniqueId(0),
-      m_isAuthenticated(false),
-      m_msgSent(false),
-      m_pingSent(false),
-      m_numberOfTimesFailed(0),
-      m_isMultiUserMode(isMultiUserMode),
-      m_name(name),
-      m_connected(false),
-      m_isActiveEndpoint(false),
-      m_numRegions(0),
-      m_pingTimeouts(0),
-      m_notifyCount(0),
-      m_cacheImpl(cacheImpl),
-      m_failoverSema(failoverSema),
-      m_cleanupSema(cleanupSema),
-      m_notificationCleanupSema(0),
-      m_redundancySema(redundancySema),
-      m_dupCount(0),
-      m_serverQueueStatus(NON_REDUNDANT_SERVER),
-      m_isServerQueueStatusSet(false),
-      m_queueSize(0),
-      // m_poolHADM( poolHADM ),
-      m_baseDM(DM),
-      m_noOfConnRefs(0),
-      m_distributedMemId(0) {
-  /*
-  m_name = Utils::convertHostToCanonicalForm(m_name.c_str() );
-  */
-}
-
-TcrEndpoint::~TcrEndpoint() {
-  m_connected = false;
-  m_isActiveEndpoint = false;
-  closeConnections();
-  {
-    // force close the notification channel -- see bug #295
-    ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_notifyReceiverLock);
-    if (m_numRegionListener > 0) {
-      LOGFINE(
-          "Connection to %s still has references "
-          "to subscription channel while closing",
-          m_name.c_str());
-      // fail in dev build to track #295 better in regressions
-      GF_DEV_ASSERT(m_numRegionListener == 0);
-
-      m_numRegionListener = 0;
-      closeNotification();
-    }
-  }
-  while (m_notifyCount > 0) {
-    LOGDEBUG("TcrEndpoint::~TcrEndpoint(): reducing notify count at %d",
-             m_notifyCount);
-    m_notificationCleanupSema.acquire();
-    m_notifyCount--;
-  }
-  LOGFINE("Connection to %s deleted", m_name.c_str());
-}
-
-inline bool TcrEndpoint::needtoTakeConnectLock() {
-#ifdef __linux
-  if (m_cacheImpl->getDistributedSystem()
-          .getSystemProperties()
-          .connectWaitTimeout() > std::chrono::seconds::zero()) {
-    return m_needToConnectInLock;  // once pipe or other socket error will take
-                                   // lock to connect.
-  }
-  return false;  // once pipe or other socket error will take lock to connect.
-#else
-  return false;
-#endif
-}
-
-GfErrType TcrEndpoint::createNewConnectionWL(
-    TcrConnection*& newConn, bool isClientNotification, bool isSecondary,
-    std::chrono::microseconds connectTimeout) {
-  LOGFINE("TcrEndpoint::createNewConnectionWL");
-  auto connectWaitTimeout = m_cacheImpl->getDistributedSystem()
-                                .getSystemProperties()
-                                .connectWaitTimeout();
-  ACE_Time_Value interval(connectWaitTimeout);
-  ACE_Time_Value stopAt(ACE_OS::gettimeofday());
-  stopAt += interval;
-  bool connCreated = false;
-
-  while (ACE_OS::gettimeofday() < stopAt) {
-    int32_t ret = m_connectLock.acquire(&stopAt);
-
-    LOGFINE(
-        "TcrEndpoint::createNewConnectionWL ret = %d interval = %ld error =%s",
-        ret, interval.get_msec(), ACE_OS::strerror(ACE_OS::last_error()));
-
-    if (ret != -1) {  // got lock
-      try {
-        LOGFINE("TcrEndpoint::createNewConnectionWL got lock");
-        newConn =
-            new TcrConnection(m_cacheImpl->tcrConnectionManager(), m_connected);
-        newConn->InitTcrConnection(this, m_name.c_str(), m_ports,
-                                   isClientNotification, isSecondary,
-                                   connectTimeout);
-
-        connCreated = true;  // to break while loop
-
-        m_needToConnectInLock = false;  // no need to take lock
-
-        m_connectLock.release();
-        LOGFINE("New Connection Created");
-        break;
-      } catch (const TimeoutException&) {
-        LOGINFO("Timeout1 in handshake with endpoint[%s]", m_name.c_str());
-        m_connectLock.release();
-        // throw te;
-        return GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA;
-      } catch (std::exception& ex) {
-        m_connectLock.release();
-        LOGWARN("Failed1 in handshake with endpoint[%s]: %s", m_name.c_str(),
-                ex.what());
-        // throw ex;
-        return GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA;
-      } catch (...) {
-        LOGWARN("Unknown1 failure in handshake with endpoint[%s]",
-                m_name.c_str());
-        m_connectLock.release();
-        // throw;
-        return GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA;
-      }
-    }
-  }
-
-  if (!connCreated) {
-    LOGFINE("TcrEndpoint::createNewConnectionWL timeout");
-    // throwException(TimeoutException("Thread is hanged in connect call"));
-    return GF_CLIENT_WAIT_TIMEOUT;
-  }
-
-  return GF_NOERR;
-}
-
-GfErrType TcrEndpoint::createNewConnection(
-    TcrConnection*& newConn, bool isClientNotification, bool isSecondary,
-    std::chrono::microseconds connectTimeout, int32_t timeoutRetries,
-    bool sendUpdateNotification, bool appThreadRequest) {
-  LOGFINE(
-      "TcrEndpoint::createNewConnection: connectTimeout =%d "
-      "m_needToConnectInLock=%d appThreadRequest =%d",
-      connectTimeout.count(), m_needToConnectInLock, appThreadRequest);
-  GfErrType err = GF_NOERR;
-  newConn = nullptr;
-  while (timeoutRetries-- >= 0) {
-    try {
-      if (newConn == nullptr) {
-        if (!needtoTakeConnectLock() || !appThreadRequest) {
-          newConn = new TcrConnection(m_cacheImpl->tcrConnectionManager(),
-                                      m_connected);
-          bool authenticate = newConn->InitTcrConnection(
-              this, m_name.c_str(), m_ports, isClientNotification, isSecondary,
-              connectTimeout);
-          if (authenticate) {
-            authenticateEndpoint(newConn);
-          }
-        } else {
-          err = createNewConnectionWL(newConn, isClientNotification,
-                                      isSecondary, connectTimeout);
-          if (err == GF_CLIENT_WAIT_TIMEOUT ||
-              err == GF_CLIENT_WAIT_TIMEOUT_REFRESH_PRMETADATA) {
-            break;
-          }
-        }
-        // m_connected = true;
-      }
-      if (!isClientNotification && sendUpdateNotification) {
-        bool notificationStarted;
-        {
-          ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_notifyReceiverLock);
-          notificationStarted = (m_numRegionListener > 0) || m_isQueueHosted;
-        }
-        if (notificationStarted) {
-          LOGFINE("Sending update notification message to endpoint %s",
-                  m_name.c_str());
-          TcrMessageUpdateClientNotification updateNotificationMsg(
-              newConn->getConnectionManager()
-                  .getCacheImpl()
-                  ->getCache()
-                  ->createDataOutput(),
-              static_cast<int32_t>(newConn->getPort()));
-          newConn->send(updateNotificationMsg.getMsgData(),
-                        updateNotificationMsg.getMsgLength());
-        }
-      }
-      err = GF_NOERR;
-      break;
-    } catch (const TimeoutException&) {
-      LOGINFO("Timeout in handshake with endpoint[%s]", m_name.c_str());
-      err = GF_TIMOUT;
-      m_needToConnectInLock = true;  // while creating the connection
-      std::this_thread::sleep_for(std::chrono::milliseconds(50));
-    } catch (const GeodeIOException& ex) {
-      LOGINFO("IO error[%d] in handshake with endpoint[%s]: %s",
-              ACE_OS::last_error(), m_name.c_str(), ex.what());
-      err = GF_IOERR;
-      m_needToConnectInLock = true;  // while creating the connection
-      break;
-    } catch (const AuthenticationFailedException& ex) {
-      LOGWARN("Authentication failed in handshake with endpoint[%s]: %s",
-              m_name.c_str(), ex.what());
-      err = GF_AUTHENTICATION_FAILED_EXCEPTION;
-      break;
-    } catch (const AuthenticationRequiredException& ex) {
-      LOGWARN("Authentication required in handshake with endpoint[%s]: %s",
-              m_name.c_str(), ex.what());
-      err = GF_AUTHENTICATION_REQUIRED_EXCEPTION;
-      break;
-    } catch (const CacheServerException& ex) {
-      LOGWARN("Exception in handshake on server[%s]: %s", m_name.c_str(),
-              ex.what());
-      err = GF_CACHESERVER_EXCEPTION;
-      break;
-    } catch (const Exception& ex) {
-      LOGWARN("Failed in handshake with endpoint[%s]: %s", m_name.c_str(),
-              ex.what());
-      err = GF_MSG;
-      break;
-    } catch (std::exception& ex) {
-      LOGWARN("Failed in handshake with endpoint[%s]: %s", m_name.c_str(),
-              ex.what());
-      err = GF_MSG;
-      break;
-    } catch (...) {
-      LOGWARN("Unknown failure in handshake with endpoint[%s]", m_name.c_str());
-      err = GF_MSG;
-      break;
-    }
-  }
-  if (err != GF_NOERR && newConn != nullptr) {
-    _GEODE_SAFE_DELETE(newConn);
-  }
-  return err;
-}
-
-void TcrEndpoint::authenticateEndpoint(TcrConnection*& conn) {
-  LOGDEBUG(
-      "TcrEndpoint::authenticateEndpoint m_isAuthenticated  = %d "
-      "this->m_baseDM = %d",
-      m_isAuthenticated, m_baseDM);
-  if (!m_isAuthenticated && m_baseDM) {
-    this->setConnected();
-    ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_endpointAuthenticationLock);
-    GfErrType err = GF_NOERR;
-    auto creds = getCredentials();
-
-    if (creds != nullptr) {
-      LOGDEBUG("TcrEndpoint::authenticateEndpoint got creds from app = %d",
-               creds->getSize());
-    } else {
-      LOGDEBUG("TcrEndpoint::authenticateEndpoint no creds from app ");
-    }
-
-    TcrMessageUserCredential request(
-        m_cacheImpl->getCache()->createDataOutput(), creds, m_baseDM);
-
-    LOGDEBUG("request is created");
-    TcrMessageReply reply(true, this->m_baseDM);
-    // err = this->sendRequestToEP(request, reply, ( *it ).int_id_);
-    err = this->sendRequestConnWithRetry(request, reply, conn);
-    LOGDEBUG("authenticateEndpoint error = %d", err);
-    if (err == GF_NOERR) {
-      // put the object into local region
-      switch (reply.getMessageType()) {
-        case TcrMessage::RESPONSE: {
-          // nothing to be done;
-          break;
-        }
-        case TcrMessage::EXCEPTION: {
-          err = ThinClientRegion::handleServerException("AuthException",
-                                                        reply.getException());
-          break;
-        }
-        default: {
-          LOGERROR("Unknown message type %d while sending credentials",
-                   reply.getMessageType());
-          err = GF_MSG;
-          break;
-        }
-      }
-    }
-    // throw exception if it is not authenticated
-    GfErrTypeToException("TcrEndpoint::authenticateEndpoint", err);
-
-    m_isAuthenticated = true;
-  }
-}
-std::shared_ptr<Properties> TcrEndpoint::getCredentials() {
-  const auto& distributedSystem = m_cacheImpl->getDistributedSystem();
-  const auto& tmpSecurityProperties =
-      distributedSystem.getSystemProperties().getSecurityProperties();
-
-  if (const auto& authInitialize = m_cacheImpl->getAuthInitialize()) {
-    LOGFINER(
-        "Acquired handle to AuthInitialize plugin, "
-        "getting credentials for %s",
-        m_name.c_str());
-    const auto& tmpAuthIniSecurityProperties =
-        authInitialize->getCredentials(tmpSecurityProperties, m_name.c_str());
-    LOGFINER("Done getting credentials");
-    return tmpAuthIniSecurityProperties;
-  }
-  return nullptr;
-}
-
-ServerQueueStatus TcrEndpoint::getFreshServerQueueStatus(
-    int32_t& queueSize, bool addToQueue, TcrConnection*& statusConn) {
-  GfErrType err = GF_NOERR;
-  TcrConnection* newConn;
-  ServerQueueStatus status = NON_REDUNDANT_SERVER;
-
-  err = createNewConnection(newConn, false, false,
-                            m_cacheImpl->getDistributedSystem()
-                                .getSystemProperties()
-                                .connectTimeout());
-  if (err == GF_NOERR) {
-    status = newConn->getServerQueueStatus(queueSize);
-
-    if (status == REDUNDANT_SERVER || status == PRIMARY_SERVER) {
-      if (addToQueue) {
-        m_opConnections.put(newConn, true);
-      } else {
-        statusConn = newConn;
-      }
-      m_connected = true;
-      return status;
-    } else {
-      //  remove port from ports list (which is sent to server in notification
-      //  handshake).
-      closeConnection(newConn);
-      return status;
-    }
-  }
-
-  return status;
-}
-
-GfErrType TcrEndpoint::registerDM(bool clientNotification, bool isSecondary,
-                                  bool isActiveEndpoint,
-                                  ThinClientBaseDM* distMgr) {
-  // Pre-conditions:
-  // 1. If this is a secondary server then clientNotification must be true
-  GF_DEV_ASSERT(!isSecondary || clientNotification);
-
-  bool connected = false;
-  GfErrType err = GF_NOERR;
-
-  ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_connectionLock);
-  // Three cases here:
-  // 1. m_connected is false, m_isActiveEndpoint is false and then
-  //    if isActiveEndpoint is true, then create 'max' connections
-  // 2. m_connected is false, m_isActiveEndpoint is false and then
-  //    if isActiveEndpoint is false, then create just one connection
-  //    to ping the server
-  // 3. m_connected is true, m_isActiveEndpoint is false (i.e. server was
-  //    previously not an active endpoint) then if isSecondary is false then
-  //    create 'max-1' connections else do nothing
-  m_opConnections.reset();
-  if (m_maxConnections <= 0) {
-    connected = true;
-  } else if (!m_isActiveEndpoint) {
-    int maxConnections = 0;
-    if (isActiveEndpoint) {
-      if (m_connected) {
-        maxConnections = m_maxConnections - 1;
-      } else {
-        maxConnections = m_maxConnections;
-      }
-    } else if (!m_connected) {
-      maxConnections = 1;
-    }
-    if (maxConnections > 0) {
-      LOGINFO("Starting Handshake with %s%s",
-              (isSecondary ? "secondary server "
-                           : (isActiveEndpoint ? "" : "primary server ")),
-              m_name.c_str());
-      for (int connNum = 0; connNum < maxConnections; ++connNum) {
-        TcrConnection* newConn;
-        if ((err = createNewConnection(newConn, false, false,
-                                       m_cacheImpl->getDistributedSystem()
-                                           .getSystemProperties()
-                                           .connectTimeout(),
-                                       0, m_connected)) != GF_NOERR) {
-          m_connected = false;
-          m_isActiveEndpoint = false;
-          closeConnections();
-          return err;
-        }
-        m_opConnections.put(newConn, true);
-      }
-      LOGINFO("Handshake with %s%s success",
-              (isSecondary ? "secondary server "
-                           : (isActiveEndpoint ? "" : "primary server ")),
-              m_name.c_str());
-      m_connected = true;
-      m_isActiveEndpoint = isActiveEndpoint;
-    }
-  }
-
-  if (m_connected || connected) {
-    if (clientNotification) {
-      if (distMgr != nullptr) {
-        ACE_Guard<ACE_Recursive_Thread_Mutex> guardDistMgrs(m_distMgrsLock);
-        m_distMgrs.push_back(distMgr);
-      }
-      LOGFINEST(
-          "Registering subscription "
-          "channel for endpoint %s",
-          m_name.c_str());
-      // setup notification channel for the first region
-      ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_notifyReceiverLock);
-      if (m_numRegionListener == 0) {
-        if ((err = createNewConnection(m_notifyConnection, true, isSecondary,
-                                       m_cacheImpl->getDistributedSystem()
-                                               .getSystemProperties()
-                                               .connectTimeout() *
-                                           3,
-                                       0)) != GF_NOERR) {
-          m_connected = false;
-          m_isActiveEndpoint = false;
-          closeConnections();
-          LOGWARN("Failed to start subscription channel for endpoint %s",
-                  m_name.c_str());
-          return err;
-        }
-        m_notifyReceiver = new Task<TcrEndpoint>(
-            this, &TcrEndpoint::receiveNotification, NC_Notification);
-        m_notifyReceiver->start();
-      }
-      ++m_numRegionListener;
-      LOGFINEST("Incremented notification region count for endpoint %s to %d",
-                m_name.c_str(), m_numRegionListener);
-      m_connected = true;
-    }
-  }
-
-  // Post-conditions:
-  // 1. The endpoint should be marked as active, only if m_connected is true
-  // 2. If this is not an active endpoint and it is connected then only one
-  //    connection + notify channel
-  GF_DEV_ASSERT(!m_isActiveEndpoint || m_connected);
-#if GF_DEVEL_ASSERTS == 1
-  int numConnections = m_opConnections.size();
-  if (!m_isActiveEndpoint && !isActiveEndpoint && m_connected &&
-      (numConnections != 1 || m_numRegionListener <= 0 ||
-       m_notifyReceiver == nullptr)) {
-    LOGWARN(
-        "Inactive connected endpoint does not have exactly one "
-        "connection. Number of connections: %d, number of region listeners: "
-        "%d",
-        numConnections, m_numRegionListener);
-  }
-#endif
-
-  return err;
-}
-
-void TcrEndpoint::unregisterDM(bool clientNotification,
-                               ThinClientBaseDM* distMgr,
-                               bool checkQueueHosted) {
-  if (clientNotification) {
-    LOGFINEST(
-        "Closing subscription "
-        "channel for endpoint %s",
-        m_name.c_str());
-    // close notification channel if there is no region
-    ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_notifyReceiverLock);
-    if (m_numRegionListener > 0 && --m_numRegionListener == 0) {
-      closeNotification();
-    }
-    LOGFINEST("Decremented subscription region count for endpoint %s to %d",
-              m_name.c_str(), m_numRegionListener);
-    if (distMgr != nullptr) {
-      ACE_Guard<ACE_Recursive_Thread_Mutex> guardDistMgrs(m_distMgrsLock);
-      m_distMgrs.remove(distMgr);
-    }
-    LOGFINEST("Done unsubscribe for endpoint %s", m_name.c_str());
-  }
-}
-
-void TcrEndpoint::pingServer(ThinClientPoolDM* poolDM) {
-  LOGDEBUG("Sending ping message to endpoint %s", m_name.c_str());
-  if (!m_connected || m_noOfConnRefs == 0) {
-    LOGFINER("Skipping ping task for disconnected endpoint %s", m_name.c_str());
-    return;
-  }
-
-  if (!m_msgSent && !m_pingSent) {
-    TcrMessagePing* pingMsg =
-        TcrMessage::getPingMessage(m_cacheImpl->getCache());
-    TcrMessageReply reply(true, nullptr);
-    LOGFINEST("Sending ping message to endpoint %s", m_name.c_str());
-    GfErrType error;
-    if (poolDM != nullptr) {
-      error = poolDM->sendRequestToEP(*pingMsg, reply, this);
-    } else {
-      error = send(*pingMsg, reply);
-    }
-    LOGFINEST("Sent ping message to endpoint %s with error code %d%s",
-              m_name.c_str(), error, error == GF_NOERR ? " (no error)" : "");
-    if (error == GF_NOERR) {
-      m_pingSent = true;
-    }
-    if (error == GF_TIMOUT && m_pingTimeouts < 2) {
-      ++m_pingTimeouts;
-    } else {
-      m_pingTimeouts = 0;
-      //  Only call setConnectionStatus if the status has changed (non thread
-      //  safe check)
-      // This is to avoid blocking the ping thread if notification channel takes
-      // a long time to
-      // complete causing the server to drop the client in the midst of
-      // connection establishment.
-      bool connected = (error == GF_NOERR)
-                           ? (reply.getMessageType() == TcrMessage::REPLY)
-                           : false;
-      if (m_connected != connected) {
-        setConnectionStatus(connected);
-      }
-    }
-    LOGFINEST("Completed sending ping message to endpoint %s", m_name.c_str());
-  } else {
-    m_msgSent = false;
-    m_pingSent = false;
-  }
-}
-
-bool TcrEndpoint::checkDupAndAdd(std::shared_ptr<EventId> eventid) {
-  return m_cacheImpl->tcrConnectionManager().checkDupAndAdd(eventid);
-}
-
-int TcrEndpoint::receiveNotification(volatile bool& isRunning) {
-  LOGFINE("Started subscription channel for endpoint %s", m_name.c_str());
-  while (isRunning) {
-    TcrMessageReply* msg = nullptr;
-    try {
-      size_t dataLen;
-      ConnErrType opErr = CONN_NOERR;
-      auto data = m_notifyConnection->receive(&dataLen, &opErr,
-                                              std::chrono::seconds(5));
-
-      if (opErr == CONN_IOERR) {
-        // Endpoint is disconnected, this exception is expected
-        LOGFINER(
-            "IO exception while receiving subscription event for endpoint %d",
-            opErr);
-        if (isRunning) {
-          setConnectionStatus(false);
-          // close notification channel
-          ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_notifyReceiverLock);
-          if (m_numRegionListener > 0) {
-            m_numRegionListener = 0;
-            closeNotification();
-          }
-        }
-        break;
-      }
-
-      if (data) {
-        msg = new TcrMessageReply(true, m_baseDM);
-        msg->initCqMap();
-        msg->setData(data, static_cast<int32_t>(dataLen),
-                     this->getDistributedMemberID(),
-                     *(m_cacheImpl->getSerializationRegistry()),
-                     *(m_cacheImpl->getMemberListForVersionStamp()));
-        handleNotificationStats(static_cast<int64_t>(dataLen));
-        LOGDEBUG("receive notification %d", msg->getMessageType());
-
-        if (!isRunning) {
-          _GEODE_SAFE_DELETE(msg);
-          break;
-        }
-
-        if (msg->getMessageType() == TcrMessage::SERVER_TO_CLIENT_PING) {
-          LOGFINE("Received ping from server subscription channel.");
-        }
-
-        // ignore some message types like REGISTER_INSTANTIATORS
-        if (msg->shouldIgnore()) {
-          _GEODE_SAFE_DELETE(msg);
-          continue;
-        }
-
-        bool isMarker = (msg->getMessageType() == TcrMessage::CLIENT_MARKER);
-        if (!msg->hasCqPart()) {
-          if (msg->getMessageType() != TcrMessage::CLIENT_MARKER) {
-            const std::string& regionFullPath1 = msg->getRegionName();
-            std::shared_ptr<Region> region1;
-            m_cacheImpl->getRegion(regionFullPath1.c_str(), region1);
-            if (region1 != nullptr &&
-                !static_cast<ThinClientRegion*>(region1.get())
-                     ->getDistMgr()
-                     ->isEndpointAttached(this)) {
-              // drop event before even processing the eventid for duplicate
-              // checking
-              LOGFINER("Endpoint %s dropping event for region %s",
-                       m_name.c_str(), regionFullPath1.c_str());
-              _GEODE_SAFE_DELETE(msg);
-              continue;
-            }
-          }
-        }
-
-        if (!checkDupAndAdd(msg->getEventId())) {
-          m_dupCount++;
-          if (m_dupCount % 100 == 1) {
-            LOGFINE("Dropped %dst duplicate notification message", m_dupCount);
-          }
-          _GEODE_SAFE_DELETE(msg);
-          continue;
-        }
-
-        if (isMarker) {
-          LOGFINE("Got a marker message on endpont %s", m_name.c_str());
-          m_cacheImpl->processMarker();
-          processMarker();
-          _GEODE_SAFE_DELETE(msg);
-        } else {
-          if (!msg->hasCqPart())  // || msg->isInterestListPassed())
-          {
-            const std::string& regionFullPath = msg->getRegionName();
-            std::shared_ptr<Region> region;
-            m_cacheImpl->getRegion(regionFullPath.c_str(), region);
-            if (region != nullptr) {
-              static_cast<ThinClientRegion*>(region.get())
-                  ->receiveNotification(msg);
-            } else {
-              LOGWARN(
-                  "Notification for region %s that does not exist in "
-                  "client cacheImpl.",
-                  regionFullPath.c_str());
-            }
-          } else {
-            LOGDEBUG("receive cq notification %d", msg->getMessageType());
-           auto queryService = getQueryService();
-           if (queryService != nullptr) {
-             static_cast<RemoteQueryService*>(queryService.get())
-                 ->receiveNotification(msg);
-           }
-          }
-        }
-      }
-    } catch (const TimeoutException&) {
-      // If there is no notification, this exception is expected
-      // But this is valid only when *no* data has been received
-      // otherwise if data has been read then TcrConnection will throw
-      // a GeodeIOException which will cause the channel to close.
-      LOGDEBUG(
-          "receiveNotification timed out: no data received from "
-          "endpoint %s",
-          m_name.c_str());
-    } catch (const GeodeIOException& e) {
-      // Endpoint is disconnected, this exception is expected
-      LOGFINER(
-          "IO exception while receiving subscription event for endpoint %s: %s",
-          m_name.c_str(), e.what());
-      if (m_connected) {
-        setConnectionStatus(false);
-        // close notification channel
-        ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_notifyReceiverLock);
-        if (m_numRegionListener > 0) {
-          m_numRegionListener = 0;
-          closeNotification();
-        }
-      }
-      break;
-    } catch (const Exception& ex) {
-      _GEODE_SAFE_DELETE(msg);
-      LOGERROR(
-          "Exception while receiving subscription event for endpoint %s:: %s: "
-          "%s",
-          m_name.c_str(), ex.getName().c_str(), ex.what());
-    } catch (...) {
-      _GEODE_SAFE_DELETE(msg);
-      LOGERROR(
-          "Unexpected exception while "
-          "receiving subscription event from endpoint %s",
-          m_name.c_str());
-    }
-  }
-  LOGFINE("Ended subscription channel for endpoint %s", m_name.c_str());
-  return 0;
-}
-
-inline bool TcrEndpoint::compareTransactionIds(int32_t reqTransId,
-                                               int32_t replyTransId,
-                                               std::string& failReason,
-                                               TcrConnection* conn) {
-  LOGDEBUG("TcrEndpoint::compareTransactionIds requested id = %d ,replied = %d",
-           reqTransId, replyTransId);
-  if (replyTransId != reqTransId) {
-    LOGERROR(
-        "Transaction ids do not match on endpoint %s for "
-        "send operation: %d, %d. Possible serialization mismatch",
-        m_name.c_str(), reqTransId, replyTransId);
-    closeConnection(conn);
-    failReason = "mismatch of transaction IDs in operation";
-    return false;
-  }
-  return true;
-}
-
-inline bool TcrEndpoint::handleIOException(const std::string& message,
-                                           TcrConnection*& conn,
-                                           bool isBgThread) {
-  int32_t lastError = ACE_OS::last_error();
-  if (lastError == ECONNRESET || lastError == EPIPE) {
-    _GEODE_SAFE_DELETE(conn);
-  } else {
-    closeConnection(conn);
-  }
-  LOGFINE(
-      "IO error during send for endpoint %s "
-      "[errno: %d: %s]: %s",
-      m_name.c_str(), lastError, ACE_OS::strerror(lastError), message.c_str());
-  // EAGAIN =11, EWOULDBLOCK = 10035L, EPIPE = 32, ECONNRESET =10054L(An
-  // existing connection was forcibly closed by the remote host.)
-  if (!(lastError == EAGAIN || lastError == EWOULDBLOCK /*||
-        lastError == ECONNRESET */ /*|| lastError == EPIPE*/)) {
-    // break from enclosing loop without retries
-    // something wrong try connect in lock
-    m_needToConnectInLock = true;
-    return false;
-  }
-  std::this_thread::sleep_for(std::chrono::milliseconds(10));
-  return true;
-}
-
-GfErrType TcrEndpoint::sendRequestConn(const TcrMessage& request,
-                                       TcrMessageReply& reply,
-                                       TcrConnection* conn,
-                                       std::string& failReason) {
-  int32_t type = request.getMessageType();
-  GfErrType error = GF_NOERR;
-
-  LOGFINER("Sending request type %d to endpoint [%s] via connection [%p]", type,
-           m_name.c_str(), conn);
-  // TcrMessage * req = const_cast<TcrMessage *>(&request);
-  LOGDEBUG("TcrEndpoint::sendRequestConn  = %d", m_baseDM);
-  if (m_baseDM != nullptr) m_baseDM->beforeSendingRequest(request, conn);
-  if (((type == TcrMessage::EXECUTE_FUNCTION ||
-        type == TcrMessage::EXECUTE_REGION_FUNCTION) &&
-       (request.hasResult() & 2))) {
-    sendRequestForChunkedResponse(request, reply, conn);
-  } else if (type == TcrMessage::REGISTER_INTEREST_LIST ||
-             type == TcrMessage::REGISTER_INTEREST ||
-             type == TcrMessage::QUERY ||
-             type == TcrMessage::QUERY_WITH_PARAMETERS ||
-             type == TcrMessage::GET_ALL_70 ||
-             type == TcrMessage::GET_ALL_WITH_CALLBACK ||
-             type == TcrMessage::PUTALL ||
-             type == TcrMessage::PUT_ALL_WITH_CALLBACK ||
-             type == TcrMessage::REMOVE_ALL ||
-             ((type == TcrMessage::EXECUTE_FUNCTION ||
-               type == TcrMessage::EXECUTE_REGION_FUNCTION) &&
-              (request.hasResult() & 2)) ||
-             type ==
-                 TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP ||  // This is
-                                                                    // kept
-                                                                    // aside as
-                                                                    // server
-                                                                    // always
-                                                                    // sends
-                                                                    // chunked
-                                                                    // response.
-             type == TcrMessage::EXECUTECQ_MSG_TYPE ||
-             type == TcrMessage::STOPCQ_MSG_TYPE ||
-             type == TcrMessage::CLOSECQ_MSG_TYPE ||
-             type == TcrMessage::KEY_SET ||
-             type == TcrMessage::CLOSECLIENTCQS_MSG_TYPE ||
-             type == TcrMessage::GETCQSTATS_MSG_TYPE ||
-             type == TcrMessage::MONITORCQ_MSG_TYPE ||
-             type == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE ||
-             type == TcrMessage::GETDURABLECQS_MSG_TYPE) {
-    sendRequestForChunkedResponse(request, reply, conn);
-    LOGDEBUG("sendRequestConn: calling sendRequestForChunkedResponse DONE");
-  } else {
-    // Chk request type to request if so request.getCallBackArg flag & setCall
-    // back arg flag to true, and in response chk for this flag.
-    if (request.getMessageType() == TcrMessage::REQUEST) {
-      if (request.isCallBackArguement()) {
-        reply.setCallBackArguement(true);
-      }
-    }
-    size_t dataLen;
-    LOGDEBUG("sendRequestConn: calling sendRequest");
-    auto data = conn->sendRequest(request.getMsgData(), request.getMsgLength(),
-                                  &dataLen, request.getTimeout(),
-                                  reply.getTimeout(), request.getMessageType());
-    reply.setMessageTypeRequest(type);
-    reply.setData(
-        data, static_cast<int32_t>(dataLen), this->getDistributedMemberID(),
-        *(m_cacheImpl->getSerializationRegistry()),
-        *(m_cacheImpl
-              ->getMemberListForVersionStamp()));  // memory is released by
-                                                   // TcrMessage setData().
-  }
-
-  // reset idle timeout of the connection for pool connection manager
-  if (type != TcrMessage::PING) {
-    conn->touch();
-  }
-
-  if (reply.getMessageType() == TcrMessage::INVALID) {
-    if (type == TcrMessage::EXECUTE_FUNCTION ||
-        type == TcrMessage::EXECUTE_REGION_FUNCTION ||
-        type == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP) {
-      ChunkedFunctionExecutionResponse* resultCollector =
-          dynamic_cast<ChunkedFunctionExecutionResponse*>(
-              reply.getChunkedResultHandler());
-      if (resultCollector->getResult() == false) {
-        LOGDEBUG("TcrEndpoint::send: function execution, no response desired");
-        //            m_opConnections.put( conn, false );
-        //  return GF_NOERR;
-        error = GF_NOERR;
-      }
-    } else {
-      // Treat INVALID messages like IO exceptions
-      error = GF_IOERR;
-    }
-  }
-  // do we need to consider case where compareTransactionIds return true?
-  // I think we will not have issue here
-  else if (!compareTransactionIds(request.getTransId(), reply.getTransId(),
-                                  failReason, conn)) {
-    error = GF_NOTCON;
-  }
-  if (error == GF_NOERR) {
-    if (m_baseDM != nullptr)
-      m_baseDM->afterSendingRequest(request, reply, conn);
-  }
-
-  return error;
-}
-
-bool TcrEndpoint::isMultiUserMode() {
-  LOGDEBUG("TcrEndpoint::isMultiUserMode %d", m_isMultiUserMode);
-  return m_isMultiUserMode;
-}
-
-GfErrType TcrEndpoint::sendRequestWithRetry(
-    const TcrMessage& request, TcrMessageReply& reply, TcrConnection*& conn,
-    bool& epFailure, std::string& failReason, int maxSendRetries,
-    bool useEPPool, std::chrono::microseconds requestedTimeout,
-    bool isBgThread) {
-  GfErrType error = GF_NOTCON;
-  bool createNewConn = false;
-  // int32_t type = request.getMessageType();
-  int sendRetryCount = 0;
-
-  //  Retry on the following send errors:
-  // Timeout: 1 retry
-  // EAGAIN, ECONNRESET, EWOULDBLOCK: 1 retry
-  // Connection pool is empty (too many threads or no connections available): 1
-  // retry
-
-  do {
-    if (sendRetryCount > 0) {
-      // this is a retry. set the retry bit in the early Ack
-      (const_cast<TcrMessage&>(request)).updateHeaderForRetry();
-    }
-
-    auto timeout = requestedTimeout;
-    epFailure = false;
-    if (useEPPool) {
-      if (m_maxConnections == 0) {
-        ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_connectionLock);
-        if (m_maxConnections == 0) {
-          LOGFINE(
-              "Creating a new connection when connection-pool-size system "
-              "property set to 0");
-          if ((error = createNewConnection(conn, false, false,
-                                           m_cacheImpl->getDistributedSystem()
-                                               .getSystemProperties()
-                                               .connectTimeout())) !=
-              GF_NOERR) {
-            epFailure = true;
-            continue;
-          }
-          m_maxConnections = 1;
-        }
-      }
-    }
-    LOGDEBUG("TcrEndpoint::send() getting a connection for endpoint %s",
-             m_name.c_str());
-    if (createNewConn) {
-      createNewConn = false;
-      if (!m_connected) {
-        return GF_NOTCON;
-      } else if ((error =
-                      createNewConnection(conn, false, false,
-                                          m_cacheImpl->getDistributedSystem()
-                                              .getSystemProperties()
-                                              .connectTimeout(),
-                                          0, true)) != GF_NOERR) {
-        epFailure = true;
-        continue;
-      }
-    } else if (conn == nullptr && useEPPool) {
-      LOGFINER(
-          "sendRequestWithRetry:: looking for connection in queue timeout = "
-          "%d ",
-          timeout.count());
-      // max wait time to get a connection
-      conn = m_opConnections.getUntil(timeout);
-    }
-    if (!m_connected) {
-      return GF_NOTCON;
-    }
-    if (conn != nullptr) {
-      LOGDEBUG("TcrEndpoint::send() obtained a connection for endpoint %s",
-               m_name.c_str());
-      int reqTransId = request.getTransId();
-
-      try {
-        LOGDEBUG("Calling sendRequestConn");
-        error = sendRequestConn(request, reply, conn, failReason);
-        if (error == GF_IOERR) {
-          epFailure = true;
-          failReason = "received INVALID reply from server";
-          if (!handleIOException(failReason, conn, isBgThread)) {
-            break;
-          }
-          createNewConn = true;
-        } else if (error == GF_NOTCON) {
-          epFailure = true;
-          createNewConn = true;
-        } else {
-          if (useEPPool) {
-            m_opConnections.put(conn, false);
-          }
-          return GF_NOERR;
-        }
-      } catch (const TimeoutException&) {
-        error = GF_TIMOUT;
-        LOGFINE(
-            "Send timed out for endpoint %s. "
-            "Message txid = %d",
-            m_name.c_str(), reqTransId);
-        closeFailedConnection(conn);
-        /*
-        if ( !(m_poolHADM && m_poolHADM->getThreadLocalConnections()) ){ //close
-        connection only when not a sticky connection.
-          closeConnection( conn );
-        }*/
-        std::this_thread::sleep_for(std::chrono::milliseconds(10));
-        int32_t type = request.getMessageType();
-        epFailure = (type != TcrMessage::QUERY && type != TcrMessage::PUTALL &&
-                     type != TcrMessage::PUT_ALL_WITH_CALLBACK &&
-                     type != TcrMessage::EXECUTE_FUNCTION &&
-                     type != TcrMessage::EXECUTE_REGION_FUNCTION &&
-                     type != TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP &&
-                     type != TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE);
-
-        // epFailure = true;
-        failReason = "timed out waiting for endpoint";
-        createNewConn = true;
-      } catch (const GeodeIOException& ex) {
-        error = GF_IOERR;
-        epFailure = true;
-        failReason = "IO error for endpoint";
-        if (!handleIOException(ex.what(), conn,
-                               isBgThread)) {  // change here
-          break;
-        }
-        createNewConn = true;
-      } catch (const Exception& ex) {
-        failReason = ex.getName();
-        failReason.append(": ");
-        failReason.append(ex.what());
-        LOGWARN("Error during send for endpoint %s due to %s", m_name.c_str(),
-                failReason.c_str());
-        if (compareTransactionIds(reqTransId, reply.getTransId(), failReason,
-                                  conn)) {
-          if (Log::warningEnabled()) {
-            LOGWARN("Stack trace: %s", ex.getStackTrace().c_str());
-          }
-          error = GF_MSG;
-          if (useEPPool) {
-            m_opConnections.put(conn, false);
-          } else {
-            // we are here its better to close the connection as
-            // "compareTransactionIds"
-            // will not close the connection
-            closeConnection(conn);
-          }
-          break;
-        } else {
-          error = GF_NOTCON;
-          epFailure = true;
-          createNewConn = true;
-        }
-      } catch (...) {
-        failReason = "unexpected exception";
-        LOGERROR(
-            "Unexpected exception while sending request to "
-            "endpoint %s",
-            m_name.c_str());
-        if (compareTransactionIds(reqTransId, reply.getTransId(), failReason,
-                                  conn)) {
-          error = GF_MSG;
-          if (useEPPool) {
-            m_opConnections.put(conn, false);
-          } else {
-            // we are here its better to close the connection as
-            // "compareTransactionIds"
-            // will not close the connection
-            closeConnection(conn);
-          }
-          break;
-        } else {
-          error = GF_NOTCON;
-          epFailure = true;
-          createNewConn = true;
-        }
-      }
-    } else {
-      if (useEPPool) {
-        epFailure = true;
-        failReason = "server connection could not be obtained";
-        if (timeout <= std::chrono::microseconds::zero()) {
-          error = GF_TIMOUT;
-          LOGWARN(
-              "No connection available for %ld seconds "
-              "for endpoint %s.",
-              requestedTimeout.count(), m_name.c_str());
-        } else {
-          error = GF_NOTCON;
-          LOGFINE(
-              "Returning without connection with %d seconds remaining "
-              "for endpoint %s.",
-              timeout.count(), m_name.c_str());
-        }
-      } else {
-        LOGERROR("Unexpected failure while sending request to server.");
-        GF_DEV_ASSERT("Bug in TcrEndpoint::sendRequestWithRetry()?" ? false
-                                                                    : true);
-      }
-    }
-  } while (++sendRetryCount <= maxSendRetries);
-  return error;
-}
-
-void TcrEndpoint::setRetry(const TcrMessage& request, int& maxSendRetries) {
-  int32_t type = request.getMessageType();
-  if (type == TcrMessage::QUERY || type == TcrMessage::QUERY_WITH_PARAMETERS ||
-      type == TcrMessage::PUTALL || type == TcrMessage::PUT_ALL_WITH_CALLBACK ||
-      type == TcrMessage::EXECUTE_FUNCTION ||
-      type == TcrMessage::EXECUTE_REGION_FUNCTION ||
-      type == TcrMessage::EXECUTE_REGION_FUNCTION_SINGLE_HOP ||
-      type == TcrMessage::EXECUTECQ_WITH_IR_MSG_TYPE) {
-    maxSendRetries = 0;
-  }
-}
-
-GfErrType TcrEndpoint::send(const TcrMessage& request, TcrMessageReply& reply) {
-  GfErrType error = GF_NOTCON;
-
-  int maxSendRetries = 1;
-  setRetry(request, maxSendRetries);
-
-  TcrConnection* conn = nullptr;
-  bool epFailure;
-  std::string failReason;
-  //  TODO: remove sendRetryCount as parameter.
-  error = sendRequestWithRetry(request, reply, conn, epFailure, failReason,
-                               maxSendRetries, true, reply.getTimeout());
-
-  if (error == GF_NOERR) {
-    m_msgSent = true;
-  }
-
-  if (error != GF_NOERR && epFailure) {
-    LOGFINE("Send Giving up for endpoint %s; reason: %s.", m_name.c_str(),
-            failReason.c_str());
-    setConnectionStatus(false);
-  }
-
-// Postconditions:
-#if GF_DEVEL_ASSERTS == 1
-  int opConnectionsSize = m_opConnections.size();
-  if (!m_isActiveEndpoint && (opConnectionsSize > 1)) {
-    LOGWARN("Connections size = %d, expected maximum %d", opConnectionsSize, 1);
-  } else if (opConnectionsSize > m_maxConnections) {
-    LOGWARN("Connections size = %d, expected maximum %d", opConnectionsSize,
-            m_maxConnections);
-  }
-#endif
-
-  return error;
-}
-
-GfErrType TcrEndpoint::sendRequestConnWithRetry(const TcrMessage& request,
-                                                TcrMessageReply& reply,
-                                                TcrConnection*& conn,
-                                                bool isBgThread) {
-  GfErrType error = GF_NOTCON;
-
-  int maxSendRetries = 1;
-  setRetry(request, maxSendRetries);
-
-  //  Retry on the following send errors:
-  // Timeout: 1 retry
-  // EAGAIN, ECONNRESET, EWOULDBLOCK: 1 retry
-  // Connection pool is empty (too many threads or no connections available): 1
-  // retry
-  bool epFailure;
-  std::string failReason;
-  LOGFINE("sendRequestConnWithRetry:: maxSendRetries = %d ", maxSendRetries);
-  error = sendRequestWithRetry(request, reply, conn, epFailure, failReason,
-                               maxSendRetries, false, reply.getTimeout(),
-                               isBgThread);
-  if (error == GF_NOERR) {
-    m_msgSent = true;
-  }
-
-  if (error != GF_NOERR && epFailure) {
-    LOGFINE("sendRequestConnWithRetry: Giving up for endpoint %s; reason: %s.",
-            m_name.c_str(), failReason.c_str());
-    setConnectionStatus(false);
-  }
-
-  return error;
-}
-
-void TcrEndpoint::setConnectionStatus(bool status) {
-  // : Store the original value of m_isActiveEndpoint.
-  // This is to try make failover more resilient for the case when
-  // a foreground operation thread is connecting to an endpoint while
-  // the notification thread is disconnecting from the same, or vice versa.
-  // By comparing the original value with the new value we know if
-  // someone else has changed the status in that duration, and skip
-  // the change if that is the case.
-  // Same logic applies for the ping thread.
-  // Try something like (after the 2.5 patch release):
-  // bool wasActive = m_isActiveEndpoint;
-  // Then after taking the lock:
-  // If ( !wasActive && isActiveEndpoint ) { return; }
-  ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_connectionLock);
-  if (m_connected != status) {
-    bool connected = m_connected;
-    m_connected = status;
-    if (connected) {
-      m_numberOfTimesFailed += 1;
-      m_isAuthenticated = false;
-      // disconnected
-      LOGFINE("Disconnecting from endpoint %s", m_name.c_str());
-      closeConnections();
-      m_isActiveEndpoint = false;
-      LOGFINE("Disconnected from endpoint %s", m_name.c_str());
-      triggerRedundancyThread();
-    }
-  }
-}
-
-void TcrEndpoint::triggerRedundancyThread() {
-  m_failoverSema.release();
-  m_redundancySema.release();
-}
-
-void TcrEndpoint::closeConnection(TcrConnection*& conn) {
-  conn->close();
-  m_ports.erase(conn->getPort());
-  _GEODE_SAFE_DELETE(conn);
-}
-
-void TcrEndpoint::closeConnections() {
-  m_opConnections.close();
-  m_ports.clear();
-  m_maxConnections = m_cacheImpl->getDistributedSystem()
-                         .getSystemProperties()
-                         .connectionPoolSize();
-}
-
-/*
-void TcrEndpoint::sendNotificationCloseMsg()
-{
-  if (m_notifyConnection != nullptr) {
-    m_notifyReceiver->stop();
-    m_notifyConnection->close();
-  }
-}
-*/
-
-void TcrEndpoint::closeNotification() {
-  LOGFINEST("Closing subscription channel for endpoint %s", m_name.c_str());
-  m_notifyConnection->close();
-  m_notifyReceiver->stopNoblock();
-  TcrConnectionManager& tccm = m_cacheImpl->tcrConnectionManager();
-  tccm.addNotificationForDeletion(m_notifyReceiver, m_notifyConnection,
-                                  m_notificationCleanupSema);
-  m_notifyCount++;
-  m_cleanupSema.release();
-  m_isQueueHosted = false;
-  LOGFINEST(
-      "Added susbcription channel for deletion and "
-      "released cleanup semaphore for endpoint %s",
-      m_name.c_str());
-}
-
-void TcrEndpoint::stopNoBlock() {
-  if (m_notifyReceiver != nullptr) {
-    m_notifyConnection->close();
-    m_notifyReceiver->stopNoblock();
-  }
-}
-
-void TcrEndpoint::stopNotifyReceiverAndCleanup() {
-  LOGFINER("Stopping subscription receiver and cleaning up");
-  ACE_Guard<ACE_Recursive_Thread_Mutex> guard(m_notifyReceiverLock);
-
-  if (m_notifyReceiver != nullptr) {
-    LOGFINER("Waiting for notification thread...");
-    // m_notifyReceiver->stopNoblock();
-    m_notifyReceiver->wait();
-    bool found = false;
-    for (std::list<Task<TcrEndpoint>*>::iterator it =
-             m_notifyReceiverList.begin();
-         it != m_notifyReceiverList.end(); it++) {
-      if (*it == m_notifyReceiver) {
-        found = true;
-        break;
-      }
-    }
-
-    if (!found) {
-      _GEODE_SAFE_DELETE(m_notifyReceiver);
-      _GEODE_SAFE_DELETE(m_notifyConnection);
-    }
-  }
-
-  m_numRegionListener = 0;
-
-  if (m_notifyReceiverList.size() > 0) {
-    LOGFINER("TcrEndpoint::stopNotifyReceiverAndCleanup: notifylist size = %d",
-             m_notifyReceiverList.size());
-    for (std::list<Task<TcrEndpoint>*>::iterator it =
-             m_notifyReceiverList.begin();
-         it != m_notifyReceiverList.end(); it++) {
-      LOGFINER(
-          "TcrEndpoint::stopNotifyReceiverAndCleanup: deleting old notify "
-          "recievers.");
-      _GEODE_SAFE_DELETE(*it);
-    }
-  }
-
-  if (m_notifyConnectionList.size() > 0) {
-    LOGFINER("TcrEndpoint::stopNotifyReceiverAndCleanup: notifylist size = %d",
-             m_notifyConnectionList.size());
-    for (std::list<TcrConnection*>::iterator it =
-             m_notifyConnectionList.begin();
-         it != m_notifyConnectionList.end(); it++) {
-      LOGFINER(
-          "TcrEndpoint::stopNotifyReceiverAndCleanup: deleting old notify "
-          "connections.");
-      _GEODE_SAFE_DELETE(*it);
-    }
-  }
-}
-
-void TcrEndpoint::setServerQueueStatus(ServerQueueStatus queueStatus,
-                                       int32_t queueSize) {
-  if (!m_isServerQueueStatusSet) {
-    m_isServerQueueStatusSet = true;
-    m_serverQueueStatus = queueStatus;
-    m_queueSize = queueSize;
-  }
-}
-
-bool TcrEndpoint::isQueueHosted() { return m_isQueueHosted; }
-void TcrEndpoint::processMarker() {
-  m_cacheImpl->tcrConnectionManager().processMarker();
-}
- std::shared_ptr<QueryService> TcrEndpoint::getQueryService() {
-   return m_cacheImpl->getQueryService(true);
- }
- void TcrEndpoint::sendRequestForChunkedResponse(const TcrMessage& request,
-                                                 TcrMessageReply& reply,
-                                                 TcrConnection* conn) {
-   conn->sendRequestForChunkedResponse(request, request.getMsgLength(), reply);
- }
- void TcrEndpoint::closeFailedConnection(TcrConnection*& conn) {
-   closeConnection(conn);
-}
-
-}  // namespace client
-}  // namespace geode
-}  // namespace apache

-- 
To stop receiving notification emails like this one, please contact
echobr...@apache.org.

Reply via email to