kirklund commented on a change in pull request #6833:
URL: https://github.com/apache/geode/pull/6833#discussion_r709494837



##########
File path: 
geode-core/src/main/java/org/apache/geode/management/internal/i18n/CliStrings.java
##########
@@ -1426,6 +1426,55 @@
   public static final String EXPORT_DATA__SUCCESS__MESSAGE =
       "Data successfully exported from region : {0} to file : {1} on host : {2}";
 
+  /* 'wan-copy region' command */
+  public static final String WAN_COPY_REGION = "wan-copy region";
+  public static final String WAN_COPY_REGION__HELP =
+      "Copy a region with a senderId via WAN replication";
+  public static final String WAN_COPY_REGION__REGION = "region";
+  public static final String WAN_COPY_REGION__REGION__HELP =
+      "Region from which data will be exported.";
+  public static final String WAN_COPY_REGION__SENDERID = "sender-id";
+  public static final String WAN_COPY_REGION__SENDERID__HELP =
+      "Sender Id to use to copy the region.";
+  public static final String WAN_COPY_REGION__MAXRATE = "max-rate";
+  public static final String WAN_COPY_REGION__MAXRATE__HELP =
+      "Maximum rate for copying in entries per second.";
+  public static final String WAN_COPY_REGION__BATCHSIZE = "batch-size";
+  public static final String WAN_COPY_REGION__BATCHSIZE__HELP =
+      "Number of entries to be copied in each batch.";
+  public static final String WAN_COPY_REGION__CANCEL = "cancel";
+  public static final String WAN_COPY_REGION__CANCEL__HELP =
+      "Cancel an ongoing wan-copy region command";
+  public static final String WAN_COPY_REGION__MSG__REGION__NOT__FOUND = "Region {0} not found";
+  public static final String WAN_COPY_REGION__MSG__REGION__NOT__USING_SENDER =
+      "Region {0} is not configured to use sender {1}";
+  public static final String WAN_COPY_REGION__MSG__SENDER__NOT__FOUND = "Sender {0} not found";
+  public static final String WAN_COPY_REGION__MSG__SENDER__SERIAL__AND__NOT__PRIMARY =
+      "Sender {0} is serial and not primary. 0 entries copied.";
+  public static final String WAN_COPY_REGION__MSG__SENDER__NOT__RUNNING =
+      "Sender {0} is not running";
+  public static final String WAN_COPY_REGION__MSG__EXECUTION__CANCELED = "Execution canceled";
+  public static final String WAN_COPY_REGION__MSG__EXECUTIONS__CANCELED =
+      "Executions canceled: {0}";
+  public static final String WAN_COPY_REGION__MSG__EXECUTION__FAILED =
+      "Execution failed. Error: {0}";
+  public static final String WAN_COPY_REGION__MSG__NO__CONNECTION__POOL =
+      "No connection pool available to receiver";
+  public static final String WAN_COPY_REGION__MSG__COMMAND__NOT__SUPPORTED__AT__REMOTE__SITE =
+      "Command not supported at remote site.";
+  public static final String WAN_COPY_REGION__MSG__NO__CONNECTION =
+      "No connection available to receiver after having copied {0} entries";
+  public static final String WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED =
+      "Error ({0}) in operation after having copied {1} entries";
+  public static final String WAN_COPY_REGION__MSG__CANCELED__BEFORE__HAVING__COPIED =
+      "Operation canceled before having copied all entries";
+  public static final String WAN_COPY_REGION__MSG__COPIED__ENTRIES = "Entries copied: {0}";
+  public static final String WAN_COPY_REGION__MSG__NO__RUNNING__COMMAND =
+      "No running command to be canceled for region {0} and sender {1}";
+  public static final String WAN_COPY_REGION__MSG__ALREADY__RUNNING__COMMAND =
+      "There is already a command running for region {0} and sender {1}";
+
+

Review comment:
       Let's move these constants into `WanCopyRegionCommand` in geode-wan.
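
   A minimal sketch of what that move could look like, using two of the strings from this hunk. The class name `WanCopyRegionCommandSketch` and the helper `regionNotFound` are hypothetical, and `java.text.MessageFormat` is only a stand-in for `CliStrings.format` so that the snippet compiles without Geode on the classpath:

   ```java
   import java.text.MessageFormat;

   // Hypothetical sketch: the command class owns its own strings instead of
   // reading them from CliStrings in geode-core.
   class WanCopyRegionCommandSketch {
     static final String WAN_COPY_REGION = "wan-copy region";
     static final String WAN_COPY_REGION__MSG__REGION__NOT__FOUND = "Region {0} not found";

     // Assumed helper for illustration only; MessageFormat stands in for CliStrings.format.
     static String regionNotFound(String regionName) {
       return MessageFormat.format(WAN_COPY_REGION__MSG__REGION__NOT__FOUND, regionName);
     }
   }
   ```

   With this layout the function class in geode-wan would reference the constants through the command class, and geode-core would no longer need to know about them.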

##########
File path: 
geode-core/src/distributedTest/java/org/apache/geode/management/internal/security/MultiGfshDUnitTest.java
##########
@@ -52,6 +52,9 @@ public void testMultiUser() throws Exception {
     IgnoredException.addIgnoredException("java.util.zip.ZipException: zip file is empty");
     IgnoredException
         .addIgnoredException("java.lang.IllegalStateException: WAN service is not available.");
+    IgnoredException
+        .addIgnoredException(
+            "org.apache.geode.internal.cache.wan.GatewaySenderConfigurationException: Locators must be configured before starting gateway-sender.");
     int jmxPort = server.getJmxPort();

Review comment:
       Why does this test in geode-core need to ignore 
GatewaySenderConfigurationException?

##########
File path: 
geode-wan/src/main/java/org/apache/geode/cache/wan/internal/cli/commands/WanCopyRegionCommand.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.cache.wan.internal.cli.commands;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.springframework.shell.core.annotation.CliCommand;
+import org.springframework.shell.core.annotation.CliOption;
+
+import org.apache.geode.cache.execute.FunctionInvocationTargetException;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.management.cli.CliMetaData;
+import org.apache.geode.management.cli.ConverterHint;
+import org.apache.geode.management.cli.GfshCommand;
+import org.apache.geode.management.internal.cli.functions.WanCopyRegionFunction;
+import org.apache.geode.management.internal.cli.result.model.ResultModel;
+import org.apache.geode.management.internal.functions.CliFunctionResult;
+import org.apache.geode.management.internal.i18n.CliStrings;
+import org.apache.geode.security.ResourcePermission.Operation;
+import org.apache.geode.security.ResourcePermission.Resource;
+
+public class WanCopyRegionCommand extends GfshCommand {
+  private final WanCopyRegionFunction wanCopyRegionFunction = new WanCopyRegionFunction();
+
+  @CliCommand(value = CliStrings.WAN_COPY_REGION, help = CliStrings.WAN_COPY_REGION__HELP)
+  @CliMetaData(relatedTopic = {CliStrings.TOPIC_GEODE_DATA, CliStrings.TOPIC_GEODE_REGION})
+  public ResultModel wanCopyRegion(
+      @CliOption(key = CliStrings.WAN_COPY_REGION__REGION, mandatory = true,
+          optionContext = ConverterHint.REGION_PATH,
+          help = CliStrings.WAN_COPY_REGION__REGION__HELP) String regionName,
+      @CliOption(key = CliStrings.WAN_COPY_REGION__SENDERID, mandatory = true,
+          optionContext = ConverterHint.GATEWAY_SENDER_ID,
+          help = CliStrings.WAN_COPY_REGION__SENDERID__HELP) String senderId,
+      @CliOption(key = CliStrings.WAN_COPY_REGION__MAXRATE,
+          unspecifiedDefaultValue = "0",
+          help = CliStrings.WAN_COPY_REGION__MAXRATE__HELP) long maxRate,
+      @CliOption(key = CliStrings.WAN_COPY_REGION__BATCHSIZE,
+          unspecifiedDefaultValue = "1000",
+          help = CliStrings.WAN_COPY_REGION__BATCHSIZE__HELP) int batchSize,
+      @CliOption(key = CliStrings.WAN_COPY_REGION__CANCEL,
+          unspecifiedDefaultValue = "false",
+          specifiedDefaultValue = "true",
+          help = CliStrings.WAN_COPY_REGION__CANCEL__HELP) boolean isCancel) {
+
+    authorize(Resource.DATA, Operation.WRITE, regionName);
+    final Object[] args = {regionName, senderId, isCancel, maxRate, batchSize};
+    ResultCollector<?, ?> resultCollector =
+        executeFunction(wanCopyRegionFunction, args, getAllNormalMembers());
+    final List<CliFunctionResult> cliFunctionResults =
+        getCliFunctionResults((List<CliFunctionResult>) resultCollector.getResult());
+    return ResultModel.createMemberStatusResult(cliFunctionResults, false, false);
+  }
+
+  private List<CliFunctionResult> getCliFunctionResults(List<CliFunctionResult> resultsObjects) {
+    final List<CliFunctionResult> cliFunctionResults = new ArrayList<>();
+    for (Object r : resultsObjects) {

Review comment:
       I would rename `r` to `result`.

##########
File path: 
geode-wan/src/distributedTest/java/org/apache/geode/internal/cache/wan/WANTestBase.java
##########
@@ -233,6 +237,7 @@ public static void beforeClassWANTestBase() throws Exception {
     vm5 = getHost(0).getVM(5);
     vm6 = getHost(0).getVM(6);
     vm7 = getHost(0).getVM(7);
+    vm8 = getHost(0).getVM(8);

Review comment:
       You can add additional VMs at any time during a DUnit test or in the setUp 
methods of that DUnit class. Creating one eagerly here in the base class 
creates an unneeded VM for every test that subclasses `WANTestBase`.
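
   To illustrate the cost difference, here is a small self-contained sketch. `FakeHost` is a hypothetical stand-in that only counts how many VMs were requested; it is not the DUnit API. It mirrors the shape of the `getHost(0).getVM(8)` call in the hunk above:

   ```java
   import java.util.HashMap;
   import java.util.Map;

   class LazyVmSketch {
     /** Hypothetical stand-in for a DUnit host; it only tracks which VMs were created. */
     static class FakeHost {
       private final Map<Integer, String> vms = new HashMap<>();

       String getVM(int index) {
         // A VM is created on first request and reused afterwards.
         return vms.computeIfAbsent(index, i -> "vm" + i);
       }

       int createdCount() {
         return vms.size();
       }
     }

     /** A test that does not need the extra VM never triggers its creation. */
     static int runTestWithoutExtraVm(FakeHost host) {
       host.getVM(0);
       return host.createdCount();
     }

     /** Only the test that needs VM 8 asks for it, in its own body or setUp. */
     static int runTestWithExtraVm(FakeHost host) {
       host.getVM(0);
       host.getVM(8);
       return host.createdCount();
     }
   }
   ```

   In the sketch, the extra VM exists only for the test that requests it, which is the effect of moving the `vm8` assignment out of `beforeClassWANTestBase()` and into the tests that use it.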

##########
File path: 
geode-wan/src/main/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunction.java
##########
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.functions;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.Declarable;
+import org.apache.geode.cache.EntryDestroyedException;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.AllConnectionsInUseException;
+import org.apache.geode.cache.client.NoAvailableServersException;
+import org.apache.geode.cache.client.ServerConnectivityException;
+import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.cache.wan.internal.WanCopyRegionFunctionService;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.DefaultEntryEventFactory;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EntrySnapshot;
+import org.apache.geode.internal.cache.EnumListenerEvent;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.NonTXEntry;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.BatchException70;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import org.apache.geode.internal.cache.wan.InternalGatewaySender;
+import org.apache.geode.internal.serialization.KnownVersion;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.management.cli.CliFunction;
+import org.apache.geode.management.internal.functions.CliFunctionResult;
+import org.apache.geode.management.internal.i18n.CliStrings;
+
+/**
+ * Class for copying via WAN the contents of a region
+ * It must be executed in all members of the Geode cluster that host the region
+ * to be copied. (called with onServers() or withMembers() passing the list
+ * of all members hosting the region).
+ * It also offers the possibility to cancel an ongoing execution of this function.
+ * The copying itself is executed in a new thread with a known name
+ * (parameterized with the regionName and senderId) in order to allow
+ * to cancel ongoing invocations by interrupting that thread.
+ *
+ * It accepts the following arguments in an array of objects
+ * 0: regionName (String)
+ * 1: senderId (String)
+ * 2: isCancel (Boolean): If true, it indicates that an ongoing execution of this
+ * function for the given region and senderId must be stopped. Otherwise,
+ * it indicates that the region must be copied.
+ * 3: maxRate (Long) maximum copy rate in entries per second. In the case of
+ * parallel gateway senders, the maxRate is per server hosting the region.
+ * 4: batchSize (Integer): the size of the batches. Region entries are copied in batches of the
+ * passed size. After each batch is sent, the function checks if the command
+ * must be canceled and also sleeps for some time if necessary to adjust the
+ * copy rate to the one passed as argument.
+ */
+public class WanCopyRegionFunction extends CliFunction<Object[]> implements Declarable {
+  private static final Logger logger = LogService.getLogger();
+  private static final long serialVersionUID = 1L;
+
+  public static final String ID = WanCopyRegionFunction.class.getName();
+
+  private static final int MAX_BATCH_SEND_RETRIES = 1;
+
+  private final Clock clock;
+  private final ThreadSleeper threadSleeper;
+  private final WanCopyRegionFunctionExecutorFactory executorFactory;

Review comment:
       Let's replace this factory with:
   ```java
   private final WanCopyRegionFunctionServiceProvider serviceProvider;
   ```
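
   A rough sketch of the constructor injection this implies. The interfaces below are hypothetical stand-ins, since the real `WanCopyRegionFunctionService` and `WanCopyRegionFunctionServiceProvider` signatures are not shown in this diff:

   ```java
   import java.util.function.Supplier;

   class ServiceProviderSketch {
     /** Hypothetical shape of the service that runs the copy task. */
     interface CopyService {
       String execute(Supplier<String> task);
     }

     /** Hypothetical provider; production code and tests would supply different ones. */
     interface CopyServiceProvider {
       CopyService get();
     }

     /** Stand-in for the function: it depends on the provider, not on an executor factory. */
     static class CopyFunction {
       private final CopyServiceProvider serviceProvider;

       CopyFunction(CopyServiceProvider serviceProvider) {
         this.serviceProvider = serviceProvider;
       }

       String run() {
         // The function hands its work to whatever service the provider returns.
         return serviceProvider.get().execute(() -> "Entries copied: 0");
       }
     }
   }
   ```

   A unit test can then pass a provider whose service runs the task inline, so no thread pool is needed to exercise the function.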
   

##########
File path: 
geode-wan/src/main/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunction.java
##########
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.functions;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.Declarable;
+import org.apache.geode.cache.EntryDestroyedException;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.AllConnectionsInUseException;
+import org.apache.geode.cache.client.NoAvailableServersException;
+import org.apache.geode.cache.client.ServerConnectivityException;
+import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import 
org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.cache.wan.internal.WanCopyRegionFunctionService;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.DefaultEntryEventFactory;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EntrySnapshot;
+import org.apache.geode.internal.cache.EnumListenerEvent;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.NonTXEntry;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.BatchException70;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import org.apache.geode.internal.cache.wan.InternalGatewaySender;
+import org.apache.geode.internal.serialization.KnownVersion;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.management.cli.CliFunction;
+import org.apache.geode.management.internal.functions.CliFunctionResult;
+import org.apache.geode.management.internal.i18n.CliStrings;
+
+/**
+ * Class for copying via WAN the contents of a region
+ * It must be executed in all members of the Geode cluster that host the region
+ * to be copied. (called with onServers() or withMembers() passing the list
+ * of all members hosting the region).
+ * It also offers the possibility to cancel an ongoing execution of this 
function.
+ * The copying itself is executed in a new thread with a known name
+ * (parameterized with the regionName and senderId) in order to allow
+ * to cancel ongoing invocations by interrupting that thread.
+ *
+ * It accepts the following arguments in an array of objects
+ * 0: regionName (String)
+ * 1: senderId (String)
+ * 2: isCancel (Boolean): If true, it indicates that an ongoing execution of 
this
+ * function for the given region and senderId must be stopped. Otherwise,
+ * it indicates that the region must be copied.
+ * 3: maxRate (Long) maximum copy rate in entries per second. In the case of
+ * parallel gateway senders, the maxRate is per server hosting the region.
+ * 4: batchSize (Integer): the size of the batches. Region entries are copied 
in batches of the
+ * passed size. After each batch is sent, the function checks if the command
+ * must be canceled and also sleeps for some time if necessary to adjust the
+ * copy rate to the one passed as argument.
+ */
+public class WanCopyRegionFunction extends CliFunction<Object[]> implements 
Declarable {
+  private static final Logger logger = LogService.getLogger();
+  private static final long serialVersionUID = 1L;
+
+  public static final String ID = WanCopyRegionFunction.class.getName();
+
+  private static final int MAX_BATCH_SEND_RETRIES = 1;
+
+  private final Clock clock;
+  private final ThreadSleeper threadSleeper;
+  private final WanCopyRegionFunctionExecutorFactory executorFactory;
+  private final EventCreator eventCreator;
+
+  /**
+   * Contains the ongoing executions of this function
+   */
+  private static final Map<String, Future<CliFunctionResult>> executions =
+      new ConcurrentHashMap<>();
+
+  private int batchId = 0;
+
+  public WanCopyRegionFunction() {
+    this(Clock.systemDefaultZone(), new ThreadSleeper(),
+        new WanCopyRegionFunctionExecutorFactoryImpl(), new 
EventCreatorImpl());
+  }
+
+  @VisibleForTesting
+  WanCopyRegionFunction(Clock clock, ThreadSleeper threadSleeper,
+      WanCopyRegionFunctionExecutorFactory executorFactory, EventCreator 
eventCreator) {
+    this.clock = clock;
+    this.threadSleeper = threadSleeper;
+    this.executorFactory = executorFactory;
+    this.eventCreator = eventCreator;
+  }
+
+  @Override
+  public String getId() {
+    return ID;
+  }
+
+  @Override
+  public boolean hasResult() {
+    return true;
+  }
+
+  @Override
+  public boolean isHA() {
+    return false;
+  }
+
+  @Override
+  public CliFunctionResult executeFunction(FunctionContext<Object[]> context) {
+    final Object[] args = context.getArguments();
+    if (args.length < 5) {
+      throw new IllegalStateException(
+          "Arguments length does not match required length.");
+    }
+    String regionName = (String) args[0];
+    final String senderId = (String) args[1];
+    final boolean isCancel = (Boolean) args[2];
+    long maxRate = (Long) args[3];
+    int batchSize = (Integer) args[4];
+
+    if (regionName.startsWith(SEPARATOR)) {
+      regionName = regionName.substring(1);
+    }
+    if (regionName.equals("*") && senderId.equals("*") && isCancel) {
+      return cancelAllWanCopyRegion(context);
+    }
+
+    if (isCancel) {
+      return cancelWanCopyRegion(context, regionName, senderId);
+    }
+    final Cache cache = context.getCache();
+
+    final Region<?, ?> region = cache.getRegion(regionName);
+    if (region == null) {
+      return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.ERROR,
+          
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__REGION__NOT__FOUND, 
regionName));
+    }
+    GatewaySender sender = cache.getGatewaySender(senderId);
+    if (sender == null) {
+      return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.ERROR,
+          
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__SENDER__NOT__FOUND, 
senderId));
+    }
+    if 
(!region.getAttributes().getGatewaySenderIds().contains(sender.getId())) {
+      return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.ERROR,
+          
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__REGION__NOT__USING_SENDER, 
regionName,
+              senderId));
+    }
+    if (!sender.isRunning()) {
+      return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.ERROR,
+          
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__SENDER__NOT__RUNNING, 
senderId));
+    }
+    if (!sender.isParallel() && !((InternalGatewaySender) sender).isPrimary()) 
{
+      return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.OK,
+          
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__SENDER__SERIAL__AND__NOT__PRIMARY,
+              senderId));
+    }
+
+    try {
+      return executeWanCopyRegionFunctionInNewThread(context, region, 
regionName, sender, maxRate,
+          batchSize);
+    } catch (InterruptedException | CancellationException e) {
+      return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.ERROR,
+          CliStrings.WAN_COPY_REGION__MSG__CANCELED__BEFORE__HAVING__COPIED);
+    } catch (ExecutionException e) {
+      return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.ERROR,
+          
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__EXECUTION__FAILED, 
e.getMessage()));
+    }
+  }
+
+  private CliFunctionResult executeWanCopyRegionFunctionInNewThread(
+      FunctionContext<Object[]> context,
+      Region<?, ?> region, String regionName, GatewaySender sender, long 
maxRate, int batchSize)
+      throws InterruptedException, ExecutionException, CancellationException {
+    String executionName = getExecutionName(regionName, sender.getId());
+    CompletableFuture<CliFunctionResult> future = null;
+    Executor executor = executorFactory.getExecutor((InternalCache) 
context.getCache());
+    try {
+      synchronized (executions) {
+        if (executions.containsKey(executionName)) {
+          return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.ERROR,
+              
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__ALREADY__RUNNING__COMMAND,
+                  regionName, sender.getId()));
+        }
+        future = CompletableFuture.supplyAsync(() -> {
+          try {
+            return wanCopyRegion(context, region, sender, maxRate, batchSize);
+          } catch (InterruptedException e) {
+            return new CliFunctionResult(context.getMemberName(),
+                CliFunctionResult.StatusState.ERROR,
+                
CliStrings.WAN_COPY_REGION__MSG__CANCELED__BEFORE__HAVING__COPIED);
+          }
+        }, executor);
+        executions.put(executionName, future);
+      }
+      return future.get();
+    } finally {
+      if (future != null) {
+        executions.remove(executionName);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  CliFunctionResult wanCopyRegion(FunctionContext<Object[]> context, Region<?, 
?> region,
+      GatewaySender sender, long maxRate, int batchSize) throws 
InterruptedException {
+    ConnectionState connectionState = new ConnectionState();
+    int copiedEntries = 0;
+    final InternalCache cache = (InternalCache) context.getCache();
+    Iterator<?> entriesIter = getEntries(region, sender).iterator();
+    final long startTime = clock.millis();
+
+    try {
+      while (entriesIter.hasNext()) {
+        List<GatewayQueueEvent<?, ?>> batch =
+            createBatch((InternalRegion) region, sender, batchSize, cache, 
entriesIter);
+        if (batch.size() == 0) {
+          continue;
+        }
+        Optional<CliFunctionResult> connectionError =
+            connectionState.connectIfNeeded(context, sender);
+        if (connectionError.isPresent()) {
+          return connectionError.get();
+        }
+        Optional<CliFunctionResult> error =
+            sendBatch(context, sender, batch, connectionState, copiedEntries);
+        if (error.isPresent()) {
+          return error.get();
+        }
+        copiedEntries += batch.size();
+        doPostSendBatchActions(startTime, copiedEntries, maxRate);
+      }
+    } finally {
+      connectionState.close();
+    }
+
+    if (region.isDestroyed()) {
+      return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.ERROR,
+          CliStrings.format(
+              CliStrings.WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED,
+              "Region destroyed",
+              copiedEntries));
+    }
+
+    return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.OK,
+        CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__COPIED__ENTRIES,
+            copiedEntries));
+  }
+
+  private Optional<CliFunctionResult> sendBatch(FunctionContext<Object[]> 
context,
+      GatewaySender sender, List<GatewayQueueEvent<?, ?>> batch,
+      ConnectionState connectionState, int copiedEntries) {
+    GatewaySenderEventDispatcher dispatcher =
+        ((AbstractGatewaySender) sender).getEventProcessor().getDispatcher();
+    int retries = 0;
+
+    while (true) {
+      try {
+        dispatcher.sendBatch(batch, connectionState.getConnection(),
+            connectionState.getSenderPool(), getAndIncrementBatchId(), true);
+        return Optional.empty();
+      } catch (BatchException70 e) {
+        return Optional.of(new CliFunctionResult(context.getMemberName(),
+            CliFunctionResult.StatusState.ERROR,
+            CliStrings.format(
+                CliStrings.WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED,
+                e.getExceptions().get(0).getCause(), copiedEntries)));
+      } catch (ConnectionDestroyedException | ServerConnectivityException e) {
+        Optional<CliFunctionResult> error =
+            connectionState.reconnect(context, retries++, copiedEntries, e);
+        if (error.isPresent()) {
+          return error;
+        }
+      }
+    }
+  }
+
+  private List<GatewayQueueEvent<?, ?>> createBatch(InternalRegion region, 
GatewaySender sender,
+      int batchSize, InternalCache cache, Iterator<?> iter) {
+    int batchIndex = 0;
+    List<GatewayQueueEvent<?, ?>> batch = new ArrayList<>();
+
+    while (iter.hasNext() && batchIndex < batchSize) {
+      GatewayQueueEvent<?, ?> event =
+          eventCreator.createGatewaySenderEvent(cache, region, sender,
+              (Region.Entry<?, ?>) iter.next());
+      if (event != null) {
+        batch.add(event);
+        batchIndex++;
+      }
+    }
+    return batch;
+  }
+
+  private Set<?> getEntries(Region<?, ?> region, GatewaySender sender) {
+    if (region instanceof PartitionedRegion && sender.isParallel()) {
+      return ((PartitionedRegion) 
region).getDataStore().getAllLocalBucketRegions()
+          .stream()
+          .flatMap(br -> ((Set<?>) 
br.entrySet()).stream()).collect(Collectors.toSet());
+    }
+    return region.entrySet();
+  }
+
+  private CliFunctionResult cancelWanCopyRegion(FunctionContext<Object[]> 
context,
+      String regionName, String senderId) {
+    Future<CliFunctionResult> execution = 
executions.remove(getExecutionName(regionName, senderId));
+    if (execution == null) {
+      return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.ERROR,
+          
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__NO__RUNNING__COMMAND,
+              regionName, senderId));
+    }
+    execution.cancel(true);
+    return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.OK,
+        CliStrings.WAN_COPY_REGION__MSG__EXECUTION__CANCELED);
+  }
+
+  private CliFunctionResult cancelAllWanCopyRegion(FunctionContext<Object[]> 
context) {
+    String executionsString = executions.keySet().toString();
+    for (Future<CliFunctionResult> execution : executions.values()) {
+      execution.cancel(true);
+    }
+    executions.clear();
+    return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.OK,
+        
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__EXECUTIONS__CANCELED, 
executionsString));
+  }
+
+  private static String getExecutionName(String regionName, String senderId) {
+    return "(" + regionName + "," + senderId + ")";
+  }
+
+  /**
+   * It runs the actions to be done after a batch has been
+   * sent: throw an interrupted exception if the operation was canceled and
+   * adjust the rate of copying by sleeping if necessary.
+   *
+   * @param startTime time at which the entries started to be copied
+   * @param copiedEntries number of entries copied so far
+   * @param maxRate maximum copying rate
+   */
+  @VisibleForTesting
+  void doPostSendBatchActions(long startTime, int copiedEntries, long maxRate)
+      throws InterruptedException {
+    long sleepMs = getTimeToSleep(startTime, copiedEntries, maxRate);
+    if (sleepMs > 0) {
+      logger.info("{}: Sleeping for {} ms to accommodate to requested maxRate",
+          this.getClass().getSimpleName(), sleepMs);
+      threadSleeper.sleep(sleepMs);
+    } else {
+      if (Thread.currentThread().isInterrupted()) {
+        throw new InterruptedException();
+      }
+    }
+  }
+
+  private int getAndIncrementBatchId() {
+    if (batchId + 1 == Integer.MAX_VALUE) {
+      batchId = 0;
+    }
+    return batchId++;
+  }
+
+  @VisibleForTesting
+  public static int getNumberOfCurrentExecutions() {
+    return executions.size();
+  }
+
+  @VisibleForTesting
+  long getTimeToSleep(long startTime, int copiedEntries, long maxRate) {
+    if (maxRate == 0) {
+      return 0;
+    }
+    final long elapsedMs = clock.millis() - startTime;
+    if (elapsedMs != 0 && (copiedEntries * 1000.0) / (double) elapsedMs <= maxRate) {
+      return 0;
+    }
+    final long targetElapsedMs = (copiedEntries * 1000L) / maxRate;
+    return targetElapsedMs - elapsedMs;
+  }
+
+
+  static class ConnectionState {
+    private volatile Connection connection = null;
+    private volatile PoolImpl senderPool = null;
+
+    public Connection getConnection() {
+      return connection;
+    }
+
+    public PoolImpl getSenderPool() {
+      return senderPool;
+    }
+
+    public Optional<CliFunctionResult> 
connectIfNeeded(FunctionContext<Object[]> context,
+        GatewaySender sender) {
+      if (senderPool == null) {
+        senderPool = ((AbstractGatewaySender) sender).getProxy();
+        if (senderPool == null) {
+          return Optional.of(new CliFunctionResult(context.getMemberName(),
+              CliFunctionResult.StatusState.ERROR,
+              CliStrings.WAN_COPY_REGION__MSG__NO__CONNECTION__POOL));
+        }
+        connection = senderPool.acquireConnection();
+        if (connection.getWanSiteVersion() < 
KnownVersion.GEODE_1_15_0.ordinal()) {
+          return Optional.of(new CliFunctionResult(context.getMemberName(),
+              CliFunctionResult.StatusState.ERROR,
+              
CliStrings.WAN_COPY_REGION__MSG__COMMAND__NOT__SUPPORTED__AT__REMOTE__SITE));
+        }
+      }
+      return Optional.empty();
+    }
+
+    public Optional<CliFunctionResult> reconnect(FunctionContext<Object[]> 
context, int retries,
+        int copiedEntries, Exception e) {
+      close();
+      if (retries >= MAX_BATCH_SEND_RETRIES) {
+        return Optional.of(new CliFunctionResult(context.getMemberName(),
+            CliFunctionResult.StatusState.ERROR,
+            CliStrings.format(
+                CliStrings.WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED,
+                "Connection error", copiedEntries)));
+      }
+      logger.error("Exception {} in sendBatch. Retrying", 
e.getClass().getName());
+      try {
+        connection = senderPool.acquireConnection();
+      } catch (NoAvailableServersException | AllConnectionsInUseException e1) {
+        return Optional.of(new CliFunctionResult(context.getMemberName(),
+            CliFunctionResult.StatusState.ERROR,
+            CliStrings.format(
+                CliStrings.WAN_COPY_REGION__MSG__NO__CONNECTION,
+                copiedEntries)));
+      }
+      return Optional.empty();
+    }
+
+    public void close() {
+      if (senderPool != null && connection != null) {
+        try {
+          connection.close(false);
+        } catch (Exception e) {
+          logger.error("Error closing the connection used to wan-copy region entries");
+        }
+        senderPool.returnConnection(connection);
+      }
+      connection = null;
+    }
+  }
+
+
+  static class ThreadSleeper implements Serializable {
+    void sleep(long millis) throws InterruptedException {
+      Thread.sleep(millis);
+    }
+  }

Review comment:
       I would recommend making `ThreadSleeper` an `interface` annotated with `@FunctionalInterface`:
   ```java
   @FunctionalInterface
   interface ThreadSleeper extends Serializable {
     void sleep(long millis) throws InterruptedException;
   }
   ```
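
       As a rough sketch of why this helps (the holder class `ThreadSleeperSketch`, the `pause` helper and the recording fake are hypothetical names, not part of this PR): once `ThreadSleeper` is a functional interface, the production default can be the method reference `Thread::sleep`, and a test can pass a lambda that records the requested sleeps instead of blocking.

   ```java
   import java.io.Serializable;
   import java.util.ArrayList;
   import java.util.List;

   final class ThreadSleeperSketch {

     // The interface proposed above: one abstract method, so a lambda or a
     // method reference can implement it.
     @FunctionalInterface
     interface ThreadSleeper extends Serializable {
       void sleep(long millis) throws InterruptedException;
     }

     // Production default: Thread.sleep(long) matches the signature.
     static final ThreadSleeper REAL = Thread::sleep;

     // Hypothetical caller, loosely modeled on doPostSendBatchActions:
     // sleeps only when a positive duration is requested.
     static void pause(ThreadSleeper sleeper, long sleepMs) throws InterruptedException {
       if (sleepMs > 0) {
         sleeper.sleep(sleepMs);
       }
     }

     // Drives pause() with a fake that records the durations instead of sleeping.
     static List<Long> recordedSleeps(long... requests) {
       List<Long> recorded = new ArrayList<>();
       ThreadSleeper fake = millis -> recorded.add(millis);
       try {
         for (long request : requests) {
           pause(fake, request);
         }
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
       }
       return recorded;
     }
   }
   ```

       With this change the default constructor would presumably pass `Thread::sleep` where it currently passes `new ThreadSleeper()`.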

##########
File path: 
geode-wan/src/integrationTest/java/org/apache/geode/cache/wan/internal/cli/commands/GfshParserAutoCompletionIntegrationTest.java
##########
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.cache.wan.internal.cli.commands;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.test.junit.categories.GfshTest;
+import org.apache.geode.test.junit.rules.GfshParserRule;
+import org.apache.geode.test.junit.rules.GfshParserRule.CommandCandidate;
+
+@Category(GfshTest.class)
+public class GfshParserAutoCompletionIntegrationTest {

Review comment:
       I would rename this test to something more specific to geode-wan, such as `WanCommandAutoCompletionIntegrationTest`.

##########
File path: 
geode-wan/src/main/java/org/apache/geode/cache/wan/internal/WanCopyRegionFunctionService.java
##########
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.cache.wan.internal;
+
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.internal.cache.CacheService;
+import org.apache.geode.logging.internal.executors.LoggingExecutors;
+import org.apache.geode.management.internal.beans.CacheServiceMBeanBase;
+
+public class WanCopyRegionFunctionService implements CacheService {
+
+  private ExecutorService wanCopyRegionFunctionExecutionPool;
+
+  @Override
+  public boolean init(Cache cache) {
+    String WAN_COPY_REGION_FUNCTION_EXECUTION_PROCESSOR_THREAD_PREFIX =
+        "WAN Copy Region Function Execution Processor";
+    int WAN_COPY_REGION_FUNCTION_MAX_CONCURRENT_THREADS = 10;
+    wanCopyRegionFunctionExecutionPool = LoggingExecutors
+        .newFixedThreadPool(WAN_COPY_REGION_FUNCTION_MAX_CONCURRENT_THREADS,
+            WAN_COPY_REGION_FUNCTION_EXECUTION_PROCESSOR_THREAD_PREFIX, true);
+    return true;
+  }
+
+  @Override
+  public Class<? extends CacheService> getInterface() {
+    return WanCopyRegionFunctionService.class;
+  }
+
+  @Override
+  public CacheServiceMBeanBase getMBean() {
+    return null;
+  }
+
+  @Override
+  public void close() {
+    wanCopyRegionFunctionExecutionPool.shutdownNow();
+    try {
+      if (!wanCopyRegionFunctionExecutionPool.awaitTermination(5, TimeUnit.SECONDS)) {
+        wanCopyRegionFunctionExecutionPool.shutdownNow();
+      }
+    } catch (InterruptedException ie) {
+      wanCopyRegionFunctionExecutionPool.shutdownNow();
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  public Executor getExecutor() {
+    return wanCopyRegionFunctionExecutionPool;
+  }

Review comment:
       `getExecutor()` is called by threads other than the one that invokes `init`, so I recommend making the field `wanCopyRegionFunctionExecutionPool` an `AtomicReference` to ensure cross-thread visibility:
   ```java
   private final AtomicReference<ExecutorService> wanCopyRegionFunctionExecutionPool =
       new AtomicReference<>();
   ```
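
       A minimal sketch of what I have in mind, under some assumptions: the class name `ExecutorHolderSketch` is hypothetical, and the JDK's `Executors.newFixedThreadPool` stands in for `LoggingExecutors.newFixedThreadPool` so that the example is self-contained. The point is only that `init`, `getExecutor` and `close` all go through the `AtomicReference`.

   ```java
   import java.util.concurrent.Executor;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicReference;

   // Hypothetical stand-in for WanCopyRegionFunctionService, without Geode types.
   final class ExecutorHolderSketch {
     private final AtomicReference<ExecutorService> pool = new AtomicReference<>();

     // Called by the initializing thread; set() safely publishes the pool.
     boolean init() {
       pool.set(Executors.newFixedThreadPool(10));
       return true;
     }

     // May be called from any thread; returns null until init() has run.
     Executor getExecutor() {
       return pool.get();
     }

     // Clears the reference first so later callers do not receive a dead pool.
     void close() {
       ExecutorService current = pool.getAndSet(null);
       if (current == null) {
         return;
       }
       current.shutdownNow();
       try {
         current.awaitTermination(5, TimeUnit.SECONDS);
       } catch (InterruptedException ie) {
         Thread.currentThread().interrupt();
       }
     }
   }
   ```

       A `volatile` field would give the same visibility guarantee; the `AtomicReference` additionally lets the field stay `final` and makes the swap in `close()` atomic.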

##########
File path: 
geode-wan/src/main/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunction.java
##########
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.functions;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.Declarable;
+import org.apache.geode.cache.EntryDestroyedException;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.AllConnectionsInUseException;
+import org.apache.geode.cache.client.NoAvailableServersException;
+import org.apache.geode.cache.client.ServerConnectivityException;
+import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import 
org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.cache.wan.internal.WanCopyRegionFunctionService;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.DefaultEntryEventFactory;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EntrySnapshot;
+import org.apache.geode.internal.cache.EnumListenerEvent;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.NonTXEntry;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.BatchException70;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import org.apache.geode.internal.cache.wan.InternalGatewaySender;
+import org.apache.geode.internal.serialization.KnownVersion;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.management.cli.CliFunction;
+import org.apache.geode.management.internal.functions.CliFunctionResult;
+import org.apache.geode.management.internal.i18n.CliStrings;
+
+/**
+ * Class for copying via WAN the contents of a region
+ * It must be executed in all members of the Geode cluster that host the region
+ * to be copied. (called with onServers() or withMembers() passing the list
+ * of all members hosting the region).
+ * It also offers the possibility to cancel an ongoing execution of this 
function.
+ * The copying itself is executed in a new thread with a known name
+ * (parameterized with the regionName and senderId) in order to allow
+ * to cancel ongoing invocations by interrupting that thread.
+ *
+ * It accepts the following arguments in an array of objects
+ * 0: regionName (String)
+ * 1: senderId (String)
+ * 2: isCancel (Boolean): If true, it indicates that an ongoing execution of 
this
+ * function for the given region and senderId must be stopped. Otherwise,
+ * it indicates that the region must be copied.
+ * 3: maxRate (Long) maximum copy rate in entries per second. In the case of
+ * parallel gateway senders, the maxRate is per server hosting the region.
+ * 4: batchSize (Integer): the size of the batches. Region entries are copied 
in batches of the
+ * passed size. After each batch is sent, the function checks if the command
+ * must be canceled and also sleeps for some time if necessary to adjust the
+ * copy rate to the one passed as argument.
+ */
+public class WanCopyRegionFunction extends CliFunction<Object[]> implements Declarable {
+  private static final Logger logger = LogService.getLogger();
+  private static final long serialVersionUID = 1L;
+
+  public static final String ID = WanCopyRegionFunction.class.getName();
+
+  private static final int MAX_BATCH_SEND_RETRIES = 1;
+
+  private final Clock clock;
+  private final ThreadSleeper threadSleeper;
+  private final WanCopyRegionFunctionExecutorFactory executorFactory;
+  private final EventCreator eventCreator;
+
+  /**
+   * Contains the ongoing executions of this function
+   */
+  private static final Map<String, Future<CliFunctionResult>> executions =
+      new ConcurrentHashMap<>();
+
+  private int batchId = 0;
+
+  public WanCopyRegionFunction() {
+    this(Clock.systemDefaultZone(), new ThreadSleeper(),
+        new WanCopyRegionFunctionExecutorFactoryImpl(), new 
EventCreatorImpl());
+  }
+
+  @VisibleForTesting
+  WanCopyRegionFunction(Clock clock, ThreadSleeper threadSleeper,
+      WanCopyRegionFunctionExecutorFactory executorFactory, EventCreator 
eventCreator) {
+    this.clock = clock;
+    this.threadSleeper = threadSleeper;
+    this.executorFactory = executorFactory;
+    this.eventCreator = eventCreator;
+  }
+
+  @Override
+  public String getId() {
+    return ID;
+  }
+
+  @Override
+  public boolean hasResult() {
+    return true;
+  }
+
+  @Override
+  public boolean isHA() {
+    return false;
+  }
+
+  @Override
+  public CliFunctionResult executeFunction(FunctionContext<Object[]> context) {
+    final Object[] args = context.getArguments();
+    if (args.length < 5) {
+      throw new IllegalStateException(
+          "Arguments length does not match required length.");
+    }
+    String regionName = (String) args[0];
+    final String senderId = (String) args[1];
+    final boolean isCancel = (Boolean) args[2];
+    long maxRate = (Long) args[3];
+    int batchSize = (Integer) args[4];
+
+    if (regionName.startsWith(SEPARATOR)) {
+      regionName = regionName.substring(1);
+    }
+    if (regionName.equals("*") && senderId.equals("*") && isCancel) {
+      return cancelAllWanCopyRegion(context);
+    }
+
+    if (isCancel) {
+      return cancelWanCopyRegion(context, regionName, senderId);
+    }
+    final Cache cache = context.getCache();
+
+    final Region<?, ?> region = cache.getRegion(regionName);
+    if (region == null) {
+      return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.ERROR,
+          
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__REGION__NOT__FOUND, 
regionName));
+    }
+    GatewaySender sender = cache.getGatewaySender(senderId);
+    if (sender == null) {
+      return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.ERROR,
+          
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__SENDER__NOT__FOUND, 
senderId));
+    }
+    if 
(!region.getAttributes().getGatewaySenderIds().contains(sender.getId())) {
+      return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.ERROR,
+          
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__REGION__NOT__USING_SENDER, 
regionName,
+              senderId));
+    }
+    if (!sender.isRunning()) {
+      return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.ERROR,
+          
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__SENDER__NOT__RUNNING, 
senderId));
+    }
+    if (!sender.isParallel() && !((InternalGatewaySender) sender).isPrimary()) 
{
+      return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.OK,
+          
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__SENDER__SERIAL__AND__NOT__PRIMARY,
+              senderId));
+    }
+
+    try {
+      return executeWanCopyRegionFunctionInNewThread(context, region, 
regionName, sender, maxRate,
+          batchSize);
+    } catch (InterruptedException | CancellationException e) {
+      return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.ERROR,
+          CliStrings.WAN_COPY_REGION__MSG__CANCELED__BEFORE__HAVING__COPIED);
+    } catch (ExecutionException e) {
+      return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.ERROR,
+          
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__EXECUTION__FAILED, 
e.getMessage()));
+    }
+  }
+
+  private CliFunctionResult executeWanCopyRegionFunctionInNewThread(
+      FunctionContext<Object[]> context,
+      Region<?, ?> region, String regionName, GatewaySender sender, long 
maxRate, int batchSize)
+      throws InterruptedException, ExecutionException, CancellationException {
+    String executionName = getExecutionName(regionName, sender.getId());
+    CompletableFuture<CliFunctionResult> future = null;
+    Executor executor = executorFactory.getExecutor((InternalCache) 
context.getCache());
+    try {
+      synchronized (executions) {
+        if (executions.containsKey(executionName)) {
+          return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.ERROR,
+              
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__ALREADY__RUNNING__COMMAND,
+                  regionName, sender.getId()));
+        }
+        future = CompletableFuture.supplyAsync(() -> {
+          try {
+            return wanCopyRegion(context, region, sender, maxRate, batchSize);
+          } catch (InterruptedException e) {
+            return new CliFunctionResult(context.getMemberName(),
+                CliFunctionResult.StatusState.ERROR,
+                
CliStrings.WAN_COPY_REGION__MSG__CANCELED__BEFORE__HAVING__COPIED);
+          }
+        }, executor);
+        executions.put(executionName, future);
+      }
+      return future.get();
+    } finally {
+      if (future != null) {
+        executions.remove(executionName);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  CliFunctionResult wanCopyRegion(FunctionContext<Object[]> context, Region<?, 
?> region,
+      GatewaySender sender, long maxRate, int batchSize) throws 
InterruptedException {
+    ConnectionState connectionState = new ConnectionState();
+    int copiedEntries = 0;
+    final InternalCache cache = (InternalCache) context.getCache();
+    Iterator<?> entriesIter = getEntries(region, sender).iterator();
+    final long startTime = clock.millis();
+
+    try {
+      while (entriesIter.hasNext()) {
+        List<GatewayQueueEvent<?, ?>> batch =
+            createBatch((InternalRegion) region, sender, batchSize, cache, 
entriesIter);
+        if (batch.size() == 0) {
+          continue;
+        }
+        Optional<CliFunctionResult> connectionError =
+            connectionState.connectIfNeeded(context, sender);
+        if (connectionError.isPresent()) {
+          return connectionError.get();
+        }
+        Optional<CliFunctionResult> error =
+            sendBatch(context, sender, batch, connectionState, copiedEntries);
+        if (error.isPresent()) {
+          return error.get();
+        }
+        copiedEntries += batch.size();
+        doPostSendBatchActions(startTime, copiedEntries, maxRate);
+      }
+    } finally {
+      connectionState.close();
+    }
+
+    if (region.isDestroyed()) {
+      return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.ERROR,
+          CliStrings.format(
+              CliStrings.WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED,
+              "Region destroyed",
+              copiedEntries));
+    }
+
+    return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.OK,
+        CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__COPIED__ENTRIES,
+            copiedEntries));
+  }
+
+  private Optional<CliFunctionResult> sendBatch(FunctionContext<Object[]> 
context,
+      GatewaySender sender, List<GatewayQueueEvent<?, ?>> batch,
+      ConnectionState connectionState, int copiedEntries) {
+    GatewaySenderEventDispatcher dispatcher =
+        ((AbstractGatewaySender) sender).getEventProcessor().getDispatcher();
+    int retries = 0;
+
+    while (true) {
+      try {
+        dispatcher.sendBatch(batch, connectionState.getConnection(),
+            connectionState.getSenderPool(), getAndIncrementBatchId(), true);
+        return Optional.empty();
+      } catch (BatchException70 e) {
+        return Optional.of(new CliFunctionResult(context.getMemberName(),
+            CliFunctionResult.StatusState.ERROR,
+            CliStrings.format(
+                CliStrings.WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED,
+                e.getExceptions().get(0).getCause(), copiedEntries)));
+      } catch (ConnectionDestroyedException | ServerConnectivityException e) {
+        Optional<CliFunctionResult> error =
+            connectionState.reconnect(context, retries++, copiedEntries, e);
+        if (error.isPresent()) {
+          return error;
+        }
+      }
+    }
+  }
+
+  private List<GatewayQueueEvent<?, ?>> createBatch(InternalRegion region, 
GatewaySender sender,
+      int batchSize, InternalCache cache, Iterator<?> iter) {
+    int batchIndex = 0;
+    List<GatewayQueueEvent<?, ?>> batch = new ArrayList<>();
+
+    while (iter.hasNext() && batchIndex < batchSize) {
+      GatewayQueueEvent<?, ?> event =
+          eventCreator.createGatewaySenderEvent(cache, region, sender,
+              (Region.Entry<?, ?>) iter.next());
+      if (event != null) {
+        batch.add(event);
+        batchIndex++;
+      }
+    }
+    return batch;
+  }
+
+  private Set<?> getEntries(Region<?, ?> region, GatewaySender sender) {
+    if (region instanceof PartitionedRegion && sender.isParallel()) {
+      return ((PartitionedRegion) 
region).getDataStore().getAllLocalBucketRegions()
+          .stream()
+          .flatMap(br -> ((Set<?>) 
br.entrySet()).stream()).collect(Collectors.toSet());
+    }
+    return region.entrySet();
+  }
+
+  private CliFunctionResult cancelWanCopyRegion(FunctionContext<Object[]> 
context,
+      String regionName, String senderId) {
+    Future<CliFunctionResult> execution = 
executions.remove(getExecutionName(regionName, senderId));
+    if (execution == null) {
+      return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.ERROR,
+          
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__NO__RUNNING__COMMAND,
+              regionName, senderId));
+    }
+    execution.cancel(true);
+    return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.OK,
+        CliStrings.WAN_COPY_REGION__MSG__EXECUTION__CANCELED);
+  }
+
+  private CliFunctionResult cancelAllWanCopyRegion(FunctionContext<Object[]> 
context) {
+    String executionsString = executions.keySet().toString();
+    for (Future<CliFunctionResult> execution : executions.values()) {
+      execution.cancel(true);
+    }
+    executions.clear();
+    return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.OK,
+        
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__EXECUTIONS__CANCELED, 
executionsString));
+  }
+
+  private static String getExecutionName(String regionName, String senderId) {
+    return "(" + regionName + "," + senderId + ")";
+  }
+
+  /**
+   * It runs the actions to be done after a batch has been
+   * sent: throw an interrupted exception if the operation was canceled and
+   * adjust the rate of copying by sleeping if necessary.
+   *
+   * @param startTime time at which the entries started to be copied
+   * @param copiedEntries number of entries copied so far
+   * @param maxRate maximum copying rate
+   */
+  @VisibleForTesting
+  void doPostSendBatchActions(long startTime, int copiedEntries, long maxRate)
+      throws InterruptedException {
+    long sleepMs = getTimeToSleep(startTime, copiedEntries, maxRate);
+    if (sleepMs > 0) {
+      logger.info("{}: Sleeping for {} ms to accommodate to requested maxRate",
+          this.getClass().getSimpleName(), sleepMs);
+      threadSleeper.sleep(sleepMs);
+    } else {
+      if (Thread.currentThread().isInterrupted()) {
+        throw new InterruptedException();
+      }
+    }
+  }
+
+  private int getAndIncrementBatchId() {
+    if (batchId + 1 == Integer.MAX_VALUE) {
+      batchId = 0;
+    }
+    return batchId++;
+  }
+
+  @VisibleForTesting
+  public static int getNumberOfCurrentExecutions() {
+    return executions.size();
+  }
+
+  @VisibleForTesting
+  long getTimeToSleep(long startTime, int copiedEntries, long maxRate) {
+    if (maxRate == 0) {
+      return 0;
+    }
+    final long elapsedMs = clock.millis() - startTime;
+    if (elapsedMs != 0 && (copiedEntries * 1000.0) / (double) elapsedMs <= maxRate) {
+      return 0;
+    }
+    final long targetElapsedMs = (copiedEntries * 1000L) / maxRate;
+    return targetElapsedMs - elapsedMs;
+  }
+
+
+  static class ConnectionState {
+    private volatile Connection connection = null;
+    private volatile PoolImpl senderPool = null;
+
+    public Connection getConnection() {
+      return connection;
+    }
+
+    public PoolImpl getSenderPool() {
+      return senderPool;
+    }
+
+    public Optional<CliFunctionResult> 
connectIfNeeded(FunctionContext<Object[]> context,
+        GatewaySender sender) {
+      if (senderPool == null) {
+        senderPool = ((AbstractGatewaySender) sender).getProxy();
+        if (senderPool == null) {
+          return Optional.of(new CliFunctionResult(context.getMemberName(),
+              CliFunctionResult.StatusState.ERROR,
+              CliStrings.WAN_COPY_REGION__MSG__NO__CONNECTION__POOL));
+        }
+        connection = senderPool.acquireConnection();
+        if (connection.getWanSiteVersion() < 
KnownVersion.GEODE_1_15_0.ordinal()) {
+          return Optional.of(new CliFunctionResult(context.getMemberName(),
+              CliFunctionResult.StatusState.ERROR,
+              
CliStrings.WAN_COPY_REGION__MSG__COMMAND__NOT__SUPPORTED__AT__REMOTE__SITE));
+        }
+      }
+      return Optional.empty();
+    }
+
+    public Optional<CliFunctionResult> reconnect(FunctionContext<Object[]> 
context, int retries,
+        int copiedEntries, Exception e) {
+      close();
+      if (retries >= MAX_BATCH_SEND_RETRIES) {
+        return Optional.of(new CliFunctionResult(context.getMemberName(),
+            CliFunctionResult.StatusState.ERROR,
+            CliStrings.format(
+                CliStrings.WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED,
+                "Connection error", copiedEntries)));
+      }
+      logger.error("Exception {} in sendBatch. Retrying", 
e.getClass().getName());
+      try {
+        connection = senderPool.acquireConnection();
+      } catch (NoAvailableServersException | AllConnectionsInUseException e1) {
+        return Optional.of(new CliFunctionResult(context.getMemberName(),
+            CliFunctionResult.StatusState.ERROR,
+            CliStrings.format(
+                CliStrings.WAN_COPY_REGION__MSG__NO__CONNECTION,
+                copiedEntries)));
+      }
+      return Optional.empty();
+    }
+
+    public void close() {
+      if (senderPool != null && connection != null) {
+        try {
+          connection.close(false);
+        } catch (Exception e) {
+          logger.error("Error closing the connection used to wan-copy region entries");
+        }
+        senderPool.returnConnection(connection);
+      }
+      connection = null;
+    }
+  }
+
+
+  static class ThreadSleeper implements Serializable {
+    void sleep(long millis) throws InterruptedException {
+      Thread.sleep(millis);
+    }
+  }
+
+
+  interface EventCreator {
+    GatewayQueueEvent<?, ?> createGatewaySenderEvent(InternalCache cache,
+        InternalRegion region, GatewaySender sender, Region.Entry<?, ?> entry);
+  }
+
+  static class EventCreatorImpl implements EventCreator, Serializable {

Review comment:
       Remove `Serializable` from this class. We'll move `Serializable` to the `EventCreator` interface instead, so that every implementation is serializable by contract.
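
       A small sketch of the shape I mean. The names `EventCreatorSketch`, `createEvent` and `roundTrip` are hypothetical, and the signature is simplified to plain strings so that the example does not depend on Geode types. It only illustrates that an implementation is serializable once the interface extends `Serializable`, without the class declaring it again.

   ```java
   import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;
   import java.io.IOException;
   import java.io.ObjectInputStream;
   import java.io.ObjectOutputStream;
   import java.io.Serializable;

   final class EventCreatorSketch {

     // Serializable is declared once, on the interface.
     interface EventCreator extends Serializable {
       String createEvent(String key, String value); // simplified, hypothetical signature
     }

     // The implementation no longer names Serializable itself.
     static class EventCreatorImpl implements EventCreator {
       private static final long serialVersionUID = 1L;

       @Override
       public String createEvent(String key, String value) {
         return key + "=" + value;
       }
     }

     // Round-trips a creator through Java serialization to show that it works.
     static EventCreator roundTrip(EventCreator creator) {
       try {
         ByteArrayOutputStream bytes = new ByteArrayOutputStream();
         try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
           out.writeObject(creator);
         }
         try (ObjectInputStream in =
             new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
           return (EventCreator) in.readObject();
         }
       } catch (IOException | ClassNotFoundException e) {
         throw new IllegalStateException(e);
       }
     }
   }
   ```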

##########
File path: 
geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/CommandAvailabilityIndicator.java
##########
@@ -50,6 +50,7 @@
       CliStrings.STOP_GATEWAYRECEIVER, CliStrings.LIST_GATEWAY, 
CliStrings.STATUS_GATEWAYSENDER,
       CliStrings.STATUS_GATEWAYRECEIVER, CliStrings.LOAD_BALANCE_GATEWAYSENDER,
       CliStrings.DESTROY_GATEWAYSENDER, 
AlterAsyncEventQueueCommand.COMMAND_NAME,
+      CliStrings.WAN_COPY_REGION,

Review comment:
       Delete `WAN_COPY_REGION` here and add this method to `WanCopyRegionCommand`:
   ```java
   @CliAvailabilityIndicator({WAN_COPY_REGION})
   public boolean commandAvailable() {
     return isOnlineCommandAvailable();
   }
   ```

##########
File path: 
geode-gfsh/src/main/resources/META-INF/services/org.springframework.shell.core.CommandMarker
##########
@@ -115,4 +115,4 @@ 
org.apache.geode.management.internal.cli.commands.lifecycle.StartVsdCommand
 
org.apache.geode.management.internal.cli.commands.lifecycle.StatusLocatorCommand
 org.apache.geode.management.internal.cli.commands.lifecycle.StatusServerCommand
 org.apache.geode.management.internal.cli.commands.lifecycle.StopLocatorCommand
-org.apache.geode.management.internal.cli.commands.lifecycle.StopServerCommand
\ No newline at end of file
+org.apache.geode.management.internal.cli.commands.lifecycle.StopServerCommand

Review comment:
       Delete the blank line at the end of the file.

##########
File path: 
geode-wan/src/main/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunction.java
##########
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.functions;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.Declarable;
+import org.apache.geode.cache.EntryDestroyedException;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.AllConnectionsInUseException;
+import org.apache.geode.cache.client.NoAvailableServersException;
+import org.apache.geode.cache.client.ServerConnectivityException;
+import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import 
org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.cache.wan.internal.WanCopyRegionFunctionService;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.DefaultEntryEventFactory;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EntrySnapshot;
+import org.apache.geode.internal.cache.EnumListenerEvent;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.NonTXEntry;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.BatchException70;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import org.apache.geode.internal.cache.wan.InternalGatewaySender;
+import org.apache.geode.internal.serialization.KnownVersion;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.management.cli.CliFunction;
+import org.apache.geode.management.internal.functions.CliFunctionResult;
+import org.apache.geode.management.internal.i18n.CliStrings;
+
+/**
+ * Class for copying via WAN the contents of a region
+ * It must be executed in all members of the Geode cluster that host the region
+ * to be copied. (called with onServers() or withMembers() passing the list
+ * of all members hosting the region).
+ * It also offers the possibility to cancel an ongoing execution of this 
function.
+ * The copying itself is executed in a new thread with a known name
+ * (parameterized with the regionName and senderId) in order to allow
+ * to cancel ongoing invocations by interrupting that thread.
+ *
+ * It accepts the following arguments in an array of objects
+ * 0: regionName (String)
+ * 1: senderId (String)
+ * 2: isCancel (Boolean): If true, it indicates that an ongoing execution of 
this
+ * function for the given region and senderId must be stopped. Otherwise,
+ * it indicates that the region must be copied.
+ * 3: maxRate (Long) maximum copy rate in entries per second. In the case of
+ * parallel gateway senders, the maxRate is per server hosting the region.
+ * 4: batchSize (Integer): the size of the batches. Region entries are copied 
in batches of the
+ * passed size. After each batch is sent, the function checks if the command
+ * must be canceled and also sleeps for some time if necessary to adjust the
+ * copy rate to the one passed as argument.
+ */
+public class WanCopyRegionFunction extends CliFunction<Object[]> implements 
Declarable {
+  private static final Logger logger = LogService.getLogger();
+  private static final long serialVersionUID = 1L;
+
+  public static final String ID = WanCopyRegionFunction.class.getName();
+
+  private static final int MAX_BATCH_SEND_RETRIES = 1;
+
+  private final Clock clock;
+  private final ThreadSleeper threadSleeper;
+  private final WanCopyRegionFunctionExecutorFactory executorFactory;
+  private final EventCreator eventCreator;
+
+  /**
+   * Contains the ongoing executions of this function
+   */
+  private static final Map<String, Future<CliFunctionResult>> executions =
+      new ConcurrentHashMap<>();

Review comment:
       Next, move `executions` to `WanCopyRegionFunctionService` and add a 
`getExecutions()` method. There is only one instance of 
`WanCopyRegionFunctionService`, so `executions` can live there as a 
non-static field.
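
       A minimal sketch of what that could look like, not the actual service. `ExecutionsHolderSketch` is a hypothetical stand-in for `WanCopyRegionFunctionService`, and `String` stands in for `CliFunctionResult` so the snippet compiles on its own:

   ```java
   import java.util.Map;
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.Future;

   // Hypothetical stand-in for WanCopyRegionFunctionService; the real class
   // and its other members are not shown in this review.
   class ExecutionsHolderSketch {
     // Non-static: each service instance owns its own map of ongoing executions.
     // String replaces CliFunctionResult only to keep the sketch self-contained.
     private final Map<String, Future<String>> executions = new ConcurrentHashMap<>();

     Map<String, Future<String>> getExecutions() {
       return executions;
     }
   }

   class ExecutionsHolderDemo {
     public static void main(String[] args) {
       ExecutionsHolderSketch service = new ExecutionsHolderSketch();
       // Register an execution under the same "(region,sender)" key format
       // that getExecutionName builds in the function.
       service.getExecutions().put("(region,sender)", new CompletableFuture<>());
       System.out.println(service.getExecutions().size());
     }
   }
   ```

       With the map held per instance, a unit test can build a fresh holder and not worry about executions left over from another test.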

##########
File path: 
geode-wan/src/main/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunction.java
##########
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.functions;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.Declarable;
+import org.apache.geode.cache.EntryDestroyedException;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.AllConnectionsInUseException;
+import org.apache.geode.cache.client.NoAvailableServersException;
+import org.apache.geode.cache.client.ServerConnectivityException;
+import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import 
org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.cache.wan.internal.WanCopyRegionFunctionService;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.DefaultEntryEventFactory;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EntrySnapshot;
+import org.apache.geode.internal.cache.EnumListenerEvent;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.NonTXEntry;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.BatchException70;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import org.apache.geode.internal.cache.wan.InternalGatewaySender;
+import org.apache.geode.internal.serialization.KnownVersion;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.management.cli.CliFunction;
+import org.apache.geode.management.internal.functions.CliFunctionResult;
+import org.apache.geode.management.internal.i18n.CliStrings;
+
+/**
+ * Class for copying via WAN the contents of a region
+ * It must be executed in all members of the Geode cluster that host the region
+ * to be copied. (called with onServers() or withMembers() passing the list
+ * of all members hosting the region).
+ * It also offers the possibility to cancel an ongoing execution of this 
function.
+ * The copying itself is executed in a new thread with a known name
+ * (parameterized with the regionName and senderId) in order to allow
+ * to cancel ongoing invocations by interrupting that thread.
+ *
+ * It accepts the following arguments in an array of objects
+ * 0: regionName (String)
+ * 1: senderId (String)
+ * 2: isCancel (Boolean): If true, it indicates that an ongoing execution of 
this
+ * function for the given region and senderId must be stopped. Otherwise,
+ * it indicates that the region must be copied.
+ * 3: maxRate (Long) maximum copy rate in entries per second. In the case of
+ * parallel gateway senders, the maxRate is per server hosting the region.
+ * 4: batchSize (Integer): the size of the batches. Region entries are copied 
in batches of the
+ * passed size. After each batch is sent, the function checks if the command
+ * must be canceled and also sleeps for some time if necessary to adjust the
+ * copy rate to the one passed as argument.
+ */
+public class WanCopyRegionFunction extends CliFunction<Object[]> implements 
Declarable {
+  private static final Logger logger = LogService.getLogger();
+  private static final long serialVersionUID = 1L;
+
+  public static final String ID = WanCopyRegionFunction.class.getName();
+
+  private static final int MAX_BATCH_SEND_RETRIES = 1;
+
+  private final Clock clock;
+  private final ThreadSleeper threadSleeper;
+  private final WanCopyRegionFunctionExecutorFactory executorFactory;
+  private final EventCreator eventCreator;
+
+  /**
+   * Contains the ongoing executions of this function
+   */
+  private static final Map<String, Future<CliFunctionResult>> executions =
+      new ConcurrentHashMap<>();
+
+  private int batchId = 0;
+
+  public WanCopyRegionFunction() {
+    this(Clock.systemDefaultZone(), new ThreadSleeper(),
+        new WanCopyRegionFunctionExecutorFactoryImpl(), new 
EventCreatorImpl());
+  }
+
+  @VisibleForTesting
+  WanCopyRegionFunction(Clock clock, ThreadSleeper threadSleeper,
+      WanCopyRegionFunctionExecutorFactory executorFactory, EventCreator 
eventCreator) {
+    this.clock = clock;
+    this.threadSleeper = threadSleeper;
+    this.executorFactory = executorFactory;
+    this.eventCreator = eventCreator;
+  }
+
+  @Override
+  public String getId() {
+    return ID;
+  }
+
+  @Override
+  public boolean hasResult() {
+    return true;
+  }
+
+  @Override
+  public boolean isHA() {
+    return false;
+  }
+
+  @Override
+  public CliFunctionResult executeFunction(FunctionContext<Object[]> context) {
+    final Object[] args = context.getArguments();
+    if (args.length < 5) {
+      throw new IllegalStateException(
+          "Arguments length does not match required length.");
+    }
+    String regionName = (String) args[0];
+    final String senderId = (String) args[1];
+    final boolean isCancel = (Boolean) args[2];
+    long maxRate = (Long) args[3];
+    int batchSize = (Integer) args[4];
+
+    if (regionName.startsWith(SEPARATOR)) {
+      regionName = regionName.substring(1);
+    }
+    if (regionName.equals("*") && senderId.equals("*") && isCancel) {
+      return cancelAllWanCopyRegion(context);
+    }
+
+    if (isCancel) {
+      return cancelWanCopyRegion(context, regionName, senderId);
+    }
+    final Cache cache = context.getCache();
+
+    final Region<?, ?> region = cache.getRegion(regionName);
+    if (region == null) {
+      return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.ERROR,
+          
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__REGION__NOT__FOUND, 
regionName));
+    }
+    GatewaySender sender = cache.getGatewaySender(senderId);
+    if (sender == null) {
+      return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.ERROR,
+          
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__SENDER__NOT__FOUND, 
senderId));
+    }
+    if 
(!region.getAttributes().getGatewaySenderIds().contains(sender.getId())) {
+      return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.ERROR,
+          
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__REGION__NOT__USING_SENDER, 
regionName,
+              senderId));
+    }
+    if (!sender.isRunning()) {
+      return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.ERROR,
+          
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__SENDER__NOT__RUNNING, 
senderId));
+    }
+    if (!sender.isParallel() && !((InternalGatewaySender) sender).isPrimary()) 
{
+      return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.OK,
+          
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__SENDER__SERIAL__AND__NOT__PRIMARY,
+              senderId));
+    }
+
+    try {
+      return executeWanCopyRegionFunctionInNewThread(context, region, 
regionName, sender, maxRate,
+          batchSize);
+    } catch (InterruptedException | CancellationException e) {
+      return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.ERROR,
+          CliStrings.WAN_COPY_REGION__MSG__CANCELED__BEFORE__HAVING__COPIED);
+    } catch (ExecutionException e) {
+      return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.ERROR,
+          
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__EXECUTION__FAILED, 
e.getMessage()));
+    }
+  }
+
+  private CliFunctionResult executeWanCopyRegionFunctionInNewThread(
+      FunctionContext<Object[]> context,
+      Region<?, ?> region, String regionName, GatewaySender sender, long 
maxRate, int batchSize)
+      throws InterruptedException, ExecutionException, CancellationException {
+    String executionName = getExecutionName(regionName, sender.getId());
+    CompletableFuture<CliFunctionResult> future = null;
+    Executor executor = executorFactory.getExecutor((InternalCache) 
context.getCache());
+    try {
+      synchronized (executions) {
+        if (executions.containsKey(executionName)) {
+          return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.ERROR,
+              
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__ALREADY__RUNNING__COMMAND,
+                  regionName, sender.getId()));
+        }
+        future = CompletableFuture.supplyAsync(() -> {
+          try {
+            return wanCopyRegion(context, region, sender, maxRate, batchSize);
+          } catch (InterruptedException e) {
+            return new CliFunctionResult(context.getMemberName(),
+                CliFunctionResult.StatusState.ERROR,
+                
CliStrings.WAN_COPY_REGION__MSG__CANCELED__BEFORE__HAVING__COPIED);
+          }
+        }, executor);
+        executions.put(executionName, future);
+      }
+      return future.get();
+    } finally {
+      if (future != null) {
+        executions.remove(executionName);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  CliFunctionResult wanCopyRegion(FunctionContext<Object[]> context, Region<?, 
?> region,
+      GatewaySender sender, long maxRate, int batchSize) throws 
InterruptedException {
+    ConnectionState connectionState = new ConnectionState();
+    int copiedEntries = 0;
+    final InternalCache cache = (InternalCache) context.getCache();
+    Iterator<?> entriesIter = getEntries(region, sender).iterator();
+    final long startTime = clock.millis();
+
+    try {
+      while (entriesIter.hasNext()) {
+        List<GatewayQueueEvent<?, ?>> batch =
+            createBatch((InternalRegion) region, sender, batchSize, cache, 
entriesIter);
+        if (batch.size() == 0) {
+          continue;
+        }
+        Optional<CliFunctionResult> connectionError =
+            connectionState.connectIfNeeded(context, sender);
+        if (connectionError.isPresent()) {
+          return connectionError.get();
+        }
+        Optional<CliFunctionResult> error =
+            sendBatch(context, sender, batch, connectionState, copiedEntries);
+        if (error.isPresent()) {
+          return error.get();
+        }
+        copiedEntries += batch.size();
+        doPostSendBatchActions(startTime, copiedEntries, maxRate);
+      }
+    } finally {
+      connectionState.close();
+    }
+
+    if (region.isDestroyed()) {
+      return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.ERROR,
+          CliStrings.format(
+              CliStrings.WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED,
+              "Region destroyed",
+              copiedEntries));
+    }
+
+    return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.OK,
+        CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__COPIED__ENTRIES,
+            copiedEntries));
+  }
+
+  private Optional<CliFunctionResult> sendBatch(FunctionContext<Object[]> 
context,
+      GatewaySender sender, List<GatewayQueueEvent<?, ?>> batch,
+      ConnectionState connectionState, int copiedEntries) {
+    GatewaySenderEventDispatcher dispatcher =
+        ((AbstractGatewaySender) sender).getEventProcessor().getDispatcher();
+    int retries = 0;
+
+    while (true) {
+      try {
+        dispatcher.sendBatch(batch, connectionState.getConnection(),
+            connectionState.getSenderPool(), getAndIncrementBatchId(), true);
+        return Optional.empty();
+      } catch (BatchException70 e) {
+        return Optional.of(new CliFunctionResult(context.getMemberName(),
+            CliFunctionResult.StatusState.ERROR,
+            CliStrings.format(
+                CliStrings.WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED,
+                e.getExceptions().get(0).getCause(), copiedEntries)));
+      } catch (ConnectionDestroyedException | ServerConnectivityException e) {
+        Optional<CliFunctionResult> error =
+            connectionState.reconnect(context, retries++, copiedEntries, e);
+        if (error.isPresent()) {
+          return error;
+        }
+      }
+    }
+  }
+
+  private List<GatewayQueueEvent<?, ?>> createBatch(InternalRegion region, 
GatewaySender sender,
+      int batchSize, InternalCache cache, Iterator<?> iter) {
+    int batchIndex = 0;
+    List<GatewayQueueEvent<?, ?>> batch = new ArrayList<>();
+
+    while (iter.hasNext() && batchIndex < batchSize) {
+      GatewayQueueEvent<?, ?> event =
+          eventCreator.createGatewaySenderEvent(cache, region, sender,
+              (Region.Entry<?, ?>) iter.next());
+      if (event != null) {
+        batch.add(event);
+        batchIndex++;
+      }
+    }
+    return batch;
+  }
+
+  private Set<?> getEntries(Region<?, ?> region, GatewaySender sender) {
+    if (region instanceof PartitionedRegion && sender.isParallel()) {
+      return ((PartitionedRegion) 
region).getDataStore().getAllLocalBucketRegions()
+          .stream()
+          .flatMap(br -> ((Set<?>) 
br.entrySet()).stream()).collect(Collectors.toSet());
+    }
+    return region.entrySet();
+  }
+
+  private CliFunctionResult cancelWanCopyRegion(FunctionContext<Object[]> 
context,
+      String regionName, String senderId) {
+    Future<CliFunctionResult> execution = 
executions.remove(getExecutionName(regionName, senderId));
+    if (execution == null) {
+      return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.ERROR,
+          
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__NO__RUNNING__COMMAND,
+              regionName, senderId));
+    }
+    execution.cancel(true);
+    return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.OK,
+        CliStrings.WAN_COPY_REGION__MSG__EXECUTION__CANCELED);
+  }
+
+  private CliFunctionResult cancelAllWanCopyRegion(FunctionContext<Object[]> 
context) {
+    String executionsString = executions.keySet().toString();
+    for (Future<CliFunctionResult> execution : executions.values()) {
+      execution.cancel(true);
+    }
+    executions.clear();
+    return new CliFunctionResult(context.getMemberName(), 
CliFunctionResult.StatusState.OK,
+        
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__EXECUTIONS__CANCELED, 
executionsString));
+  }
+
+  private static String getExecutionName(String regionName, String senderId) {
+    return "(" + regionName + "," + senderId + ")";
+  }
+
+  /**
+   * It runs the actions to be done after a batch has been
+   * sent: throw an interrupted exception if the operation was canceled and
+   * adjust the rate of copying by sleeping if necessary.
+   *
+   * @param startTime time at which the entries started to be copied
+   * @param copiedEntries number of entries copied so far
+   * @param maxRate maximum copying rate
+   */
+  @VisibleForTesting
+  void doPostSendBatchActions(long startTime, int copiedEntries, long maxRate)
+      throws InterruptedException {
+    long sleepMs = getTimeToSleep(startTime, copiedEntries, maxRate);
+    if (sleepMs > 0) {
+      logger.info("{}: Sleeping for {} ms to accommodate to requested maxRate",
+          this.getClass().getSimpleName(), sleepMs);
+      threadSleeper.sleep(sleepMs);
+    } else {
+      if (Thread.currentThread().isInterrupted()) {
+        throw new InterruptedException();
+      }
+    }
+  }
+
+  private int getAndIncrementBatchId() {
+    if (batchId + 1 == Integer.MAX_VALUE) {
+      batchId = 0;
+    }
+    return batchId++;
+  }
+
+  @VisibleForTesting
+  public static int getNumberOfCurrentExecutions() {
+    return executions.size();
+  }

Review comment:
       `getNumberOfCurrentExecutions` should probably be moved to the service.

##########
File path: 
geode-wan/src/main/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunction.java
##########
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.functions;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.Declarable;
+import org.apache.geode.cache.EntryDestroyedException;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.AllConnectionsInUseException;
+import org.apache.geode.cache.client.NoAvailableServersException;
+import org.apache.geode.cache.client.ServerConnectivityException;
+import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import 
org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.cache.wan.internal.WanCopyRegionFunctionService;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.DefaultEntryEventFactory;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EntrySnapshot;
+import org.apache.geode.internal.cache.EnumListenerEvent;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.NonTXEntry;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.BatchException70;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import org.apache.geode.internal.cache.wan.InternalGatewaySender;
+import org.apache.geode.internal.serialization.KnownVersion;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.management.cli.CliFunction;
+import org.apache.geode.management.internal.functions.CliFunctionResult;
+import org.apache.geode.management.internal.i18n.CliStrings;
+
+/**
+ * Class for copying via WAN the contents of a region
+ * It must be executed in all members of the Geode cluster that host the region
+ * to be copied. (called with onServers() or withMembers() passing the list
+ * of all members hosting the region).
+ * It also offers the possibility to cancel an ongoing execution of this 
function.
+ * The copying itself is executed in a new thread with a known name
+ * (parameterized with the regionName and senderId) in order to allow
+ * to cancel ongoing invocations by interrupting that thread.
+ *
+ * It accepts the following arguments in an array of objects
+ * 0: regionName (String)
+ * 1: senderId (String)
+ * 2: isCancel (Boolean): If true, it indicates that an ongoing execution of 
this
+ * function for the given region and senderId must be stopped. Otherwise,
+ * it indicates that the region must be copied.
+ * 3: maxRate (Long) maximum copy rate in entries per second. In the case of
+ * parallel gateway senders, the maxRate is per server hosting the region.
+ * 4: batchSize (Integer): the size of the batches. Region entries are copied 
in batches of the
+ * passed size. After each batch is sent, the function checks if the command
+ * must be canceled and also sleeps for some time if necessary to adjust the
+ * copy rate to the one passed as argument.
+ */
+public class WanCopyRegionFunction extends CliFunction<Object[]> implements 
Declarable {
+  private static final Logger logger = LogService.getLogger();
+  private static final long serialVersionUID = 1L;
+
+  public static final String ID = WanCopyRegionFunction.class.getName();
+
+  private static final int MAX_BATCH_SEND_RETRIES = 1;
+
+  private final Clock clock;
+  private final ThreadSleeper threadSleeper;
+  private final WanCopyRegionFunctionExecutorFactory executorFactory;
+  private final EventCreator eventCreator;
+
+  /**
+   * Contains the ongoing executions of this function
+   */
+  private static final Map<String, Future<CliFunctionResult>> executions =
+      new ConcurrentHashMap<>();
+
+  private int batchId = 0;
+
+  public WanCopyRegionFunction() {
+    this(Clock.systemDefaultZone(), new ThreadSleeper(),
+        new WanCopyRegionFunctionExecutorFactoryImpl(), new 
EventCreatorImpl());
+  }
+
+  @VisibleForTesting
+  WanCopyRegionFunction(Clock clock, ThreadSleeper threadSleeper,
+      WanCopyRegionFunctionExecutorFactory executorFactory, EventCreator 
eventCreator) {
+    this.clock = clock;
+    this.threadSleeper = threadSleeper;
+    this.executorFactory = executorFactory;
+    this.eventCreator = eventCreator;
+  }

Review comment:
       These two constructors would then change to:
   ```java
   public WanCopyRegionFunction() {
     this(Clock.systemDefaultZone(),
         Thread::sleep,
         cache -> cache.getService(WanCopyRegionFunctionService.class),
         new EventCreatorImpl());
   }
   
   @VisibleForTesting
   WanCopyRegionFunction(Clock clock, 
       ThreadSleeper threadSleeper,
       WanCopyRegionFunctionServiceProvider serviceProvider,
       EventCreator eventCreator) {
     this.clock = clock;
     this.threadSleeper = threadSleeper;
     this.serviceProvider = serviceProvider;
     this.eventCreator = eventCreator;
   }
   ```
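
       For `Thread::sleep` and the lambda to compile, both collaborators would need to be single-method interfaces. A rough sketch under that assumption follows. All type names here are hypothetical, a `Map` stands in for the cache, and a `String` stands in for the service:

   ```java
   import java.util.Collections;
   import java.util.Map;

   // Hypothetical shape of ThreadSleeper: one method, so Thread::sleep fits.
   @FunctionalInterface
   interface ThreadSleeperSketch {
     void sleep(long millis) throws InterruptedException;
   }

   // Hypothetical shape of WanCopyRegionFunctionServiceProvider.
   // Map<String, String> stands in for the cache, String for the service.
   @FunctionalInterface
   interface ServiceProviderSketch {
     String get(Map<String, String> cache);
   }

   class FunctionSketch {
     private final ThreadSleeperSketch threadSleeper;
     private final ServiceProviderSketch serviceProvider;

     // Production wiring: real sleep, service looked up from the cache.
     FunctionSketch() {
       this(Thread::sleep, cache -> cache.get("service"));
     }

     // Test wiring: both collaborators can be replaced with lambdas or mocks.
     FunctionSketch(ThreadSleeperSketch threadSleeper, ServiceProviderSketch serviceProvider) {
       this.threadSleeper = threadSleeper;
       this.serviceProvider = serviceProvider;
     }

     // Sleeps, then resolves the service; returns null if interrupted.
     String pauseThenLookUp(Map<String, String> cache, long millis) {
       try {
         threadSleeper.sleep(millis);
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt(); // keep the interrupt visible to callers
         return null;
       }
       return serviceProvider.get(cache);
     }
   }

   class FunctionSketchDemo {
     public static void main(String[] args) {
       Map<String, String> cache = Collections.singletonMap("service", "real-service");
       System.out.println(new FunctionSketch().pauseThenLookUp(cache, 1L));
     }
   }
   ```

       In a test, the sleeper can then be a lambda that only records the requested time, so no real sleeping is needed.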

##########
File path: 
geode-wan/src/main/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunction.java
##########
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.functions;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.Declarable;
+import org.apache.geode.cache.EntryDestroyedException;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.AllConnectionsInUseException;
+import org.apache.geode.cache.client.NoAvailableServersException;
+import org.apache.geode.cache.client.ServerConnectivityException;
+import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.cache.wan.internal.WanCopyRegionFunctionService;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.DefaultEntryEventFactory;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EntrySnapshot;
+import org.apache.geode.internal.cache.EnumListenerEvent;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.NonTXEntry;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.BatchException70;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import org.apache.geode.internal.cache.wan.InternalGatewaySender;
+import org.apache.geode.internal.serialization.KnownVersion;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.management.cli.CliFunction;
+import org.apache.geode.management.internal.functions.CliFunctionResult;
+import org.apache.geode.management.internal.i18n.CliStrings;
+
+/**
+ * Class for copying via WAN the contents of a region
+ * It must be executed in all members of the Geode cluster that host the region
+ * to be copied. (called with onServers() or withMembers() passing the list
+ * of all members hosting the region).
+ * It also offers the possibility to cancel an ongoing execution of this function.
+ * The copying itself is executed in a new thread with a known name
+ * (parameterized with the regionName and senderId) in order to allow
+ * to cancel ongoing invocations by interrupting that thread.
+ *
+ * It accepts the following arguments in an array of objects
+ * 0: regionName (String)
+ * 1: senderId (String)
+ * 2: isCancel (Boolean): If true, it indicates that an ongoing execution of this
+ * function for the given region and senderId must be stopped. Otherwise,
+ * it indicates that the region must be copied.
+ * 3: maxRate (Long) maximum copy rate in entries per second. In the case of
+ * parallel gateway senders, the maxRate is per server hosting the region.
+ * 4: batchSize (Integer): the size of the batches. Region entries are copied in batches of the
+ * passed size. After each batch is sent, the function checks if the command
+ * must be canceled and also sleeps for some time if necessary to adjust the
+ * copy rate to the one passed as argument.
+ */
+public class WanCopyRegionFunction extends CliFunction<Object[]> implements Declarable {
+  private static final Logger logger = LogService.getLogger();
+  private static final long serialVersionUID = 1L;
+
+  public static final String ID = WanCopyRegionFunction.class.getName();
+
+  private static final int MAX_BATCH_SEND_RETRIES = 1;
+
+  private final Clock clock;
+  private final ThreadSleeper threadSleeper;
+  private final WanCopyRegionFunctionExecutorFactory executorFactory;
+  private final EventCreator eventCreator;
+
+  /**
+   * Contains the ongoing executions of this function
+   */
+  private static final Map<String, Future<CliFunctionResult>> executions =
+      new ConcurrentHashMap<>();
+
+  private int batchId = 0;
+
+  public WanCopyRegionFunction() {
+    this(Clock.systemDefaultZone(), new ThreadSleeper(),
+        new WanCopyRegionFunctionExecutorFactoryImpl(), new EventCreatorImpl());
+  }
+
+  @VisibleForTesting
+  WanCopyRegionFunction(Clock clock, ThreadSleeper threadSleeper,
+      WanCopyRegionFunctionExecutorFactory executorFactory, EventCreator eventCreator) {
+    this.clock = clock;
+    this.threadSleeper = threadSleeper;
+    this.executorFactory = executorFactory;
+    this.eventCreator = eventCreator;
+  }
+
+  @Override
+  public String getId() {
+    return ID;
+  }
+
+  @Override
+  public boolean hasResult() {
+    return true;
+  }
+
+  @Override
+  public boolean isHA() {
+    return false;
+  }
+
+  @Override
+  public CliFunctionResult executeFunction(FunctionContext<Object[]> context) {
+    final Object[] args = context.getArguments();
+    if (args.length < 5) {
+      throw new IllegalStateException(
+          "Arguments length does not match required length.");
+    }
+    String regionName = (String) args[0];
+    final String senderId = (String) args[1];
+    final boolean isCancel = (Boolean) args[2];
+    long maxRate = (Long) args[3];
+    int batchSize = (Integer) args[4];
+
+    if (regionName.startsWith(SEPARATOR)) {
+      regionName = regionName.substring(1);
+    }
+    if (regionName.equals("*") && senderId.equals("*") && isCancel) {
+      return cancelAllWanCopyRegion(context);
+    }
+
+    if (isCancel) {
+      return cancelWanCopyRegion(context, regionName, senderId);
+    }
+    final Cache cache = context.getCache();
+
+    final Region<?, ?> region = cache.getRegion(regionName);
+    if (region == null) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+          CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__REGION__NOT__FOUND, regionName));
+    }
+    GatewaySender sender = cache.getGatewaySender(senderId);
+    if (sender == null) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+          CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__SENDER__NOT__FOUND, senderId));
+    }
+    if (!region.getAttributes().getGatewaySenderIds().contains(sender.getId())) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+          CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__REGION__NOT__USING_SENDER, regionName,
+              senderId));
+    }
+    if (!sender.isRunning()) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+          CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__SENDER__NOT__RUNNING, senderId));
+    }
+    if (!sender.isParallel() && !((InternalGatewaySender) sender).isPrimary()) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.OK,
+          CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__SENDER__SERIAL__AND__NOT__PRIMARY,
+              senderId));
+    }
+
+    try {
+      return executeWanCopyRegionFunctionInNewThread(context, region, regionName, sender, maxRate,
+          batchSize);
+    } catch (InterruptedException | CancellationException e) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+          CliStrings.WAN_COPY_REGION__MSG__CANCELED__BEFORE__HAVING__COPIED);
+    } catch (ExecutionException e) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+          CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__EXECUTION__FAILED, e.getMessage()));
+    }
+  }
+
+  private CliFunctionResult executeWanCopyRegionFunctionInNewThread(
+      FunctionContext<Object[]> context,
+      Region<?, ?> region, String regionName, GatewaySender sender, long maxRate, int batchSize)
+      throws InterruptedException, ExecutionException, CancellationException {
+    String executionName = getExecutionName(regionName, sender.getId());
+    CompletableFuture<CliFunctionResult> future = null;
+    Executor executor = executorFactory.getExecutor((InternalCache) context.getCache());
+    try {
+      synchronized (executions) {
+        if (executions.containsKey(executionName)) {
+          return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+              CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__ALREADY__RUNNING__COMMAND,
+                  regionName, sender.getId()));
+        }
+        future = CompletableFuture.supplyAsync(() -> {
+          try {
+            return wanCopyRegion(context, region, sender, maxRate, batchSize);
+          } catch (InterruptedException e) {
+            return new CliFunctionResult(context.getMemberName(),
+                CliFunctionResult.StatusState.ERROR,
+                CliStrings.WAN_COPY_REGION__MSG__CANCELED__BEFORE__HAVING__COPIED);
+          }
+        }, executor);
+        executions.put(executionName, future);
+      }
+      return future.get();
+    } finally {
+      if (future != null) {
+        executions.remove(executionName);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  CliFunctionResult wanCopyRegion(FunctionContext<Object[]> context, Region<?, ?> region,
+      GatewaySender sender, long maxRate, int batchSize) throws InterruptedException {
+    ConnectionState connectionState = new ConnectionState();
+    int copiedEntries = 0;
+    final InternalCache cache = (InternalCache) context.getCache();
+    Iterator<?> entriesIter = getEntries(region, sender).iterator();
+    final long startTime = clock.millis();
+
+    try {
+      while (entriesIter.hasNext()) {
+        List<GatewayQueueEvent<?, ?>> batch =
+            createBatch((InternalRegion) region, sender, batchSize, cache, entriesIter);
+        if (batch.size() == 0) {
+          continue;
+        }
+        Optional<CliFunctionResult> connectionError =
+            connectionState.connectIfNeeded(context, sender);
+        if (connectionError.isPresent()) {
+          return connectionError.get();
+        }
+        Optional<CliFunctionResult> error =
+            sendBatch(context, sender, batch, connectionState, copiedEntries);
+        if (error.isPresent()) {
+          return error.get();
+        }
+        copiedEntries += batch.size();
+        doPostSendBatchActions(startTime, copiedEntries, maxRate);
+      }
+    } finally {
+      connectionState.close();
+    }
+
+    if (region.isDestroyed()) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+          CliStrings.format(
+              CliStrings.WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED,
+              "Region destroyed",
+              copiedEntries));
+    }
+
+    return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.OK,
+        CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__COPIED__ENTRIES,
+            copiedEntries));
+  }
+
+  private Optional<CliFunctionResult> sendBatch(FunctionContext<Object[]> context,
+      GatewaySender sender, List<GatewayQueueEvent<?, ?>> batch,
+      ConnectionState connectionState, int copiedEntries) {
+    GatewaySenderEventDispatcher dispatcher =
+        ((AbstractGatewaySender) sender).getEventProcessor().getDispatcher();
+    int retries = 0;
+
+    while (true) {
+      try {
+        dispatcher.sendBatch(batch, connectionState.getConnection(),
+            connectionState.getSenderPool(), getAndIncrementBatchId(), true);
+        return Optional.empty();
+      } catch (BatchException70 e) {
+        return Optional.of(new CliFunctionResult(context.getMemberName(),
+            CliFunctionResult.StatusState.ERROR,
+            CliStrings.format(
+                CliStrings.WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED,
+                e.getExceptions().get(0).getCause(), copiedEntries)));
+      } catch (ConnectionDestroyedException | ServerConnectivityException e) {
+        Optional<CliFunctionResult> error =
+            connectionState.reconnect(context, retries++, copiedEntries, e);
+        if (error.isPresent()) {
+          return error;
+        }
+      }
+    }
+  }
+
+  private List<GatewayQueueEvent<?, ?>> createBatch(InternalRegion region, GatewaySender sender,
+      int batchSize, InternalCache cache, Iterator<?> iter) {
+    int batchIndex = 0;
+    List<GatewayQueueEvent<?, ?>> batch = new ArrayList<>();
+
+    while (iter.hasNext() && batchIndex < batchSize) {
+      GatewayQueueEvent<?, ?> event =
+          eventCreator.createGatewaySenderEvent(cache, region, sender,
+              (Region.Entry<?, ?>) iter.next());
+      if (event != null) {
+        batch.add(event);
+        batchIndex++;
+      }
+    }
+    return batch;
+  }
+
+  private Set<?> getEntries(Region<?, ?> region, GatewaySender sender) {
+    if (region instanceof PartitionedRegion && sender.isParallel()) {
+      return ((PartitionedRegion) region).getDataStore().getAllLocalBucketRegions()
+          .stream()
+          .flatMap(br -> ((Set<?>) br.entrySet()).stream()).collect(Collectors.toSet());
+    }
+    return region.entrySet();
+  }
+
+  private CliFunctionResult cancelWanCopyRegion(FunctionContext<Object[]> context,
+      String regionName, String senderId) {
+    Future<CliFunctionResult> execution = executions.remove(getExecutionName(regionName, senderId));
+    if (execution == null) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+          CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__NO__RUNNING__COMMAND,
+              regionName, senderId));
+    }
+    execution.cancel(true);
+    return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.OK,
+        CliStrings.WAN_COPY_REGION__MSG__EXECUTION__CANCELED);
+  }
+
+  private CliFunctionResult cancelAllWanCopyRegion(FunctionContext<Object[]> context) {
+    String executionsString = executions.keySet().toString();
+    for (Future<CliFunctionResult> execution : executions.values()) {
+      execution.cancel(true);
+    }
+    executions.clear();
+    return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.OK,
+        CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__EXECUTIONS__CANCELED, executionsString));
+  }
+
+  private static String getExecutionName(String regionName, String senderId) {
+    return "(" + regionName + "," + senderId + ")";
+  }
+
+  /**
+   * It runs the actions to be done after a batch has been
+   * sent: throw an interrupted exception if the operation was canceled and
+   * adjust the rate of copying by sleeping if necessary.
+   *
+   * @param startTime time at which the entries started to be copied
+   * @param copiedEntries number of entries copied so far
+   * @param maxRate maximum copying rate
+   */
+  @VisibleForTesting
+  void doPostSendBatchActions(long startTime, int copiedEntries, long maxRate)
+      throws InterruptedException {
+    long sleepMs = getTimeToSleep(startTime, copiedEntries, maxRate);
+    if (sleepMs > 0) {
+      logger.info("{}: Sleeping for {} ms to accommodate to requested maxRate",
+          this.getClass().getSimpleName(), sleepMs);
+      threadSleeper.sleep(sleepMs);
+    } else {
+      if (Thread.currentThread().isInterrupted()) {
+        throw new InterruptedException();
+      }
+    }
+  }
+
+  private int getAndIncrementBatchId() {
+    if (batchId + 1 == Integer.MAX_VALUE) {
+      batchId = 0;
+    }
+    return batchId++;
+  }
+
+  @VisibleForTesting
+  public static int getNumberOfCurrentExecutions() {
+    return executions.size();
+  }
+
+  @VisibleForTesting
+  long getTimeToSleep(long startTime, int copiedEntries, long maxRate) {
+    if (maxRate == 0) {
+      return 0;
+    }
+    final long elapsedMs = clock.millis() - startTime;
+    if (elapsedMs != 0 && (copiedEntries * 1000.0) / (double) elapsedMs <= maxRate) {
+      return 0;
+    }
+    final long targetElapsedMs = (copiedEntries * 1000L) / maxRate;
+    return targetElapsedMs - elapsedMs;
+  }
+
+
+  static class ConnectionState {
+    private volatile Connection connection = null;
+    private volatile PoolImpl senderPool = null;
+
+    public Connection getConnection() {
+      return connection;
+    }
+
+    public PoolImpl getSenderPool() {
+      return senderPool;
+    }
+
+    public Optional<CliFunctionResult> connectIfNeeded(FunctionContext<Object[]> context,
+        GatewaySender sender) {
+      if (senderPool == null) {
+        senderPool = ((AbstractGatewaySender) sender).getProxy();
+        if (senderPool == null) {
+          return Optional.of(new CliFunctionResult(context.getMemberName(),
+              CliFunctionResult.StatusState.ERROR,
+              CliStrings.WAN_COPY_REGION__MSG__NO__CONNECTION__POOL));
+        }
+        connection = senderPool.acquireConnection();
+        if (connection.getWanSiteVersion() < KnownVersion.GEODE_1_15_0.ordinal()) {
+          return Optional.of(new CliFunctionResult(context.getMemberName(),
+              CliFunctionResult.StatusState.ERROR,
+              CliStrings.WAN_COPY_REGION__MSG__COMMAND__NOT__SUPPORTED__AT__REMOTE__SITE));
+        }
+      }
+      return Optional.empty();
+    }
+
+    public Optional<CliFunctionResult> reconnect(FunctionContext<Object[]> context, int retries,
+        int copiedEntries, Exception e) {
+      close();
+      if (retries >= MAX_BATCH_SEND_RETRIES) {
+        return Optional.of(new CliFunctionResult(context.getMemberName(),
+            CliFunctionResult.StatusState.ERROR,
+            CliStrings.format(
+                CliStrings.WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED,
+                "Connection error", copiedEntries)));
+      }
+      logger.error("Exception {} in sendBatch. Retrying", e.getClass().getName());
+      try {
+        connection = senderPool.acquireConnection();
+      } catch (NoAvailableServersException | AllConnectionsInUseException e1) {
+        return Optional.of(new CliFunctionResult(context.getMemberName(),
+            CliFunctionResult.StatusState.ERROR,
+            CliStrings.format(
+                CliStrings.WAN_COPY_REGION__MSG__NO__CONNECTION,
+                copiedEntries)));
+      }
+      return Optional.empty();
+    }
+
+    public void close() {
+      if (senderPool != null && connection != null) {
+        try {
+          connection.close(false);
+        } catch (Exception e) {
+          logger.error("Error closing the connection used to wan-copy region entries");
+        }
+        senderPool.returnConnection(connection);
+      }
+      connection = null;
+    }
+  }
+
+
+  static class ThreadSleeper implements Serializable {
+    void sleep(long millis) throws InterruptedException {
+      Thread.sleep(millis);
+    }
+  }
+
+
+  interface EventCreator {
+    GatewayQueueEvent<?, ?> createGatewaySenderEvent(InternalCache cache,
+        InternalRegion region, GatewaySender sender, Region.Entry<?, ?> entry);
+  }

Review comment:
   Annotate this interface with @FunctionalInterface, and make the interface extend Serializable:
   ```java
   @FunctionalInterface
   interface EventCreator extends Serializable {
     GatewayQueueEvent<?, ?> createGatewaySenderEvent(InternalCache cache,
         InternalRegion region, GatewaySender sender, Region.Entry<?, ?> entry);
   }
   ```
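
The comment above does not state its motivation. One plausible reading (an assumption on my part) is that `WanCopyRegionFunction` declares a `serialVersionUID` and `ThreadSleeper` already implements `Serializable`, so every collaborator held in a field should be serializable too. The sketch below is a minimal, self-contained illustration of that idea and is not Geode code: `Creator`, `Holder`, `newHolder` and `roundTrip` are hypothetical stand-ins. It shows that a lambda assigned to a functional interface that extends `Serializable` survives Java serialization together with the object that holds it.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializableLambdaSketch {

  // Hypothetical stand-in for EventCreator: one abstract method, and Serializable.
  @FunctionalInterface
  interface Creator extends Serializable {
    String create(String key, String value);
  }

  // Hypothetical stand-in for a serializable function object that keeps
  // its collaborator in a final field.
  static class Holder implements Serializable {
    private static final long serialVersionUID = 1L;
    private final Creator creator;

    Holder(Creator creator) {
      this.creator = creator;
    }

    String apply(String key, String value) {
      return creator.create(key, value);
    }
  }

  // Builds a Holder whose collaborator is a lambda. This compiles only because
  // Creator has exactly one abstract method.
  static Holder newHolder() {
    return new Holder((key, value) -> key + "=" + value);
  }

  // Serializes the object to a byte array and reads it back.
  static <T> T roundTrip(T object) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(object);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
      @SuppressWarnings("unchecked")
      T copy = (T) in.readObject();
      return copy;
    }
  }

  public static void main(String[] args) throws Exception {
    Holder copy = roundTrip(newHolder());
    System.out.println(copy.apply("k", "v")); // prints "k=v"
  }
}
```

If `Creator` did not extend `Serializable`, `writeObject` would fail with `java.io.NotSerializableException` for the lambda held in the `creator` field. The `@FunctionalInterface` annotation adds no runtime behavior; it only makes the compiler reject the interface if a second abstract method is added later.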

##########
File path: geode-wan/src/main/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunction.java
##########
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.functions;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.Declarable;
+import org.apache.geode.cache.EntryDestroyedException;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.AllConnectionsInUseException;
+import org.apache.geode.cache.client.NoAvailableServersException;
+import org.apache.geode.cache.client.ServerConnectivityException;
+import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.cache.wan.internal.WanCopyRegionFunctionService;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.DefaultEntryEventFactory;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EntrySnapshot;
+import org.apache.geode.internal.cache.EnumListenerEvent;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.NonTXEntry;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.BatchException70;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import org.apache.geode.internal.cache.wan.InternalGatewaySender;
+import org.apache.geode.internal.serialization.KnownVersion;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.management.cli.CliFunction;
+import org.apache.geode.management.internal.functions.CliFunctionResult;
+import org.apache.geode.management.internal.i18n.CliStrings;
+
+/**
+ * Class for copying via WAN the contents of a region
+ * It must be executed in all members of the Geode cluster that host the region
+ * to be copied. (called with onServers() or withMembers() passing the list
+ * of all members hosting the region).
+ * It also offers the possibility to cancel an ongoing execution of this function.
+ * The copying itself is executed in a new thread with a known name
+ * (parameterized with the regionName and senderId) in order to allow
+ * to cancel ongoing invocations by interrupting that thread.
+ *
+ * It accepts the following arguments in an array of objects
+ * 0: regionName (String)
+ * 1: senderId (String)
+ * 2: isCancel (Boolean): If true, it indicates that an ongoing execution of this
+ * function for the given region and senderId must be stopped. Otherwise,
+ * it indicates that the region must be copied.
+ * 3: maxRate (Long) maximum copy rate in entries per second. In the case of
+ * parallel gateway senders, the maxRate is per server hosting the region.
+ * 4: batchSize (Integer): the size of the batches. Region entries are copied in batches of the
+ * passed size. After each batch is sent, the function checks if the command
+ * must be canceled and also sleeps for some time if necessary to adjust the
+ * copy rate to the one passed as argument.
+ */
+public class WanCopyRegionFunction extends CliFunction<Object[]> implements Declarable {
+  private static final Logger logger = LogService.getLogger();
+  private static final long serialVersionUID = 1L;
+
+  public static final String ID = WanCopyRegionFunction.class.getName();
+
+  private static final int MAX_BATCH_SEND_RETRIES = 1;
+
+  private final Clock clock;
+  private final ThreadSleeper threadSleeper;
+  private final WanCopyRegionFunctionExecutorFactory executorFactory;
+  private final EventCreator eventCreator;
+
+  /**
+   * Contains the ongoing executions of this function
+   */
+  private static final Map<String, Future<CliFunctionResult>> executions =
+      new ConcurrentHashMap<>();
+
+  private int batchId = 0;
+
+  public WanCopyRegionFunction() {
+    this(Clock.systemDefaultZone(), new ThreadSleeper(),
+        new WanCopyRegionFunctionExecutorFactoryImpl(), new EventCreatorImpl());
+  }
+
+  @VisibleForTesting
+  WanCopyRegionFunction(Clock clock, ThreadSleeper threadSleeper,
+      WanCopyRegionFunctionExecutorFactory executorFactory, EventCreator eventCreator) {
+    this.clock = clock;
+    this.threadSleeper = threadSleeper;
+    this.executorFactory = executorFactory;
+    this.eventCreator = eventCreator;
+  }
+
+  @Override
+  public String getId() {
+    return ID;
+  }
+
+  @Override
+  public boolean hasResult() {
+    return true;
+  }
+
+  @Override
+  public boolean isHA() {
+    return false;
+  }
+
+  @Override
+  public CliFunctionResult executeFunction(FunctionContext<Object[]> context) {
+    final Object[] args = context.getArguments();
+    if (args.length < 5) {
+      throw new IllegalStateException(
+          "Arguments length does not match required length.");
+    }
+    String regionName = (String) args[0];
+    final String senderId = (String) args[1];
+    final boolean isCancel = (Boolean) args[2];
+    long maxRate = (Long) args[3];
+    int batchSize = (Integer) args[4];
+
+    if (regionName.startsWith(SEPARATOR)) {
+      regionName = regionName.substring(1);
+    }
+    if (regionName.equals("*") && senderId.equals("*") && isCancel) {
+      return cancelAllWanCopyRegion(context);
+    }
+
+    if (isCancel) {
+      return cancelWanCopyRegion(context, regionName, senderId);
+    }
+    final Cache cache = context.getCache();
+
+    final Region<?, ?> region = cache.getRegion(regionName);
+    if (region == null) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+          CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__REGION__NOT__FOUND, regionName));
+    }
+    GatewaySender sender = cache.getGatewaySender(senderId);
+    if (sender == null) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+          CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__SENDER__NOT__FOUND, senderId));
+    }
+    if (!region.getAttributes().getGatewaySenderIds().contains(sender.getId())) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+          CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__REGION__NOT__USING_SENDER, regionName,
+              senderId));
+    }
+    if (!sender.isRunning()) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+          CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__SENDER__NOT__RUNNING, senderId));
+    }
+    if (!sender.isParallel() && !((InternalGatewaySender) sender).isPrimary()) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.OK,
+          CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__SENDER__SERIAL__AND__NOT__PRIMARY,
+              senderId));
+    }
+
+    try {
+      return executeWanCopyRegionFunctionInNewThread(context, region, regionName, sender, maxRate,
+          batchSize);
+    } catch (InterruptedException | CancellationException e) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+          CliStrings.WAN_COPY_REGION__MSG__CANCELED__BEFORE__HAVING__COPIED);
+    } catch (ExecutionException e) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+          CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__EXECUTION__FAILED, e.getMessage()));
+    }
+  }
+
+  private CliFunctionResult executeWanCopyRegionFunctionInNewThread(
+      FunctionContext<Object[]> context,
+      Region<?, ?> region, String regionName, GatewaySender sender, long maxRate, int batchSize)
+      throws InterruptedException, ExecutionException, CancellationException {
+    String executionName = getExecutionName(regionName, sender.getId());
+    CompletableFuture<CliFunctionResult> future = null;
+    Executor executor = executorFactory.getExecutor((InternalCache) context.getCache());
+    try {
+      synchronized (executions) {
+        if (executions.containsKey(executionName)) {
+          return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+              CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__ALREADY__RUNNING__COMMAND,
+                  regionName, sender.getId()));
+        }
+        future = CompletableFuture.supplyAsync(() -> {
+          try {
+            return wanCopyRegion(context, region, sender, maxRate, batchSize);
+          } catch (InterruptedException e) {
+            return new CliFunctionResult(context.getMemberName(),
+                CliFunctionResult.StatusState.ERROR,
+                CliStrings.WAN_COPY_REGION__MSG__CANCELED__BEFORE__HAVING__COPIED);
+          }
+        }, executor);
+        executions.put(executionName, future);
+      }
+      return future.get();
+    } finally {
+      if (future != null) {
+        executions.remove(executionName);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  CliFunctionResult wanCopyRegion(FunctionContext<Object[]> context, Region<?, ?> region,
+      GatewaySender sender, long maxRate, int batchSize) throws InterruptedException {
+    ConnectionState connectionState = new ConnectionState();
+    int copiedEntries = 0;
+    final InternalCache cache = (InternalCache) context.getCache();
+    Iterator<?> entriesIter = getEntries(region, sender).iterator();
+    final long startTime = clock.millis();
+
+    try {
+      while (entriesIter.hasNext()) {
+        List<GatewayQueueEvent<?, ?>> batch =
+            createBatch((InternalRegion) region, sender, batchSize, cache, entriesIter);
+        if (batch.size() == 0) {
+          continue;
+        }
+        Optional<CliFunctionResult> connectionError =
+            connectionState.connectIfNeeded(context, sender);
+        if (connectionError.isPresent()) {
+          return connectionError.get();
+        }
+        Optional<CliFunctionResult> error =
+            sendBatch(context, sender, batch, connectionState, copiedEntries);
+        if (error.isPresent()) {
+          return error.get();
+        }
+        copiedEntries += batch.size();
+        doPostSendBatchActions(startTime, copiedEntries, maxRate);
+      }
+    } finally {
+      connectionState.close();
+    }
+
+    if (region.isDestroyed()) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+          CliStrings.format(
+              CliStrings.WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED,
+              "Region destroyed",
+              copiedEntries));
+    }
+
+    return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.OK,
+        CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__COPIED__ENTRIES,
+            copiedEntries));
+  }
+
+  private Optional<CliFunctionResult> sendBatch(FunctionContext<Object[]> context,
+      GatewaySender sender, List<GatewayQueueEvent<?, ?>> batch,
+      ConnectionState connectionState, int copiedEntries) {
+    GatewaySenderEventDispatcher dispatcher =
+        ((AbstractGatewaySender) sender).getEventProcessor().getDispatcher();
+    int retries = 0;
+
+    while (true) {
+      try {
+        dispatcher.sendBatch(batch, connectionState.getConnection(),
+            connectionState.getSenderPool(), getAndIncrementBatchId(), true);
+        return Optional.empty();
+      } catch (BatchException70 e) {
+        return Optional.of(new CliFunctionResult(context.getMemberName(),
+            CliFunctionResult.StatusState.ERROR,
+            CliStrings.format(
+                CliStrings.WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED,
+                e.getExceptions().get(0).getCause(), copiedEntries)));
+      } catch (ConnectionDestroyedException | ServerConnectivityException e) {
+        Optional<CliFunctionResult> error =
+            connectionState.reconnect(context, retries++, copiedEntries, e);
+        if (error.isPresent()) {
+          return error;
+        }
+      }
+    }
+  }
+
+  private List<GatewayQueueEvent<?, ?>> createBatch(InternalRegion region, GatewaySender sender,
+      int batchSize, InternalCache cache, Iterator<?> iter) {
+    int batchIndex = 0;
+    List<GatewayQueueEvent<?, ?>> batch = new ArrayList<>();
+
+    while (iter.hasNext() && batchIndex < batchSize) {
+      GatewayQueueEvent<?, ?> event =
+          eventCreator.createGatewaySenderEvent(cache, region, sender,
+              (Region.Entry<?, ?>) iter.next());
+      if (event != null) {
+        batch.add(event);
+        batchIndex++;
+      }
+    }
+    return batch;
+  }
+
+  private Set<?> getEntries(Region<?, ?> region, GatewaySender sender) {
+    if (region instanceof PartitionedRegion && sender.isParallel()) {
+      return ((PartitionedRegion) region).getDataStore().getAllLocalBucketRegions()
+          .stream()
+          .flatMap(br -> ((Set<?>) br.entrySet()).stream()).collect(Collectors.toSet());
+    }
+    return region.entrySet();
+  }
+
+  private CliFunctionResult cancelWanCopyRegion(FunctionContext<Object[]> context,
+      String regionName, String senderId) {
+    Future<CliFunctionResult> execution = executions.remove(getExecutionName(regionName, senderId));
+    if (execution == null) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+          CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__NO__RUNNING__COMMAND,
+              regionName, senderId));
+    }
+    execution.cancel(true);
+    return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.OK,
+        CliStrings.WAN_COPY_REGION__MSG__EXECUTION__CANCELED);
+  }
+
+  private CliFunctionResult cancelAllWanCopyRegion(FunctionContext<Object[]> context) {
+    String executionsString = executions.keySet().toString();
+    for (Future<CliFunctionResult> execution : executions.values()) {
+      execution.cancel(true);
+    }
+    executions.clear();
+    return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.OK,
+        CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__EXECUTIONS__CANCELED, executionsString));
+  }
+
+  private static String getExecutionName(String regionName, String senderId) {
+    return "(" + regionName + "," + senderId + ")";
+  }
+
+  /**
+   * It runs the actions to be done after a batch has been
+   * sent: throw an interrupted exception if the operation was canceled and
+   * adjust the rate of copying by sleeping if necessary.
+   *
+   * @param startTime time at which the entries started to be copied
+   * @param copiedEntries number of entries copied so far
+   * @param maxRate maximum copying rate
+   */
+  @VisibleForTesting
+  void doPostSendBatchActions(long startTime, int copiedEntries, long maxRate)
+      throws InterruptedException {
+    long sleepMs = getTimeToSleep(startTime, copiedEntries, maxRate);
+    if (sleepMs > 0) {
+      logger.info("{}: Sleeping for {} ms to accommodate to requested maxRate",
+          this.getClass().getSimpleName(), sleepMs);
+      threadSleeper.sleep(sleepMs);
+    } else {
+      if (Thread.currentThread().isInterrupted()) {
+        throw new InterruptedException();
+      }
+    }
+  }
+
+  private int getAndIncrementBatchId() {
+    if (batchId + 1 == Integer.MAX_VALUE) {
+      batchId = 0;
+    }
+    return batchId++;
+  }
+
+  @VisibleForTesting
+  public static int getNumberOfCurrentExecutions() {
+    return executions.size();
+  }
+
+  @VisibleForTesting
+  long getTimeToSleep(long startTime, int copiedEntries, long maxRate) {
+    if (maxRate == 0) {
+      return 0;
+    }
+    final long elapsedMs = clock.millis() - startTime;
+    if (elapsedMs != 0 && (copiedEntries * 1000.0) / (double) elapsedMs <= maxRate) {
+      return 0;
+    }
+    final long targetElapsedMs = (copiedEntries * 1000L) / maxRate;
+    return targetElapsedMs - elapsedMs;
+  }
+
+
+  static class ConnectionState {
+    private volatile Connection connection = null;
+    private volatile PoolImpl senderPool = null;
+
+    public Connection getConnection() {
+      return connection;
+    }
+
+    public PoolImpl getSenderPool() {
+      return senderPool;
+    }
+
+    public Optional<CliFunctionResult> connectIfNeeded(FunctionContext<Object[]> context,
+        GatewaySender sender) {
+      if (senderPool == null) {
+        senderPool = ((AbstractGatewaySender) sender).getProxy();
+        if (senderPool == null) {
+          return Optional.of(new CliFunctionResult(context.getMemberName(),
+              CliFunctionResult.StatusState.ERROR,
+              CliStrings.WAN_COPY_REGION__MSG__NO__CONNECTION__POOL));
+        }
+        connection = senderPool.acquireConnection();
+        if (connection.getWanSiteVersion() < KnownVersion.GEODE_1_15_0.ordinal()) {
+          return Optional.of(new CliFunctionResult(context.getMemberName(),
+              CliFunctionResult.StatusState.ERROR,
+              CliStrings.WAN_COPY_REGION__MSG__COMMAND__NOT__SUPPORTED__AT__REMOTE__SITE));
+        }
+      }
+      return Optional.empty();
+    }
+
+    public Optional<CliFunctionResult> reconnect(FunctionContext<Object[]> context, int retries,
+        int copiedEntries, Exception e) {
+      close();
+      if (retries >= MAX_BATCH_SEND_RETRIES) {
+        return Optional.of(new CliFunctionResult(context.getMemberName(),
+            CliFunctionResult.StatusState.ERROR,
+            CliStrings.format(
+                CliStrings.WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED,
+                "Connection error", copiedEntries)));
+      }
+      logger.error("Exception {} in sendBatch. Retrying", e.getClass().getName());
+      try {
+        connection = senderPool.acquireConnection();
+      } catch (NoAvailableServersException | AllConnectionsInUseException e1) {
+        return Optional.of(new CliFunctionResult(context.getMemberName(),
+            CliFunctionResult.StatusState.ERROR,
+            CliStrings.format(
+                CliStrings.WAN_COPY_REGION__MSG__NO__CONNECTION,
+                copiedEntries)));
+      }
+      return Optional.empty();
+    }
+
+    public void close() {
+      if (senderPool != null && connection != null) {
+        try {
+          connection.close(false);
+        } catch (Exception e) {
+          logger.error("Error closing the connection used to wan-copy region entries");
+        }
+        senderPool.returnConnection(connection);
+      }
+      connection = null;
+    }
+  }
+
+
+  static class ThreadSleeper implements Serializable {
+    void sleep(long millis) throws InterruptedException {
+      Thread.sleep(millis);
+    }
+  }
+
+
+  interface EventCreator {
+    GatewayQueueEvent<?, ?> createGatewaySenderEvent(InternalCache cache,
+        InternalRegion region, GatewaySender sender, Region.Entry<?, ?> entry);
+  }
+
+  static class EventCreatorImpl implements EventCreator, Serializable {
+    @VisibleForTesting
+    public GatewayQueueEvent<?, ?> createGatewaySenderEvent(InternalCache cache,
+        InternalRegion region, GatewaySender sender, Region.Entry<?, ?> entry) {
+      final EntryEventImpl event;
+      if (region instanceof PartitionedRegion) {
+        event = createEventForPartitionedRegion(sender, cache, region, entry);
+      } else {
+        event = createEventForReplicatedRegion(cache, region, entry);
+      }
+      if (event == null) {
+        return null;
+      }
+      try {
+        return new GatewaySenderEventImpl(EnumListenerEvent.AFTER_UPDATE_WITH_GENERATE_CALLBACKS,
+            event, null, true);
+      } catch (IOException e) {
+        logger.error("Error when creating event in wan-copy: {}", e.getMessage());
+        return null;
+      }
+    }
+
+    private EntryEventImpl createEventForReplicatedRegion(InternalCache cache,
+        InternalRegion region,
+        Region.Entry<?, ?> entry) {
+      return createEvent(cache, region, entry);
+    }
+
+    private EntryEventImpl createEventForPartitionedRegion(GatewaySender sender,
+        InternalCache cache,
+        InternalRegion region,
+        Region.Entry<?, ?> entry) {
+      EntryEventImpl event = createEvent(cache, region, entry);
+      if (event == null) {
+        return null;
+      }
+      BucketRegion bucketRegion = ((PartitionedRegion) event.getRegion()).getDataStore()
+          .getLocalBucketById(event.getKeyInfo().getBucketId());
+      if (bucketRegion != null && !bucketRegion.getBucketAdvisor().isPrimary()
+          && sender.isParallel()) {
+        return null;
+      }
+      if (bucketRegion != null) {
+        bucketRegion.handleWANEvent(event);
+      }
+      return event;
+    }
+
+    private EntryEventImpl createEvent(InternalCache cache, InternalRegion region,
+        Region.Entry<?, ?> entry) {
+
+      EntryEventImpl event;
+      try {
+        event = new DefaultEntryEventFactory().create(region, Operation.UPDATE,
+            entry.getKey(),
+            entry.getValue(), null, false,
+            (cache).getInternalDistributedSystem().getDistributedMember(), false);
+      } catch (EntryDestroyedException e) {
+        return null;
+      }
+      if (entry instanceof NonTXEntry) {
+        event.setVersionTag(((NonTXEntry) entry).getRegionEntry().getVersionStamp().asVersionTag());
+      } else {
+        event.setVersionTag(((EntrySnapshot) entry).getVersionTag());
+      }
+      event.setNewEventId(cache.getInternalDistributedSystem());
+      return event;
+    }
+  }
+
+
+  interface WanCopyRegionFunctionExecutorFactory {
+    Executor getExecutor(InternalCache cache);
+  }
+
+  static class WanCopyRegionFunctionExecutorFactoryImpl
+      implements WanCopyRegionFunctionExecutorFactory, Serializable {
+    public Executor getExecutor(InternalCache cache) {
+      return cache.getService(WanCopyRegionFunctionService.class).getExecutor();
+    }
+  }

Review comment:
       Delete `WanCopyRegionFunctionExecutorFactory` and its implementation. Instead, we'll use a new provider interface to fetch the service:
   ```java
   @FunctionalInterface
   interface WanCopyRegionFunctionServiceProvider extends Serializable {
     WanCopyRegionFunctionService get(InternalCache cache);
   }
   ```
   The `serviceProvider` can then be used to fetch the executor (and, in the same way, the executions):
   ```java
   serviceProvider.get(internalCache).getExecutor()
   ```
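
   To make the wiring concrete, here is a minimal, self-contained sketch of how the function could hold the provider. `CacheStub` and `ServiceStub` are stand-ins invented for this example (they are not Geode types), and `FunctionSketch` with its `executorFor` method is hypothetical; only the shape of the provider interface comes from the suggestion above.

   ```java
   import java.io.Serializable;
   import java.util.concurrent.Executor;

   class ProviderSketch {
     /** Stand-in for InternalCache (assumption): only the lookup the sketch needs. */
     interface CacheStub {
       <T> T getService(Class<T> type);
     }

     /** Stand-in for WanCopyRegionFunctionService (assumption). */
     interface ServiceStub {
       Executor getExecutor();
     }

     /** Same shape as the provider interface proposed in this comment. */
     @FunctionalInterface
     interface ServiceProvider extends Serializable {
       ServiceStub get(CacheStub cache);
     }

     /** Hypothetical function: the provider replaces the executor factory field. */
     static class FunctionSketch {
       private final ServiceProvider serviceProvider;

       // Production constructor: look the service up in the cache.
       FunctionSketch() {
         this(cache -> cache.getService(ServiceStub.class));
       }

       // Test constructor: inject any provider, e.g. one returning a mock service.
       FunctionSketch(ServiceProvider serviceProvider) {
         this.serviceProvider = serviceProvider;
       }

       Executor executorFor(CacheStub cache) {
         return serviceProvider.get(cache).getExecutor();
       }
     }
   }
   ```

   Because the provider is a serializable functional interface, a test can pass a lambda that returns a stubbed service, and the function itself stays serializable.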
   

##########
File path: geode-wan/src/main/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunction.java
##########
@@ -0,0 +1,581 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.functions;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.Declarable;
+import org.apache.geode.cache.EntryDestroyedException;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.AllConnectionsInUseException;
+import org.apache.geode.cache.client.NoAvailableServersException;
+import org.apache.geode.cache.client.ServerConnectivityException;
+import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.cache.wan.internal.WanCopyRegionFunctionService;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.DefaultEntryEventFactory;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EntrySnapshot;
+import org.apache.geode.internal.cache.EnumListenerEvent;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.NonTXEntry;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.BatchException70;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import org.apache.geode.internal.cache.wan.InternalGatewaySender;
+import org.apache.geode.internal.serialization.KnownVersion;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.management.cli.CliFunction;
+import org.apache.geode.management.internal.functions.CliFunctionResult;
+import org.apache.geode.management.internal.i18n.CliStrings;
+
+/**
+ * Class for copying via WAN the contents of a region
+ * It must be executed in all members of the Geode cluster that host the region
+ * to be copied. (called with onServers() or withMembers() passing the list
+ * of all members hosting the region).
+ * It also offers the possibility to cancel an ongoing execution of this function.
+ * The copying itself is executed in a new thread with a known name
+ * (parameterized with the regionName and senderId) in order to allow
+ * to cancel ongoing invocations by interrupting that thread.
+ *
+ * It accepts the following arguments in an array of objects
+ * 0: regionName (String)
+ * 1: senderId (String)
+ * 2: isCancel (Boolean): If true, it indicates that an ongoing execution of this
+ * function for the given region and senderId must be stopped. Otherwise,
+ * it indicates that the region must be copied.
+ * 3: maxRate (Long) maximum copy rate in entries per second. In the case of
+ * parallel gateway senders, the maxRate is per server hosting the region.
+ * 4: batchSize (Integer): the size of the batches. Region entries are copied in batches of the
+ * passed size. After each batch is sent, the function checks if the command
+ * must be canceled and also sleeps for some time if necessary to adjust the
+ * copy rate to the one passed as argument.
+ */
+public class WanCopyRegionFunction extends CliFunction<Object[]> implements Declarable {
+  private static final Logger logger = LogService.getLogger();
+  private static final long serialVersionUID = 1L;
+
+  public static final String ID = WanCopyRegionFunction.class.getName();
+
+  private static final int MAX_BATCH_SEND_RETRIES = 1;
+
+  private final Clock clock;
+  private final ThreadSleeper threadSleeper;
+  private final WanCopyRegionFunctionExecutorFactory executorFactory;
+  private final EventCreator eventCreator;
+
+  /**
+   * Contains the ongoing executions of this function
+   */
+  private static final Map<String, Future<CliFunctionResult>> executions =
+      new ConcurrentHashMap<>();
+
+  private int batchId = 0;
+
+  public WanCopyRegionFunction() {
+    this(Clock.systemDefaultZone(), new ThreadSleeper(),
+        new WanCopyRegionFunctionExecutorFactoryImpl(), new EventCreatorImpl());
+  }
+
+  @VisibleForTesting
+  WanCopyRegionFunction(Clock clock, ThreadSleeper threadSleeper,
+      WanCopyRegionFunctionExecutorFactory executorFactory, EventCreator eventCreator) {
+    this.clock = clock;
+    this.threadSleeper = threadSleeper;
+    this.executorFactory = executorFactory;
+    this.eventCreator = eventCreator;
+  }
+
+  @Override
+  public String getId() {
+    return ID;
+  }
+
+  @Override
+  public boolean hasResult() {
+    return true;
+  }
+
+  @Override
+  public boolean isHA() {
+    return false;
+  }
+
+  @Override
+  public CliFunctionResult executeFunction(FunctionContext<Object[]> context) {
+    final Object[] args = context.getArguments();
+    if (args.length < 5) {
+      throw new IllegalStateException(
+          "Arguments length does not match required length.");
+    }
+    String regionName = (String) args[0];
+    final String senderId = (String) args[1];
+    final boolean isCancel = (Boolean) args[2];
+    long maxRate = (Long) args[3];
+    int batchSize = (Integer) args[4];
+
+    if (regionName.startsWith(SEPARATOR)) {
+      regionName = regionName.substring(1);
+    }
+    if (regionName.equals("*") && senderId.equals("*") && isCancel) {
+      return cancelAllWanCopyRegion(context);
+    }
+
+    if (isCancel) {
+      return cancelWanCopyRegion(context, regionName, senderId);
+    }
+    final Cache cache = context.getCache();
+
+    final Region<?, ?> region = cache.getRegion(regionName);
+    if (region == null) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+          CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__REGION__NOT__FOUND, regionName));
+    }
+    GatewaySender sender = cache.getGatewaySender(senderId);
+    if (sender == null) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+          CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__SENDER__NOT__FOUND, senderId));
+    }
+    if (!region.getAttributes().getGatewaySenderIds().contains(sender.getId())) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+          CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__REGION__NOT__USING_SENDER, regionName,
+              senderId));
+    }
+    if (!sender.isRunning()) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+          CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__SENDER__NOT__RUNNING, senderId));
+    }
+    if (!sender.isParallel() && !((InternalGatewaySender) sender).isPrimary()) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.OK,
+          CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__SENDER__SERIAL__AND__NOT__PRIMARY,
+              senderId));
+    }
+
+    try {
+      return executeWanCopyRegionFunctionInNewThread(context, region, regionName, sender, maxRate,
+          batchSize);
+    } catch (InterruptedException | CancellationException e) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+          CliStrings.WAN_COPY_REGION__MSG__CANCELED__BEFORE__HAVING__COPIED);
+    } catch (ExecutionException e) {
+      return new CliFunctionResult(context.getMemberName(), CliFunctionResult.StatusState.ERROR,
+          CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__EXECUTION__FAILED, e.getMessage()));
+    }
+  }
+
+  private CliFunctionResult executeWanCopyRegionFunctionInNewThread(
+      FunctionContext<Object[]> context,
+      Region<?, ?> region, String regionName, GatewaySender sender, long maxRate, int batchSize)
+      throws InterruptedException, ExecutionException, CancellationException {
+    String executionName = getExecutionName(regionName, sender.getId());
+    CompletableFuture<CliFunctionResult> future = null;
+    Executor executor = executorFactory.getExecutor((InternalCache) context.getCache());
+    try {
+      synchronized (executions) {

Review comment:
       Since `executions` would move to the service, I think you should synchronize on that field only in the service. You could pass a `Runnable` and a `String executionName` to the service to be executed under synchronization. The service would then handle all use of `executions`, and probably of the `executor` as well. The only thing this function would then need to do is invoke the service's method for executing tasks.
   
   The code is then divided between two places: everything that can be performed by multiple threads and multiple instances lives in `WanCopyRegionFunction`, while everything that must be a single instance or a fixed set of threads per Cache lives in `WanCopyRegionFunctionService`.
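
   A rough sketch of what that split could look like on the service side. Everything here is an assumption for illustration: the class name `WanCopyServiceSketch`, the `execute` and `cancel` method names, and the use of a `Callable<String>` (rather than a `Runnable`) so the task can hand a result back. The real service would return a `CliFunctionResult` and be created once per Cache.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.concurrent.Callable;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.Future;

   class WanCopyServiceSketch {
     // Owned by the service only; every access happens under its monitor.
     private final Map<String, Future<String>> executions = new HashMap<>();
     private final ExecutorService executor = Executors.newCachedThreadPool();

     /** Runs the task under the given name; rejects a second run with the same name. */
     String execute(Callable<String> task, String executionName)
         throws InterruptedException, ExecutionException {
       Future<String> future;
       synchronized (executions) {
         if (executions.containsKey(executionName)) {
           throw new IllegalStateException("Already running: " + executionName);
         }
         future = executor.submit(task);
         executions.put(executionName, future);
       }
       try {
         // Wait outside the lock so cancel() is never blocked by a running copy.
         return future.get();
       } finally {
         synchronized (executions) {
           // Remove only our own entry; cancel() may already have removed it.
           executions.remove(executionName, future);
         }
       }
     }

     /** Cancels a running execution; returns false if none is registered. */
     boolean cancel(String executionName) {
       Future<String> future;
       synchronized (executions) {
         future = executions.remove(executionName);
       }
       return future != null && future.cancel(true);
     }

     int getNumberOfCurrentExecutions() {
       synchronized (executions) {
         return executions.size();
       }
     }

     void close() {
       executor.shutdownNow();
     }
   }
   ```

   With something like this in place, the function would only build the task and the execution name and call `execute`, and the static `executions` map and the `synchronized` block would disappear from `WanCopyRegionFunction`.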



