albertogpz commented on a change in pull request #6036:
URL: https://github.com/apache/geode/pull/6036#discussion_r682552257



##########
File path: geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java
##########
@@ -258,6 +271,8 @@
    */
   boolean isRunning();
 
+
+

Review comment:
       Remove these extra lines.

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ConcurrentParallelGatewaySenderEventProcessor.java
##########
@@ -212,6 +217,7 @@ public void run() {
         }
       }
     }
+

Review comment:
       Remove this line, please.

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderEventProcessor.java
##########
@@ -78,13 +88,18 @@ protected void initializeMessageQueue(String id, boolean cleanQueues) {
     }
 
     ParallelGatewaySenderQueue queue =
-        new ParallelGatewaySenderQueue(sender, targetRs, index, nDispatcher, cleanQueues);
+        new ParallelGatewaySenderQueue(sender, targetRs, index, nDispatcher, cleanQueues,
+            shouldOnlyRecoverQueues);
 
-    queue.start();
+    if (!shouldOnlyRecoverQueues) {
+      queue.start();
+    }
     this.queue = queue;
 
-    if (queue.localSize() > 0) {
-      queue.notifyEventProcessorIfRequired();
+    if (!shouldOnlyRecoverQueues) {
+      if (((ParallelGatewaySenderQueue) queue).localSize() > 0) {

Review comment:
       The two casts to `ParallelGatewaySenderQueue` are redundant.
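   A minimal sketch of the cast-free form for the lines shown above, assuming the local variable `queue` keeps its declared `ParallelGatewaySenderQueue` type:

```java
// No cast needed here: `queue` is already declared as ParallelGatewaySenderQueue.
if (!shouldOnlyRecoverQueues) {
  if (queue.localSize() > 0) {
    queue.notifyEventProcessorIfRequired();
  }
}
```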

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
##########
@@ -1230,6 +1239,22 @@ public void enqueueTempEvents() {
     }
   }
 
+  /**
+   * During sender is recovered in stopped state, if there are any cache operations while
+   * queue and event processor is being created then these events should be stored in
+   * tmpDroppedEvents temporary queue. Once event processor is created then queue will be
+   * drained and ParallelQueueRemovalMessage will be sent.
+   */
+  public void enqueueTempDroppedEvents() {

Review comment:
       This method could use a better name, e.g. `processTempDroppedEvents()`.
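   For illustration, the suggested rename would only touch the declaration and its call sites; a sketch that keeps the body from the hunk above unchanged:

```java
/**
 * Drains tmpDroppedEvents once the queue and event processor have been created,
 * sending a ParallelQueueRemovalMessage for each stored event.
 */
public void processTempDroppedEvents() {
  // same body as the enqueueTempDroppedEvents() method added in this PR
}
```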

##########
File path: geode-core/src/main/java/org/apache/geode/internal/cache/ColocationHelper.java
##########
@@ -181,7 +181,6 @@ public static boolean checkMembersColocation(PartitionedRegion partitionedRegion
         return false;
       }
     }
-

Review comment:
       Why was this line removed?

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
##########
@@ -158,10 +158,18 @@ public void testPartitionedRegionWithPersistentGatewaySender() {
 
     LogWriterUtils.getLogWriter().info("Started the senders");
 
-    vm2.invoke(
-        () -> WANTestBase.createPartitionedRegion(getTestMethodName(), null, 1, 100, isOffHeap()));
-    vm3.invoke(
-        () -> WANTestBase.createPartitionedRegion(getTestMethodName(), null, 1, 100, isOffHeap()));
+    AsyncInvocation inv1 = vm2.invokeAsync(() -> WANTestBase
+        .createPersistentPartitionedRegion(getTestMethodName(), null, 1, 100, isOffHeap()));
+    AsyncInvocation inv2 = vm3.invokeAsync(() -> WANTestBase
+        .createPersistentPartitionedRegion(getTestMethodName(), null, 1, 100, isOffHeap()));
+    try {
+      inv1.join();
+      inv2.join();
+
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+      fail();
+    }

Review comment:
       What is the reason for changing this to be done in parallel?
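   If the parallel creation is not actually needed, the sequential form from the removed lines still works with the new persistent-region helper; a sketch using only calls already present in this diff:

```java
// Sequential region creation: no AsyncInvocation and no InterruptedException handling.
vm2.invoke(() -> WANTestBase
    .createPersistentPartitionedRegion(getTestMethodName(), null, 1, 100, isOffHeap()));
vm3.invoke(() -> WANTestBase
    .createPersistentPartitionedRegion(getTestMethodName(), null, 1, 100, isOffHeap()));
```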

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPersistenceEnabledGatewaySenderDUnitTest.java
##########
@@ -1892,9 +1908,10 @@ public void testpersistentWanGateway_restartSender_expectAllEventsReceived_scena
     LogWriterUtils.getLogWriter().info("All the senders are now running...");
 
     // ----------------------------------------------------------------------------------------------------
-
-    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 3000));
-    vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 3000));
+    vm7.invoke(() -> WANTestBase.doPuts(getTestMethodName(), 500));
+    vm4.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 3000));
+    vm2.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 500));
+    vm3.invoke(() -> WANTestBase.validateRegionSize(getTestMethodName(), 500));

Review comment:
       I do not understand why this case should behave differently now. I would expect the legacy behavior to be kept.
   Why has it changed?

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/parallel/ParallelWANPropagationLoopBackDUnitTest.java
##########
@@ -365,9 +365,9 @@ public void testParallelPropagationLoopBack3SitesNtoNTopologyPutFromOneDS() {
    * Site-NY: dsid=1: senderId="ln": vm3, vm6
    * NY site's sender's manual-start=true
    *
-   * Make sure the events are sent from LN to NY and will not be added into tmpDroppedEvents
-   * while normal events put from NY site can still be added to tmpDroppedEvents
-   * Start the sender, make sure the events in tmpDroppedEvents are sent to LN finally
+   * Verify that events aren't added to tmpDroppedEvents after gateway-sender is recovered in
+   * stopped state with manual-start=true, and that none of the events are sent to remote site
+   * after sender is started.

Review comment:
       I do not see in this case that the sender is recovered in stopped state. The only thing I see is that it is started.
   This test case might need some rethinking.

##########
File path: geode-wan/src/main/java/org/apache/geode/cache/wan/internal/parallel/ParallelGatewaySenderImpl.java
##########
@@ -56,6 +62,44 @@ public void start() {
     this.start(false);
   }
 
+  @Override
+  public void recoverInStoppedState() {
+    this.getLifeCycleLock().writeLock().lock();
+    try {
+      if (eventProcessor != null) {
+        // Already recovered in stopped state
+        return;
+      }
+
+      Set<Region> targetRs = new HashSet<Region>();

Review comment:
       The `Region` type in the `HashSet` is not needed.
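   In other words, the diamond operator is enough here; a one-line sketch of the suggested change:

```java
// The element type is inferred from the declared Set<Region> on the left-hand side.
Set<Region> targetRs = new HashSet<>();
```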

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/ClusterConfigStartStopPauseAndResumeGatewaySenderOperationDUnitTest.java
##########
@@ -0,0 +1,716 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.wancommand;
+
+import static org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
+import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.getMember;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewayReceiverMXBeanProxy;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewaySenderMXBeanProxy;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifyReceiverState;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifySenderState;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.RegionQueue;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderEventProcessor;
+import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
+import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderEventProcessor;
+import org.apache.geode.internal.cache.wan.serial.ConcurrentSerialGatewaySenderEventProcessor;
+import org.apache.geode.internal.cache.wan.serial.SerialGatewaySenderEventProcessor;
+import org.apache.geode.internal.membership.utils.AvailablePort;
+import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
+import org.apache.geode.management.internal.i18n.CliStrings;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.WanTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+@Category({WanTest.class})
+@RunWith(JUnitParamsRunner.class)
+public class ClusterConfigStartStopPauseAndResumeGatewaySenderOperationDUnitTest
+    implements Serializable {
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(9);
+
+  @Rule
+  public transient GfshCommandRule gfsh = new GfshCommandRule();
+
+  private MemberVM locatorSite1;
+  private MemberVM locatorSite2;
+  private MemberVM server1Site1;
+  private MemberVM server2Site1;
+  private MemberVM server1Site2;
+  private MemberVM server2Site2;
+
+  private ClientVM clientSite1;
+  private ClientVM clientSite2;
+
+  /**
+   * Verify that gateway-sender state is persisted after pause and resume gateway-sender
+   * commands are executed, and that gateway-sender works as expected after member restart:
+   *
+   * - Region type: PARTITION and non-redundant
+   * - Gateway sender configured without queue persistence
+   *
+   * 1. Pause gateway-sender
+   * 2. Run some traffic and verify that data is enqueued in gateway-sender queues
+   * 3. Restart all servers that host gateway-sender
+   * 4. Run some traffic and verify that data is enqueued in gateway-sender queues, old
+   * data should be lost from queue after servers are restarted
+   * 5. Resume gateway-sender
+   * 6. Verify that latest traffic is sent over the gateway-sender to remote site
+   */
+  @Test
+  @Parameters({"true", "false"})
+  public void testThatPauseStateRemainAfterTheRestartOfMembers(String isParallel) throws Exception {
+    configureSites(false, "PARTITION", "0", isParallel);
+
+    executeGfshCommand(CliStrings.PAUSE_GATEWAYSENDER);
+    verifyGatewaySenderState(true, true);
+    server2Site2.invoke(() -> waitAllDispatcherThreadsToPause("ln"));
+    server1Site2.invoke(() -> waitAllDispatcherThreadsToPause("ln"));
+
+    // Do some puts and check that data has been enqueued
+    Set<String> keysQueue = clientSite2.invoke(() -> doPutsInRange(0, 15));
+    server1Site2.invoke(() -> checkQueueSize("ln", keysQueue.size()));
+
+    // stop servers on site #2
+    server1Site2.stop(false);
+    server2Site2.stop(false);
+
+    // start again servers in Site #2
+    server1Site2 = clusterStartupRule.startServerVM(5, locatorSite2.getPort());
+    server2Site2 = clusterStartupRule.startServerVM(6, locatorSite2.getPort());
+
+    verifyGatewaySenderState(true, true);
+
+    // Do some puts and check that data has been enqueued, previous queue should be lost
+    Set<String> keysQueue1 = clientSite2.invoke(() -> doPutsInRange(20, 35));
+    server1Site2.invoke(() -> checkQueueSize("ln", keysQueue1.size()));
+
+    executeGfshCommand(CliStrings.RESUME_GATEWAYSENDER);
+    verifyGatewaySenderState(true, false);
+
+    // check that queue is empty
+    server1Site2.invoke(() -> checkQueueSize("ln", 0));
+    // check that data replicated to remote site
+    clientSite1.invoke(() -> checkDataAvailable(keysQueue1));
+  }
+
+  /**
+   * Verify that gateway-sender queue is persisted while in paused state and it is recovered after
+   * the restart of servers.
+   *
+   * - Region type: PARTITION_PERSISTENT and non-redundant
+   * - Gateway sender configured with queue persistence
+   *
+   * 1. Pause gateway-sender
+   * 2. Restart all servers that host gateway-sender
+   * 3. Run some traffic and verify that data is enqueued in gateway-sender queues
+   * 4. Restart all servers that host gateway-sender
+   * 5. Run some traffic and verify that data is enqueued in gateway-sender queues, old
+   * data should be recovered after servers restarted
+   * 6. Resume gateway-sender
+   * 5. Verify that complete traffic is sent over the gateway-sender to the remote site
+   */
+  @Test
+  @Parameters({"true", "false"})
+  public void testThatPauseStateRemainAfterRestartAllServersPersistent(String isParallel)
+      throws Exception {
+    configureSites(true, "PARTITION_PERSISTENT", "0", isParallel);
+
+    executeGfshCommand(CliStrings.PAUSE_GATEWAYSENDER);
+    verifyGatewaySenderState(true, true);
+    server2Site2.invoke(() -> waitAllDispatcherThreadsToPause("ln"));
+    server1Site2.invoke(() -> waitAllDispatcherThreadsToPause("ln"));
+
+    // Do some puts and check that data has been enqueued
+    Set<String> keysQueue = clientSite2.invoke(() -> doPutsInRange(0, 15));
+    server1Site2.invoke(() -> checkQueueSize("ln", keysQueue.size()));
+
+    // stop servers on site #2
+    server1Site2.stop(false);
+    server2Site2.stop(false);
+
+    Thread thread = new Thread(
+        () -> server1Site2 = clusterStartupRule.startServerVM(5, locatorSite2.getPort()));
+    Thread thread1 = new Thread(
+        () -> server2Site2 = clusterStartupRule.startServerVM(6, locatorSite2.getPort()));
+    // start threads
+    thread.start();
+    thread1.start();
+    thread.join();
+    thread.join();
+
+    verifyGatewaySenderState(true, true);
+
+    // Do some puts and check that data has been enqueued, previous queue data should not be lost
+    Set<String> keysQueue1 = clientSite2.invoke(() -> doPutsInRange(20, 35));
+    server1Site2.invoke(() -> checkQueueSize("ln", keysQueue1.size() + keysQueue.size()));
+
+    executeGfshCommand(CliStrings.RESUME_GATEWAYSENDER);
+    verifyGatewaySenderState(true, false);
+
+    // check that queue is empty
+    server1Site2.invoke(() -> checkQueueSize("ln", 0));
+    // check that data replicated to remote site
+    clientSite1.invoke(() -> checkDataAvailable(keysQueue1));
+    clientSite1.invoke(() -> checkDataAvailable(keysQueue));
+  }
+
+  /**
+   * Verify that gateway-sender is recovered from redundant server after the
+   * restart of member.
+   *
+   * - Region type: PARTITION and redundant
+   * - Gateway sender configured without queue persistence
+   *
+   * 1. Pause gateway-sender
+   * 2. Run some traffic and verify that data is enqueued in gateway-sender queues
+   * 3. Restart one server that host gateway-sender
+   * 4. Run some traffic and verify that data is enqueued in gateway-sender queues, old
+   * data should not be lost from queue after servers are restarted because of redundancy
+   * 5. Resume gateway-sender
+   * 6. Verify that queued traffic is sent over the gateway-sender to remote site
+   */
+  @Test
+  @Parameters({"true", "false"})
+  public void testThatPauseStateRemainAfterRestartOneServerRedundant(String isParallel)
+      throws Exception {
+    configureSites(false, "PARTITION", "1", isParallel);
+
+    executeGfshCommand(CliStrings.PAUSE_GATEWAYSENDER);
+    verifyGatewaySenderState(true, true);
+    server2Site2.invoke(() -> waitAllDispatcherThreadsToPause("ln"));
+    server1Site2.invoke(() -> waitAllDispatcherThreadsToPause("ln"));
+
+    // Do some puts and check that data has been enqueued
+    Set<String> keys1 = clientSite2.invoke(() -> doPutsInRange(0, 15));
+    server1Site2.invoke(() -> checkQueueSize("ln", keys1.size()));
+
+    // stop server on site #2
+    server1Site2.stop(false);
+
+    // start again server in Site #2
+    server1Site2 = clusterStartupRule.startServerVM(5, locatorSite2.getPort());
+
+    verifyGatewaySenderState(true, true);
+
+    // Do some puts and check that data has been enqueued, previous queue should not be lost
+    // due to configured redundancy
+    Set<String> keys = clientSite2.invoke(() -> doPutsInRange(20, 35));
+    server1Site2.invoke(() -> checkQueueSize("ln", keys.size() + keys1.size()));
+
+    executeGfshCommand(CliStrings.RESUME_GATEWAYSENDER);
+    verifyGatewaySenderState(true, false);
+
+    // check that queue is empty
+    server1Site2.invoke(() -> checkQueueSize("ln", 0));
+    // check that data replicated to other site
+    clientSite1.invoke(() -> checkDataAvailable(keys));
+    clientSite1.invoke(() -> checkDataAvailable(keys1));
+  }
+
+  /**
+   * Verify that gateway-sender state is persisted after stop and start gateway-sender
+   * commands are executed, and that gateway-sender works as expected after member restart:
+   *
+   * - Region type: PARTITION and non-redundant
+   * - Gateway sender configured without queue persistence
+   *
+   * 1. Stop gateway-sender
+   * 2. Run some traffic and verify that data is stored in partition region, and not replicated to
+   * the other site
+   * 3. Restart servers that host gateway-sender
+   * 4. Run some traffic and verify that partition region is recovered and that data is not
+   * replicated to the other site
+   * 5. Start gateway-sender
+   * 6. Run some traffic and verify that traffic is sent over the gateway-sender to remote site
+   */
+  @Test
+  @Parameters({"true", "false"})
+  public void testThatStopStateRemainAfterTheRestartOfMembers(String isParallel) throws Exception {
+    configureSites(false, "PARTITION", "0", isParallel);
+
+    executeGfshCommand(CliStrings.STOP_GATEWAYSENDER);
+    verifyGatewaySenderState(false, false);
+
+    Set<String> keys1 = clientSite2.invoke(() -> doPutsInRange(0, 15));
+    clientSite2.invoke(() -> checkDataAvailable(keys1));
+    server1Site2.invoke(() -> checkQueueSize("ln", 0));
+
+    // stop servers on site #2
+    server1Site2.stop(false);
+    server2Site2.stop(false);
+
+    // start again servers in Site #2
+    server1Site2 = clusterStartupRule.startServerVM(5, locatorSite2.getPort());
+    server2Site2 = clusterStartupRule.startServerVM(6, locatorSite2.getPort());
+
+    verifyGatewaySenderState(false, false);
+
+    Set<String> keys = clientSite2.invoke(() -> doPutsInRange(20, 35));
+    clientSite2.invoke(() -> checkDataAvailable(keys));
+
+    executeGfshCommand(CliStrings.START_GATEWAYSENDER);
+    verifyGatewaySenderState(true, false);
+
+    Set<String> keys3 = clientSite2.invoke(() -> doPutsInRange(40, 55));
+    clientSite2.invoke(() -> checkDataAvailable(keys3));
+    clientSite1.invoke(() -> checkDataAvailable(keys3));
+  }
+
+  /**
+   * Verify that colocated partition regions (gws queue and region) are created
+   * after servers are restarted and gateway-sender is created in stopped state.
+   *
+   * - Region type: PARTITION_PERSISTENT and non-redundant
+   * - Gateway sender configured with queue persistence
+   *
+   * 1. Stop gateway-sender
+   * 2. Run some traffic and verify that data is stored in partition region, and not replicated to
+   * the remote site
+   * 3. Restart servers that host gateway-sender
+   * 4. Run some traffic and verify that partition region is recovered and that data is not
+   * replicated to the other site
+   * 5. Start gateway-sender
+   * 6. Run some traffic and verify that onl latest traffic is sent over the gateway-sender
+   * to remote site
+   */
+  @Test
+  @Parameters({"true", "false"})
+  public void testThatStopStateRemainAfterTheRestartOfMembersAndAllRegionsRecover(String isParallel)
+      throws Exception {
+    configureSites(true, "PARTITION_PERSISTENT", "0", isParallel);
+
+    executeGfshCommand(CliStrings.STOP_GATEWAYSENDER);
+    verifyGatewaySenderState(false, false);
+
+    Set<String> keys1 = clientSite2.invoke(() -> doPutsInRange(0, 15));
+    clientSite2.invoke(() -> checkDataAvailable(keys1));
+
+    server1Site2.invoke(() -> checkQueueSize("ln", 0));
+
+    // stop servers on site #2
+    server1Site2.stop(false);
+    server2Site2.stop(false);
+
+    Thread thread = new Thread(
+        () -> server1Site2 = clusterStartupRule.startServerVM(5, locatorSite2.getPort()));
+    Thread thread1 = new Thread(
+        () -> server2Site2 = clusterStartupRule.startServerVM(6, locatorSite2.getPort()));
+    // start threads
+    thread.start();
+    thread1.start();
+    thread.join();
+    thread.join();
+
+    verifyGatewaySenderState(false, false);
+
+    // check that partition region is created and that accepts new traffic
+    Set<String> keys = clientSite2.invoke(() -> doPutsInRange(20, 35));
+    clientSite2.invoke(() -> checkDataAvailable(keys));
+
+    server1Site2.invoke(() -> checkQueueSize("ln", 0));
+
+    executeGfshCommand(CliStrings.START_GATEWAYSENDER);
+    verifyGatewaySenderState(true, false);
+
+    Set<String> keys3 = clientSite2.invoke(() -> doPutsInRange(40, 55));
+    clientSite2.invoke(() -> checkDataAvailable(keys3));
+    clientSite1.invoke(() -> checkDataAvailable(keys3));
+  }
+
+  /**
+   * Verify that parallel gateway-sender queue recovers only data enqueued prior to stop command
+   * from disk-store.
+   *
+   * - Region type: PARTITION_PERSISTENT and non-redundant
+   * - Gateway sender configured with queue persistence
+   *
+   * 1. Pause gateway-sender
+   * 2. Run some traffic and verify that data is enqueued in gateway-sender queues
+   * 3. Stop gateway-sender
+   * 4. Run some traffic and verify that data is stored in region, and not enqueued
+   * 6. Restart all servers
+   * 7. Check that data that is enqueued prior to stop command is recovered from persistent storage,
+   * and that gateway-sender remained in stopped state
+   * 8. Start gateway-senders
+   * 9. Check that data is not replicated to remote site
+   */
+  @Test
+  @Parameters({"false", "true"})
+  public void testThatStopStateRemainAfterTheRestartAndQueueDataIsRecovered(String isParallel)
+      throws Exception {
+    configureSites(true, "PARTITION_PERSISTENT", "0", isParallel);
+
+    executeGfshCommand(CliStrings.PAUSE_GATEWAYSENDER);
+    verifyGatewaySenderState(true, true);
+    server2Site2.invoke(() -> waitAllDispatcherThreadsToPause("ln"));
+    server1Site2.invoke(() -> waitAllDispatcherThreadsToPause("ln"));
+
+    Set<String> keysQueued = clientSite2.invoke(() -> doPutsInRange(70, 85));
+    clientSite2.invoke(() -> checkDataAvailable(keysQueued));
+    server1Site2.invoke(() -> checkQueueSize("ln", keysQueued.size()));
+
+    executeGfshCommand(CliStrings.STOP_GATEWAYSENDER);
+
+    verifyGatewaySenderState(false, isParallel.equals("true"));
+
+    Set<String> keysNotQueued = clientSite2.invoke(() -> doPutsInRange(100, 105));
+    clientSite2.invoke(() -> checkDataAvailable(keysNotQueued));
+
+    // stop servers on site #2
+    server1Site2.stop(false);
+    server2Site2.stop(false);
+
+    // start again servers in Site #2
+    Thread thread = new Thread(
+        () -> server1Site2 = clusterStartupRule.startServerVM(5, locatorSite2.getPort()));
+    Thread thread1 = new Thread(
+        () -> server2Site2 = clusterStartupRule.startServerVM(6, locatorSite2.getPort()));
+    // start threads
+    thread.start();
+    thread1.start();
+    thread.join();
+    thread1.join();
+
+    verifyGatewaySenderState(false, false);
+    if (isParallel.equals("true")) {
+      server1Site2.invoke(() -> checkQueueSize("ln", keysQueued.size()));
+    }
+    executeGfshCommand(CliStrings.START_GATEWAYSENDER);
+    verifyGatewaySenderState(true, false);
+
+    // Check that data is sent over the gateway-sender
+    server1Site2.invoke(() -> checkQueueSize("ln", 0));
+    clientSite1.invoke(() -> checkDataAvailable(keysQueued));
+    clientSite1.invoke(() -> checkDataNotAvailable(keysNotQueued));
+  }
+
+  /**
+   * This test case verifies that the server during recovery of gateway-sender in stopped state
+   * stores events in tmpDroppedEvents queue, and handles them after sender has been recovered.
+   */
+  @Test
+  public void testSubscriptionQueueWanTrafficWhileServerIsRestarted() throws Exception {

Review comment:
       Could we have a more meaningful name for this test?
   I do not see the relation between the description and the name of the test case.

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/ClusterConfigStartStopPauseAndResumeGatewaySenderOperationDUnitTest.java
##########
@@ -0,0 +1,716 @@
+  /**
+   * This test case verifies that the server during recovery of gateway-sender in stopped state
+   * stores events in tmpDroppedEvents queue, and handles them after sender has been recovered.
+   */

Review comment:
       Would it be possible to check that the tmpDroppedEvents size is not zero at some point of the test and then, at the end, that it is zero?
   Otherwise, I do not think that the test case does what it should.
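   A rough sketch of what such a check could look like in this test class (`await` and `assertThat` are already statically imported above). The `getTmpDroppedEventSize()` accessor is an assumption, not something confirmed by this PR; substitute whatever the change actually exposes for inspecting tmpDroppedEvents:

```java
// Hypothetical helpers, to be run via server1Site2.invoke(...) at the relevant points of
// the test: non-zero while the sender is still being recovered, zero once it is drained.
private static void checkTmpDroppedEventsNotEmpty(String senderId) {
  AbstractGatewaySender sender =
      (AbstractGatewaySender) ClusterStartupRule.getCache().getGatewaySender(senderId);
  await().untilAsserted(() -> assertThat(sender.getTmpDroppedEventSize()).isGreaterThan(0));
}

private static void checkTmpDroppedEventsDrained(String senderId) {
  AbstractGatewaySender sender =
      (AbstractGatewaySender) ClusterStartupRule.getCache().getGatewaySender(senderId);
  await().untilAsserted(() -> assertThat(sender.getTmpDroppedEventSize()).isEqualTo(0));
}
```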

##########
File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/ClusterConfigStartStopPauseAndResumeGatewaySenderOperationDUnitTest.java
##########
@@ -0,0 +1,716 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.wancommand;
+
+import static 
org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
+import static 
org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
+import static 
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.getMember;
+import static 
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewayReceiverMXBeanProxy;
+import static 
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewaySenderMXBeanProxy;
+import static 
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifyReceiverState;
+import static 
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifySenderState;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.RegionQueue;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import 
org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderEventProcessor;
+import 
org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
+import 
org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderEventProcessor;
+import 
org.apache.geode.internal.cache.wan.serial.ConcurrentSerialGatewaySenderEventProcessor;
+import 
org.apache.geode.internal.cache.wan.serial.SerialGatewaySenderEventProcessor;
+import org.apache.geode.internal.membership.utils.AvailablePort;
+import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
+import org.apache.geode.management.internal.i18n.CliStrings;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.WanTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+@Category({WanTest.class})
+@RunWith(JUnitParamsRunner.class)
+public class 
ClusterConfigStartStopPauseAndResumeGatewaySenderOperationDUnitTest
+    implements Serializable {
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(9);
+
+  @Rule
+  public transient GfshCommandRule gfsh = new GfshCommandRule();
+
+  private MemberVM locatorSite1;
+  private MemberVM locatorSite2;
+  private MemberVM server1Site1;
+  private MemberVM server2Site1;
+  private MemberVM server1Site2;
+  private MemberVM server2Site2;
+
+  private ClientVM clientSite1;
+  private ClientVM clientSite2;
+
+  /**
+   * Verify that gateway-sender state is persisted after pause and resume 
gateway-sender
+   * commands are executed, and that gateway-sender works as expected after 
member restart:
+   *
+   * - Region type: PARTITION and non-redundant
+   * - Gateway sender configured without queue persistence
+   *
+   * 1. Pause gateway-sender
+   * 2. Run some traffic and verify that data is enqueued in gateway-sender 
queues
+   * 3. Restart all servers that host gateway-sender
+   * 4. Run some traffic and verify that data is enqueued in gateway-sender 
queues, old
+   * data should be lost from queue after servers are restarted
+   * 5. Resume gateway-sender
+   * 6. Verify that latest traffic is sent over the gateway-sender to remote 
site
+   */
+  @Test
+  @Parameters({"true", "false"})
+  public void testThatPauseStateRemainAfterTheRestartOfMembers(String 
isParallel) throws Exception {
+    configureSites(false, "PARTITION", "0", isParallel);
+
+    executeGfshCommand(CliStrings.PAUSE_GATEWAYSENDER);
+    verifyGatewaySenderState(true, true);
+    server2Site2.invoke(() -> waitAllDispatcherThreadsToPause("ln"));
+    server1Site2.invoke(() -> waitAllDispatcherThreadsToPause("ln"));
+
+    // Do some puts and check that data has been enqueued
+    Set<String> keysQueue = clientSite2.invoke(() -> doPutsInRange(0, 15));
+    server1Site2.invoke(() -> checkQueueSize("ln", keysQueue.size()));
+
+    // stop servers on site #2
+    server1Site2.stop(false);
+    server2Site2.stop(false);
+
+    // start again servers in Site #2
+    server1Site2 = clusterStartupRule.startServerVM(5, locatorSite2.getPort());
+    server2Site2 = clusterStartupRule.startServerVM(6, locatorSite2.getPort());
+
+    verifyGatewaySenderState(true, true);
+
+    // Do some puts and check that data has been enqueued, previous queue 
should be lost
+    Set<String> keysQueue1 = clientSite2.invoke(() -> doPutsInRange(20, 35));
+    server1Site2.invoke(() -> checkQueueSize("ln", keysQueue1.size()));
+
+    executeGfshCommand(CliStrings.RESUME_GATEWAYSENDER);
+    verifyGatewaySenderState(true, false);
+
+    // check that queue is empty
+    server1Site2.invoke(() -> checkQueueSize("ln", 0));
+    // check that data replicated to remote site
+    clientSite1.invoke(() -> checkDataAvailable(keysQueue1));
+  }
+
+  /**
+   * Verify that gateway-sender queue is persisted while in paused state and 
it is recovered after
+   * the restart of servers.
+   *
+   * - Region type: PARTITION_PERSISTENT and non-redundant
+   * - Gateway sender configured with queue persistence
+   *
+   * 1. Pause gateway-sender
+   * 2. Restart all servers that host gateway-sender
+   * 3. Run some traffic and verify that data is enqueued in gateway-sender 
queues
+   * 4. Restart all servers that host gateway-sender
+   * 5. Run some traffic and verify that data is enqueued in gateway-sender 
queues, old
+   * data should be recovered after servers restarted
+   * 6. Resume gateway-sender
+   * 5. Verify that complete traffic is sent over the gateway-sender to the 
remote site
+   */
+  @Test
+  @Parameters({"true", "false"})
+  public void testThatPauseStateRemainAfterRestartAllServersPersistent(String 
isParallel)
+      throws Exception {
+    configureSites(true, "PARTITION_PERSISTENT", "0", isParallel);
+
+    executeGfshCommand(CliStrings.PAUSE_GATEWAYSENDER);
+    verifyGatewaySenderState(true, true);
+    server2Site2.invoke(() -> waitAllDispatcherThreadsToPause("ln"));
+    server1Site2.invoke(() -> waitAllDispatcherThreadsToPause("ln"));
+
+    // Do some puts and check that data has been enqueued
+    Set<String> keysQueue = clientSite2.invoke(() -> doPutsInRange(0, 15));
+    server1Site2.invoke(() -> checkQueueSize("ln", keysQueue.size()));
+
+    // stop servers on site #2
+    server1Site2.stop(false);
+    server2Site2.stop(false);
+
+    Thread thread = new Thread(
+        () -> server1Site2 = clusterStartupRule.startServerVM(5, 
locatorSite2.getPort()));
+    Thread thread1 = new Thread(
+        () -> server2Site2 = clusterStartupRule.startServerVM(6, 
locatorSite2.getPort()));
+    // start threads
+    thread.start();
+    thread1.start();
+    thread.join();
+    thread.join();
+
+    verifyGatewaySenderState(true, true);
+
+    // Do some puts and check that data has been enqueued, previous queue data 
should not be lost
+    Set<String> keysQueue1 = clientSite2.invoke(() -> doPutsInRange(20, 35));
+    server1Site2.invoke(() -> checkQueueSize("ln", keysQueue1.size() + 
keysQueue.size()));
+
+    executeGfshCommand(CliStrings.RESUME_GATEWAYSENDER);
+    verifyGatewaySenderState(true, false);
+
+    // check that queue is empty
+    server1Site2.invoke(() -> checkQueueSize("ln", 0));
+    // check that data replicated to remote site
+    clientSite1.invoke(() -> checkDataAvailable(keysQueue1));
+    clientSite1.invoke(() -> checkDataAvailable(keysQueue));
+  }
+
+  /**
+   * Verify that gateway-sender is recovered from redundant server after the
+   * restart of member.
+   *
+   * - Region type: PARTITION and redundant
+   * - Gateway sender configured without queue persistence
+   *
+   * 1. Pause gateway-sender
+   * 2. Run some traffic and verify that data is enqueued in gateway-sender 
queues
+   * 3. Restart one server that host gateway-sender
+   * 4. Run some traffic and verify that data is enqueued in gateway-sender 
queues, old
+   * data should not be lost from queue after servers are restarted because of 
redundancy
+   * 5. Resume gateway-sender
+   * 6. Verify that queued traffic is sent over the gateway-sender to remote 
site
+   */
+  @Test
+  @Parameters({"true", "false"})
+  public void testThatPauseStateRemainAfterRestartOneServerRedundant(String 
isParallel)
+      throws Exception {
+    configureSites(false, "PARTITION", "1", isParallel);
+
+    executeGfshCommand(CliStrings.PAUSE_GATEWAYSENDER);
+    verifyGatewaySenderState(true, true);
+    server2Site2.invoke(() -> waitAllDispatcherThreadsToPause("ln"));
+    server1Site2.invoke(() -> waitAllDispatcherThreadsToPause("ln"));
+
+    // Do some puts and check that data has been enqueued
+    Set<String> keys1 = clientSite2.invoke(() -> doPutsInRange(0, 15));
+    server1Site2.invoke(() -> checkQueueSize("ln", keys1.size()));
+
+    // stop server on site #2
+    server1Site2.stop(false);
+
+    // start again server in Site #2
+    server1Site2 = clusterStartupRule.startServerVM(5, locatorSite2.getPort());
+
+    verifyGatewaySenderState(true, true);
+
+    // Do some puts and check that data has been enqueued, previous queue 
should not be lost
+    // due to configured redundancy
+    Set<String> keys = clientSite2.invoke(() -> doPutsInRange(20, 35));
+    server1Site2.invoke(() -> checkQueueSize("ln", keys.size() + 
keys1.size()));
+
+    executeGfshCommand(CliStrings.RESUME_GATEWAYSENDER);
+    verifyGatewaySenderState(true, false);
+
+    // check that queue is empty
+    server1Site2.invoke(() -> checkQueueSize("ln", 0));
+    // check that data replicated to other site
+    clientSite1.invoke(() -> checkDataAvailable(keys));
+    clientSite1.invoke(() -> checkDataAvailable(keys1));
+  }
+
+  /**
+   * Verify that gateway-sender state is persisted after stop and start 
gateway-sender
+   * commands are executed, and that gateway-sender works as expected after 
member restart:
+   *
+   * - Region type: PARTITION and non-redundant
+   * - Gateway sender configured without queue persistence
+   *
+   * 1. Stop gateway-sender
+   * 2. Run some traffic and verify that data is stored in partition region, 
and not replicated to
+   * the other site
+   * 3. Restart servers that host gateway-sender
+   * 4. Run some traffic and verify that partition region is recovered and 
that data is not
+   * replicated to the other site
+   * 5. Start gateway-sender
+   * 6. Run some traffic and verify that traffic is sent over the 
gateway-sender to remote site
+   */
+  @Test
+  @Parameters({"true", "false"})
+  public void testThatStopStateRemainAfterTheRestartOfMembers(String 
isParallel) throws Exception {
+    configureSites(false, "PARTITION", "0", isParallel);
+
+    executeGfshCommand(CliStrings.STOP_GATEWAYSENDER);
+    verifyGatewaySenderState(false, false);
+
+    Set<String> keys1 = clientSite2.invoke(() -> doPutsInRange(0, 15));
+    clientSite2.invoke(() -> checkDataAvailable(keys1));
+    server1Site2.invoke(() -> checkQueueSize("ln", 0));
+
+    // stop servers on site #2
+    server1Site2.stop(false);
+    server2Site2.stop(false);
+
+    // start again servers in Site #2
+    server1Site2 = clusterStartupRule.startServerVM(5, locatorSite2.getPort());
+    server2Site2 = clusterStartupRule.startServerVM(6, locatorSite2.getPort());
+
+    verifyGatewaySenderState(false, false);
+
+    Set<String> keys = clientSite2.invoke(() -> doPutsInRange(20, 35));
+    clientSite2.invoke(() -> checkDataAvailable(keys));
+
+    executeGfshCommand(CliStrings.START_GATEWAYSENDER);
+    verifyGatewaySenderState(true, false);
+
+    Set<String> keys3 = clientSite2.invoke(() -> doPutsInRange(40, 55));
+    clientSite2.invoke(() -> checkDataAvailable(keys3));
+    clientSite1.invoke(() -> checkDataAvailable(keys3));
+  }
+
+  /**
+   * Verify that the colocated partition regions (gateway-sender queue and data region) are created
+   * after servers are restarted and the gateway-sender is created in stopped state.
+   *
+   * - Region type: PARTITION_PERSISTENT and non-redundant
+   * - Gateway sender configured with queue persistence
+   *
+   * 1. Stop gateway-sender
+   * 2. Run some traffic and verify that data is stored in the partition region and not
+   * replicated to the remote site
+   * 3. Restart servers that host the gateway-sender
+   * 4. Run some traffic and verify that the partition region is recovered and that data is not
+   * replicated to the other site
+   * 5. Start gateway-sender
+   * 6. Run some traffic and verify that only the latest traffic is sent over the gateway-sender
+   * to the remote site
+   */
+  @Test
+  @Parameters({"true", "false"})
+  public void testThatStopStateRemainAfterTheRestartOfMembersAndAllRegionsRecover(String isParallel)
+      throws Exception {
+    configureSites(true, "PARTITION_PERSISTENT", "0", isParallel);
+
+    executeGfshCommand(CliStrings.STOP_GATEWAYSENDER);
+    verifyGatewaySenderState(false, false);
+
+    Set<String> keys1 = clientSite2.invoke(() -> doPutsInRange(0, 15));
+    clientSite2.invoke(() -> checkDataAvailable(keys1));
+
+    server1Site2.invoke(() -> checkQueueSize("ln", 0));
+
+    // stop servers on site #2
+    server1Site2.stop(false);
+    server2Site2.stop(false);
+
+    Thread thread = new Thread(
+        () -> server1Site2 = clusterStartupRule.startServerVM(5, locatorSite2.getPort()));
+    Thread thread1 = new Thread(
+        () -> server2Site2 = clusterStartupRule.startServerVM(6, locatorSite2.getPort()));
+    // start both restart threads and wait for them to complete
+    thread.start();
+    thread1.start();
+    thread.join();
+    thread1.join();
+
+    verifyGatewaySenderState(false, false);
+
+    // check that the partition region is created and that it accepts new traffic
+    Set<String> keys = clientSite2.invoke(() -> doPutsInRange(20, 35));
+    clientSite2.invoke(() -> checkDataAvailable(keys));
+
+    server1Site2.invoke(() -> checkQueueSize("ln", 0));
+
+    executeGfshCommand(CliStrings.START_GATEWAYSENDER);
+    verifyGatewaySenderState(true, false);
+
+    Set<String> keys3 = clientSite2.invoke(() -> doPutsInRange(40, 55));
+    clientSite2.invoke(() -> checkDataAvailable(keys3));
+    clientSite1.invoke(() -> checkDataAvailable(keys3));
+  }
+
+  /**
+   * Verify that the parallel gateway-sender queue recovers from the disk-store only the data
+   * enqueued prior to the stop command.
+   *
+   * - Region type: PARTITION_PERSISTENT and non-redundant
+   * - Gateway sender configured with queue persistence
+   *
+   * 1. Pause gateway-sender
+   * 2. Run some traffic and verify that data is enqueued in gateway-sender queues
+   * 3. Stop gateway-sender
+   * 4. Run some traffic and verify that data is stored in the region, and not enqueued
+   * 5. Restart all servers
+   * 6. Check that the data enqueued prior to the stop command is recovered from persistent storage,
+   * and that the gateway-sender remained in stopped state
+   * 7. Start gateway-senders
+   * 8. Check that only the data enqueued prior to the stop command is replicated to the remote site
+   */
+  @Test
+  @Parameters({"false", "true"})
+  public void testThatStopStateRemainAfterTheRestartAndQueueDataIsRecovered(String isParallel)
+      throws Exception {
+    configureSites(true, "PARTITION_PERSISTENT", "0", isParallel);
+
+    executeGfshCommand(CliStrings.PAUSE_GATEWAYSENDER);
+    verifyGatewaySenderState(true, true);
+    server2Site2.invoke(() -> waitAllDispatcherThreadsToPause("ln"));
+    server1Site2.invoke(() -> waitAllDispatcherThreadsToPause("ln"));
+
+    Set<String> keysQueued = clientSite2.invoke(() -> doPutsInRange(70, 85));
+    clientSite2.invoke(() -> checkDataAvailable(keysQueued));
+    server1Site2.invoke(() -> checkQueueSize("ln", keysQueued.size()));
+
+    executeGfshCommand(CliStrings.STOP_GATEWAYSENDER);
+
+    verifyGatewaySenderState(false, isParallel.equals("true"));
+
+    Set<String> keysNotQueued = clientSite2.invoke(() -> doPutsInRange(100, 105));
+    clientSite2.invoke(() -> checkDataAvailable(keysNotQueued));
+
+    // stop servers on site #2
+    server1Site2.stop(false);
+    server2Site2.stop(false);
+
+    // start again servers in Site #2
+    Thread thread = new Thread(
+        () -> server1Site2 = clusterStartupRule.startServerVM(5, locatorSite2.getPort()));
+    Thread thread1 = new Thread(
+        () -> server2Site2 = clusterStartupRule.startServerVM(6, locatorSite2.getPort()));
+    // start threads
+    thread.start();
+    thread1.start();
+    thread.join();
+    thread1.join();
+
+    verifyGatewaySenderState(false, false);
+    if (isParallel.equals("true")) {
+      server1Site2.invoke(() -> checkQueueSize("ln", keysQueued.size()));
+    }
+    executeGfshCommand(CliStrings.START_GATEWAYSENDER);
+    verifyGatewaySenderState(true, false);
+
+    // Check that data is sent over the gateway-sender
+    server1Site2.invoke(() -> checkQueueSize("ln", 0));
+    clientSite1.invoke(() -> checkDataAvailable(keysQueued));
+    clientSite1.invoke(() -> checkDataNotAvailable(keysNotQueued));
+  }
+
+  /**
+   * This test case verifies that, while the gateway-sender is being recovered in stopped state,
+   * the server stores incoming events in the tmpDroppedEvents queue and processes them once the
+   * sender has been recovered.
+   */
+  @Test
+  public void testSubscriptionQueueWanTrafficWhileServerIsRestarted() throws Exception {
+    configureSites(true, "PARTITION_REDUNDANT_PERSISTENT", "1", "true");
+
+    List<MemberVM> allMembers = new ArrayList<>();
+    allMembers.add(server1Site2);
+    allMembers.add(server2Site2);
+
+    executeGfshCommand(CliStrings.STOP_GATEWAYSENDER);
+    verifyGatewaySenderState(false, false);
+
+    Set<String> keys1 = clientSite2.invoke(() -> doPutsInRange(0, 500));
+    clientSite2.invoke(() -> checkDataAvailable(keys1));
+    server1Site2.invoke(() -> checkQueueSize("ln", 0));
+
+    MemberVM startServer = clusterStartupRule.startServerVM(9, locatorSite2.getPort());
+
+    // perform rebalance operation to redistribute primaries on running servers
+    String command = new CommandStringBuilder(CliStrings.REBALANCE)
+        .getCommandString();
+    gfsh.executeAndAssertThat(command).statusIsSuccess();
+
+    startServer.stop(false);
+
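+    // run puts in a separate thread so WAN traffic arrives while the server is restarting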
+    Thread thread = new Thread(
+        () -> clientSite2.invoke(() -> doPutsInRange(0, 15000)));
+    thread.start();
+
+    startServer = clusterStartupRule.startServerVM(9, locatorSite2.getPort());
+
+    allMembers.add(startServer);
+
+    for (MemberVM member : allMembers) {
+      member.invoke(() -> {
+        // test that none of the events are stored in primary or secondary buckets
+        testLocalQueueIsEmpty("ln");
+        // check that the tmpDroppedEvents queue has been drained after the sender recovered in
+        // stopped state
+        verifyTmpDroppedEventSize("ln", 0);
+      });
+    }
+  }
+
+  public static void verifyTmpDroppedEventSize(String senderId, int size) {
+    GatewaySender sender = getGatewaySender(senderId);
+
+    AbstractGatewaySender ags = (AbstractGatewaySender) sender;
+    await().untilAsserted(() -> assertEquals("Expected tmpDroppedEvents size: " + size
+        + " but actual size: " + ags.getTmpDroppedEventSize(), size, ags.getTmpDroppedEventSize()));
+  }
+
+  public static void testLocalQueueIsEmpty(String senderId) {
+    await()
+        .untilAsserted(() -> localQueueSize(senderId));
+  }
+
+  public static void localQueueSize(String senderId) {
+    GatewaySender sender = getGatewaySender(senderId);
+    int totalSize = 0;
+    Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
+    if (queues != null) {
+      for (RegionQueue q : queues) {
+        ConcurrentParallelGatewaySenderQueue prQ = (ConcurrentParallelGatewaySenderQueue) q;
+        prQ.localSize(true);

Review comment:
       This line should contain:
   `totalSize += prQ.localSize(true);`
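   For example, a minimal sketch of how the helper could accumulate and assert on the size once
   that fix is applied (asserting that the total is zero is an assumption based on how the helper
   is used from `testLocalQueueIsEmpty`):
   ```java
   public static void localQueueSize(String senderId) {
     GatewaySender sender = getGatewaySender(senderId);
     int totalSize = 0;
     Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
     if (queues != null) {
       for (RegionQueue q : queues) {
         ConcurrentParallelGatewaySenderQueue prQ = (ConcurrentParallelGatewaySenderQueue) q;
         // accumulate the local size (including secondary buckets) of every parallel queue
         totalSize += prQ.localSize(true);
       }
     }
     // the local queue is expected to end up empty once dropped events have been processed
     assertEquals(0, totalSize);
   }
   ```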

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/ClusterConfigStartStopPauseAndResumeGatewaySenderOperationDUnitTest.java
##########
@@ -0,0 +1,716 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.wan.wancommand;
+
+import static org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
+import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.getMember;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewayReceiverMXBeanProxy;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewaySenderMXBeanProxy;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifyReceiverState;
+import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifySenderState;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.RegionQueue;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderEventProcessor;
+import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue;
+import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderEventProcessor;
+import org.apache.geode.internal.cache.wan.serial.ConcurrentSerialGatewaySenderEventProcessor;
+import org.apache.geode.internal.cache.wan.serial.SerialGatewaySenderEventProcessor;
+import org.apache.geode.internal.membership.utils.AvailablePort;
+import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
+import org.apache.geode.management.internal.i18n.CliStrings;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.WanTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+@Category({WanTest.class})
+@RunWith(JUnitParamsRunner.class)
+public class ClusterConfigStartStopPauseAndResumeGatewaySenderOperationDUnitTest
+    implements Serializable {
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(9);
+
+  @Rule
+  public transient GfshCommandRule gfsh = new GfshCommandRule();
+
+  private MemberVM locatorSite1;
+  private MemberVM locatorSite2;
+  private MemberVM server1Site1;
+  private MemberVM server2Site1;
+  private MemberVM server1Site2;
+  private MemberVM server2Site2;
+
+  private ClientVM clientSite1;
+  private ClientVM clientSite2;
+
+  /**
+   * Verify that the gateway-sender state is persisted after the pause and resume gateway-sender
+   * commands are executed, and that the gateway-sender works as expected after member restart:
+   *
+   * - Region type: PARTITION and non-redundant
+   * - Gateway sender configured without queue persistence
+   *
+   * 1. Pause gateway-sender
+   * 2. Run some traffic and verify that data is enqueued in gateway-sender queues
+   * 3. Restart all servers that host the gateway-sender
+   * 4. Run some traffic and verify that data is enqueued in gateway-sender queues; old
+   * data will be lost from the queue after the servers are restarted
+   * 5. Resume gateway-sender
+   * 6. Verify that the latest traffic is sent over the gateway-sender to the remote site
+   */
+  @Test
+  @Parameters({"true", "false"})
+  public void testThatPauseStateRemainAfterTheRestartOfMembers(String isParallel) throws Exception {
+    configureSites(false, "PARTITION", "0", isParallel);
+
+    executeGfshCommand(CliStrings.PAUSE_GATEWAYSENDER);
+    verifyGatewaySenderState(true, true);
+    server2Site2.invoke(() -> waitAllDispatcherThreadsToPause("ln"));
+    server1Site2.invoke(() -> waitAllDispatcherThreadsToPause("ln"));
+
+    // Do some puts and check that data has been enqueued
+    Set<String> keysQueue = clientSite2.invoke(() -> doPutsInRange(0, 15));
+    server1Site2.invoke(() -> checkQueueSize("ln", keysQueue.size()));
+
+    // stop servers on site #2
+    server1Site2.stop(false);
+    server2Site2.stop(false);
+
+    // start again servers in Site #2
+    server1Site2 = clusterStartupRule.startServerVM(5, locatorSite2.getPort());
+    server2Site2 = clusterStartupRule.startServerVM(6, locatorSite2.getPort());
+
+    verifyGatewaySenderState(true, true);
+
+    // Do some puts and check that data has been enqueued, previous queue 
should be lost
+    Set<String> keysQueue1 = clientSite2.invoke(() -> doPutsInRange(20, 35));
+    server1Site2.invoke(() -> checkQueueSize("ln", keysQueue1.size()));
+
+    executeGfshCommand(CliStrings.RESUME_GATEWAYSENDER);
+    verifyGatewaySenderState(true, false);
+
+    // check that queue is empty
+    server1Site2.invoke(() -> checkQueueSize("ln", 0));
+    // check that data replicated to remote site
+    clientSite1.invoke(() -> checkDataAvailable(keysQueue1));
+  }
+
+  /**
+   * Verify that the gateway-sender queue is persisted while in paused state and is recovered
+   * after the servers are restarted.
+   *
+   * - Region type: PARTITION_PERSISTENT and non-redundant
+   * - Gateway sender configured with queue persistence
+   *
+   * 1. Pause gateway-sender
+   * 2. Restart all servers that host the gateway-sender
+   * 3. Run some traffic and verify that data is enqueued in gateway-sender queues
+   * 4. Restart all servers that host the gateway-sender
+   * 5. Run some traffic and verify that data is enqueued in gateway-sender queues; old
+   * data should be recovered after the servers are restarted
+   * 6. Resume gateway-sender
+   * 7. Verify that the complete traffic is sent over the gateway-sender to the remote site
+   */
+  @Test
+  @Parameters({"true", "false"})
+  public void testThatPauseStateRemainAfterRestartAllServersPersistent(String isParallel)
+      throws Exception {
+    configureSites(true, "PARTITION_PERSISTENT", "0", isParallel);
+
+    executeGfshCommand(CliStrings.PAUSE_GATEWAYSENDER);
+    verifyGatewaySenderState(true, true);
+    server2Site2.invoke(() -> waitAllDispatcherThreadsToPause("ln"));
+    server1Site2.invoke(() -> waitAllDispatcherThreadsToPause("ln"));
+
+    // Do some puts and check that data has been enqueued
+    Set<String> keysQueue = clientSite2.invoke(() -> doPutsInRange(0, 15));
+    server1Site2.invoke(() -> checkQueueSize("ln", keysQueue.size()));
+
+    // stop servers on site #2
+    server1Site2.stop(false);
+    server2Site2.stop(false);
+
+    Thread thread = new Thread(
+        () -> server1Site2 = clusterStartupRule.startServerVM(5, locatorSite2.getPort()));
+    Thread thread1 = new Thread(
+        () -> server2Site2 = clusterStartupRule.startServerVM(6, locatorSite2.getPort()));
+    // start both restart threads and wait for them to complete
+    thread.start();
+    thread1.start();
+    thread.join();
+    thread1.join();
+
+    verifyGatewaySenderState(true, true);
+
+    // Do some puts and check that data has been enqueued, previous queue data 
should not be lost
+    Set<String> keysQueue1 = clientSite2.invoke(() -> doPutsInRange(20, 35));
+    server1Site2.invoke(() -> checkQueueSize("ln", keysQueue1.size() + keysQueue.size()));
+
+    executeGfshCommand(CliStrings.RESUME_GATEWAYSENDER);
+    verifyGatewaySenderState(true, false);
+
+    // check that queue is empty
+    server1Site2.invoke(() -> checkQueueSize("ln", 0));
+    // check that data replicated to remote site
+    clientSite1.invoke(() -> checkDataAvailable(keysQueue1));
+    clientSite1.invoke(() -> checkDataAvailable(keysQueue));
+  }
+
+  /**
+   * Verify that the gateway-sender is recovered from the redundant server after the
+   * restart of a member.
+   *
+   * - Region type: PARTITION and redundant
+   * - Gateway sender configured without queue persistence
+   *
+   * 1. Pause gateway-sender
+   * 2. Run some traffic and verify that data is enqueued in gateway-sender queues
+   * 3. Restart one server that hosts the gateway-sender
+   * 4. Run some traffic and verify that data is enqueued in gateway-sender queues; old
+   * data should not be lost from the queue after the server is restarted, because of redundancy
+   * 5. Resume gateway-sender
+   * 6. Verify that queued traffic is sent over the gateway-sender to the remote site
+   */
+  @Test
+  @Parameters({"true", "false"})
+  public void testThatPauseStateRemainAfterRestartOneServerRedundant(String isParallel)
+      throws Exception {
+    configureSites(false, "PARTITION", "1", isParallel);
+
+    executeGfshCommand(CliStrings.PAUSE_GATEWAYSENDER);
+    verifyGatewaySenderState(true, true);
+    server2Site2.invoke(() -> waitAllDispatcherThreadsToPause("ln"));
+    server1Site2.invoke(() -> waitAllDispatcherThreadsToPause("ln"));
+
+    // Do some puts and check that data has been enqueued
+    Set<String> keys1 = clientSite2.invoke(() -> doPutsInRange(0, 15));
+    server1Site2.invoke(() -> checkQueueSize("ln", keys1.size()));
+
+    // stop server on site #2
+    server1Site2.stop(false);
+
+    // start again server in Site #2
+    server1Site2 = clusterStartupRule.startServerVM(5, locatorSite2.getPort());
+
+    verifyGatewaySenderState(true, true);
+
+    // Do some puts and check that data has been enqueued, previous queue 
should not be lost
+    // due to configured redundancy
+    Set<String> keys = clientSite2.invoke(() -> doPutsInRange(20, 35));
+    server1Site2.invoke(() -> checkQueueSize("ln", keys.size() + keys1.size()));
+
+    executeGfshCommand(CliStrings.RESUME_GATEWAYSENDER);
+    verifyGatewaySenderState(true, false);
+
+    // check that queue is empty
+    server1Site2.invoke(() -> checkQueueSize("ln", 0));
+    // check that data replicated to other site
+    clientSite1.invoke(() -> checkDataAvailable(keys));
+    clientSite1.invoke(() -> checkDataAvailable(keys1));
+  }
+
+  /**
+   * Verify that the gateway-sender state is persisted after the stop and start gateway-sender
+   * commands are executed, and that the gateway-sender works as expected after member restart:
+   *
+   * - Region type: PARTITION and non-redundant
+   * - Gateway sender configured without queue persistence
+   *
+   * 1. Stop gateway-sender
+   * 2. Run some traffic and verify that data is stored in the partition region and not
+   * replicated to the other site
+   * 3. Restart servers that host the gateway-sender
+   * 4. Run some traffic and verify that the partition region is recovered and that data is not
+   * replicated to the other site
+   * 5. Start gateway-sender
+   * 6. Run some traffic and verify that traffic is sent over the gateway-sender to the remote site
+   */
+  @Test
+  @Parameters({"true", "false"})
+  public void testThatStopStateRemainAfterTheRestartOfMembers(String isParallel) throws Exception {
+    configureSites(false, "PARTITION", "0", isParallel);
+
+    executeGfshCommand(CliStrings.STOP_GATEWAYSENDER);
+    verifyGatewaySenderState(false, false);
+
+    Set<String> keys1 = clientSite2.invoke(() -> doPutsInRange(0, 15));
+    clientSite2.invoke(() -> checkDataAvailable(keys1));
+    server1Site2.invoke(() -> checkQueueSize("ln", 0));
+
+    // stop servers on site #2
+    server1Site2.stop(false);
+    server2Site2.stop(false);
+
+    // start again servers in Site #2
+    server1Site2 = clusterStartupRule.startServerVM(5, locatorSite2.getPort());
+    server2Site2 = clusterStartupRule.startServerVM(6, locatorSite2.getPort());
+
+    verifyGatewaySenderState(false, false);
+
+    Set<String> keys = clientSite2.invoke(() -> doPutsInRange(20, 35));
+    clientSite2.invoke(() -> checkDataAvailable(keys));
+
+    executeGfshCommand(CliStrings.START_GATEWAYSENDER);
+    verifyGatewaySenderState(true, false);
+
+    Set<String> keys3 = clientSite2.invoke(() -> doPutsInRange(40, 55));
+    clientSite2.invoke(() -> checkDataAvailable(keys3));
+    clientSite1.invoke(() -> checkDataAvailable(keys3));
+  }
+
+  /**
+   * Verify that the colocated partition regions (gateway-sender queue and data region) are created
+   * after servers are restarted and the gateway-sender is created in stopped state.
+   *
+   * - Region type: PARTITION_PERSISTENT and non-redundant
+   * - Gateway sender configured with queue persistence
+   *
+   * 1. Stop gateway-sender
+   * 2. Run some traffic and verify that data is stored in the partition region and not
+   * replicated to the remote site
+   * 3. Restart servers that host the gateway-sender
+   * 4. Run some traffic and verify that the partition region is recovered and that data is not
+   * replicated to the other site
+   * 5. Start gateway-sender
+   * 6. Run some traffic and verify that only the latest traffic is sent over the gateway-sender
+   * to the remote site
+   */
+  @Test
+  @Parameters({"true", "false"})
+  public void testThatStopStateRemainAfterTheRestartOfMembersAndAllRegionsRecover(String isParallel)
+      throws Exception {
+    configureSites(true, "PARTITION_PERSISTENT", "0", isParallel);
+
+    executeGfshCommand(CliStrings.STOP_GATEWAYSENDER);
+    verifyGatewaySenderState(false, false);
+
+    Set<String> keys1 = clientSite2.invoke(() -> doPutsInRange(0, 15));
+    clientSite2.invoke(() -> checkDataAvailable(keys1));
+
+    server1Site2.invoke(() -> checkQueueSize("ln", 0));
+
+    // stop servers on site #2
+    server1Site2.stop(false);
+    server2Site2.stop(false);
+
+    Thread thread = new Thread(
+        () -> server1Site2 = clusterStartupRule.startServerVM(5, locatorSite2.getPort()));
+    Thread thread1 = new Thread(
+        () -> server2Site2 = clusterStartupRule.startServerVM(6, locatorSite2.getPort()));
+    // start both restart threads and wait for them to complete
+    thread.start();
+    thread1.start();
+    thread.join();
+    thread1.join();
+
+    verifyGatewaySenderState(false, false);
+
+    // check that the partition region is created and that it accepts new traffic
+    Set<String> keys = clientSite2.invoke(() -> doPutsInRange(20, 35));
+    clientSite2.invoke(() -> checkDataAvailable(keys));
+
+    server1Site2.invoke(() -> checkQueueSize("ln", 0));
+
+    executeGfshCommand(CliStrings.START_GATEWAYSENDER);
+    verifyGatewaySenderState(true, false);
+
+    Set<String> keys3 = clientSite2.invoke(() -> doPutsInRange(40, 55));
+    clientSite2.invoke(() -> checkDataAvailable(keys3));
+    clientSite1.invoke(() -> checkDataAvailable(keys3));
+  }
+
+  /**
+   * Verify that the parallel gateway-sender queue recovers from the disk-store only the data
+   * enqueued prior to the stop command.
+   *
+   * - Region type: PARTITION_PERSISTENT and non-redundant
+   * - Gateway sender configured with queue persistence
+   *
+   * 1. Pause gateway-sender
+   * 2. Run some traffic and verify that data is enqueued in gateway-sender queues
+   * 3. Stop gateway-sender
+   * 4. Run some traffic and verify that data is stored in the region, and not enqueued
+   * 5. Restart all servers
+   * 6. Check that the data enqueued prior to the stop command is recovered from persistent storage,
+   * and that the gateway-sender remained in stopped state
+   * 7. Start gateway-senders
+   * 8. Check that only the data enqueued prior to the stop command is replicated to the remote site
+   */
+  @Test
+  @Parameters({"false", "true"})
+  public void testThatStopStateRemainAfterTheRestartAndQueueDataIsRecovered(String isParallel)
+      throws Exception {
+    configureSites(true, "PARTITION_PERSISTENT", "0", isParallel);
+
+    executeGfshCommand(CliStrings.PAUSE_GATEWAYSENDER);
+    verifyGatewaySenderState(true, true);
+    server2Site2.invoke(() -> waitAllDispatcherThreadsToPause("ln"));
+    server1Site2.invoke(() -> waitAllDispatcherThreadsToPause("ln"));
+
+    Set<String> keysQueued = clientSite2.invoke(() -> doPutsInRange(70, 85));
+    clientSite2.invoke(() -> checkDataAvailable(keysQueued));
+    server1Site2.invoke(() -> checkQueueSize("ln", keysQueued.size()));
+
+    executeGfshCommand(CliStrings.STOP_GATEWAYSENDER);
+
+    verifyGatewaySenderState(false, isParallel.equals("true"));
+
+    Set<String> keysNotQueued = clientSite2.invoke(() -> doPutsInRange(100, 105));
+    clientSite2.invoke(() -> checkDataAvailable(keysNotQueued));
+
+    // stop servers on site #2
+    server1Site2.stop(false);
+    server2Site2.stop(false);
+
+    // start again servers in Site #2
+    Thread thread = new Thread(
+        () -> server1Site2 = clusterStartupRule.startServerVM(5, locatorSite2.getPort()));
+    Thread thread1 = new Thread(
+        () -> server2Site2 = clusterStartupRule.startServerVM(6, locatorSite2.getPort()));
+    // start threads
+    thread.start();
+    thread1.start();
+    thread.join();
+    thread1.join();
+
+    verifyGatewaySenderState(false, false);
+    if (isParallel.equals("true")) {
+      server1Site2.invoke(() -> checkQueueSize("ln", keysQueued.size()));
+    }
+    executeGfshCommand(CliStrings.START_GATEWAYSENDER);
+    verifyGatewaySenderState(true, false);
+
+    // Check that data is sent over the gateway-sender
+    server1Site2.invoke(() -> checkQueueSize("ln", 0));
+    clientSite1.invoke(() -> checkDataAvailable(keysQueued));
+    clientSite1.invoke(() -> checkDataNotAvailable(keysNotQueued));
+  }
+
+  /**
+   * This test case verifies that, while the gateway-sender is being recovered in stopped state,
+   * the server stores incoming events in the tmpDroppedEvents queue and processes them once the
+   * sender has been recovered.
+   */
+  @Test
+  public void testSubscriptionQueueWanTrafficWhileServerIsRestarted() throws Exception {
+    configureSites(true, "PARTITION_REDUNDANT_PERSISTENT", "1", "true");
+
+    List<MemberVM> allMembers = new ArrayList<>();
+    allMembers.add(server1Site2);
+    allMembers.add(server2Site2);
+
+    executeGfshCommand(CliStrings.STOP_GATEWAYSENDER);
+    verifyGatewaySenderState(false, false);
+
+    Set<String> keys1 = clientSite2.invoke(() -> doPutsInRange(0, 500));
+    clientSite2.invoke(() -> checkDataAvailable(keys1));
+    server1Site2.invoke(() -> checkQueueSize("ln", 0));
+
+    MemberVM startServer = clusterStartupRule.startServerVM(9, locatorSite2.getPort());
+
+    // perform rebalance operation to redistribute primaries on running servers
+    String command = new CommandStringBuilder(CliStrings.REBALANCE)
+        .getCommandString();
+    gfsh.executeAndAssertThat(command).statusIsSuccess();
+
+    startServer.stop(false);
+
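+    // run puts in a separate thread so WAN traffic arrives while the server is restarting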
+    Thread thread = new Thread(
+        () -> clientSite2.invoke(() -> doPutsInRange(0, 15000)));
+    thread.start();
+
+    startServer = clusterStartupRule.startServerVM(9, locatorSite2.getPort());
+
+    allMembers.add(startServer);
+
+    for (MemberVM member : allMembers) {
+      member.invoke(() -> {
+        // test that none of the events are stored in primary or secondary buckets
+        testLocalQueueIsEmpty("ln");
+        // check that the tmpDroppedEvents queue has been drained after the sender recovered in
+        // stopped state
+        verifyTmpDroppedEventSize("ln", 0);
+      });
+    }
+  }
+
+  public static void verifyTmpDroppedEventSize(String senderId, int size) {
+    GatewaySender sender = getGatewaySender(senderId);
+
+    AbstractGatewaySender ags = (AbstractGatewaySender) sender;
+    await().untilAsserted(() -> assertEquals("Expected tmpDroppedEvents size: " + size
+        + " but actual size: " + ags.getTmpDroppedEventSize(), size, ags.getTmpDroppedEventSize()));
+  }
+
+  public static void testLocalQueueIsEmpty(String senderId) {
+    await()
+        .untilAsserted(() -> localQueueSize(senderId));
+  }
+
+  public static void localQueueSize(String senderId) {

Review comment:
       I would change the name of this method to something like:
   `assertThatLocalQueueSizeIsZero`
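   A rough sketch of how the renamed helper and its caller might read, combining the rename with
   the accumulation fix suggested in the earlier comment (the assertion message is only
   illustrative):
   ```java
   public static void testLocalQueueIsEmpty(String senderId) {
     await().untilAsserted(() -> assertThatLocalQueueSizeIsZero(senderId));
   }

   public static void assertThatLocalQueueSizeIsZero(String senderId) {
     GatewaySender sender = getGatewaySender(senderId);
     int totalSize = 0;
     Set<RegionQueue> queues = ((AbstractGatewaySender) sender).getQueues();
     if (queues != null) {
       for (RegionQueue q : queues) {
         // each parallel queue reports its local size, including secondary buckets
         totalSize += ((ConcurrentParallelGatewaySenderQueue) q).localSize(true);
       }
     }
     assertEquals("Expected local queue size to be 0 but was " + totalSize, 0, totalSize);
   }
   ```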



