jvarenina commented on a change in pull request #6036: URL: https://github.com/apache/geode/pull/6036#discussion_r696476653
########## File path: geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/ClusterConfigStartStopPauseAndResumeGatewaySenderOperationDUnitTest.java ########## @@ -0,0 +1,716 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. 
+ */ +package org.apache.geode.internal.cache.wan.wancommand; + +import static org.apache.geode.distributed.ConfigurationProperties.DISTRIBUTED_SYSTEM_ID; +import static org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS; +import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.getMember; +import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewayReceiverMXBeanProxy; +import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewaySenderMXBeanProxy; +import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifyReceiverState; +import static org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifySenderState; +import static org.apache.geode.test.awaitility.GeodeAwaitility.await; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import junitparams.JUnitParamsRunner; +import junitparams.Parameters; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; + +import org.apache.geode.cache.Region; +import org.apache.geode.cache.wan.GatewaySender; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.cache.RegionQueue; +import org.apache.geode.internal.cache.wan.AbstractGatewaySender; +import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderEventProcessor; +import org.apache.geode.internal.cache.wan.parallel.ConcurrentParallelGatewaySenderQueue; +import org.apache.geode.internal.cache.wan.parallel.ParallelGatewaySenderEventProcessor; +import 
org.apache.geode.internal.cache.wan.serial.ConcurrentSerialGatewaySenderEventProcessor; +import org.apache.geode.internal.cache.wan.serial.SerialGatewaySenderEventProcessor; +import org.apache.geode.internal.membership.utils.AvailablePort; +import org.apache.geode.management.internal.cli.util.CommandStringBuilder; +import org.apache.geode.management.internal.i18n.CliStrings; +import org.apache.geode.test.dunit.rules.ClientVM; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; +import org.apache.geode.test.junit.categories.WanTest; +import org.apache.geode.test.junit.rules.GfshCommandRule; + +@Category({WanTest.class}) +@RunWith(JUnitParamsRunner.class) +public class ClusterConfigStartStopPauseAndResumeGatewaySenderOperationDUnitTest + implements Serializable { + + @Rule + public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(9); + + @Rule + public transient GfshCommandRule gfsh = new GfshCommandRule(); + + private MemberVM locatorSite1; + private MemberVM locatorSite2; + private MemberVM server1Site1; + private MemberVM server2Site1; + private MemberVM server1Site2; + private MemberVM server2Site2; + + private ClientVM clientSite1; + private ClientVM clientSite2; + + /** + * Verify that gateway-sender state is persisted after pause and resume gateway-sender + * commands are executed, and that gateway-sender works as expected after member restart: + * + * - Region type: PARTITION and non-redundant + * - Gateway sender configured without queue persistence + * + * 1. Pause gateway-sender + * 2. Run some traffic and verify that data is enqueued in gateway-sender queues + * 3. Restart all servers that host gateway-sender + * 4. Run some traffic and verify that data is enqueued in gateway-sender queues, old + * data should be lost from queue after servers are restarted + * 5. Resume gateway-sender + * 6. 
Verify that latest traffic is sent over the gateway-sender to remote site + */ + @Test + @Parameters({"true", "false"}) + public void testThatPauseStateRemainAfterTheRestartOfMembers(String isParallel) throws Exception { + configureSites(false, "PARTITION", "0", isParallel); + + executeGfshCommand(CliStrings.PAUSE_GATEWAYSENDER); + verifyGatewaySenderState(true, true); + server2Site2.invoke(() -> waitAllDispatcherThreadsToPause("ln")); + server1Site2.invoke(() -> waitAllDispatcherThreadsToPause("ln")); + + // Do some puts and check that data has been enqueued + Set<String> keysQueue = clientSite2.invoke(() -> doPutsInRange(0, 15)); + server1Site2.invoke(() -> checkQueueSize("ln", keysQueue.size())); + + // stop servers on site #2 + server1Site2.stop(false); + server2Site2.stop(false); + + // start again servers in Site #2 + server1Site2 = clusterStartupRule.startServerVM(5, locatorSite2.getPort()); + server2Site2 = clusterStartupRule.startServerVM(6, locatorSite2.getPort()); + + verifyGatewaySenderState(true, true); + + // Do some puts and check that data has been enqueued, previous queue should be lost + Set<String> keysQueue1 = clientSite2.invoke(() -> doPutsInRange(20, 35)); + server1Site2.invoke(() -> checkQueueSize("ln", keysQueue1.size())); + + executeGfshCommand(CliStrings.RESUME_GATEWAYSENDER); + verifyGatewaySenderState(true, false); + + // check that queue is empty + server1Site2.invoke(() -> checkQueueSize("ln", 0)); + // check that data replicated to remote site + clientSite1.invoke(() -> checkDataAvailable(keysQueue1)); + } + + /** + * Verify that gateway-sender queue is persisted while in paused state and it is recovered after + * the restart of servers. + * + * - Region type: PARTITION_PERSISTENT and non-redundant + * - Gateway sender configured with queue persistence + * + * 1. Pause gateway-sender + * 2. Restart all servers that host gateway-sender + * 3. Run some traffic and verify that data is enqueued in gateway-sender queues + * 4. 
Restart all servers that host gateway-sender + * 5. Run some traffic and verify that data is enqueued in gateway-sender queues, old + * data should be recovered after servers restarted + * 6. Resume gateway-sender + * 7. Verify that complete traffic is sent over the gateway-sender to the remote site + */ + @Test + @Parameters({"true", "false"}) + public void testThatPauseStateRemainAfterRestartAllServersPersistent(String isParallel) + throws Exception { + configureSites(true, "PARTITION_PERSISTENT", "0", isParallel); + + executeGfshCommand(CliStrings.PAUSE_GATEWAYSENDER); + verifyGatewaySenderState(true, true); + server2Site2.invoke(() -> waitAllDispatcherThreadsToPause("ln")); + server1Site2.invoke(() -> waitAllDispatcherThreadsToPause("ln")); + + // Do some puts and check that data has been enqueued + Set<String> keysQueue = clientSite2.invoke(() -> doPutsInRange(0, 15)); + server1Site2.invoke(() -> checkQueueSize("ln", keysQueue.size())); + + // stop servers on site #2 + server1Site2.stop(false); + server2Site2.stop(false); + + Thread thread = new Thread( + () -> server1Site2 = clusterStartupRule.startServerVM(5, locatorSite2.getPort())); + Thread thread1 = new Thread( + () -> server2Site2 = clusterStartupRule.startServerVM(6, locatorSite2.getPort())); + // start threads + thread.start(); + thread1.start(); + thread.join(); + thread1.join(); + + verifyGatewaySenderState(true, true); + + // Do some puts and check that data has been enqueued, previous queue data should not be lost + Set<String> keysQueue1 = clientSite2.invoke(() -> doPutsInRange(20, 35)); + server1Site2.invoke(() -> checkQueueSize("ln", keysQueue1.size() + keysQueue.size())); + + executeGfshCommand(CliStrings.RESUME_GATEWAYSENDER); + verifyGatewaySenderState(true, false); + + // check that queue is empty + server1Site2.invoke(() -> checkQueueSize("ln", 0)); + // check that data replicated to remote site + clientSite1.invoke(() -> checkDataAvailable(keysQueue1)); + clientSite1.invoke(() ->
checkDataAvailable(keysQueue)); + } + + /** + * Verify that gateway-sender is recovered from redundant server after the + * restart of member. + * + * - Region type: PARTITION and redundant + * - Gateway sender configured without queue persistence + * + * 1. Pause gateway-sender + * 2. Run some traffic and verify that data is enqueued in gateway-sender queues + * 3. Restart one server that host gateway-sender + * 4. Run some traffic and verify that data is enqueued in gateway-sender queues, old + * data should not be lost from queue after servers are restarted because of redundancy + * 5. Resume gateway-sender + * 6. Verify that queued traffic is sent over the gateway-sender to remote site + */ + @Test + @Parameters({"true", "false"}) + public void testThatPauseStateRemainAfterRestartOneServerRedundant(String isParallel) + throws Exception { + configureSites(false, "PARTITION", "1", isParallel); + + executeGfshCommand(CliStrings.PAUSE_GATEWAYSENDER); + verifyGatewaySenderState(true, true); + server2Site2.invoke(() -> waitAllDispatcherThreadsToPause("ln")); + server1Site2.invoke(() -> waitAllDispatcherThreadsToPause("ln")); + + // Do some puts and check that data has been enqueued + Set<String> keys1 = clientSite2.invoke(() -> doPutsInRange(0, 15)); + server1Site2.invoke(() -> checkQueueSize("ln", keys1.size())); + + // stop server on site #2 + server1Site2.stop(false); + + // start again server in Site #2 + server1Site2 = clusterStartupRule.startServerVM(5, locatorSite2.getPort()); + + verifyGatewaySenderState(true, true); + + // Do some puts and check that data has been enqueued, previous queue should not be lost + // due to configured redundancy + Set<String> keys = clientSite2.invoke(() -> doPutsInRange(20, 35)); + server1Site2.invoke(() -> checkQueueSize("ln", keys.size() + keys1.size())); + + executeGfshCommand(CliStrings.RESUME_GATEWAYSENDER); + verifyGatewaySenderState(true, false); + + // check that queue is empty + server1Site2.invoke(() -> 
checkQueueSize("ln", 0)); + // check that data replicated to other site + clientSite1.invoke(() -> checkDataAvailable(keys)); + clientSite1.invoke(() -> checkDataAvailable(keys1)); + } + + /** + * Verify that gateway-sender state is persisted after stop and start gateway-sender + * commands are executed, and that gateway-sender works as expected after member restart: + * + * - Region type: PARTITION and non-redundant + * - Gateway sender configured without queue persistence + * + * 1. Stop gateway-sender + * 2. Run some traffic and verify that data is stored in partition region, and not replicated to + * the other site + * 3. Restart servers that host gateway-sender + * 4. Run some traffic and verify that partition region is recovered and that data is not + * replicated to the other site + * 5. Start gateway-sender + * 6. Run some traffic and verify that traffic is sent over the gateway-sender to remote site + */ + @Test + @Parameters({"true", "false"}) + public void testThatStopStateRemainAfterTheRestartOfMembers(String isParallel) throws Exception { + configureSites(false, "PARTITION", "0", isParallel); + + executeGfshCommand(CliStrings.STOP_GATEWAYSENDER); + verifyGatewaySenderState(false, false); + + Set<String> keys1 = clientSite2.invoke(() -> doPutsInRange(0, 15)); + clientSite2.invoke(() -> checkDataAvailable(keys1)); + server1Site2.invoke(() -> checkQueueSize("ln", 0)); + + // stop servers on site #2 + server1Site2.stop(false); + server2Site2.stop(false); + + // start again servers in Site #2 + server1Site2 = clusterStartupRule.startServerVM(5, locatorSite2.getPort()); + server2Site2 = clusterStartupRule.startServerVM(6, locatorSite2.getPort()); + + verifyGatewaySenderState(false, false); + + Set<String> keys = clientSite2.invoke(() -> doPutsInRange(20, 35)); + clientSite2.invoke(() -> checkDataAvailable(keys)); + + executeGfshCommand(CliStrings.START_GATEWAYSENDER); + verifyGatewaySenderState(true, false); + + Set<String> keys3 = clientSite2.invoke(() -> 
doPutsInRange(40, 55)); + clientSite2.invoke(() -> checkDataAvailable(keys3)); + clientSite1.invoke(() -> checkDataAvailable(keys3)); + } + + /** + * Verify that colocated partition regions (gws queue and region) are created + * after servers are restarted and gateway-sender is created in stopped state. + * + * - Region type: PARTITION_PERSISTENT and non-redundant + * - Gateway sender configured with queue persistence + * + * 1. Stop gateway-sender + * 2. Run some traffic and verify that data is stored in partition region, and not replicated to + * the remote site + * 3. Restart servers that host gateway-sender + * 4. Run some traffic and verify that partition region is recovered and that data is not + * replicated to the other site + * 5. Start gateway-sender + * 6. Run some traffic and verify that only latest traffic is sent over the gateway-sender + * to remote site + */ + @Test + @Parameters({"true", "false"}) + public void testThatStopStateRemainAfterTheRestartOfMembersAndAllRegionsRecover(String isParallel) + throws Exception { + configureSites(true, "PARTITION_PERSISTENT", "0", isParallel); + + executeGfshCommand(CliStrings.STOP_GATEWAYSENDER); + verifyGatewaySenderState(false, false); + + Set<String> keys1 = clientSite2.invoke(() -> doPutsInRange(0, 15)); + clientSite2.invoke(() -> checkDataAvailable(keys1)); + + server1Site2.invoke(() -> checkQueueSize("ln", 0)); + + // stop servers on site #2 + server1Site2.stop(false); + server2Site2.stop(false); + + Thread thread = new Thread( + () -> server1Site2 = clusterStartupRule.startServerVM(5, locatorSite2.getPort())); + Thread thread1 = new Thread( + () -> server2Site2 = clusterStartupRule.startServerVM(6, locatorSite2.getPort())); + // start threads + thread.start(); + thread1.start(); + thread.join(); + thread1.join(); + + verifyGatewaySenderState(false, false); + + // check that partition region is created and that accepts new traffic + Set<String> keys = clientSite2.invoke(() -> doPutsInRange(20, 35)); + 
clientSite2.invoke(() -> checkDataAvailable(keys)); + + server1Site2.invoke(() -> checkQueueSize("ln", 0)); + + executeGfshCommand(CliStrings.START_GATEWAYSENDER); + verifyGatewaySenderState(true, false); + + Set<String> keys3 = clientSite2.invoke(() -> doPutsInRange(40, 55)); + clientSite2.invoke(() -> checkDataAvailable(keys3)); + clientSite1.invoke(() -> checkDataAvailable(keys3)); + } + + /** + * Verify that parallel gateway-sender queue recovers only data enqueued prior to stop command + * from disk-store. + * + * - Region type: PARTITION_PERSISTENT and non-redundant + * - Gateway sender configured with queue persistence + * + * 1. Pause gateway-sender + * 2. Run some traffic and verify that data is enqueued in gateway-sender queues + * 3. Stop gateway-sender + * 4. Run some traffic and verify that data is stored in region, and not enqueued + * 6. Restart all servers + * 7. Check that data that is enqueued prior to stop command is recovered from persistent storage, + * and that gateway-sender remained in stopped state + * 8. Start gateway-senders + * 9. 
Check that data is not replicated to remote site + */ + @Test + @Parameters({"false", "true"}) + public void testThatStopStateRemainAfterTheRestartAndQueueDataIsRecovered(String isParallel) + throws Exception { + configureSites(true, "PARTITION_PERSISTENT", "0", isParallel); + + executeGfshCommand(CliStrings.PAUSE_GATEWAYSENDER); + verifyGatewaySenderState(true, true); + server2Site2.invoke(() -> waitAllDispatcherThreadsToPause("ln")); + server1Site2.invoke(() -> waitAllDispatcherThreadsToPause("ln")); + + Set<String> keysQueued = clientSite2.invoke(() -> doPutsInRange(70, 85)); + clientSite2.invoke(() -> checkDataAvailable(keysQueued)); + server1Site2.invoke(() -> checkQueueSize("ln", keysQueued.size())); + + executeGfshCommand(CliStrings.STOP_GATEWAYSENDER); + + verifyGatewaySenderState(false, isParallel.equals("true")); + + Set<String> keysNotQueued = clientSite2.invoke(() -> doPutsInRange(100, 105)); + clientSite2.invoke(() -> checkDataAvailable(keysNotQueued)); + + // stop servers on site #2 + server1Site2.stop(false); + server2Site2.stop(false); + + // start again servers in Site #2 + Thread thread = new Thread( + () -> server1Site2 = clusterStartupRule.startServerVM(5, locatorSite2.getPort())); + Thread thread1 = new Thread( + () -> server2Site2 = clusterStartupRule.startServerVM(6, locatorSite2.getPort())); + // start threads + thread.start(); + thread1.start(); + thread.join(); + thread1.join(); + + verifyGatewaySenderState(false, false); + if (isParallel.equals("true")) { + server1Site2.invoke(() -> checkQueueSize("ln", keysQueued.size())); + } + executeGfshCommand(CliStrings.START_GATEWAYSENDER); + verifyGatewaySenderState(true, false); + + // Check that data is sent over the gateway-sender + server1Site2.invoke(() -> checkQueueSize("ln", 0)); + clientSite1.invoke(() -> checkDataAvailable(keysQueued)); + clientSite1.invoke(() -> checkDataNotAvailable(keysNotQueued)); + } + + /** + * This test case verifies that the server during recovery of 
gateway-sender in stopped state + stores events in tmpDroppedEvents queue, and handles them after sender has been recovered. + */ Review comment: Yeah, I tried to do that but unfortunately unsuccessfully. The only way I could do this is to create a hook (that stores events) at the place where events are stored in tmpDroppedEvents queue, and collect and check them afterwards. Not sure if this will be acceptable to you? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
