This is an automated email from the ASF dual-hosted git repository.

mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new ae17ba4adc GEODE-10020: Introduction of option to gradually activate 
pinging (#7517)
ae17ba4adc is described below

commit ae17ba4adce09e51f91d8bb3813beeed8cbf5569
Author: Mario Ivanac <48509724+miva...@users.noreply.github.com>
AuthorDate: Wed Apr 27 22:02:44 2022 +0200

    GEODE-10020: Introduction of option to gradually activate pinging (#7517)
    
    * GEODE-10020: Introduction of option to gradually activate pinging toward 
destination
---
 ...iversWithSamePortAndHostnameForSendersTest.java | 61 ++++++++++++++++-
 .../cache/client/internal/LiveServerPinger.java    | 41 +++++++++++-
 .../client/internal/LiveServerPingerTest.java      | 78 ++++++++++++++++++++++
 3 files changed, 177 insertions(+), 3 deletions(-)

diff --git 
a/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java
 
b/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java
index 8bad48f570..682b132638 100644
--- 
a/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java
+++ 
b/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java
@@ -58,6 +58,7 @@ import org.apache.geode.test.dunit.IgnoredException;
 import org.apache.geode.test.dunit.VM;
 import org.apache.geode.test.dunit.rules.DistributedRule;
 import org.apache.geode.test.junit.categories.WanTest;
+import org.apache.geode.util.internal.GeodeGlossary;
 
 
 /**
@@ -214,6 +215,54 @@ public class 
SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest {
 
   }
 
+
+  /**
+   * The aim of this test is to verify that when several gateway receivers in a 
remote site share the
+   * same port and hostname-for-senders, the pings sent from the gateway 
senders reach the right
+   * gateway receiver and not just any of the receivers. Check that only one 
additional connection
+   * is used.
+   */
+  @Test
+  public void 
testPingsToReceiversWithSamePortAndHostnameForSendersUseOnlyOneMoreConnection()
+      throws InterruptedException {
+    String senderId = "ln";
+    String regionName = "region-wan";
+    final int remoteLocPort = docker.getExternalPortForService("haproxy", 
20334);
+
+    int locPort = createLocator(VM.getVM(0), 1, remoteLocPort);
+
+    VM vm1 = VM.getVM(1);
+
+    vm1.invoke(() -> {
+      System.setProperty(
+          GeodeGlossary.GEMFIRE_PREFIX
+              + "LiveServerPinger.INITIAL_DELAY_MULTIPLIER_IN_MILLISECONDS",
+          "500");
+
+      Properties props = new Properties();
+      props.setProperty(LOCATORS, "localhost[" + locPort + "]");
+      CacheFactory cacheFactory = new CacheFactory(props);
+      cache = cacheFactory.create();
+    });
+
+    createGatewaySender(vm1, senderId, 2, true, 5,
+        2, GatewaySender.DEFAULT_ORDER_POLICY);
+
+    createPartitionedRegion(vm1, regionName, senderId, 0, 10);
+
+    int NUM_PUTS = 10;
+
+    putKeyValues(vm1, NUM_PUTS, regionName);
+
+    await().untilAsserted(() -> assertThat(getQueuedEvents(vm1, 
senderId)).isEqualTo(0));
+
+    await().untilAsserted(() -> assertThat(getSenderPoolDisconnects(vm1, 
senderId)).isEqualTo(0));
+
+    await().untilAsserted(() -> assertThat(getSenderPoolConnects(vm1, 
senderId)).isEqualTo(4));
+  }
+
+
+
   private boolean allDispatchersConnectedToSameReceiver(int server) {
 
     String gfshOutput = runListGatewayReceiversCommandInServer(server);
@@ -351,12 +400,22 @@ public class 
SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest {
     return vm.invoke(() -> {
       AbstractGatewaySender sender =
           (AbstractGatewaySender) 
CacheFactory.getAnyInstance().getGatewaySender(senderId);
-      assertNotNull(sender);
+      assertThat(sender).isNotNull();
       PoolStats poolStats = sender.getProxy().getStats();
       return poolStats.getDisConnects();
     });
   }
 
+  private static int getSenderPoolConnects(VM vm, String senderId) {
+    return vm.invoke(() -> {
+      AbstractGatewaySender sender =
+          (AbstractGatewaySender) 
CacheFactory.getAnyInstance().getGatewaySender(senderId);
+      assertThat(sender).isNotNull();
+      PoolStats poolStats = sender.getProxy().getStats();
+      return poolStats.getConnects();
+    });
+  }
+
   private static void putKeyValues(VM vm, int numPuts, String region) {
     final HashMap<Integer, Integer> keyValues = new HashMap<>();
     for (int i = 0; i < numPuts; i++) {
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/LiveServerPinger.java
 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/LiveServerPinger.java
index ff91da5a87..fd00a4a9be 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/LiveServerPinger.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/LiveServerPinger.java
@@ -19,6 +19,7 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Future;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.logging.log4j.Logger;
 
@@ -27,6 +28,7 @@ import 
org.apache.geode.cache.client.internal.PoolImpl.PoolTask;
 import org.apache.geode.internal.cache.GemFireCacheImpl;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.util.internal.GeodeGlossary;
 
 /**
  * Responsible for pinging live servers to make sure they are still alive.
@@ -41,26 +43,48 @@ public class LiveServerPinger extends 
EndpointListenerAdapter {
   protected final InternalPool pool;
   protected final long pingIntervalNanos;
 
+  /**
+   * Step, in milliseconds, by which the initial delay of each successive 
LiveServerPinger task is increased. Defaults to 0, which adds no offset.
+   */
+  public static final int INITIAL_DELAY_MULTIPLIER_IN_MILLISECONDS =
+      Integer.getInteger(GeodeGlossary.GEMFIRE_PREFIX
+          + "LiveServerPinger.INITIAL_DELAY_MULTIPLIER_IN_MILLISECONDS", 0);
+
+  private final AtomicInteger initialDelayIndex = new AtomicInteger(0);
+
+
   public LiveServerPinger(InternalPool pool) {
     this.pool = pool;
-    pingIntervalNanos = ((pool.getPingInterval() + 1) / 2) * NANOS_PER_MS;
+    pingIntervalNanos = TimeUnit.MILLISECONDS.toNanos((pool.getPingInterval() 
+ 1) / 2);
   }
 
   @Override
   public void endpointCrashed(Endpoint endpoint) {
+    resetInitialDelay();
     cancelFuture(endpoint);
   }
 
   @Override
   public void endpointNoLongerInUse(Endpoint endpoint) {
+    resetInitialDelay();
     cancelFuture(endpoint);
   }
 
+  /**
+   * On each registration of a new endpoint, increment the counter used to calculate the 
initial delay offset.
+   *
+   */
   @Override
   public void endpointNowInUse(Endpoint endpoint) {
     try {
+      long initDelay = calculateInitialDelay();
+
+      // initDelay - the time to delay first execution
+      // pingIntervalNanos - the delay between the termination of one 
execution and the commencement
+      // of the next
+
       Future future = pool.getBackgroundProcessor().scheduleWithFixedDelay(new 
PingTask(endpoint),
-          pingIntervalNanos, pingIntervalNanos, TimeUnit.NANOSECONDS);
+          initDelay, pingIntervalNanos, TimeUnit.NANOSECONDS);
       taskFutures.put(endpoint, future);
     } catch (RejectedExecutionException e) {
       if (!pool.getCancelCriterion().isCancelInProgress()) {
@@ -76,6 +100,19 @@ public class LiveServerPinger extends 
EndpointListenerAdapter {
     }
   }
 
+
+  long calculateInitialDelay() {
+    long initDelay = initialDelayIndex.getAndIncrement();
+    initDelay =
+        TimeUnit.MILLISECONDS.toNanos(initDelay * 
INITIAL_DELAY_MULTIPLIER_IN_MILLISECONDS)
+            + pingIntervalNanos;
+    return initDelay;
+  }
+
+  void resetInitialDelay() {
+    initialDelayIndex.set(0);
+  }
+
   private class PingTask extends PoolTask {
     private final Endpoint endpoint;
 
diff --git 
a/geode-core/src/test/java/org/apache/geode/cache/client/internal/LiveServerPingerTest.java
 
b/geode-core/src/test/java/org/apache/geode/cache/client/internal/LiveServerPingerTest.java
new file mode 100644
index 0000000000..15f4ca5ebd
--- /dev/null
+++ 
b/geode-core/src/test/java/org/apache/geode/cache/client/internal/LiveServerPingerTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.client.internal;
+
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import org.apache.geode.util.internal.GeodeGlossary;
+
+public class LiveServerPingerTest {
+
+  private InternalPool pool;
+
+  private LiveServerPinger lsp;
+
+  private static long PING_INTERVAL = 10L;
+  private static long DEFAULT_PING_INTERVAL_NANOS = 5000000L;
+
+  private static long CONFIG_PING_INTERVAL_NANOS = 1000000L;
+
+
+  @BeforeEach
+  public void init() throws Exception {
+    System.setProperty(
+        GeodeGlossary.GEMFIRE_PREFIX + 
"LiveServerPinger.INITIAL_DELAY_MULTIPLIER_IN_MILLISECONDS",
+        "1");
+
+    pool = mock(InternalPool.class);
+    when(pool.getPingInterval()).thenReturn(PING_INTERVAL);
+
+    lsp = new LiveServerPinger(pool);
+  }
+
+  @Test
+  public void testInitialDelay() throws Exception {
+
+    
assertThat(lsp.calculateInitialDelay()).isEqualTo(DEFAULT_PING_INTERVAL_NANOS);
+    assertThat(lsp.calculateInitialDelay())
+        .isEqualTo(DEFAULT_PING_INTERVAL_NANOS + CONFIG_PING_INTERVAL_NANOS);
+    assertThat(lsp.calculateInitialDelay())
+        .isEqualTo(DEFAULT_PING_INTERVAL_NANOS + (2 * 
CONFIG_PING_INTERVAL_NANOS));
+    assertThat(lsp.calculateInitialDelay())
+        .isEqualTo(DEFAULT_PING_INTERVAL_NANOS + (3 * 
CONFIG_PING_INTERVAL_NANOS));
+
+  }
+
+  @Test
+  public void testInitialDelayWithReset() throws Exception {
+
+    
assertThat(lsp.calculateInitialDelay()).isEqualTo(DEFAULT_PING_INTERVAL_NANOS);
+    assertThat(lsp.calculateInitialDelay())
+        .isEqualTo(DEFAULT_PING_INTERVAL_NANOS + CONFIG_PING_INTERVAL_NANOS);
+    assertThat(lsp.calculateInitialDelay())
+        .isEqualTo(DEFAULT_PING_INTERVAL_NANOS + (2 * 
CONFIG_PING_INTERVAL_NANOS));
+    lsp.resetInitialDelay();
+    assertThat(lsp.calculateInitialDelay())
+        .isEqualTo(DEFAULT_PING_INTERVAL_NANOS);
+
+  }
+
+}

Reply via email to