This is an automated email from the ASF dual-hosted git repository.

jasonhuynh pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 4a9e651  GEODE-2667:  Add API to destroy GatewayReceiver (#1410)
4a9e651 is described below

commit 4a9e6518f2a3811941a48a95d398557bd2bf2f97
Author: Jason Huynh <huyn...@gmail.com>
AuthorDate: Fri Feb 9 15:41:48 2018 -0800

    GEODE-2667:  Add API to destroy GatewayReceiver (#1410)
    
      * Added API to destroy a cache server
      * Added new RemoveCacheServerProfileMessage
      * The hasCacheServer flag in the profile is set to false if size of cache 
servers is 0
      * Destroying GatewayReceiver will remove the jmx bean and proxy bean
---
 .../apache/geode/cache/wan/GatewayReceiver.java    |   5 +
 .../geode/distributed/internal/ResourceEvent.java  |   1 +
 .../org/apache/geode/internal/DSFIDFactory.java    |   2 +
 .../geode/internal/DataSerializableFixedID.java    |   4 +-
 .../geode/internal/cache/GemFireCacheImpl.java     |  44 ++++++
 .../apache/geode/internal/cache/InternalCache.java |   4 +
 .../cache/RemoveCacheServerProfileMessage.java     | 162 +++++++++++++++++++++
 .../internal/cache/xmlcache/CacheCreation.java     |  15 ++
 .../cache/xmlcache/GatewayReceiverCreation.java    |   4 +
 .../geode/management/JMXNotificationType.java      |   9 ++
 .../geode/management/internal/MBeanJMXAdapter.java |   1 -
 .../management/internal/ManagementConstants.java   |   1 +
 .../internal/beans/GatewayReceiverMBeanBridge.java |   4 +
 .../internal/beans/ManagementAdapter.java          |  25 ++++
 .../internal/beans/ManagementListener.java         |   4 +
 .../geode/internal/cache/GemFireCacheImplTest.java |  23 +++
 .../codeAnalysis/sanctionedDataSerializables.txt   |   5 +
 .../internal/cache/wan/GatewayReceiverImpl.java    |  11 ++
 .../cache/wan/GatewayReceiverImplJUnitTest.java    |  56 +++++++
 .../cache/wan/GatewayReceiverMBeanDUnitTest.java   | 118 +++++++++++++++
 .../cache/wan/serial/GatewayReceiverDUnitTest.java | 144 ++++++++++++++++++
 21 files changed, 640 insertions(+), 2 deletions(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/wan/GatewayReceiver.java 
b/geode-core/src/main/java/org/apache/geode/cache/wan/GatewayReceiver.java
index 43d398e..987029e 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/wan/GatewayReceiver.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/wan/GatewayReceiver.java
@@ -91,6 +91,11 @@ public interface GatewayReceiver {
   void stop();
 
   /**
+   * Destroy this receiver. Stop should be called before calling destroy
+   */
+  void destroy();
+
+  /**
    * Returns whether or not this receiver is running
    */
   boolean isRunning();
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ResourceEvent.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ResourceEvent.java
index b3f7add..07bb949 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ResourceEvent.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ResourceEvent.java
@@ -38,6 +38,7 @@ public enum ResourceEvent {
   SYSTEM_ALERT,
   CACHE_SERVER_START,
   CACHE_SERVER_STOP,
+  GATEWAYRECEIVER_DESTROY,
   GATEWAYRECEIVER_START,
   GATEWAYRECEIVER_STOP,
   GATEWAYRECEIVER_CREATE,
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java 
b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
index 0237b53..e3379e5 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/DSFIDFactory.java
@@ -241,6 +241,7 @@ import 
org.apache.geode.internal.cache.PartitionRegionConfig;
 import org.apache.geode.internal.cache.PreferBytesCachedDeserializable;
 import org.apache.geode.internal.cache.RegionEventImpl;
 import org.apache.geode.internal.cache.ReleaseClearLockMessage;
+import org.apache.geode.internal.cache.RemoveCacheServerProfileMessage;
 import org.apache.geode.internal.cache.RoleEventImpl;
 import org.apache.geode.internal.cache.SearchLoadAndWriteProcessor;
 import org.apache.geode.internal.cache.ServerPingMessage;
@@ -576,6 +577,7 @@ public class DSFIDFactory implements 
DataSerializableFixedID {
     registerDSFID(REMOTE_PUTALL_MESSAGE, RemotePutAllMessage.class);
     registerDSFID(VERSION_TAG, VMVersionTag.class);
     registerDSFID(ADD_CACHESERVER_PROFILE_UPDATE, 
AddCacheServerProfileMessage.class);
+    registerDSFID(REMOVE_CACHESERVER_PROFILE_UPDATE, 
RemoveCacheServerProfileMessage.class);
     registerDSFID(SERVER_INTEREST_REGISTRATION_MESSAGE, 
ServerInterestRegistrationMessage.class);
     registerDSFID(FILTER_PROFILE_UPDATE, FilterProfile.OperationMessage.class);
     registerDSFID(PR_GET_MESSAGE, GetMessage.class);
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java
 
b/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java
index f55e786..fff0f1d 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/DataSerializableFixedID.java
@@ -239,7 +239,9 @@ public interface DataSerializableFixedID extends 
SerializationVersions {
 
   byte LATEST_LAST_ACCESS_TIME_MESSAGE = -20;
 
-  // IDs -19 .. -16 are not used
+  public static final byte REMOVE_CACHESERVER_PROFILE_UPDATE = -19;
+
+  // IDs -18 .. -16 are not used
 
   /**
    * A header byte meaning that the next element in the stream is a 
<code>VMIdProfile</code>.
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
index f87b4e9..f40e900 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/GemFireCacheImpl.java
@@ -3812,6 +3812,12 @@ public class GemFireCacheImpl implements InternalCache, 
InternalClientCache, Has
     return cacheServer;
   }
 
+  public boolean removeCacheServer(CacheServer cacheServer) {
+    boolean removed = this.allCacheServers.remove(cacheServer);
+    sendRemoveCacheServerProfileMessage();
+    return removed;
+  }
+
   @Override
   public void addGatewaySender(GatewaySender sender) {
     if (isClient()) {
@@ -3901,6 +3907,21 @@ public class GemFireCacheImpl implements InternalCache, 
InternalClientCache, Has
     }
   }
 
+  public void removeGatewayReceiver(GatewayReceiver receiver) {
+    if (isClient()) {
+      throw new UnsupportedOperationException("operation is not supported on a 
client cache");
+    }
+    this.stopper.checkCancelInProgress(null);
+    synchronized (this.allGatewayReceiversLock) {
+      Set<GatewayReceiver> newReceivers = new 
HashSet<>(this.allGatewayReceivers.size() + 1);
+      if (!this.allGatewayReceivers.isEmpty()) {
+        newReceivers.addAll(this.allGatewayReceivers);
+      }
+      newReceivers.remove(receiver);
+      this.allGatewayReceivers = Collections.unmodifiableSet(newReceivers);
+    }
+  }
+
   @Override
   public void addAsyncEventQueue(AsyncEventQueueImpl asyncQueue) {
     this.allAsyncEventQueues.add(asyncQueue);
@@ -4568,6 +4589,29 @@ public class GemFireCacheImpl implements InternalCache, 
InternalClientCache, Has
     }
   }
 
+
+  private void sendRemoveCacheServerProfileMessage() {
+    Set otherMembers = this.dm.getOtherDistributionManagerIds();
+    RemoveCacheServerProfileMessage message = new 
RemoveCacheServerProfileMessage();
+    message.operateOnLocalCache(this);
+    if (!otherMembers.isEmpty()) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Sending add cache server profile message to other 
members.");
+      }
+      ReplyProcessor21 replyProcessor = new ReplyProcessor21(this.dm, 
otherMembers);
+      message.setRecipients(otherMembers);
+      message.processorId = replyProcessor.getProcessorId();
+      this.dm.putOutgoing(message);
+
+      // Wait for replies.
+      try {
+        replyProcessor.waitForReplies();
+      } catch (InterruptedException ignore) {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
   @Override
   public TXManagerImpl getTxManager() {
     return this.transactionManager;
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
index d609c44..0c81809 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/InternalCache.java
@@ -307,8 +307,12 @@ public interface InternalCache extends Cache, 
Extensible<Cache>, CacheTime {
 
   void addGatewayReceiver(GatewayReceiver receiver);
 
+  void removeGatewayReceiver(GatewayReceiver receiver);
+
   CacheServer addCacheServer(boolean isGatewayReceiver);
 
+  boolean removeCacheServer(CacheServer cacheServer);
+
   /**
    * A test-hook allowing you to alter the cache setting established by
    * CacheFactory.setPdxReadSerialized()
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/RemoveCacheServerProfileMessage.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/RemoveCacheServerProfileMessage.java
new file mode 100644
index 0000000..bd3b904
--- /dev/null
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/RemoveCacheServerProfileMessage.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.CancelException;
+import org.apache.geode.distributed.internal.ClusterDistributionManager;
+import org.apache.geode.distributed.internal.MessageWithReply;
+import org.apache.geode.distributed.internal.ReplyMessage;
+import org.apache.geode.distributed.internal.SerialDistributionMessage;
+import org.apache.geode.internal.logging.LogService;
+
+/**
+ * OperationMessage synchronously propagates a change in the profile to 
another member. It is a
+ * serial message so that there is no chance of out-of-order execution.
+ */
+public class RemoveCacheServerProfileMessage extends SerialDistributionMessage
+    implements MessageWithReply {
+
+  private static final Logger logger = LogService.getLogger();
+
+  int processorId;
+
+  @Override
+  protected void process(ClusterDistributionManager dm) {
+    int oldLevel = 
LocalRegion.setThreadInitLevelRequirement(LocalRegion.BEFORE_INITIAL_IMAGE);
+    try {
+      InternalCache cache = dm.getCache();
+      // will be null if not initialized
+      if (cache != null && !cache.isClosed()) {
+        operateOnCache(cache);
+      }
+    } finally {
+      LocalRegion.setThreadInitLevelRequirement(oldLevel);
+      ReplyMessage reply = new ReplyMessage();
+      reply.setProcessorId(this.processorId);
+      reply.setRecipient(getSender());
+      try {
+        dm.putOutgoing(reply);
+      } catch (CancelException ignore) {
+        // can't send a reply, so ignore the exception
+      }
+    }
+  }
+
+  private void operateOnCache(InternalCache cache) {
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+    if (cache.getCacheServers().size() == 0) {
+
+      for (DistributedRegion r : getDistributedRegions(cache)) {
+        CacheDistributionAdvisor cda = (CacheDistributionAdvisor) 
r.getDistributionAdvisor();
+        CacheDistributionAdvisor.CacheProfile cp =
+            (CacheDistributionAdvisor.CacheProfile) 
cda.getProfile(getSender());
+        if (cp != null) {
+          if (isDebugEnabled) {
+            logger.debug("Setting hasCacheServer flag to false on region 
\"{}\" for {}",
+                r.getFullPath(), getSender());
+          }
+          cp.hasCacheServer = false;
+        }
+      }
+      for (PartitionedRegion r : this.getPartitionedRegions(cache)) {
+        CacheDistributionAdvisor cda = (CacheDistributionAdvisor) 
r.getDistributionAdvisor();
+        CacheDistributionAdvisor.CacheProfile cp =
+            (CacheDistributionAdvisor.CacheProfile) 
cda.getProfile(getSender());
+        if (cp != null) {
+          if (isDebugEnabled) {
+            logger.debug("Setting hasCacheServer flag to false on region 
\"{}\" for {}",
+                r.getFullPath(), getSender());
+          }
+          cp.hasCacheServer = false;
+        }
+      }
+    }
+  }
+
+  /** set the hasCacheServer flags for all regions in this cache */
+  public void operateOnLocalCache(InternalCache cache) {
+    int oldLevel = 
LocalRegion.setThreadInitLevelRequirement(LocalRegion.BEFORE_INITIAL_IMAGE);
+    if (cache.getCacheServers().size() == 0) {
+      try {
+        for (InternalRegion r : getAllRegions(cache)) {
+          FilterProfile fp = r.getFilterProfile();
+          if (fp != null) {
+            fp.getLocalProfile().hasCacheServer = false;
+          }
+        }
+        for (PartitionedRegion r : getPartitionedRegions(cache)) {
+          FilterProfile fp = r.getFilterProfile();
+          if (fp != null) {
+            fp.getLocalProfile().hasCacheServer = false;
+          }
+        }
+      } finally {
+        LocalRegion.setThreadInitLevelRequirement(oldLevel);
+      }
+    }
+  }
+
+
+  private Set<InternalRegion> getAllRegions(InternalCache internalCache) {
+    return internalCache.getAllRegions();
+  }
+
+  private Set<DistributedRegion> getDistributedRegions(InternalCache 
internalCache) {
+    Set<DistributedRegion> result = new HashSet<>();
+    for (InternalRegion r : internalCache.getAllRegions()) {
+      if (r instanceof DistributedRegion) {
+        result.add((DistributedRegion) r);
+      }
+    }
+    return result;
+  }
+
+  private Set<PartitionedRegion> getPartitionedRegions(InternalCache 
internalCache) {
+    return (Set<PartitionedRegion>) new 
HashSet(internalCache.getPartitionedRegions());
+  }
+
+  /** for deserialization only */
+  public RemoveCacheServerProfileMessage() {}
+
+  @Override
+  public int getDSFID() {
+    return REMOVE_CACHESERVER_PROFILE_UPDATE;
+  }
+
+  @Override
+  public void toData(DataOutput out) throws IOException {
+    super.toData(out);
+    out.writeInt(this.processorId);
+  }
+
+  @Override
+  public void fromData(DataInput in) throws IOException, 
ClassNotFoundException {
+    super.fromData(in);
+    this.processorId = in.readInt();
+  }
+
+  @Override
+  public String toString() {
+    return this.getShortClassName() + "(processorId=" + this.processorId + ")";
+  }
+}
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
index a192e04..7727add 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheCreation.java
@@ -643,6 +643,12 @@ public class CacheCreation implements InternalCache {
     }
   }
 
+  void removeCacheServers(List<CacheServer> declarativeCacheServers, Cache 
cache,
+      Integer serverPort, String serverBindAdd, Boolean disableDefaultServer) {
+
+    throw new 
UnsupportedOperationException(LocalizedStrings.SHOULDNT_INVOKE.toLocalizedString());
+  }
+
   /**
    * Returns a description of the disk store used by the pdx registry.
    */
@@ -1030,6 +1036,11 @@ public class CacheCreation implements InternalCache {
   }
 
   @Override
+  public boolean removeCacheServer(final CacheServer cacheServer) {
+    throw new 
UnsupportedOperationException(LocalizedStrings.SHOULDNT_INVOKE.toLocalizedString());
+  }
+
+  @Override
   public void setReadSerializedForCurrentThread(final boolean value) {
     throw new 
UnsupportedOperationException(LocalizedStrings.SHOULDNT_INVOKE.toLocalizedString());
   }
@@ -1219,6 +1230,10 @@ public class CacheCreation implements InternalCache {
     this.gatewayReceivers.add(receiver);
   }
 
+  public void removeGatewayReceiver(GatewayReceiver receiver) {
+    throw new 
UnsupportedOperationException(LocalizedStrings.SHOULDNT_INVOKE.toLocalizedString());
+  }
+
   public void addAsyncEventQueue(AsyncEventQueue asyncEventQueue) {
     this.asyncEventQueues.add(asyncEventQueue);
   }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/GatewayReceiverCreation.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/GatewayReceiverCreation.java
index 441290b..d60dcb8 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/GatewayReceiverCreation.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/GatewayReceiverCreation.java
@@ -137,6 +137,10 @@ public class GatewayReceiverCreation implements 
GatewayReceiver {
 
   }
 
+  public void destroy() {
+
+  }
+
   public boolean isRunning() {
     return false;
   }
diff --git 
a/geode-core/src/main/java/org/apache/geode/management/JMXNotificationType.java 
b/geode-core/src/main/java/org/apache/geode/management/JMXNotificationType.java
index 351a3a6..5651903 100644
--- 
a/geode-core/src/main/java/org/apache/geode/management/JMXNotificationType.java
+++ 
b/geode-core/src/main/java/org/apache/geode/management/JMXNotificationType.java
@@ -218,6 +218,15 @@ public interface JMXNotificationType {
       DistributionConfig.GEMFIRE_PREFIX + 
"distributedsystem.gateway.receiver.stopped";
 
   /**
+   * Notification type which indicates that a gateway receiver is destroyed 
<BR>
+   * The value of this type string is
+   * <CODE>gemfire.distributedsystem.gateway.receiver.destroyed</CODE>.
+   */
+  public static final String GATEWAY_RECEIVER_DESTROYED =
+      DistributionConfig.GEMFIRE_PREFIX + 
"distributedsystem.gateway.receiver.destroyed";
+
+
+  /**
    * Notification type which indicates that locator is started <BR>
    * The value of this type string is 
<CODE>gemfire.distributedsystem.locator.started</CODE>.
    */
diff --git 
a/geode-core/src/main/java/org/apache/geode/management/internal/MBeanJMXAdapter.java
 
b/geode-core/src/main/java/org/apache/geode/management/internal/MBeanJMXAdapter.java
index f3c0fe3..dde1387 100644
--- 
a/geode-core/src/main/java/org/apache/geode/management/internal/MBeanJMXAdapter.java
+++ 
b/geode-core/src/main/java/org/apache/geode/management/internal/MBeanJMXAdapter.java
@@ -176,7 +176,6 @@ public class MBeanJMXAdapter implements ManagementConstants 
{
   public void unregisterMBean(ObjectName objectName) {
 
     try {
-
       if (!isRegistered(objectName)) {
         return;
       }
diff --git 
a/geode-core/src/main/java/org/apache/geode/management/internal/ManagementConstants.java
 
b/geode-core/src/main/java/org/apache/geode/management/internal/ManagementConstants.java
index 8f8ad7c..004ba21 100644
--- 
a/geode-core/src/main/java/org/apache/geode/management/internal/ManagementConstants.java
+++ 
b/geode-core/src/main/java/org/apache/geode/management/internal/ManagementConstants.java
@@ -183,6 +183,7 @@ public interface ManagementConstants {
   String GATEWAY_SENDER_REMOVED_PREFIX = "GatewaySender Removed in the VM ";
 
   String GATEWAY_RECEIVER_CREATED_PREFIX = "GatewayReceiver Created in the VM 
";
+  String GATEWAY_RECEIVER_DESTROYED_PREFIX = "GatewayReceiver Destroyed in the 
VM ";
   String GATEWAY_RECEIVER_STARTED_PREFIX = "GatewayReceiver Started in the VM 
";
   String GATEWAY_RECEIVER_STOPPED_PREFIX = "GatewayReceiver Stopped in the VM 
";
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewayReceiverMBeanBridge.java
 
b/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewayReceiverMBeanBridge.java
index 3dc9f80..a2ffe01 100644
--- 
a/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewayReceiverMBeanBridge.java
+++ 
b/geode-core/src/main/java/org/apache/geode/management/internal/beans/GatewayReceiverMBeanBridge.java
@@ -52,6 +52,10 @@ public class GatewayReceiverMBeanBridge extends ServerBridge 
{
     initializeReceiverStats();
   }
 
+  protected void destroyServer() {
+    removeServer();
+  }
+
   protected void startServer() {
     CacheServer server = rcv.getServer();
     addServer(server);
diff --git 
a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java
 
b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java
index 171256e..8722f6f 100644
--- 
a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java
+++ 
b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementAdapter.java
@@ -479,6 +479,31 @@ public class ManagementAdapter {
   }
 
   /**
+   * Handles Gateway receiver destroy
+   *
+   * @param recv specific gateway receiver
+   * @throws ManagementException
+   */
+  public void handleGatewayReceiverDestroy(GatewayReceiver recv) throws 
ManagementException {
+    if (!isServiceInitialised("handleGatewayReceiverDestroy")) {
+      return;
+    }
+
+    GatewayReceiverMBean mbean = (GatewayReceiverMBean) 
service.getLocalGatewayReceiverMXBean();
+    GatewayReceiverMBeanBridge bridge = mbean.getBridge();
+
+    bridge.destroyServer();
+    ObjectName objectName = (MBeanJMXAdapter
+        
.getGatewayReceiverMBeanName(internalCache.getDistributedSystem().getDistributedMember()));
+
+    service.unregisterMBean(objectName);
+    Notification notification = new 
Notification(JMXNotificationType.GATEWAY_RECEIVER_DESTROYED,
+        memberSource, SequenceNumber.next(), System.currentTimeMillis(),
+        ManagementConstants.GATEWAY_RECEIVER_DESTROYED_PREFIX);
+    memberLevelNotifEmitter.sendNotification(notification);
+  }
+
+  /**
    * Handles Gateway receiver creation
    *
    * @param recv specific gateway receiver
diff --git 
a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java
 
b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java
index aca3410..5c2bcd7 100644
--- 
a/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java
+++ 
b/geode-core/src/main/java/org/apache/geode/management/internal/beans/ManagementListener.java
@@ -129,6 +129,10 @@ public class ManagementListener implements 
ResourceEventsListener {
         GatewayReceiver createdRecv = (GatewayReceiver) resource;
         adapter.handleGatewayReceiverCreate(createdRecv);
         break;
+      case GATEWAYRECEIVER_DESTROY:
+        GatewayReceiver destroyedRecv = (GatewayReceiver) resource;
+        adapter.handleGatewayReceiverDestroy(destroyedRecv);
+        break;
       case GATEWAYRECEIVER_START:
         GatewayReceiver startedRecv = (GatewayReceiver) resource;
         adapter.handleGatewayReceiverStart(startedRecv);
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java
index cd33138..8161926 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/GemFireCacheImplTest.java
@@ -30,6 +30,8 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 import org.apache.geode.cache.CacheClosedException;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.cache.wan.GatewayReceiver;
 import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.internal.SystemTimer;
 import org.apache.geode.internal.cache.control.InternalResourceManager;
@@ -216,6 +218,27 @@ public class GemFireCacheImplTest {
   }
 
   @Test
+  public void removeGatewayReceiverShouldRemoveFromReceiversList() {
+    GatewayReceiver receiver = mock(GatewayReceiver.class);
+    cache = GemFireCacheImpl.create(distributedSystem, cacheConfig);
+    cache.addGatewayReceiver(receiver);
+    assertEquals(1, cache.getGatewayReceivers().size());
+    cache.removeGatewayReceiver(receiver);
+    assertEquals(0, cache.getGatewayReceivers().size());
+  }
+
+
+  @Test
+  public void removeFromCacheServerShouldRemoveFromCacheServersList() {
+    cache = GemFireCacheImpl.create(distributedSystem, cacheConfig);
+    CacheServer cacheServer = cache.addCacheServer(false);
+    assertEquals(1, cache.getCacheServers().size());
+    cache.removeCacheServer(cacheServer);
+    assertEquals(0, cache.getCacheServers().size());
+  }
+
+
+  @Test
   public void testIsMisConfigured() {
     Properties clusterProps = new Properties();
     Properties serverProps = new Properties();
diff --git 
a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
 
b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index dadb049..3623071 100644
--- 
a/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ 
b/geode-core/src/test/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -2136,3 +2136,8 @@ org/apache/geode/redis/internal/DoubleWrapper,2
 fromData,9,2a2bb80004b50002b1
 toData,9,2ab400022bb80003b1
 
+org/apache/geode/internal/cache/RemoveCacheServerProfileMessage,2
+fromData,16,2a2bb700322a2bb900330100b50008b1
+toData,16,2a2bb700302b2ab40008b900310200b1
+
+
diff --git 
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java
 
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java
index 190f6d1..caa4453 100644
--- 
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java
+++ 
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewayReceiverImpl.java
@@ -215,6 +215,17 @@ public class GatewayReceiverImpl implements 
GatewayReceiver {
     receiver.stop();
   }
 
+  public void destroy() {
+    if (receiver.isRunning()) {
+      throw new GatewayReceiverException(
+          "Gateway Receiver is running and needs to be stopped first");
+    }
+    this.cache.removeGatewayReceiver(this);
+    this.cache.removeCacheServer(receiver);
+    InternalDistributedSystem system = 
this.cache.getInternalDistributedSystem();
+    system.handleResourceEvent(ResourceEvent.GATEWAYRECEIVER_DESTROY, this);
+  }
+
   public String getBindAddress() {
     return this.bindAdd;
   }
diff --git 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverImplJUnitTest.java
 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverImplJUnitTest.java
index 472ba27..32edb9b 100644
--- 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverImplJUnitTest.java
+++ 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverImplJUnitTest.java
@@ -17,6 +17,8 @@ package org.apache.geode.internal.cache.wan;
 import static org.junit.Assert.*;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
@@ -56,4 +58,58 @@ public class GatewayReceiverImplJUnitTest {
     assertEquals("hello", gateway.getHost());
   }
 
+  @Test
+  public void destroyCalledOnRunningGatewayReceiverShouldThrowException() 
throws IOException {
+    InternalCache cache = mock(InternalCache.class);
+    CacheServerImpl server = mock(CacheServerImpl.class);
+    InternalDistributedSystem system = mock(InternalDistributedSystem.class);
+    when(cache.getInternalDistributedSystem()).thenReturn(system);
+    when(server.getExternalAddress()).thenReturn("hello");
+    when(server.isRunning()).thenReturn(true);
+    when(cache.addCacheServer(eq(true))).thenReturn(server);
+    GatewayReceiverImpl gateway =
+        new GatewayReceiverImpl(cache, 2000, 2001, 5, 100, null, null, null, 
true);
+    gateway.start();
+    try {
+      gateway.destroy();
+      fail();
+    } catch (GatewayReceiverException e) {
+      assertEquals("Gateway Receiver is running and needs to be stopped 
first", e.getMessage());
+    }
+  }
+
+  @Test
+  public void 
destroyCalledOnStoppedGatewayReceiverShouldRemoveRecieverFromCacheServers()
+      throws IOException {
+    InternalCache cache = mock(InternalCache.class);
+    CacheServerImpl server = mock(CacheServerImpl.class);
+    InternalDistributedSystem system = mock(InternalDistributedSystem.class);
+    when(cache.getInternalDistributedSystem()).thenReturn(system);
+    when(server.getExternalAddress()).thenReturn("hello");
+    when(cache.addCacheServer(eq(true))).thenReturn(server);
+    GatewayReceiverImpl gateway =
+        new GatewayReceiverImpl(cache, 2000, 2001, 5, 100, null, null, null, 
true);
+    gateway.start();
+    // sender is mocked already to say running is false
+    gateway.destroy();
+    verify(cache, times(1)).removeCacheServer(server);
+  }
+
+  @Test
+  public void 
destroyCalledOnStoppedGatewayReceiverShouldRemoveRecieverFromReceivers()
+      throws IOException {
+    InternalCache cache = mock(InternalCache.class);
+    CacheServerImpl server = mock(CacheServerImpl.class);
+    InternalDistributedSystem system = mock(InternalDistributedSystem.class);
+    when(cache.getInternalDistributedSystem()).thenReturn(system);
+    when(server.getExternalAddress()).thenReturn("hello");
+    when(cache.addCacheServer(eq(true))).thenReturn(server);
+    GatewayReceiverImpl gateway =
+        new GatewayReceiverImpl(cache, 2000, 2001, 5, 100, null, null, null, 
true);
+    gateway.start();
+    // sender is mocked already to say running is false
+    gateway.destroy();
+    verify(cache, times(1)).removeGatewayReceiver(gateway);
+  }
+
 }
diff --git 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverMBeanDUnitTest.java
 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverMBeanDUnitTest.java
new file mode 100644
index 0000000..4eb567a
--- /dev/null
+++ 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewayReceiverMBeanDUnitTest.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan;
+
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
+import javax.management.ObjectName;
+
+import org.awaitility.Awaitility;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.wan.GatewayReceiver;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.management.GatewayReceiverMXBean;
+import org.apache.geode.management.ManagementTestBase;
+import org.apache.geode.management.internal.MBeanJMXAdapter;
+import org.apache.geode.management.internal.SystemManagementService;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.junit.categories.DistributedTest;
+
+@Category(DistributedTest.class)
+public class GatewayReceiverMBeanDUnitTest extends ManagementTestBase {
+
+  @Test
+  public void testMBeanAndProxiesForGatewayReceiverAreCreated() throws 
Exception {
+    initManagement(true);
+
+
+    // Verify MBean is created in each managed node
+    for (VM vm : getManagedNodeList()) {
+      vm.invoke(() -> {
+        GatewayReceiver receiver = 
getCache().createGatewayReceiverFactory().create();
+      });
+      vm.invoke(() -> verifyMBean());
+    }
+
+    // Verify MBean proxies are created in the managing node
+    getManagingNode().invoke(() -> verifyMBeanProxies(getCache()));
+  }
+
+  @Test
+  public void testMBeanAndProxiesForGatewayReceiverAreRemovedOnDestroy() 
throws Exception {
+    initManagement(true);
+
+    // Verify MBean is created in each managed node
+    for (VM vm : getManagedNodeList()) {
+      vm.invoke(() -> {
+        GatewayReceiver receiver = 
getCache().createGatewayReceiverFactory().create();
+        receiver.start();
+        receiver.stop();
+        receiver.destroy();
+
+      });
+      vm.invoke(() -> verifyMBeanDoesNotExist());
+    }
+
+    // Verify MBean proxies are created in the managing node
+    getManagingNode().invoke(() -> verifyMBeanProxiesDoesNotExist(getCache()));
+  }
+
+  private void verifyMBean() {
+    assertNotNull(getMBean());
+  }
+
+  private void verifyMBeanDoesNotExist() {
+    assertNull(getMBean());
+  }
+
+  private GatewayReceiverMXBean getMBean() {
+    ObjectName objectName =
+        
MBeanJMXAdapter.getGatewayReceiverMBeanName(getSystem().getDistributedMember());
+    return getManagementService().getMBeanInstance(objectName, 
GatewayReceiverMXBean.class);
+  }
+
+  private static void verifyMBeanProxies(final InternalCache cache) {
+    Set<DistributedMember> members =
+        cache.getDistributionManager().getOtherNormalDistributionManagerIds();
+    for (DistributedMember member : members) {
+      Awaitility.await().atMost(60, TimeUnit.SECONDS)
+          .until(() -> assertNotNull(getMBeanProxy(member)));
+    }
+  }
+
+  private static void verifyMBeanProxiesDoesNotExist(final InternalCache 
cache) {
+    Set<DistributedMember> members =
+        cache.getDistributionManager().getOtherNormalDistributionManagerIds();
+    for (DistributedMember member : members) {
+      assertNull(getMBeanProxy(member));
+    }
+  }
+
+  private static GatewayReceiverMXBean getMBeanProxy(DistributedMember member) 
{
+    SystemManagementService service = (SystemManagementService) 
getManagementService();
+    ObjectName objectName = 
MBeanJMXAdapter.getGatewayReceiverMBeanName(member);
+    return service.getMBeanProxy(objectName, GatewayReceiverMXBean.class);
+  }
+
+
+}
diff --git 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/GatewayReceiverDUnitTest.java
 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/GatewayReceiverDUnitTest.java
new file mode 100644
index 0000000..006a2df
--- /dev/null
+++ 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/serial/GatewayReceiverDUnitTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.serial;
+
+import static junit.framework.TestCase.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.wan.GatewayReceiver;
+import org.apache.geode.cache.wan.GatewayReceiverFactory;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.DistributionAdvisor;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.internal.cache.CacheDistributionAdvisor;
+import org.apache.geode.internal.cache.DistributedRegion;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.wan.WANTestBase;
+import org.apache.geode.test.dunit.Assert;
+import org.apache.geode.test.dunit.SerializableCallable;
+import org.apache.geode.test.dunit.SerializableCallableIF;
+import org.apache.geode.test.dunit.SerializableRunnableIF;
+import org.apache.geode.test.junit.categories.DistributedTest;
+
+@Category(DistributedTest.class)
+public class GatewayReceiverDUnitTest extends WANTestBase {
+
+  private static GatewayReceiver receiver;
+
+  @Test
+  public void 
removingGatewayReceiverUsingReplicatedRegionShouldRemoveCacheServerFlagFromProfile()
+      throws Exception {
+    testPrimarySecondaryQueueDrainInOrder_RR(
+        () -> WANTestBase.createReplicatedRegion(getTestMethodName(), null, 
isOffHeap()),
+        () -> ((DistributedRegion) 
WANTestBase.cache.getRegion(getTestMethodName()))
+            .getDistributionAdvisor());
+  }
+
+  @Test
+  public void 
removingGatewayReceiverUsingPartitionedRegionShouldRemoveCacheServerFlagFromProfile()
+      throws Exception {
+    testPrimarySecondaryQueueDrainInOrder_RR(
+        () -> WANTestBase.createPartitionedRegion(getTestMethodName(), null, 
1, 10, isOffHeap()),
+        () -> ((PartitionedRegion) 
WANTestBase.cache.getRegion(getTestMethodName()))
+            .getDistributionAdvisor());
+  }
+
+  public <T> void testPrimarySecondaryQueueDrainInOrder_RR(
+      SerializableRunnableIF createRegionLambda,
+      SerializableCallableIF<DistributionAdvisor> extractAdvisorLambda) throws 
Exception {
+    InternalDistributedMember[] memberIds = new InternalDistributedMember[8];
+
+    Integer lnPort = (Integer) vm0.invoke(() -> 
WANTestBase.createFirstLocatorWithDSId(1));
+    Integer nyPort = (Integer) vm1.invoke(() -> 
WANTestBase.createFirstRemoteLocator(2, lnPort));
+
+    vm2.invoke(() -> WANTestBase.createCache(nyPort));
+    vm3.invoke(() -> WANTestBase.createCache(nyPort));
+
+    memberIds[2] = (InternalDistributedMember) vm2
+        .invoke(() -> 
WANTestBase.cache.getDistributedSystem().getDistributedMember());
+
+    memberIds[3] = (InternalDistributedMember) vm3
+        .invoke(() -> 
WANTestBase.cache.getDistributedSystem().getDistributedMember());
+
+    vm2.invoke(createRegionLambda);
+    vm3.invoke(createRegionLambda);
+
+    vm2.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 100));
+
+    vm2.invoke(() -> {
+      GatewayReceiverDUnitTest.receiver = 
GatewayReceiverDUnitTest.createAndReturnReceiver();
+      return;
+    });
+    vm3.invoke(() -> {
+      GatewayReceiverDUnitTest.receiver = 
GatewayReceiverDUnitTest.createAndReturnReceiver();
+      return;
+    });
+
+    vm2.invoke(() -> assertProfileCacheServerFlagEquals(memberIds[3], true, 
extractAdvisorLambda));
+    vm3.invoke(() -> assertProfileCacheServerFlagEquals(memberIds[2], true, 
extractAdvisorLambda));
+
+    vm2.invoke(() -> {
+      GatewayReceiverDUnitTest.receiver.stop();
+      GatewayReceiverDUnitTest.receiver.destroy();
+    });
+
+    vm2.invoke(() -> assertProfileCacheServerFlagEquals(memberIds[3], true, 
extractAdvisorLambda));
+    // vm3 should still see that vm2's profile still has cache server set to 
true
+    vm3.invoke(() -> assertProfileCacheServerFlagEquals(memberIds[2], false, 
extractAdvisorLambda));
+
+    vm3.invoke(() -> {
+      GatewayReceiverDUnitTest.receiver.stop();
+      GatewayReceiverDUnitTest.receiver.destroy();
+    });
+
+    vm2.invoke(() -> assertProfileCacheServerFlagEquals(memberIds[3], false, 
extractAdvisorLambda));
+    vm3.invoke(() -> assertProfileCacheServerFlagEquals(memberIds[2], false, 
extractAdvisorLambda));
+  }
+
+
+  private void assertProfileCacheServerFlagEquals(InternalDistributedMember 
member,
+      boolean expectedFlag, SerializableCallableIF<DistributionAdvisor> 
extractAdvisor)
+      throws Exception {
+    DistributionAdvisor advisor = extractAdvisor.call();
+    CacheDistributionAdvisor.CacheProfile cp =
+        (CacheDistributionAdvisor.CacheProfile) advisor.getProfile(member);
+    assertEquals(expectedFlag, cp.hasCacheServer);
+  }
+
+  public static GatewayReceiver createAndReturnReceiver() {
+    GatewayReceiverFactory fact = cache.createGatewayReceiverFactory();
+    int port = AvailablePortHelper.getRandomAvailablePortForDUnitSite();
+    fact.setStartPort(port);
+    fact.setEndPort(port);
+    fact.setManualStart(true);
+    GatewayReceiver receiver = fact.create();
+    try {
+      receiver.start();
+    } catch (IOException e) {
+      e.printStackTrace();
+      Assert.fail(
+          "Test " + getTestMethodName() + " failed to start GatewayReceiver on 
port " + port, e);
+    }
+    return receiver;
+  }
+
+}

-- 
To stop receiving notification emails like this one, please contact
jasonhu...@apache.org.

Reply via email to