This is an automated email from the ASF dual-hosted git repository.

mivanac pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/geode.git


The following commit(s) were added to refs/heads/develop by this push:
     new c4e5a034d8 GEODE-10421: Improve start gw sender with clean-queue 
(#7856)
c4e5a034d8 is described below

commit c4e5a034d8cccb0a2814221d0cd3a8c5242e913d
Author: Mario Ivanac <48509724+miva...@users.noreply.github.com>
AuthorDate: Fri Sep 16 10:40:20 2022 +0200

    GEODE-10421: Improve start gw sender with clean-queue (#7856)
    
    * GEODE-10421: added check gw status
    
    * GEODE-10421: added TC
    
    * GEODE-10421: add document impacts
    
    * GEODE-10421: update after comments
---
 .../geode/management/internal/i18n/CliStrings.java |  7 ++
 .../gfsh/command-pages/start.html.md.erb           |  2 +-
 .../cli/commands/StartGatewaySenderCommand.java    | 38 +++++++++
 .../StartGatewaySenderCommandDUnitTest.java        | 91 ++++++++++++++++++++++
 4 files changed, 137 insertions(+), 1 deletion(-)

diff --git 
a/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java
 
b/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java
index f533c77960..3734eedce4 100644
--- 
a/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java
+++ 
b/geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java
@@ -3007,6 +3007,13 @@ public class CliStrings {
   public static final String GATEWAY_RECEIVER_IS_NOT_AVAILABLE_ON_MEMBER_0 =
       "GatewayReceiver is not available on member {0}";
 
+  public static final String START_GATEWAYSENDER_REJECTED = "Command rejected. 
Reasons:";
+
+  public static final String REJECT_START_GATEWAYSENDER_REASON = "Reasons 
command is rejected";
+
+  public static final String EXECUTE_ON_ALL_GATEWAYSENDER_MEMBERS =
+      "Command must be executed on all members on which gateway sender is 
created";
+
   public static final String GATEWAY_SENDER_IS_NOT_AVAILABLE = "GatewaySender 
is not available";
   public static final String GATEWAY_SENDER_0_IS_ALREADY_STARTED_ON_MEMBER_1 =
       "GatewaySender {0} is already started on member {1}";
diff --git a/geode-docs/tools_modules/gfsh/command-pages/start.html.md.erb 
b/geode-docs/tools_modules/gfsh/command-pages/start.html.md.erb
index f25b6b5be0..b41bfb02bb 100644
--- a/geode-docs/tools_modules/gfsh/command-pages/start.html.md.erb
+++ b/geode-docs/tools_modules/gfsh/command-pages/start.html.md.erb
@@ -121,7 +121,7 @@ start gateway-sender --id=value [--groups=value(,value)*] 
[--members=value(,valu
 | &#8209;&#8209;id           | *Required.* ID of the GatewaySender.            
          | |
 | &#8209;&#8209;groups       | Group(s) of members on which to start the 
Gateway Sender. | |
 | &#8209;&#8209;members      | Member(s) on which to start the Gateway Sender. 
| |
-| &#8209;&#8209;clean-queues | Option to clean existing queue at start of the 
Gateway Sender. This option is only applicable for Gateway Senders with enabled 
persistence. | false |
+| &#8209;&#8209;clean-queues | Option to clean existing queue at start of the 
Gateway Sender. This option can be executed only on all members on which 
Gateway Sender is created. | false |
 
 **Example Commands:**
 
diff --git 
a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StartGatewaySenderCommand.java
 
b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StartGatewaySenderCommand.java
index 996e85be8e..d02277d4ac 100644
--- 
a/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StartGatewaySenderCommand.java
+++ 
b/geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/StartGatewaySenderCommand.java
@@ -77,6 +77,44 @@ public class StartGatewaySenderCommand extends GfshCommand {
       return ResultModel.createError(CliStrings.NO_MEMBERS_FOUND_MESSAGE);
     }
 
+    if (cleanQueues) {
+
+      GatewaySenderMXBean bean;
+      boolean commandRejected = false;
+
+      ResultModel rejectResultModel =
+          ResultModel.createError(CliStrings.START_GATEWAYSENDER_REJECTED);
+      TabularResultModel rejectResultData =
+          
rejectResultModel.addTable(CliStrings.REJECT_START_GATEWAYSENDER_REASON);
+
+      Set<DistributedMember> allServers = findMembers(null, null);
+
+      for (DistributedMember member : allServers) {
+        if 
(cache.getDistributedSystem().getDistributedMember().getId().equals(member.getId()))
 {
+          bean = service.getLocalGatewaySenderMXBean(senderId);
+        } else {
+          ObjectName objectName = service.getGatewaySenderMBeanName(member, 
senderId);
+          bean = service.getMBeanProxy(objectName, GatewaySenderMXBean.class);
+        }
+        if (bean != null) {
+          if (!dsMembers.contains(member)) {
+            return 
ResultModel.createError(CliStrings.EXECUTE_ON_ALL_GATEWAYSENDER_MEMBERS);
+          }
+
+          if (bean.isRunning()) {
+            commandRejected = true;
+            rejectResultData.addMemberStatusResultRow(member.getId(), 
CliStrings.GATEWAY_ERROR,
+                CliStrings.format(
+                    
CliStrings.GATEWAY_SENDER_0_IS_ALREADY_STARTED_ON_MEMBER_1, id,
+                    member.getId()));
+          }
+        }
+      }
+      if (commandRejected) {
+        return rejectResultModel;
+      }
+    }
+
     ExecutorService execService =
         LoggingExecutors.newCachedThreadPool("Start Sender Command Thread ", 
true);
 
diff --git 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/StartGatewaySenderCommandDUnitTest.java
 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/StartGatewaySenderCommandDUnitTest.java
index edff65d582..b41ac9f916 100644
--- 
a/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/StartGatewaySenderCommandDUnitTest.java
+++ 
b/geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/wancommand/StartGatewaySenderCommandDUnitTest.java
@@ -19,6 +19,7 @@ import static 
org.apache.geode.distributed.ConfigurationProperties.GROUPS;
 import static 
org.apache.geode.distributed.ConfigurationProperties.REMOTE_LOCATORS;
 import static 
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.createSender;
 import static 
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.getMember;
+import static 
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.startSender;
 import static 
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.validateGatewaySenderMXBeanProxy;
 import static 
org.apache.geode.internal.cache.wan.wancommand.WANCommandUtils.verifySenderState;
 import static org.assertj.core.api.Assertions.assertThat;
@@ -431,6 +432,96 @@ public class StartGatewaySenderCommandDUnitTest implements 
Serializable {
     server3.invoke(() -> verifySenderState("ln", true, false));
   }
 
+  @Test
+  public void 
testStartGatewaySender_clean_queues_sender_on_one_server_allready_started()
+      throws Exception {
+    Integer locator1Port = locatorSite1.getPort();
+
+    // setup servers in Site #1
+    server1 = clusterStartupRule.startServerVM(3, locator1Port);
+    server2 = clusterStartupRule.startServerVM(4, locator1Port);
+    server3 = clusterStartupRule.startServerVM(5, locator1Port);
+
+    server1.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, 
null, true));
+    server2.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, 
null, true));
+    server3.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, 
null, true));
+
+    server1.invoke(() -> startSender("ln"));
+
+    server1.invoke(() -> verifySenderState("ln", true, false));
+    server2.invoke(() -> verifySenderState("ln", false, false));
+    server3.invoke(() -> verifySenderState("ln", false, false));
+
+    locatorSite1.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(server1.getVM()), 
"ln", true, false));
+    locatorSite1.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(server2.getVM()), 
"ln", false, false));
+    locatorSite1.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(server3.getVM()), 
"ln", false, false));
+
+    String command =
+        CliStrings.START_GATEWAYSENDER + " --" + 
CliStrings.START_GATEWAYSENDER__ID + "=ln --"
+            + CliStrings.START_GATEWAYSENDER__CLEAN_QUEUE + "=true";
+    CommandResult cmdResult = executeCommandWithIgnoredExceptions(command);
+    assertThat(cmdResult).isNotNull();
+    assertThat(cmdResult.getStatus()).isSameAs(Result.Status.ERROR);
+
+    TabularResultModel resultData = cmdResult.getResultData()
+        .getTableSection(CliStrings.REJECT_START_GATEWAYSENDER_REASON);
+    List<String> status = resultData.getValuesInColumn("Result");
+    assertThat(status).containsExactlyInAnyOrder("Error");
+
+  }
+
+
+  @Test
+  public void testStartGatewaySender_clean_queues_on_one_member() throws 
Exception {
+    Integer locator1Port = locatorSite1.getPort();
+
+    // setup servers in Site #1
+    server1 = clusterStartupRule.startServerVM(3, locator1Port);
+    server2 = clusterStartupRule.startServerVM(4, locator1Port);
+    server3 = clusterStartupRule.startServerVM(5, locator1Port);
+
+    DistributedMember vm1Member = getMember(server1.getVM());
+
+    server1.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, 
null, true));
+    server2.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, 
null, true));
+    server3.invoke(() -> createSender("ln", 2, false, 100, 400, false, false, 
null, true));
+
+    server1.invoke(() -> verifySenderState("ln", false, false));
+    server2.invoke(() -> verifySenderState("ln", false, false));
+    server3.invoke(() -> verifySenderState("ln", false, false));
+
+    locatorSite1.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(server1.getVM()), 
"ln", false, false));
+    locatorSite1.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(server2.getVM()), 
"ln", false, false));
+    locatorSite1.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(server3.getVM()), 
"ln", false, false));
+
+    String command =
+        CliStrings.START_GATEWAYSENDER + " --" + 
CliStrings.START_GATEWAYSENDER__ID + "=ln --"
+            + CliStrings.START_GATEWAYSENDER__CLEAN_QUEUE + CliStrings.MEMBER 
+ "="
+            + vm1Member.getId();
+    CommandResult cmdResult = executeCommandWithIgnoredExceptions(command);
+    assertThat(cmdResult).isNotNull();
+    assertThat(cmdResult.getStatus()).isSameAs(Result.Status.ERROR);
+
+
+    locatorSite1.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(server1.getVM()), 
"ln", false, false));
+    locatorSite1.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(server2.getVM()), 
"ln", false, false));
+    locatorSite1.invoke(
+        () -> validateGatewaySenderMXBeanProxy(getMember(server3.getVM()), 
"ln", false, false));
+
+    server1.invoke(() -> verifySenderState("ln", false, false));
+    server2.invoke(() -> verifySenderState("ln", false, false));
+    server3.invoke(() -> verifySenderState("ln", false, false));
+  }
+
+
 
   private CommandResult executeCommandWithIgnoredExceptions(String command) 
throws Exception {
     try (IgnoredException ie = IgnoredException.addIgnoredException("Could not 
connect")) {

Reply via email to