http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/51bddd70/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BridgeServerImpl.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BridgeServerImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BridgeServerImpl.java deleted file mode 100644 index f7fa15f..0000000 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BridgeServerImpl.java +++ /dev/null @@ -1,816 +0,0 @@ -/*========================================================================= - * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. - * This product is protected by U.S. and international copyright - * and intellectual property laws. Pivotal products are covered by - * one or more patents listed at http://www.pivotal.io/patents. - *========================================================================= - */ -package com.gemstone.gemfire.internal.cache; - -import java.io.File; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.logging.log4j.Logger; - -import com.gemstone.gemfire.CancelCriterion; -import com.gemstone.gemfire.GemFireIOException; -import com.gemstone.gemfire.InternalGemFireError; -import com.gemstone.gemfire.InvalidValueException; -import com.gemstone.gemfire.cache.AttributesFactory; -import com.gemstone.gemfire.cache.ClientSession; -import com.gemstone.gemfire.cache.DataPolicy; -import com.gemstone.gemfire.cache.DiskStore; -import com.gemstone.gemfire.cache.DiskStoreFactory; -import com.gemstone.gemfire.cache.DynamicRegionFactory; -import com.gemstone.gemfire.cache.EvictionAction; -import com.gemstone.gemfire.cache.ExpirationAction; -import com.gemstone.gemfire.cache.ExpirationAttributes; -import com.gemstone.gemfire.cache.InterestRegistrationListener; -import com.gemstone.gemfire.cache.RegionAttributes; -import com.gemstone.gemfire.cache.RegionExistsException; -import com.gemstone.gemfire.cache.Scope; -import com.gemstone.gemfire.cache.server.CacheServer; -import com.gemstone.gemfire.cache.server.ClientSubscriptionConfig; -import com.gemstone.gemfire.cache.server.ServerLoadProbe; -import com.gemstone.gemfire.cache.server.internal.LoadMonitor; -import com.gemstone.gemfire.cache.util.BridgeMembership; -import com.gemstone.gemfire.cache.util.BridgeMembershipListener; -import com.gemstone.gemfire.cache.wan.GatewayTransportFilter; -import com.gemstone.gemfire.distributed.DistributedMember; -import com.gemstone.gemfire.distributed.internal.DM; -import com.gemstone.gemfire.distributed.internal.DistributionAdvisee; -import com.gemstone.gemfire.distributed.internal.DistributionAdvisor; -import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile; -import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; -import com.gemstone.gemfire.distributed.internal.ResourceEvent; -import com.gemstone.gemfire.distributed.internal.ServerLocation; -import com.gemstone.gemfire.distributed.internal.membership.MemberAttributes; -import com.gemstone.gemfire.internal.Assert; -import com.gemstone.gemfire.internal.OSProcess; -import com.gemstone.gemfire.internal.admin.ClientHealthMonitoringRegion; -import com.gemstone.gemfire.internal.cache.BridgeServerAdvisor.BridgeServerProfile; -import com.gemstone.gemfire.internal.cache.ha.HARegionQueue; -import com.gemstone.gemfire.internal.cache.tier.Acceptor; -import com.gemstone.gemfire.internal.cache.tier.sockets.AcceptorImpl; -import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier; -import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID; -import com.gemstone.gemfire.internal.i18n.LocalizedStrings; -import com.gemstone.gemfire.internal.logging.LogService; -import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; - -/** - * An implementation of the <code>CacheServer</code> interface that delegates - * most of the heavy lifting to an {@link Acceptor}. - * - * @author David Whitlock - * @since 4.0 - */ -@SuppressWarnings("deprecation") -public class BridgeServerImpl - extends AbstractBridgeServer - implements DistributionAdvisee { - - private static final Logger logger = LogService.getLogger(); - - private static final int FORCE_LOAD_UPDATE_FREQUENCY= Integer.getInteger("gemfire.BridgeServer.FORCE_LOAD_UPDATE_FREQUENCY", 10).intValue(); - - /** The acceptor that does the actual serving */ - private volatile AcceptorImpl acceptor; - - // moved to AbstractBridgeServer - - - - /** - * The advisor used by this bridge sever. - * @since 5.7 - */ - private volatile BridgeServerAdvisor advisor; - - /** - * The monitor used to monitor load on this - * bridge server and distribute load to the locators - * @since 5.7 - */ - private volatile LoadMonitor loadMonitor; - - /** - * boolean that represents whether this server is a GatewayReceiver or a simple BridgeServer - */ - private boolean isGatewayReceiver; - - private List<GatewayTransportFilter> gatewayTransportFilters = Collections.EMPTY_LIST; - - /** - * Needed because this guy is an advisee - * @since 5.7 - */ - private int serialNumber; // changed on each start - - public static final boolean ENABLE_NOTIFY_BY_SUBSCRIPTION_FALSE = - Boolean.getBoolean("gemfire.cache-server.enable-notify-by-subscription-false"); - - - // ////////////////////// Constructors ////////////////////// - - /** - * Creates a new <code>BridgeServerImpl</code> that serves the contents of - * the give <code>Cache</code>. It has the default configuration. - */ - public BridgeServerImpl(GemFireCacheImpl cache, boolean isGatewayReceiver) { - super(cache); - this.isGatewayReceiver = isGatewayReceiver; - } - - // //////////////////// Instance Methods /////////////////// - - public CancelCriterion getCancelCriterion() { - return cache.getCancelCriterion(); - } - - /** - * Checks to see whether or not this bridge server is running. If so, an - * {@link IllegalStateException} is thrown. - */ - private void checkRunning() { - if (this.isRunning()) { - throw new IllegalStateException(LocalizedStrings.BridgeServerImpl_A_BRIDGE_SERVERS_CONFIGURATION_CANNOT_BE_CHANGED_ONCE_IT_IS_RUNNING.toLocalizedString()); - } - } - - public boolean isGatewayReceiver() { - return this.isGatewayReceiver; - } - - @Override - public int getPort() { - if (this.acceptor != null) { - return this.acceptor.getPort(); - } - else { - return super.getPort(); - } - } - - @Override - public void setPort(int port) { - checkRunning(); - super.setPort(port); - } - - @Override - public void setBindAddress(String address) { - checkRunning(); - super.setBindAddress(address); - } - @Override - public void setHostnameForClients(String name) { - checkRunning(); - super.setHostnameForClients(name); - } - - @Override - public void setMaxConnections(int maxCon) { - checkRunning(); - super.setMaxConnections(maxCon); - } - - @Override - public void setMaxThreads(int maxThreads) { - checkRunning(); - super.setMaxThreads(maxThreads); - } - - @Override - public void setNotifyBySubscription(boolean b) { - checkRunning(); - if (BridgeServerImpl.ENABLE_NOTIFY_BY_SUBSCRIPTION_FALSE) { - this.notifyBySubscription = b; - } - } - - @Override - public void setMaximumMessageCount(int maximumMessageCount) { - checkRunning(); - super.setMaximumMessageCount(maximumMessageCount); - } - - @Override - public void setSocketBufferSize(int socketBufferSize) { - this.socketBufferSize = socketBufferSize; - } - - @Override - public int getSocketBufferSize() { - return this.socketBufferSize; - } - - @Override - public void setMaximumTimeBetweenPings(int maximumTimeBetweenPings) { - this.maximumTimeBetweenPings = maximumTimeBetweenPings; - } - - @Override - public int getMaximumTimeBetweenPings() { - return this.maximumTimeBetweenPings; - } - - - @Override - public void setLoadPollInterval(long loadPollInterval) { - checkRunning(); - super.setLoadPollInterval(loadPollInterval); - } - - @Override - public int getMaximumMessageCount() { - return this.maximumMessageCount; - } - - @Override - public void setLoadProbe(ServerLoadProbe loadProbe) { - checkRunning(); - super.setLoadProbe(loadProbe); - } - - public void setGatewayTransportFilter( - List<GatewayTransportFilter> transportFilters) { - this.gatewayTransportFilters = transportFilters; - } - - @Override - public int getMessageTimeToLive() { - return this.messageTimeToLive; - } - - - public ClientSubscriptionConfig getClientSubscriptionConfig(){ - return this.clientSubscriptionConfig; - } - - /** - * Sets the configuration of <b>this</b> <code>CacheServer</code> based on - * the configuration of <b>another</b> <code>CacheServer</code>. - */ - public void configureFrom(CacheServer other) { - setPort(other.getPort()); - setBindAddress(other.getBindAddress()); - setHostnameForClients(other.getHostnameForClients()); - setMaxConnections(other.getMaxConnections()); - setMaxThreads(other.getMaxThreads()); - setNotifyBySubscription(other.getNotifyBySubscription()); - setSocketBufferSize(other.getSocketBufferSize()); - setTcpNoDelay(other.getTcpNoDelay()); - setMaximumTimeBetweenPings(other.getMaximumTimeBetweenPings()); - setMaximumMessageCount(other.getMaximumMessageCount()); - setMessageTimeToLive(other.getMessageTimeToLive()); -// setTransactionTimeToLive(other.getTransactionTimeToLive()); not implemented in CacheServer for v6.6 - setGroups(other.getGroups()); - setLoadProbe(other.getLoadProbe()); - setLoadPollInterval(other.getLoadPollInterval()); - ClientSubscriptionConfig cscOther = other.getClientSubscriptionConfig(); - ClientSubscriptionConfig cscThis = this.getClientSubscriptionConfig(); - // added for configuration of ha overflow - cscThis.setEvictionPolicy(cscOther.getEvictionPolicy()); - cscThis.setCapacity(cscOther.getCapacity()); - String diskStoreName = cscOther.getDiskStoreName(); - if (diskStoreName != null) { - cscThis.setDiskStoreName(diskStoreName); - } else { - cscThis.setOverflowDirectory(cscOther.getOverflowDirectory()); - } - } - - @Override - public synchronized void start() throws IOException { - Assert.assertTrue(this.cache != null); - boolean isSqlFabricSystem = ((GemFireCacheImpl)this.cache).isSqlfSystem(); - - this.serialNumber = createSerialNumber(); - if (DynamicRegionFactory.get().isOpen()) { - // force notifyBySubscription to be true so that meta info is pushed - // from servers to clients instead of invalidates. - if (!this.notifyBySubscription) { - logger.info(LocalizedMessage.create(LocalizedStrings.BridgeServerImpl_FORCING_NOTIFYBYSUBSCRIPTION_TO_SUPPORT_DYNAMIC_REGIONS)); - this.notifyBySubscription = true; - } - } - this.advisor = BridgeServerAdvisor.createBridgeServerAdvisor(this); - this.loadMonitor = new LoadMonitor(loadProbe, maxConnections, - loadPollInterval, FORCE_LOAD_UPDATE_FREQUENCY, - advisor); - List overflowAttributesList = new LinkedList(); - ClientSubscriptionConfig csc = this.getClientSubscriptionConfig(); - overflowAttributesList.add(0, csc.getEvictionPolicy()); - overflowAttributesList.add(1, Integer.valueOf(csc.getCapacity())); - overflowAttributesList.add(2, Integer.valueOf(this.port)); - String diskStoreName = csc.getDiskStoreName(); - if (diskStoreName != null) { - overflowAttributesList.add(3, diskStoreName); - overflowAttributesList.add(4, true); // indicator to use diskstore - } else { - overflowAttributesList.add(3, csc.getOverflowDirectory()); - overflowAttributesList.add(4, false); - } - - this.acceptor = new AcceptorImpl(getPort(), - getBindAddress(), - getNotifyBySubscription(), - getSocketBufferSize(), - getMaximumTimeBetweenPings(), - this.cache, - getMaxConnections(), - getMaxThreads(), - getMaximumMessageCount(), - getMessageTimeToLive(), - getTransactionTimeToLive(), - this.loadMonitor, - overflowAttributesList, - isSqlFabricSystem, - this.isGatewayReceiver, - this.gatewayTransportFilters, this.tcpNoDelay); - - this.acceptor.start(); - this.advisor.handshake(); - this.loadMonitor.start(new ServerLocation(getExternalAddress(), - getPort()), acceptor.getStats()); - - // TODO : Need to provide facility to enable/disable client health monitoring. - //Creating ClientHealthMonitoring region. - // Force initialization on current cache - if(cache instanceof GemFireCacheImpl) { - ClientHealthMonitoringRegion.getInstance((GemFireCacheImpl)cache); - } - this.cache.getLoggerI18n().config(LocalizedStrings.BridgeServerImpl_CACHESERVER_CONFIGURATION___0, getConfig()); - - /* - * If the stopped bridge server is restarted, we'll need to re-register the - * client membership listener. If the listener is already registered it - * won't be registered as would the case when start() is invoked for the - * first time. - */ - BridgeMembershipListener[] membershipListeners = - BridgeMembership.getBridgeMembershipListeners(); - - boolean membershipListenerRegistered = false; - for (BridgeMembershipListener membershipListener : membershipListeners) { - //just checking by reference as the listener instance is final - if (listener == membershipListener) { - membershipListenerRegistered = true; - break; - } - } - - if (!membershipListenerRegistered) { - BridgeMembership.registerBridgeMembershipListener(listener); - } - - if (!isGatewayReceiver) { - InternalDistributedSystem system = ((GemFireCacheImpl) this.cache) - .getDistributedSystem(); - system.handleResourceEvent(ResourceEvent.CACHE_SERVER_START, this); - } - - } - - - /** - * Gets the address that this bridge server can be contacted on from external - * processes. - * @since 5.7 - */ - public String getExternalAddress() { - return getExternalAddress(true); - } - - public String getExternalAddress(boolean checkServerRunning) { - if (checkServerRunning) { - if (!this.isRunning()) { - String s = "A bridge server's bind address is only available if it has been started"; - this.cache.getCancelCriterion().checkCancelInProgress(null); - throw new IllegalStateException(s); - } - } - if (this.hostnameForClients == null || this.hostnameForClients.equals("")) { - if (this.acceptor != null) { - return this.acceptor.getExternalAddress(); - } - else { - return null; - } - } - else { - return this.hostnameForClients; - } - } - - public boolean isRunning() { - return this.acceptor != null && this.acceptor.isRunning(); - } - - public synchronized void stop() { - if (!isRunning()) { - return; - } - - RuntimeException firstException = null; - - try { - if(this.loadMonitor != null) { - this.loadMonitor.stop(); - } - } catch(RuntimeException e) { - cache.getLoggerI18n().warning(LocalizedStrings.BridgeServerImpl_CACHESERVER_ERROR_CLOSING_LOAD_MONITOR, e); - firstException = e; - } - - try { - if (this.advisor != null) { - this.advisor.close(); - } - } catch(RuntimeException e) { - cache.getLoggerI18n().warning(LocalizedStrings.BridgeServerImpl_CACHESERVER_ERROR_CLOSING_ADVISOR, e); - firstException = e; - } - - try { - if (this.acceptor != null) { - this.acceptor.close(); - } - } catch(RuntimeException e) { - logger.warn(LocalizedMessage.create(LocalizedStrings.BridgeServerImpl_CACHESERVER_ERROR_CLOSING_ACCEPTOR_MONITOR), e); - if (firstException != null) { - firstException = e; - } - } - - if(firstException != null) { - throw firstException; - } - - //TODO : We need to clean up the admin region created for client - //monitoring. - - // BridgeServer is still available, just not running, so we don't take - // it out of the cache's list... - // cache.removeBridgeServer(this); - - /* Assuming start won't be called after stop */ - BridgeMembership.unregisterBridgeMembershipListener(this.listener); - - TXManagerImpl txMgr = (TXManagerImpl) cache.getCacheTransactionManager(); - txMgr.removeHostedTXStatesForClients(); - - if (!isGatewayReceiver) { - InternalDistributedSystem system = ((GemFireCacheImpl) this.cache) - .getDistributedSystem(); - system.handleResourceEvent(ResourceEvent.CACHE_SERVER_STOP, this); - } - - } - - private String getConfig() { - ClientSubscriptionConfig csc = this.getClientSubscriptionConfig(); - String str = - "port=" + getPort() + " max-connections=" + getMaxConnections() - + " max-threads=" + getMaxThreads() + " notify-by-subscription=" - + getNotifyBySubscription() + " socket-buffer-size=" - + getSocketBufferSize() + " maximum-time-between-pings=" - + getMaximumTimeBetweenPings() + " maximum-message-count=" - + getMaximumMessageCount() + " message-time-to-live=" - + getMessageTimeToLive() + " eviction-policy=" + csc.getEvictionPolicy() - + " capacity=" + csc.getCapacity() + " overflow directory="; - if (csc.getDiskStoreName() != null) { - str += csc.getDiskStoreName(); - } else { - str += csc.getOverflowDirectory(); - } - str += - " groups=" + Arrays.asList(getGroups()) - + " loadProbe=" + loadProbe - + " loadPollInterval=" + loadPollInterval - + " tcpNoDelay=" + tcpNoDelay; - return str; - } - - @Override - public String toString() { - ClientSubscriptionConfig csc = this.getClientSubscriptionConfig(); - String str = - "CacheServer on port=" + getPort() + " client subscription config policy=" - + csc.getEvictionPolicy() + " client subscription config capacity=" - + csc.getCapacity(); - if (csc.getDiskStoreName() != null) { - str += " client subscription config overflow disk store=" - + csc.getDiskStoreName(); - } else { - str += " client subscription config overflow directory=" - + csc.getOverflowDirectory(); - } - return str; - } - - /** - * Test method used to access the internal acceptor - * - * @return the internal acceptor - */ - public AcceptorImpl getAcceptor() { - return this.acceptor; - } - - // DistributionAdvisee methods - - public DM getDistributionManager() { - return getSystem().getDistributionManager(); - } - - public ClientSession getClientSession(String durableClientId) { - return getCacheClientNotifier().getClientProxy(durableClientId); - } - - public ClientSession getClientSession(DistributedMember member) { - return getCacheClientNotifier().getClientProxy( - ClientProxyMembershipID.getClientId(member)); - } - - public Set getAllClientSessions() { - return new HashSet(getCacheClientNotifier().getClientProxies()); - } - - /** - * create client subscription - * - * @param cache - * @param ePolicy - * @param capacity - * @param port - * @param overFlowDir - * @param isDiskStore - * @return client subscription name - * @since 5.7 - */ - public static String clientMessagesRegion(GemFireCacheImpl cache, String ePolicy, - int capacity, int port, String overFlowDir, boolean isDiskStore) { - AttributesFactory factory = getAttribFactoryForClientMessagesRegion(cache, - ePolicy, capacity, overFlowDir, isDiskStore); - RegionAttributes attr = factory.create(); - - return createClientMessagesRegion(attr, cache, capacity, port); - } - - public static AttributesFactory getAttribFactoryForClientMessagesRegion( - GemFireCacheImpl cache, - String ePolicy, int capacity, String overflowDir, boolean isDiskStore) - throws InvalidValueException, GemFireIOException { - AttributesFactory factory = new AttributesFactory(); - factory.setScope(Scope.LOCAL); - - if (isDiskStore) { - // overflowDir parameter is actually diskstore name - factory.setDiskStoreName(overflowDir); - // client subscription queue is always overflow to disk, so do async - // see feature request #41479 - factory.setDiskSynchronous(true); - } else if (overflowDir == null || overflowDir.equals(ClientSubscriptionConfig.DEFAULT_OVERFLOW_DIRECTORY)) { - factory.setDiskStoreName(null); - // client subscription queue is always overflow to disk, so do async - // see feature request #41479 - factory.setDiskSynchronous(true); - } else { - File dir = new File(overflowDir + File.separatorChar - + generateNameForClientMsgsRegion(OSProcess.getId())); - // This will delete the overflow directory when virtual machine terminates. - dir.deleteOnExit(); - if (!dir.mkdirs() && !dir.isDirectory()) { - throw new GemFireIOException("Could not create client subscription overflow directory: " - + dir.getAbsolutePath()); - } - File[] dirs = { dir }; - DiskStoreFactory dsf = cache.createDiskStoreFactory(); - DiskStore bsi = dsf.setAutoCompact(true) - .setDiskDirsAndSizes(dirs, new int[] { Integer.MAX_VALUE }) - .create("bsi"); - factory.setDiskStoreName("bsi"); - // backward compatibility, it was sync - factory.setDiskSynchronous(true); - } - factory.setDataPolicy(DataPolicy.NORMAL); - // enable statistics - factory.setStatisticsEnabled(true); - /* setting LIFO related eviction attributes */ - if (HARegionQueue.HA_EVICTION_POLICY_ENTRY.equals(ePolicy)) { - factory - .setEvictionAttributes(EvictionAttributesImpl.createLIFOEntryAttributes( - capacity, EvictionAction.OVERFLOW_TO_DISK)); - } - else if (HARegionQueue.HA_EVICTION_POLICY_MEMORY.equals(ePolicy)) { // condition refinement - factory - .setEvictionAttributes(EvictionAttributesImpl.createLIFOMemoryAttributes( - capacity, EvictionAction.OVERFLOW_TO_DISK)); - } - else { - // throw invalid eviction policy exception - throw new InvalidValueException( - LocalizedStrings.BridgeServerImpl__0_INVALID_EVICTION_POLICY.toLocalizedString(ePolicy)); - } - return factory; - } - - public static String createClientMessagesRegion(RegionAttributes attr, - GemFireCacheImpl cache, int capacity, int port) { - // generating unique name in VM for ClientMessagesRegion - String regionName = generateNameForClientMsgsRegion(port); - try { - cache.createVMRegion(regionName, attr, - new InternalRegionArguments().setDestroyLockFlag(true) - .setRecreateFlag(false).setSnapshotInputStream(null) - .setImageTarget(null).setIsUsedForMetaRegion(true)); - } - catch (RegionExistsException ree) { - InternalGemFireError assErr = new InternalGemFireError( - "unexpected exception"); - assErr.initCause(ree); - throw assErr; - } - catch (IOException e) { - // only if loading snapshot, not here - InternalGemFireError assErr = new InternalGemFireError( - "unexpected exception"); - assErr.initCause(e); - throw assErr; - } - catch (ClassNotFoundException e) { - // only if loading snapshot, not here - InternalGemFireError assErr = new InternalGemFireError( - "unexpected exception"); - assErr.initCause(e); - throw assErr; - } - return regionName; - } - - public static String createClientMessagesRegionForTesting(GemFireCacheImpl cache, - String ePolicy, int capacity, int port, int expiryTime, String overFlowDir, boolean isDiskStore) { - AttributesFactory factory = getAttribFactoryForClientMessagesRegion(cache, - ePolicy, capacity, overFlowDir, isDiskStore); - ExpirationAttributes ea = new ExpirationAttributes(expiryTime, - ExpirationAction.LOCAL_INVALIDATE); - factory.setEntryTimeToLive(ea); - RegionAttributes attr = factory.create(); - - return createClientMessagesRegion(attr, cache, capacity, port); - } - - /** - * Generates the name for the client subscription using the given id. - * - * @param id - * @return String - * @since 5.7 - */ - public static String generateNameForClientMsgsRegion(int id) { - return ClientSubscriptionConfigImpl.CLIENT_SUBSCRIPTION + "_" + id; - } - - /* - * Marker class name to identify the lock more easily in thread dumps private - * static class ClientMessagesRegionLock extends Object { } - */ - public DistributionAdvisor getDistributionAdvisor() { - return this.advisor; - } - - /** - * Returns the BridgeServerAdvisor for this server - */ - public BridgeServerAdvisor getCacheServerAdvisor() { - return this.advisor; - } - - public Profile getProfile() { - return getDistributionAdvisor().createProfile(); - } - - public DistributionAdvisee getParentAdvisee() { - return null; - } - - /** - * Returns the underlying <code>InternalDistributedSystem</code> connection. - * @return the underlying <code>InternalDistributedSystem</code> - */ - public InternalDistributedSystem getSystem() { - return (InternalDistributedSystem)this.cache.getDistributedSystem(); - } - - public String getName() { - return "CacheServer"; - } - - public String getFullPath() { - return getName(); - } - - private final static AtomicInteger profileSN = new AtomicInteger(); - - private static int createSerialNumber() { - return profileSN.incrementAndGet(); - } - - /** - * Returns an array of all the groups of this bridge server. - * This includes those from the groups gemfire property - * and those explicitly added to this server. - */ - public String[] getCombinedGroups() { - ArrayList<String> groupList = new ArrayList<String>(); - for (String g: MemberAttributes.parseGroups(null, getSystem().getConfig().getGroups())) { - if (!groupList.contains(g)) { - groupList.add(g); - } - } - for (String g: getGroups()) { - if (!groupList.contains(g)) { - groupList.add(g); - } - } - String[] groups = new String[groupList.size()]; - return groupList.toArray(groups); - } - - public /*synchronized causes deadlock*/ void fillInProfile(Profile profile) { - assert profile instanceof BridgeServerProfile; - BridgeServerProfile bp = (BridgeServerProfile)profile; - bp.setHost(getExternalAddress(false)); - bp.setPort(getPort()); - bp.setGroups(getCombinedGroups()); - bp.setMaxConnections(maxConnections); - bp.setInitialLoad(loadMonitor.getLastLoad()); - bp.setLoadPollInterval(getLoadPollInterval()); - bp.serialNumber = getSerialNumber(); - bp.finishInit(); - } - - public int getSerialNumber() { - return this.serialNumber; - } - - - protected CacheClientNotifier getCacheClientNotifier() { - return getAcceptor().getCacheClientNotifier(); - } - - /** - * Registers a new <code>InterestRegistrationListener</code> with the set of - * <code>InterestRegistrationListener</code>s. - * - * @param listener - * The <code>InterestRegistrationListener</code> to register - * @throws IllegalStateException if the BridgeServer has not been started - * @since 5.8Beta - */ - public void registerInterestRegistrationListener( - InterestRegistrationListener listener) { - if (!this.isRunning()) { - throw new IllegalStateException(LocalizedStrings.BridgeServerImpl_MUST_BE_RUNNING.toLocalizedString()); - } - getCacheClientNotifier().registerInterestRegistrationListener(listener); - } - - /** - * Unregisters an existing <code>InterestRegistrationListener</code> from - * the set of <code>InterestRegistrationListener</code>s. - * - * @param listener - * The <code>InterestRegistrationListener</code> to - * unregister - * - * @since 5.8Beta - */ - public void unregisterInterestRegistrationListener( - InterestRegistrationListener listener) { - getCacheClientNotifier().unregisterInterestRegistrationListener(listener); - } - - /** - * Returns a read-only set of <code>InterestRegistrationListener</code>s - * registered with this notifier. - * - * @return a read-only set of <code>InterestRegistrationListener</code>s - * registered with this notifier - * - * @since 5.8Beta - */ - public Set getInterestRegistrationListeners() { - return getCacheClientNotifier().getInterestRegistrationListeners(); - } -}
http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/51bddd70/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java index 1299d75..ec75a92 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/BucketAdvisor.java @@ -1789,7 +1789,7 @@ public final class BucketAdvisor extends CacheDistributionAdvisor { HashSet<BucketServerLocation66> serverLocations = new HashSet<BucketServerLocation66>(); for (Object object : servers) { - BridgeServerImpl server = (BridgeServerImpl)object; + CacheServerImpl server = (CacheServerImpl)object; if (server.isRunning() && (server.getExternalAddress() != null)) { BucketServerLocation66 location = new BucketServerLocation66( getBucket().getId(), server.getPort(), server http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/51bddd70/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheConfig.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheConfig.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheConfig.java index 7aaa241..4c29879 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheConfig.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheConfig.java @@ -9,7 +9,7 @@ package com.gemstone.gemfire.internal.cache; import java.util.List; -import com.gemstone.gemfire.internal.cache.xmlcache.BridgeServerCreation; +import com.gemstone.gemfire.internal.cache.xmlcache.CacheServerCreation; import com.gemstone.gemfire.internal.i18n.LocalizedStrings; import com.gemstone.gemfire.pdx.PdxSerializer; import com.gemstone.gemfire.pdx.ReflectionBasedAutoSerializer; @@ -40,7 +40,7 @@ public class CacheConfig { /** * list of cache servers to create after auto-reconnect if cluster configuration is being used */ - private List<BridgeServerCreation> cacheServerCreation; + private List<CacheServerCreation> cacheServerCreation; /** * This indicates if the pdxReadSerialized value is set by user. This is used @@ -137,12 +137,12 @@ public class CacheConfig { } - public List<BridgeServerCreation> getCacheServerCreation() { + public List<CacheServerCreation> getCacheServerCreation() { return this.cacheServerCreation; } - public void setCacheServerCreation(List<BridgeServerCreation> servers) { + public void setCacheServerCreation(List<CacheServerCreation> servers) { this.cacheServerCreation = servers; } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/51bddd70/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerAdvisor.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerAdvisor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerAdvisor.java new file mode 100644 index 0000000..646f336 --- /dev/null +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerAdvisor.java @@ -0,0 +1,164 @@ +/*========================================================================= + * Copyright (c) 2002-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ + +package com.gemstone.gemfire.internal.cache; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import com.gemstone.gemfire.DataSerializer; +import com.gemstone.gemfire.cache.server.ServerLoad; +import com.gemstone.gemfire.distributed.internal.DistributionAdvisee; +import com.gemstone.gemfire.distributed.internal.DistributionManager; +import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedMember; +import com.gemstone.gemfire.internal.InternalDataSerializer; + + +/** + * Used to give advice to a cache server. + * Cache server currently need to know about controller's + * @author darrel + * + */ +public class CacheServerAdvisor extends GridAdvisor { + + private CacheServerAdvisor(DistributionAdvisee server) { + super(server); + } + + public static CacheServerAdvisor createCacheServerAdvisor(DistributionAdvisee server) { + CacheServerAdvisor advisor = new CacheServerAdvisor(server); + advisor.initialize(); + return advisor; + } + + @Override + public String toString() { + return "CacheServerAdvisor for " + getAdvisee().getFullPath(); + } + + /** Instantiate new distribution profile for this member */ + @Override + protected Profile instantiateProfile( + InternalDistributedMember memberId, int version) { + return new CacheServerProfile(memberId, version); + } + + /** + * Describes a cache server for distribution purposes. + */ + public static class CacheServerProfile extends GridAdvisor.GridProfile { + private String[] groups; + private int maxConnections; + private ServerLoad initialLoad; + private long loadPollInterval; + + /** for internal use, required for DataSerializer.readObject */ + public CacheServerProfile() { + } + + public CacheServerProfile(InternalDistributedMember memberId, int version) { + super(memberId, version); + } + + public CacheServerProfile(CacheServerProfile toCopy) { + super(toCopy); + this.groups = toCopy.groups; + } + + /** don't modify the returned array! */ + public String[] getGroups() { + return this.groups; + } + public void setGroups(String[] groups) { + this.groups = groups; + } + + public ServerLoad getInitialLoad() { + return initialLoad; + } + + public int getMaxConnections() { + return maxConnections; + } + + public void setMaxConnections(int maxConnections) { + this.maxConnections = maxConnections; + } + + public void setInitialLoad(ServerLoad initialLoad) { + this.initialLoad = initialLoad; + } + public long getLoadPollInterval() { + return this.loadPollInterval; + } + public void setLoadPollInterval(long v) { + this.loadPollInterval = v; + } + + /** + * Used to process an incoming cache server profile. Any controller in this + * vm needs to be told about this incoming new cache server. The reply + * needs to contain any controller(s) that exist in this vm. + * + * @since 5.7 + */ + @Override + public void processIncoming(DistributionManager dm, String adviseePath, + boolean removeProfile, boolean exchangeProfiles, + final List<Profile> replyProfiles) { + // tell local controllers about this cache server + tellLocalControllers(removeProfile, exchangeProfiles, replyProfiles); + // for QRM messaging we need cache servers to know about each other + tellLocalBridgeServers(removeProfile, exchangeProfiles, replyProfiles); + } + + @Override + public int getDSFID() { + return CACHE_SERVER_PROFILE; + } + + @Override + public void toData(DataOutput out) throws IOException { + super.toData(out); + DataSerializer.writeStringArray(this.groups, out); + out.writeInt(maxConnections); + InternalDataSerializer.invokeToData(initialLoad, out); + out.writeLong(getLoadPollInterval()); + } + + @Override + public void fromData(DataInput in) throws IOException, ClassNotFoundException { + super.fromData(in); + this.groups = DataSerializer.readStringArray(in); + this.maxConnections = in.readInt(); + this.initialLoad = new ServerLoad(); + InternalDataSerializer.invokeFromData(initialLoad, in); + setLoadPollInterval(in.readLong()); + } + + @Override + public StringBuilder getToStringHeader() { + return new StringBuilder("BridgeServerProfile"); + } + + @Override + public void fillInToString(StringBuilder sb) { + super.fillInToString(sb); + if (this.groups != null) { + sb.append("; groups=" + Arrays.asList(this.groups)); + sb.append("; maxConnections=" + maxConnections); + sb.append("; initialLoad=" + initialLoad); + sb.append("; loadPollInterval=" + getLoadPollInterval()); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/51bddd70/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerImpl.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerImpl.java new file mode 100644 index 0000000..4591831 --- /dev/null +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/CacheServerImpl.java @@ -0,0 +1,812 @@ +/*========================================================================= + * Copyright (c) 2010-2014 Pivotal Software, Inc. All Rights Reserved. + * This product is protected by U.S. and international copyright + * and intellectual property laws. Pivotal products are covered by + * one or more patents listed at http://www.pivotal.io/patents. + *========================================================================= + */ +package com.gemstone.gemfire.internal.cache; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.logging.log4j.Logger; + +import com.gemstone.gemfire.CancelCriterion; +import com.gemstone.gemfire.GemFireIOException; +import com.gemstone.gemfire.InternalGemFireError; +import com.gemstone.gemfire.InvalidValueException; +import com.gemstone.gemfire.cache.AttributesFactory; +import com.gemstone.gemfire.cache.ClientSession; +import com.gemstone.gemfire.cache.DataPolicy; +import com.gemstone.gemfire.cache.DiskStore; +import com.gemstone.gemfire.cache.DiskStoreFactory; +import com.gemstone.gemfire.cache.DynamicRegionFactory; +import com.gemstone.gemfire.cache.EvictionAction; +import com.gemstone.gemfire.cache.ExpirationAction; +import com.gemstone.gemfire.cache.ExpirationAttributes; +import com.gemstone.gemfire.cache.InterestRegistrationListener; +import com.gemstone.gemfire.cache.RegionAttributes; +import com.gemstone.gemfire.cache.RegionExistsException; +import com.gemstone.gemfire.cache.Scope; +import com.gemstone.gemfire.cache.server.CacheServer; +import com.gemstone.gemfire.cache.server.ClientSubscriptionConfig; +import com.gemstone.gemfire.cache.server.ServerLoadProbe; +import com.gemstone.gemfire.cache.server.internal.LoadMonitor; +import com.gemstone.gemfire.cache.util.BridgeMembership; +import com.gemstone.gemfire.cache.util.BridgeMembershipListener; +import com.gemstone.gemfire.cache.wan.GatewayTransportFilter; +import com.gemstone.gemfire.distributed.DistributedMember; +import com.gemstone.gemfire.distributed.internal.DM; +import com.gemstone.gemfire.distributed.internal.DistributionAdvisee; +import com.gemstone.gemfire.distributed.internal.DistributionAdvisor; +import com.gemstone.gemfire.distributed.internal.DistributionAdvisor.Profile; +import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; +import com.gemstone.gemfire.distributed.internal.ResourceEvent; +import com.gemstone.gemfire.distributed.internal.ServerLocation; +import com.gemstone.gemfire.distributed.internal.membership.MemberAttributes; +import com.gemstone.gemfire.internal.Assert; +import com.gemstone.gemfire.internal.OSProcess; +import com.gemstone.gemfire.internal.admin.ClientHealthMonitoringRegion; +import com.gemstone.gemfire.internal.cache.CacheServerAdvisor.CacheServerProfile; +import com.gemstone.gemfire.internal.cache.ha.HARegionQueue; +import com.gemstone.gemfire.internal.cache.tier.Acceptor; +import com.gemstone.gemfire.internal.cache.tier.sockets.AcceptorImpl; +import com.gemstone.gemfire.internal.cache.tier.sockets.CacheClientNotifier; +import com.gemstone.gemfire.internal.cache.tier.sockets.ClientProxyMembershipID; +import com.gemstone.gemfire.internal.i18n.LocalizedStrings; +import com.gemstone.gemfire.internal.logging.LogService; +import com.gemstone.gemfire.internal.logging.log4j.LocalizedMessage; + +/** + * An implementation of the <code>CacheServer</code> interface that delegates + * most of the heavy lifting to an {@link Acceptor}. + * + * @author David Whitlock + * @since 4.0 + */ +@SuppressWarnings("deprecation") +public class CacheServerImpl + extends AbstractCacheServer + implements DistributionAdvisee { + + private static final Logger logger = LogService.getLogger(); + + private static final int FORCE_LOAD_UPDATE_FREQUENCY= Integer.getInteger("gemfire.BridgeServer.FORCE_LOAD_UPDATE_FREQUENCY", 10).intValue(); + + /** The acceptor that does the actual serving */ + private volatile AcceptorImpl acceptor; + + /** + * The advisor used by this cache server. + * @since 5.7 + */ + private volatile CacheServerAdvisor advisor; + + /** + * The monitor used to monitor load on this + * bridge server and distribute load to the locators + * @since 5.7 + */ + private volatile LoadMonitor loadMonitor; + + /** + * boolean that represents whether this server is a GatewayReceiver or a simple BridgeServer + */ + private boolean isGatewayReceiver; + + private List<GatewayTransportFilter> gatewayTransportFilters = Collections.EMPTY_LIST; + + /** + * Needed because this guy is an advisee + * @since 5.7 + */ + private int serialNumber; // changed on each start + + public static final boolean ENABLE_NOTIFY_BY_SUBSCRIPTION_FALSE = + Boolean.getBoolean("gemfire.cache-server.enable-notify-by-subscription-false"); + + + // ////////////////////// Constructors ////////////////////// + + /** + * Creates a new <code>BridgeServerImpl</code> that serves the contents of + * the give <code>Cache</code>. It has the default configuration. + */ + public CacheServerImpl(GemFireCacheImpl cache, boolean isGatewayReceiver) { + super(cache); + this.isGatewayReceiver = isGatewayReceiver; + } + + // //////////////////// Instance Methods /////////////////// + + public CancelCriterion getCancelCriterion() { + return cache.getCancelCriterion(); + } + + /** + * Checks to see whether or not this bridge server is running. If so, an + * {@link IllegalStateException} is thrown. + */ + private void checkRunning() { + if (this.isRunning()) { + throw new IllegalStateException(LocalizedStrings.BridgeServerImpl_A_BRIDGE_SERVERS_CONFIGURATION_CANNOT_BE_CHANGED_ONCE_IT_IS_RUNNING.toLocalizedString()); + } + } + + public boolean isGatewayReceiver() { + return this.isGatewayReceiver; + } + + @Override + public int getPort() { + if (this.acceptor != null) { + return this.acceptor.getPort(); + } + else { + return super.getPort(); + } + } + + @Override + public void setPort(int port) { + checkRunning(); + super.setPort(port); + } + + @Override + public void setBindAddress(String address) { + checkRunning(); + super.setBindAddress(address); + } + @Override + public void setHostnameForClients(String name) { + checkRunning(); + super.setHostnameForClients(name); + } + + @Override + public void setMaxConnections(int maxCon) { + checkRunning(); + super.setMaxConnections(maxCon); + } + + @Override + public void setMaxThreads(int maxThreads) { + checkRunning(); + super.setMaxThreads(maxThreads); + } + + @Override + public void setNotifyBySubscription(boolean b) { + checkRunning(); + if (CacheServerImpl.ENABLE_NOTIFY_BY_SUBSCRIPTION_FALSE) { + this.notifyBySubscription = b; + } + } + + @Override + public void setMaximumMessageCount(int maximumMessageCount) { + checkRunning(); + super.setMaximumMessageCount(maximumMessageCount); + } + + @Override + public void setSocketBufferSize(int socketBufferSize) { + this.socketBufferSize = socketBufferSize; + } + + @Override + public int getSocketBufferSize() { + return this.socketBufferSize; + } + + @Override + public void setMaximumTimeBetweenPings(int maximumTimeBetweenPings) { + this.maximumTimeBetweenPings = maximumTimeBetweenPings; + } + + @Override + public int getMaximumTimeBetweenPings() { + return this.maximumTimeBetweenPings; + } + + + @Override + public void setLoadPollInterval(long loadPollInterval) { + checkRunning(); + super.setLoadPollInterval(loadPollInterval); + } + + @Override + public int getMaximumMessageCount() { + return this.maximumMessageCount; + } + + @Override + public void setLoadProbe(ServerLoadProbe loadProbe) { + checkRunning(); + super.setLoadProbe(loadProbe); + } + + public void setGatewayTransportFilter( + List<GatewayTransportFilter> transportFilters) { + this.gatewayTransportFilters = transportFilters; + } + + @Override + public int getMessageTimeToLive() { + return this.messageTimeToLive; + } + + + public ClientSubscriptionConfig getClientSubscriptionConfig(){ + return this.clientSubscriptionConfig; + } + + /** + * Sets the configuration of <b>this</b> <code>CacheServer</code> based on + * the configuration of <b>another</b> <code>CacheServer</code>. + */ + public void configureFrom(CacheServer other) { + setPort(other.getPort()); + setBindAddress(other.getBindAddress()); + setHostnameForClients(other.getHostnameForClients()); + setMaxConnections(other.getMaxConnections()); + setMaxThreads(other.getMaxThreads()); + setNotifyBySubscription(other.getNotifyBySubscription()); + setSocketBufferSize(other.getSocketBufferSize()); + setTcpNoDelay(other.getTcpNoDelay()); + setMaximumTimeBetweenPings(other.getMaximumTimeBetweenPings()); + setMaximumMessageCount(other.getMaximumMessageCount()); + setMessageTimeToLive(other.getMessageTimeToLive()); +// setTransactionTimeToLive(other.getTransactionTimeToLive()); not implemented in CacheServer for v6.6 + setGroups(other.getGroups()); + setLoadProbe(other.getLoadProbe()); + setLoadPollInterval(other.getLoadPollInterval()); + ClientSubscriptionConfig cscOther = other.getClientSubscriptionConfig(); + ClientSubscriptionConfig cscThis = this.getClientSubscriptionConfig(); + // added for configuration of ha overflow + cscThis.setEvictionPolicy(cscOther.getEvictionPolicy()); + cscThis.setCapacity(cscOther.getCapacity()); + String diskStoreName = cscOther.getDiskStoreName(); + if (diskStoreName != null) { + cscThis.setDiskStoreName(diskStoreName); + } else { + cscThis.setOverflowDirectory(cscOther.getOverflowDirectory()); + } + } + + @Override + public synchronized void start() throws IOException { + Assert.assertTrue(this.cache != null); + boolean isSqlFabricSystem = ((GemFireCacheImpl)this.cache).isSqlfSystem(); + + this.serialNumber = createSerialNumber(); + if (DynamicRegionFactory.get().isOpen()) { + // force notifyBySubscription to be true so that meta info is pushed + // from servers to clients instead of invalidates. + if (!this.notifyBySubscription) { + logger.info(LocalizedMessage.create(LocalizedStrings.BridgeServerImpl_FORCING_NOTIFYBYSUBSCRIPTION_TO_SUPPORT_DYNAMIC_REGIONS)); + this.notifyBySubscription = true; + } + } + this.advisor = CacheServerAdvisor.createCacheServerAdvisor(this); + this.loadMonitor = new LoadMonitor(loadProbe, maxConnections, + loadPollInterval, FORCE_LOAD_UPDATE_FREQUENCY, + advisor); + List overflowAttributesList = new LinkedList(); + ClientSubscriptionConfig csc = this.getClientSubscriptionConfig(); + overflowAttributesList.add(0, csc.getEvictionPolicy()); + overflowAttributesList.add(1, Integer.valueOf(csc.getCapacity())); + overflowAttributesList.add(2, Integer.valueOf(this.port)); + String diskStoreName = csc.getDiskStoreName(); + if (diskStoreName != null) { + overflowAttributesList.add(3, diskStoreName); + overflowAttributesList.add(4, true); // indicator to use diskstore + } else { + overflowAttributesList.add(3, csc.getOverflowDirectory()); + overflowAttributesList.add(4, false); + } + + this.acceptor = new AcceptorImpl(getPort(), + getBindAddress(), + getNotifyBySubscription(), + getSocketBufferSize(), + getMaximumTimeBetweenPings(), + this.cache, + getMaxConnections(), + getMaxThreads(), + getMaximumMessageCount(), + getMessageTimeToLive(), + getTransactionTimeToLive(), + this.loadMonitor, + overflowAttributesList, + isSqlFabricSystem, + this.isGatewayReceiver, + this.gatewayTransportFilters, this.tcpNoDelay); + + this.acceptor.start(); + this.advisor.handshake(); + this.loadMonitor.start(new ServerLocation(getExternalAddress(), + getPort()), acceptor.getStats()); + + // TODO : Need to provide facility to enable/disable client health monitoring. + //Creating ClientHealthMonitoring region. + // Force initialization on current cache + if(cache instanceof GemFireCacheImpl) { + ClientHealthMonitoringRegion.getInstance((GemFireCacheImpl)cache); + } + this.cache.getLoggerI18n().config(LocalizedStrings.BridgeServerImpl_CACHESERVER_CONFIGURATION___0, getConfig()); + + /* + * If the stopped bridge server is restarted, we'll need to re-register the + * client membership listener. If the listener is already registered it + * won't be registered as would the case when start() is invoked for the + * first time. + */ + BridgeMembershipListener[] membershipListeners = + BridgeMembership.getBridgeMembershipListeners(); + + boolean membershipListenerRegistered = false; + for (BridgeMembershipListener membershipListener : membershipListeners) { + //just checking by reference as the listener instance is final + if (listener == membershipListener) { + membershipListenerRegistered = true; + break; + } + } + + if (!membershipListenerRegistered) { + BridgeMembership.registerBridgeMembershipListener(listener); + } + + if (!isGatewayReceiver) { + InternalDistributedSystem system = ((GemFireCacheImpl) this.cache) + .getDistributedSystem(); + system.handleResourceEvent(ResourceEvent.CACHE_SERVER_START, this); + } + + } + + + /** + * Gets the address that this bridge server can be contacted on from external + * processes. + * @since 5.7 + */ + public String getExternalAddress() { + return getExternalAddress(true); + } + + public String getExternalAddress(boolean checkServerRunning) { + if (checkServerRunning) { + if (!this.isRunning()) { + String s = "A bridge server's bind address is only available if it has been started"; + this.cache.getCancelCriterion().checkCancelInProgress(null); + throw new IllegalStateException(s); + } + } + if (this.hostnameForClients == null || this.hostnameForClients.equals("")) { + if (this.acceptor != null) { + return this.acceptor.getExternalAddress(); + } + else { + return null; + } + } + else { + return this.hostnameForClients; + } + } + + public boolean isRunning() { + return this.acceptor != null && this.acceptor.isRunning(); + } + + public synchronized void stop() { + if (!isRunning()) { + return; + } + + RuntimeException firstException = null; + + try { + if(this.loadMonitor != null) { + this.loadMonitor.stop(); + } + } catch(RuntimeException e) { + cache.getLoggerI18n().warning(LocalizedStrings.BridgeServerImpl_CACHESERVER_ERROR_CLOSING_LOAD_MONITOR, e); + firstException = e; + } + + try { + if (this.advisor != null) { + this.advisor.close(); + } + } catch(RuntimeException e) { + cache.getLoggerI18n().warning(LocalizedStrings.BridgeServerImpl_CACHESERVER_ERROR_CLOSING_ADVISOR, e); + firstException = e; + } + + try { + if (this.acceptor != null) { + this.acceptor.close(); + } + } catch(RuntimeException e) { + logger.warn(LocalizedMessage.create(LocalizedStrings.BridgeServerImpl_CACHESERVER_ERROR_CLOSING_ACCEPTOR_MONITOR), e); + if (firstException != null) { + firstException = e; + } + } + + if(firstException != null) { + throw firstException; + } + + //TODO : We need to clean up the admin region created for client + //monitoring. + + // BridgeServer is still available, just not running, so we don't take + // it out of the cache's list... + // cache.removeBridgeServer(this); + + /* Assuming start won't be called after stop */ + BridgeMembership.unregisterBridgeMembershipListener(this.listener); + + TXManagerImpl txMgr = (TXManagerImpl) cache.getCacheTransactionManager(); + txMgr.removeHostedTXStatesForClients(); + + if (!isGatewayReceiver) { + InternalDistributedSystem system = ((GemFireCacheImpl) this.cache) + .getDistributedSystem(); + system.handleResourceEvent(ResourceEvent.CACHE_SERVER_STOP, this); + } + + } + + private String getConfig() { + ClientSubscriptionConfig csc = this.getClientSubscriptionConfig(); + String str = + "port=" + getPort() + " max-connections=" + getMaxConnections() + + " max-threads=" + getMaxThreads() + " notify-by-subscription=" + + getNotifyBySubscription() + " socket-buffer-size=" + + getSocketBufferSize() + " maximum-time-between-pings=" + + getMaximumTimeBetweenPings() + " maximum-message-count=" + + getMaximumMessageCount() + " message-time-to-live=" + + getMessageTimeToLive() + " eviction-policy=" + csc.getEvictionPolicy() + + " capacity=" + csc.getCapacity() + " overflow directory="; + if (csc.getDiskStoreName() != null) { + str += csc.getDiskStoreName(); + } else { + str += csc.getOverflowDirectory(); + } + str += + " groups=" + Arrays.asList(getGroups()) + + " loadProbe=" + loadProbe + + " loadPollInterval=" + loadPollInterval + + " tcpNoDelay=" + tcpNoDelay; + return str; + } + + @Override + public String toString() { + ClientSubscriptionConfig csc = this.getClientSubscriptionConfig(); + String str = + "CacheServer on port=" + getPort() + " client subscription config policy=" + + csc.getEvictionPolicy() + " client subscription config capacity=" + + csc.getCapacity(); + if (csc.getDiskStoreName() != null) { + str += " client subscription config overflow disk store=" + + csc.getDiskStoreName(); + } else { + str += " client subscription config overflow directory=" + + csc.getOverflowDirectory(); + } + return str; + } + + /** + * Test method used to access the internal acceptor + * + * @return the internal acceptor + */ + public AcceptorImpl getAcceptor() { + return this.acceptor; + } + + // DistributionAdvisee methods + + public DM getDistributionManager() { + return getSystem().getDistributionManager(); + } + + public ClientSession getClientSession(String durableClientId) { + return getCacheClientNotifier().getClientProxy(durableClientId); + } + + public ClientSession getClientSession(DistributedMember member) { + return getCacheClientNotifier().getClientProxy( + ClientProxyMembershipID.getClientId(member)); + } + + public Set getAllClientSessions() { + return new HashSet(getCacheClientNotifier().getClientProxies()); + } + + /** + * create client subscription + * + * @param cache + * @param ePolicy + * @param capacity + * @param port + * @param overFlowDir + * @param isDiskStore + * @return client subscription name + * @since 5.7 + */ + public static String clientMessagesRegion(GemFireCacheImpl cache, String ePolicy, + int capacity, int port, String overFlowDir, boolean isDiskStore) { + AttributesFactory factory = getAttribFactoryForClientMessagesRegion(cache, + ePolicy, capacity, overFlowDir, isDiskStore); + RegionAttributes attr = factory.create(); + + return createClientMessagesRegion(attr, cache, capacity, port); + } + + public static AttributesFactory getAttribFactoryForClientMessagesRegion( + GemFireCacheImpl cache, + String ePolicy, int capacity, String overflowDir, boolean isDiskStore) + throws InvalidValueException, GemFireIOException { + AttributesFactory factory = new AttributesFactory(); + factory.setScope(Scope.LOCAL); + + if (isDiskStore) { + // overflowDir parameter is actually diskstore name + factory.setDiskStoreName(overflowDir); + // client subscription queue is always overflow to disk, so do async + // see feature request #41479 + factory.setDiskSynchronous(true); + } else if (overflowDir == null || overflowDir.equals(ClientSubscriptionConfig.DEFAULT_OVERFLOW_DIRECTORY)) { + factory.setDiskStoreName(null); + // client subscription queue is always overflow to disk, so do async + // see feature request #41479 + factory.setDiskSynchronous(true); + } else { + File dir = new File(overflowDir + File.separatorChar + + generateNameForClientMsgsRegion(OSProcess.getId())); + // This will delete the overflow directory when virtual machine terminates. + dir.deleteOnExit(); + if (!dir.mkdirs() && !dir.isDirectory()) { + throw new GemFireIOException("Could not create client subscription overflow directory: " + + dir.getAbsolutePath()); + } + File[] dirs = { dir }; + DiskStoreFactory dsf = cache.createDiskStoreFactory(); + DiskStore bsi = dsf.setAutoCompact(true) + .setDiskDirsAndSizes(dirs, new int[] { Integer.MAX_VALUE }) + .create("bsi"); + factory.setDiskStoreName("bsi"); + // backward compatibility, it was sync + factory.setDiskSynchronous(true); + } + factory.setDataPolicy(DataPolicy.NORMAL); + // enable statistics + factory.setStatisticsEnabled(true); + /* setting LIFO related eviction attributes */ + if (HARegionQueue.HA_EVICTION_POLICY_ENTRY.equals(ePolicy)) { + factory + .setEvictionAttributes(EvictionAttributesImpl.createLIFOEntryAttributes( + capacity, EvictionAction.OVERFLOW_TO_DISK)); + } + else if (HARegionQueue.HA_EVICTION_POLICY_MEMORY.equals(ePolicy)) { // condition refinement + factory + .setEvictionAttributes(EvictionAttributesImpl.createLIFOMemoryAttributes( + capacity, EvictionAction.OVERFLOW_TO_DISK)); + } + else { + // throw invalid eviction policy exception + throw new InvalidValueException( + LocalizedStrings.BridgeServerImpl__0_INVALID_EVICTION_POLICY.toLocalizedString(ePolicy)); + } + return factory; + } + + public static String createClientMessagesRegion(RegionAttributes attr, + GemFireCacheImpl cache, int capacity, int port) { + // generating unique name in VM for ClientMessagesRegion + String regionName = generateNameForClientMsgsRegion(port); + try { + cache.createVMRegion(regionName, attr, + new InternalRegionArguments().setDestroyLockFlag(true) + .setRecreateFlag(false).setSnapshotInputStream(null) + .setImageTarget(null).setIsUsedForMetaRegion(true)); + } + catch (RegionExistsException ree) { + InternalGemFireError assErr = new InternalGemFireError( + "unexpected exception"); + assErr.initCause(ree); + throw assErr; + } + catch (IOException e) { + // only if loading snapshot, not here + InternalGemFireError assErr = new InternalGemFireError( + "unexpected exception"); + assErr.initCause(e); + throw assErr; + } + catch (ClassNotFoundException e) { + // only if loading snapshot, not here + InternalGemFireError assErr = new InternalGemFireError( + "unexpected exception"); + assErr.initCause(e); + throw assErr; + } + return regionName; + } + + public static String createClientMessagesRegionForTesting(GemFireCacheImpl cache, + String ePolicy, int capacity, int port, int expiryTime, String overFlowDir, boolean isDiskStore) { + AttributesFactory factory = getAttribFactoryForClientMessagesRegion(cache, + ePolicy, capacity, overFlowDir, isDiskStore); + ExpirationAttributes ea = new ExpirationAttributes(expiryTime, + ExpirationAction.LOCAL_INVALIDATE); + factory.setEntryTimeToLive(ea); + RegionAttributes attr = factory.create(); + + return createClientMessagesRegion(attr, cache, capacity, port); + } + + /** + * Generates the name for the client subscription using the given id. + * + * @param id + * @return String + * @since 5.7 + */ + public static String generateNameForClientMsgsRegion(int id) { + return ClientSubscriptionConfigImpl.CLIENT_SUBSCRIPTION + "_" + id; + } + + /* + * Marker class name to identify the lock more easily in thread dumps private + * static class ClientMessagesRegionLock extends Object { } + */ + public DistributionAdvisor getDistributionAdvisor() { + return this.advisor; + } + + /** + * Returns the BridgeServerAdvisor for this server + */ + public CacheServerAdvisor getCacheServerAdvisor() { + return this.advisor; + } + + public Profile getProfile() { + return getDistributionAdvisor().createProfile(); + } + + public DistributionAdvisee getParentAdvisee() { + return null; + } + + /** + * Returns the underlying <code>InternalDistributedSystem</code> connection. + * @return the underlying <code>InternalDistributedSystem</code> + */ + public InternalDistributedSystem getSystem() { + return (InternalDistributedSystem)this.cache.getDistributedSystem(); + } + + public String getName() { + return "CacheServer"; + } + + public String getFullPath() { + return getName(); + } + + private final static AtomicInteger profileSN = new AtomicInteger(); + + private static int createSerialNumber() { + return profileSN.incrementAndGet(); + } + + /** + * Returns an array of all the groups of this bridge server. + * This includes those from the groups gemfire property + * and those explicitly added to this server. + */ + public String[] getCombinedGroups() { + ArrayList<String> groupList = new ArrayList<String>(); + for (String g: MemberAttributes.parseGroups(null, getSystem().getConfig().getGroups())) { + if (!groupList.contains(g)) { + groupList.add(g); + } + } + for (String g: getGroups()) { + if (!groupList.contains(g)) { + groupList.add(g); + } + } + String[] groups = new String[groupList.size()]; + return groupList.toArray(groups); + } + + public /*synchronized causes deadlock*/ void fillInProfile(Profile profile) { + assert profile instanceof CacheServerProfile; + CacheServerProfile bp = (CacheServerProfile)profile; + bp.setHost(getExternalAddress(false)); + bp.setPort(getPort()); + bp.setGroups(getCombinedGroups()); + bp.setMaxConnections(maxConnections); + bp.setInitialLoad(loadMonitor.getLastLoad()); + bp.setLoadPollInterval(getLoadPollInterval()); + bp.serialNumber = getSerialNumber(); + bp.finishInit(); + } + + public int getSerialNumber() { + return this.serialNumber; + } + + + protected CacheClientNotifier getCacheClientNotifier() { + return getAcceptor().getCacheClientNotifier(); + } + + /** + * Registers a new <code>InterestRegistrationListener</code> with the set of + * <code>InterestRegistrationListener</code>s. + * + * @param listener + * The <code>InterestRegistrationListener</code> to register + * @throws IllegalStateException if the BridgeServer has not been started + * @since 5.8Beta + */ + public void registerInterestRegistrationListener( + InterestRegistrationListener listener) { + if (!this.isRunning()) { + throw new IllegalStateException(LocalizedStrings.BridgeServerImpl_MUST_BE_RUNNING.toLocalizedString()); + } + getCacheClientNotifier().registerInterestRegistrationListener(listener); + } + + /** + * Unregisters an existing <code>InterestRegistrationListener</code> from + * the set of <code>InterestRegistrationListener</code>s. + * + * @param listener + * The <code>InterestRegistrationListener</code> to + * unregister + * + * @since 5.8Beta + */ + public void unregisterInterestRegistrationListener( + InterestRegistrationListener listener) { + getCacheClientNotifier().unregisterInterestRegistrationListener(listener); + } + + /** + * Returns a read-only set of <code>InterestRegistrationListener</code>s + * registered with this notifier. + * + * @return a read-only set of <code>InterestRegistrationListener</code>s + * registered with this notifier + * + * @since 5.8Beta + */ + public Set getInterestRegistrationListeners() { + return getCacheClientNotifier().getInterestRegistrationListeners(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/51bddd70/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/FindDurableQueueProcessor.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/FindDurableQueueProcessor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/FindDurableQueueProcessor.java index 96db451..8d782a9 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/FindDurableQueueProcessor.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/FindDurableQueueProcessor.java @@ -80,7 +80,7 @@ public class FindDurableQueueProcessor extends ReplyProcessor21 { if(l!=null) { Iterator i = l.iterator(); while(i.hasNext()) { - BridgeServerImpl bs = (BridgeServerImpl)i.next(); + CacheServerImpl bs = (CacheServerImpl)i.next(); if(bs.getAcceptor().getCacheClientNotifier().getClientProxy(proxyId)!=null) { ServerLocation loc = new ServerLocation(bs.getExternalAddress(),bs.getPort()); matches.add(loc); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/51bddd70/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java index c7ba070..4bf0f42 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GemFireCacheImpl.java @@ -1552,7 +1552,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer { Iterator allCacheServersItr = inst.allCacheServers.iterator(); while (allCacheServersItr.hasNext()) { - BridgeServerImpl bs = (BridgeServerImpl) allCacheServersItr.next(); + CacheServerImpl bs = (CacheServerImpl) allCacheServersItr.next(); AcceptorImpl ai = bs.getAcceptor(); if (ai != null) { ai.emergencyClose(); @@ -2619,7 +2619,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer boolean stoppedCacheServer = false; Iterator allCacheServersIterator = this.allCacheServers.iterator(); while (allCacheServersIterator.hasNext()) { - BridgeServerImpl bridge = (BridgeServerImpl) allCacheServersIterator.next(); + CacheServerImpl bridge = (CacheServerImpl) allCacheServersIterator.next(); if (isDebugEnabled) { logger.debug("stopping bridge {}", bridge); } @@ -3792,7 +3792,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer } stopper.checkCancelInProgress(null); - BridgeServerImpl bridge = new BridgeServerImpl(this, isGatewayReceiver); + CacheServerImpl bridge = new CacheServerImpl(this, isGatewayReceiver); allCacheServers.add(bridge); sendAddCacheServerProfileMessage(); @@ -3971,7 +3971,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer if (!allCacheServers.isEmpty()) { Iterator allCacheServersIterator = allCacheServers.iterator(); while (allCacheServersIterator.hasNext()) { - BridgeServerImpl cacheServer = (BridgeServerImpl) allCacheServersIterator.next(); + CacheServerImpl cacheServer = (CacheServerImpl) allCacheServersIterator.next(); // If CacheServer is a GatewayReceiver, don't return as part of CacheServers if (!cacheServer.isGatewayReceiver()) { if (cacheServersWithoutReceiver == null) { @@ -4118,7 +4118,7 @@ public class GemFireCacheImpl implements InternalCache, ClientCache, HasCachePer if (!result) { Iterator allCacheServersIterator = allCacheServers.iterator(); while (allCacheServersIterator.hasNext()) { - BridgeServerImpl server = (BridgeServerImpl) allCacheServersIterator.next(); + CacheServerImpl server = (CacheServerImpl) allCacheServersIterator.next(); if (!server.getNotifyBySubscription()) { result = true; break; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/51bddd70/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GridAdvisor.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GridAdvisor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GridAdvisor.java index 9432b13..13d6068 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GridAdvisor.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/GridAdvisor.java @@ -45,7 +45,7 @@ public abstract class GridAdvisor extends DistributionAdvisor { private static final Filter BRIDGE_SERVER_FILTER = new Filter() { public boolean include(Profile profile) { - return profile instanceof BridgeServerAdvisor.BridgeServerProfile; + return profile instanceof CacheServerAdvisor.CacheServerProfile; } }; @@ -329,7 +329,7 @@ public abstract class GridAdvisor extends DistributionAdvisor { if (cache != null && !cache.isClosed()) { List<?> bridgeServers = cache.getCacheServersAndGatewayReceiver(); for (int i = 0; i < bridgeServers.size(); i++) { - BridgeServerImpl bsi = (BridgeServerImpl)bridgeServers.get(i); + CacheServerImpl bsi = (CacheServerImpl)bridgeServers.get(i); if (bsi.isRunning()) { if(bsi.getProfile().equals(this)) { continue; http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/51bddd70/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java index 5da6fbd..629c5a4 100644 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/PartitionedRegion.java @@ -5896,7 +5896,7 @@ public class PartitionedRegion extends LocalRegion implements Collections.addAll(localServerGroups, MemberAttributes.parseGroups(null, c.getSystem().getConfig().getGroups())); for (Object object : servers) { - BridgeServerImpl server = (BridgeServerImpl)object; + CacheServerImpl server = (CacheServerImpl)object; if (server.isRunning() && (server.getExternalAddress() != null)) { Collections.addAll(localServerGroups, server.getGroups()); } http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/51bddd70/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueue.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueue.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueue.java index 3a436c4..e094648 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueue.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/ha/HARegionQueue.java @@ -68,7 +68,7 @@ import com.gemstone.gemfire.distributed.internal.membership.InternalDistributedM import com.gemstone.gemfire.internal.Assert; import com.gemstone.gemfire.internal.DataSerializableFixedID; import com.gemstone.gemfire.internal.Version; -import com.gemstone.gemfire.internal.cache.BridgeServerImpl; +import com.gemstone.gemfire.internal.cache.CacheServerImpl; import com.gemstone.gemfire.internal.cache.Conflatable; import com.gemstone.gemfire.internal.cache.EventID; import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; @@ -3013,9 +3013,9 @@ protected boolean checkEventForRemoval(Long counter, ThreadIdentifier threadid, && !queueRemovalMessageList.isEmpty()) { // messages exist QueueRemovalMessage qrm = new QueueRemovalMessage(); qrm.resetRecipients(); - List<BridgeServerImpl> servers = this.cache.getCacheServers(); + List<CacheServerImpl> servers = this.cache.getCacheServers(); List<DistributedMember> recipients = new LinkedList(); - for (BridgeServerImpl server: servers) { + for (CacheServerImpl server: servers) { recipients.addAll(server.getCacheServerAdvisor().adviseBridgeServers()); } qrm.setRecipients(recipients); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/51bddd70/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/InternalBridgeMembership.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/InternalBridgeMembership.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/InternalBridgeMembership.java index ec96b89..d6515b9 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/InternalBridgeMembership.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/InternalBridgeMembership.java @@ -33,7 +33,7 @@ import com.gemstone.gemfire.distributed.DistributedMember; import com.gemstone.gemfire.distributed.DistributedSystemDisconnectedException; import com.gemstone.gemfire.distributed.internal.InternalDistributedSystem; import com.gemstone.gemfire.distributed.internal.ServerLocation; -import com.gemstone.gemfire.internal.cache.BridgeServerImpl; +import com.gemstone.gemfire.internal.cache.CacheServerImpl; import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; import com.gemstone.gemfire.internal.cache.tier.sockets.AcceptorImpl; import com.gemstone.gemfire.internal.cache.tier.sockets.ClientHealthMonitor; @@ -284,7 +284,7 @@ public final class InternalBridgeMembership { // Note it is not necessary to synchronize on the list of bridge servers here, // since this is only a status (snapshot) of the system. for (Iterator bsii = CacheFactory.getAnyInstance().getCacheServers().iterator(); bsii.hasNext(); ) { - BridgeServerImpl bsi = (BridgeServerImpl) bsii.next(); + CacheServerImpl bsi = (CacheServerImpl) bsii.next(); AcceptorImpl ai = bsi.getAcceptor(); if (ai != null && ai.getCacheClientNotifier() != null) { if (filterProxyIDs != null) { @@ -339,7 +339,7 @@ public final class InternalBridgeMembership { // Get all clients Map allClients = new HashMap(); for (Iterator bsii = CacheFactory.getAnyInstance().getCacheServers().iterator(); bsii.hasNext(); ) { - BridgeServerImpl bsi = (BridgeServerImpl) bsii.next(); + CacheServerImpl bsi = (CacheServerImpl) bsii.next(); AcceptorImpl ai = bsi.getAcceptor(); if (ai != null && ai.getCacheClientNotifier() != null) { allClients.putAll(ai.getCacheClientNotifier().getAllClients()); @@ -360,7 +360,7 @@ public final class InternalBridgeMembership { return clientQueueSizes; for (Iterator bsii = c.getCacheServers().iterator(); bsii.hasNext(); ) { - BridgeServerImpl bsi = (BridgeServerImpl) bsii.next(); + CacheServerImpl bsi = (CacheServerImpl) bsii.next(); AcceptorImpl ai = bsi.getAcceptor(); if (ai != null && ai.getCacheClientNotifier() != null) { clientQueueSizes.putAll(ai.getCacheClientNotifier().getClientQueueSizes()); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/51bddd70/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java index 2cede25..073b7ef 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/CacheClientNotifier.java @@ -79,7 +79,7 @@ import com.gemstone.gemfire.internal.Version; import com.gemstone.gemfire.internal.cache.BridgeObserver; import com.gemstone.gemfire.internal.cache.BridgeObserverHolder; import com.gemstone.gemfire.internal.cache.BridgeRegionEventImpl; -import com.gemstone.gemfire.internal.cache.BridgeServerImpl; +import com.gemstone.gemfire.internal.cache.CacheServerImpl; import com.gemstone.gemfire.internal.cache.CacheClientStatus; import com.gemstone.gemfire.internal.cache.CacheDistributionAdvisor; import com.gemstone.gemfire.internal.cache.CachedDeserializable; @@ -2134,7 +2134,7 @@ public class CacheClientNotifier { && !HARegionQueue.HA_EVICTION_POLICY_NONE.equals(overflowAttributesList .get(0))) { haContainer = new HAContainerRegion(cache.getRegion(Region.SEPARATOR - + BridgeServerImpl.clientMessagesRegion((GemFireCacheImpl)cache, + + CacheServerImpl.clientMessagesRegion((GemFireCacheImpl)cache, (String)overflowAttributesList.get(0), ((Integer)overflowAttributesList.get(1)).intValue(), ((Integer)overflowAttributesList.get(2)).intValue(), http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/51bddd70/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ClientBlacklistProcessor.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ClientBlacklistProcessor.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ClientBlacklistProcessor.java index 01bcfac..73c3731 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ClientBlacklistProcessor.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/ClientBlacklistProcessor.java @@ -17,7 +17,7 @@ import com.gemstone.gemfire.distributed.internal.PooledDistributionMessage; import com.gemstone.gemfire.distributed.internal.ReplyException; import com.gemstone.gemfire.distributed.internal.ReplyMessage; import com.gemstone.gemfire.distributed.internal.ReplyProcessor21; -import com.gemstone.gemfire.internal.cache.BridgeServerImpl; +import com.gemstone.gemfire.internal.cache.CacheServerImpl; import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; import java.io.DataInput; @@ -96,7 +96,7 @@ public class ClientBlacklistProcessor extends ReplyProcessor21 { if (l != null) { Iterator i = l.iterator(); while (i.hasNext()) { - BridgeServerImpl bs = (BridgeServerImpl)i.next(); + CacheServerImpl bs = (CacheServerImpl)i.next(); CacheClientNotifier ccn = bs.getAcceptor().getCacheClientNotifier(); //add client to the black list. ccn.addToBlacklistedClient(this.proxyId); http://git-wip-us.apache.org/repos/asf/incubator-geode/blob/51bddd70/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/RemoveClientFromBlacklistMessage.java ---------------------------------------------------------------------- diff --git a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/RemoveClientFromBlacklistMessage.java b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/RemoveClientFromBlacklistMessage.java index 3e57a1b..6f0e033 100755 --- a/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/RemoveClientFromBlacklistMessage.java +++ b/gemfire-core/src/main/java/com/gemstone/gemfire/internal/cache/tier/sockets/RemoveClientFromBlacklistMessage.java @@ -21,7 +21,7 @@ import com.gemstone.gemfire.cache.Cache; import com.gemstone.gemfire.distributed.DistributedSystem; import com.gemstone.gemfire.distributed.internal.DistributionManager; import com.gemstone.gemfire.distributed.internal.PooledDistributionMessage; -import com.gemstone.gemfire.internal.cache.BridgeServerImpl; +import com.gemstone.gemfire.internal.cache.CacheServerImpl; import com.gemstone.gemfire.internal.cache.GemFireCacheImpl; import com.gemstone.gemfire.internal.logging.LogService; /** @@ -60,7 +60,7 @@ public class RemoveClientFromBlacklistMessage extends PooledDistributionMessage if (l != null) { Iterator i = l.iterator(); while (i.hasNext()) { - BridgeServerImpl bs = (BridgeServerImpl)i.next(); + CacheServerImpl bs = (CacheServerImpl)i.next(); CacheClientNotifier ccn = bs.getAcceptor().getCacheClientNotifier(); Set s = ccn.getBlacklistedClient(); if (s != null) {