WIP refactoring
Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/a071d0c0 Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/a071d0c0 Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/a071d0c0 Branch: refs/heads/feature/GEODE-2632 Commit: a071d0c060e4f7d58fa85d612f0cdcdbb6002e8d Parents: 0c16858 Author: Kirk Lund <kl...@apache.org> Authored: Wed Apr 5 10:24:23 2017 -0700 Committer: Kirk Lund <kl...@apache.org> Committed: Wed Apr 5 12:49:42 2017 -0700 ---------------------------------------------------------------------- .../sockets/command/ClientCachePutBench.java | 16 +- .../cache/tier/sockets/command/Put65Bench.java | 94 +-- .../geode/internal/cache/CacheServerImpl.java | 2 +- .../geode/internal/cache/tier/Acceptor.java | 2 +- .../cache/tier/sockets/AcceptorImpl.java | 50 +- .../cache/tier/sockets/CacheClientNotifier.java | 589 ++++------------ .../cache/tier/sockets/CacheClientProxy.java | 696 +++++++++---------- .../cache/tier/sockets/ClientHealthMonitor.java | 15 +- .../tier/sockets/ClientUpdateMessageImpl.java | 4 +- .../internal/cache/tier/sockets/HandShake.java | 10 +- .../geode/internal/logging/LogService.java | 10 + .../tier/sockets/AcceptorImplJUnitTest.java | 18 +- .../cache/tier/sockets/AcceptorImplTest.java | 112 +++ .../tier/sockets/ClientConflationDUnitTest.java | 2 +- .../ClientServerForceInvalidateDUnitTest.java | 4 +- .../tier/sockets/ClientServerMiscDUnitTest.java | 8 +- .../cache/tier/sockets/ConflationDUnitTest.java | 4 +- .../cache/tier/sockets/HAInterestTestCase.java | 12 +- .../sockets/HAStartupAndFailoverDUnitTest.java | 4 +- .../sockets/InterestListRecoveryDUnitTest.java | 2 +- .../tier/sockets/RedundancyLevelTestBase.java | 10 +- .../tier/sockets/command/Put65BenchTest.java | 116 ++++ .../sockets/command/Put65RealBenchTest.java | 145 ++++ .../sockets/DurableClientSimpleDUnitTest.java | 14 +- .../tier/sockets/DurableClientTestCase.java | 6 +- .../cache/wan/Simple2CacheServerDUnitTest.java | 6 +- 26 files changed, 959 insertions(+), 992 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/a071d0c0/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench.java ---------------------------------------------------------------------- diff --git a/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench.java b/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench.java index a1cbd81..df51b78 100644 --- a/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench.java +++ b/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/ClientCachePutBench.java @@ -14,9 +14,12 @@ */ package org.apache.geode.internal.cache.tier.sockets.command; +import static java.util.concurrent.TimeUnit.MINUTES; import static org.apache.commons.io.FileUtils.*; +import static org.apache.geode.distributed.AbstractLauncher.Status.ONLINE; import static org.apache.geode.test.dunit.NetworkUtils.getIPLiteral; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.*; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; @@ -24,6 +27,7 @@ import org.apache.geode.cache.Region; import org.apache.geode.cache.client.ClientCache; import org.apache.geode.cache.client.ClientCacheFactory; import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.distributed.AbstractLauncher.Status; import org.apache.geode.distributed.ConfigurationProperties; import org.apache.geode.distributed.ServerLauncher; import org.apache.geode.distributed.internal.DistributionConfig; @@ -121,16 +125,22 @@ public class ClientCachePutBench { command.add(ServerLauncher.Command.START.getName()); command.add("server1"); command.add("--server-port=" + this.serverPort); - // command.add("--redirect-output"); + // put65Command.add("--redirect-output"); this.process = new ProcessBuilder(command).directory(this.temporaryFolder.getRoot()).start(); - boolean forever = true; - while (forever) { + boolean sleep = false; + while (sleep) { assertThat(this.process.isAlive()).isTrue(); Thread.sleep(10000); } + ServerLauncher serverLauncher = new ServerLauncher.Builder() + .setWorkingDirectory(this.temporaryFolder.getRoot().getAbsolutePath()).build(); + + await().atMost(2, MINUTES) + .until(() -> assertThat(serverLauncher.status().getStatus()).isEqualTo(ONLINE)); + this.clientCache = new ClientCacheFactory().addPoolServer(getIPLiteral(), this.serverPort).create(); this.region = http://git-wip-us.apache.org/repos/asf/geode/blob/a071d0c0/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Bench.java ---------------------------------------------------------------------- diff --git a/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Bench.java b/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Bench.java index 6ccd8c3..d393769 100644 --- a/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Bench.java +++ b/geode-core/src/jmh/java/org/apache/geode/internal/cache/tier/sockets/command/Put65Bench.java @@ -16,10 +16,8 @@ package org.apache.geode.internal.cache.tier.sockets.command; import static org.apache.geode.SystemFailure.loadEmergencyClasses; import static org.apache.geode.internal.cache.TXManagerImpl.NOTX; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; -import org.apache.geode.SystemFailure; import org.apache.geode.cache.Operation; import org.apache.geode.internal.Version; import org.apache.geode.internal.cache.GemFireCacheImpl; @@ -30,8 +28,6 @@ import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID; import org.apache.geode.internal.cache.tier.sockets.Message; import org.apache.geode.internal.cache.tier.sockets.Part; import org.apache.geode.internal.cache.tier.sockets.ServerConnection; -import org.openjdk.jmh.annotations.Benchmark; -import org.openjdk.jmh.annotations.Fork; import org.openjdk.jmh.annotations.Level; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.Setup; @@ -42,78 +38,82 @@ public class Put65Bench { @State(Scope.Benchmark) public static class ServerConnectionState { - public Command command; + public Command put65Command; public ServerConnection mockServerConnection; - public Message message; + public Message mockMessage; @Setup(Level.Trial) public void setup() throws Exception { loadEmergencyClasses(); - this.command = Put65.getCommand(); + this.put65Command = Put65.getCommand(); - this.mockServerConnection = mock(ServerConnection.class); + this.mockServerConnection = mock(ServerConnection.class, + withSettings().defaultAnswer(CALLS_REAL_METHODS).name("mockServerConnection")); when(this.mockServerConnection.getClientVersion()).thenReturn(Version.CURRENT); - TXManagerImpl txManager = mock(TXManagerImpl.class); - GemFireCacheImpl cache = mock(GemFireCacheImpl.class); - when(cache.getTxManager()).thenReturn(txManager); + GemFireCacheImpl mockCache = mock(GemFireCacheImpl.class, withSettings().name("mockCache")); + when(this.mockServerConnection.getCache()).thenReturn(mockCache); - when(this.mockServerConnection.getCache()).thenReturn(cache); + TXManagerImpl mockTxManager = mock(TXManagerImpl.class, withSettings().name("mockTxManager")); + when(mockCache.getTxManager()).thenReturn(mockTxManager); - CacheServerStats cacheServerStats = mock(CacheServerStats.class); - when(this.mockServerConnection.getCacheServerStats()).thenReturn(cacheServerStats); + CacheServerStats mockCacheServerStats = + mock(CacheServerStats.class, withSettings().name("mockCacheServerStats")); + when(this.mockServerConnection.getCacheServerStats()).thenReturn(mockCacheServerStats); - // .getDistributedMember() - ClientProxyMembershipID mockProxyId = mock(ClientProxyMembershipID.class); + ClientProxyMembershipID mockProxyId = + mock(ClientProxyMembershipID.class, withSettings().name("mockProxyId")); when(this.mockServerConnection.getProxyID()).thenReturn(mockProxyId); - Message errorResponseMessage = mock(Message.class); - when(this.mockServerConnection.getErrorResponseMessage()).thenReturn(errorResponseMessage); + Message mockErrorResponseMessage = + mock(Message.class, withSettings().name("mockErrorResponseMessage")); + when(this.mockServerConnection.getErrorResponseMessage()) + .thenReturn(mockErrorResponseMessage); - Part regionNamePart = mock(Part.class); - when(regionNamePart.getString()).thenReturn("regionNamePart"); + Part mockRegionNamePart = mock(Part.class, withSettings().name("mockRegionNamePart")); + when(mockRegionNamePart.getString()).thenReturn("mockRegionNamePart"); - Part operationPart = mock(Part.class); - when(operationPart.getObject()).thenReturn(Operation.UPDATE); + Part mockOperationPart = mock(Part.class); + when(mockOperationPart.getObject()).thenReturn(Operation.UPDATE); - Part flagsPart = mock(Part.class); - when(flagsPart.getInt()).thenReturn(0); + Part mockFlagsPart = mock(Part.class); + when(mockFlagsPart.getInt()).thenReturn(0); - Part keyPart = mock(Part.class); - when(keyPart.getObject()).thenReturn("keyPart"); - when(keyPart.getStringOrObject()).thenReturn("keyPart"); + Part mockKeyPart = mock(Part.class); + when(mockKeyPart.getObject()).thenReturn("mockKeyPart"); + when(mockKeyPart.getStringOrObject()).thenReturn("mockKeyPart"); - Part isDeltaPart = mock(Part.class); - when(isDeltaPart.getObject()).thenReturn(Boolean.FALSE); + Part mockIsDeltaPart = mock(Part.class); + when(mockIsDeltaPart.getObject()).thenReturn(Boolean.FALSE); - Part valuePart = mock(Part.class); - when(valuePart.getObject()).thenReturn("valuePart"); + Part mockValuePart = mock(Part.class); + when(mockValuePart.getObject()).thenReturn("mockValuePart"); - Part eventPart = mock(Part.class); - when(eventPart.getObject()).thenReturn("eventPart"); + Part mockEventPart = mock(Part.class); + when(mockEventPart.getObject()).thenReturn("mockEventPart"); - Part callbackArgPart = mock(Part.class); - when(callbackArgPart.getObject()).thenReturn("callbackArgPart"); + Part mockCallbackArgPart = mock(Part.class); + when(mockCallbackArgPart.getObject()).thenReturn("mockCallbackArgPart"); - message = mock(Message.class); + mockMessage = mock(Message.class); - when(message.getTransactionId()).thenReturn(NOTX); + when(mockMessage.getTransactionId()).thenReturn(NOTX); - when(message.getPart(0)).thenReturn(regionNamePart); - when(message.getPart(1)).thenReturn(operationPart); - when(message.getPart(2)).thenReturn(flagsPart); - when(message.getPart(3)).thenReturn(keyPart); - when(message.getPart(4)).thenReturn(isDeltaPart); - when(message.getPart(5)).thenReturn(valuePart); - when(message.getPart(6)).thenReturn(eventPart); - when(message.getPart(7)).thenReturn(callbackArgPart); + when(mockMessage.getPart(0)).thenReturn(mockRegionNamePart); + when(mockMessage.getPart(1)).thenReturn(mockOperationPart); + when(mockMessage.getPart(2)).thenReturn(mockFlagsPart); + when(mockMessage.getPart(3)).thenReturn(mockKeyPart); + when(mockMessage.getPart(4)).thenReturn(mockIsDeltaPart); + when(mockMessage.getPart(5)).thenReturn(mockValuePart); + when(mockMessage.getPart(6)).thenReturn(mockEventPart); + when(mockMessage.getPart(7)).thenReturn(mockCallbackArgPart); } } // @Benchmark public void benchmark(ServerConnectionState state, Blackhole blackhole) { - state.command.execute(state.message, state.mockServerConnection); + state.put65Command.execute(state.mockMessage, state.mockServerConnection); // Message replyMessage = state.mockServerConnection.getReplyMessage(); // blackhole.consume(replyMessage); } http://git-wip-us.apache.org/repos/asf/geode/blob/a071d0c0/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java index a3c4a93..2294fb8 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java @@ -317,7 +317,7 @@ public class CacheServerImpl extends AbstractCacheServer implements Distribution getSocketBufferSize(), getMaximumTimeBetweenPings(), this.cache, getMaxConnections(), getMaxThreads(), getMaximumMessageCount(), getMessageTimeToLive(), this.loadMonitor, overflowAttributesList, this.isGatewayReceiver, this.gatewayTransportFilters, - this.tcpNoDelay); + this.tcpNoDelay, this.cache.getCancelCriterion()); this.acceptor.start(); this.advisor.handshake(); http://git-wip-us.apache.org/repos/asf/geode/blob/a071d0c0/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java index 9a3241b..97dcba5 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/Acceptor.java @@ -25,7 +25,7 @@ import org.apache.geode.internal.Version; * * @since GemFire 2.0.2 */ -public abstract class Acceptor { +public interface Acceptor { // The following are communications "mode" bytes sent as the first byte of a // client/server handshake. They must not be larger than 1 byte http://git-wip-us.apache.org/repos/asf/geode/blob/a071d0c0/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java index ed29472..47749f8 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/AcceptorImpl.java @@ -57,6 +57,9 @@ import java.util.concurrent.atomic.AtomicInteger; import javax.net.ssl.SSLException; +import org.apache.geode.CancelCriterion; +import org.apache.geode.StatisticsFactory; +import org.apache.geode.internal.statistics.DummyStatisticsFactory; import org.apache.logging.log4j.Logger; import org.apache.geode.CancelException; @@ -97,7 +100,7 @@ import org.apache.geode.internal.util.ArrayUtils; * @since GemFire 2.0.2 */ @SuppressWarnings("deprecation") -public class AcceptorImpl extends Acceptor implements Runnable { +public class AcceptorImpl implements Acceptor, Runnable { private static final Logger logger = LogService.getLogger(); private static final boolean isJRockit = System.getProperty("java.vm.name").contains("JRockit"); @@ -283,21 +286,31 @@ public class AcceptorImpl extends Acceptor implements Runnable { * @param internalCache The GemFire cache whose contents is served to clients * @param maxConnections the maximum number of connections allowed in the server pool * @param maxThreads the maximum number of threads allowed in the server pool - * + * + * @param cancelCriterion * @see SocketCreator#createServerSocket(int, int, InetAddress) * @see ClientHealthMonitor * @since GemFire 5.7 */ public AcceptorImpl(int port, String bindHostName, boolean notifyBySubscription, - int socketBufferSize, int maximumTimeBetweenPings, InternalCache internalCache, - int maxConnections, int maxThreads, int maximumMessageCount, int messageTimeToLive, - ConnectionListener listener, List overflowAttributesList, boolean isGatewayReceiver, - List<GatewayTransportFilter> transportFilter, boolean tcpNoDelay) throws IOException { + int socketBufferSize, int maximumTimeBetweenPings, + InternalCache internalCache, + int maxConnections, int maxThreads, int maximumMessageCount, + int messageTimeToLive, + ConnectionListener listener, List overflowAttributesList, + boolean isGatewayReceiver, + List<GatewayTransportFilter> transportFilter, boolean tcpNoDelay, + final CancelCriterion cancelCriterion) throws IOException { this.bindHostName = calcBindHostName(internalCache, bindHostName); this.connectionListener = listener == null ? new ConnectionListenerAdapter() : listener; this.notifyBySubscription = notifyBySubscription; this.isGatewayReceiver = isGatewayReceiver; this.gatewayTransportFilters = transportFilter; + + this.socketBufferSize = socketBufferSize; + this.cache = internalCache; + this.crHelper = new CachedRegionHelper(this.cache); + { int tmp_maxConnections = maxConnections; if (tmp_maxConnections < MINIMUM_MAX_CONNECTIONS) { @@ -375,12 +388,6 @@ public class AcceptorImpl extends Acceptor implements Runnable { .getSocketCreatorForComponent(SecurableCommunicationChannel.GATEWAY); } - final GemFireCacheImpl gc; - if (getCachedRegionHelper() != null) { - gc = (GemFireCacheImpl) getCachedRegionHelper().getCache(); - } else { - gc = null; - } final int backLog = Integer.getInteger(BACKLOG_PROPERTY_NAME, DEFAULT_BACKLOG).intValue(); final long tilt = System.currentTimeMillis() + 120 * 1000; @@ -422,9 +429,7 @@ public class AcceptorImpl extends Acceptor implements Runnable { Thread.currentThread().interrupt(); } } - if (gc != null) { - gc.getCancelCriterion().checkCancelInProgress(null); - } + cancelCriterion.checkCancelInProgress(null); } // for } // isSelector else { // !isSelector @@ -452,9 +457,7 @@ public class AcceptorImpl extends Acceptor implements Runnable { Thread.currentThread().interrupt(); } } - if (gc != null) { - gc.getCancelCriterion().checkCancelInProgress(null); - } + cancelCriterion.checkCancelInProgress(null); } // for } // !isSelector @@ -485,15 +488,14 @@ public class AcceptorImpl extends Acceptor implements Runnable { } - this.cache = internalCache; - this.crHelper = new CachedRegionHelper(this.cache); + final StatisticsFactory statsFactory = isGatewayReceiver ? + new DummyStatisticsFactory() : this.cache.getDistributedSystem(); - this.clientNotifier = CacheClientNotifier.getInstance(cache, this.stats, maximumMessageCount, - messageTimeToLive, connectionListener, overflowAttributesList, isGatewayReceiver); - this.socketBufferSize = socketBufferSize; + this.clientNotifier = CacheClientNotifier.getInstance(this.cache, this.stats, statsFactory, maximumMessageCount, + messageTimeToLive, this.connectionListener, overflowAttributesList, isGatewayReceiver); // Create the singleton ClientHealthMonitor - this.healthMonitor = ClientHealthMonitor.getInstance(internalCache, maximumTimeBetweenPings, + this.healthMonitor = ClientHealthMonitor.getInstance(this.cache, maximumTimeBetweenPings, this.clientNotifier.getStats()); { http://git-wip-us.apache.org/repos/asf/geode/blob/a071d0c0/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java index 28d6ae2..25142a0 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientNotifier.java @@ -43,6 +43,7 @@ import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import org.apache.geode.internal.security.SecurityService; import org.apache.logging.log4j.Logger; import org.apache.shiro.subject.Subject; @@ -79,7 +80,6 @@ import org.apache.geode.distributed.internal.MessageWithReply; import org.apache.geode.distributed.internal.ReplyMessage; import org.apache.geode.distributed.internal.ReplyProcessor21; import org.apache.geode.internal.ClassLoadUtil; -import org.apache.geode.internal.statistics.DummyStatisticsFactory; import org.apache.geode.internal.InternalDataSerializer; import org.apache.geode.internal.InternalInstantiator; import org.apache.geode.internal.net.SocketCloser; @@ -113,10 +113,8 @@ import org.apache.geode.internal.cache.tier.Acceptor; import org.apache.geode.internal.cache.tier.MessageType; import org.apache.geode.internal.cache.versions.VersionTag; import org.apache.geode.internal.i18n.LocalizedStrings; -import org.apache.geode.internal.logging.InternalLogWriter; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.log4j.LocalizedMessage; -import org.apache.geode.internal.net.SocketCloser; import org.apache.geode.security.AccessControl; import org.apache.geode.security.AuthenticationFailedException; import org.apache.geode.security.AuthenticationRequiredException; @@ -125,32 +123,29 @@ import org.apache.geode.security.AuthenticationRequiredException; * Class <code>CacheClientNotifier</code> works on the server and manages client socket connections * to clients requesting notification of updates and notifies them when updates occur. * - * * @since GemFire 3.2 */ @SuppressWarnings({"synthetic-access", "deprecation"}) public class CacheClientNotifier { private static final Logger logger = LogService.getLogger(); + private static final Logger securityLogger = LogService.getSecurityLogger(); private static volatile CacheClientNotifier ccnSingleton; /** * Factory method to construct a CacheClientNotifier <code>CacheClientNotifier</code> instance. - * - * @param cache The GemFire <code>Cache</code> - * @param acceptorStats - * @param maximumMessageCount - * @param messageTimeToLive - * @param listener - * @param overflowAttributesList - * @return A <code>CacheClientNotifier</code> instance */ - public static synchronized CacheClientNotifier getInstance(Cache cache, - CacheServerStats acceptorStats, int maximumMessageCount, int messageTimeToLive, - ConnectionListener listener, List overflowAttributesList, boolean isGatewayReceiver) { + public static synchronized CacheClientNotifier getInstance(final Cache cache, + final CacheServerStats acceptorStats, + final StatisticsFactory statsFactory, + final int maximumMessageCount, + final int messageTimeToLive, + final ConnectionListener listener, + final List overflowAttributesList, + final boolean isGatewayReceiver) { if (ccnSingleton == null) { - ccnSingleton = new CacheClientNotifier(cache, acceptorStats, maximumMessageCount, - messageTimeToLive, listener, overflowAttributesList, isGatewayReceiver); + ccnSingleton = new CacheClientNotifier(cache, acceptorStats, statsFactory, maximumMessageCount, + messageTimeToLive, listener); } if (!isGatewayReceiver && ccnSingleton.getHaContainer() == null) { @@ -158,13 +153,6 @@ public class CacheClientNotifier { // In this case, the HaContainer should be lazily created here ccnSingleton.initHaContainer(overflowAttributesList); } - // else { - // ccnSingleton.acceptorStats = acceptorStats; - // ccnSingleton.maximumMessageCount = maximumMessageCount; - // ccnSingleton.messageTimeToLive = messageTimeToLive; - // ccnSingleton._connectionListener = listener; - // ccnSingleton.setCache((GemFireCache)cache); - // } return ccnSingleton; } @@ -173,6 +161,53 @@ public class CacheClientNotifier { } /** + * Constructor. + * @param cache The GemFire <code>Cache</code> + * @param acceptorStats + * @param statsFactory + * @param maximumMessageCount + * @param messageTimeToLive + * @param listener a listener which should receive notifications abouts queues being added or + * removed. + */ + private CacheClientNotifier(final Cache cache, + final CacheServerStats acceptorStats, + final StatisticsFactory statsFactory, + final int maximumMessageCount, + final int messageTimeToLive, + final ConnectionListener listener) { + // Set the Cache + this.setCache((GemFireCacheImpl) cache); + this.acceptorStats = acceptorStats; + // we only need one thread per client and wait 50ms for close + this.socketCloser = new SocketCloser(1, 50); + this._connectionListener = listener; + + this.maximumMessageCount = maximumMessageCount; + this.messageTimeToLive = messageTimeToLive; + + this._statistics = new CacheClientNotifierStats(statsFactory); + + try { + this.logFrequency = Long.valueOf(System.getProperty(MAX_QUEUE_LOG_FREQUENCY)); + if (this.logFrequency <= 0) { + this.logFrequency = DEFAULT_LOG_FREQUENCY; + } + } catch (Exception e) { + this.logFrequency = DEFAULT_LOG_FREQUENCY; + } + + eventEnqueueWaitTime = + Integer.getInteger(EVENT_ENQUEUE_WAIT_TIME_NAME, DEFAULT_EVENT_ENQUEUE_WAIT_TIME); + if (eventEnqueueWaitTime < 0) { + eventEnqueueWaitTime = DEFAULT_EVENT_ENQUEUE_WAIT_TIME; + } + + // Schedule task to periodically ping clients. + scheduleClientPingTask(); + } + + /** * Writes a given message to the output stream * * @param dos the <code>DataOutputStream</code> to use for writing the message @@ -257,32 +292,12 @@ public class CacheClientNotifier { writeMessage(dos, type, ex.toString(), clientVersion); } - // /** - // * Factory method to return the singleton <code>CacheClientNotifier</code> - // * instance. - // * @return the singleton <code>CacheClientNotifier</code> instance - // */ - // public static CacheClientNotifier getInstance() - // { - // return _instance; - // } - - // /** - // * Shuts down the singleton <code>CacheClientNotifier</code> instance. - // */ - // public static void shutdownInstance() - // { - // if (_instance == null) return; - // _instance.shutdown(); - // _instance = null; - // } - /** * Registers a new client updater that wants to receive updates with this server. * * @param socket The socket over which the server communicates with the client. */ - public void registerClient(Socket socket, boolean isPrimary, long acceptorId, + void registerClient(Socket socket, boolean isPrimary, long acceptorId, boolean notifyBySubscription) throws IOException { // Since no remote ports were specified in the message, wait for them. long startTime = this._statistics.startTime(); @@ -329,7 +344,7 @@ public class CacheClientNotifier { } } - protected void registerGFEClient(DataInputStream dis, DataOutputStream dos, Socket socket, + private void registerGFEClient(DataInputStream dis, DataOutputStream dos, Socket socket, boolean isPrimary, long startTime, Version clientVersion, long acceptorId, boolean notifyBySubscription) throws IOException { // Read the ports and throw them away. We no longer need them @@ -382,26 +397,25 @@ public class CacheClientNotifier { // TODO:hitesh Properties credentials = HandShake.readCredentials(dis, dos, system); if (credentials != null && proxy != null) { - if (securityLogWriter.fineEnabled()) { - securityLogWriter - .fine("CacheClientNotifier: verifying credentials for proxyID: " + proxyID); + if (securityLogger.isDebugEnabled()) { + securityLogger + .debug("CacheClientNotifier: verifying credentials for proxyID: {}", proxyID); } Object subject = HandShake.verifyCredentials(authenticator, credentials, - system.getSecurityProperties(), this.logWriter, this.securityLogWriter, member); + system.getSecurityProperties(), system.getLogWriter(), system.getSecurityLogWriter(), member); if (subject instanceof Principal) { Principal principal = (Principal) subject; - if (securityLogWriter.fineEnabled()) { - securityLogWriter - .fine("CacheClientNotifier: successfully verified credentials for proxyID: " - + proxyID + " having principal: " + principal.getName()); + if (securityLogger.isDebugEnabled()) { + securityLogger + .debug("CacheClientNotifier: successfully verified credentials for proxyID: {} having principal: {}", proxyID, principal.getName()); } String postAuthzFactoryName = sysProps.getProperty(SECURITY_CLIENT_ACCESSOR_PP); if (postAuthzFactoryName != null && postAuthzFactoryName.length() > 0) { if (principal == null) { - securityLogWriter.warning( - LocalizedStrings.CacheClientNotifier_CACHECLIENTNOTIFIER_POST_PROCESS_AUTHORIZATION_CALLBACK_ENABLED_BUT_AUTHENTICATION_CALLBACK_0_RETURNED_WITH_NULL_CREDENTIALS_FOR_PROXYID_1, - new Object[] {SECURITY_CLIENT_AUTHENTICATOR, proxyID}); + securityLogger.warn( + LocalizedMessage.create(LocalizedStrings.CacheClientNotifier_CACHECLIENTNOTIFIER_POST_PROCESS_AUTHORIZATION_CALLBACK_ENABLED_BUT_AUTHENTICATION_CALLBACK_0_RETURNED_WITH_NULL_CREDENTIALS_FOR_PROXYID_1, + new Object[] {SECURITY_CLIENT_AUTHENTICATOR, proxyID})); } Method authzMethod = ClassLoadUtil.methodFromName(postAuthzFactoryName); authzCallback = (AccessControl) authzMethod.invoke(null, (Object[]) null); @@ -417,15 +431,15 @@ public class CacheClientNotifier { LocalizedStrings.CacheClientNotifier_CLIENTPROXYMEMBERSHIPID_OBJECT_COULD_NOT_BE_CREATED_EXCEPTION_OCCURRED_WAS_0 .toLocalizedString(e)); } catch (AuthenticationRequiredException ex) { - securityLogWriter.warning( - LocalizedStrings.CacheClientNotifier_AN_EXCEPTION_WAS_THROWN_FOR_CLIENT_0_1, - new Object[] {proxyID, ex}); + securityLogger.warn( + LocalizedMessage.create(LocalizedStrings.CacheClientNotifier_AN_EXCEPTION_WAS_THROWN_FOR_CLIENT_0_1, + new Object[] {proxyID, ex})); writeException(dos, HandShake.REPLY_EXCEPTION_AUTHENTICATION_REQUIRED, ex, clientVersion); return; } catch (AuthenticationFailedException ex) { - securityLogWriter.warning( - LocalizedStrings.CacheClientNotifier_AN_EXCEPTION_WAS_THROWN_FOR_CLIENT_0_1, - new Object[] {proxyID, ex}); + securityLogger.warn( + LocalizedMessage.create(LocalizedStrings.CacheClientNotifier_AN_EXCEPTION_WAS_THROWN_FOR_CLIENT_0_1, + new Object[] {proxyID, ex})); writeException(dos, HandShake.REPLY_EXCEPTION_AUTHENTICATION_FAILED, ex, clientVersion); return; } catch (CacheException e) { @@ -445,11 +459,9 @@ public class CacheClientNotifier { return; } - this._statistics.endClientRegistration(startTime); } - /** * Registers a new client that wants to receive updates with this server. * @@ -504,7 +516,8 @@ public class CacheClientNotifier { "CacheClientNotifier: No proxy exists for durable client with id {}. It must be created.", proxyId.getDurableId()); } - l_proxy = new CacheClientProxy(this, socket, proxyId, isPrimary, clientConflation, + l_proxy = CacheClientProxy.createCacheClientProxy(this, this._cache, this._cache.getDistributedSystem(), SecurityService + .getSecurityService(), socket, proxyId, isPrimary, clientConflation, clientVersion, acceptorId, notifyBySubscription); successful = this.initializeProxy(l_proxy); } else { @@ -516,8 +529,8 @@ public class CacheClientNotifier { qSize = proxy.getQueueSize(); // A proxy exists for this durable client. It must be reinitialized. if (l_proxy.isPaused()) { - if (CacheClientProxy.testHook != null) { - CacheClientProxy.testHook.doTestHook("CLIENT_PRE_RECONNECT"); + if (CacheClientProxy.getTestHook() != null) { + CacheClientProxy.getTestHook().doTestHook("CLIENT_PRE_RECONNECT"); } if (l_proxy.lockDrain()) { try { @@ -531,8 +544,8 @@ public class CacheClientNotifier { l_proxy.reinitialize(socket, proxyId, this.getCache(), isPrimary, clientConflation, clientVersion); l_proxy.setMarkerEnqueued(true); - if (CacheClientProxy.testHook != null) { - CacheClientProxy.testHook.doTestHook("CLIENT_RECONNECTED"); + if (CacheClientProxy.getTestHook() != null) { + CacheClientProxy.getTestHook().doTestHook("CLIENT_RECONNECTED"); } } finally { l_proxy.unlockDrain(); @@ -543,8 +556,8 @@ public class CacheClientNotifier { .toLocalizedString(); logger.warn(unsuccessfulMsg); responseByte = HandShake.REPLY_REFUSED; - if (CacheClientProxy.testHook != null) { - CacheClientProxy.testHook.doTestHook("CLIENT_REJECTED_DUE_TO_CQ_BEING_DRAINED"); + if (CacheClientProxy.getTestHook() != null) { + CacheClientProxy.getTestHook().doTestHook("CLIENT_REJECTED_DUE_TO_CQ_BEING_DRAINED"); } } } else { @@ -582,7 +595,7 @@ public class CacheClientNotifier { if (toCreateNewProxy) { // Create the new proxy for this non-durable client - l_proxy = new CacheClientProxy(this, socket, proxyId, isPrimary, clientConflation, + l_proxy = CacheClientProxy.createCacheClientProxy(this, this._cache, this._cache.getDistributedSystem(), SecurityService.getSecurityService(), socket, proxyId, isPrimary, clientConflation, clientVersion, acceptorId, notifyBySubscription); successful = this.initializeProxy(l_proxy); } @@ -754,10 +767,8 @@ public class CacheClientNotifier { * Unregisters an existing client from this server. * * @param memberId Uniquely identifies the client - * - * */ - public void unregisterClient(ClientProxyMembershipID memberId, boolean normalShutdown) { + void unregisterClient(ClientProxyMembershipID memberId, boolean normalShutdown) { if (logger.isDebugEnabled()) { logger.debug("CacheClientNotifier: Unregistering all clients with member id: {}", memberId); } @@ -781,8 +792,6 @@ public class CacheClientNotifier { /** * The client represented by the proxyId is ready to receive updates. - * - * @param proxyId */ public void readyForEvents(ClientProxyMembershipID proxyId) { CacheClientProxy proxy = getClientProxy(proxyId); @@ -817,7 +826,6 @@ public class CacheClientNotifier { CacheClientNotifier instance = ccnSingleton; if (instance != null) { instance.singletonNotifyClients(event, null); - } } @@ -829,7 +837,6 @@ public class CacheClientNotifier { CacheClientNotifier instance = ccnSingleton; if (instance != null) { instance.singletonNotifyClients(event, cmsg); - } } @@ -839,10 +846,6 @@ public class CacheClientNotifier { FilterInfo filterInfo = event.getLocalFilterInfo(); - // if (_logger.fineEnabled()) { - // _logger.fine("Client dispatcher processing event " + event); - // } - FilterProfile regionProfile = ((LocalRegion) event.getRegion()).getFilterProfile(); if (filterInfo != null) { // if the routing was made using an old profile we need to recompute it @@ -964,10 +967,8 @@ public class CacheClientNotifier { if (filterInfo.filterProcessedLocally) { removeDestroyTokensFromCqResultKeys(event, filterInfo); } - } - private void removeDestroyTokensFromCqResultKeys(InternalCacheEvent event, FilterInfo filterInfo) { FilterProfile regionProfile = ((LocalRegion) event.getRegion()).getFilterProfile(); @@ -986,38 +987,22 @@ public class CacheClientNotifier { } } - /** * delivers the given message to all proxies for routing. The message should already have client * interest established, or override the isClientInterested method to implement its own routing - * - * @param clientMessage */ public static void routeClientMessage(Conflatable clientMessage) { CacheClientNotifier instance = ccnSingleton; if (instance != null) { - instance.singletonRouteClientMessage(clientMessage, instance._clientProxies.keySet()); // ok - // to - // use - // keySet - // here - // because - // all - // we - // do - // is - // call - // getClientProxy - // with - // these - // keys + // ok to use keySet here because all we do is call getClientProxy with these keys + instance.singletonRouteClientMessage(clientMessage, instance._clientProxies.keySet()); } } /* * this is for server side registration of client queue */ - public static void routeSingleClientMessage(ClientUpdateMessage clientMessage, + static void routeSingleClientMessage(ClientUpdateMessage clientMessage, ClientProxyMembershipID clientProxyMembershipId) { CacheClientNotifier instance = ccnSingleton; if (instance != null) { @@ -1029,8 +1014,8 @@ public class CacheClientNotifier { private void singletonRouteClientMessage(Conflatable conflatable, Collection<ClientProxyMembershipID> filterClients) { - this._cache.getCancelCriterion().checkCancelInProgress(null); // bug #43942 - client notified - // but no p2p distribution + // bug #43942 - client notified but no p2p distribution + this._cache.getCancelCriterion().checkCancelInProgress(null); List<CacheClientProxy> deadProxies = null; for (ClientProxyMembershipID clientId : filterClients) { @@ -1061,7 +1046,8 @@ public class CacheClientNotifier { * processes the given collection of durable and non-durable client identifiers, returning a * collection of non-durable identifiers of clients connected to this VM */ - public Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs) { + Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs) { + // TODO: false is ignored here because true is hardcoded in other method return getProxyIDs(mixedDurableAndNonDurableIDs, false); } @@ -1070,7 +1056,7 @@ public class CacheClientNotifier { * collection of non-durable identifiers of clients connected to this VM. This version can check * for proxies in initialization as well as fully initialized proxies. */ - public Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs, + Set<ClientProxyMembershipID> getProxyIDs(Set mixedDurableAndNonDurableIDs, boolean proxyInInitMode) { Set<ClientProxyMembershipID> result = new HashSet(); for (Object id : mixedDurableAndNonDurableIDs) { @@ -1209,7 +1195,7 @@ public class CacheClientNotifier { * @param operation The operation that occurred (e.g. AFTER_CREATE) * @return whether the <code>CacheClientNotifier</code> supports the input operation */ - protected boolean supportsOperation(EnumListenerEvent operation) { + private boolean supportsOperation(EnumListenerEvent operation) { return operation == EnumListenerEvent.AFTER_CREATE || operation == EnumListenerEvent.AFTER_UPDATE || operation == EnumListenerEvent.AFTER_DESTROY @@ -1219,87 +1205,6 @@ public class CacheClientNotifier { || operation == EnumListenerEvent.AFTER_REGION_INVALIDATE; } - // /** - // * Queues the <code>ClientUpdateMessage</code> to be distributed - // * to interested clients. This method is not being used currently. - // * @param clientMessage The <code>ClientUpdateMessage</code> to be queued - // */ - // protected void notifyClients(final ClientUpdateMessage clientMessage) - // { - // if (USE_SYNCHRONOUS_NOTIFICATION) - // { - // // Execute the method in the same thread as the caller - // deliver(clientMessage); - // } - // else { - // // Obtain an Executor and use it to execute the method in its own thread - // try - // { - // getExecutor().execute(new Runnable() - // { - // public void run() - // { - // deliver(clientMessage); - // } - // } - // ); - // } catch (InterruptedException e) - // { - // _logger.warning("CacheClientNotifier: notifyClients interrupted", e); - // Thread.currentThread().interrupt(); - // } - // } - // } - - // /** - // * Updates the information this <code>CacheClientNotifier</code> maintains - // * for a given edge client. It is invoked when a edge client re-connects to - // * the server. - // * - // * @param clientHost - // * The host on which the client runs (i.e. the host the - // * CacheClientNotifier uses to communicate with the - // * CacheClientUpdater) This is used with the clientPort to uniquely - // * identify the client - // * @param clientPort - // * The port through which the server communicates with the client - // * (i.e. the port the CacheClientNotifier uses to communicate with - // * the CacheClientUpdater) This is used with the clientHost to - // * uniquely identify the client - // * @param remotePort - // * The port through which the client communicates with the server - // * (i.e. the new port the ConnectionImpl uses to communicate with the - // * ServerConnection) - // * @param membershipID - // * Uniquely idenifies the client - // */ - // public void registerClientPort(String clientHost, int clientPort, - // int remotePort, ClientProxyMembershipID membershipID) - // { - // if (_logger.fineEnabled()) - // _logger.fine("CacheClientNotifier: Registering client port: " - // + clientHost + ":" + clientPort + " with remote port " + remotePort - // + " and ID " + membershipID); - // for (Iterator i = getClientProxies().iterator(); i.hasNext();) { - // CacheClientProxy proxy = (CacheClientProxy)i.next(); - // if (_logger.finerEnabled()) - // _logger.finer("CacheClientNotifier: Potential client: " + proxy); - // //if (proxy.representsCacheClientUpdater(clientHost, clientPort)) - // if (proxy.isMember(membershipID)) { - // if (_logger.finerEnabled()) - // _logger - // .finer("CacheClientNotifier: Updating remotePorts since host and port are a match"); - // proxy.addPort(remotePort); - // } - // else { - // if (_logger.finerEnabled()) - // _logger.finer("CacheClientNotifier: Host and port " - // + proxy.getRemoteHostAddress() + ":" + proxy.getRemotePort() - // + " do not match " + clientHost + ":" + clientPort); - // } - // } - // } - /** * Registers client interest in the input region and key. * @@ -1315,7 +1220,8 @@ public class CacheClientNotifier { public void registerClientInterest(String regionName, Object keyOfInterest, ClientProxyMembershipID membershipID, int interestType, boolean isDurable, boolean sendUpdatesAsInvalidates, boolean manageEmptyRegions, int regionDataPolicy, - boolean flushState) throws IOException, RegionDestroyedException { + boolean flushState) + throws IOException, RegionDestroyedException { CacheClientProxy proxy = getClientProxy(membershipID, true); @@ -1350,18 +1256,6 @@ public class CacheClientNotifier { } } - /* - * protected void addFilterRegisteredClients(String regionName, ClientProxyMembershipID - * membershipID) throws RegionNotFoundException { // Update Regions book keeping. LocalRegion - * region = (LocalRegion)this._cache.getRegion(regionName); if (region == null) { //throw new - * AssertionError("Could not find region named '" + regionName + "'"); // @todo: see bug 36805 // - * fix for bug 37979 if (_logger.fineEnabled()) { _logger .fine("CacheClientNotifier: Client " + - * membershipID + " :Throwing RegionDestroyedException as region: " + regionName + - * " is not present."); } throw new RegionDestroyedException("registerInterest failed", - * regionName); } else { region.getFilterProfile().addFilterRegisteredClients(this, membershipID); - * } } - */ - /** * Store region and delta relation * @@ -1457,7 +1351,6 @@ public class CacheClientNotifier { } } - /** * If the conflatable is an instance of HAEventWrapper, and if the corresponding entry is present * in the haContainer, set the reference to the clientUpdateMessage to null and putInProgress flag @@ -1484,9 +1377,6 @@ public class CacheClientNotifier { } } } - // else { - // This is a replay-of-event case. - // } } else { // This wrapper resides in haContainer. wrapper.setClientUpdateMessage(null); @@ -1541,7 +1431,7 @@ public class CacheClientNotifier { * * @return the <code>CacheClientProxy</code> associated to the durableClientId */ - public CacheClientProxy getClientProxy(String durableClientId, boolean proxyInInitMode) { + private CacheClientProxy getClientProxy(String durableClientId, boolean proxyInInitMode) { final boolean isDebugEnabled = logger.isDebugEnabled(); final boolean isTraceEnabled = logger.isTraceEnabled(); @@ -1584,46 +1474,10 @@ public class CacheClientNotifier { } /** - * Returns the <code>CacheClientProxySameDS</code> associated to the membershipID * - * - * @return the <code>CacheClientProxy</code> associated to the same distributed system - */ - public CacheClientProxy getClientProxySameDS(ClientProxyMembershipID membershipID) { - final boolean isDebugEnabled = logger.isDebugEnabled(); - if (isDebugEnabled) { - logger.debug("{}::getClientProxySameDS(), Determining client for host {}", this, - membershipID); - logger.debug("{}::getClientProxySameDS(), Number of proxies in the Cache Clinet Notifier: {}", - this, getClientProxies().size()); - /* - * _logger.fine(this + "::getClientProxySameDS(), Proxies in the Cache Clinet Notifier: " + - * getClientProxies()); - */ - } - CacheClientProxy proxy = null; - for (Iterator i = getClientProxies().iterator(); i.hasNext();) { - CacheClientProxy clientProxy = (CacheClientProxy) i.next(); - if (isDebugEnabled) { - logger.debug("CacheClientNotifier: Checking client {}", clientProxy); - } - if (clientProxy.isSameDSMember(membershipID)) { - proxy = clientProxy; - if (isDebugEnabled) { - logger.debug("CacheClientNotifier: {} represents the client running on host {}", proxy, - membershipID); - } - break; - } - } - return proxy; - } - - - /** * It will remove the clients connected to the passed acceptorId. If its the only server, shuts * down this instance. */ - protected synchronized void shutdown(long acceptorId) { + synchronized void shutdown(long acceptorId) { final boolean isDebugEnabled = logger.isDebugEnabled(); if (isDebugEnabled) { logger.debug("At cache server shutdown time, the number of cache servers in the cache is {}", @@ -1685,14 +1539,14 @@ public class CacheClientNotifier { * * @param proxy The <code>CacheClientProxy</code> to add */ - protected void addClientProxy(CacheClientProxy proxy) throws IOException { + private void addClientProxy(CacheClientProxy proxy) throws IOException { // this._logger.info(LocalizedStrings.DEBUG, "adding client proxy " + proxy); getCache(); // ensure cache reference is up to date so firstclient state is correct this._clientProxies.put(proxy.getProxyID(), proxy); // Remove this proxy from the init proxy list. removeClientInitProxy(proxy); this._connectionListener.queueAdded(proxy.getProxyID()); - if (!(proxy.clientConflation == HandShake.CONFLATION_ON)) { + if (!(proxy.isClientConflationOn())) { // Delta not supported with conflation ON ClientHealthMonitor chm = ClientHealthMonitor.getInstance(); /* @@ -1704,22 +1558,20 @@ public class CacheClientNotifier { } } this.timedOutDurableClientProxies.remove(proxy.getProxyID()); - } - protected void addClientInitProxy(CacheClientProxy proxy) throws IOException { + private void addClientInitProxy(CacheClientProxy proxy) throws IOException { this._initClientProxies.put(proxy.getProxyID(), proxy); } - protected void removeClientInitProxy(CacheClientProxy proxy) throws IOException { + private void removeClientInitProxy(CacheClientProxy proxy) throws IOException { this._initClientProxies.remove(proxy.getProxyID()); } - protected boolean isProxyInInitializationMode(CacheClientProxy proxy) throws IOException { + private boolean isProxyInInitializationMode(CacheClientProxy proxy) throws IOException { return this._initClientProxies.containsKey(proxy.getProxyID()); } - /** * Returns (possibly stale) set of memberIds for all clients being actively notified by this * server. @@ -1781,7 +1633,6 @@ public class CacheClientNotifier { * @since GemFire 5.6 */ public boolean hasPrimaryForDurableClient(String durableId) { - for (Iterator iter = this._clientProxies.values().iterator(); iter.hasNext();) { CacheClientProxy proxy = (CacheClientProxy) iter.next(); ClientProxyMembershipID proxyID = proxy.getProxyID(); @@ -1818,7 +1669,9 @@ public class CacheClientNotifier { return ccp.getQueueSizeStat(); } - // closes the cq and drains the queue + /** + * closes the cq and drains the queue + */ public boolean closeClientCq(String durableClientId, String clientCQName) throws CqException { CacheClientProxy proxy = getClientProxy(durableClientId); // close and drain @@ -1828,33 +1681,29 @@ public class CacheClientNotifier { return false; } - /** * Removes an existing <code>CacheClientProxy</code> from the list of known client proxies * * @param proxy The <code>CacheClientProxy</code> to remove */ - protected void removeClientProxy(CacheClientProxy proxy) { - // this._logger.info(LocalizedStrings.DEBUG, "removing client proxy " + proxy, new - // Exception("stack trace")); + void removeClientProxy(CacheClientProxy proxy) { ClientProxyMembershipID client = proxy.getProxyID(); this._clientProxies.remove(client); this._connectionListener.queueRemoved(); ((GemFireCacheImpl) this.getCache()).cleanupForClient(this, client); - if (!(proxy.clientConflation == HandShake.CONFLATION_ON)) { + if (!(proxy.isClientConflationOn())) { ClientHealthMonitor chm = ClientHealthMonitor.getInstance(); if (chm != null) { chm.numOfClientsPerVersion.decrementAndGet(proxy.getVersion().ordinal()); } } - } void durableClientTimedOut(ClientProxyMembershipID client) { this.timedOutDurableClientProxies.add(client); } - public boolean isTimedOut(ClientProxyMembershipID client) { + private boolean isTimedOut(ClientProxyMembershipID client) { return this.timedOutDurableClientProxies.contains(client); } @@ -1868,17 +1717,6 @@ public class CacheClientNotifier { return Collections.unmodifiableCollection(this._clientProxies.values()); } - // /** - // * Returns the <code>Executor</code> that delivers messages to the - // * <code>CacheClientProxy</code> instances. - // * @return the <code>Executor</code> that delivers messages to the - // * <code>CacheClientProxy</code> instances - // */ - // protected Executor getExecutor() - // { - // return _executor; - // } - private void closeAllClientCqs(CacheClientProxy proxy) { CqService cqService = proxy.getCache().getCqService(); if (cqService != null) { @@ -1901,7 +1739,6 @@ public class CacheClientNotifier { /** * Shuts down durable client proxy - * */ public boolean closeDurableClientProxy(String durableClientId) throws CacheException { CacheClientProxy ccp = getClientProxy(durableClientId); @@ -1930,8 +1767,9 @@ public class CacheClientNotifier { final boolean isDebugEnabled = logger.isDebugEnabled(); for (Iterator i = deadProxies.iterator(); i.hasNext();) { CacheClientProxy proxy = (CacheClientProxy) i.next(); - if (isDebugEnabled) + if (isDebugEnabled) { logger.debug("CacheClientNotifier: Closing dead client: {}", proxy); + } // Close the proxy boolean keepProxy = false; @@ -1939,7 +1777,7 @@ public class CacheClientNotifier { keepProxy = proxy.close(false, stoppedNormally); } catch (CancelException e) { throw e; - } catch (Exception e) { + } catch (Exception e) { // TODO: at least log at debug level } // Remove the proxy if necessary. It might not be necessary to remove the @@ -1960,7 +1798,6 @@ public class CacheClientNotifier { } // for } - /** * Registers a new <code>InterestRegistrationListener</code> with the set of * <code>InterestRegistrationListener</code>s. @@ -1999,18 +1836,16 @@ public class CacheClientNotifier { } /** - * * @since GemFire 5.8Beta */ - protected boolean containsInterestRegistrationListeners() { + boolean containsInterestRegistrationListeners() { return !this.writableInterestRegistrationListeners.isEmpty(); } /** - * * @since GemFire 5.8Beta */ - protected void notifyInterestRegistrationListeners(InterestRegistrationEvent event) { + void notifyInterestRegistrationListeners(InterestRegistrationEvent event) { for (Iterator i = this.writableInterestRegistrationListeners.iterator(); i.hasNext();) { InterestRegistrationListener listener = (InterestRegistrationListener) i.next(); if (event.isRegister()) { @@ -2040,8 +1875,6 @@ public class CacheClientNotifier { GemFireCacheImpl cache = GemFireCacheImpl.getInstance(); if (cache != null) { this._cache = cache; - this.logWriter = cache.getInternalLogWriter(); - this.securityLogWriter = cache.getSecurityInternalLogWriter(); } } return this._cache; @@ -2072,68 +1905,6 @@ public class CacheClientNotifier { } /** - * Constructor. - * - * @param cache The GemFire <code>Cache</code> - * @param acceptorStats - * @param maximumMessageCount - * @param messageTimeToLive - * @param listener a listener which should receive notifications abouts queues being added or - * removed. - * @param overflowAttributesList - */ - private CacheClientNotifier(Cache cache, CacheServerStats acceptorStats, int maximumMessageCount, - int messageTimeToLive, ConnectionListener listener, List overflowAttributesList, - boolean isGatewayReceiver) { - // Set the Cache - this.setCache((GemFireCacheImpl) cache); - this.acceptorStats = acceptorStats; - this.socketCloser = new SocketCloser(1, 50); // we only need one thread per client and wait 50ms - // for close - - // Set the LogWriter - this.logWriter = (InternalLogWriter) cache.getLogger(); - - this._connectionListener = listener; - - // Set the security LogWriter - this.securityLogWriter = (InternalLogWriter) cache.getSecurityLogger(); - - this.maximumMessageCount = maximumMessageCount; - this.messageTimeToLive = messageTimeToLive; - - // Initialize the statistics - StatisticsFactory factory; - if (isGatewayReceiver) { - factory = new DummyStatisticsFactory(); - } else { - factory = this.getCache().getDistributedSystem(); - } - this._statistics = new CacheClientNotifierStats(factory); - - // Initialize the executors - // initializeExecutors(this._logger); - - try { - this.logFrequency = Long.valueOf(System.getProperty(MAX_QUEUE_LOG_FREQUENCY)); - if (this.logFrequency <= 0) { - this.logFrequency = DEFAULT_LOG_FREQUENCY; - } - } catch (Exception e) { - this.logFrequency = DEFAULT_LOG_FREQUENCY; - } - - eventEnqueueWaitTime = - Integer.getInteger(EVENT_ENQUEUE_WAIT_TIME_NAME, DEFAULT_EVENT_ENQUEUE_WAIT_TIME); - if (eventEnqueueWaitTime < 0) { - eventEnqueueWaitTime = DEFAULT_EVENT_ENQUEUE_WAIT_TIME; - } - - // Schedule task to periodically ping clients. - scheduleClientPingTask(); - } - - /** * this message is used to send interest registration to another server. Since interest * registration performs a state-flush operation this message must not transmitted on an ordered * socket @@ -2228,104 +1999,6 @@ public class CacheClientNotifier { } - - // * Initializes the <code>QueuedExecutor</code> and - // <code>PooledExecutor</code> - // * used to deliver messages to <code>CacheClientProxy</code> instances. - // * @param logger The GemFire <code>LogWriterI18n</code> - // */ - // private void initializeExecutors(LogWriterI18n logger) - // { - // // Create the thread groups - // final ThreadGroup loggerGroup = LoggingThreadGroup.createThreadGroup("Cache - // Client Notifier Logger Group", logger); - // final ThreadGroup notifierGroup = - // new ThreadGroup("Cache Client Notifier Group") - // { - // public void uncaughtException(Thread t, Throwable e) - // { - // Thread.dumpStack(); - // loggerGroup.uncaughtException(t, e); - // //CacheClientNotifier.exceptionInThreads = true; - // } - // }; - // - // // Originally set ThreadGroup to be a daemon, but it was causing the - // following - // // exception after five minutes of non-activity (the keep alive time of the - // // threads in the PooledExecutor. - // - // // java.lang.IllegalThreadStateException - // // at java.lang.ThreadGroup.add(Unknown Source) - // // at java.lang.Thread.init(Unknown Source) - // // at java.lang.Thread.<init>(Unknown Source) - // // at - // org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier$4.newThread(CacheClientNotifier.java:321) - // // at - // org.apache.edu.oswego.cs.dl.util.concurrent.PooledExecutor.addThread(PooledExecutor.java:512) - // // at - // org.apache.edu.oswego.cs.dl.util.concurrent.PooledExecutor.execute(PooledExecutor.java:888) - // // at - // org.apache.geode.internal.cache.tier.sockets.CacheClientNotifier.notifyClients(CacheClientNotifier.java:95) - // // at - // org.apache.geode.internal.cache.tier.sockets.ServerConnection.run(ServerConnection.java:271) - // - // //notifierGroup.setDaemon(true); - // - // if (USE_QUEUED_EXECUTOR) - // createQueuedExecutor(notifierGroup); - // else - // createPooledExecutor(notifierGroup); - // } - - // /** - // * Creates the <code>QueuedExecutor</code> used to deliver messages - // * to <code>CacheClientProxy</code> instances - // * @param notifierGroup The <code>ThreadGroup</code> to which the - // * <code>QueuedExecutor</code>'s <code>Threads</code> belong - // */ - // protected void createQueuedExecutor(final ThreadGroup notifierGroup) - // { - // QueuedExecutor queuedExecutor = new QueuedExecutor(new LinkedQueue()); - // queuedExecutor.setThreadFactory(new ThreadFactory() - // { - // public Thread newThread(Runnable command) - // { - // Thread thread = new Thread(notifierGroup, command, "Queued Cache Client - // Notifier"); - // thread.setDaemon(true); - // return thread; - // } - // }); - // _executor = queuedExecutor; - // } - - // /** - // * Creates the <code>PooledExecutor</code> used to deliver messages - // * to <code>CacheClientProxy</code> instances - // * @param notifierGroup The <code>ThreadGroup</code> to which the - // * <code>PooledExecutor</code>'s <code>Threads</code> belong - // */ - // protected void createPooledExecutor(final ThreadGroup notifierGroup) - // { - // PooledExecutor pooledExecutor = new PooledExecutor(new - // BoundedLinkedQueue(4096), 50); - // pooledExecutor.setMinimumPoolSize(10); - // pooledExecutor.setKeepAliveTime(1000 * 60 * 5); - // pooledExecutor.setThreadFactory(new ThreadFactory() - // { - // public Thread newThread(Runnable command) - // { - // Thread thread = new Thread(notifierGroup, command, "Pooled Cache Client - // Notifier"); - // thread.setDaemon(true); - // return thread; - // } - // }); - // pooledExecutor.createThreads(5); - // _executor = pooledExecutor; - // } - protected void deliverInterestChange(ClientProxyMembershipID proxyID, ClientInterestMessageImpl message) { DM dm = ((InternalDistributedSystem) this.getCache().getDistributedSystem()) @@ -2471,23 +2144,6 @@ public class CacheClientNotifier { */ protected static final int ALL_PORTS = -1; - // /** - // * Whether to synchonously deliver messages to proxies. - // * This is currently hard-coded to true to ensure ordering. - // */ - // protected static final boolean USE_SYNCHRONOUS_NOTIFICATION = - // true; - // Boolean.getBoolean("CacheClientNotifier.USE_SYNCHRONOUS_NOTIFICATION"); - - // /** - // * Whether to use the <code>QueuedExecutor</code> (or the - // * <code>PooledExecutor</code>) to deliver messages to proxies. - // * Currently, delivery is synchronous. No <code>Executor</code> is - // * used. - // */ - // protected static final boolean USE_QUEUED_EXECUTOR = - // Boolean.getBoolean("CacheClientNotifier.USE_QUEUED_EXECUTOR"); - /** * The map of known <code>CacheClientProxy</code> instances. Maps ClientProxyMembershipID to * CacheClientProxy. Note that the keys in this map are not updated when a durable client @@ -2512,14 +2168,7 @@ public class CacheClientNotifier { * direct reference to _cache in CacheClientNotifier code. Instead, you should always use * <code>getCache()</code> */ - private GemFireCacheImpl _cache; - - private InternalLogWriter logWriter; - - /** - * The GemFire security <code>LogWriter</code> - */ - private InternalLogWriter securityLogWriter; + private GemFireCacheImpl _cache; // TODO: not thread-safe /** the maximum number of messages that can be enqueued in a client-queue. */ private int maximumMessageCount; @@ -2543,10 +2192,6 @@ public class CacheClientNotifier { */ private volatile HAContainerWrapper haContainer; - // /** - // * The singleton <code>CacheClientNotifier</code> instance - // */ - // protected static CacheClientNotifier _instance; /** * The size of the server-to-client communication socket buffers. This can be modified using the * BridgeServer.SOCKET_BUFFER_SIZE system property.