This is an automated email from the ASF dual-hosted git repository.

mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new e627e60bae GEODE-10056: Improve gateway-receiver load balance (#7378)
e627e60bae is described below

commit e627e60bae087a2874f2439994ab6d745dbd66a1
Author: Jakov Varenina <62134331+jvaren...@users.noreply.github.com>
AuthorDate: Thu Sep 8 10:59:42 2022 +0200

    GEODE-10056: Improve gateway-receiver load balance (#7378)
    
    * GEODE-10056: Improve gateway-receiver load balance
    
    The problem is that servers send incorrect gateway-receiver connection
    load to locators within CacheServerLoadMessage. Additionally, locators
    do not refresh gateway-receivers load with the load received in
    CacheServerLoadMessage. The only time locator increments
    gateway-receiver load is after it receives
    ClientConnectionRequest{group=__recv_group...} and returns selected
    server in ClientConnectionResponse message. This is done only by
    coordinator, so that means that other locators will have load with
    initial values, since it is never updated.
    
    The solution is to correctly track gateway-receiver acceptor
    connection count and then based on it correctly calculate the load
    when sending CacheServerLoadMessage. Additionally each locator will
    read the load received from CacheServerLoadMessage and update load
    for gateway-receiver location id in group __recv__group accordingly.
    
    * Updates after the review
    
    * Fix for the flaky test cases
    
    * Updates after review
    
    * Empty commit to trigger test
    
    * Updates after review
    
    * Fix failed distributed test
    
    The test case testMultiUser failed because Wan service is available
    in geode-core distributed tests, and therefore test now throws:
    
    org.apache.geode.internal.cache.wan.GatewaySenderConfigurationException
    : Locators must be configured before starting gateway-sender.
    
    instead of:
    
    java.lang.IllegalStateException: WAN service is not available.
    
    * Synchronize handling of receiver load
    
    This commit synchronizes the getting and sending of gateway-receiver
    load (CacheServerLoadMessage) on all servers.
---
 geode-core/build.gradle                            |   1 +
 .../WanConnectionsLoadBalanceDistributedTest.java  | 299 +++++++++++++
 .../internal/security/MultiGfshDUnitTest.java      |   3 +-
 .../internal/locator/ClientConnectionRequest.java  |   2 +-
 .../internal/locator/QueueConnectionRequest.java   |   7 +-
 .../geode/cache/server/internal/LoadMonitor.java   |  33 +-
 .../distributed/internal/LocatorLoadSnapshot.java  |  80 ++--
 .../geode/distributed/internal/ServerLocator.java  |  20 +-
 .../geode/internal/cache/CacheServerImpl.java      |   2 +-
 .../internal/cache/FindDurableQueueProcessor.java  |   7 +-
 ...JUnitTest.java => LocatorLoadSnapshotTest.java} | 466 +++++++++++----------
 11 files changed, 658 insertions(+), 262 deletions(-)

diff --git a/geode-core/build.gradle b/geode-core/build.gradle
index a63aefa0ae..e15b1eb6bb 100755
--- a/geode-core/build.gradle
+++ b/geode-core/build.gradle
@@ -378,6 +378,7 @@ dependencies {
 
 
   distributedTestImplementation(project(':geode-gfsh'))
+  distributedTestImplementation(project(':geode-wan'))
   distributedTestImplementation(project(':geode-junit')) {
     exclude module: 'geode-core'
   }
diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/WanConnectionsLoadBalanceDistributedTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/WanConnectionsLoadBalanceDistributedTest.java
new file mode 100644
index 0000000000..503d5fdb75
--- /dev/null
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/distributed/internal/WanConnectionsLoadBalanceDistributedTest.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.distributed.internal;
+
+import static 
org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID;
+import static 
org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
+import static 
org.apache.geode.management.MXBeanAwaitility.awaitGatewaySenderMXBeanProxy;
+import static org.apache.geode.test.awaitility.GeodeAwaitility.await;
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.server.ServerLoad;
+import org.apache.geode.cache.wan.GatewayReceiver;
+import org.apache.geode.cache.wan.GatewaySender;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.management.GatewaySenderMXBean;
+import org.apache.geode.management.internal.cli.util.CommandStringBuilder;
+import org.apache.geode.management.internal.i18n.CliStrings;
+import org.apache.geode.test.awaitility.GeodeAwaitility;
+import org.apache.geode.test.dunit.VM;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+import org.apache.geode.test.junit.categories.WanTest;
+import org.apache.geode.test.junit.rules.GfshCommandRule;
+
+@Category({WanTest.class})
+public class WanConnectionsLoadBalanceDistributedTest implements Serializable {
+
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(9);
+
+  @Rule
+  public transient GfshCommandRule gfsh = new GfshCommandRule();
+
+  private MemberVM locator1Site2;
+  private MemberVM locator1Site1;
+  private MemberVM locator2Site1;
+
+  private MemberVM server1Site1;
+  private MemberVM server2Site1;
+  private MemberVM server3Site1;
+
+  private MemberVM server1Site2;
+  private MemberVM server2Site2;
+
+  private ClientVM clientSite2;
+
+  private static final String DISTRIBUTED_SYSTEM_ID_SITE1 = "1";
+  private static final String DISTRIBUTED_SYSTEM_ID_SITE2 = "2";
+  private static final String REGION_NAME = "test1";
+  private static final int NUMBER_OF_DISPATCHER_THREADS = 21;
+
+  @Before
+  public void setupMultiSite() throws Exception {
+    Properties props = new Properties();
+    props.setProperty(DISTRIBUTED_SYSTEM_ID, DISTRIBUTED_SYSTEM_ID_SITE1);
+    locator1Site1 = clusterStartupRule.startLocatorVM(0, props);
+    locator2Site1 = clusterStartupRule.startLocatorVM(1, props, 
locator1Site1.getPort());
+
+    // start servers for site #1
+    server1Site1 =
+        clusterStartupRule.startServerVM(2, locator1Site1.getPort(), 
locator2Site1.getPort());
+    server2Site1 =
+        clusterStartupRule.startServerVM(3, locator1Site1.getPort(), 
locator2Site1.getPort());
+    server3Site1 =
+        clusterStartupRule.startServerVM(4, locator1Site1.getPort(), 
locator2Site1.getPort());
+
+    connectGfshToSite(locator1Site1);
+
+    // create partition region on site #1
+    CommandStringBuilder regionCmd = new 
CommandStringBuilder(CliStrings.CREATE_REGION);
+    regionCmd.addOption(CliStrings.CREATE_REGION__REGION, REGION_NAME);
+    regionCmd.addOption(CliStrings.CREATE_REGION__REGIONSHORTCUT, "PARTITION");
+    regionCmd.addOption(CliStrings.CREATE_REGION__REDUNDANTCOPIES, "1");
+
+    gfsh.executeAndAssertThat(regionCmd.toString()).statusIsSuccess();
+
+    String csb = new CommandStringBuilder(CliStrings.CREATE_GATEWAYRECEIVER)
+        .addOption(CliStrings.CREATE_GATEWAYRECEIVER__BINDADDRESS, "localhost")
+        .getCommandString();
+
+    gfsh.executeAndAssertThat(csb).statusIsSuccess();
+
+    server1Site1.invoke(() -> verifyReceiverState(true));
+    server2Site1.invoke(() -> verifyReceiverState(true));
+    server3Site1.invoke(() -> verifyReceiverState(true));
+
+    props.setProperty(DISTRIBUTED_SYSTEM_ID, DISTRIBUTED_SYSTEM_ID_SITE2);
+    props.setProperty(REMOTE_LOCATORS,
+        "localhost[" + locator1Site1.getPort() + "],localhost[" + 
locator2Site1.getPort() + "]");
+    locator1Site2 = clusterStartupRule.startLocatorVM(5, props);
+
+    // start servers for site #2
+    server1Site2 = clusterStartupRule.startServerVM(6, 
locator1Site2.getPort());
+    server2Site2 = clusterStartupRule.startServerVM(7, 
locator1Site2.getPort());
+
+    // create parallel gateway-sender on site #2
+    connectGfshToSite(locator1Site2);
+    String command = new CommandStringBuilder(CliStrings.CREATE_GATEWAYSENDER)
+        .addOption(CliStrings.CREATE_GATEWAYSENDER__ID, "ln")
+        .addOption(CliStrings.CREATE_GATEWAYSENDER__REMOTEDISTRIBUTEDSYSTEMID, 
"1")
+        .addOption(CliStrings.CREATE_GATEWAYSENDER__PARALLEL, "true")
+        .addOption(CliStrings.CREATE_GATEWAYSENDER__DISPATCHERTHREADS,
+            "" + NUMBER_OF_DISPATCHER_THREADS)
+        .addOption(CliStrings.CREATE_GATEWAYSENDER__ORDERPOLICY, "key")
+        .getCommandString();
+    gfsh.executeAndAssertThat(command).statusIsSuccess();
+
+    verifyGatewaySenderState(server1Site2, true);
+    verifyGatewaySenderState(server2Site2, true);
+
+    // create partition region on site #2
+    regionCmd = new CommandStringBuilder(CliStrings.CREATE_REGION);
+    regionCmd.addOption(CliStrings.CREATE_REGION__REGION, REGION_NAME);
+    regionCmd.addOption(CliStrings.CREATE_REGION__REGIONSHORTCUT, "PARTITION");
+    regionCmd.addOption(CliStrings.CREATE_REGION__GATEWAYSENDERID, "ln");
+    regionCmd.addOption(CliStrings.CREATE_REGION__REDUNDANTCOPIES, "1");
+    gfsh.executeAndAssertThat(regionCmd.toString()).statusIsSuccess();
+  }
+
+  @Test
+  public void testGatewayConnectionLoadUpdatedOnBothLocators() throws 
Exception {
+    // Do put operations to initialize gateway-sender connections
+    startClientSite2(locator1Site2.getPort());
+    doPutsClientSite2(0, 500);
+
+    checkGatewayReceiverLoadUpdatedOnLocators(BigDecimal.valueOf(0.06));
+  }
+
+  @Test
+  public void 
testGatewayConnectionLoadUpdatedLocatorsStopAndStartGatewaySenders()
+      throws Exception {
+    // Do put operations to initialize gateway-sender connections
+    startClientSite2(locator1Site2.getPort());
+    doPutsClientSite2(0, 400);
+    checkGatewayReceiverLoadUpdatedOnLocators(BigDecimal.valueOf(0.06));
+
+    
executeGatewaySenderActionCommandAndValidateStateSite2(CliStrings.STOP_GATEWAYSENDER,
 null);
+    checkGatewayReceiverLoadUpdatedOnLocators(BigDecimal.valueOf(0));
+
+    
executeGatewaySenderActionCommandAndValidateStateSite2(CliStrings.START_GATEWAYSENDER,
 null);
+
+    doPutsClientSite2(400, 800);
+    checkGatewayReceiverLoadUpdatedOnLocators(BigDecimal.valueOf(0.06));
+  }
+
+  @Test
+  public void 
testGatewayConnectionLoadUpdatedLocatorsStopAndStartOneGatewaySender()
+      throws Exception {
+    // Do put operations to initialize gateway-sender connections
+    startClientSite2(locator1Site2.getPort());
+    doPutsClientSite2(0, 400);
+    checkGatewayReceiverLoadUpdatedOnLocators(BigDecimal.valueOf(0.06));
+
+    
executeGatewaySenderActionCommandAndValidateStateSite2(CliStrings.STOP_GATEWAYSENDER,
+        server1Site2);
+    checkGatewayReceiverLoadUpdatedOnLocators(BigDecimal.valueOf(0.03));
+
+    
executeGatewaySenderActionCommandAndValidateStateSite2(CliStrings.START_GATEWAYSENDER,
+        server1Site2);
+
+    doPutsClientSite2(400, 800);
+    checkGatewayReceiverLoadUpdatedOnLocators(BigDecimal.valueOf(0.06));
+  }
+
+  void checkGatewayReceiverLoadUpdatedOnLocators(BigDecimal 
expectedConnectionLoad) {
+    locator1Site1.invoke(
+        () -> await().untilAsserted(() -> assertThat(
+            
getTotalGatewayReceiverConnectionLoad().compareTo(expectedConnectionLoad))
+                .isEqualTo(0)));
+    locator2Site1.invoke(
+        () -> await().untilAsserted(() -> assertThat(
+            
getTotalGatewayReceiverConnectionLoad().compareTo(expectedConnectionLoad))
+                .isEqualTo(0)));
+  }
+
+  BigDecimal getTotalGatewayReceiverConnectionLoad() {
+    Map<ServerLocationAndMemberId, ServerLoad> servers =
+        
ClusterStartupRule.getLocator().getServerLocatorAdvisee().getLoadSnapshot()
+            .getGatewayReceiverLoadMap();
+    BigDecimal totalLoadOnLocator = new BigDecimal(0);
+    for (ServerLoad load : servers.values()) {
+      BigDecimal serverLoad = new 
BigDecimal(String.valueOf(load.getConnectionLoad()));
+      totalLoadOnLocator = totalLoadOnLocator.add(serverLoad);
+    }
+    return totalLoadOnLocator;
+  }
+
+  static void verifyReceiverState(boolean isRunning) {
+    Set<GatewayReceiver> receivers = 
ClusterStartupRule.getCache().getGatewayReceivers();
+    for (GatewayReceiver receiver : receivers) {
+      assertThat(receiver.isRunning()).isEqualTo(isRunning);
+    }
+  }
+
+  static void validateGatewaySenderMXBeanProxy(final InternalDistributedMember 
member,
+      final String senderId, final boolean isRunning, final boolean isPaused) {
+    GatewaySenderMXBean gatewaySenderMXBean = 
awaitGatewaySenderMXBeanProxy(member, senderId);
+    GeodeAwaitility.await(
+        "Awaiting GatewaySenderMXBean.isRunning(" + isRunning + ").isPaused(" 
+ isPaused + ")")
+        .untilAsserted(() -> {
+          assertThat(gatewaySenderMXBean.isRunning()).isEqualTo(isRunning);
+          assertThat(gatewaySenderMXBean.isPaused()).isEqualTo(isPaused);
+        });
+    assertThat(gatewaySenderMXBean).isNotNull();
+  }
+
+  static void verifySenderState(String senderId, boolean isRunning, boolean 
isPaused) {
+    GatewaySender sender = 
ClusterStartupRule.getCache().getGatewaySenders().stream()
+        .filter(x -> senderId.equals(x.getId())).findFirst().orElse(null);
+    assertThat(sender.isRunning()).isEqualTo(isRunning);
+    assertThat(sender.isPaused()).isEqualTo(isPaused);
+  }
+
+  void verifyGatewaySenderState(MemberVM memberVM, boolean isRunning) {
+    memberVM.invoke(() -> verifySenderState("ln", isRunning, false));
+    locator1Site2.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(memberVM.getVM()), 
"ln", isRunning,
+            false));
+  }
+
+  void executeGatewaySenderActionCommandAndValidateStateSite2(String 
cliString, MemberVM memberVM)
+      throws Exception {
+    connectGfshToSite(locator1Site2);
+    String command;
+    if (memberVM == null) {
+      command = new CommandStringBuilder(cliString)
+          .addOption(CliStrings.STOP_GATEWAYSENDER__ID, "ln")
+          .getCommandString();
+      gfsh.executeAndAssertThat(command).statusIsSuccess();
+      verifyGatewaySenderState(server1Site2, isRunning(cliString));
+      verifyGatewaySenderState(server2Site2, isRunning(cliString));
+    } else {
+      command = new CommandStringBuilder(cliString)
+          .addOption(CliStrings.STOP_GATEWAYSENDER__ID, "ln")
+          .addOption(CliStrings.MEMBERS, 
getMember(memberVM.getVM()).toString())
+          .getCommandString();
+      gfsh.executeAndAssertThat(command).statusIsSuccess();
+      verifyGatewaySenderState(memberVM, isRunning(cliString));
+    }
+  }
+
+  boolean isRunning(String cliString) {
+    return CliStrings.START_GATEWAYSENDER.equals(cliString);
+  }
+
+  public static InternalDistributedMember getMember(final VM vm) {
+    return vm.invoke(() -> ClusterStartupRule.getCache().getMyId());
+  }
+
+  void startClientSite2(int locatorPort) throws Exception {
+    clientSite2 =
+        clusterStartupRule.startClientVM(8, c -> 
c.withLocatorConnection(locatorPort));
+    clientSite2.invoke(() -> {
+      ClusterStartupRule.clientCacheRule.createProxyRegion(REGION_NAME);
+    });
+  }
+
+  void doPutsClientSite2(int startRange, int stopRange) {
+    clientSite2.invoke(() -> {
+      Region<Integer, Integer> region =
+          ClusterStartupRule.clientCacheRule.getCache().getRegion(REGION_NAME);
+      for (int i = startRange; i < stopRange; i++) {
+        region.put(i, i);
+      }
+    });
+  }
+
+  void connectGfshToSite(MemberVM locator) throws Exception {
+    if (gfsh.isConnected()) {
+      gfsh.disconnect();
+    }
+    gfsh.connectAndVerify(locator);
+  }
+}
diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/management/internal/security/MultiGfshDUnitTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/security/MultiGfshDUnitTest.java
index 35466cd27a..59ccd66bc3 100644
--- 
a/geode-core/src/distributedTest/java/org/apache/geode/management/internal/security/MultiGfshDUnitTest.java
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/management/internal/security/MultiGfshDUnitTest.java
@@ -51,7 +51,8 @@ public class MultiGfshDUnitTest {
   public void testMultiUser() throws Exception {
     IgnoredException.addIgnoredException("java.util.zip.ZipException: zip file 
is empty");
     IgnoredException
-        .addIgnoredException("java.lang.IllegalStateException: WAN service is 
not available.");
+        .addIgnoredException(
+            
"org.apache.geode.internal.cache.wan.GatewaySenderConfigurationException: 
Locators must be configured before starting gateway-sender.");
     int jmxPort = server.getJmxPort();
 
     // set up vm_1 as a gfsh vm, data-reader will login and log out constantly 
in this vm until the
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/locator/ClientConnectionRequest.java
 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/locator/ClientConnectionRequest.java
index 31975aac54..96625a22a9 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/locator/ClientConnectionRequest.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/locator/ClientConnectionRequest.java
@@ -56,7 +56,7 @@ public class ClientConnectionRequest extends 
ServerLocationRequest {
     SerializationHelper.writeServerLocationSet(excludedServers, out);
   }
 
-  public Set getExcludedServers() {
+  public Set<ServerLocation> getExcludedServers() {
     return excludedServers;
   }
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/locator/QueueConnectionRequest.java
 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/locator/QueueConnectionRequest.java
index 51bb7d5311..4659c46988 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/locator/QueueConnectionRequest.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/locator/QueueConnectionRequest.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.Set;
 
 import org.apache.geode.DataSerializer;
+import org.apache.geode.distributed.internal.ServerLocation;
 import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.serialization.DataSerializableFixedID;
 import org.apache.geode.internal.serialization.DeserializationContext;
@@ -32,7 +33,7 @@ import 
org.apache.geode.internal.serialization.SerializationContext;
  */
 public class QueueConnectionRequest extends ServerLocationRequest {
   private ClientProxyMembershipID proxyId;
-  private Set excludedServers;
+  private Set<ServerLocation> excludedServers;
   private int redundantCopies;
   private boolean findDurable = false;
 
@@ -41,7 +42,7 @@ public class QueueConnectionRequest extends 
ServerLocationRequest {
   }
 
   public QueueConnectionRequest(ClientProxyMembershipID proxyId, int 
redundantCopies,
-      Set excludedServers, String serverGroup, boolean findDurable) {
+      Set<ServerLocation> excludedServers, String serverGroup, boolean 
findDurable) {
     super(serverGroup);
     this.proxyId = proxyId;
     this.excludedServers = excludedServers;
@@ -70,7 +71,7 @@ public class QueueConnectionRequest extends 
ServerLocationRequest {
     out.writeBoolean(findDurable);
   }
 
-  public Set getExcludedServers() {
+  public Set<ServerLocation> getExcludedServers() {
     return excludedServers;
   }
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/server/internal/LoadMonitor.java
 
b/geode-core/src/main/java/org/apache/geode/cache/server/internal/LoadMonitor.java
index aab2ce93d3..f2f8242c3f 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/server/internal/LoadMonitor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/server/internal/LoadMonitor.java
@@ -49,6 +49,7 @@ public class LoadMonitor implements ConnectionListener {
   private final PollingThread pollingThread;
   protected volatile ServerLoad lastLoad;
   protected CacheServerStats stats;
+  private boolean isGatewayReceiver;
 
   public LoadMonitor(ServerLoadProbe probe, int maxConnections, long 
pollInterval,
       int forceUpdateFrequency, CacheServerAdvisor advisor) {
@@ -63,12 +64,14 @@ public class LoadMonitor implements ConnectionListener {
    * Start the load monitor. Starts the background thread which polls the load 
monitor and sends
    * updates about load.
    */
-  public void start(ServerLocation location, CacheServerStats 
cacheServerStats) {
+  public void start(ServerLocation location, CacheServerStats cacheServerStats,
+      boolean isGatewayReceiver) {
     probe.open();
     this.location = location;
     pollingThread.start();
     stats = cacheServerStats;
     stats.setLoad(lastLoad);
+    this.isGatewayReceiver = isGatewayReceiver;
   }
 
   /**
@@ -87,7 +90,7 @@ public class LoadMonitor implements ConnectionListener {
 
   @Override
   public void connectionClosed(boolean lastConnection, CommunicationMode 
communicationMode) {
-    if (communicationMode.isClientOperations()) {
+    if (communicationMode.isClientOperations() || communicationMode.isWAN()) {
       metrics.decConnectionCount();
     }
     if (lastConnection) {
@@ -101,7 +104,7 @@ public class LoadMonitor implements ConnectionListener {
 
   @Override
   public void connectionOpened(boolean firstConnection, CommunicationMode 
communicationMode) {
-    if (communicationMode.isClientOperations()) {
+    if (communicationMode.isClientOperations() || communicationMode.isWAN()) {
       metrics.incConnectionCount();
     }
     if (firstConnection) {
@@ -159,13 +162,33 @@ public class LoadMonitor implements ConnectionListener {
       }
     }
 
+    /**
+     * This function calculates next interval absolute time that is same on 
all servers in
+     * the cluster if following conditions are fulfilled:
+     * - same pollInterval value is used
+     * - time is synchronized on servers
+     *
+     * @return absolute time of next interval
+     */
+    private long getNextIntervalSynchronizedAbsoluteTime(final long 
currentTime,
+        final long pollInterval) {
+      return (currentTime - (currentTime % pollInterval)) + pollInterval;
+    }
+
     @Override
     public void run() {
       while (alive) {
         try {
           synchronized (signal) {
-            long end = System.currentTimeMillis() + pollInterval;
-            long remaining = pollInterval;
+            long currentTime = System.currentTimeMillis();
+            long end, remaining;
+            if (isGatewayReceiver) {
+              end = getNextIntervalSynchronizedAbsoluteTime(currentTime, 
pollInterval);
+              remaining = end - currentTime;
+            } else {
+              end = currentTime + pollInterval;
+              remaining = pollInterval;
+            }
             while (alive && remaining > 0) {
               signal.wait(remaining);
               remaining = end - System.currentTimeMillis();
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/LocatorLoadSnapshot.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/LocatorLoadSnapshot.java
index fa0798b2c2..f5182d18a6 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/LocatorLoadSnapshot.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/LocatorLoadSnapshot.java
@@ -30,6 +30,8 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
+import org.jetbrains.annotations.TestOnly;
+
 import org.apache.geode.InternalGemFireException;
 import org.apache.geode.annotations.VisibleForTesting;
 import org.apache.geode.cache.server.ServerLoad;
@@ -129,7 +131,6 @@ public class LocatorLoadSnapshot {
    */
   synchronized void updateLoad(ServerLocation location, String memberId, 
ServerLoad newLoad,
       List<ClientProxyMembershipID> clientIds) {
-
     String[] groups = serverGroupMap.get(location);
     // the server was asynchronously removed, so don't do anything.
     if (groups == null) {
@@ -142,9 +143,9 @@ public class LocatorLoadSnapshot {
       }
     }
 
-    updateMap(connectionLoadMap, location, memberId, 
newLoad.getConnectionLoad(),
+    updateConnectionLoadMap(location, memberId, newLoad.getConnectionLoad(),
         newLoad.getLoadPerConnection());
-    updateMap(queueLoadMap, location, newLoad.getSubscriptionConnectionLoad(),
+    updateQueueLoadMap(location, newLoad.getSubscriptionConnectionLoad(),
         newLoad.getLoadPerSubscriptionConnection());
   }
 
@@ -253,17 +254,17 @@ public class LocatorLoadSnapshot {
     }
 
     {
-      List bestLHs = findBestServers(groupServers, excludedServers, 1);
+      List<LoadHolder> bestLHs = findBestServers(groupServers, 
excludedServers, 1);
       if (bestLHs.isEmpty()) {
         return null;
       }
-      LoadHolder lh = (LoadHolder) bestLHs.get(0);
+      LoadHolder lh = bestLHs.get(0);
       lh.incConnections();
       return lh.getLocation();
     }
   }
 
-  public synchronized ArrayList getServers(String group) {
+  public synchronized ArrayList<ServerLocation> getServers(String group) {
     if ("".equals(group)) {
       group = null;
     }
@@ -271,7 +272,7 @@ public class LocatorLoadSnapshot {
     if (groupServers == null || groupServers.isEmpty()) {
       return null;
     }
-    ArrayList result = new ArrayList<>();
+    ArrayList<ServerLocation> result = new ArrayList<>();
     for (ServerLocationAndMemberId locationAndMemberId : 
groupServers.keySet()) {
       result.add(locationAndMemberId.getServerLocation());
     }
@@ -332,7 +333,8 @@ public class LocatorLoadSnapshot {
    * @return a list containing the best servers. The size of the list will be 
less than or equal to
    *         count, depending on if there are enough servers available.
    */
-  public List getServersForQueue(String group, Set<ServerLocation> 
excludedServers, int count) {
+  public List<ServerLocation> getServersForQueue(String group, 
Set<ServerLocation> excludedServers,
+      int count) {
     return getServersForQueue(null, group, excludedServers, count);
   }
 
@@ -412,7 +414,24 @@ public class LocatorLoadSnapshot {
           new ServerLoad(connectionLoad.getLoad(), 
connectionLoad.getLoadPerConnection(),
               queueLoad.getLoad(), queueLoad.getLoadPerConnection()));
     }
+    return result;
+  }
 
+  @TestOnly
+  synchronized Map<ServerLocationAndMemberId, ServerLoad> 
getGatewayReceiverLoadMap() {
+    Map<ServerLocationAndMemberId, LoadHolder> connectionMap =
+        connectionLoadMap.get(GatewayReceiver.RECEIVER_GROUP);
+    Map<ServerLocationAndMemberId, ServerLoad> result = new HashMap<>();
+    if (connectionMap == null) {
+      return result;
+    }
+    for (Entry<ServerLocationAndMemberId, LoadHolder> entry : 
connectionMap.entrySet()) {
+      ServerLocationAndMemberId member = new 
ServerLocationAndMemberId(entry.getKey()
+          .getServerLocation(), entry.getKey().getMemberId());
+      ServerLoad serverLoad =
+          new ServerLoad(entry.getValue().getLoad(), 
entry.getValue().getLoadPerConnection(), 0, 0);
+      result.put(member, serverLoad);
+    }
     return result;
   }
 
@@ -424,7 +443,7 @@ public class LocatorLoadSnapshot {
       groupMap.put(holder.getLocation(), holder);
     }
     // Special case for GatewayReceiver where we don't put those 
serverlocation against holder
-    if (!(groups.length > 0 && 
groups[0].equals(GatewayReceiver.RECEIVER_GROUP))) {
+    if (!isGatewayReceiverGroup(groups)) {
       Map<ServerLocation, LoadHolder> groupMap = map.computeIfAbsent(null, k 
-> new HashMap<>());
       groupMap.put(holder.getLocation(), holder);
     }
@@ -440,13 +459,17 @@ public class LocatorLoadSnapshot {
       groupMap.put(new ServerLocationAndMemberId(holder.getLocation(), 
memberId), holder);
     }
     // Special case for GatewayReceiver where we don't put those 
serverlocation against holder
-    if (!(groups.length > 0 && 
groups[0].equals(GatewayReceiver.RECEIVER_GROUP))) {
+    if (!isGatewayReceiverGroup(groups)) {
       Map<ServerLocationAndMemberId, LoadHolder> groupMap =
           map.computeIfAbsent(null, k -> new HashMap<>());
       groupMap.put(new ServerLocationAndMemberId(holder.getLocation(), 
memberId), holder);
     }
   }
 
+  boolean isGatewayReceiverGroup(String[] groups) {
+    return groups.length > 0 && 
groups[0].equals(GatewayReceiver.RECEIVER_GROUP);
+  }
+
   @VisibleForTesting
   void removeFromMap(Map<String, Map<ServerLocation, LoadHolder>> map, 
String[] groups,
       ServerLocation location) {
@@ -459,7 +482,7 @@ public class LocatorLoadSnapshot {
         }
       }
     }
-    Map groupMap = map.get(null);
+    Map<ServerLocation, LoadHolder> groupMap = map.get(null);
     groupMap.remove(location);
   }
 
@@ -478,27 +501,34 @@ public class LocatorLoadSnapshot {
         }
       }
     }
-    Map groupMap = map.get(null);
+    Map<ServerLocationAndMemberId, LoadHolder> groupMap = map.get(null);
     groupMap.remove(locationAndMemberId);
   }
 
   @VisibleForTesting
-  void updateMap(Map map, ServerLocation location, float load, float 
loadPerConnection) {
-    updateMap(map, location, "", load, loadPerConnection);
+  void updateConnectionLoadMap(ServerLocation location, String memberId, float 
load,
+      float loadPerConnection) {
+    ServerLocationAndMemberId locationAndMemberId =
+        new ServerLocationAndMemberId(location, memberId);
+
+    Map<ServerLocationAndMemberId, LoadHolder> groupMap = 
connectionLoadMap.get(null);
+    LoadHolder holder = groupMap.get(locationAndMemberId);
+    if (holder == null) {
+      groupMap = connectionLoadMap.get(GatewayReceiver.RECEIVER_GROUP);
+      if (groupMap != null) {
+        holder = groupMap.get(locationAndMemberId);
+      }
+    }
+
+    if (holder != null) {
+      holder.setLoad(load, loadPerConnection);
+    }
   }
 
   @VisibleForTesting
-  void updateMap(Map map, ServerLocation location, String memberId, float load,
-      float loadPerConnection) {
-    Map groupMap = (Map) map.get(null);
-    LoadHolder holder;
-    if (memberId.equals("")) {
-      holder = (LoadHolder) groupMap.get(location);
-    } else {
-      ServerLocationAndMemberId locationAndMemberId =
-          new ServerLocationAndMemberId(location, memberId);
-      holder = (LoadHolder) groupMap.get(locationAndMemberId);
-    }
+  void updateQueueLoadMap(ServerLocation location, float load, float 
loadPerConnection) {
+    Map<ServerLocation, LoadHolder> groupMap = queueLoadMap.get(null);
+    LoadHolder holder = groupMap.get(location);
     if (holder != null) {
       holder.setLoad(load, loadPerConnection);
     }
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocator.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocator.java
index a27e97d4ab..fbab03dcf8 100755
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocator.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/ServerLocator.java
@@ -54,6 +54,7 @@ import org.apache.geode.internal.cache.ControllerAdvisor;
 import org.apache.geode.internal.cache.ControllerAdvisor.ControllerProfile;
 import org.apache.geode.internal.cache.FindDurableQueueProcessor;
 import org.apache.geode.internal.cache.GridAdvisor.GridProfile;
+import org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.inet.LocalHostUtil;
 import org.apache.geode.internal.serialization.DataSerializableFixedID;
 import org.apache.geode.logging.internal.log4j.api.LogService;
@@ -80,15 +81,10 @@ public class ServerLocator implements TcpHandler, 
RestartHandler, DistributionAd
   @MakeNotStatic
   private static final AtomicInteger profileSN = new AtomicInteger();
 
-  private static final long SERVER_LOAD_LOG_INTERVAL = (60 * 60 * 1000); // 
log server load once an
-                                                                         // 
hour
-
   private final String logFile;
   private final String hostName;
   private final String memberName;
 
-  private volatile long lastLogTime;
-
   ServerLocator() throws IOException {
     port = 10334;
     hostName = LocalHostUtil.getCanonicalLocalHostName();
@@ -223,7 +219,7 @@ public class ServerLocator implements TcpHandler, 
RestartHandler, DistributionAd
 
 
   private GetAllServersResponse pickAllServers(GetAllServersRequest 
clientRequest) {
-    ArrayList servers = 
loadSnapshot.getServers(clientRequest.getServerGroup());
+    ArrayList<ServerLocation> servers = 
loadSnapshot.getServers(clientRequest.getServerGroup());
     return new GetAllServersResponse(servers);
   }
 
@@ -234,11 +230,11 @@ public class ServerLocator implements TcpHandler, 
RestartHandler, DistributionAd
   }
 
   private Object pickQueueServers(QueueConnectionRequest clientRequest) {
-    Set excludedServers = new HashSet(clientRequest.getExcludedServers());
+    Set<ServerLocation> excludedServers = new 
HashSet<>(clientRequest.getExcludedServers());
 
     /* If this is a request to find durable queues, lets go find them */
 
-    ArrayList servers = new ArrayList();
+    List<ServerLocation> servers = new ArrayList<>();
     boolean durableQueueFound = false;
     if (clientRequest.isFindDurable() && 
clientRequest.getProxyId().isDurable()) {
       servers = FindDurableQueueProcessor.sendAndFind(this, 
clientRequest.getProxyId(),
@@ -248,7 +244,7 @@ public class ServerLocator implements TcpHandler, 
RestartHandler, DistributionAd
       durableQueueFound = servers.size() > 0;
     }
 
-    List candidates;
+    List<ServerLocation> candidates;
     if (clientRequest.getRedundantCopies() == -1) {
       /* We need all the servers we can get */
       candidates = loadSnapshot.getServersForQueue(clientRequest.getProxyId(),
@@ -371,8 +367,6 @@ public class ServerLocator implements TcpHandler, 
RestartHandler, DistributionAd
     stats.endLocatorResponse(startTime);
   }
 
-
-
   private List<ServerLocation> getLocators() {
     if (cachedLocators != null) {
       return cachedLocators;
@@ -438,7 +432,7 @@ public class ServerLocator implements TcpHandler, 
RestartHandler, DistributionAd
   }
 
   public void updateLoad(ServerLocation location, String memberId, ServerLoad 
load,
-      List clientIds) {
+      List<ClientProxyMembershipID> clientIds) {
     if (getLogWriter().fineEnabled()) {
       getLogWriter()
           .fine("ServerLocator: Received a load update from " + location + " 
at " + memberId + " , "
@@ -453,7 +447,7 @@ public class ServerLocator implements TcpHandler, 
RestartHandler, DistributionAd
    * with the current load on that server
    */
 
-  public Map getLoadMap() {
+  public Map<ServerLocation, ServerLoad> getLoadMap() {
     return loadSnapshot.getLoadMap();
   }
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java
index 25d6f70b33..a921f543d7 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/CacheServerImpl.java
@@ -379,7 +379,7 @@ public class CacheServerImpl extends AbstractCacheServer 
implements Distribution
     acceptor.start();
     advisor.handshake();
     loadMonitor.start(new ServerLocation(getExternalAddress(), getPort()),
-        acceptor.getStats());
+        acceptor.getStats(), acceptor.isGatewayReceiver());
 
     // TODO : Need to provide facility to enable/disable client health 
monitoring.
     // Creating ClientHealthMonitoring region.
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/FindDurableQueueProcessor.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/FindDurableQueueProcessor.java
index 3b5d2597f2..05075fb83b 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/FindDurableQueueProcessor.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/FindDurableQueueProcessor.java
@@ -49,9 +49,10 @@ import 
org.apache.geode.logging.internal.log4j.api.LogService;
 public class FindDurableQueueProcessor extends ReplyProcessor21 {
   private static final Logger logger = LogService.getLogger();
 
-  final ArrayList durableLocations = new ArrayList();
+  final ArrayList<ServerLocation> durableLocations = new ArrayList();
 
-  public static ArrayList sendAndFind(ServerLocator locator, 
ClientProxyMembershipID proxyId,
+  public static ArrayList<ServerLocation> sendAndFind(ServerLocator locator,
+      ClientProxyMembershipID proxyId,
       DistributionManager dm) {
     Set members = ((GridAdvisor) 
locator.getDistributionAdvisor()).adviseBridgeServers();
     if (members.contains(dm.getId())) {
@@ -68,7 +69,7 @@ public class FindDurableQueueProcessor extends 
ReplyProcessor21 {
     } catch (ReplyException e) {
       e.handleCause();
     }
-    ArrayList locations = processor.durableLocations;
+    ArrayList<ServerLocation> locations = processor.durableLocations;
     // This will add any local queues to the list
     findLocalDurableQueues(proxyId, locations);
     return locations;
diff --git 
a/geode-core/src/test/java/org/apache/geode/distributed/internal/LocatorLoadSnapshotJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/distributed/internal/LocatorLoadSnapshotTest.java
similarity index 68%
rename from 
geode-core/src/test/java/org/apache/geode/distributed/internal/LocatorLoadSnapshotJUnitTest.java
rename to 
geode-core/src/test/java/org/apache/geode/distributed/internal/LocatorLoadSnapshotTest.java
index 9789bc6e24..e92cabd4ed 100644
--- 
a/geode-core/src/test/java/org/apache/geode/distributed/internal/LocatorLoadSnapshotJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/distributed/internal/LocatorLoadSnapshotTest.java
@@ -14,11 +14,9 @@
  */
 package org.apache.geode.distributed.internal;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.entry;
+import static org.assertj.core.data.Offset.offset;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -26,11 +24,13 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.jupiter.api.Test;
 
 import org.apache.geode.cache.server.ServerLoad;
+import org.apache.geode.cache.wan.GatewayReceiver;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.test.junit.categories.MembershipTest;
 
@@ -39,7 +39,7 @@ import org.apache.geode.test.junit.categories.MembershipTest;
  * the locator to compare the load between multiple servers.
  */
 @Category({MembershipTest.class})
-public class LocatorLoadSnapshotJUnitTest {
+public class LocatorLoadSnapshotTest {
 
   final int LOAD_POLL_INTERVAL = 30000;
 
@@ -49,9 +49,10 @@ public class LocatorLoadSnapshotJUnitTest {
   @Test
   public void testEmptySnapshot() {
     final LocatorLoadSnapshot sn = new LocatorLoadSnapshot();
-    assertNull(sn.getServerForConnection("group", Collections.EMPTY_SET));
-    assertNull(sn.getServerForConnection(null, Collections.EMPTY_SET));
-    assertEquals(Collections.EMPTY_LIST, sn.getServersForQueue(null, 
Collections.EMPTY_SET, 5));
+    assertThat(sn.getServerForConnection("group", 
Collections.emptySet())).isNull();
+    assertThat(sn.getServerForConnection(null, 
Collections.emptySet())).isNull();
+    assertThat(sn.getServersForQueue(null, Collections.emptySet(), 5))
+        .isEqualTo(Collections.EMPTY_LIST);
   }
 
   /**
@@ -71,34 +72,35 @@ public class LocatorLoadSnapshotJUnitTest {
     sn.addServer(l1, uniqueId1, new String[0], ld1, LOAD_POLL_INTERVAL);
     sn.addServer(l2, uniqueId2, new String[0], ld2, LOAD_POLL_INTERVAL);
 
-    HashMap expectedLoad = new HashMap();
+    Map<ServerLocation, ServerLoad> expectedLoad = new HashMap<>();
     expectedLoad.put(l1, ld1);
     expectedLoad.put(l2, ld2);
-    assertEquals(expectedLoad, sn.getLoadMap());
+    assertThat(sn.getLoadMap()).isEqualTo(expectedLoad);
 
-    assertNull(sn.getServerForConnection("group", Collections.EMPTY_SET));
-    assertEquals(Collections.EMPTY_LIST, sn.getServersForQueue("group", 
Collections.EMPTY_SET, 5));
+    assertThat(sn.getServerForConnection("group", 
Collections.emptySet())).isNull();
+    assertThat(sn.getServersForQueue("group", Collections.emptySet(), 5))
+        .isEqualTo(Collections.EMPTY_LIST);
 
-    assertEquals(l1, sn.getServerForConnection(null, Collections.EMPTY_SET));
-    assertEquals(l1, sn.getServerForConnection(null, Collections.EMPTY_SET));
+    assertThat(sn.getServerForConnection(null, 
Collections.emptySet())).isEqualTo(l1);
+    assertThat(sn.getServerForConnection(null, 
Collections.emptySet())).isEqualTo(l1);
     // the load should be equal here, so we don't know which server to expect
-    sn.getServerForConnection(null, Collections.EMPTY_SET);
-    sn.getServerForConnection(null, Collections.EMPTY_SET);
-
-    assertEquals(l2, sn.getServerForConnection(null, Collections.EMPTY_SET));
-    assertEquals(l2, sn.getServerForConnection(null, Collections.EMPTY_SET));
-
-    assertEquals(Collections.singletonList(l2),
-        sn.getServersForQueue(null, Collections.EMPTY_SET, 1));
-    assertEquals(Collections.singletonList(l1),
-        sn.getServersForQueue(null, Collections.EMPTY_SET, 1));
-    assertEquals(Collections.singletonList(l2),
-        sn.getServersForQueue(null, Collections.EMPTY_SET, 1));
-
-    assertEquals(Arrays.asList(l2, l1),
-        sn.getServersForQueue(null, Collections.EMPTY_SET, 5));
-    assertEquals(Arrays.asList(l2, l1),
-        sn.getServersForQueue(null, Collections.EMPTY_SET, -1));
+    sn.getServerForConnection(null, Collections.emptySet());
+    sn.getServerForConnection(null, Collections.emptySet());
+
+    assertThat(sn.getServerForConnection(null, 
Collections.emptySet())).isEqualTo(l2);
+    assertThat(sn.getServerForConnection(null, 
Collections.emptySet())).isEqualTo(l2);
+
+    assertThat(sn.getServersForQueue(null, Collections.emptySet(), 1))
+        .isEqualTo(Collections.singletonList(l2));
+    assertThat(sn.getServersForQueue(null, Collections.emptySet(), 1))
+        .isEqualTo(Collections.singletonList(l1));
+    assertThat(sn.getServersForQueue(null, Collections.emptySet(), 1))
+        .isEqualTo(Collections.singletonList(l2));
+
+    assertThat(sn.getServersForQueue(null, Collections.emptySet(), 5))
+        .isEqualTo(Arrays.asList(l2, l1));
+    assertThat(sn.getServersForQueue(null, Collections.emptySet(), -1))
+        .isEqualTo(Arrays.asList(l2, l1));
   }
 
   /**
@@ -115,11 +117,11 @@ public class LocatorLoadSnapshotJUnitTest {
     sn.addServer(l2, uniqueId2, new String[0], new ServerLoad(100, .2f, 1, 
.2f),
         LOAD_POLL_INTERVAL);
 
-    assertEquals(l1, sn.getServerForConnection(null, Collections.EMPTY_SET));
-    assertEquals(l1, sn.getServerForConnection(null, Collections.EMPTY_SET));
+    assertThat(sn.getServerForConnection(null, 
Collections.emptySet())).isEqualTo(l1);
+    assertThat(sn.getServerForConnection(null, 
Collections.emptySet())).isEqualTo(l1);
     sn.updateLoad(l1, uniqueId1, new ServerLoad(200, 1, 1, 1));
-    assertEquals(l2, sn.getServerForConnection(null, Collections.EMPTY_SET));
-    assertEquals(l2, sn.getServerForConnection(null, Collections.EMPTY_SET));
+    assertThat(sn.getServerForConnection(null, 
Collections.emptySet())).isEqualTo(l2);
+    assertThat(sn.getServerForConnection(null, 
Collections.emptySet())).isEqualTo(l2);
   }
 
   /**
@@ -137,13 +139,16 @@ public class LocatorLoadSnapshotJUnitTest {
     sn.addServer(l2, uniqueId2, new String[0], new ServerLoad(100, .2f, 10, 
.2f),
         LOAD_POLL_INTERVAL);
 
-    assertEquals(l1, sn.getServerForConnection(null, Collections.EMPTY_SET));
-    assertEquals(Arrays.asList(l1, l2),
-        sn.getServersForQueue(null, Collections.EMPTY_SET, -1));
+    assertThat(sn.getServerForConnection(null, 
Collections.emptySet())).isEqualTo(l1);
+
+    assertThat(sn.getServersForQueue(null, Collections.emptySet(), -1))
+        .isEqualTo(Arrays.asList(l1, l2));
     sn.removeServer(l1, uniqueId1);
-    assertEquals(l2, sn.getServerForConnection(null, Collections.EMPTY_SET));
-    assertEquals(Collections.singletonList(l2),
-        sn.getServersForQueue(null, Collections.EMPTY_SET, -1));
+    assertThat(sn.getServerForConnection(null, 
Collections.emptySet())).isEqualTo(l2);
+    assertThat(sn.getServersForQueue(null, Collections.emptySet(), -1))
+        .isEqualTo(Collections.singletonList(l2));
+    assertThat(sn.getServersForQueue(null, Collections.emptySet(), 
-1)).isEqualTo(
+        Collections.singletonList(l2));
   }
 
   /**
@@ -160,33 +165,33 @@ public class LocatorLoadSnapshotJUnitTest {
         LOAD_POLL_INTERVAL);
     sn.addServer(l2, uniqueId2, new String[] {"b", "c"}, new ServerLoad(1, 1, 
1, 1),
         LOAD_POLL_INTERVAL);
-    assertNotNull(sn.getServerForConnection(null, Collections.EMPTY_SET));
-    assertEquals(l1, sn.getServerForConnection("a", Collections.EMPTY_SET));
-    assertEquals(l2, sn.getServerForConnection("c", Collections.EMPTY_SET));
+    assertThat(sn.getServerForConnection(null, 
Collections.emptySet())).isNotNull();
+    assertThat(sn.getServerForConnection("a", 
Collections.emptySet())).isEqualTo(l1);
+    assertThat(sn.getServerForConnection("c", 
Collections.emptySet())).isEqualTo(l2);
     sn.updateLoad(l1, uniqueId1, new ServerLoad(10, 1, 1, 1));
-    assertEquals(l2, sn.getServerForConnection("b", Collections.EMPTY_SET));
+    assertThat(sn.getServerForConnection("b", 
Collections.emptySet())).isEqualTo(l2);
     sn.updateLoad(l2, uniqueId2, new ServerLoad(100, 1, 1, 1));
-    assertEquals(l1, sn.getServerForConnection("b", Collections.EMPTY_SET));
-    assertEquals(Arrays.asList(l1),
-        sn.getServersForQueue("a", Collections.EMPTY_SET, -1));
-    assertEquals(Arrays.asList(l2),
-        sn.getServersForQueue("c", Collections.EMPTY_SET, -1));
-    assertEquals(Arrays.asList(l1, l2),
-        sn.getServersForQueue("b", Collections.EMPTY_SET, -1));
-    assertEquals(Arrays.asList(l1, l2),
-        sn.getServersForQueue(null, Collections.EMPTY_SET, -1));
-    assertEquals(Arrays.asList(l1, l2),
-        sn.getServersForQueue("b", Collections.EMPTY_SET, 5));
+    assertThat(sn.getServerForConnection("b", 
Collections.emptySet())).isEqualTo(l1);
+    assertThat(sn.getServersForQueue("a", Collections.emptySet(), 
-1)).isEqualTo(
+        Collections.singletonList(l1));
+    assertThat(sn.getServersForQueue("c", Collections.emptySet(), 
-1)).isEqualTo(
+        Collections.singletonList(l2));
+    assertThat(sn.getServersForQueue("b", Collections.emptySet(), 
-1)).isEqualTo(
+        Arrays.asList(l1, l2));
+    assertThat(sn.getServersForQueue(null, Collections.emptySet(), 
-1)).isEqualTo(
+        Arrays.asList(l1, l2));
+    assertThat(sn.getServersForQueue("b", Collections.emptySet(), 
5)).isEqualTo(
+        Arrays.asList(l1, l2));
 
     sn.removeServer(l1, uniqueId1);
-    assertEquals(l2, sn.getServerForConnection("b", Collections.EMPTY_SET));
-    assertEquals(l2, sn.getServerForConnection("b", Collections.EMPTY_SET));
-    assertNull(sn.getServerForConnection("a", Collections.EMPTY_SET));
-    assertEquals(l2, sn.getServerForConnection("c", Collections.EMPTY_SET));
-    assertEquals(Arrays.asList(),
-        sn.getServersForQueue("a", Collections.EMPTY_SET, -1));
-    assertEquals(Arrays.asList(l2),
-        sn.getServersForQueue("b", Collections.EMPTY_SET, 5));
+    assertThat(sn.getServerForConnection("b", 
Collections.emptySet())).isEqualTo(l2);
+    assertThat(sn.getServerForConnection("b", 
Collections.emptySet())).isEqualTo(l2);
+    assertThat(sn.getServerForConnection("a", 
Collections.emptySet())).isNull();
+    assertThat(sn.getServerForConnection("c", 
Collections.emptySet())).isEqualTo(l2);
+    assertThat(sn.getServersForQueue("a", Collections.emptySet(), 
-1)).isEqualTo(
+        Collections.emptyList());
+    assertThat(sn.getServersForQueue("b", Collections.emptySet(), 
5)).isEqualTo(
+        Collections.singletonList(l2));
   }
 
   /**
@@ -209,18 +214,18 @@ public class LocatorLoadSnapshotJUnitTest {
 
     // Test with interleaving requests for either group
     for (int i = 0; i < 60; i++) {
-      ServerLocation l = sn.getServerForConnection("a", Collections.EMPTY_SET);
-      assertTrue(l1.equals(l) || l2.equals(l));
-      l = sn.getServerForConnection("b", Collections.EMPTY_SET);
-      assertTrue(l2.equals(l) || l3.equals(l));
+      ServerLocation l = sn.getServerForConnection("a", 
Collections.emptySet());
+      assertThat(l1.equals(l) || l2.equals(l)).isTrue();
+      l = sn.getServerForConnection("b", Collections.emptySet());
+      assertThat(l2.equals(l) || l3.equals(l)).isTrue();
     }
 
-    Map expected = new HashMap();
+    Map<ServerLocation, ServerLoad> expected = new HashMap<>();
     ServerLoad expectedLoad = new ServerLoad(40f, 1f, 0f, 1f);
     expected.put(l1, expectedLoad);
     expected.put(l2, expectedLoad);
     expected.put(l3, expectedLoad);
-    assertEquals(expected, sn.getLoadMap());
+    assertThat(sn.getLoadMap()).isEqualTo(expected);
 
     sn.updateLoad(l1, uniqueId1, new ServerLoad(0, 1, 0, 1));
     sn.updateLoad(l2, uniqueId2, new ServerLoad(0, 1, 0, 1));
@@ -230,30 +235,30 @@ public class LocatorLoadSnapshotJUnitTest {
     // Now do the same test, but make all the requests for one group first,
     // then the second group.
     for (int i = 0; i < 60; i++) {
-      ServerLocation l = sn.getServerForConnection("a", Collections.EMPTY_SET);
-      assertTrue(l1.equals(l) || l2.equals(l));
+      ServerLocation l = sn.getServerForConnection("a", 
Collections.emptySet());
+      assertThat(l1.equals(l) || l2.equals(l)).isTrue();
     }
 
-    expected = new HashMap();
+    expected = new HashMap<>();
     expected.put(l1, new ServerLoad(30f, 1f, 0f, 1f));
     expected.put(l2, new ServerLoad(30f, 1f, 0f, 1f));
     expected.put(l3, new ServerLoad(0f, 1f, 0f, 1f));
-    assertEquals(expected, sn.getLoadMap());
+    assertThat(sn.getLoadMap()).isEqualTo(expected);
 
     for (int i = 0; i < 60; i++) {
-      ServerLocation l = sn.getServerForConnection("b", Collections.EMPTY_SET);
-      assertTrue(l2.equals(l) || l3.equals(l));
+      ServerLocation l = sn.getServerForConnection("b", 
Collections.emptySet());
+      assertThat(l2.equals(l) || l3.equals(l)).isTrue();
     }
 
     // The load can't be completely balanced, because
     // We already had 30 connections from group a on server l2.
     // But we expect that l3 should have received most of the connections
     // for group b, because it started out with 0.
-    expected = new HashMap();
+    expected = new HashMap<>();
     expected.put(l1, new ServerLoad(30f, 1f, 0f, 1f));
     expected.put(l2, new ServerLoad(45f, 1f, 0f, 1f));
     expected.put(l3, new ServerLoad(45f, 1f, 0f, 1f));
-    assertEquals(expected, sn.getLoadMap());
+    assertThat(sn.getLoadMap()).isEqualTo(expected);
 
   }
 
@@ -271,26 +276,25 @@ public class LocatorLoadSnapshotJUnitTest {
     sn.addServer(l1, uniqueId1, new String[0], new ServerLoad(1, 1, 1, 1), 
LOAD_POLL_INTERVAL);
     sn.addServer(l2, uniqueId2, new String[0], new ServerLoad(100, 1, 100, 1), 
LOAD_POLL_INTERVAL);
 
-    HashSet excludeAll = new HashSet();
+    Set<ServerLocation> excludeAll = new HashSet<>();
     excludeAll.add(l1);
     excludeAll.add(l2);
 
-    assertEquals(l1, sn.getServerForConnection(null, Collections.EMPTY_SET));
-    assertEquals(l2, sn.getServerForConnection(null, 
Collections.singleton(l1)));
+    assertThat(sn.getServerForConnection(null, 
Collections.emptySet())).isEqualTo(l1);
+    assertThat(sn.getServerForConnection(null, 
Collections.singleton(l1))).isEqualTo(l2);
 
-    assertEquals(null, sn.getServerForConnection(null, excludeAll));
-    assertEquals(Arrays.asList(l2),
-        sn.getServersForQueue(null, Collections.singleton(l1), 3));
+    assertThat(sn.getServerForConnection(null, excludeAll)).isEqualTo(null);
+    assertThat(sn.getServersForQueue(null, Collections.singleton(l1), 
3)).isEqualTo(
+        Collections.singletonList(l2));
 
-    assertEquals(Arrays.asList(),
-        sn.getServersForQueue(null, excludeAll, 3));
+    assertThat(sn.getServersForQueue(null, excludeAll, 
3)).isEqualTo(Collections.emptyList());
   }
 
   @Test
   public void testAreBalanced() {
     final LocatorLoadSnapshot sn = new LocatorLoadSnapshot();
-    assertTrue(sn.hasBalancedConnections(null));
-    assertTrue(sn.hasBalancedConnections("a"));
+    assertThat(sn.hasBalancedConnections(null)).isTrue();
+    assertThat(sn.hasBalancedConnections("a")).isTrue();
     final ServerLocation l1 = new ServerLocation("localhost", 1);
     final ServerLocation l2 = new ServerLocation("localhost", 2);
     final ServerLocation l3 = new ServerLocation("localhost", 3);
@@ -302,19 +306,19 @@ public class LocatorLoadSnapshotJUnitTest {
         LOAD_POLL_INTERVAL);
     sn.addServer(l3, uniqueId3, new String[] {"b"}, new ServerLoad(0, 1, 0, 
1), LOAD_POLL_INTERVAL);
 
-    assertTrue(sn.hasBalancedConnections(null));
-    assertTrue(sn.hasBalancedConnections("a"));
-    assertTrue(sn.hasBalancedConnections("b"));
+    assertThat(sn.hasBalancedConnections(null)).isTrue();
+    assertThat(sn.hasBalancedConnections("a")).isTrue();
+    assertThat(sn.hasBalancedConnections("b")).isTrue();
 
     sn.updateLoad(l1, uniqueId1, new ServerLoad(1, 1, 0, 1));
-    assertTrue(sn.hasBalancedConnections(null));
-    assertTrue(sn.hasBalancedConnections("a"));
-    assertTrue(sn.hasBalancedConnections("b"));
+    assertThat(sn.hasBalancedConnections(null)).isTrue();
+    assertThat(sn.hasBalancedConnections("a")).isTrue();
+    assertThat(sn.hasBalancedConnections("b")).isTrue();
 
     sn.updateLoad(l2, uniqueId2, new ServerLoad(2, 1, 0, 1));
-    assertFalse(sn.hasBalancedConnections(null));
-    assertTrue(sn.hasBalancedConnections("a"));
-    assertFalse(sn.hasBalancedConnections("b"));
+    assertThat(sn.hasBalancedConnections(null)).isFalse();
+    assertThat(sn.hasBalancedConnections("a")).isTrue();
+    assertThat(sn.hasBalancedConnections("b")).isFalse();
   }
 
   @Test
@@ -342,28 +346,28 @@ public class LocatorLoadSnapshotJUnitTest {
         LOAD_POLL_INTERVAL);
 
     // a new server should be selected until the load-imbalance-threshold is 
reached
-    ServerLocation newServer = null;
+    ServerLocation newServer;
     do {
-      newServer = loadSnapshot.getReplacementServerForConnection(l1, "", 
Collections.EMPTY_SET);
+      newServer = loadSnapshot.getReplacementServerForConnection(l1, "", 
Collections.emptySet());
       if (newServer == l3) {
         // the threshold check should have initiated client rebalancing
-        assertTrue(loadSnapshot.isRebalancing());
+        assertThat(loadSnapshot.isRebalancing()).isTrue();
       }
     } while (newServer == l3);
 
     // once balance is achieved we should have received the same server and
     // rebalancing should have ended
-    assertEquals(l1, newServer);
-    assertFalse(loadSnapshot.isRebalancing());
+    assertThat(newServer).isEqualTo(l1);
+    assertThat(loadSnapshot.isRebalancing()).isFalse();
 
     // all load snapshots should now be balanced
     Map<ServerLocation, ServerLoad> loadMap = loadSnapshot.getLoadMap();
     ServerLoad l1Load = loadMap.get(l1);
-    assertEquals(50, l1Load.getConnectionLoad(), 0.01);
+    assertThat(l1Load.getConnectionLoad()).isCloseTo(50F, offset(0.01F));
     ServerLoad l2Load = loadMap.get(l2);
-    assertEquals(50, l1Load.getConnectionLoad(), 0.01);
+    assertThat(l2Load.getConnectionLoad()).isCloseTo(50F, offset(0.01F));
     ServerLoad l3Load = loadMap.get(l3);
-    assertEquals(50, l3Load.getConnectionLoad(), 0.01);
+    assertThat(l3Load.getConnectionLoad()).isCloseTo(50F, offset(0.01F));
   }
 
   @Test
@@ -391,11 +395,11 @@ public class LocatorLoadSnapshotJUnitTest {
         LOAD_POLL_INTERVAL);
 
     ServerLocation newServer =
-        loadSnapshot.getReplacementServerForConnection(l1, "", 
Collections.EMPTY_SET);
-    assertEquals(l1, newServer);
+        loadSnapshot.getReplacementServerForConnection(l1, "", 
Collections.emptySet());
+    assertThat(newServer).isEqualTo(l1);
     Map<ServerLocation, ServerLoad> loadMap = loadSnapshot.getLoadMap();
     ServerLoad l1Load = loadMap.get(l1);
-    assertEquals(l1ConnectionLoad, l1Load.getConnectionLoad(), 0.01);
+    assertThat(l1Load.getConnectionLoad()).isCloseTo(l1ConnectionLoad, 
offset(0.01F));
   }
 
   @Test
@@ -430,9 +434,9 @@ public class LocatorLoadSnapshotJUnitTest {
     groupServers.put(sli2, loadHolder2);
     groupServers.put(sli3, loadHolder3);
 
-    assertEquals(loadHolder1, loadSnapshot.isCurrentServerMostLoaded(l1, 
groupServers));
-    assertNull(loadSnapshot.isCurrentServerMostLoaded(l2, groupServers));
-    assertNull(loadSnapshot.isCurrentServerMostLoaded(l3, groupServers));
+    assertThat(loadSnapshot.isCurrentServerMostLoaded(l1, 
groupServers)).isEqualTo(loadHolder1);
+    assertThat(loadSnapshot.isCurrentServerMostLoaded(l2, 
groupServers)).isNull();
+    assertThat(loadSnapshot.isCurrentServerMostLoaded(l3, 
groupServers)).isNull();
   }
 
   @Test
@@ -460,17 +464,17 @@ public class LocatorLoadSnapshotJUnitTest {
         LOAD_POLL_INTERVAL);
 
     ServerLocation newServer1 =
-        loadSnapshot.getReplacementServerForConnection(l1, "", 
Collections.EMPTY_SET);
-    assertEquals(l3, newServer1);
+        loadSnapshot.getReplacementServerForConnection(l1, "", 
Collections.emptySet());
+    assertThat(newServer1).isEqualTo(l3);
     ServerLocation newServer2 =
-        loadSnapshot.getReplacementServerForConnection(l1, "a", 
Collections.EMPTY_SET);
-    assertEquals(l2, newServer2);
+        loadSnapshot.getReplacementServerForConnection(l1, "a", 
Collections.emptySet());
+    assertThat(newServer2).isEqualTo(l2);
     ServerLocation newServer3 =
-        loadSnapshot.getReplacementServerForConnection(l3, "b", 
Collections.EMPTY_SET);
-    assertEquals(l3, newServer3);
+        loadSnapshot.getReplacementServerForConnection(l3, "b", 
Collections.emptySet());
+    assertThat(newServer3).isEqualTo(l3);
     ServerLocation newServer4 =
-        loadSnapshot.getReplacementServerForConnection(l2, "b", 
Collections.EMPTY_SET);
-    assertEquals(l3, newServer4);
+        loadSnapshot.getReplacementServerForConnection(l2, "b", 
Collections.emptySet());
+    assertThat(newServer4).isEqualTo(l3);
   }
 
   @Test
@@ -506,9 +510,9 @@ public class LocatorLoadSnapshotJUnitTest {
     groupServers.put(sli3, loadHolder3);
 
     List<LocatorLoadSnapshot.LoadHolder> result =
-        loadSnapshot.findBestServers(groupServers, Collections.EMPTY_SET, 1);
-    assertEquals(1, result.size());
-    assertEquals(loadHolder2, result.get(0));
+        loadSnapshot.findBestServers(groupServers, Collections.emptySet(), 1);
+    assertThat(result).hasSize(1);
+    assertThat(result.get(0)).isEqualTo(loadHolder2);
   }
 
   @Test
@@ -544,13 +548,13 @@ public class LocatorLoadSnapshotJUnitTest {
     groupServers.put(sli3, loadHolder3);
 
     List<LocatorLoadSnapshot.LoadHolder> result =
-        loadSnapshot.findBestServers(groupServers, Collections.EMPTY_SET, 2);
-    assertEquals(2, result.size());
-    assertEquals(loadHolder2, result.get(0));
-    assertEquals(loadHolder3, result.get(1));
+        loadSnapshot.findBestServers(groupServers, Collections.emptySet(), 2);
+    assertThat(result).hasSize(2);
+    assertThat(result.get(0)).isEqualTo(loadHolder2);
+    assertThat(result.get(1)).isEqualTo(loadHolder3);
 
-    result = loadSnapshot.findBestServers(groupServers, Collections.EMPTY_SET, 
0);
-    assertEquals(0, result.size());
+    result = loadSnapshot.findBestServers(groupServers, 
Collections.emptySet(), 0);
+    assertThat(result).hasSize(0);
   }
 
   @Test
@@ -586,8 +590,8 @@ public class LocatorLoadSnapshotJUnitTest {
     groupServers.put(sli3, loadHolder3);
 
     List<LocatorLoadSnapshot.LoadHolder> result =
-        loadSnapshot.findBestServers(groupServers, Collections.EMPTY_SET, 0);
-    assertEquals(0, result.size());
+        loadSnapshot.findBestServers(groupServers, Collections.emptySet(), 0);
+    assertThat(result).hasSize(0);
   }
 
   @Test
@@ -623,36 +627,98 @@ public class LocatorLoadSnapshotJUnitTest {
     groupServers.put(sli3, loadHolder3);
 
     List<LocatorLoadSnapshot.LoadHolder> result =
-        loadSnapshot.findBestServers(groupServers, Collections.EMPTY_SET, -1);
-    assertEquals(3, result.size());
-    assertEquals(loadHolder2, result.get(0));
-    assertEquals(loadHolder3, result.get(1));
-    assertEquals(loadHolder1, result.get(2));
+        loadSnapshot.findBestServers(groupServers, Collections.emptySet(), -1);
+    assertThat(result).containsExactly(loadHolder2, loadHolder3, loadHolder1);
   }
 
   @Test
-  public void updateMapWithServerLocationAndMemberId() {
+  public void 
testThatConnectionLoadIsCorrectlyUpdatedForTrafficConnectionGroup() {
     final LocatorLoadSnapshot loadSnapshot = new LocatorLoadSnapshot();
 
     final ServerLocation serverLocation = new ServerLocation("localhost", 1);
     final String uniqueId = new InternalDistributedMember("localhost", 
1).getUniqueId();
-    final ServerLocationAndMemberId sli = new 
ServerLocationAndMemberId(serverLocation, uniqueId);
-    LocatorLoadSnapshot.LoadHolder loadHolder =
-        new LocatorLoadSnapshot.LoadHolder(serverLocation, 50, 1, 
LOAD_POLL_INTERVAL);
-    Map<ServerLocationAndMemberId, LocatorLoadSnapshot.LoadHolder> 
groupServers = new HashMap<>();
-    groupServers.put(sli, loadHolder);
-    Map<String, Map<ServerLocationAndMemberId, 
LocatorLoadSnapshot.LoadHolder>> map =
-        new HashMap<>();
-    map.put(null, groupServers);
 
-    loadSnapshot.updateMap(map, serverLocation, uniqueId, 60, 2);
+    loadSnapshot.addServer(serverLocation, uniqueId, new String[0], new 
ServerLoad(50, 1, 0, 1),
+        LOAD_POLL_INTERVAL);
+
+    loadSnapshot.updateConnectionLoadMap(serverLocation, uniqueId, 60, 2);
 
     LocatorLoadSnapshot.LoadHolder expectedLoadHolder =
         new LocatorLoadSnapshot.LoadHolder(serverLocation, 60, 2, 
LOAD_POLL_INTERVAL);
 
-    assertEquals(expectedLoadHolder.getLoad(), 
groupServers.get(sli).getLoad(), 0);
-    assertEquals(expectedLoadHolder.getLoadPerConnection(),
-        groupServers.get(sli).getLoadPerConnection(), 0);
+    Map<ServerLocation, ServerLoad> serverLoadMap = loadSnapshot.getLoadMap();
+    assertThat(expectedLoadHolder.getLoad())
+        .isEqualTo(serverLoadMap.get(serverLocation).getConnectionLoad());
+    assertThat(expectedLoadHolder.getLoadPerConnection())
+        .isEqualTo(serverLoadMap.get(serverLocation).getLoadPerConnection());
+  }
+
+  @Test
+  public void 
testThatConnectionLoadIsCorrectlyUpdatedForGatewayReceiverGroup() {
+    final LocatorLoadSnapshot loadSnapshot = new LocatorLoadSnapshot();
+
+    final ServerLocation serverLocation = new ServerLocation("localhost", 1);
+    final String uniqueId = new InternalDistributedMember("localhost", 
1).getUniqueId();
+    final ServerLocationAndMemberId servLocAndMemberId =
+        new ServerLocationAndMemberId(serverLocation, uniqueId);
+
+    loadSnapshot.addServer(serverLocation, uniqueId, new String[] 
{GatewayReceiver.RECEIVER_GROUP},
+        new ServerLoad(50, 1, 0, 1),
+        LOAD_POLL_INTERVAL);
+
+    LocatorLoadSnapshot.LoadHolder expectedLoadHolder =
+        new LocatorLoadSnapshot.LoadHolder(serverLocation, 70, 8, 
LOAD_POLL_INTERVAL);
+
+    loadSnapshot.updateConnectionLoadMap(serverLocation, uniqueId, 
expectedLoadHolder.getLoad(),
+        expectedLoadHolder.getLoadPerConnection());
+
+    Map<ServerLocationAndMemberId, ServerLoad> serverLoadMap =
+        loadSnapshot.getGatewayReceiverLoadMap();
+    assertThat(expectedLoadHolder.getLoad())
+        .isEqualTo(serverLoadMap.get(servLocAndMemberId).getConnectionLoad());
+    assertThat(expectedLoadHolder.getLoadPerConnection())
+        
.isEqualTo(serverLoadMap.get(servLocAndMemberId).getLoadPerConnection());
+  }
+
+  @Test
+  public void 
testThatConnectionLoadIsCorrectlyUpdatedForBothGatewayReceiverGroupAndTrafficConnectionGroup()
 {
+    final LocatorLoadSnapshot loadSnapshot = new LocatorLoadSnapshot();
+
+    final ServerLocation serverLocation = new ServerLocation("localhost", 1);
+    final ServerLocation gatewayReceiverLocation = new 
ServerLocation("gatewayReciverHost", 111);
+    final String uniqueId = new InternalDistributedMember("localhost", 
1).getUniqueId();
+    final ServerLocationAndMemberId servLocAndMemberId =
+        new ServerLocationAndMemberId(gatewayReceiverLocation, uniqueId);
+
+    loadSnapshot.addServer(gatewayReceiverLocation, uniqueId,
+        new String[] {GatewayReceiver.RECEIVER_GROUP}, new ServerLoad(50, 1, 
0, 1),
+        LOAD_POLL_INTERVAL);
+
+    loadSnapshot.addServer(serverLocation, uniqueId, new String[0], new 
ServerLoad(50, 1, 0, 1),
+        LOAD_POLL_INTERVAL);
+
+    LocatorLoadSnapshot.LoadHolder expectedLoadHolder =
+        new LocatorLoadSnapshot.LoadHolder(serverLocation, 70, 8, 
LOAD_POLL_INTERVAL);
+    LocatorLoadSnapshot.LoadHolder expectedGatewayLoad =
+        new LocatorLoadSnapshot.LoadHolder(serverLocation, 90, 10, 
LOAD_POLL_INTERVAL);
+
+    loadSnapshot.updateConnectionLoadMap(serverLocation, uniqueId, 
expectedLoadHolder.getLoad(),
+        expectedLoadHolder.getLoadPerConnection());
+    loadSnapshot.updateConnectionLoadMap(gatewayReceiverLocation, uniqueId,
+        expectedGatewayLoad.getLoad(), 
expectedGatewayLoad.getLoadPerConnection());
+
+    Map<ServerLocationAndMemberId, ServerLoad> gatewayReceiverLoadMap =
+        loadSnapshot.getGatewayReceiverLoadMap();
+    assertThat(expectedGatewayLoad.getLoad())
+        
.isEqualTo(gatewayReceiverLoadMap.get(servLocAndMemberId).getConnectionLoad());
+    assertThat(expectedGatewayLoad.getLoadPerConnection())
+        
.isEqualTo(gatewayReceiverLoadMap.get(servLocAndMemberId).getLoadPerConnection());
+
+    Map<ServerLocation, ServerLoad> serverLoadMap = loadSnapshot.getLoadMap();
+    assertThat(expectedLoadHolder.getLoad())
+        .isEqualTo(serverLoadMap.get(serverLocation).getConnectionLoad());
+    assertThat(expectedLoadHolder.getLoadPerConnection())
+        .isEqualTo(serverLoadMap.get(serverLocation).getLoadPerConnection());
   }
 
   @Test
@@ -661,53 +727,47 @@ public class LocatorLoadSnapshotJUnitTest {
 
     final ServerLocation serverLocation = new ServerLocation("localhost", 1);
     final String uniqueId = new InternalDistributedMember("localhost", 
1).getUniqueId();
-    final ServerLocationAndMemberId sli = new 
ServerLocationAndMemberId(serverLocation, uniqueId);
-    Map<ServerLocationAndMemberId, LocatorLoadSnapshot.LoadHolder> 
groupServers = new HashMap<>();
-    Map<String, Map<ServerLocationAndMemberId, 
LocatorLoadSnapshot.LoadHolder>> map =
-        new HashMap<>();
-    map.put(null, groupServers);
 
-    loadSnapshot.updateMap(map, serverLocation, uniqueId, 50, 1);
+    loadSnapshot.updateConnectionLoadMap(serverLocation, uniqueId, 50, 1);
 
-    assertNull(groupServers.get(sli));
+    Map<ServerLocation, ServerLoad> serverLoadMap = loadSnapshot.getLoadMap();
+    assertThat(serverLoadMap.isEmpty()).as("Expected connection map to be 
empty").isTrue();
   }
 
   @Test
-  public void updateMapWithServerLocation() {
+  public void updateQueueMapWithServerLocation() {
     final LocatorLoadSnapshot loadSnapshot = new LocatorLoadSnapshot();
 
     final ServerLocation serverLocation = new ServerLocation("localhost", 1);
-    LocatorLoadSnapshot.LoadHolder loadHolder =
-        new LocatorLoadSnapshot.LoadHolder(serverLocation, 50, 1, 
LOAD_POLL_INTERVAL);
-    Map<ServerLocation, LocatorLoadSnapshot.LoadHolder> groupServers = new 
HashMap<>();
-    groupServers.put(serverLocation, loadHolder);
-    Map<String, Map<ServerLocation, LocatorLoadSnapshot.LoadHolder>> map =
-        new HashMap<>();
-    map.put(null, groupServers);
+    final String uniqueId = "memberId1";
 
-    loadSnapshot.updateMap(map, serverLocation, 60, 2);
+    loadSnapshot.addServer(serverLocation, uniqueId, new String[0], new 
ServerLoad(50, 1, 0, 1),
+        LOAD_POLL_INTERVAL);
 
     LocatorLoadSnapshot.LoadHolder expectedLoadHolder =
         new LocatorLoadSnapshot.LoadHolder(serverLocation, 60, 2, 
LOAD_POLL_INTERVAL);
 
-    assertEquals(expectedLoadHolder.getLoad(), 
groupServers.get(serverLocation).getLoad(), 0);
-    assertEquals(expectedLoadHolder.getLoadPerConnection(),
-        groupServers.get(serverLocation).getLoadPerConnection(), 0);
+    loadSnapshot.updateQueueLoadMap(serverLocation, 
expectedLoadHolder.getLoad(),
+        expectedLoadHolder.getLoadPerConnection());
+
+    Map<ServerLocation, ServerLoad> serverLoadMap = loadSnapshot.getLoadMap();
+    ServerLoad queueServerLoad = serverLoadMap.get(serverLocation);
+    assertThat(expectedLoadHolder.getLoad())
+        .isEqualTo(queueServerLoad.getSubscriptionConnectionLoad());
+    assertThat(expectedLoadHolder.getLoadPerConnection())
+        .isEqualTo(queueServerLoad.getLoadPerSubscriptionConnection());
   }
 
   @Test
-  public void updateMapWithServerLocationKeyNotFound() {
+  public void updateQueueMapWithServerLocationKeyNotFound() {
     final LocatorLoadSnapshot loadSnapshot = new LocatorLoadSnapshot();
 
     final ServerLocation serverLocation = new ServerLocation("localhost", 1);
-    Map<ServerLocation, LocatorLoadSnapshot.LoadHolder> groupServers = new 
HashMap<>();
-    Map<String, Map<ServerLocation, LocatorLoadSnapshot.LoadHolder>> map =
-        new HashMap<>();
-    map.put(null, groupServers);
 
-    loadSnapshot.updateMap(map, serverLocation, 50, 1);
+    loadSnapshot.updateQueueLoadMap(serverLocation, 70, 1);
 
-    assertNull(groupServers.get(serverLocation));
+    Map<ServerLocation, ServerLoad> serverLoadMap = loadSnapshot.getLoadMap();
+    assertThat(serverLoadMap.isEmpty()).as("Expected connection map to be 
empty").isTrue();
   }
 
   @Test
@@ -734,8 +794,8 @@ public class LocatorLoadSnapshotJUnitTest {
 
     loadSnapshot.removeFromMap(map, new String[] {""}, sl1, uniqueId1);
 
-    assertEquals(1, groupServers.size());
-    assertNull(groupServers.get(sli1));
+    assertThat(groupServers).hasSize(1);
+    assertThat(groupServers).doesNotContainKey(sli1);
   }
 
   @Test
@@ -774,11 +834,11 @@ public class LocatorLoadSnapshotJUnitTest {
 
     loadSnapshot.removeFromMap(map, new String[] {"a"}, sl1, uniqueId1);
 
-    assertEquals(2, groupServers.size());
-    assertNull(groupServers.get(sli1));
+    assertThat(groupServers).hasSize(2);
+    assertThat(groupServers).doesNotContainKey(sli1);
 
-    assertEquals(1, groupAServers.size());
-    assertNull(groupAServers.get(sli1));
+    assertThat(groupAServers).hasSize(1);
+    assertThat(groupServers).doesNotContainKey(sli1);
   }
 
   @Test
@@ -802,8 +862,8 @@ public class LocatorLoadSnapshotJUnitTest {
 
     loadSnapshot.removeFromMap(map, new String[] {""}, sl1);
 
-    assertEquals(1, groupServers.size());
-    assertNull(groupServers.get(sl1));
+    assertThat(groupServers).hasSize(1);
+    assertThat(groupServers).doesNotContainKey(sl1);
   }
 
   @Test
@@ -837,11 +897,11 @@ public class LocatorLoadSnapshotJUnitTest {
 
     loadSnapshot.removeFromMap(map, new String[] {"a"}, sl1);
 
-    assertEquals(2, groupServers.size());
-    assertNull(groupServers.get(sl1));
+    assertThat(groupServers).hasSize(2);
+    assertThat(groupServers).doesNotContainKey(sl1);
 
-    assertEquals(1, groupAServers.size());
-    assertNull(groupAServers.get(sl1));
+    assertThat(groupAServers).hasSize(1);
+    assertThat(groupServers).doesNotContainKey(sl1);
   }
 
   @Test
@@ -871,17 +931,10 @@ public class LocatorLoadSnapshotJUnitTest {
     loadSnapshot.addGroups(map, new String[] {"a", "b"}, loadHolder2, 
uniqueId2);
     loadSnapshot.addGroups(map, new String[] {}, loadHolder3, uniqueId3);
 
-    assertEquals(3, map.get(null).size());
-    assertEquals(loadHolder1, map.get(null).get(sli1));
-    assertEquals(loadHolder2, map.get(null).get(sli2));
-    assertEquals(loadHolder3, map.get(null).get(sli3));
-
-    assertEquals(2, map.get("a").size());
-    assertEquals(loadHolder1, map.get("a").get(sli1));
-    assertEquals(loadHolder2, map.get("a").get(sli2));
-
-    assertEquals(1, map.get("b").size());
-    assertEquals(loadHolder2, map.get("b").get(sli2));
+    assertThat(map.get(null)).containsOnly(entry(sli1, loadHolder1), 
entry(sli2, loadHolder2),
+        entry(sli3, loadHolder3));
+    assertThat(map.get("a")).containsOnly(entry(sli1, loadHolder1), 
entry(sli2, loadHolder2));
+    assertThat(map.get("b")).containsOnly(entry(sli2, loadHolder2));
   }
 
   @Test
@@ -906,16 +959,9 @@ public class LocatorLoadSnapshotJUnitTest {
     loadSnapshot.addGroups(map, new String[] {"a", "b"}, loadHolder2);
     loadSnapshot.addGroups(map, new String[] {}, loadHolder3);
 
-    assertEquals(3, map.get(null).size());
-    assertEquals(loadHolder1, map.get(null).get(sl1));
-    assertEquals(loadHolder2, map.get(null).get(sl2));
-    assertEquals(loadHolder3, map.get(null).get(sl3));
-
-    assertEquals(2, map.get("a").size());
-    assertEquals(loadHolder1, map.get("a").get(sl1));
-    assertEquals(loadHolder2, map.get("a").get(sl2));
-
-    assertEquals(1, map.get("b").size());
-    assertEquals(loadHolder2, map.get("b").get(sl2));
+    assertThat(map.get(null)).containsOnly(entry(sl1, loadHolder1), entry(sl2, 
loadHolder2),
+        entry(sl3, loadHolder3));
+    assertThat(map.get("a")).containsOnly(entry(sl1, loadHolder1), entry(sl2, 
loadHolder2));
+    assertThat(map.get("b")).containsOnly(entry(sl2, loadHolder2));
   }
 }

Reply via email to