This is an automated email from the ASF dual-hosted git repository. mivanac pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/geode.git
The following commit(s) were added to refs/heads/develop by this push: new ae17ba4adc GEODE-10020: Introduction of option to gradually activate pinging (#7517) ae17ba4adc is described below commit ae17ba4adce09e51f91d8bb3813beeed8cbf5569 Author: Mario Ivanac <48509724+miva...@users.noreply.github.com> AuthorDate: Wed Apr 27 22:02:44 2022 +0200 GEODE-10020: Introduction of option to gradually activate pinging (#7517) * GEODE-10020: Introduction of option to gradually activate pinging toward destination --- ...iversWithSamePortAndHostnameForSendersTest.java | 61 ++++++++++++++++- .../cache/client/internal/LiveServerPinger.java | 41 +++++++++++- .../client/internal/LiveServerPingerTest.java | 78 ++++++++++++++++++++++ 3 files changed, 177 insertions(+), 3 deletions(-) diff --git a/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java b/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java index 8bad48f570..682b132638 100644 --- a/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java +++ b/geode-assembly/src/acceptanceTest/java/org/apache/geode/cache/wan/SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest.java @@ -58,6 +58,7 @@ import org.apache.geode.test.dunit.IgnoredException; import org.apache.geode.test.dunit.VM; import org.apache.geode.test.dunit.rules.DistributedRule; import org.apache.geode.test.junit.categories.WanTest; +import org.apache.geode.util.internal.GeodeGlossary; /** @@ -214,6 +215,54 @@ public class SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest { } + + /** + * The aim of this test is verify that when several gateway receivers in a remote site share the + * same port and hostname-for-senders, the pings sent from the gateway senders reach the right + * gateway receiver and not just any of the receivers. Check that only one additional connection + * is used. + */ + @Test + public void testPingsToReceiversWithSamePortAndHostnameForSendersUseOnlyOneMoreConnection() + throws InterruptedException { + String senderId = "ln"; + String regionName = "region-wan"; + final int remoteLocPort = docker.getExternalPortForService("haproxy", 20334); + + int locPort = createLocator(VM.getVM(0), 1, remoteLocPort); + + VM vm1 = VM.getVM(1); + + vm1.invoke(() -> { + System.setProperty( + GeodeGlossary.GEMFIRE_PREFIX + + "LiveServerPinger.INITIAL_DELAY_MULTIPLIER_IN_MILLISECONDS", + "500"); + + Properties props = new Properties(); + props.setProperty(LOCATORS, "localhost[" + locPort + "]"); + CacheFactory cacheFactory = new CacheFactory(props); + cache = cacheFactory.create(); + }); + + createGatewaySender(vm1, senderId, 2, true, 5, + 2, GatewaySender.DEFAULT_ORDER_POLICY); + + createPartitionedRegion(vm1, regionName, senderId, 0, 10); + + int NUM_PUTS = 10; + + putKeyValues(vm1, NUM_PUTS, regionName); + + await().untilAsserted(() -> assertThat(getQueuedEvents(vm1, senderId)).isEqualTo(0)); + + await().untilAsserted(() -> assertThat(getSenderPoolDisconnects(vm1, senderId)).isEqualTo(0)); + + await().untilAsserted(() -> assertThat(getSenderPoolConnects(vm1, senderId)).isEqualTo(4)); + } + + + private boolean allDispatchersConnectedToSameReceiver(int server) { String gfshOutput = runListGatewayReceiversCommandInServer(server); @@ -351,12 +400,22 @@ public class SeveralGatewayReceiversWithSamePortAndHostnameForSendersTest { return vm.invoke(() -> { AbstractGatewaySender sender = (AbstractGatewaySender) CacheFactory.getAnyInstance().getGatewaySender(senderId); - assertNotNull(sender); + assertThat(sender).isNotNull(); PoolStats poolStats = sender.getProxy().getStats(); return poolStats.getDisConnects(); }); } + private static int getSenderPoolConnects(VM vm, String senderId) { + return vm.invoke(() -> { + AbstractGatewaySender sender = + (AbstractGatewaySender) CacheFactory.getAnyInstance().getGatewaySender(senderId); + assertThat(sender).isNotNull(); + PoolStats poolStats = sender.getProxy().getStats(); + return poolStats.getConnects(); + }); + } + private static void putKeyValues(VM vm, int numPuts, String region) { final HashMap<Integer, Integer> keyValues = new HashMap<>(); for (int i = 0; i < numPuts; i++) { diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/LiveServerPinger.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/LiveServerPinger.java index ff91da5a87..fd00a4a9be 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/LiveServerPinger.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/LiveServerPinger.java @@ -19,6 +19,7 @@ import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.logging.log4j.Logger; @@ -27,6 +28,7 @@ import org.apache.geode.cache.client.internal.PoolImpl.PoolTask; import org.apache.geode.internal.cache.GemFireCacheImpl; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.logging.internal.log4j.api.LogService; +import org.apache.geode.util.internal.GeodeGlossary; /** * Responsible for pinging live servers to make sure they are still alive. @@ -41,26 +43,48 @@ public class LiveServerPinger extends EndpointListenerAdapter { protected final InternalPool pool; protected final long pingIntervalNanos; + /** + * Initial delay offset time between LiveServerPinger tasks. Time in milliseconds. + */ + public static final int INITIAL_DELAY_MULTIPLIER_IN_MILLISECONDS = + Integer.getInteger(GeodeGlossary.GEMFIRE_PREFIX + + "LiveServerPinger.INITIAL_DELAY_MULTIPLIER_IN_MILLISECONDS", 0); + + private final AtomicInteger initialDelayIndex = new AtomicInteger(0); + + public LiveServerPinger(InternalPool pool) { this.pool = pool; - pingIntervalNanos = ((pool.getPingInterval() + 1) / 2) * NANOS_PER_MS; + pingIntervalNanos = TimeUnit.MILLISECONDS.toNanos((pool.getPingInterval() + 1) / 2); } @Override public void endpointCrashed(Endpoint endpoint) { + resetInitialDelay(); cancelFuture(endpoint); } @Override public void endpointNoLongerInUse(Endpoint endpoint) { + resetInitialDelay(); cancelFuture(endpoint); } + /** + * At each registration of new endpoint increase counter for calculation of initial delay offset + * + */ @Override public void endpointNowInUse(Endpoint endpoint) { try { + long initDelay = calculateInitialDelay(); + + // initDelay - the time to delay first execution + // pingIntervalNanos - the delay between the termination of one execution and the commencement + // of the next + Future future = pool.getBackgroundProcessor().scheduleWithFixedDelay(new PingTask(endpoint), - pingIntervalNanos, pingIntervalNanos, TimeUnit.NANOSECONDS); + initDelay, pingIntervalNanos, TimeUnit.NANOSECONDS); taskFutures.put(endpoint, future); } catch (RejectedExecutionException e) { if (!pool.getCancelCriterion().isCancelInProgress()) { @@ -76,6 +100,19 @@ public class LiveServerPinger extends EndpointListenerAdapter { } } + + long calculateInitialDelay() { + long initDelay = initialDelayIndex.getAndIncrement(); + initDelay = + TimeUnit.MILLISECONDS.toNanos(initDelay * INITIAL_DELAY_MULTIPLIER_IN_MILLISECONDS) + + pingIntervalNanos; + return initDelay; + } + + void resetInitialDelay() { + initialDelayIndex.set(0); + } + private class PingTask extends PoolTask { private final Endpoint endpoint; diff --git a/geode-core/src/test/java/org/apache/geode/cache/client/internal/LiveServerPingerTest.java b/geode-core/src/test/java/org/apache/geode/cache/client/internal/LiveServerPingerTest.java new file mode 100644 index 0000000000..15f4ca5ebd --- /dev/null +++ b/geode-core/src/test/java/org/apache/geode/cache/client/internal/LiveServerPingerTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.cache.client.internal; + + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import org.apache.geode.util.internal.GeodeGlossary; + +public class LiveServerPingerTest { + + private InternalPool pool; + + private LiveServerPinger lsp; + + private static long PING_INTERVAL = 10L; + private static long DEFAULT_PING_INTERVAL_NANOS = 5000000L; + + private static long CONFIG_PING_INTERVAL_NANOS = 1000000L; + + + @BeforeEach + public void init() throws Exception { + System.setProperty( + GeodeGlossary.GEMFIRE_PREFIX + "LiveServerPinger.INITIAL_DELAY_MULTIPLIER_IN_MILLISECONDS", + "1"); + + pool = mock(InternalPool.class); + when(pool.getPingInterval()).thenReturn(PING_INTERVAL); + + lsp = new LiveServerPinger(pool); + } + + @Test + public void testInitialDelay() throws Exception { + + assertThat(lsp.calculateInitialDelay()).isEqualTo(DEFAULT_PING_INTERVAL_NANOS); + assertThat(lsp.calculateInitialDelay()) + .isEqualTo(DEFAULT_PING_INTERVAL_NANOS + CONFIG_PING_INTERVAL_NANOS); + assertThat(lsp.calculateInitialDelay()) + .isEqualTo(DEFAULT_PING_INTERVAL_NANOS + (2 * CONFIG_PING_INTERVAL_NANOS)); + assertThat(lsp.calculateInitialDelay()) + .isEqualTo(DEFAULT_PING_INTERVAL_NANOS + (3 * CONFIG_PING_INTERVAL_NANOS)); + + } + + @Test + public void testInitialDelayWithReset() throws Exception { + + assertThat(lsp.calculateInitialDelay()).isEqualTo(DEFAULT_PING_INTERVAL_NANOS); + assertThat(lsp.calculateInitialDelay()) + .isEqualTo(DEFAULT_PING_INTERVAL_NANOS + CONFIG_PING_INTERVAL_NANOS); + assertThat(lsp.calculateInitialDelay()) + .isEqualTo(DEFAULT_PING_INTERVAL_NANOS + (2 * CONFIG_PING_INTERVAL_NANOS)); + lsp.resetInitialDelay(); + assertThat(lsp.calculateInitialDelay()) + .isEqualTo(DEFAULT_PING_INTERVAL_NANOS); + + } + +}