This is an automated email from the ASF dual-hosted git repository.

mkevo pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new 3927ba9  GEODE-8202: Two-step serial gw sender threads start (#5900)
3927ba9 is described below

commit 3927ba9eeed5bee7e7639f926b48409c4b6dd3a7
Author: Alberto Bustamante Reyes <alb3rt...@users.noreply.github.com>
AuthorDate: Mon Jan 18 13:57:31 2021 +0100

    GEODE-8202: Two-step serial gw sender threads start (#5900)
    
    * GEODE-8202: Two-step serial gw sender threads start
---
 ...iversWithSamePortAndHostnameForSendersTest.java | 127 +++++++++++-
 .../geode-list-gateway-receivers-server1.gfsh      |  20 ++
 .../geode-list-gateway-receivers-server2.gfsh      |  20 ++
 .../codeAnalysis/sanctionedDataSerializables.txt   |   8 +-
 .../geode/cache/configuration/CacheConfig.java     |  23 +++
 .../org/apache/geode/cache/wan/GatewaySender.java  |  10 +
 .../geode/cache/wan/GatewaySenderFactory.java      |  13 ++
 .../internal/cache/wan/AbstractGatewaySender.java  |  18 ++
 .../wan/AbstractGatewaySenderEventProcessor.java   |   8 +
 .../internal/cache/wan/GatewaySenderAdvisor.java   |  27 ++-
 .../cache/wan/GatewaySenderAttributes.java         |   6 +
 ...oncurrentSerialGatewaySenderEventProcessor.java |  57 ++++--
 .../geode/internal/cache/xmlcache/CacheXml.java    |   5 +-
 .../internal/cache/xmlcache/CacheXmlGenerator.java |  10 +
 .../internal/cache/xmlcache/CacheXmlParser.java    |  11 ++
 .../geode/management/internal/i18n/CliStrings.java |   4 +
 .../geode.apache.org/schema/cache/cache-1.0.xsd    |   1 +
 .../gfsh/command-pages/create.html.md.erb          |   5 +
 .../cli/commands/CreateGatewaySenderCommand.java   |  27 ++-
 .../cli/functions/GatewaySenderCreateFunction.java |   7 +
 .../cli/functions/GatewaySenderFunctionArgs.java   |   6 +
 .../sanctioned-geode-gfsh-serializables.txt        |   2 +-
 .../commands/CreateGatewaySenderCommandTest.java   |  68 ++++++-
 ...CreateDestroyGatewaySenderCommandDUnitTest.java |   2 +-
 .../wan/GatewaySenderEventRemoteDispatcher.java    |  93 +++++++--
 .../cache/wan/GatewaySenderFactoryImpl.java        |  10 +-
 .../RemoteSerialGatewaySenderEventProcessor.java   |   2 +-
 .../cache/wan/serial/SerialGatewaySenderImpl.java  |   1 +
 ...atewaySenderEventRemoteDispatcherJUnitTest.java | 214 ++++++++++++++++++++-
 29 files changed, 751 insertions(+), 54 deletions(-)

diff --git 
a/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java
 
b/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java
index 6be6066..7247362 100644
--- 
a/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java
+++ 
b/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java
@@ -22,13 +22,17 @@ import static 
org.apache.geode.distributed.ConfigurationProperties.LOCATORS;
 import static 
org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
 
 import java.io.File;
+import java.io.IOException;
 import java.net.URL;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import java.util.StringTokenizer;
+import java.util.Vector;
 
 import com.palantir.docker.compose.DockerComposeRule;
 import org.junit.BeforeClass;
@@ -71,14 +75,8 @@ import org.apache.geode.test.junit.categories.WanTest;
  * traffic directed to the 2324 port to the receivers in a round robin fashion.
  *
  * - Another site consisting of a 1-server, 1-locator Geode cluster.
- * The server hosts a partition region (region-wan) and has a parallel gateway 
receiver
+ * The server hosts a partition region (region-wan) and has a gateway receiver
  * to send events to the remote site.
- *
- * The aim of the tests is verify that when several gateway receivers in a 
remote site
- * share the same port and hostname-for-senders, the pings sent from the 
gateway senders
- * reach the right gateway receiver and not just any of the receivers. Failure 
to do this
- * may result in the closing of connections by a gateway receiver for not 
having
- * received the ping in time.
  */
 @Category({WanTest.class})
 public class SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest {
@@ -122,6 +120,13 @@ public class 
SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest {
     super();
   }
 
+  /**
+   * The aim of this test is verify that when several gateway receivers in a 
remote site
+   * share the same port and hostname-for-senders, the pings sent from the 
gateway senders
+   * reach the right gateway receiver and not just any of the receivers. 
Failure to do this
+   * may result in the closing of connections by a gateway receiver for not 
having
+   * received the ping in time.
+   */
   @Test
   public void 
testPingsToReceiversWithSamePortAndHostnameForSendersReachTheRightReceivers()
       throws InterruptedException {
@@ -159,6 +164,105 @@ public class 
SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest {
     assertEquals(0, senderPoolDisconnects);
   }
 
+  @Test
+  public void testSerialGatewaySenderThreadsConnectToSameReceiver() {
+    String senderId = "ln";
+    String regionName = "region-wan";
+    final int remoteLocPort = 20334;
+
+    int locPort = createLocator(VM.getVM(0), 1, remoteLocPort);
+
+    VM vm1 = VM.getVM(1);
+    createCache(vm1, locPort);
+
+    createGatewaySender(vm1, senderId, 2, false, 5,
+        3, GatewaySender.DEFAULT_ORDER_POLICY);
+
+    createPartitionedRegion(vm1, regionName, senderId, 0, 10);
+
+    assertTrue(allDispatchersConnectedToSameReceiver(1));
+    assertTrue(allDispatchersConnectedToSameReceiver(2));
+
+  }
+
+  @Test
+  public void 
testTwoSendersWithSameIdShouldUseSameValueForEnforceThreadsConnectToSameServer()
 {
+    String senderId = "ln";
+    final int remoteLocPort = 20334;
+
+    int locPort = createLocator(VM.getVM(0), 1, remoteLocPort);
+
+    VM vm1 = VM.getVM(1);
+    createCache(vm1, locPort);
+
+    VM vm2 = VM.getVM(2);
+    createCache(vm2, locPort);
+
+    createGatewaySender(vm1, senderId, 2, false, 5,
+        3, GatewaySender.DEFAULT_ORDER_POLICY);
+
+    Exception exception =
+        assertThrows(Exception.class, () -> createGatewaySender(vm2, senderId, 
2, false, 5,
+            3, GatewaySender.DEFAULT_ORDER_POLICY, false));
+    assertEquals(exception.getCause().getMessage(), "Cannot create Gateway 
Sender " + senderId
+        + " with enforceThreadsConnectSameReceiver false because another cache 
has the same Gateway Sender defined with enforceThreadsConnectSameReceiver 
true");
+
+  }
+
+  private boolean allDispatchersConnectedToSameReceiver(int server) {
+
+    String gfshOutput = runListGatewayReceiversCommandInServer(server);
+    Vector<String> sendersConnectedToServer = 
parseSendersConnectedFromGfshOutput(gfshOutput);
+    String firstSenderId = "";
+    for (String senderId : sendersConnectedToServer) {
+      if (firstSenderId.equals("")) {
+        firstSenderId = senderId;
+      } else {
+        assertEquals("Found two different senders (" + firstSenderId + " and " 
+ senderId
+            + ") connected to same receiver in server " + server, 
firstSenderId, senderId);
+      }
+    }
+    return true;
+  }
+
+
+  private String runListGatewayReceiversCommandInServer(int serverN) {
+    String result = "";
+    try {
+      result = docker.get().exec(options("-T"), "locator",
+          arguments("gfsh", "run",
+              "--file=/geode/scripts/geode-list-gateway-receivers-server" + 
serverN + ".gfsh"));
+    } catch (IOException e) {
+      e.printStackTrace();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    } finally {
+      return result;
+    }
+  }
+
+  private Vector<String> parseSendersConnectedFromGfshOutput(String 
gfshOutput) {
+    String lines[] = gfshOutput.split(System.getProperty("line.separator"));
+    final String sendersConnectedColumnHeader = "Senders Connected";
+    String receiverInfo = null;
+    for (int i = 0; i < lines.length; i++) {
+      if (lines[i].contains(sendersConnectedColumnHeader)) {
+        receiverInfo = lines[i + 2];
+        break;
+      }
+    }
+    assertNotNull(
+        "Error parsing gfsh output. '" + sendersConnectedColumnHeader + "' 
column header not found",
+        receiverInfo);
+    String[] tableRow = receiverInfo.split("\\|");
+    String sendersConnectedColumnValue = tableRow[3].trim();
+    Vector<String> senders = new Vector<String>();
+    for (String sender : sendersConnectedColumnValue.split(",")) {
+      senders.add(sender.trim());
+    }
+    return senders;
+  }
+
   private int createLocator(VM memberVM, int dsId, int remoteLocPort) {
     return memberVM.invoke("create locator", () -> {
       Properties props = new Properties();
@@ -182,6 +286,14 @@ public class 
SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest {
       boolean isParallel, Integer batchSize,
       int numDispatchers,
       GatewaySender.OrderPolicy orderPolicy) {
+    createGatewaySender(vm, dsName, remoteDsId, isParallel, batchSize, 
numDispatchers, orderPolicy,
+        true);
+  }
+
+  public static void createGatewaySender(VM vm, String dsName, int remoteDsId,
+      boolean isParallel, Integer batchSize,
+      int numDispatchers,
+      GatewaySender.OrderPolicy orderPolicy, boolean 
enforceThreadsConnectToSameReceiver) {
     vm.invoke(() -> {
       final IgnoredException exln = 
IgnoredException.addIgnoredException("Could not connect");
       try {
@@ -191,6 +303,7 @@ public class 
SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest {
         gateway.setBatchSize(batchSize);
         gateway.setDispatcherThreads(numDispatchers);
         gateway.setOrderPolicy(orderPolicy);
+        
gateway.setEnforceThreadsConnectSameReceiver(enforceThreadsConnectToSameReceiver);
         gateway.create(dsName, remoteDsId);
 
       } finally {
diff --git 
a/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-list-gateway-receivers-server1.gfsh
 
b/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-list-gateway-receivers-server1.gfsh
new file mode 100644
index 0000000..a0d61bb
--- /dev/null
+++ 
b/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-list-gateway-receivers-server1.gfsh
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+set variable --name=APP_RESULT_VIEWER --value=200
+connect --locator=locator[20334]
+list gateways --receivers-only --member=server1
diff --git 
a/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-list-gateway-receivers-server2.gfsh
 
b/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-list-gateway-receivers-server2.gfsh
new file mode 100644
index 0000000..37a16dc
--- /dev/null
+++ 
b/geode-assembly/src/acceptanceTest/resources/org/apache/geode/cache/wan/scripts/geode-list-gateway-receivers-server2.gfsh
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+set variable --name=APP_RESULT_VIEWER --value=200
+connect --locator=locator[20334]
+list gateways --receivers-only --member=server2
diff --git 
a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
 
b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
index 65d8796..50ce9e4 100644
--- 
a/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
+++ 
b/geode-core/src/integrationTest/resources/org/apache/geode/codeAnalysis/sanctionedDataSerializables.txt
@@ -1910,10 +1910,12 @@ org/apache/geode/internal/cache/versions/VersionTag,2
 fromData,225
 toData,254
 
-org/apache/geode/internal/cache/wan/GatewaySenderAdvisor$GatewaySenderProfile,4
-fromData,283
+org/apache/geode/internal/cache/wan/GatewaySenderAdvisor$GatewaySenderProfile,6
+fromData,17
+fromDataPre_GEODE_1_14_0_0,293
 fromDataPre_GFE_8_0_0_0,188
-toData,271
+toData,17
+toDataPre_GEODE_1_14_0_0,281
 toDataPre_GFE_8_0_0_0,236
 
 org/apache/geode/internal/cache/wan/GatewaySenderEventCallbackArgument,2
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java
 
b/geode-core/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java
index f1b119b..85fe90e 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/configuration/CacheConfig.java
@@ -2656,6 +2656,8 @@ public class CacheConfig {
     protected String orderPolicy;
     @XmlAttribute(name = "group-transaction-events")
     protected Boolean groupTransactionEvents;
+    @XmlAttribute(name = "enforce-threads-connect-same-receiver")
+    protected Boolean enforceThreadsConnectSameReceiver;
 
     /**
      * Gets the value of the gatewayEventFilters property.
@@ -3100,6 +3102,27 @@ public class CacheConfig {
       this.orderPolicy = value;
     }
 
+    /**
+     * Sets the value of the enforceThreadsConnectSameReceiver property.
+     *
+     * allowed object is
+     * {@link Boolean }
+     *
+     */
+    public void setEnforceThreadsConnectSameReceiver(Boolean value) {
+      this.enforceThreadsConnectSameReceiver = value;
+    }
+
+    /**
+     * Gets the value of the enforceThreadsConnectSameReceiver property.
+     *
+     * possible object is
+     * {@link Boolean }
+     *
+     */
+    public Boolean getEnforceThreadsConnectSameReceiver() {
+      return this.enforceThreadsConnectSameReceiver;
+    }
   }
 
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java 
b/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java
index ffacf4b..5e0e9f1 100644
--- a/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java
+++ b/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySender.java
@@ -147,6 +147,8 @@ public interface GatewaySender {
 
   boolean DEFAULT_IS_FOR_INTERNAL_USE = false;
 
+  boolean DEFAULT_ENFORCE_THREADS_CONNECT_SAME_RECEIVER = false;
+
   /**
    * Retry a connection from sender to receiver after specified time interval 
(in milliseconds) in
    * case receiver is not up and running. Default is set to 1000 milliseconds 
i.e. 1 second.
@@ -449,4 +451,12 @@ public interface GatewaySender {
    *
    */
   void destroy();
+
+  /**
+   * Returns enforceThreadsConnectSameReceiver boolean property for this 
GatewaySender.
+   *
+   * @return enforceThreadsConnectSameReceiver boolean property for this 
GatewaySender
+   *
+   */
+  boolean getEnforceThreadsConnectSameReceiver();
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySenderFactory.java 
b/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySenderFactory.java
index 7c99214..6c9e92b 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySenderFactory.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/wan/GatewaySenderFactory.java
@@ -191,6 +191,19 @@ public interface GatewaySenderFactory {
   GatewaySenderFactory 
setGatewayEventSubstitutionFilter(GatewayEventSubstitutionFilter filter);
 
   /**
+   * If true, receiver member id is checked by all dispatcher threads when the 
connection is
+   * established to ensure they connect to the same receiver. Instead of 
starting all dispatcher
+   * threads in parallel, one thread is started first, and after that the rest 
are started in
+   * parallel. Default is false.
+   *
+   * @param enforceThreadsConnectSameReceiver boolean if true threads will 
verify if they are
+   *        connected to the same receiver
+   *
+   */
+  GatewaySenderFactory setEnforceThreadsConnectSameReceiver(
+      boolean enforceThreadsConnectSameReceiver);
+
+  /**
    * Creates a <code>GatewaySender</code> to communicate with remote 
distributed system
    *
    * @param id unique id for this SerialGatewaySender
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
index 7f6c8a1..4ea2c6d 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySender.java
@@ -171,6 +171,8 @@ public abstract class AbstractGatewaySender implements 
InternalGatewaySender, Di
 
   private ServerLocation serverLocation;
 
+  private String expectedReceiverUniqueId = "";
+
   protected Object queuedEventsSync = new Object();
 
   protected volatile boolean enqueuedAllTempQueueEvents = false;
@@ -237,6 +239,8 @@ public abstract class AbstractGatewaySender implements 
InternalGatewaySender, Di
 
   private final StatisticsClock statisticsClock;
 
+  protected boolean enforceThreadsConnectSameReceiver;
+
   protected AbstractGatewaySender() {
     statisticsClock = disabledClock();
   }
@@ -275,6 +279,7 @@ public abstract class AbstractGatewaySender implements 
InternalGatewaySender, Di
     this.maxMemoryPerDispatcherQueue = this.queueMemory / 
this.dispatcherThreads;
     this.serialNumber = DistributionAdvisor.createSerialNumber();
     this.isMetaQueue = attrs.isMetaQueue();
+    this.enforceThreadsConnectSameReceiver = 
attrs.getEnforceThreadsConnectSameReceiver();
     if (!(this.cache instanceof CacheCreation)) {
       this.myDSId = 
this.cache.getInternalDistributedSystem().getDistributionManager()
           .getDistributedSystemId();
@@ -500,6 +505,11 @@ public abstract class AbstractGatewaySender implements 
InternalGatewaySender, Di
   }
 
   @Override
+  public boolean getEnforceThreadsConnectSameReceiver() {
+    return this.enforceThreadsConnectSameReceiver;
+  }
+
+  @Override
   public boolean equals(Object obj) {
     if (obj == null) {
       return false;
@@ -1429,6 +1439,14 @@ public abstract class AbstractGatewaySender implements 
InternalGatewaySender, Di
     }
   }
 
+  public void setExpectedReceiverUniqueId(String expectedReceiverUniqueId) {
+    this.expectedReceiverUniqueId = expectedReceiverUniqueId;
+  }
+
+  public String getExpectedReceiverUniqueId() {
+    return this.expectedReceiverUniqueId;
+  }
+
   /**
    * Has a reference to a GatewayEventImpl and has a timeout value.
    */
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
index 0609ec9..294121a 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/AbstractGatewaySenderEventProcessor.java
@@ -154,6 +154,14 @@ public abstract class AbstractGatewaySenderEventProcessor 
extends LoggingThread
     this.threadMonitoring = tMonitoring;
   }
 
+  public void setExpectedReceiverUniqueId(String uniqueId) {
+    this.sender.setExpectedReceiverUniqueId(uniqueId);
+  }
+
+  public String getExpectedReceiverUniqueId() {
+    return this.sender.getExpectedReceiverUniqueId();
+  }
+
   public Object getRunningStateLock() {
     return runningStateLock;
   }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java
index adf80cb..6af0866 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAdvisor.java
@@ -232,6 +232,15 @@ public class GatewaySenderAdvisor extends 
DistributionAdvisor {
               "Cannot create Gateway Sender %s with isDiskSynchronous %s 
because another cache has the same Gateway Sender defined with 
isDiskSynchronous %s",
               sp.Id, sp.isDiskSynchronous, sender.isDiskSynchronous()));
     }
+    if 
(sp.getDistributedMember().getVersion().isNotOlderThan(KnownVersion.GEODE_1_14_0))
 {
+      if (sp.enforceThreadsConnectSameReceiver != 
sender.getEnforceThreadsConnectSameReceiver()) {
+        throw new IllegalStateException(
+            String.format(
+                "Cannot create Gateway Sender %s with 
enforceThreadsConnectSameReceiver %s because another cache has the same Gateway 
Sender defined with enforceThreadsConnectSameReceiver %s",
+                sp.Id, sp.enforceThreadsConnectSameReceiver,
+                sender.getEnforceThreadsConnectSameReceiver()));
+      }
+    }
   }
 
   /**
@@ -532,6 +541,8 @@ public class GatewaySenderAdvisor extends 
DistributionAdvisor {
 
     public ServerLocation serverLocation;
 
+    public boolean enforceThreadsConnectSameReceiver = false;
+
     public GatewaySenderProfile(InternalDistributedMember memberId, int 
version) {
       super(memberId, version);
     }
@@ -541,6 +552,12 @@ public class GatewaySenderAdvisor extends 
DistributionAdvisor {
     @Override
     public void fromData(DataInput in,
         DeserializationContext context) throws IOException, 
ClassNotFoundException {
+      fromDataPre_GEODE_1_14_0_0(in, context);
+      this.enforceThreadsConnectSameReceiver = in.readBoolean();
+    }
+
+    public void fromDataPre_GEODE_1_14_0_0(DataInput in,
+        DeserializationContext context) throws IOException, 
ClassNotFoundException {
       super.fromData(in, context);
       this.Id = DataSerializer.readString(in);
       this.startTime = in.readLong();
@@ -578,11 +595,18 @@ public class GatewaySenderAdvisor extends 
DistributionAdvisor {
         this.serverLocation = new ServerLocation();
         InternalDataSerializer.invokeFromData(this.serverLocation, in);
       }
+      this.enforceThreadsConnectSameReceiver = in.readBoolean();
     }
 
     @Override
     public void toData(DataOutput out,
         SerializationContext context) throws IOException {
+      toDataPre_GEODE_1_14_0_0(out, context);
+      out.writeBoolean(enforceThreadsConnectSameReceiver);
+    }
+
+    public void toDataPre_GEODE_1_14_0_0(DataOutput out,
+        SerializationContext context) throws IOException {
       super.toData(out, context);
       DataSerializer.writeString(Id, out);
       out.writeLong(startTime);
@@ -617,6 +641,7 @@ public class GatewaySenderAdvisor extends 
DistributionAdvisor {
       if (serverLocationFound) {
         InternalDataSerializer.invokeToData(serverLocation, out);
       }
+      out.writeBoolean(enforceThreadsConnectSameReceiver);
     }
 
     public void fromDataPre_GFE_8_0_0_0(DataInput in, DeserializationContext 
context)
@@ -684,7 +709,7 @@ public class GatewaySenderAdvisor extends 
DistributionAdvisor {
 
     @Immutable
     private static final KnownVersion[] serializationVersions =
-        new KnownVersion[] {KnownVersion.GFE_80};
+        new KnownVersion[] {KnownVersion.GFE_80, KnownVersion.GEODE_1_14_0};
 
     @Override
     public KnownVersion[] getSerializationVersions() {
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributes.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributes.java
index 1457776..581b576 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributes.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderAttributes.java
@@ -85,6 +85,9 @@ public class GatewaySenderAttributes {
 
   public boolean forwardExpirationDestroy = 
GatewaySender.DEFAULT_FORWARD_EXPIRATION_DESTROY;
 
+  public boolean enforceThreadsConnectSameReceiver =
+      GatewaySender.DEFAULT_ENFORCE_THREADS_CONNECT_SAME_RECEIVER;
+
   public int getSocketBufferSize() {
     return this.socketBufferSize;
   }
@@ -205,4 +208,7 @@ public class GatewaySenderAttributes {
     return this.forwardExpirationDestroy;
   }
 
+  public boolean getEnforceThreadsConnectSameReceiver() {
+    return this.enforceThreadsConnectSameReceiver;
+  }
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
index 06f74ae..7adf996 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/serial/ConcurrentSerialGatewaySenderEventProcessor.java
@@ -179,12 +179,27 @@ public class ConcurrentSerialGatewaySenderEventProcessor
 
   @Override
   public void run() {
-    for (int i = 0; i < this.processors.size(); i++) {
-      if (logger.isDebugEnabled()) {
+    boolean isDebugEnabled = logger.isDebugEnabled();
+    if (this.sender.getEnforceThreadsConnectSameReceiver()) {
+      this.processors.get(0).start();
+      waitForRunningStatus(this.processors.get(0));
+      String receiverUniqueId = 
this.processors.get(0).getExpectedReceiverUniqueId();
+      if (isDebugEnabled) {
+        logger.debug("First dispatcher is connected to " + receiverUniqueId);
+      }
+      for (int j = 1; j < this.processors.size(); j++) {
+        this.processors.get(j).setExpectedReceiverUniqueId(receiverUniqueId);
+      }
+    }
+
+    for (int i = this.sender.getEnforceThreadsConnectSameReceiver() ? 1 : 0; i 
< this.processors
+        .size(); i++) {
+      if (isDebugEnabled) {
         logger.debug("Starting the serialProcessor {}", i);
       }
       this.processors.get(i).start();
     }
+
     try {
       waitForRunningStatus();
     } catch (GatewaySenderException e) {
@@ -205,7 +220,7 @@ public class ConcurrentSerialGatewaySenderEventProcessor
       try {
         serialProcessor.join();
       } catch (InterruptedException e) {
-        if (logger.isDebugEnabled()) {
+        if (isDebugEnabled) {
           logger.debug("Got InterruptedException while waiting for child 
threads to finish.");
           Thread.currentThread().interrupt();
         }
@@ -219,24 +234,28 @@ public class ConcurrentSerialGatewaySenderEventProcessor
     throw new UnsupportedOperationException();
   }
 
-  private void waitForRunningStatus() {
-    for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) {
-      synchronized (serialProcessor.getRunningStateLock()) {
-        while (serialProcessor.getException() == null && 
serialProcessor.isStopped()) {
-          try {
-            serialProcessor.getRunningStateLock().wait();
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-          }
-        }
-        Exception ex = serialProcessor.getException();
-        if (ex != null) {
-          throw new GatewaySenderException(
-              String.format("Could not start a gateway sender %s because of 
exception %s",
-                  new Object[] {this.sender.getId(), ex.getMessage()}),
-              ex.getCause());
+  private void waitForRunningStatus(SerialGatewaySenderEventProcessor 
serialProcessor) {
+    synchronized (serialProcessor.getRunningStateLock()) {
+      while (serialProcessor.getException() == null && 
serialProcessor.isStopped()) {
+        try {
+          serialProcessor.getRunningStateLock().wait();
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
         }
       }
+      Exception ex = serialProcessor.getException();
+      if (ex != null) {
+        throw new GatewaySenderException(
+            String.format("Could not start a gateway sender %s because of 
exception %s",
+                new Object[] {this.sender.getId(), ex.getMessage()}),
+            ex.getCause());
+      }
+    }
+  }
+
+  private void waitForRunningStatus() {
+    for (SerialGatewaySenderEventProcessor serialProcessor : this.processors) {
+      waitForRunningStatus(serialProcessor);
     }
   }
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXml.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXml.java
index 1ac84a4..9970c55 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXml.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXml.java
@@ -433,8 +433,11 @@ public abstract class CacheXml implements EntityResolver2, 
ErrorHandler {
   protected static final String ORDER_POLICY = "order-policy";
   /** The name of the <code>remote-distributed-system</code> attribute */
   protected static final String REMOTE_DISTRIBUTED_SYSTEM_ID = 
"remote-distributed-system-id";
+  /** The name of the <code>group-transaction-events</code> attribute */
   protected static final String GROUP_TRANSACTION_EVENTS = 
"group-transaction-events";
-
+  /** The name of the <code>enforce-threads-connect-same-receiver</code> 
attribute */
+  protected static final String ENFORCE_THREADS_CONNECT_SAME_RECEIVER =
+      "enforce-threads-connect-same-receiver";
 
   /** The name of the <code>bind-address</code> attribute */
   protected static final String BIND_ADDRESS = "bind-address";
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlGenerator.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlGenerator.java
index ea19b88..04ac07e 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlGenerator.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlGenerator.java
@@ -1394,6 +1394,16 @@ public class CacheXmlGenerator extends CacheXml 
implements XMLReader {
       }
     }
 
+    // enforce-threads-connect-same-receiver
+    if (version.compareTo(CacheXmlVersion.GEODE_1_0) >= 0) {
+      if (generateDefaults()
+          || sender
+              .getEnforceThreadsConnectSameReceiver() != 
GatewaySender.DEFAULT_ENFORCE_THREADS_CONNECT_SAME_RECEIVER) {
+        atts.addAttribute("", "", ENFORCE_THREADS_CONNECT_SAME_RECEIVER, "",
+            String.valueOf(sender.getEnforceThreadsConnectSameReceiver()));
+      }
+    }
+
     handler.startElement("", GATEWAY_SENDER, GATEWAY_SENDER, atts);
 
     for (GatewayEventFilter gef : sender.getGatewayEventFilters()) {
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlParser.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlParser.java
index c775086..ce4d211 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlParser.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/xmlcache/CacheXmlParser.java
@@ -707,6 +707,17 @@ public class CacheXmlParser extends CacheXml implements 
ContentHandler {
       gatewaySenderFactory
           
.setGroupTransactionEvents(Boolean.parseBoolean(groupTransactionEvents));
     }
+
+    String enforceThreadsConnectSameReceiver = 
atts.getValue(ENFORCE_THREADS_CONNECT_SAME_RECEIVER);
+    if (enforceThreadsConnectSameReceiver == null) {
+      gatewaySenderFactory
+          .setEnforceThreadsConnectSameReceiver(
+              GatewaySender.DEFAULT_ENFORCE_THREADS_CONNECT_SAME_RECEIVER);
+    } else {
+      gatewaySenderFactory
+          .setEnforceThreadsConnectSameReceiver(
+              Boolean.parseBoolean(enforceThreadsConnectSameReceiver));
+    }
   }
 
   private void startGatewayReceiver(Attributes atts) {
diff --git 
a/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java
 
b/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java
index 4ce4416..3b08621 100644
--- 
a/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java
+++ 
b/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java
@@ -2267,6 +2267,10 @@ public class CliStrings {
       "GatewaySender \"{0}\" created on \"{1}\"";
   public static final String 
CREATE_GATEWAYSENDER__MSG__CAN_NOT_CREATE_DIFFERENT_VERSIONS =
       "Gateway Sender cannot be created until all members are the current 
version";
+  public static final String 
CREATE_GATEWAYSENDER__ENFORCE_THREADS_CONNECT_SAME_RECEIVER =
+      "enforce-threads-connect-same-receiver";
+  public static final String 
CREATE_GATEWAYSENDER__ENFORCE_THREADS_CONNECT_SAME_RECEIVER__HELP =
+      "Whether or not the sender threads have to verify the receiver member id 
to verify if they are connected to the same server.";
 
   /* start gateway-sender */
   public static final String START_GATEWAYSENDER = "start gateway-sender";
diff --git 
a/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd
 
b/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd
index 53b57f2..db6841a 100755
--- 
a/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd
+++ 
b/geode-core/src/main/resources/META-INF/schemas/geode.apache.org/schema/cache/cache-1.0.xsd
@@ -203,6 +203,7 @@ declarative caching XML file elements unless indicated 
otherwise.
             <xsd:attribute name="dispatcher-threads" type="xsd:string" 
use="optional" />
             <xsd:attribute name="order-policy" type="xsd:string" 
use="optional" />
             <xsd:attribute name="group-transaction-events" type="xsd:boolean" 
use="optional" />
+            <xsd:attribute name="enforce-threads-connect-same-receiver" 
type="xsd:boolean" use="optional" />
           </xsd:complexType>
         </xsd:element>
 
diff --git a/geode-docs/tools_modules/gfsh/command-pages/create.html.md.erb 
b/geode-docs/tools_modules/gfsh/command-pages/create.html.md.erb
index 30b4da1..11b4192 100644
--- a/geode-docs/tools_modules/gfsh/command-pages/create.html.md.erb
+++ b/geode-docs/tools_modules/gfsh/command-pages/create.html.md.erb
@@ -638,7 +638,12 @@ create gateway-sender --id=value 
--remote-distributed-system-id=value
 <p>Only allowed to be set on gateway senders with the <code class="ph 
codeph">parallel</code> attribute set to false and <code class="ph 
codeph">dispatcher-threads</code> attribute equal to 1, or on gateway senders 
with the <code class="ph codeph">parallel</code> attribute set to true. Also, 
the  <code class="ph codeph">enable-batch-conflation</code> attribute of the 
gateway sender must be set to false.</p>
 <p><b>Note:</b> In order to work for a transaction, the regions to which the 
transaction events belong must be replicated by the same set of senders with 
this flag enabled.</p>
 <p><b>Note:</b> If the above condition is not fulfilled or under very high 
load traffic conditions, it may not be guaranteed that all the events for a 
transaction will be sent in the same batch, even if <code class="ph 
codeph">group-transaction-events</code> is enabled. The number of batches sent 
with incomplete transactions can be retrieved from the <code class="ph 
codeph">GatewaySenderMXBean</code> bean.</p></td>
+<td>false</td>
 </td>
+</tr>
+<tr>
+<td><span class="keyword 
parmname">\-\-enforce-threads-connect-same-receiver</span></td>
+<td>This parameter applies only to serial gateway senders. If true, receiver 
member id is checked by all dispatcher threads when the connection is 
established to ensure they connect to the same receiver. Instead of starting 
all dispatcher threads in parallel, one thread is started first, and after that 
the rest are started in parallel.</td>
 <td>false</td>
 </tr>
 </tbody>
diff --git 
a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommand.java
 
b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommand.java
index a6b40b9..474153b 100644
--- 
a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommand.java
+++ 
b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommand.java
@@ -137,14 +137,20 @@ public class CreateGatewaySenderCommand extends 
SingleGfshCommand {
           help = CliStrings.CREATE_GATEWAYSENDER__GATEWAYEVENTFILTER__HELP) 
String[] gatewayEventFilters,
 
       @CliOption(key = CliStrings.CREATE_GATEWAYSENDER__GATEWAYTRANSPORTFILTER,
-          help = 
CliStrings.CREATE_GATEWAYSENDER__GATEWAYTRANSPORTFILTER__HELP) String[] 
gatewayTransportFilter) {
+          help = 
CliStrings.CREATE_GATEWAYSENDER__GATEWAYTRANSPORTFILTER__HELP) String[] 
gatewayTransportFilter,
+
+      @CliOption(key = 
CliStrings.CREATE_GATEWAYSENDER__ENFORCE_THREADS_CONNECT_SAME_RECEIVER,
+          specifiedDefaultValue = "true",
+          unspecifiedDefaultValue = "false",
+          help = 
CliStrings.CREATE_GATEWAYSENDER__ENFORCE_THREADS_CONNECT_SAME_RECEIVER__HELP) 
Boolean enforceThreadsConnectSameReceiver) {
 
     CacheConfig.GatewaySender configuration =
         buildConfiguration(id, remoteDistributedSystemId, parallel, 
manualStart,
             socketBufferSize, socketReadTimeout, enableBatchConflation, 
batchSize,
             batchTimeInterval, enablePersistence, diskStoreName, 
diskSynchronous, maxQueueMemory,
             alertThreshold, dispatcherThreads, orderPolicy == null ? null : 
orderPolicy.name(),
-            gatewayEventFilters, gatewayTransportFilter, 
groupTransactionEvents);
+            gatewayEventFilters, gatewayTransportFilter, 
groupTransactionEvents,
+            enforceThreadsConnectSameReceiver);
 
     GatewaySenderFunctionArgs gatewaySenderFunctionArgs =
         new GatewaySenderFunctionArgs(configuration);
@@ -228,7 +234,8 @@ public class CreateGatewaySenderCommand extends 
SingleGfshCommand {
       String orderPolicy,
       String[] gatewayEventFilters,
       String[] gatewayTransportFilters,
-      Boolean groupTransactionEvents) {
+      Boolean groupTransactionEvents,
+      Boolean enforceThreadsConnectSameReceiver) {
     CacheConfig.GatewaySender sender = new CacheConfig.GatewaySender();
     sender.setId(id);
     sender.setRemoteDistributedSystemId(int2string(remoteDSId));
@@ -253,7 +260,7 @@ public class CreateGatewaySenderCommand extends 
SingleGfshCommand {
     if (gatewayTransportFilters != null) {
       
sender.getGatewayTransportFilters().addAll(stringsToDeclarableTypes(gatewayTransportFilters));
     }
-
+    
sender.setEnforceThreadsConnectSameReceiver(enforceThreadsConnectSameReceiver);
     return sender;
   }
 
@@ -284,6 +291,10 @@ public class CreateGatewaySenderCommand extends 
SingleGfshCommand {
       Boolean batchConflationEnabled =
           (Boolean) parseResult
               
.getParamValue(CliStrings.CREATE_GATEWAYSENDER__ENABLEBATCHCONFLATION);
+      Boolean enforceThreadsConnectSameReceiver =
+          (Boolean) parseResult
+              .getParamValue(
+                  
CliStrings.CREATE_GATEWAYSENDER__ENFORCE_THREADS_CONNECT_SAME_RECEIVER);
 
       if (dispatcherThreads != null && dispatcherThreads > 1 && orderPolicy == 
null) {
         return ResultModel.createError(
@@ -306,6 +317,14 @@ public class CreateGatewaySenderCommand extends 
SingleGfshCommand {
             "Gateway Sender cannot be created with both 
--group-transaction-events and --enable-batch-conflation.");
       }
 
+      if (parallel && enforceThreadsConnectSameReceiver) {
+        return ResultModel
+            .createError(
+                "Option --" + 
CliStrings.CREATE_GATEWAYSENDER__ENFORCE_THREADS_CONNECT_SAME_RECEIVER
+                    + " only applies to serial gateway senders.");
+
+      }
+
       return ResultModel.createInfo("");
     }
   }
diff --git 
a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderCreateFunction.java
 
b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderCreateFunction.java
index 00efaa5..9d7e75f 100644
--- 
a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderCreateFunction.java
+++ 
b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderCreateFunction.java
@@ -173,6 +173,13 @@ public class GatewaySenderCreateFunction implements 
InternalFunction<GatewaySend
             gatewayTransportFilterKlass, 
CliStrings.CREATE_GATEWAYSENDER__GATEWAYTRANSPORTFILTER));
       }
     }
+
+    Boolean enforceThreadsConnectSameReceiver =
+        gatewaySenderCreateArgs.getEnforceThreadsConnectSameReceiver();
+    if (enforceThreadsConnectSameReceiver != null) {
+      
gateway.setEnforceThreadsConnectSameReceiver(enforceThreadsConnectSameReceiver);
+    }
+
     return gateway.create(gatewaySenderCreateArgs.getId(),
         gatewaySenderCreateArgs.getRemoteDistributedSystemId());
   }
diff --git 
a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderFunctionArgs.java
 
b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderFunctionArgs.java
index 2b08ef9..dd70bda 100644
--- 
a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderFunctionArgs.java
+++ 
b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/GatewaySenderFunctionArgs.java
@@ -46,6 +46,7 @@ public class GatewaySenderFunctionArgs implements 
Serializable {
   // array of fully qualified class names of the filters
   private final List<String> gatewayEventFilters;
   private final List<String> gatewayTransportFilters;
+  private final Boolean enforceThreadsConnectSameReceiver;
 
   public GatewaySenderFunctionArgs(CacheConfig.GatewaySender sender) {
     this.id = sender.getId();
@@ -77,6 +78,7 @@ public class GatewaySenderFunctionArgs implements 
Serializable {
                 .stream().map(DeclarableType::getClassName)
                 .collect(Collectors.toList()))
             .orElse(null);
+    this.enforceThreadsConnectSameReceiver = 
sender.getEnforceThreadsConnectSameReceiver();
   }
 
   private Integer string2int(String x) {
@@ -158,4 +160,8 @@ public class GatewaySenderFunctionArgs implements 
Serializable {
   public List<String> getGatewayTransportFilter() {
     return this.gatewayTransportFilters;
   }
+
+  public Boolean getEnforceThreadsConnectSameReceiver() {
+    return this.enforceThreadsConnectSameReceiver;
+  }
 }
diff --git 
a/geode-gfsh/src/main/resources/org/apache/geode/management/internal/sanctioned-geode-gfsh-serializables.txt
 
b/geode-gfsh/src/main/resources/org/apache/geode/management/internal/sanctioned-geode-gfsh-serializables.txt
index c60d599..b368b1f 100644
--- 
a/geode-gfsh/src/main/resources/org/apache/geode/management/internal/sanctioned-geode-gfsh-serializables.txt
+++ 
b/geode-gfsh/src/main/resources/org/apache/geode/management/internal/sanctioned-geode-gfsh-serializables.txt
@@ -65,7 +65,7 @@ 
org/apache/geode/management/internal/cli/functions/GatewayReceiverCreateFunction
 
org/apache/geode/management/internal/cli/functions/GatewaySenderCreateFunction,true,8746830191680509335
 
org/apache/geode/management/internal/cli/functions/GatewaySenderDestroyFunction,true,1
 
org/apache/geode/management/internal/cli/functions/GatewaySenderDestroyFunctionArgs,true,3848480256348119530,id:java/lang/String,ifExists:boolean
-org/apache/geode/management/internal/cli/functions/GatewaySenderFunctionArgs,true,4636678328980816780,alertThreshold:java/lang/Integer,batchSize:java/lang/Integer,batchTimeInterval:java/lang/Integer,diskStoreName:java/lang/String,diskSynchronous:java/lang/Boolean,dispatcherThreads:java/lang/Integer,enableBatchConflation:java/lang/Boolean,enablePersistence:java/lang/Boolean,gatewayEventFilters:java/util/List,gatewayTransportFilters:java/util/List,groupTransactionEvents:java/lang/Boolean,i
 [...]
+org/apache/geode/management/internal/cli/functions/GatewaySenderFunctionArgs,true,4636678328980816780,alertThreshold:java/lang/Integer,batchSize:java/lang/Integer,batchTimeInterval:java/lang/Integer,diskStoreName:java/lang/String,diskSynchronous:java/lang/Boolean,dispatcherThreads:java/lang/Integer,enableBatchConflation:java/lang/Boolean,enablePersistence:java/lang/Boolean,gatewayEventFilters:java/util/List,gatewayTransportFilters:java/util/List,groupTransactionEvents:java/lang/Boolean,i
 [...]
 
org/apache/geode/management/internal/cli/functions/GetMemberConfigInformationFunction,true,1
 
org/apache/geode/management/internal/cli/functions/GetRegionDescriptionFunction,true,1
 org/apache/geode/management/internal/cli/functions/GetRegionsFunction,true,1
diff --git 
a/geode-gfsh/src/test/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommandTest.java
 
b/geode-gfsh/src/test/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommandTest.java
index 2784ba5..585158d 100644
--- 
a/geode-gfsh/src/test/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommandTest.java
+++ 
b/geode-gfsh/src/test/java/org/apache/geode/management/internal/cli/commands/CreateGatewaySenderCommandTest.java
@@ -275,7 +275,7 @@ public class CreateGatewaySenderCommandTest {
     
assertThat(argsArgumentCaptor.getValue().getGatewayEventFilter()).isNotNull().isEmpty();
     
assertThat(argsArgumentCaptor.getValue().getGatewayTransportFilter()).isNotNull().isEmpty();
     
assertThat(argsArgumentCaptor.getValue().mustGroupTransactionEvents()).isNotNull();
-
+    
assertThat(argsArgumentCaptor.getValue().getEnforceThreadsConnectSameReceiver()).isFalse();
   }
 
   @Test
@@ -347,4 +347,70 @@ public class CreateGatewaySenderCommandTest {
     
assertThat(argsArgumentCaptor.getValue().mustGroupTransactionEvents()).isFalse();
 
   }
+
+  @Test
+  public void 
testEnforceThreadsConnectSameReceiverCannotBeUsedForParallelSenders() {
+    gfsh.executeAndAssertThat(command,
+        "create gateway-sender --id=1 --remote-distributed-system-id=1 
--parallel --enforce-threads-connect-same-receiver")
+        .statusIsError()
+        .containsOutput(
+            "Option --" + 
CliStrings.CREATE_GATEWAYSENDER__ENFORCE_THREADS_CONNECT_SAME_RECEIVER
+                + " only applies to serial gateway senders.");
+  }
+
+  @Test
+  public void 
testEnforceThreadsConnectSameReceiverIsTrueWhenUsedWithoutValue() {
+    doReturn(mock(Set.class)).when(command).getMembers(any(), any());
+    cliFunctionResult =
+        new CliFunctionResult("member", CliFunctionResult.StatusState.OK, 
"cliFunctionResult");
+    functionResults.add(cliFunctionResult);
+    gfsh.executeAndAssertThat(command,
+        "create gateway-sender --id=1 --remote-distributed-system-id=1 
--enforce-threads-connect-same-receiver")
+        .statusIsSuccess();
+    verify(command).executeAndGetFunctionResult(any(), 
argsArgumentCaptor.capture(), any());
+
+    
assertThat(argsArgumentCaptor.getValue().getEnforceThreadsConnectSameReceiver()).isTrue();
+  }
+
+  @Test
+  public void testEnforceThreadsConnectSameReceiverIsFalseWhenSetToFalse() {
+    doReturn(mock(Set.class)).when(command).getMembers(any(), any());
+    cliFunctionResult =
+        new CliFunctionResult("member", CliFunctionResult.StatusState.OK, 
"cliFunctionResult");
+    functionResults.add(cliFunctionResult);
+    gfsh.executeAndAssertThat(command,
+        "create gateway-sender --id=1 --remote-distributed-system-id=1 
--enforce-threads-connect-same-receiver=false")
+        .statusIsSuccess();
+    verify(command).executeAndGetFunctionResult(any(), 
argsArgumentCaptor.capture(), any());
+
+    
assertThat(argsArgumentCaptor.getValue().getEnforceThreadsConnectSameReceiver()).isFalse();
+  }
+
+  @Test
+  public void testEnforceThreadsConnectSameReceiverIsTrueWhenSetToTrue() {
+    doReturn(mock(Set.class)).when(command).getMembers(any(), any());
+    cliFunctionResult =
+        new CliFunctionResult("member", CliFunctionResult.StatusState.OK, 
"cliFunctionResult");
+    functionResults.add(cliFunctionResult);
+    gfsh.executeAndAssertThat(command,
+        "create gateway-sender --id=1 --remote-distributed-system-id=1 
--enforce-threads-connect-same-receiver=true")
+        .statusIsSuccess();
+    verify(command).executeAndGetFunctionResult(any(), 
argsArgumentCaptor.capture(), any());
+
+    
assertThat(argsArgumentCaptor.getValue().getEnforceThreadsConnectSameReceiver()).isTrue();
+  }
+
+  @Test
+  public void testEnforceThreadsConnectSameReceiverIsFalseByDefault() {
+    doReturn(mock(Set.class)).when(command).getMembers(any(), any());
+    cliFunctionResult =
+        new CliFunctionResult("member", CliFunctionResult.StatusState.OK, 
"cliFunctionResult");
+    functionResults.add(cliFunctionResult);
+    gfsh.executeAndAssertThat(command,
+        "create gateway-sender --id=1 --remote-distributed-system-id=1")
+        .statusIsSuccess();
+    verify(command).executeAndGetFunctionResult(any(), 
argsArgumentCaptor.capture(), any());
+
+    
assertThat(argsArgumentCaptor.getValue().getEnforceThreadsConnectSameReceiver()).isFalse();
+  }
 }
diff --git 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/CreateDestroyGatewaySenderCommandDUnitTest.java
 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/CreateDestroyGatewaySenderCommandDUnitTest.java
index 734ec8e..854329e 100644
--- 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/CreateDestroyGatewaySenderCommandDUnitTest.java
+++ 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/CreateDestroyGatewaySenderCommandDUnitTest.java
@@ -112,7 +112,7 @@ public class CreateDestroyGatewaySenderCommandDUnitTest 
implements Serializable
       String xml = 
locator.getConfigurationPersistenceService().getConfiguration("cluster")
           .getCacheXmlContent();
       assertThat(xml).contains(
-          "<gateway-sender id=\"ln\" remote-distributed-system-id=\"2\" 
parallel=\"false\" manual-start=\"false\" enable-batch-conflation=\"false\" 
enable-persistence=\"false\" disk-synchronous=\"true\" 
group-transaction-events=\"false\"/>");
+          "<gateway-sender id=\"ln\" remote-distributed-system-id=\"2\" 
parallel=\"false\" manual-start=\"false\" enable-batch-conflation=\"false\" 
enable-persistence=\"false\" disk-synchronous=\"true\" 
group-transaction-events=\"false\" 
enforce-threads-connect-same-receiver=\"false\"/>");
     });
 
     // destroy gateway sender and verify AEQs cleaned up
diff --git 
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
 
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
index 1199fb9..4b7e330 100644
--- 
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
+++ 
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcher.java
@@ -18,8 +18,10 @@ import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.Vector;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Consumer;
+import java.util.regex.Pattern;
 
 import org.apache.logging.log4j.Logger;
 
@@ -63,6 +65,9 @@ public class GatewaySenderEventRemoteDispatcher implements 
GatewaySenderEventDis
 
   private ReentrantReadWriteLock connectionLifeCycleLock = new 
ReentrantReadWriteLock();
 
+  protected static final String 
maxAttemptsReachedConnectingServerIdExceptionMessage =
+      "Reached max attempts number trying to connect to desired server id";
+
   /*
    * Called after each attempt at processing an outbound (dispatch) or inbound 
(ack)
    * message, whether the attempt is successful or not. The purpose is 
testability.
@@ -86,7 +91,6 @@ public class GatewaySenderEventRemoteDispatcher implements 
GatewaySenderEventDis
   public 
GatewaySenderEventRemoteDispatcher(AbstractGatewaySenderEventProcessor 
eventProcessor) {
     this.processor = eventProcessor;
     this.sender = eventProcessor.getSender();
-    // this.ackReaderThread = new AckReaderThread(sender);
     try {
       initializeConnection();
     } catch (GatewaySenderException e) {
@@ -362,11 +366,70 @@ public class GatewaySenderEventRemoteDispatcher 
implements GatewaySenderEventDis
     }
   }
 
+  Connection retryInitializeConnection(Connection con) {
+    final boolean isDebugEnabled = logger.isDebugEnabled();
+    String connectedServerId = con.getEndpoint().getMemberId().getUniqueId();
+    String expectedServerId = this.processor.getExpectedReceiverUniqueId();
+
+    if (expectedServerId.equals("")) {
+      if (isDebugEnabled) {
+        logger.debug("First dispatcher connected to server " + 
connectedServerId);
+      }
+      this.processor.setExpectedReceiverUniqueId(connectedServerId);
+      return con;
+    }
+
+    int attempt = 0;
+    final int attemptsPerServer = 5;
+    int maxAttempts = attemptsPerServer;
+    Vector<String> notExpectedServerIds = new Vector<String>();
+    boolean connectedToExpectedReceiver = 
connectedServerId.equals(expectedServerId);
+    while (!connectedToExpectedReceiver) {
+
+      if (isDebugEnabled) {
+        logger.debug("Dispatcher wants to connect to [" + expectedServerId
+            + "] but got connection to [" + connectedServerId + "]");
+      }
+      attempt++;
+      if (!notExpectedServerIds.contains(connectedServerId)) {
+        if (isDebugEnabled) {
+          logger.debug(
+              "Increasing dispatcher connection max retries number due to 
connection to unknown server ("
+                  + connectedServerId + ")");
+        }
+        notExpectedServerIds.add(connectedServerId);
+        maxAttempts += attemptsPerServer;
+      }
+
+      if (attempt >= maxAttempts) {
+        throw new 
ServerConnectivityException(maxAttemptsReachedConnectingServerIdExceptionMessage
+            + " [" + expectedServerId + "] (" + maxAttempts + " attempts).");
+      }
+
+      con.destroy();
+      this.sender.getProxy().returnConnection(con);
+      con = this.sender.getProxy().acquireConnection();
+
+      connectedServerId = con.getEndpoint().getMemberId().getUniqueId();
+      if (connectedServerId.equals(expectedServerId)) {
+        connectedToExpectedReceiver = true;
+      }
+    }
+
+    if (isDebugEnabled) {
+      logger.debug("Dispatcher connected to expected endpoint " + 
connectedServerId
+          + " after " + attempt + " retries.");
+    }
+    return con;
+  }
+
   /**
    * Initializes the <code>Connection</code>.
    *
    */
+  @VisibleForTesting
   void initializeConnection() throws GatewaySenderException, 
GemFireSecurityException {
+    final boolean isDebugEnabled = logger.isDebugEnabled();
     if (ackReaderThread != null) {
       ackReaderThread.shutDownAckReaderConnection(connection);
     }
@@ -397,26 +460,24 @@ public class GatewaySenderEventRemoteDispatcher 
implements GatewaySenderEventDis
           synchronized (this.sender.getLockForConcurrentDispatcher()) {
             ServerLocation server = this.sender.getServerLocation();
             if (server != null) {
-              if (logger.isDebugEnabled()) {
+              if (isDebugEnabled) {
                 logger.debug("ServerLocation is: {}. Connecting to this 
serverLocation...", server);
               }
               con = this.sender.getProxy().acquireConnection(server);
             } else {
-              if (logger.isDebugEnabled()) {
+              if (isDebugEnabled) {
                 logger.debug("ServerLocation is null. Creating new connection. 
");
               }
               con = this.sender.getProxy().acquireConnection();
-              // Acquired connection from pool!! Update the server location
-              // information in the sender and
-              // distribute the information to other senders ONLY IF THIS 
SENDER
-              // IS
-              // PRIMARY
-              if (this.sender.isPrimary()) {
-                if (sender.getServerLocation() == null) {
-                  sender.setServerLocation(con.getServer());
-                }
-                new UpdateAttributesProcessor(this.sender).distribute(false);
+            }
+            if (this.sender.getEnforceThreadsConnectSameReceiver()) {
+              con = retryInitializeConnection(con);
+            }
+            if (this.sender.isPrimary()) {
+              if (sender.getServerLocation() == null) {
+                sender.setServerLocation(con.getServer());
               }
+              new UpdateAttributesProcessor(this.sender).distribute(false);
             }
           }
         }
@@ -486,6 +547,12 @@ public class GatewaySenderEventRemoteDispatcher implements 
GatewaySenderEventDis
                 "No available connection was found, but the following active 
servers exist: %s",
                 buffer.toString());
       }
+      if (this.sender.getEnforceThreadsConnectSameReceiver() && e.getMessage() 
!= null) {
+        if 
(Pattern.compile(maxAttemptsReachedConnectingServerIdExceptionMessage + ".*")
+            .matcher(e.getMessage()).find()) {
+          ioMsg += " " + e.getMessage();
+        }
+      }
       IOException ex = new IOException(ioMsg);
       gse = new GatewaySenderException(
           String.format("%s : Could not connect due to: %s",
diff --git 
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java
 
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java
index 2a7cfd7..c0d2051 100644
--- 
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java
+++ 
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderFactoryImpl.java
@@ -199,6 +199,13 @@ public class GatewaySenderFactoryImpl implements 
InternalGatewaySenderFactory {
   }
 
   @Override
+  public GatewaySenderFactory setEnforceThreadsConnectSameReceiver(
+      boolean enforceThreadsConnectSameReceiver) {
+    this.attrs.enforceThreadsConnectSameReceiver = 
enforceThreadsConnectSameReceiver;
+    return this;
+  }
+
+  @Override
   public GatewaySender create(String id, int remoteDSId) {
     int myDSId = 
InternalDistributedSystem.getAnyInstance().getDistributionManager()
         .getDistributedSystemId();
@@ -291,7 +298,6 @@ public class GatewaySenderFactoryImpl implements 
InternalGatewaySenderFactory {
       if (this.cache instanceof GemFireCacheImpl) {
         sender = new SerialGatewaySenderImpl(cache, statisticsClock, attrs);
         this.cache.addGatewaySender(sender);
-
         if (!this.attrs.isManualStart()) {
           sender.start();
         }
@@ -394,5 +400,7 @@ public class GatewaySenderFactoryImpl implements 
InternalGatewaySenderFactory {
     }
     this.attrs.eventSubstitutionFilter = 
senderCreation.getGatewayEventSubstitutionFilter();
     this.attrs.groupTransactionEvents = 
senderCreation.mustGroupTransactionEvents();
+    this.attrs.enforceThreadsConnectSameReceiver =
+        senderCreation.getEnforceThreadsConnectSameReceiver();
   }
 }
diff --git 
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
 
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
index f18ce81..ef3e599 100644
--- 
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
+++ 
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/RemoteSerialGatewaySenderEventProcessor.java
@@ -35,7 +35,7 @@ public class RemoteSerialGatewaySenderEventProcessor extends 
SerialGatewaySender
   @Override
   public void initializeEventDispatcher() {
     if (logger.isDebugEnabled()) {
-      logger.debug(" Creating the GatewayEventRemoteDispatcher");
+      logger.debug("Creating the GatewayEventRemoteDispatcher");
     }
     // In case of serial there is a way to create gatewaySender and attach
     // asyncEventListener. Not sure of the use-case but there are dunit tests
diff --git 
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
 
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
index 3474b4a..97436a2 100644
--- 
a/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
+++ 
b/geode-wan/src/main/java/org/apache/geode/internal/cache/wan/serial/SerialGatewaySenderImpl.java
@@ -228,6 +228,7 @@ public class SerialGatewaySenderImpl extends 
AbstractRemoteGatewaySender {
     pf.dispatcherThreads = getDispatcherThreads();
     pf.orderPolicy = getOrderPolicy();
     pf.serverLocation = this.getServerLocation();
+    pf.enforceThreadsConnectSameReceiver = 
getEnforceThreadsConnectSameReceiver();
   }
 
   @Override
diff --git 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java
 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java
index 7cfe1f5..8b35ab1 100644
--- 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java
+++ 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/GatewaySenderEventRemoteDispatcherJUnitTest.java
@@ -14,9 +14,11 @@
  */
 package org.apache.geode.internal.cache.wan;
 
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -24,11 +26,64 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
 
 import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.cache.client.internal.Endpoint;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.ServerLocation;
+import org.apache.geode.distributed.internal.ServerLocationAndMemberId;
+import org.apache.geode.internal.cache.tier.sockets.ServerQueueStatus;
 
 public class GatewaySenderEventRemoteDispatcherJUnitTest {
+
+  @Mock
+  private AbstractGatewaySender senderMock;
+
+  @Mock
+  private AbstractGatewaySenderEventProcessor eventProcessorMock;
+
+  @InjectMocks
+  private GatewaySenderEventRemoteDispatcher eventDispatcher;
+
+  @Mock
+  private PoolImpl poolMock;
+
+  @Mock
+  private Connection connectionMock;
+
+  @Mock
+  private ServerQueueStatus serverQueueStatusMock;
+
+  @Mock
+  private Endpoint endpointMock;
+
+  @Mock
+  private DistributedMember memberIdMock;
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+    when(eventProcessorMock.getSender()).thenReturn(senderMock);
+
+    when(senderMock.isParallel()).thenReturn(false);
+    when(senderMock.getLockForConcurrentDispatcher()).thenReturn(new Object());
+    when(senderMock.getProxy()).thenReturn(poolMock);
+
+    when(poolMock.isDestroyed()).thenReturn(false);
+    when(poolMock.acquireConnection()).thenReturn(connectionMock);
+
+    when(connectionMock.getQueueStatus()).thenReturn(serverQueueStatusMock);
+  }
+
   @Test
   public void 
getConnectionShouldShutdownTheAckThreadReaderWhenEventProcessorIsShutDown() {
     AbstractGatewaySender sender = mock(AbstractGatewaySender.class);
@@ -46,7 +101,7 @@ public class GatewaySenderEventRemoteDispatcherJUnitTest {
   }
 
   @Test
-  public void 
shuttingDownAckThreadReaderConnectionShouldshutdownTheAckThreadReader() {
+  public void 
shuttingDownAckThreadReaderConnectionShouldShutdownTheAckThreadReader() {
     AbstractGatewaySender sender = mock(AbstractGatewaySender.class);
     AbstractGatewaySenderEventProcessor eventProcessor =
         mock(AbstractGatewaySenderEventProcessor.class);
@@ -77,4 +132,161 @@ public class GatewaySenderEventRemoteDispatcherJUnitTest {
     verify(dispatcher, times(1)).initializeConnection();
     verify(dispatcher, times(2)).getConnectionLifeCycleLock();
   }
+
+  @Test
+  public void 
initializeConnectionWithParallelSenderDoesNotRetryInitializeConnection() {
+    when(senderMock.isParallel()).thenReturn(true);
+
+    eventDispatcher = new 
GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+    dispatcherSpy.initializeConnection();
+
+    verify(senderMock, times(0)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(1)).setServerLocation(any());
+    verify(poolMock, times(1)).acquireConnection();
+    verify(dispatcherSpy, times(0)).retryInitializeConnection(connectionMock);
+  }
+
+  @Test
+  public void 
initializeConnectionWithSerialSenderAndEnforceThreadsConnectSameRecieverFalseDoesNotRetryInitializeConnection()
 {
+    when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(false);
+
+    when(connectionMock.getEndpoint()).thenReturn(endpointMock);
+    when(endpointMock.getMemberId()).thenReturn(memberIdMock);
+    when(memberIdMock.getUniqueId()).thenReturn("receiverId");
+
+    eventDispatcher = new 
GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+    dispatcherSpy.initializeConnection();
+
+    verify(senderMock, times(1)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(1)).getEnforceThreadsConnectSameReceiver();
+    verify(poolMock, times(1)).acquireConnection();
+    verify(dispatcherSpy, times(0)).retryInitializeConnection(connectionMock);
+  }
+
+  @Test
+  public void 
initializeConnectionWithSerialSenderAndEnforceThreadsConnectSameReceiverTrueAndNoExpectedReceiverIdSetsReceiverIdAndDoesNotReacquireConnection()
 {
+
+    when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(true);
+
+    when(connectionMock.getEndpoint()).thenReturn(endpointMock);
+    when(endpointMock.getMemberId()).thenReturn(memberIdMock);
+    when(memberIdMock.getUniqueId()).thenReturn("receiverId");
+    when(eventProcessorMock.getExpectedReceiverUniqueId()).thenReturn("");
+
+    eventDispatcher = new 
GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+    dispatcherSpy.initializeConnection();
+
+    verify(senderMock, times(1)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(1)).getEnforceThreadsConnectSameReceiver();
+    verify(dispatcherSpy, times(1)).retryInitializeConnection(connectionMock);
+    verify(poolMock, times(1)).acquireConnection();
+    verify(eventProcessorMock, 
times(1)).setExpectedReceiverUniqueId("receiverId");
+  }
+
+  @Test
+  public void 
initializeConnectionWithSerialSenderAndEnforceThreadsConnectSameReceiverTrueAndConnectedToExpectedReceiverDoesNotReacquireConnection()
 {
+
+    when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(true);
+
+    when(connectionMock.getEndpoint()).thenReturn(endpointMock);
+    when(endpointMock.getMemberId()).thenReturn(memberIdMock);
+    when(memberIdMock.getUniqueId()).thenReturn("expectedId");
+    
when(eventProcessorMock.getExpectedReceiverUniqueId()).thenReturn("expectedId");
+
+    eventDispatcher = new 
GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+    dispatcherSpy.initializeConnection();
+
+    verify(senderMock, times(1)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(1)).getEnforceThreadsConnectSameReceiver();
+    verify(dispatcherSpy, times(1)).retryInitializeConnection(connectionMock);
+    verify(poolMock, times(1)).acquireConnection();
+    verify(eventProcessorMock, times(0)).setExpectedReceiverUniqueId(any());
+  }
+
+  @Test
+  public void 
initializeConnectionWithSerialSenderAndEnforceThreadsConnectSameReceiverTrueAndConnectedToExpectedReceiverOnSecondTryReacquiresConnectionOnce()
 {
+
+    when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(true);
+
+    when(connectionMock.getEndpoint()).thenReturn(endpointMock);
+    when(endpointMock.getMemberId()).thenReturn(memberIdMock);
+    
when(memberIdMock.getUniqueId()).thenReturn("notExpectedId").thenReturn("expectedId");
+    
when(eventProcessorMock.getExpectedReceiverUniqueId()).thenReturn("expectedId");
+
+    eventDispatcher = new 
GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+    dispatcherSpy.initializeConnection();
+
+    verify(senderMock, times(1)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(1)).getEnforceThreadsConnectSameReceiver();
+    verify(dispatcherSpy, times(1)).retryInitializeConnection(connectionMock);
+    verify(poolMock, times(2)).acquireConnection();
+    verify(eventProcessorMock, times(0)).setExpectedReceiverUniqueId(any());
+
+  }
+
+  @Test
+  public void 
initializeConnectionWithSerialSenderAndEnforceThreadsConnectSameReceiverTrueAndMaxRetriesExceededAndNoServersAvailableThrowsException()
 {
+
+    when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(true);
+
+    when(connectionMock.getEndpoint()).thenReturn(endpointMock);
+    when(endpointMock.getMemberId()).thenReturn(memberIdMock);
+    when(memberIdMock.getUniqueId()).thenReturn("notExpectedId");
+    
when(eventProcessorMock.getExpectedReceiverUniqueId()).thenReturn("expectedId");
+
+    eventDispatcher = new 
GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+
+    String expectedExceptionMessage =
+        "There are no active servers. "
+            + 
GatewaySenderEventRemoteDispatcher.maxAttemptsReachedConnectingServerIdExceptionMessage
+            + " [expectedId] (10 attempts)";
+    assertThatThrownBy(() -> {
+      dispatcherSpy.initializeConnection();
+    
}).isInstanceOf(GatewaySenderException.class).hasMessageContaining(expectedExceptionMessage);
+
+    verify(senderMock, times(1)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(2)).getEnforceThreadsConnectSameReceiver();
+    verify(dispatcherSpy, times(1)).retryInitializeConnection(connectionMock);
+    verify(poolMock, times(10)).acquireConnection();
+    verify(eventProcessorMock, times(0)).setExpectedReceiverUniqueId(any());
+  }
+
+  @Test
+  public void 
initializeConnectionWithSerialSenderAndEnforceThreadsConnectSameReceiverTrueAndMaxRetriesExceededAndServersAvailableThrowsException()
 {
+
+    when(senderMock.getEnforceThreadsConnectSameReceiver()).thenReturn(true);
+
+    when(connectionMock.getEndpoint()).thenReturn(endpointMock);
+    when(endpointMock.getMemberId()).thenReturn(memberIdMock);
+    when(memberIdMock.getUniqueId()).thenReturn("notExpectedId");
+    
when(eventProcessorMock.getExpectedReceiverUniqueId()).thenReturn("expectedId");
+    List<ServerLocationAndMemberId> currentServers = new ArrayList<>();
+    currentServers.add(new ServerLocationAndMemberId(new 
ServerLocation("host1", 1), "id1"));
+    currentServers.add(new ServerLocationAndMemberId(new 
ServerLocation("host2", 2), "id2"));
+    when(poolMock.getCurrentServers()).thenReturn(currentServers);
+
+    eventDispatcher = new 
GatewaySenderEventRemoteDispatcher(eventProcessorMock, null);
+    GatewaySenderEventRemoteDispatcher dispatcherSpy = spy(eventDispatcher);
+
+    String expectedExceptionMessage =
+        "No available connection was found, but the following active servers 
exist: host1:1@id1, host2:2@id2 "
+            + 
GatewaySenderEventRemoteDispatcher.maxAttemptsReachedConnectingServerIdExceptionMessage
+            + " [expectedId] (10 attempts)";
+    assertThatThrownBy(() -> {
+      dispatcherSpy.initializeConnection();
+    
}).isInstanceOf(GatewaySenderException.class).hasMessageContaining(expectedExceptionMessage);
+
+    verify(senderMock, times(1)).getLockForConcurrentDispatcher();
+    verify(senderMock, times(2)).getEnforceThreadsConnectSameReceiver();
+    verify(dispatcherSpy, times(1)).retryInitializeConnection(connectionMock);
+    verify(poolMock, times(10)).acquireConnection();
+    verify(eventProcessorMock, times(0)).setExpectedReceiverUniqueId(any());
+  }
+
 }

Reply via email to