kirklund commented on a change in pull request #6601:
URL: https://github.com/apache/geode/pull/6601#discussion_r653818247
##########
File path:
geode-gfsh/src/integrationTest/java/org/apache/geode/management/internal/cli/GfshParserAutoCompletionIntegrationTest.java
##########
@@ -410,7 +410,7 @@ public void testObtainHintForData() {
String hintArgument = "data";
String hintsProvided =
gfshParserRule.getCommandManager().obtainHint(hintArgument);
String[] hintsProvidedArray = hintsProvided.split(lineSeparator());
- assertThat(hintsProvidedArray.length).isEqualTo(17);
+ assertThat(hintsProvidedArray.length).isEqualTo(18);
Review comment:
Better syntax is:
```
assertThat(hintsProvidedArray).hasSize(18);
```
This way if it fails, `hintsProvidedArray` will be fully printed out instead
of just the integer of its length.
##########
File path:
geode-gfsh/src/test/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunctionTest.java
##########
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.functions;
+
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.time.Clock;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.NoAvailableServersException;
+import org.apache.geode.cache.client.ServerConnectivityException;
+import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import
org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException;
+import org.apache.geode.cache.client.internal.pooling.PooledConnection;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.BatchException70;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
+import org.apache.geode.internal.cache.wan.InternalGatewaySender;
+import org.apache.geode.internal.serialization.KnownVersion;
+import org.apache.geode.management.internal.functions.CliFunctionResult;
+
+public class WanCopyRegionFunctionTest {
+
+ private WanCopyRegionFunction rrf;
+ private long startTime;
+ private final int entries = 25;
+ private Clock clockMock;
+ private WanCopyRegionFunction.ThreadSleeper threadSleeperMock;
+ private InternalCache internalCacheMock;
+ private GatewaySender gatewaySenderMock;
+ private PoolImpl poolMock;
+ private Connection connectionMock;
+ private GatewaySenderEventDispatcher dispatcherMock;
+
+ @SuppressWarnings("unchecked")
+ private final FunctionContext<Object[]> contextMock =
mock(FunctionContext.class);
Review comment:
Another option to prevent these warnings is:
```
import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
private final FunctionContext<Object[]> contextMock =
uncheckedCast(mock(FunctionContext.class));
```
##########
File path:
geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunction.java
##########
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.functions;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.Declarable;
+import org.apache.geode.cache.EntryDestroyedException;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.AllConnectionsInUseException;
+import org.apache.geode.cache.client.NoAvailableServersException;
+import org.apache.geode.cache.client.ServerConnectivityException;
+import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import
org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException;
+import org.apache.geode.cache.client.internal.pooling.PooledConnection;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.DefaultEntryEventFactory;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EntrySnapshot;
+import org.apache.geode.internal.cache.EnumListenerEvent;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.NonTXEntry;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.BatchException70;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import org.apache.geode.internal.cache.wan.InternalGatewaySender;
+import org.apache.geode.internal.serialization.KnownVersion;
+import org.apache.geode.logging.internal.executors.LoggingExecutors;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.management.cli.CliFunction;
+import org.apache.geode.management.internal.functions.CliFunctionResult;
+import org.apache.geode.management.internal.i18n.CliStrings;
+
+/**
+ * Class for copying via WAN the contents of a region
+ * It must be executed in all members of the Geode cluster that host the region
+ * to be copied. (called with onServers() or withMembers() passing the list
+ * of all members hosting the region).
+ * It also offers the possibility to cancel an ongoing execution of this
function.
+ * The copying itself is executed in a new thread with a known name
+ * (parameterized with the regionName and senderId) in order to allow
+ * to cancel ongoing invocations by interrupting that thread.
+ *
+ * It accepts the following arguments in an array of objects
+ * 0: regionName (String)
+ * 1: senderId (String)
+ * 2: isCancel (Boolean): If true, it indicates that an ongoing execution of
this
+ * function for the given region and senderId must be stopped. Otherwise,
+ * it indicates that the region must be copied.
+ * 3: maxRate (Long) maximum copy rate in entries per second. In the case of
+ * parallel gateway senders, the maxRate is per server hosting the region.
+ * 4: batchSize (Integer): the size of the batches. Region entries are copied
in batches of the
+ * passed size. After each batch is sent, the function checks if the command
+ * must be canceled and also sleeps for some time if necessary to adjust the
+ * copy rate to the one passed as argument.
+ */
+public class WanCopyRegionFunction extends CliFunction<Object[]> implements
Declarable {
+ private static final Logger logger = LogService.getLogger();
+ private static final long serialVersionUID = 1L;
+
+ public static final String ID = WanCopyRegionFunction.class.getName();
+
+ private static final int MAX_BATCH_SEND_RETRIES = 1;
+
+ private Clock clock = Clock.systemDefaultZone();
+ private ThreadSleeper threadSleeper = new ThreadSleeper();
+
+ static class ThreadSleeper implements Serializable {
+ void millis(long millis) throws InterruptedException {
+ Thread.sleep(millis);
+ }
+ }
+
+ @VisibleForTesting
+ public void setClock(Clock clock) {
+ this.clock = clock;
+ }
+
+ @VisibleForTesting
+ public void setThreadSleeper(ThreadSleeper ts) {
+ this.threadSleeper = ts;
+ }
+
+ @Override
+ public String getId() {
+ return ID;
+ }
+
+ @Override
+ public boolean hasResult() {
+ return true;
+ }
+
+ @Override
+ public boolean isHA() {
+ return false;
+ }
+
+ @Override
+ public CliFunctionResult executeFunction(FunctionContext<Object[]> context) {
+ final Object[] args = context.getArguments();
+ if (args.length < 5) {
+ throw new IllegalStateException(
+ "Arguments length does not match required length.");
+ }
+ final String regionName = (String) args[0];
+ final String senderId = (String) args[1];
+ final boolean isCancel = (Boolean) args[2];
+ long maxRate = (Long) args[3];
+ int batchSize = (Integer) args[4];
+
+ final InternalCache cache = (InternalCache) context.getCache();
+
+ if (isCancel) {
+ return cancelWanCopyRegion(context, regionName, senderId);
+ }
+
+ final Region region = cache.getRegion(regionName);
+ if (region == null) {
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.ERROR,
+
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__REGION__NOT__FOUND,
regionName));
+ }
+
+ GatewaySender sender = cache.getGatewaySender(senderId);
+ if (sender == null) {
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.ERROR,
+
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__SENDER__NOT__FOUND,
senderId));
+ }
+
+ if
(!region.getAttributes().getGatewaySenderIds().contains(sender.getId())) {
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.ERROR,
+
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__REGION__NOT__USING_SENDER,
regionName,
+ senderId));
+ }
+
+ if (!sender.isRunning()) {
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.ERROR,
+
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__SENDER__NOT__RUNNING,
senderId));
+ }
+
+ if (!sender.isParallel() && !((InternalGatewaySender) sender).isPrimary())
{
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.OK,
+
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__SENDER__SERIAL__AND__NOT__PRIMARY,
+ senderId));
+ }
+
+ try {
+ return executeWanCopyRegionFunctionInNewThread(context, region,
regionName, sender, maxRate,
+ batchSize);
+ } catch (InterruptedException e) {
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.ERROR,
+ CliStrings.WAN_COPY_REGION__MSG__EXECUTION__CANCELED);
+ } catch (ExecutionException e) {
+ Writer buffer = new StringWriter();
+ PrintWriter pw = new PrintWriter(buffer);
+ e.printStackTrace(pw);
+ logger.error("Exception when running wan-copy region command: {}",
buffer.toString());
Review comment:
You can do away with the PrintWriter code and just use:
```
logger.error("Exception when running wan-copy region command", e);
```
That will include the full stack trace and message of the ExecutionException.
##########
File path:
geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunction.java
##########
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.functions;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.Declarable;
+import org.apache.geode.cache.EntryDestroyedException;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.AllConnectionsInUseException;
+import org.apache.geode.cache.client.NoAvailableServersException;
+import org.apache.geode.cache.client.ServerConnectivityException;
+import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import
org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException;
+import org.apache.geode.cache.client.internal.pooling.PooledConnection;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.DefaultEntryEventFactory;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EntrySnapshot;
+import org.apache.geode.internal.cache.EnumListenerEvent;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.NonTXEntry;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.BatchException70;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import org.apache.geode.internal.cache.wan.InternalGatewaySender;
+import org.apache.geode.internal.serialization.KnownVersion;
+import org.apache.geode.logging.internal.executors.LoggingExecutors;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.management.cli.CliFunction;
+import org.apache.geode.management.internal.functions.CliFunctionResult;
+import org.apache.geode.management.internal.i18n.CliStrings;
+
+/**
+ * Class for copying via WAN the contents of a region
+ * It must be executed in all members of the Geode cluster that host the region
+ * to be copied. (called with onServers() or withMembers() passing the list
+ * of all members hosting the region).
+ * It also offers the possibility to cancel an ongoing execution of this
function.
+ * The copying itself is executed in a new thread with a known name
+ * (parameterized with the regionName and senderId) in order to allow
+ * to cancel ongoing invocations by interrupting that thread.
+ *
+ * It accepts the following arguments in an array of objects
+ * 0: regionName (String)
+ * 1: senderId (String)
+ * 2: isCancel (Boolean): If true, it indicates that an ongoing execution of
this
+ * function for the given region and senderId must be stopped. Otherwise,
+ * it indicates that the region must be copied.
+ * 3: maxRate (Long) maximum copy rate in entries per second. In the case of
+ * parallel gateway senders, the maxRate is per server hosting the region.
+ * 4: batchSize (Integer): the size of the batches. Region entries are copied
in batches of the
+ * passed size. After each batch is sent, the function checks if the command
+ * must be canceled and also sleeps for some time if necessary to adjust the
+ * copy rate to the one passed as argument.
+ */
+public class WanCopyRegionFunction extends CliFunction<Object[]> implements
Declarable {
+ private static final Logger logger = LogService.getLogger();
+ private static final long serialVersionUID = 1L;
+
+ public static final String ID = WanCopyRegionFunction.class.getName();
+
+ private static final int MAX_BATCH_SEND_RETRIES = 1;
+
+ private Clock clock = Clock.systemDefaultZone();
+ private ThreadSleeper threadSleeper = new ThreadSleeper();
+
+ static class ThreadSleeper implements Serializable {
+ void millis(long millis) throws InterruptedException {
+ Thread.sleep(millis);
+ }
+ }
+
+ @VisibleForTesting
+ public void setClock(Clock clock) {
+ this.clock = clock;
+ }
+
+ @VisibleForTesting
+ public void setThreadSleeper(ThreadSleeper ts) {
+ this.threadSleeper = ts;
+ }
+
+ @Override
+ public String getId() {
+ return ID;
+ }
+
+ @Override
+ public boolean hasResult() {
+ return true;
+ }
+
+ @Override
+ public boolean isHA() {
+ return false;
+ }
+
+ @Override
+ public CliFunctionResult executeFunction(FunctionContext<Object[]> context) {
+ final Object[] args = context.getArguments();
+ if (args.length < 5) {
+ throw new IllegalStateException(
+ "Arguments length does not match required length.");
+ }
+ final String regionName = (String) args[0];
+ final String senderId = (String) args[1];
+ final boolean isCancel = (Boolean) args[2];
+ long maxRate = (Long) args[3];
+ int batchSize = (Integer) args[4];
+
+ final InternalCache cache = (InternalCache) context.getCache();
+
+ if (isCancel) {
+ return cancelWanCopyRegion(context, regionName, senderId);
+ }
+
+ final Region region = cache.getRegion(regionName);
+ if (region == null) {
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.ERROR,
+
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__REGION__NOT__FOUND,
regionName));
+ }
+
+ GatewaySender sender = cache.getGatewaySender(senderId);
+ if (sender == null) {
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.ERROR,
+
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__SENDER__NOT__FOUND,
senderId));
+ }
+
+ if
(!region.getAttributes().getGatewaySenderIds().contains(sender.getId())) {
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.ERROR,
+
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__REGION__NOT__USING_SENDER,
regionName,
+ senderId));
+ }
+
+ if (!sender.isRunning()) {
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.ERROR,
+
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__SENDER__NOT__RUNNING,
senderId));
+ }
+
+ if (!sender.isParallel() && !((InternalGatewaySender) sender).isPrimary())
{
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.OK,
+
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__SENDER__SERIAL__AND__NOT__PRIMARY,
+ senderId));
+ }
+
+ try {
+ return executeWanCopyRegionFunctionInNewThread(context, region,
regionName, sender, maxRate,
+ batchSize);
+ } catch (InterruptedException e) {
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.ERROR,
+ CliStrings.WAN_COPY_REGION__MSG__EXECUTION__CANCELED);
+ } catch (ExecutionException e) {
+ Writer buffer = new StringWriter();
+ PrintWriter pw = new PrintWriter(buffer);
+ e.printStackTrace(pw);
+ logger.error("Exception when running wan-copy region command: {}",
buffer.toString());
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.ERROR,
+
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__EXECUTION__FAILED,
e.getMessage()));
+ }
+ }
+
+ private CliFunctionResult executeWanCopyRegionFunctionInNewThread(
+ FunctionContext<Object[]> context,
+ Region region, String regionName, GatewaySender sender, long maxRate,
int batchSize)
+ throws InterruptedException, ExecutionException {
+ ExecutorService executor = LoggingExecutors
+ .newSingleThreadExecutor(getFunctionThreadName(regionName,
+ sender.getId()), true);
+ Callable<CliFunctionResult> callable =
+ new wanCopyRegionCallable(context, region, sender, maxRate, batchSize);
+ Future<CliFunctionResult> future = executor.submit(callable);
+ return future.get();
+ }
+
+ class wanCopyRegionCallable implements Callable<CliFunctionResult> {
+ private final FunctionContext<Object[]> context;
+ private final Region region;
+ private final GatewaySender sender;
+ private final long maxRate;
+ private final int batchSize;
+
+ public wanCopyRegionCallable(final FunctionContext<Object[]> context,
final Region region,
+ final GatewaySender sender, final long maxRate,
+ final int batchSize) {
+ this.context = context;
+ this.region = region;
+ this.sender = sender;
+ this.maxRate = maxRate;
+ this.batchSize = batchSize;
+ }
+
+ @Override
+ public CliFunctionResult call() throws Exception {
+ return wanCopyRegion(context, region, sender, maxRate, batchSize);
+ }
+ }
+
+ @VisibleForTesting
+ CliFunctionResult wanCopyRegion(FunctionContext<Object[]> context, Region
region,
+ GatewaySender sender, long maxRate, int batchSize) {
+ Connection connection = null;
+ PoolImpl senderPool = null;
+ int copiedEntries = 0;
+
+ try {
+ final long startTime = clock.millis();
+ int batchId = 0;
+ final InternalCache cache = (InternalCache) context.getCache();
+ GatewaySenderEventDispatcher dispatcher =
+ ((AbstractGatewaySender) sender).getEventProcessor().getDispatcher();
+ Iterator<?> entriesIter = getEntries(region, sender).iterator();
+ while (entriesIter.hasNext()) {
+ List<GatewayQueueEvent> batch =
+ createBatch((InternalRegion) region, sender, batchSize, cache,
entriesIter);
+ if (batch.size() == 0) {
+ continue;
+ }
+ if (senderPool == null) {
+ senderPool = ((AbstractGatewaySender) sender).getProxy();
+ if (senderPool == null) {
+ return new CliFunctionResult(context.getMemberName(),
+ CliFunctionResult.StatusState.ERROR,
+ CliStrings.WAN_COPY_REGION__MSG__NO__CONNECTION__POOL);
+ }
+ connection = senderPool.acquireConnection();
+ if (connection.getWanSiteVersion() <
KnownVersion.GEODE_1_15_0.ordinal()) {
+ return new CliFunctionResult(context.getMemberName(),
+ CliFunctionResult.StatusState.ERROR,
+
CliStrings.WAN_COPY_REGION__MSG__COMMAND__NOT__SUPPORTED__AT__REMOTE__SITE);
+ }
+ }
+ int retries = 0;
+ while (true) {
+ try {
+ dispatcher.sendBatch(batch, connection, senderPool, batchId++,
true);
+ copiedEntries += batch.size();
+ break;
+ } catch (BatchException70 e) {
+ return new CliFunctionResult(context.getMemberName(),
+ CliFunctionResult.StatusState.ERROR,
+ CliStrings.format(
+
CliStrings.WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED,
+ e.getMessage(), copiedEntries));
+ } catch (ConnectionDestroyedException | ServerConnectivityException
e) {
+ ((PooledConnection) connection).setShouldDestroy();
+ senderPool.returnConnection(connection);
+ connection = null;
+ if (retries++ >= MAX_BATCH_SEND_RETRIES) {
+ return new CliFunctionResult(context.getMemberName(),
+ CliFunctionResult.StatusState.ERROR,
+ CliStrings.format(
+
CliStrings.WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED,
+ "Connection error", copiedEntries));
+ }
+ logger.error("Exception {} in sendBatch. Retrying",
e.getClass().getName());
+ try {
+ connection = senderPool.acquireConnection();
+ } catch (NoAvailableServersException |
AllConnectionsInUseException e1) {
+ return new CliFunctionResult(context.getMemberName(),
+ CliFunctionResult.StatusState.ERROR,
+ CliStrings.format(
+ CliStrings.WAN_COPY_REGION__MSG__NO__CONNECTION,
+ copiedEntries));
+ }
+ }
+ }
+ try {
+ doPostSendBatchActions(startTime, copiedEntries, maxRate);
+ } catch (InterruptedException e) {
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.OK,
+ CliStrings.format(
+
CliStrings.WAN_COPY_REGION__MSG__CANCELED__AFTER__HAVING__COPIED,
+ copiedEntries));
+ }
+ }
+ } finally {
+ if (senderPool != null && connection != null) {
+ ((PooledConnection) connection).setShouldDestroy();
+ senderPool.returnConnection(connection);
+ }
+ }
+
+ if (region.isDestroyed()) {
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.ERROR,
+ CliStrings.format(
+ CliStrings.WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED,
+ "Region destroyed",
+ copiedEntries));
+ }
+
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.OK,
+ CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__COPIED__ENTRIES,
+ copiedEntries));
+ }
+
+ List<GatewayQueueEvent> createBatch(InternalRegion region, GatewaySender
sender,
+ int batchSize, InternalCache cache, Iterator<?> iter) {
+ int batchIndex = 0;
+ List<GatewayQueueEvent> batch = new ArrayList<>();
+ while (iter.hasNext() && batchIndex < batchSize) {
+ GatewayQueueEvent event =
+ createGatewaySenderEvent(cache, region, sender, (Region.Entry)
iter.next());
+ if (event != null) {
+ batch.add(event);
+ batchIndex++;
+ }
+ }
+ return batch;
+ }
+
+ Set<?> getEntries(Region region, GatewaySender sender) {
+ if (region instanceof PartitionedRegion && sender.isParallel()) {
+ return ((PartitionedRegion)
region).getDataStore().getAllLocalBucketRegions()
+ .stream()
+ .flatMap(br -> ((Set<?>)
br.entrySet()).stream()).collect(Collectors.toSet());
+ }
+ return region.entrySet();
+ }
+
+ @VisibleForTesting
+ GatewayQueueEvent createGatewaySenderEvent(InternalCache cache,
+ InternalRegion region, GatewaySender sender, Region.Entry entry) {
+ final EntryEventImpl event;
+ if (region instanceof PartitionedRegion) {
+ event = createEventForPartitionedRegion(sender, cache, region, entry);
+ } else {
+ event = createEventForReplicatedRegion(cache, region, entry);
+ }
+ if (event == null) {
+ return null;
+ }
+ try {
+ return new
GatewaySenderEventImpl(EnumListenerEvent.AFTER_UPDATE_WITH_GENERATE_CALLBACKS,
+ event, null, true);
+ } catch (IOException e) {
+ e.printStackTrace();
Review comment:
You should avoid using `e.printStackTrace();` and just use logger
instead.
##########
File path:
geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunction.java
##########
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.functions;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.Declarable;
+import org.apache.geode.cache.EntryDestroyedException;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.AllConnectionsInUseException;
+import org.apache.geode.cache.client.NoAvailableServersException;
+import org.apache.geode.cache.client.ServerConnectivityException;
+import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import
org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException;
+import org.apache.geode.cache.client.internal.pooling.PooledConnection;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.DefaultEntryEventFactory;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EntrySnapshot;
+import org.apache.geode.internal.cache.EnumListenerEvent;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.NonTXEntry;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.BatchException70;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import org.apache.geode.internal.cache.wan.InternalGatewaySender;
+import org.apache.geode.internal.serialization.KnownVersion;
+import org.apache.geode.logging.internal.executors.LoggingExecutors;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.management.cli.CliFunction;
+import org.apache.geode.management.internal.functions.CliFunctionResult;
+import org.apache.geode.management.internal.i18n.CliStrings;
+
+/**
+ * Class for copying via WAN the contents of a region
+ * It must be executed in all members of the Geode cluster that host the region
+ * to be copied. (called with onServers() or withMembers() passing the list
+ * of all members hosting the region).
+ * It also offers the possibility to cancel an ongoing execution of this
function.
+ * The copying itself is executed in a new thread with a known name
+ * (parameterized with the regionName and senderId) in order to allow
+ * to cancel ongoing invocations by interrupting that thread.
+ *
+ * It accepts the following arguments in an array of objects
+ * 0: regionName (String)
+ * 1: senderId (String)
+ * 2: isCancel (Boolean): If true, it indicates that an ongoing execution of
this
+ * function for the given region and senderId must be stopped. Otherwise,
+ * it indicates that the region must be copied.
+ * 3: maxRate (Long) maximum copy rate in entries per second. In the case of
+ * parallel gateway senders, the maxRate is per server hosting the region.
+ * 4: batchSize (Integer): the size of the batches. Region entries are copied
in batches of the
+ * passed size. After each batch is sent, the function checks if the command
+ * must be canceled and also sleeps for some time if necessary to adjust the
+ * copy rate to the one passed as argument.
+ */
+public class WanCopyRegionFunction extends CliFunction<Object[]> implements
Declarable {
+ private static final Logger logger = LogService.getLogger();
+ private static final long serialVersionUID = 1L;
+
+ public static final String ID = WanCopyRegionFunction.class.getName();
+
+ private static final int MAX_BATCH_SEND_RETRIES = 1;
+
+ private Clock clock = Clock.systemDefaultZone();
+ private ThreadSleeper threadSleeper = new ThreadSleeper();
+
+ static class ThreadSleeper implements Serializable {
+ void millis(long millis) throws InterruptedException {
+ Thread.sleep(millis);
+ }
+ }
+
+ @VisibleForTesting
+ public void setClock(Clock clock) {
+ this.clock = clock;
+ }
+
+ @VisibleForTesting
+ public void setThreadSleeper(ThreadSleeper ts) {
+ this.threadSleeper = ts;
+ }
+
+ @Override
+ public String getId() {
+ return ID;
+ }
+
+ @Override
+ public boolean hasResult() {
+ return true;
+ }
+
+ @Override
+ public boolean isHA() {
+ return false;
+ }
+
+ @Override
+ public CliFunctionResult executeFunction(FunctionContext<Object[]> context) {
+ final Object[] args = context.getArguments();
+ if (args.length < 5) {
+ throw new IllegalStateException(
+ "Arguments length does not match required length.");
+ }
+ final String regionName = (String) args[0];
+ final String senderId = (String) args[1];
+ final boolean isCancel = (Boolean) args[2];
+ long maxRate = (Long) args[3];
+ int batchSize = (Integer) args[4];
+
+ final InternalCache cache = (InternalCache) context.getCache();
+
+ if (isCancel) {
+ return cancelWanCopyRegion(context, regionName, senderId);
+ }
+
+ final Region region = cache.getRegion(regionName);
+ if (region == null) {
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.ERROR,
+
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__REGION__NOT__FOUND,
regionName));
+ }
+
+ GatewaySender sender = cache.getGatewaySender(senderId);
+ if (sender == null) {
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.ERROR,
+
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__SENDER__NOT__FOUND,
senderId));
+ }
+
+ if
(!region.getAttributes().getGatewaySenderIds().contains(sender.getId())) {
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.ERROR,
+
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__REGION__NOT__USING_SENDER,
regionName,
+ senderId));
+ }
+
+ if (!sender.isRunning()) {
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.ERROR,
+
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__SENDER__NOT__RUNNING,
senderId));
+ }
+
+ if (!sender.isParallel() && !((InternalGatewaySender) sender).isPrimary())
{
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.OK,
+
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__SENDER__SERIAL__AND__NOT__PRIMARY,
+ senderId));
+ }
+
+ try {
+ return executeWanCopyRegionFunctionInNewThread(context, region,
regionName, sender, maxRate,
+ batchSize);
+ } catch (InterruptedException e) {
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.ERROR,
+ CliStrings.WAN_COPY_REGION__MSG__EXECUTION__CANCELED);
+ } catch (ExecutionException e) {
+ Writer buffer = new StringWriter();
+ PrintWriter pw = new PrintWriter(buffer);
+ e.printStackTrace(pw);
+ logger.error("Exception when running wan-copy region command: {}",
buffer.toString());
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.ERROR,
+
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__EXECUTION__FAILED,
e.getMessage()));
+ }
+ }
+
+ private CliFunctionResult executeWanCopyRegionFunctionInNewThread(
+ FunctionContext<Object[]> context,
+ Region region, String regionName, GatewaySender sender, long maxRate,
int batchSize)
+ throws InterruptedException, ExecutionException {
+ ExecutorService executor = LoggingExecutors
+ .newSingleThreadExecutor(getFunctionThreadName(regionName,
+ sender.getId()), true);
+ Callable<CliFunctionResult> callable =
+ new wanCopyRegionCallable(context, region, sender, maxRate, batchSize);
+ Future<CliFunctionResult> future = executor.submit(callable);
+ return future.get();
+ }
+
+ class wanCopyRegionCallable implements Callable<CliFunctionResult> {
+ private final FunctionContext<Object[]> context;
+ private final Region region;
+ private final GatewaySender sender;
+ private final long maxRate;
+ private final int batchSize;
+
+ public wanCopyRegionCallable(final FunctionContext<Object[]> context,
final Region region,
+ final GatewaySender sender, final long maxRate,
+ final int batchSize) {
+ this.context = context;
+ this.region = region;
+ this.sender = sender;
+ this.maxRate = maxRate;
+ this.batchSize = batchSize;
+ }
+
+ @Override
+ public CliFunctionResult call() throws Exception {
+ return wanCopyRegion(context, region, sender, maxRate, batchSize);
+ }
+ }
+
+ @VisibleForTesting
+ CliFunctionResult wanCopyRegion(FunctionContext<Object[]> context, Region
region,
+ GatewaySender sender, long maxRate, int batchSize) {
+ Connection connection = null;
+ PoolImpl senderPool = null;
+ int copiedEntries = 0;
+
+ try {
+ final long startTime = clock.millis();
+ int batchId = 0;
+ final InternalCache cache = (InternalCache) context.getCache();
+ GatewaySenderEventDispatcher dispatcher =
+ ((AbstractGatewaySender) sender).getEventProcessor().getDispatcher();
+ Iterator<?> entriesIter = getEntries(region, sender).iterator();
+ while (entriesIter.hasNext()) {
+ List<GatewayQueueEvent> batch =
+ createBatch((InternalRegion) region, sender, batchSize, cache,
entriesIter);
+ if (batch.size() == 0) {
+ continue;
+ }
+ if (senderPool == null) {
+ senderPool = ((AbstractGatewaySender) sender).getProxy();
+ if (senderPool == null) {
+ return new CliFunctionResult(context.getMemberName(),
+ CliFunctionResult.StatusState.ERROR,
+ CliStrings.WAN_COPY_REGION__MSG__NO__CONNECTION__POOL);
+ }
+ connection = senderPool.acquireConnection();
+ if (connection.getWanSiteVersion() <
KnownVersion.GEODE_1_15_0.ordinal()) {
+ return new CliFunctionResult(context.getMemberName(),
+ CliFunctionResult.StatusState.ERROR,
+
CliStrings.WAN_COPY_REGION__MSG__COMMAND__NOT__SUPPORTED__AT__REMOTE__SITE);
+ }
+ }
+ int retries = 0;
+ while (true) {
+ try {
+ dispatcher.sendBatch(batch, connection, senderPool, batchId++,
true);
+ copiedEntries += batch.size();
+ break;
+ } catch (BatchException70 e) {
+ return new CliFunctionResult(context.getMemberName(),
+ CliFunctionResult.StatusState.ERROR,
+ CliStrings.format(
+
CliStrings.WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED,
+ e.getMessage(), copiedEntries));
+ } catch (ConnectionDestroyedException | ServerConnectivityException
e) {
+ ((PooledConnection) connection).setShouldDestroy();
+ senderPool.returnConnection(connection);
+ connection = null;
+ if (retries++ >= MAX_BATCH_SEND_RETRIES) {
+ return new CliFunctionResult(context.getMemberName(),
+ CliFunctionResult.StatusState.ERROR,
+ CliStrings.format(
+
CliStrings.WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED,
+ "Connection error", copiedEntries));
+ }
+ logger.error("Exception {} in sendBatch. Retrying",
e.getClass().getName());
+ try {
+ connection = senderPool.acquireConnection();
+ } catch (NoAvailableServersException |
AllConnectionsInUseException e1) {
+ return new CliFunctionResult(context.getMemberName(),
+ CliFunctionResult.StatusState.ERROR,
+ CliStrings.format(
+ CliStrings.WAN_COPY_REGION__MSG__NO__CONNECTION,
+ copiedEntries));
+ }
+ }
+ }
+ try {
+ doPostSendBatchActions(startTime, copiedEntries, maxRate);
+ } catch (InterruptedException e) {
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.OK,
+ CliStrings.format(
+
CliStrings.WAN_COPY_REGION__MSG__CANCELED__AFTER__HAVING__COPIED,
+ copiedEntries));
+ }
+ }
+ } finally {
+ if (senderPool != null && connection != null) {
+ ((PooledConnection) connection).setShouldDestroy();
+ senderPool.returnConnection(connection);
+ }
+ }
+
+ if (region.isDestroyed()) {
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.ERROR,
+ CliStrings.format(
+ CliStrings.WAN_COPY_REGION__MSG__ERROR__AFTER__HAVING__COPIED,
+ "Region destroyed",
+ copiedEntries));
+ }
+
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.OK,
+ CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__COPIED__ENTRIES,
+ copiedEntries));
+ }
+
+ List<GatewayQueueEvent> createBatch(InternalRegion region, GatewaySender
sender,
+ int batchSize, InternalCache cache, Iterator<?> iter) {
+ int batchIndex = 0;
+ List<GatewayQueueEvent> batch = new ArrayList<>();
+ while (iter.hasNext() && batchIndex < batchSize) {
+ GatewayQueueEvent event =
+ createGatewaySenderEvent(cache, region, sender, (Region.Entry)
iter.next());
+ if (event != null) {
+ batch.add(event);
+ batchIndex++;
+ }
+ }
+ return batch;
+ }
+
+ Set<?> getEntries(Region region, GatewaySender sender) {
+ if (region instanceof PartitionedRegion && sender.isParallel()) {
+ return ((PartitionedRegion)
region).getDataStore().getAllLocalBucketRegions()
+ .stream()
+ .flatMap(br -> ((Set<?>)
br.entrySet()).stream()).collect(Collectors.toSet());
+ }
+ return region.entrySet();
+ }
+
+ @VisibleForTesting
+ GatewayQueueEvent createGatewaySenderEvent(InternalCache cache,
+ InternalRegion region, GatewaySender sender, Region.Entry entry) {
+ final EntryEventImpl event;
+ if (region instanceof PartitionedRegion) {
+ event = createEventForPartitionedRegion(sender, cache, region, entry);
+ } else {
+ event = createEventForReplicatedRegion(cache, region, entry);
+ }
+ if (event == null) {
+ return null;
+ }
+ try {
+ return new
GatewaySenderEventImpl(EnumListenerEvent.AFTER_UPDATE_WITH_GENERATE_CALLBACKS,
+ event, null, true);
+ } catch (IOException e) {
+ e.printStackTrace();
+ return null;
+ }
+ }
+
+ final CliFunctionResult cancelWanCopyRegion(FunctionContext<Object[]>
context,
+ String regionName, String senderId) {
+ boolean found = false;
+ String threadBaseName = getFunctionThreadName(regionName, senderId);
+ for (Thread t : Thread.getAllStackTraces().keySet()) {
+ if (t.getName().startsWith(threadBaseName)) {
+ found = true;
+ t.interrupt();
+ }
+ }
+ if (found) {
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.OK,
+ CliStrings.WAN_COPY_REGION__MSG__EXECUTION__CANCELED);
+ }
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.ERROR,
+
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__NO__RUNNING__COMMAND,
+ regionName, senderId));
+ }
+
+ public static String getFunctionThreadName(String regionName, String
senderId) {
+ return "wanCopyRegionFunctionThread_" + regionName + "_" + senderId;
+ }
+
+ /**
+ * It runs the actions to be done after a batch has been
+ * sent: throw an interrupted exception if the operation was canceled and
+ * adjust the rate of copying by sleeping if necessary.
+ *
+ * @param startTime time at which the entries started to be copied
+ * @param copiedEntries number of entries copied so far
+ * @param maxRate maximum copying rate
+ */
+ void doPostSendBatchActions(long startTime, int copiedEntries, long maxRate)
+ throws InterruptedException {
+ if (Thread.currentThread().isInterrupted()) {
+ throw new InterruptedException();
+ }
+ long sleepMs = getTimeToSleep(startTime, copiedEntries, maxRate);
+ if (sleepMs > 0) {
+ logger.info("{}: Sleeping for {} ms to accommodate to requested maxRate",
+ this.getClass().getName(), sleepMs);
Review comment:
`getClass().getName()` provides the fully qualified name (FQN) which is
full.package.ClassName. `getClass().getSimpleName()` gives you just ClassName.
Just making sure you know about that in case you didn't want the package.
##########
File path:
geode-gfsh/src/integrationTest/java/org/apache/geode/management/internal/cli/GfshParserAutoCompletionIntegrationTest.java
##########
@@ -410,7 +410,7 @@ public void testObtainHintForData() {
String hintArgument = "data";
String hintsProvided =
gfshParserRule.getCommandManager().obtainHint(hintArgument);
String[] hintsProvidedArray = hintsProvided.split(lineSeparator());
- assertThat(hintsProvidedArray.length).isEqualTo(17);
+ assertThat(hintsProvidedArray.length).isEqualTo(18);
Review comment:
Better syntax is:
```
assertThat(hintsProvidedArray).hasSize(18);
```
This way if it fails, the contents of `hintsProvidedArray` will be fully
printed instead of just the integer of its length.
##########
File path:
geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/WanCopyRegionCommand.java
##########
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.cli.commands;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.springframework.shell.core.annotation.CliCommand;
+import org.springframework.shell.core.annotation.CliOption;
+
+import org.apache.geode.cache.execute.FunctionInvocationTargetException;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.management.cli.CliMetaData;
+import org.apache.geode.management.cli.ConverterHint;
+import org.apache.geode.management.cli.GfshCommand;
+import
org.apache.geode.management.internal.cli.functions.WanCopyRegionFunction;
+import org.apache.geode.management.internal.cli.result.model.ResultModel;
+import org.apache.geode.management.internal.functions.CliFunctionResult;
+import org.apache.geode.management.internal.i18n.CliStrings;
+import org.apache.geode.security.ResourcePermission.Operation;
+import org.apache.geode.security.ResourcePermission.Resource;
+
+public class WanCopyRegionCommand extends GfshCommand {
+ private final WanCopyRegionFunction wanCopyRegionFunction = new
WanCopyRegionFunction();
+
+ @CliCommand(value = CliStrings.WAN_COPY_REGION, help =
CliStrings.WAN_COPY_REGION__HELP)
+ @CliMetaData(relatedTopic = {CliStrings.TOPIC_GEODE_DATA,
CliStrings.TOPIC_GEODE_REGION})
+ public ResultModel wanCopyRegion(
+ @CliOption(key = CliStrings.WAN_COPY_REGION__REGION, mandatory = true,
+ optionContext = ConverterHint.REGION_PATH,
+ help = CliStrings.WAN_COPY_REGION__REGION__HELP) String regionName,
+ @CliOption(key = CliStrings.WAN_COPY_REGION__SENDERID,
+ optionContext = ConverterHint.GATEWAY_SENDER_ID,
+ help = CliStrings.WAN_COPY_REGION__SENDERID__HELP) String senderId,
+ @CliOption(key = CliStrings.WAN_COPY_REGION__MAXRATE,
+ unspecifiedDefaultValue = "0",
+ help = CliStrings.WAN_COPY_REGION__MAXRATE__HELP) long maxRate,
+ @CliOption(key = CliStrings.WAN_COPY_REGION__BATCHSIZE,
+ unspecifiedDefaultValue = "1000",
+ help = CliStrings.WAN_COPY_REGION__BATCHSIZE__HELP) int batchSize,
+ @CliOption(key = CliStrings.WAN_COPY_REGION__CANCEL,
+ unspecifiedDefaultValue = "false",
+ specifiedDefaultValue = "true",
+ help = CliStrings.WAN_COPY_REGION__CANCEL__HELP) boolean isCancel) {
+
+ authorize(Resource.DATA, Operation.WRITE, regionName);
+ final Object[] args = {regionName, senderId, isCancel, maxRate, batchSize};
+ ResultCollector<?, ?> rc =
+ executeFunction(wanCopyRegionFunction, args, getAllNormalMembers());
Review comment:
Please try to avoid using abbreviations and acronyms. Just name vars
like this `resultCollector` instead of `rc`.
##########
File path:
geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunction.java
##########
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.functions;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.Declarable;
+import org.apache.geode.cache.EntryDestroyedException;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.AllConnectionsInUseException;
+import org.apache.geode.cache.client.NoAvailableServersException;
+import org.apache.geode.cache.client.ServerConnectivityException;
+import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import
org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException;
+import org.apache.geode.cache.client.internal.pooling.PooledConnection;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.DefaultEntryEventFactory;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EntrySnapshot;
+import org.apache.geode.internal.cache.EnumListenerEvent;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.NonTXEntry;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.BatchException70;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import org.apache.geode.internal.cache.wan.InternalGatewaySender;
+import org.apache.geode.internal.serialization.KnownVersion;
+import org.apache.geode.logging.internal.executors.LoggingExecutors;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.management.cli.CliFunction;
+import org.apache.geode.management.internal.functions.CliFunctionResult;
+import org.apache.geode.management.internal.i18n.CliStrings;
+
+/**
+ * Class for copying via WAN the contents of a region
+ * It must be executed in all members of the Geode cluster that host the region
+ * to be copied. (called with onServers() or withMembers() passing the list
+ * of all members hosting the region).
+ * It also offers the possibility to cancel an ongoing execution of this
function.
+ * The copying itself is executed in a new thread with a known name
+ * (parameterized with the regionName and senderId) in order to allow
+ * to cancel ongoing invocations by interrupting that thread.
+ *
+ * It accepts the following arguments in an array of objects
+ * 0: regionName (String)
+ * 1: senderId (String)
+ * 2: isCancel (Boolean): If true, it indicates that an ongoing execution of
this
+ * function for the given region and senderId must be stopped. Otherwise,
+ * it indicates that the region must be copied.
+ * 3: maxRate (Long) maximum copy rate in entries per second. In the case of
+ * parallel gateway senders, the maxRate is per server hosting the region.
+ * 4: batchSize (Integer): the size of the batches. Region entries are copied
in batches of the
+ * passed size. After each batch is sent, the function checks if the command
+ * must be canceled and also sleeps for some time if necessary to adjust the
+ * copy rate to the one passed as argument.
+ */
+public class WanCopyRegionFunction extends CliFunction<Object[]> implements
Declarable {
+ private static final Logger logger = LogService.getLogger();
+ private static final long serialVersionUID = 1L;
+
+ public static final String ID = WanCopyRegionFunction.class.getName();
+
+ private static final int MAX_BATCH_SEND_RETRIES = 1;
+
+ private Clock clock = Clock.systemDefaultZone();
+ private ThreadSleeper threadSleeper = new ThreadSleeper();
+
+ static class ThreadSleeper implements Serializable {
+ void millis(long millis) throws InterruptedException {
+ Thread.sleep(millis);
+ }
+ }
+
+ @VisibleForTesting
+ public void setClock(Clock clock) {
+ this.clock = clock;
+ }
+
+ @VisibleForTesting
+ public void setThreadSleeper(ThreadSleeper ts) {
+ this.threadSleeper = ts;
+ }
+
+ @Override
+ public String getId() {
+ return ID;
+ }
+
+ @Override
+ public boolean hasResult() {
+ return true;
+ }
+
+ @Override
+ public boolean isHA() {
+ return false;
+ }
+
+ @Override
+ public CliFunctionResult executeFunction(FunctionContext<Object[]> context) {
+ final Object[] args = context.getArguments();
+ if (args.length < 5) {
+ throw new IllegalStateException(
+ "Arguments length does not match required length.");
+ }
+ final String regionName = (String) args[0];
+ final String senderId = (String) args[1];
+ final boolean isCancel = (Boolean) args[2];
+ long maxRate = (Long) args[3];
+ int batchSize = (Integer) args[4];
+
+ final InternalCache cache = (InternalCache) context.getCache();
+
+ if (isCancel) {
+ return cancelWanCopyRegion(context, regionName, senderId);
+ }
+
+ final Region region = cache.getRegion(regionName);
+ if (region == null) {
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.ERROR,
+
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__REGION__NOT__FOUND,
regionName));
+ }
+
+ GatewaySender sender = cache.getGatewaySender(senderId);
+ if (sender == null) {
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.ERROR,
+
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__SENDER__NOT__FOUND,
senderId));
+ }
+
+ if
(!region.getAttributes().getGatewaySenderIds().contains(sender.getId())) {
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.ERROR,
+
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__REGION__NOT__USING_SENDER,
regionName,
+ senderId));
+ }
+
+ if (!sender.isRunning()) {
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.ERROR,
+
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__SENDER__NOT__RUNNING,
senderId));
+ }
+
+ if (!sender.isParallel() && !((InternalGatewaySender) sender).isPrimary())
{
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.OK,
+
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__SENDER__SERIAL__AND__NOT__PRIMARY,
+ senderId));
+ }
+
+ try {
+ return executeWanCopyRegionFunctionInNewThread(context, region,
regionName, sender, maxRate,
+ batchSize);
+ } catch (InterruptedException e) {
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.ERROR,
+ CliStrings.WAN_COPY_REGION__MSG__EXECUTION__CANCELED);
+ } catch (ExecutionException e) {
+ Writer buffer = new StringWriter();
+ PrintWriter pw = new PrintWriter(buffer);
+ e.printStackTrace(pw);
+ logger.error("Exception when running wan-copy region command: {}",
buffer.toString());
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.ERROR,
+
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__EXECUTION__FAILED,
e.getMessage()));
+ }
+ }
+
+ private CliFunctionResult executeWanCopyRegionFunctionInNewThread(
+ FunctionContext<Object[]> context,
+ Region region, String regionName, GatewaySender sender, long maxRate,
int batchSize)
+ throws InterruptedException, ExecutionException {
+ ExecutorService executor = LoggingExecutors
+ .newSingleThreadExecutor(getFunctionThreadName(regionName,
+ sender.getId()), true);
+ Callable<CliFunctionResult> callable =
+ new wanCopyRegionCallable(context, region, sender, maxRate, batchSize);
+ Future<CliFunctionResult> future = executor.submit(callable);
+ return future.get();
+ }
+
+ class wanCopyRegionCallable implements Callable<CliFunctionResult> {
+ private final FunctionContext<Object[]> context;
+ private final Region region;
+ private final GatewaySender sender;
+ private final long maxRate;
+ private final int batchSize;
+
+ public wanCopyRegionCallable(final FunctionContext<Object[]> context,
final Region region,
+ final GatewaySender sender, final long maxRate,
+ final int batchSize) {
+ this.context = context;
+ this.region = region;
+ this.sender = sender;
+ this.maxRate = maxRate;
+ this.batchSize = batchSize;
+ }
+
+ @Override
+ public CliFunctionResult call() throws Exception {
+ return wanCopyRegion(context, region, sender, maxRate, batchSize);
+ }
+ }
+
+ @VisibleForTesting
+ CliFunctionResult wanCopyRegion(FunctionContext<Object[]> context, Region
region,
Review comment:
Can you please extract some smaller methods from this? There's actually
a maximum byte length for the size of methods in order for the Jit to perform
inlining and other optimizations. Basically, if you carve it up into smaller
methods, then Jit can do inlining and it will actually perform better.
One way to do this is to extract blocks as methods to avoid having nested
try-blocks and while-blocks. So if you see a nested while-block, pull that out
as its own method. You can lean on IntelliJ refactoring to make it easy.
##########
File path:
geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunction.java
##########
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.functions;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.Declarable;
+import org.apache.geode.cache.EntryDestroyedException;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.AllConnectionsInUseException;
+import org.apache.geode.cache.client.NoAvailableServersException;
+import org.apache.geode.cache.client.ServerConnectivityException;
+import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import
org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException;
+import org.apache.geode.cache.client.internal.pooling.PooledConnection;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.DefaultEntryEventFactory;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EntrySnapshot;
+import org.apache.geode.internal.cache.EnumListenerEvent;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.NonTXEntry;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.BatchException70;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import org.apache.geode.internal.cache.wan.InternalGatewaySender;
+import org.apache.geode.internal.serialization.KnownVersion;
+import org.apache.geode.logging.internal.executors.LoggingExecutors;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.management.cli.CliFunction;
+import org.apache.geode.management.internal.functions.CliFunctionResult;
+import org.apache.geode.management.internal.i18n.CliStrings;
+
+/**
+ * Class for copying via WAN the contents of a region
+ * It must be executed in all members of the Geode cluster that host the region
+ * to be copied. (called with onServers() or withMembers() passing the list
+ * of all members hosting the region).
+ * It also offers the possibility to cancel an ongoing execution of this
function.
+ * The copying itself is executed in a new thread with a known name
+ * (parameterized with the regionName and senderId) in order to allow
+ * to cancel ongoing invocations by interrupting that thread.
+ *
+ * It accepts the following arguments in an array of objects
+ * 0: regionName (String)
+ * 1: senderId (String)
+ * 2: isCancel (Boolean): If true, it indicates that an ongoing execution of
this
+ * function for the given region and senderId must be stopped. Otherwise,
+ * it indicates that the region must be copied.
+ * 3: maxRate (Long) maximum copy rate in entries per second. In the case of
+ * parallel gateway senders, the maxRate is per server hosting the region.
+ * 4: batchSize (Integer): the size of the batches. Region entries are copied
in batches of the
+ * passed size. After each batch is sent, the function checks if the command
+ * must be canceled and also sleeps for some time if necessary to adjust the
+ * copy rate to the one passed as argument.
+ */
+public class WanCopyRegionFunction extends CliFunction<Object[]> implements
Declarable {
+ private static final Logger logger = LogService.getLogger();
+ private static final long serialVersionUID = 1L;
+
+ public static final String ID = WanCopyRegionFunction.class.getName();
+
+ private static final int MAX_BATCH_SEND_RETRIES = 1;
+
+ private Clock clock = Clock.systemDefaultZone();
+ private ThreadSleeper threadSleeper = new ThreadSleeper();
+
+ static class ThreadSleeper implements Serializable {
+ void millis(long millis) throws InterruptedException {
+ Thread.sleep(millis);
+ }
+ }
+
+ @VisibleForTesting
+ public void setClock(Clock clock) {
+ this.clock = clock;
+ }
+
+ @VisibleForTesting
+ public void setThreadSleeper(ThreadSleeper ts) {
+ this.threadSleeper = ts;
+ }
+
+ @Override
+ public String getId() {
+ return ID;
+ }
+
+ @Override
+ public boolean hasResult() {
+ return true;
+ }
+
+ @Override
+ public boolean isHA() {
+ return false;
+ }
+
+ @Override
+ public CliFunctionResult executeFunction(FunctionContext<Object[]> context) {
+ final Object[] args = context.getArguments();
+ if (args.length < 5) {
+ throw new IllegalStateException(
+ "Arguments length does not match required length.");
+ }
+ final String regionName = (String) args[0];
+ final String senderId = (String) args[1];
+ final boolean isCancel = (Boolean) args[2];
+ long maxRate = (Long) args[3];
+ int batchSize = (Integer) args[4];
+
+ final InternalCache cache = (InternalCache) context.getCache();
+
+ if (isCancel) {
+ return cancelWanCopyRegion(context, regionName, senderId);
+ }
+
+ final Region region = cache.getRegion(regionName);
+ if (region == null) {
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.ERROR,
+
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__REGION__NOT__FOUND,
regionName));
+ }
+
+ GatewaySender sender = cache.getGatewaySender(senderId);
+ if (sender == null) {
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.ERROR,
+
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__SENDER__NOT__FOUND,
senderId));
+ }
+
+ if
(!region.getAttributes().getGatewaySenderIds().contains(sender.getId())) {
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.ERROR,
+
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__REGION__NOT__USING_SENDER,
regionName,
+ senderId));
+ }
+
+ if (!sender.isRunning()) {
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.ERROR,
+
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__SENDER__NOT__RUNNING,
senderId));
+ }
+
+ if (!sender.isParallel() && !((InternalGatewaySender) sender).isPrimary())
{
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.OK,
+
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__SENDER__SERIAL__AND__NOT__PRIMARY,
+ senderId));
+ }
+
+ try {
+ return executeWanCopyRegionFunctionInNewThread(context, region,
regionName, sender, maxRate,
+ batchSize);
+ } catch (InterruptedException e) {
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.ERROR,
+ CliStrings.WAN_COPY_REGION__MSG__EXECUTION__CANCELED);
+ } catch (ExecutionException e) {
+ Writer buffer = new StringWriter();
+ PrintWriter pw = new PrintWriter(buffer);
+ e.printStackTrace(pw);
+ logger.error("Exception when running wan-copy region command: {}",
buffer.toString());
+ return new CliFunctionResult(context.getMemberName(),
CliFunctionResult.StatusState.ERROR,
+
CliStrings.format(CliStrings.WAN_COPY_REGION__MSG__EXECUTION__FAILED,
e.getMessage()));
+ }
+ }
+
+ private CliFunctionResult executeWanCopyRegionFunctionInNewThread(
+ FunctionContext<Object[]> context,
+ Region region, String regionName, GatewaySender sender, long maxRate,
int batchSize)
+ throws InterruptedException, ExecutionException {
+ ExecutorService executor = LoggingExecutors
+ .newSingleThreadExecutor(getFunctionThreadName(regionName,
+ sender.getId()), true);
Review comment:
Is this Executor providing the same thread that you later look up by
name with the following?
```
for (Thread t : Thread.getAllStackTraces().keySet()) {
if (t.getName().startsWith(threadBaseName)) {
found = true;
t.interrupt();
```
This is a really non-standard, unusual way to do any threading in Java. You
should try to get it as close to standard use of threading as possible
Can you make `ExecutorService executor` a final field in this Function class
and then just later invoke `executor.shutdownNow()`? `shutdownNow()` will
interrupt any thread.
If you need ability to replace the ExecutorService at runtime in a
thread-safe way then make use AtomicReference:
```
private final AtomicReference<ExecutorService> executor;
```
Then you can always do:
```
executor.set(LoggingExecutors.newSingleThreadExecutor(getFunctionThreadName(regionName,
sender.getId()), true));
```
PS: "Java Concurrency In Practice" by Brian Goetz is the gold standard for
Java threading
##########
File path:
geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/functions/WanCopyRegionFunction.java
##########
@@ -0,0 +1,480 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express
+ * or implied. See the License for the specific language governing permissions
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.cli.functions;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.Serializable;
+import java.io.StringWriter;
+import java.io.Writer;
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.annotations.VisibleForTesting;
+import org.apache.geode.cache.Declarable;
+import org.apache.geode.cache.EntryDestroyedException;
+import org.apache.geode.cache.Operation;
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.client.AllConnectionsInUseException;
+import org.apache.geode.cache.client.NoAvailableServersException;
+import org.apache.geode.cache.client.ServerConnectivityException;
+import org.apache.geode.cache.client.internal.Connection;
+import org.apache.geode.cache.client.internal.PoolImpl;
+import
org.apache.geode.cache.client.internal.pooling.ConnectionDestroyedException;
+import org.apache.geode.cache.client.internal.pooling.PooledConnection;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.wan.GatewayQueueEvent;
+import org.apache.geode.cache.wan.GatewaySender;
+import org.apache.geode.internal.cache.BucketRegion;
+import org.apache.geode.internal.cache.DefaultEntryEventFactory;
+import org.apache.geode.internal.cache.EntryEventImpl;
+import org.apache.geode.internal.cache.EntrySnapshot;
+import org.apache.geode.internal.cache.EnumListenerEvent;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalRegion;
+import org.apache.geode.internal.cache.NonTXEntry;
+import org.apache.geode.internal.cache.PartitionedRegion;
+import org.apache.geode.internal.cache.wan.AbstractGatewaySender;
+import org.apache.geode.internal.cache.wan.BatchException70;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventDispatcher;
+import org.apache.geode.internal.cache.wan.GatewaySenderEventImpl;
+import org.apache.geode.internal.cache.wan.InternalGatewaySender;
+import org.apache.geode.internal.serialization.KnownVersion;
+import org.apache.geode.logging.internal.executors.LoggingExecutors;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import org.apache.geode.management.cli.CliFunction;
+import org.apache.geode.management.internal.functions.CliFunctionResult;
+import org.apache.geode.management.internal.i18n.CliStrings;
+
+/**
+ * Class for copying via WAN the contents of a region
+ * It must be executed in all members of the Geode cluster that host the region
+ * to be copied. (called with onServers() or withMembers() passing the list
+ * of all members hosting the region).
+ * It also offers the possibility to cancel an ongoing execution of this
function.
+ * The copying itself is executed in a new thread with a known name
+ * (parameterized with the regionName and senderId) in order to allow
+ * to cancel ongoing invocations by interrupting that thread.
+ *
+ * It accepts the following arguments in an array of objects
+ * 0: regionName (String)
+ * 1: senderId (String)
+ * 2: isCancel (Boolean): If true, it indicates that an ongoing execution of
this
+ * function for the given region and senderId must be stopped. Otherwise,
+ * it indicates that the region must be copied.
+ * 3: maxRate (Long) maximum copy rate in entries per second. In the case of
+ * parallel gateway senders, the maxRate is per server hosting the region.
+ * 4: batchSize (Integer): the size of the batches. Region entries are copied
in batches of the
+ * passed size. After each batch is sent, the function checks if the command
+ * must be canceled and also sleeps for some time if necessary to adjust the
+ * copy rate to the one passed as argument.
+ */
+public class WanCopyRegionFunction extends CliFunction<Object[]> implements
Declarable {
+ private static final Logger logger = LogService.getLogger();
+ private static final long serialVersionUID = 1L;
+
+ public static final String ID = WanCopyRegionFunction.class.getName();
+
+ private static final int MAX_BATCH_SEND_RETRIES = 1;
+
+ private Clock clock = Clock.systemDefaultZone();
+ private ThreadSleeper threadSleeper = new ThreadSleeper();
+
+ static class ThreadSleeper implements Serializable {
+ void millis(long millis) throws InterruptedException {
+ Thread.sleep(millis);
+ }
+ }
+
+ @VisibleForTesting
+ public void setClock(Clock clock) {
+ this.clock = clock;
+ }
+
+ @VisibleForTesting
+ public void setThreadSleeper(ThreadSleeper ts) {
+ this.threadSleeper = ts;
+ }
+
Review comment:
It would be much better if you make these two fields `final` and delete
these setters. Then create two constructors... one that's public with no-args
for Geode to use and one that's only for testing that accepts args for both of
these fields. Basically you want to always try to make fields `final` to avoid
concurrency bugs and to improve performance. Here's an example along with
proper constructor chaining:
```
private final Clock clock;
private final ThreadSleeper;
public WanCopyRegionFunction() {
this(Clock.systemDefaultZone(), new ThreadSleeper());
}
@VisibleForTesting
WanCopyRegionFunction(Clock clock, ThreadSleeper threadSleeper) {
this.clock = clock;
this.threadSleeper = threadSleeper;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]