kirklund commented on a change in pull request #5249:
URL: https://github.com/apache/geode/pull/5249#discussion_r441173269



##########
File path: 
geode-core/src/main/java/org/apache/geode/management/internal/operation/RebalanceOperationPerformer.java
##########
@@ -189,16 +189,16 @@ public static DistributedMember 
getAssociatedMembers(String region, final Intern
 
     String[] membersName = bean.getMembers();
     Set<DistributedMember> dsMembers = ManagementUtils.getAllMembers(cache);
-    Iterator it = dsMembers.iterator();
+    Iterator<DistributedMember> it = dsMembers.iterator();
 
     boolean matchFound = false;
 
     if (membersName.length > 1) {
       while (it.hasNext() && !matchFound) {
-        DistributedMember dsmember = (DistributedMember) it.next();
+        DistributedMember DSMember = it.next();

Review comment:
       Variable names should start with lowercase: `dsMember`

##########
File path: 
geode-core/src/test/java/org/apache/geode/management/internal/operation/RestoreRedundancyPerformerTest.java
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.geode.management.internal.operation;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalCacheForClientAccess;
+import org.apache.geode.internal.serialization.Version;
+import org.apache.geode.management.DistributedRegionMXBean;
+import org.apache.geode.management.DistributedSystemMXBean;
+import org.apache.geode.management.internal.BaseManagementService;
+import 
org.apache.geode.management.internal.functions.RestoreRedundancyFunction;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+import org.apache.geode.management.runtime.RegionRedundancyStatus;
+import org.apache.geode.management.runtime.RestoreRedundancyResults;
+
+public class RestoreRedundancyPerformerTest {
+
+  public static final String DS_MEMBER_NAME_SERVER1 = "server1";
+  public static final String DS_MEMBER_NAME_SERVER2 = "server2";
+
+  public static final String REGION_1 = "region1";
+  public static final String BOGUS_PASS_MESSAGE = "Bogus pass message";
+  private InternalDistributedMember server1;
+  private InternalDistributedMember server2;
+  private InternalCacheForClientAccess internalCacheForClientAccess;
+  private RestoreRedundancyPerformer restoreRedundancyPerformer;
+
+  @Before
+  public void setup() {
+    BaseManagementService baseManagementService = 
mock(BaseManagementService.class);
+    DistributedSystemMXBean distributedSystemMXBean = 
mock(DistributedSystemMXBean.class);
+    DistributedRegionMXBean distributedRegionMXBean = 
mock(DistributedRegionMXBean.class);
+    server1 = mock(InternalDistributedMember.class);
+    server2 = mock(InternalDistributedMember.class);
+    internalCacheForClientAccess = mock(InternalCacheForClientAccess.class);
+    InternalDistributedSystem internalDistributedSystem = 
mock(InternalDistributedSystem.class);
+    DistributionManager distributionManager = mock(DistributionManager.class);
+    
when(baseManagementService.getDistributedSystemMXBean()).thenReturn(distributedSystemMXBean);
+    when(baseManagementService.getDistributedRegionMXBean(Mockito.anyString()))
+        .thenReturn(distributedRegionMXBean);
+    
when(distributedRegionMXBean.getRegionType()).thenReturn(String.valueOf(DataPolicy.PARTITION));
+    when(distributedRegionMXBean.getMembers())
+        .thenReturn(new String[] {DS_MEMBER_NAME_SERVER1, 
DS_MEMBER_NAME_SERVER2});
+    when(server1.getName()).thenReturn(DS_MEMBER_NAME_SERVER1);
+    when(server2.getName()).thenReturn(DS_MEMBER_NAME_SERVER2);
+    when(distributedSystemMXBean.listRegions()).thenReturn(new String[] 
{REGION_1});
+    when(internalDistributedSystem.getDistributionManager())
+        .thenReturn(distributionManager);
+    Set<InternalDistributedMember> dsMembers = new HashSet<>();
+    dsMembers.add(server1);
+    dsMembers.add(server2);
+    
when(distributionManager.getDistributionManagerIds()).thenReturn(dsMembers);
+    BaseManagementService.setManagementService(internalCacheForClientAccess, 
baseManagementService);
+
+    when(((InternalCache) 
internalCacheForClientAccess).getCacheForProcessingClientRequests())
+        .thenReturn(internalCacheForClientAccess);
+    when(internalCacheForClientAccess.getInternalDistributedSystem())
+        .thenReturn(internalDistributedSystem);
+
+    when(server1.getVersionObject())
+        .thenReturn(RestoreRedundancyPerformer.ADDED_VERSION);
+    when(server2.getVersionObject())
+        .thenReturn(RestoreRedundancyPerformer.ADDED_VERSION);
+
+    restoreRedundancyPerformer = new RestoreRedundancyPerformer();
+  }
+
+  @Test
+  public void executePerformWithIncludeRegionsSuccess() {
+    // Setup a request to restore redundancy for region 1
+    RestoreRedundancyRequest restoreRedundancyRequest = new 
RestoreRedundancyRequest();
+    restoreRedundancyRequest.setReassignPrimaries(true);
+    
restoreRedundancyRequest.setIncludeRegions(Collections.singletonList(REGION_1));
+    restoreRedundancyRequest.setExcludeRegions(new ArrayList<>());
+
+
+    // Setup a successful response from executeFunctionAndGetFunctionResult
+    RestoreRedundancyResultsImpl restoreRedundancyResultsImpl = new 
RestoreRedundancyResultsImpl();
+    restoreRedundancyResultsImpl.setStatusMessage(BOGUS_PASS_MESSAGE);
+    restoreRedundancyResultsImpl.setSuccess(true);
+
+    Map<String, RegionRedundancyStatus> satisfied =
+        restoreRedundancyResultsImpl.getSatisfiedRedundancyRegionResults();
+
+    // Create and add the RegionRedundancyStatus to the response
+    RegionRedundancyStatusImpl regionRedundancyStatusImpl = new 
RegionRedundancyStatusImpl(1, 1,
+        REGION_1, RegionRedundancyStatus.RedundancyStatus.SATISFIED);
+
+    satisfied.put(REGION_1, regionRedundancyStatusImpl);
+
+    // intercept the executeFunctionAndGetFunctionResult method call on the 
performer
+    RestoreRedundancyPerformer spyRedundancyPerformer = 
Mockito.spy(restoreRedundancyPerformer);
+    Mockito.doReturn(restoreRedundancyResultsImpl).when(spyRedundancyPerformer)
+        
.executeFunctionAndGetFunctionResult(Mockito.any(RestoreRedundancyFunction.class),
+            Mockito.any(Object.class),
+            Mockito.any(
+                DistributedMember.class));
+
+    // invoke perform
+    RestoreRedundancyResults restoreRedundancyResult = spyRedundancyPerformer
+        .perform(internalCacheForClientAccess, restoreRedundancyRequest, 
false);
+
+    assertThat(restoreRedundancyResult.getSuccess()).isTrue();
+  }
+
+  @Test
+  public void executePerformWithNoIncludeRegionsSuccess() {
+    // Setup a request to restore redundancy for region 1
+    RestoreRedundancyRequest restoreRedundancyRequest = new 
RestoreRedundancyRequest();
+    restoreRedundancyRequest.setReassignPrimaries(true);
+
+
+    // Setup a successful response from executeFunctionAndGetFunctionResult
+    RestoreRedundancyResultsImpl restoreRedundancyResultsImpl = new 
RestoreRedundancyResultsImpl();
+    restoreRedundancyResultsImpl.setStatusMessage(BOGUS_PASS_MESSAGE);
+    restoreRedundancyResultsImpl.setSuccess(true);
+
+    Map<String, RegionRedundancyStatus> satisfied =
+        restoreRedundancyResultsImpl.getSatisfiedRedundancyRegionResults();
+
+    // Create and add the RegionRedundancyStatus to the response
+    RegionRedundancyStatusImpl regionRedundancyStatusImpl = new 
RegionRedundancyStatusImpl(1, 1,
+        REGION_1, RegionRedundancyStatus.RedundancyStatus.SATISFIED);
+
+    satisfied.put(REGION_1, regionRedundancyStatusImpl);
+
+    // intercept the executeFunctionAndGetFunctionResult method call on the 
performer
+    RestoreRedundancyPerformer spyRedundancyPerformer = 
Mockito.spy(restoreRedundancyPerformer);
+    Mockito.doReturn(restoreRedundancyResultsImpl).when(spyRedundancyPerformer)
+        
.executeFunctionAndGetFunctionResult(Mockito.any(RestoreRedundancyFunction.class),
+            Mockito.any(Object.class),

Review comment:
       Using import static for all of these Mockito methods should improve 
readability a little.

##########
File path: 
geode-core/src/test/java/org/apache/geode/internal/cache/control/RestoreRedundancyResultsImplTest.java
##########
@@ -68,30 +68,36 @@ public void setUp() {
     
when(zeroRedundancyRegionResult.getRegionName()).thenReturn(zeroRedundancyRegionName);
     
when(details.getPrimaryTransfersCompleted()).thenReturn(transfersCompleted);
     when(details.getPrimaryTransferTime()).thenReturn(transferTime);
-    results = new RestoreRedundancyResultsImpl();
+    results = new SerializableRestoreRedundancyResultsImpl();
+  }
+
+  @Test
+  public void initialStateIsSuccess() throws Exception {

Review comment:
       You can delete the `throws Exception`.

##########
File path: 
geode-core/src/main/java/org/apache/geode/management/internal/operation/RebalanceOperationPerformer.java
##########
@@ -219,24 +219,26 @@ public static DistributedMember 
getAssociatedMembers(String region, final Intern
     for (String regionName : listDSRegions) {
       // check for excluded regions
       boolean excludedRegionMatch = false;
-      for (String aListExcludedRegion : listExcludedRegion) {
-        // this is needed since region name may start with / or without it
-        // also
-        String excludedRegion = aListExcludedRegion.trim();
-        if (regionName.startsWith(SEPARATOR)) {
-          if (!excludedRegion.startsWith(SEPARATOR)) {
-            excludedRegion = SEPARATOR + excludedRegion;
+      if (listExcludedRegion != null) {
+        for (String aListExcludedRegion : listExcludedRegion) {
+          // this is needed since region name may start with / or without it
+          // also
+          String excludedRegion = aListExcludedRegion.trim();
+          if (regionName.startsWith(SEPARATOR)) {

Review comment:
       This is ok either way, but you could combine these if-blocks if you want 
to:
   ```
   if (regionName.startsWith(SEPARATOR) && 
!excludedRegion.startsWith(SEPARATOR)) {
     excludedRegion = SEPARATOR + excludedRegion;
   }
   ```
   And the next block:
   ```
   if (excludedRegion.startsWith(SEPARATOR) && 
!regionName.startsWith(SEPARATOR)) {
     regionName = SEPARATOR + regionName;
   }
   ```

##########
File path: 
geode-core/src/main/java/org/apache/geode/management/internal/operation/RestoreRedundancyPerformer.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.operation;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.geode.annotations.Immutable;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.distributed.DistributedMember;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.serialization.Version;
+import org.apache.geode.management.ManagementService;
+import 
org.apache.geode.management.internal.functions.RestoreRedundancyFunction;
+import org.apache.geode.management.internal.util.ManagementUtils;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+import org.apache.geode.management.runtime.RestoreRedundancyResults;
+
+public class RestoreRedundancyPerformer
+    implements OperationPerformer<RestoreRedundancyRequest, 
RestoreRedundancyResults> {
+  @Immutable
+  public static final Version ADDED_VERSION = Version.GEODE_1_13_0;
+  public static final String NO_MEMBERS_WITH_VERSION_FOR_REGION =
+      "No members with a version greater than or equal to %s were found for 
region %s";
+  public static final String EXCEPTION_MEMBER_MESSAGE = "Exception occurred on 
member %s: %s";
+
+  @Override
+  public RestoreRedundancyResults perform(Cache cache, 
RestoreRedundancyRequest operation) {
+    return perform(cache, operation, false);
+  }
+
+  public RestoreRedundancyResults perform(Cache cache, 
RestoreRedundancyRequest operation,
+      boolean checkStatus) {
+    List<RebalanceOperationPerformer.MemberPRInfo> membersForEachRegion = new 
ArrayList<>();
+    List<String> includedRegionsWithNoMembers = new ArrayList<>();
+
+    populateLists(membersForEachRegion, includedRegionsWithNoMembers, 
operation.getIncludeRegions(),
+        operation.getExcludeRegions(), (InternalCache) cache);
+
+    for (RebalanceOperationPerformer.MemberPRInfo prInfo : 
membersForEachRegion) {
+      // Filter out any members using older versions of Geode
+      List<DistributedMember> viableMembers = filterViableMembers(prInfo);
+
+      if (viableMembers.size() != 0) {
+        // Update the MemberPRInfo with the viable members
+        prInfo.dsMemberList = viableMembers;
+      } else {
+        RestoreRedundancyResultsImpl results = new 
RestoreRedundancyResultsImpl();
+        
results.setStatusMessage(String.format(NO_MEMBERS_WITH_VERSION_FOR_REGION,
+            ADDED_VERSION.getName(), prInfo.region));
+        results.setSuccess(false);
+        return results;
+      }
+    }
+
+    List<RestoreRedundancyResults> functionResults = new ArrayList<>();
+    Object[] functionArgs = new Object[] {operation, checkStatus};
+    List<DistributedMember> completedMembers = new ArrayList<>();
+    for (RebalanceOperationPerformer.MemberPRInfo memberPRInfo : 
membersForEachRegion) {
+      // Check to see if an earlier function execution has already targeted a 
member hosting this
+      // region. If one has, there is no point sending a function for this 
region as it has already
+      // had redundancy restored
+      if (!Collections.disjoint(completedMembers, memberPRInfo.dsMemberList)) {
+        continue;
+      }
+      // Try the function on the first member for this region
+      DistributedMember targetMember = memberPRInfo.dsMemberList.get(0);
+      RestoreRedundancyResults functionResult = 
executeFunctionAndGetFunctionResult(
+          new RestoreRedundancyFunction(), functionArgs, targetMember);
+      if (!functionResult.getSuccess()) {
+        // Record the error and then give up
+        RestoreRedundancyResultsImpl results = new 
RestoreRedundancyResultsImpl();
+        results.setSuccess(false);
+        String errorString =
+            String.format(EXCEPTION_MEMBER_MESSAGE, targetMember.getName(),
+                functionResult.getStatusMessage());
+        results.setStatusMessage(errorString);
+        results.setSuccess(false);
+        return results;
+      }
+      functionResults.add(functionResult);
+      completedMembers.add(targetMember);
+    }
+
+    RestoreRedundancyResultsImpl finalResult = new 
RestoreRedundancyResultsImpl();
+    finalResult.addIncludedRegionsWithNoMembers(includedRegionsWithNoMembers);
+    for (RestoreRedundancyResults functionResult : functionResults) {
+      finalResult.addRegionResults(functionResult);
+      finalResult.setSuccess(functionResult.getSuccess());
+      finalResult.setStatusMessage(functionResult.getStatusMessage());
+    }
+    return finalResult;
+  }
+
+  // this returns either an Exception or RestoreRedundancyResults
+  public RestoreRedundancyResults 
executeFunctionAndGetFunctionResult(Function<?> function,

Review comment:
       executeFunctionAndGetFunctionResult should be package-private (no 
qualifier) instead of public.

##########
File path: 
geode-core/src/test/java/org/apache/geode/internal/cache/control/RestoreRedundancyResultsImplTest.java
##########
@@ -102,7 +108,7 @@ public void 
getMessageReturnsStatusForAllRegionsAndPrimaryInfo() {
 
     results.addPrimaryReassignmentDetails(details);
 
-    String message = results.getMessage();
+    String message = results.getRegionOperationMessage();
     List<String> messageLines = Arrays.asList(message.split("\n"));

Review comment:
       This is untouched code, but we should change all `"\n"` uses with 
System.lineSeparator() to ensure it is platform independent.

##########
File path: 
geode-management/src/main/java/org/apache/geode/management/internal/operation/RegionRedundancyStatusImpl.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.operation;
+
+import org.apache.geode.management.runtime.RegionRedundancyStatus;
+
+/**
+ * result object used by the cms that only needs to be json serializable
+ */
+public class RegionRedundancyStatusImpl implements RegionRedundancyStatus {
+
+  public static final String OUTPUT_STRING =
+      "%s redundancy status: %s. Desired redundancy is %s and actual 
redundancy is %s.";
+
+  /**
+   * The name of the region used to create this object.
+   */
+  protected String regionName;
+
+  /**
+   * The configured redundancy of the region used to create this object.
+   */
+  protected int configuredRedundancy;
+
+  /**
+   * The actual redundancy of the region used to create this object at time of 
creation.
+   */
+  protected int actualRedundancy;
+
+  /**
+   * The {@link RedundancyStatus} of the region used to create this object at 
time of creation.
+   */
+  protected RedundancyStatus status;
+
+  /**
+   * Default constructor used for serialization
+   */
+  public RegionRedundancyStatusImpl() {}
+
+  public RegionRedundancyStatusImpl(int configuredRedundancy, int 
actualRedundancy,

Review comment:
       The non-default constructor should be package-private (no qualifier) 
with the annotation `@VisibleForTesting`

##########
File path: 
geode-management/src/main/java/org/apache/geode/management/operation/RestoreRedundancyRequest.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.operation;
+
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import org.apache.geode.annotations.Experimental;
+import org.apache.geode.management.api.ClusterManagementOperation;
+import org.apache.geode.management.runtime.RestoreRedundancyResults;
+
+/**
+ * Defines a distributed system request to optimize bucket allocation across 
members.
+ */
+@Experimental
+public class RestoreRedundancyRequest
+    implements ClusterManagementOperation<RestoreRedundancyResults> {
+
+  /**
+   * see {@link #getEndpoint()}
+   */
+  public static final String RESTORE_REDUNDANCY_REBALANCE_ENDPOINT =
+      "/operations/restoreRedundancy";
+  // null means all regions included
+  private List<String> includeRegions;
+  // null means don't exclude any regions
+  private List<String> excludeRegions;
+  private boolean reassignPrimaries = true;
+  private String operator;
+
+  /**
+   * by default, requests all partitioned regions to be rebalanced
+   */
+  public RestoreRedundancyRequest() {}
+
+  /**
+   * copy constructor
+   */
+  public RestoreRedundancyRequest(
+      RestoreRedundancyRequest other) {
+    this.setExcludeRegions(other.getExcludeRegions());
+    this.setIncludeRegions(other.getIncludeRegions());
+    this.setReassignPrimaries(other.getReassignPrimaries());
+    this.operator = other.getOperator();
+  }
+
+  /***
+   * Returns the list of regions to be rebalanced (or an empty list for 
all-except-excluded)
+   */
+  public List<String> getIncludeRegions() {
+    return includeRegions;
+  }
+
+  /**
+   * requests rebalance of the specified region(s) only. When at least one 
region is specified, this
+   * takes precedence over any excluded regions.
+   */
+  public void setIncludeRegions(List<String> includeRegions) {
+    this.includeRegions = includeRegions;
+  }
+
+  /***
+   * Returns the list of regions NOT to be rebalanced (iff {@link 
#getIncludeRegions()} is empty)
+   */
+  public List<String> getExcludeRegions() {
+    return excludeRegions;
+  }
+
+  /**
+   * excludes specific regions from the rebalance, if {@link 
#getIncludeRegions()} is empty,
+   * otherwise has no effect
+   * default: no regions are excluded
+   */
+  public void setExcludeRegions(List<String> excludeRegions) {
+    this.excludeRegions = excludeRegions;
+  }
+
+  public void setReassignPrimaries(boolean reassignPrimaries) {
+    this.reassignPrimaries = reassignPrimaries;
+  }
+
+  public boolean getReassignPrimaries() {
+    return reassignPrimaries;
+  }
+
+  @Override
+  @JsonIgnore
+  public String getEndpoint() {
+    return RESTORE_REDUNDANCY_REBALANCE_ENDPOINT;
+  }
+
+  @Override
+  public String getOperator() {
+    return operator;
+  }
+
+  public void setOperator(String operator) {

Review comment:
       setOperator is unused.

##########
File path: 
geode-management/src/main/java/org/apache/geode/management/internal/operation/RestoreRedundancyResultsImpl.java
##########
@@ -12,29 +12,22 @@
  * or implied. See the License for the specific language governing permissions 
and limitations under
  * the License.
  */
-package org.apache.geode.internal.cache.control;
+package org.apache.geode.management.internal.operation;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import org.apache.geode.DataSerializer;
-import org.apache.geode.cache.control.RegionRedundancyStatus;
-import org.apache.geode.cache.control.RestoreRedundancyResults;
-import org.apache.geode.cache.partition.PartitionRebalanceInfo;
-import org.apache.geode.internal.serialization.DataSerializableFixedID;
-import org.apache.geode.internal.serialization.DeserializationContext;
-import org.apache.geode.internal.serialization.SerializationContext;
-import org.apache.geode.internal.serialization.Version;
-
-public class RestoreRedundancyResultsImpl
-    implements RestoreRedundancyResults, DataSerializableFixedID {
+import org.apache.geode.management.runtime.RegionRedundancyStatus;
+import org.apache.geode.management.runtime.RestoreRedundancyResults;
+
+/**
+ * result object used by the cms that only needs to be json serializable
+ */
+public class RestoreRedundancyResultsImpl implements RestoreRedundancyResults {

Review comment:
       Optional: This class has some `size()` calls that could be changed to 
use `isEmpty()` if you want. Also some unused code and overly public constants 
and methods.

##########
File path: 
geode-core/src/main/java/org/apache/geode/management/internal/functions/RestoreRedundancyFunction.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.functions;
+
+import static 
org.apache.geode.management.runtime.RestoreRedundancyResults.Status.ERROR;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.cache.control.RestoreRedundancyOperation;
+import org.apache.geode.cache.execute.FunctionContext;
+import 
org.apache.geode.internal.cache.control.SerializableRestoreRedundancyResultsImpl;
+import org.apache.geode.internal.cache.execute.InternalFunction;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import 
org.apache.geode.management.internal.operation.RestoreRedundancyResultsImpl;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+
+
+public class RestoreRedundancyFunction implements InternalFunction<Object[]> {
+  private static final Logger logger = LogService.getLogger();
+
+  public static final String ID = RestoreRedundancyFunction.class.getName();
+
+
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  // this would return the RestoreRedundancyResults if successful,
+  // it will return an exception to the caller if status is failure or any 
exception happens
+  public void execute(FunctionContext<Object[]> context) {
+    Object[] arguments = context.getArguments();
+    RestoreRedundancyRequest request = (RestoreRedundancyRequest) arguments[0];
+    boolean isStatusCommand = (boolean) arguments[1];
+    RestoreRedundancyOperation redundancyOperation =
+        
context.getCache().getResourceManager().createRestoreRedundancyOperation();
+    Set<String> includeRegionsSet = null;
+    if (request.getIncludeRegions() != null) {
+      includeRegionsSet = new HashSet<>(request.getIncludeRegions());
+    }
+    Set<String> excludeRegionsSet = null;
+    if (request.getExcludeRegions() != null) {
+      excludeRegionsSet = new HashSet<>(request.getExcludeRegions());
+    }
+    redundancyOperation.includeRegions(includeRegionsSet);
+    redundancyOperation.excludeRegions(excludeRegionsSet);
+    RestoreRedundancyResultsImpl results;
+
+    try {
+      if (isStatusCommand) {
+        results = (RestoreRedundancyResultsImpl) 
redundancyOperation.redundancyStatus();
+      } else {
+        
redundancyOperation.shouldReassignPrimaries(request.getReassignPrimaries());
+        results = (RestoreRedundancyResultsImpl) 
redundancyOperation.start().join();
+      }
+      if (results.getRegionOperationStatus().equals(ERROR)) {
+        Exception e = new Exception(results.getRegionOperationMessage());
+        throw e;
+      }
+      results.setSuccess(true);
+      results.setStatusMessage("Success"); // MLH change this

Review comment:
       Remove initials.

##########
File path: 
geode-management/src/main/java/org/apache/geode/management/operation/RestoreRedundancyRequest.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.operation;
+
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import org.apache.geode.annotations.Experimental;
+import org.apache.geode.management.api.ClusterManagementOperation;
+import org.apache.geode.management.runtime.RestoreRedundancyResults;
+
+/**
+ * Defines a distributed system request to optimize bucket allocation across 
members.
+ */
+@Experimental

Review comment:
       Don't forget to add a paragraph to the javadocs about this API being 
experimental. See other classes that use `@Experimental` for examples.

##########
File path: 
geode-core/src/main/java/org/apache/geode/management/internal/operation/RestoreRedundancyPerformer.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.operation;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.geode.annotations.Immutable;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.distributed.DistributedMember;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.serialization.Version;
+import org.apache.geode.management.ManagementService;
+import 
org.apache.geode.management.internal.functions.RestoreRedundancyFunction;
+import org.apache.geode.management.internal.util.ManagementUtils;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+import org.apache.geode.management.runtime.RestoreRedundancyResults;
+
+public class RestoreRedundancyPerformer
+    implements OperationPerformer<RestoreRedundancyRequest, 
RestoreRedundancyResults> {
+  @Immutable
+  public static final Version ADDED_VERSION = Version.GEODE_1_13_0;
+  public static final String NO_MEMBERS_WITH_VERSION_FOR_REGION =
+      "No members with a version greater than or equal to %s were found for 
region %s";
+  public static final String EXCEPTION_MEMBER_MESSAGE = "Exception occurred on 
member %s: %s";
+
+  @Override
+  public RestoreRedundancyResults perform(Cache cache, 
RestoreRedundancyRequest operation) {
+    return perform(cache, operation, false);
+  }
+
+  public RestoreRedundancyResults perform(Cache cache, 
RestoreRedundancyRequest operation,
+      boolean checkStatus) {
+    List<RebalanceOperationPerformer.MemberPRInfo> membersForEachRegion = new 
ArrayList<>();
+    List<String> includedRegionsWithNoMembers = new ArrayList<>();
+
+    populateLists(membersForEachRegion, includedRegionsWithNoMembers, 
operation.getIncludeRegions(),
+        operation.getExcludeRegions(), (InternalCache) cache);
+
+    for (RebalanceOperationPerformer.MemberPRInfo prInfo : 
membersForEachRegion) {
+      // Filter out any members using older versions of Geode
+      List<DistributedMember> viableMembers = filterViableMembers(prInfo);
+
+      if (viableMembers.size() != 0) {

Review comment:
       Change to:
   ```
   if (!viableMembers.isEmpty()) {
   ```

##########
File path: 
geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/RedundancyCommand.java
##########
@@ -15,35 +15,33 @@
 package org.apache.geode.management.internal.cli.commands;
 
 import static org.apache.geode.cache.Region.SEPARATOR;
-import static 
org.apache.geode.internal.cache.control.RestoreRedundancyResultsImpl.NO_REDUNDANT_COPIES_FOR_REGIONS;
-import static 
org.apache.geode.internal.cache.control.RestoreRedundancyResultsImpl.PRIMARY_TRANSFERS_COMPLETED;
-import static 
org.apache.geode.internal.cache.control.RestoreRedundancyResultsImpl.PRIMARY_TRANSFER_TIME;
-import static 
org.apache.geode.internal.cache.control.RestoreRedundancyResultsImpl.REDUNDANCY_NOT_SATISFIED_FOR_REGIONS;
-import static 
org.apache.geode.internal.cache.control.RestoreRedundancyResultsImpl.REDUNDANCY_SATISFIED_FOR_REGIONS;
-import static 
org.apache.geode.management.internal.functions.CliFunctionResult.StatusState.ERROR;
+import static 
org.apache.geode.internal.cache.control.SerializableRestoreRedundancyResultsImpl.NO_REDUNDANT_COPIES_FOR_REGIONS;
+import static 
org.apache.geode.internal.cache.control.SerializableRestoreRedundancyResultsImpl.PRIMARY_TRANSFERS_COMPLETED;
+import static 
org.apache.geode.internal.cache.control.SerializableRestoreRedundancyResultsImpl.PRIMARY_TRANSFER_TIME;
+import static 
org.apache.geode.internal.cache.control.SerializableRestoreRedundancyResultsImpl.REDUNDANCY_NOT_SATISFIED_FOR_REGIONS;
+import static 
org.apache.geode.internal.cache.control.SerializableRestoreRedundancyResultsImpl.REDUNDANCY_SATISFIED_FOR_REGIONS;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-import org.apache.geode.cache.control.RegionRedundancyStatus;
-import org.apache.geode.cache.control.RestoreRedundancyResults;
 import org.apache.geode.distributed.DistributedMember;
 import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
 import org.apache.geode.internal.cache.InternalCache;
-import org.apache.geode.internal.cache.control.RestoreRedundancyResultsImpl;
+import 
org.apache.geode.internal.cache.control.SerializableRestoreRedundancyResultsImpl;
 import org.apache.geode.internal.serialization.Version;
 import org.apache.geode.management.ManagementService;
 import org.apache.geode.management.cli.GfshCommand;
 import org.apache.geode.management.cli.Result;
-import 
org.apache.geode.management.internal.cli.functions.RedundancyCommandFunction;
 import org.apache.geode.management.internal.cli.result.model.InfoResultModel;
 import org.apache.geode.management.internal.cli.result.model.ResultModel;
-import org.apache.geode.management.internal.functions.CliFunctionResult;
 import 
org.apache.geode.management.internal.operation.RebalanceOperationPerformer;
+import 
org.apache.geode.management.internal.operation.RestoreRedundancyPerformer;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+import org.apache.geode.management.runtime.RegionRedundancyStatus;
+import org.apache.geode.management.runtime.RestoreRedundancyResults;
 
 public class RedundancyCommand extends GfshCommand {

Review comment:
       Optional: RedundancyCommand has unused methods and one constant that can 
be deleted. It also has constants and other methods that should be changed to 
private sometime. Most of this is untouched code.

##########
File path: 
geode-core/src/main/java/org/apache/geode/management/internal/functions/RestoreRedundancyFunction.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.functions;
+
+import static 
org.apache.geode.management.runtime.RestoreRedundancyResults.Status.ERROR;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.cache.control.RestoreRedundancyOperation;
+import org.apache.geode.cache.execute.FunctionContext;
+import 
org.apache.geode.internal.cache.control.SerializableRestoreRedundancyResultsImpl;
+import org.apache.geode.internal.cache.execute.InternalFunction;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import 
org.apache.geode.management.internal.operation.RestoreRedundancyResultsImpl;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+
+
+public class RestoreRedundancyFunction implements InternalFunction<Object[]> {
+  private static final Logger logger = LogService.getLogger();
+
+  public static final String ID = RestoreRedundancyFunction.class.getName();
+
+
+  private static final long serialVersionUID = 1L;

Review comment:
       Since this is a product class, you should have IntelliJ generate the 
serialVersionUID for you. I think you'll need the corresponding inspection 
enabled: 
https://stackoverflow.com/questions/24573643/how-to-generate-serial-version-uid-in-intellij
   
   It's theoretically ok to have it set to `1L` but the generated number is 
more correct.

##########
File path: 
geode-core/src/main/java/org/apache/geode/internal/cache/control/SerializableRestoreRedundancyResultsImpl.java
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.cache.control;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.geode.DataSerializer;
+import org.apache.geode.cache.partition.PartitionRebalanceInfo;
+import org.apache.geode.internal.serialization.DataSerializableFixedID;
+import org.apache.geode.internal.serialization.DeserializationContext;
+import org.apache.geode.internal.serialization.SerializationContext;
+import org.apache.geode.internal.serialization.Version;
+import 
org.apache.geode.management.internal.operation.RestoreRedundancyResultsImpl;
+
+/**
+ * result object produced by the servers. These need to be transferred to the 
locators
+ * via functions so they need to be DataSerializable
+ */
+public class SerializableRestoreRedundancyResultsImpl
+    extends RestoreRedundancyResultsImpl
+    implements DataSerializableFixedID {
+
+  public void addPrimaryReassignmentDetails(PartitionRebalanceInfo details) {
+    this.totalPrimaryTransfersCompleted += 
details.getPrimaryTransfersCompleted();
+    this.totalPrimaryTransferTime =
+        
this.totalPrimaryTransferTime.plusMillis(details.getPrimaryTransferTime());
+  }
+
+  @Override
+  public int getDSFID() {
+    return RESTORE_REDUNDANCY_RESULTS;
+  }
+
+  @Override
+  public void toData(DataOutput out, SerializationContext context) throws 
IOException {
+    DataSerializer.writeHashMap(satisfiedRedundancyRegions, out);
+    DataSerializer.writeHashMap(underRedundancyRegions, out);
+    DataSerializer.writeHashMap(zeroRedundancyRegions, out);
+    out.writeInt(totalPrimaryTransfersCompleted);
+    DataSerializer.writeObject(totalPrimaryTransferTime, out);
+    out.writeBoolean(success);
+    DataSerializer.writeString(statusMessage, out);
+  }
+
+  @Override
+  public void fromData(DataInput in, DeserializationContext context)
+      throws IOException, ClassNotFoundException {
+    this.satisfiedRedundancyRegions = DataSerializer.readHashMap(in);

Review comment:
       Maybe remove all of these unnecessary `this.` qualifiers? They show up 
as unnecessary in my IDE.

##########
File path: 
geode-core/src/main/java/org/apache/geode/management/internal/functions/RestoreRedundancyFunction.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.functions;
+
+import static 
org.apache.geode.management.runtime.RestoreRedundancyResults.Status.ERROR;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.cache.control.RestoreRedundancyOperation;
+import org.apache.geode.cache.execute.FunctionContext;
+import 
org.apache.geode.internal.cache.control.SerializableRestoreRedundancyResultsImpl;
+import org.apache.geode.internal.cache.execute.InternalFunction;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import 
org.apache.geode.management.internal.operation.RestoreRedundancyResultsImpl;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+
+
+public class RestoreRedundancyFunction implements InternalFunction<Object[]> {
+  private static final Logger logger = LogService.getLogger();
+
+  public static final String ID = RestoreRedundancyFunction.class.getName();
+
+
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  // this would return the RestoreRedundancyResults if successful,
+  // it will return an exception to the caller if status is failure or any 
exception happens
+  public void execute(FunctionContext<Object[]> context) {
+    Object[] arguments = context.getArguments();
+    RestoreRedundancyRequest request = (RestoreRedundancyRequest) arguments[0];
+    boolean isStatusCommand = (boolean) arguments[1];
+    RestoreRedundancyOperation redundancyOperation =
+        
context.getCache().getResourceManager().createRestoreRedundancyOperation();
+    Set<String> includeRegionsSet = null;
+    if (request.getIncludeRegions() != null) {
+      includeRegionsSet = new HashSet<>(request.getIncludeRegions());
+    }
+    Set<String> excludeRegionsSet = null;
+    if (request.getExcludeRegions() != null) {
+      excludeRegionsSet = new HashSet<>(request.getExcludeRegions());
+    }
+    redundancyOperation.includeRegions(includeRegionsSet);
+    redundancyOperation.excludeRegions(excludeRegionsSet);
+    RestoreRedundancyResultsImpl results;
+
+    try {
+      if (isStatusCommand) {
+        results = (RestoreRedundancyResultsImpl) 
redundancyOperation.redundancyStatus();
+      } else {
+        
redundancyOperation.shouldReassignPrimaries(request.getReassignPrimaries());
+        results = (RestoreRedundancyResultsImpl) 
redundancyOperation.start().join();
+      }
+      if (results.getRegionOperationStatus().equals(ERROR)) {
+        Exception e = new Exception(results.getRegionOperationMessage());
+        throw e;
+      }
+      results.setSuccess(true);
+      results.setStatusMessage("Success"); // MLH change this
+    } catch (Exception e) {
+      results =
+          new SerializableRestoreRedundancyResultsImpl();
+      results.setSuccess(false);
+      results.setStatusMessage(e.getMessage());
+    }
+    context.getResultSender().lastResult(results);
+  }
+
+  @Override
+  public String getId() {
+    return RestoreRedundancyFunction.ID;

Review comment:
       Doesn't need the `RestoreRedundancyFunction.` qualifier.

##########
File path: 
geode-core/src/main/java/org/apache/geode/management/internal/functions/RestoreRedundancyFunction.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.functions;
+
+import static 
org.apache.geode.management.runtime.RestoreRedundancyResults.Status.ERROR;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.cache.control.RestoreRedundancyOperation;
+import org.apache.geode.cache.execute.FunctionContext;
+import 
org.apache.geode.internal.cache.control.SerializableRestoreRedundancyResultsImpl;
+import org.apache.geode.internal.cache.execute.InternalFunction;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import 
org.apache.geode.management.internal.operation.RestoreRedundancyResultsImpl;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+
+
+public class RestoreRedundancyFunction implements InternalFunction<Object[]> {
+  private static final Logger logger = LogService.getLogger();
+
+  public static final String ID = RestoreRedundancyFunction.class.getName();
+
+
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  // this would return the RestoreRedundancyResults if successful,
+  // it will return an exception to the caller if status is failure or any 
exception happens
+  public void execute(FunctionContext<Object[]> context) {
+    Object[] arguments = context.getArguments();
+    RestoreRedundancyRequest request = (RestoreRedundancyRequest) arguments[0];
+    boolean isStatusCommand = (boolean) arguments[1];
+    RestoreRedundancyOperation redundancyOperation =
+        
context.getCache().getResourceManager().createRestoreRedundancyOperation();
+    Set<String> includeRegionsSet = null;
+    if (request.getIncludeRegions() != null) {
+      includeRegionsSet = new HashSet<>(request.getIncludeRegions());
+    }
+    Set<String> excludeRegionsSet = null;
+    if (request.getExcludeRegions() != null) {
+      excludeRegionsSet = new HashSet<>(request.getExcludeRegions());
+    }
+    redundancyOperation.includeRegions(includeRegionsSet);
+    redundancyOperation.excludeRegions(excludeRegionsSet);
+    RestoreRedundancyResultsImpl results;
+
+    try {
+      if (isStatusCommand) {
+        results = (RestoreRedundancyResultsImpl) 
redundancyOperation.redundancyStatus();
+      } else {
+        
redundancyOperation.shouldReassignPrimaries(request.getReassignPrimaries());
+        results = (RestoreRedundancyResultsImpl) 
redundancyOperation.start().join();
+      }
+      if (results.getRegionOperationStatus().equals(ERROR)) {
+        Exception e = new Exception(results.getRegionOperationMessage());
+        throw e;

Review comment:
       I would use a subclass of Exception and inline throwing it:
   ```
   throw new IllegalStateException(results.getRegionOperationMessage());
   ```

##########
File path: 
geode-core/src/main/java/org/apache/geode/management/internal/operation/RestoreRedundancyPerformer.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.operation;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.geode.annotations.Immutable;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.distributed.DistributedMember;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.serialization.Version;
+import org.apache.geode.management.ManagementService;
+import 
org.apache.geode.management.internal.functions.RestoreRedundancyFunction;
+import org.apache.geode.management.internal.util.ManagementUtils;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+import org.apache.geode.management.runtime.RestoreRedundancyResults;
+
+public class RestoreRedundancyPerformer
+    implements OperationPerformer<RestoreRedundancyRequest, 
RestoreRedundancyResults> {
+  @Immutable
+  public static final Version ADDED_VERSION = Version.GEODE_1_13_0;

Review comment:
       Visibility on the constants is public when they don't need to be.
   
   ADDED_VERSION should be package-private (no qualifier) and add the 
annotation `@VisibleForTesting`:
   ```
   @Immutable
   @VisibleForTesting
   static final Version ADDED_VERSION = Version.GEODE_1_13_0;
   ```
   NO_MEMBERS_WITH_VERSION_FOR_REGION and EXCEPTION_MEMBER_MESSAGE should be 
private until something outside the package needs to reference them.

##########
File path: 
geode-core/src/main/java/org/apache/geode/management/internal/operation/RebalanceOperationPerformer.java
##########
@@ -219,24 +219,26 @@ public static DistributedMember 
getAssociatedMembers(String region, final Intern
     for (String regionName : listDSRegions) {
       // check for excluded regions
       boolean excludedRegionMatch = false;
-      for (String aListExcludedRegion : listExcludedRegion) {
-        // this is needed since region name may start with / or without it
-        // also
-        String excludedRegion = aListExcludedRegion.trim();
-        if (regionName.startsWith(SEPARATOR)) {
-          if (!excludedRegion.startsWith(SEPARATOR)) {
-            excludedRegion = SEPARATOR + excludedRegion;
+      if (listExcludedRegion != null) {
+        for (String aListExcludedRegion : listExcludedRegion) {
+          // this is needed since region name may start with / or without it
+          // also

Review comment:
       Delete `// also`? Or finish comment?

##########
File path: 
geode-core/src/main/java/org/apache/geode/management/internal/operation/RestoreRedundancyPerformer.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.operation;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.geode.annotations.Immutable;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.distributed.DistributedMember;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.serialization.Version;
+import org.apache.geode.management.ManagementService;
+import 
org.apache.geode.management.internal.functions.RestoreRedundancyFunction;
+import org.apache.geode.management.internal.util.ManagementUtils;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+import org.apache.geode.management.runtime.RestoreRedundancyResults;
+
+public class RestoreRedundancyPerformer
+    implements OperationPerformer<RestoreRedundancyRequest, 
RestoreRedundancyResults> {
+  @Immutable
+  public static final Version ADDED_VERSION = Version.GEODE_1_13_0;
+  public static final String NO_MEMBERS_WITH_VERSION_FOR_REGION =
+      "No members with a version greater than or equal to %s were found for 
region %s";
+  public static final String EXCEPTION_MEMBER_MESSAGE = "Exception occurred on 
member %s: %s";
+
+  @Override
+  public RestoreRedundancyResults perform(Cache cache, 
RestoreRedundancyRequest operation) {
+    return perform(cache, operation, false);
+  }
+
+  public RestoreRedundancyResults perform(Cache cache, 
RestoreRedundancyRequest operation,
+      boolean checkStatus) {
+    List<RebalanceOperationPerformer.MemberPRInfo> membersForEachRegion = new 
ArrayList<>();
+    List<String> includedRegionsWithNoMembers = new ArrayList<>();
+
+    populateLists(membersForEachRegion, includedRegionsWithNoMembers, 
operation.getIncludeRegions(),
+        operation.getExcludeRegions(), (InternalCache) cache);
+
+    for (RebalanceOperationPerformer.MemberPRInfo prInfo : 
membersForEachRegion) {
+      // Filter out any members using older versions of Geode
+      List<DistributedMember> viableMembers = filterViableMembers(prInfo);
+
+      if (viableMembers.size() != 0) {

Review comment:
       In theory the implementation could optimize `isEmpty()` to not require 
`size()`.

##########
File path: 
geode-core/src/main/java/org/apache/geode/management/internal/operation/RestoreRedundancyPerformer.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.operation;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.geode.annotations.Immutable;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.distributed.DistributedMember;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.serialization.Version;
+import org.apache.geode.management.ManagementService;
+import 
org.apache.geode.management.internal.functions.RestoreRedundancyFunction;
+import org.apache.geode.management.internal.util.ManagementUtils;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+import org.apache.geode.management.runtime.RestoreRedundancyResults;
+
+public class RestoreRedundancyPerformer
+    implements OperationPerformer<RestoreRedundancyRequest, 
RestoreRedundancyResults> {
+  @Immutable
+  public static final Version ADDED_VERSION = Version.GEODE_1_13_0;
+  public static final String NO_MEMBERS_WITH_VERSION_FOR_REGION =
+      "No members with a version greater than or equal to %s were found for 
region %s";
+  public static final String EXCEPTION_MEMBER_MESSAGE = "Exception occurred on 
member %s: %s";
+
+  @Override
+  public RestoreRedundancyResults perform(Cache cache, 
RestoreRedundancyRequest operation) {
+    return perform(cache, operation, false);
+  }
+
+  public RestoreRedundancyResults perform(Cache cache, 
RestoreRedundancyRequest operation,
+      boolean checkStatus) {
+    List<RebalanceOperationPerformer.MemberPRInfo> membersForEachRegion = new 
ArrayList<>();
+    List<String> includedRegionsWithNoMembers = new ArrayList<>();
+
+    populateLists(membersForEachRegion, includedRegionsWithNoMembers, 
operation.getIncludeRegions(),
+        operation.getExcludeRegions(), (InternalCache) cache);
+
+    for (RebalanceOperationPerformer.MemberPRInfo prInfo : 
membersForEachRegion) {
+      // Filter out any members using older versions of Geode
+      List<DistributedMember> viableMembers = filterViableMembers(prInfo);
+
+      if (viableMembers.size() != 0) {
+        // Update the MemberPRInfo with the viable members
+        prInfo.dsMemberList = viableMembers;
+      } else {
+        RestoreRedundancyResultsImpl results = new 
RestoreRedundancyResultsImpl();
+        
results.setStatusMessage(String.format(NO_MEMBERS_WITH_VERSION_FOR_REGION,
+            ADDED_VERSION.getName(), prInfo.region));
+        results.setSuccess(false);
+        return results;
+      }
+    }
+
+    List<RestoreRedundancyResults> functionResults = new ArrayList<>();
+    Object[] functionArgs = new Object[] {operation, checkStatus};
+    List<DistributedMember> completedMembers = new ArrayList<>();
+    for (RebalanceOperationPerformer.MemberPRInfo memberPRInfo : 
membersForEachRegion) {
+      // Check to see if an earlier function execution has already targeted a 
member hosting this
+      // region. If one has, there is no point sending a function for this 
region as it has already
+      // had redundancy restored
+      if (!Collections.disjoint(completedMembers, memberPRInfo.dsMemberList)) {
+        continue;
+      }
+      // Try the function on the first member for this region
+      DistributedMember targetMember = memberPRInfo.dsMemberList.get(0);
+      RestoreRedundancyResults functionResult = 
executeFunctionAndGetFunctionResult(
+          new RestoreRedundancyFunction(), functionArgs, targetMember);
+      if (!functionResult.getSuccess()) {
+        // Record the error and then give up
+        RestoreRedundancyResultsImpl results = new 
RestoreRedundancyResultsImpl();
+        results.setSuccess(false);
+        String errorString =
+            String.format(EXCEPTION_MEMBER_MESSAGE, targetMember.getName(),
+                functionResult.getStatusMessage());
+        results.setStatusMessage(errorString);
+        results.setSuccess(false);
+        return results;
+      }
+      functionResults.add(functionResult);
+      completedMembers.add(targetMember);
+    }
+
+    RestoreRedundancyResultsImpl finalResult = new 
RestoreRedundancyResultsImpl();
+    finalResult.addIncludedRegionsWithNoMembers(includedRegionsWithNoMembers);
+    for (RestoreRedundancyResults functionResult : functionResults) {
+      finalResult.addRegionResults(functionResult);
+      finalResult.setSuccess(functionResult.getSuccess());
+      finalResult.setStatusMessage(functionResult.getStatusMessage());
+    }
+    return finalResult;
+  }
+
+  // this returns either an Exception or RestoreRedundancyResults
+  public RestoreRedundancyResults 
executeFunctionAndGetFunctionResult(Function<?> function,
+      Object args,
+      final DistributedMember targetMember) {
+    ResultCollector<?, ?> rc =
+        ManagementUtils.executeFunction(function, args, 
Collections.singleton(targetMember));
+    List<RestoreRedundancyResults> results = (List<RestoreRedundancyResults>) 
rc.getResult();

Review comment:
       This line generates unchecked cast warning. You can change it to this if 
you want:
   ```
   import static org.apache.geode.util.internal.UncheckedUtils.uncheckedCast;
   
       List<RestoreRedundancyResults> results = uncheckedCast(rc.getResult());
   ```

##########
File path: 
geode-core/src/main/java/org/apache/geode/management/internal/operation/RestoreRedundancyPerformer.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.operation;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.geode.annotations.Immutable;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.distributed.DistributedMember;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.serialization.Version;
+import org.apache.geode.management.ManagementService;
+import 
org.apache.geode.management.internal.functions.RestoreRedundancyFunction;
+import org.apache.geode.management.internal.util.ManagementUtils;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+import org.apache.geode.management.runtime.RestoreRedundancyResults;
+
+public class RestoreRedundancyPerformer
+    implements OperationPerformer<RestoreRedundancyRequest, 
RestoreRedundancyResults> {
+  @Immutable
+  public static final Version ADDED_VERSION = Version.GEODE_1_13_0;
+  public static final String NO_MEMBERS_WITH_VERSION_FOR_REGION =
+      "No members with a version greater than or equal to %s were found for 
region %s";
+  public static final String EXCEPTION_MEMBER_MESSAGE = "Exception occurred on 
member %s: %s";
+
+  @Override
+  public RestoreRedundancyResults perform(Cache cache, 
RestoreRedundancyRequest operation) {
+    return perform(cache, operation, false);
+  }
+
+  public RestoreRedundancyResults perform(Cache cache, 
RestoreRedundancyRequest operation,
+      boolean checkStatus) {
+    List<RebalanceOperationPerformer.MemberPRInfo> membersForEachRegion = new 
ArrayList<>();
+    List<String> includedRegionsWithNoMembers = new ArrayList<>();
+
+    populateLists(membersForEachRegion, includedRegionsWithNoMembers, 
operation.getIncludeRegions(),
+        operation.getExcludeRegions(), (InternalCache) cache);
+
+    for (RebalanceOperationPerformer.MemberPRInfo prInfo : 
membersForEachRegion) {
+      // Filter out any members using older versions of Geode
+      List<DistributedMember> viableMembers = filterViableMembers(prInfo);
+
+      if (viableMembers.size() != 0) {
+        // Update the MemberPRInfo with the viable members
+        prInfo.dsMemberList = viableMembers;
+      } else {
+        RestoreRedundancyResultsImpl results = new 
RestoreRedundancyResultsImpl();
+        
results.setStatusMessage(String.format(NO_MEMBERS_WITH_VERSION_FOR_REGION,
+            ADDED_VERSION.getName(), prInfo.region));
+        results.setSuccess(false);
+        return results;
+      }
+    }
+
+    List<RestoreRedundancyResults> functionResults = new ArrayList<>();
+    Object[] functionArgs = new Object[] {operation, checkStatus};
+    List<DistributedMember> completedMembers = new ArrayList<>();
+    for (RebalanceOperationPerformer.MemberPRInfo memberPRInfo : 
membersForEachRegion) {
+      // Check to see if an earlier function execution has already targeted a 
member hosting this
+      // region. If one has, there is no point sending a function for this 
region as it has already
+      // had redundancy restored
+      if (!Collections.disjoint(completedMembers, memberPRInfo.dsMemberList)) {
+        continue;
+      }
+      // Try the function on the first member for this region
+      DistributedMember targetMember = memberPRInfo.dsMemberList.get(0);
+      RestoreRedundancyResults functionResult = 
executeFunctionAndGetFunctionResult(
+          new RestoreRedundancyFunction(), functionArgs, targetMember);
+      if (!functionResult.getSuccess()) {
+        // Record the error and then give up
+        RestoreRedundancyResultsImpl results = new 
RestoreRedundancyResultsImpl();
+        results.setSuccess(false);
+        String errorString =
+            String.format(EXCEPTION_MEMBER_MESSAGE, targetMember.getName(),
+                functionResult.getStatusMessage());
+        results.setStatusMessage(errorString);
+        results.setSuccess(false);
+        return results;
+      }
+      functionResults.add(functionResult);
+      completedMembers.add(targetMember);
+    }
+
+    RestoreRedundancyResultsImpl finalResult = new 
RestoreRedundancyResultsImpl();
+    finalResult.addIncludedRegionsWithNoMembers(includedRegionsWithNoMembers);
+    for (RestoreRedundancyResults functionResult : functionResults) {
+      finalResult.addRegionResults(functionResult);
+      finalResult.setSuccess(functionResult.getSuccess());
+      finalResult.setStatusMessage(functionResult.getStatusMessage());
+    }
+    return finalResult;
+  }
+
+  // this returns either an Exception or RestoreRedundancyResults
+  public RestoreRedundancyResults 
executeFunctionAndGetFunctionResult(Function<?> function,
+      Object args,
+      final DistributedMember targetMember) {
+    ResultCollector<?, ?> rc =
+        ManagementUtils.executeFunction(function, args, 
Collections.singleton(targetMember));
+    List<RestoreRedundancyResults> results = (List<RestoreRedundancyResults>) 
rc.getResult();
+    return results.size() > 0 ? results.get(0) : null;
+  }
+
+
+  List<DistributedMember> filterViableMembers(
+      RebalanceOperationPerformer.MemberPRInfo prInfo) {
+    return prInfo.dsMemberList.stream()
+        .map(InternalDistributedMember.class::cast)
+        .filter(member -> member.getVersionObject().compareTo(ADDED_VERSION) 
>= 0)
+        .collect(Collectors.toList());
+  }
+
+  void populateLists(List<RebalanceOperationPerformer.MemberPRInfo> 
membersForEachRegion,
+      List<String> noMemberRegions, List<String> includeRegions, List<String> 
excludeRegions,
+      InternalCache cache) {
+    // Include all regions
+    if (includeRegions == null) {
+      // Exclude these regions
+      List<RebalanceOperationPerformer.MemberPRInfo> memberRegionList =
+          getMembersForEachRegion(cache, excludeRegions);
+      membersForEachRegion.addAll(memberRegionList);
+    } else {
+      for (String regionName : includeRegions) {
+        DistributedMember memberForRegion = getOneMemberForRegion(cache, 
regionName);
+
+        // If we did not find a member for this region name, add it to the 
list of regions with no
+        // members
+        if (memberForRegion == null) {
+          noMemberRegions.add(regionName);
+        } else {
+          RebalanceOperationPerformer.MemberPRInfo memberPRInfo =
+              new RebalanceOperationPerformer.MemberPRInfo();
+          memberPRInfo.region = regionName;
+          memberPRInfo.dsMemberList.add(memberForRegion);
+          membersForEachRegion.add(memberPRInfo);
+        }
+      }
+    }
+  }
+
+  // Extracted for testing
+  List<RebalanceOperationPerformer.MemberPRInfo> 
getMembersForEachRegion(InternalCache cache,

Review comment:
       getMembersForEachRegion can be private until you need to make it 
`@VisibleForTesting`. Currently there's no test using the method though.

##########
File path: 
geode-core/src/main/java/org/apache/geode/management/internal/operation/RestoreRedundancyPerformer.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.operation;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.geode.annotations.Immutable;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.distributed.DistributedMember;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.serialization.Version;
+import org.apache.geode.management.ManagementService;
+import 
org.apache.geode.management.internal.functions.RestoreRedundancyFunction;
+import org.apache.geode.management.internal.util.ManagementUtils;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+import org.apache.geode.management.runtime.RestoreRedundancyResults;
+
+public class RestoreRedundancyPerformer
+    implements OperationPerformer<RestoreRedundancyRequest, 
RestoreRedundancyResults> {
+  @Immutable
+  public static final Version ADDED_VERSION = Version.GEODE_1_13_0;
+  public static final String NO_MEMBERS_WITH_VERSION_FOR_REGION =
+      "No members with a version greater than or equal to %s were found for 
region %s";
+  public static final String EXCEPTION_MEMBER_MESSAGE = "Exception occurred on 
member %s: %s";
+
+  @Override
+  public RestoreRedundancyResults perform(Cache cache, 
RestoreRedundancyRequest operation) {
+    return perform(cache, operation, false);
+  }
+
+  public RestoreRedundancyResults perform(Cache cache, 
RestoreRedundancyRequest operation,
+      boolean checkStatus) {
+    List<RebalanceOperationPerformer.MemberPRInfo> membersForEachRegion = new 
ArrayList<>();
+    List<String> includedRegionsWithNoMembers = new ArrayList<>();
+
+    populateLists(membersForEachRegion, includedRegionsWithNoMembers, 
operation.getIncludeRegions(),
+        operation.getExcludeRegions(), (InternalCache) cache);
+
+    for (RebalanceOperationPerformer.MemberPRInfo prInfo : 
membersForEachRegion) {
+      // Filter out any members using older versions of Geode
+      List<DistributedMember> viableMembers = filterViableMembers(prInfo);
+
+      if (viableMembers.size() != 0) {
+        // Update the MemberPRInfo with the viable members
+        prInfo.dsMemberList = viableMembers;
+      } else {
+        RestoreRedundancyResultsImpl results = new 
RestoreRedundancyResultsImpl();
+        
results.setStatusMessage(String.format(NO_MEMBERS_WITH_VERSION_FOR_REGION,
+            ADDED_VERSION.getName(), prInfo.region));
+        results.setSuccess(false);
+        return results;
+      }
+    }
+
+    List<RestoreRedundancyResults> functionResults = new ArrayList<>();
+    Object[] functionArgs = new Object[] {operation, checkStatus};
+    List<DistributedMember> completedMembers = new ArrayList<>();
+    for (RebalanceOperationPerformer.MemberPRInfo memberPRInfo : 
membersForEachRegion) {
+      // Check to see if an earlier function execution has already targeted a 
member hosting this
+      // region. If one has, there is no point sending a function for this 
region as it has already
+      // had redundancy restored
+      if (!Collections.disjoint(completedMembers, memberPRInfo.dsMemberList)) {
+        continue;
+      }
+      // Try the function on the first member for this region
+      DistributedMember targetMember = memberPRInfo.dsMemberList.get(0);
+      RestoreRedundancyResults functionResult = 
executeFunctionAndGetFunctionResult(
+          new RestoreRedundancyFunction(), functionArgs, targetMember);
+      if (!functionResult.getSuccess()) {
+        // Record the error and then give up
+        RestoreRedundancyResultsImpl results = new 
RestoreRedundancyResultsImpl();
+        results.setSuccess(false);
+        String errorString =
+            String.format(EXCEPTION_MEMBER_MESSAGE, targetMember.getName(),
+                functionResult.getStatusMessage());
+        results.setStatusMessage(errorString);
+        results.setSuccess(false);
+        return results;
+      }
+      functionResults.add(functionResult);
+      completedMembers.add(targetMember);
+    }
+
+    RestoreRedundancyResultsImpl finalResult = new 
RestoreRedundancyResultsImpl();
+    finalResult.addIncludedRegionsWithNoMembers(includedRegionsWithNoMembers);
+    for (RestoreRedundancyResults functionResult : functionResults) {
+      finalResult.addRegionResults(functionResult);
+      finalResult.setSuccess(functionResult.getSuccess());
+      finalResult.setStatusMessage(functionResult.getStatusMessage());
+    }
+    return finalResult;
+  }
+
+  // this returns either an Exception or RestoreRedundancyResults
+  public RestoreRedundancyResults 
executeFunctionAndGetFunctionResult(Function<?> function,
+      Object args,
+      final DistributedMember targetMember) {
+    ResultCollector<?, ?> rc =
+        ManagementUtils.executeFunction(function, args, 
Collections.singleton(targetMember));
+    List<RestoreRedundancyResults> results = (List<RestoreRedundancyResults>) 
rc.getResult();
+    return results.size() > 0 ? results.get(0) : null;
+  }
+
+
+  List<DistributedMember> filterViableMembers(
+      RebalanceOperationPerformer.MemberPRInfo prInfo) {
+    return prInfo.dsMemberList.stream()
+        .map(InternalDistributedMember.class::cast)
+        .filter(member -> member.getVersionObject().compareTo(ADDED_VERSION) 
>= 0)
+        .collect(Collectors.toList());
+  }
+
+  void populateLists(List<RebalanceOperationPerformer.MemberPRInfo> 
membersForEachRegion,
+      List<String> noMemberRegions, List<String> includeRegions, List<String> 
excludeRegions,
+      InternalCache cache) {
+    // Include all regions
+    if (includeRegions == null) {
+      // Exclude these regions
+      List<RebalanceOperationPerformer.MemberPRInfo> memberRegionList =
+          getMembersForEachRegion(cache, excludeRegions);
+      membersForEachRegion.addAll(memberRegionList);
+    } else {
+      for (String regionName : includeRegions) {
+        DistributedMember memberForRegion = getOneMemberForRegion(cache, 
regionName);
+
+        // If we did not find a member for this region name, add it to the 
list of regions with no
+        // members
+        if (memberForRegion == null) {
+          noMemberRegions.add(regionName);
+        } else {
+          RebalanceOperationPerformer.MemberPRInfo memberPRInfo =
+              new RebalanceOperationPerformer.MemberPRInfo();
+          memberPRInfo.region = regionName;
+          memberPRInfo.dsMemberList.add(memberForRegion);
+          membersForEachRegion.add(memberPRInfo);
+        }
+      }
+    }
+  }
+
+  // Extracted for testing
+  List<RebalanceOperationPerformer.MemberPRInfo> 
getMembersForEachRegion(InternalCache cache,
+      List<String> excludedRegionList) {
+    return RebalanceOperationPerformer.getMemberRegionList(
+        ManagementService.getManagementService(cache), cache, 
excludedRegionList);
+  }
+
+  // Extracted for testing
+  DistributedMember getOneMemberForRegion(InternalCache cache, String 
regionName) {

Review comment:
       getOneMemberForRegion can be private until you need to make it 
`@VisibleForTesting`. Currently there's no test using the method though.

##########
File path: 
geode-core/src/test/java/org/apache/geode/internal/cache/control/RestoreRedundancyOperationImplTest.java
##########
@@ -38,19 +38,19 @@
 import org.junit.Test;
 
 import org.apache.geode.cache.RegionDestroyedException;
-import org.apache.geode.cache.control.RegionRedundancyStatus;
-import org.apache.geode.cache.control.RestoreRedundancyResults;
 import org.apache.geode.cache.partition.PartitionRebalanceInfo;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.PartitionedRegion;
 import 
org.apache.geode.internal.cache.partitioned.PartitionedRegionRebalanceOp;
+import org.apache.geode.management.runtime.RegionRedundancyStatus;
+import org.apache.geode.management.runtime.RestoreRedundancyResults;
 
 public class RestoreRedundancyOperationImplTest {
   InternalCache cache;
   InternalResourceManager manager;
   ResourceManagerStats stats;
   RestoreRedundancyOperationImpl operation;
-  RestoreRedundancyResultsImpl emptyResults;
+  SerializableRestoreRedundancyResultsImpl emptyResults;

Review comment:
       Let's make all of these fields `private`. There's an inspection that 
finds and automates this.

##########
File path: 
geode-core/src/main/java/org/apache/geode/management/internal/operation/RestoreRedundancyPerformer.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.operation;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.geode.annotations.Immutable;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.distributed.DistributedMember;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.serialization.Version;
+import org.apache.geode.management.ManagementService;
+import 
org.apache.geode.management.internal.functions.RestoreRedundancyFunction;
+import org.apache.geode.management.internal.util.ManagementUtils;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+import org.apache.geode.management.runtime.RestoreRedundancyResults;
+
+public class RestoreRedundancyPerformer
+    implements OperationPerformer<RestoreRedundancyRequest, 
RestoreRedundancyResults> {
+  @Immutable
+  public static final Version ADDED_VERSION = Version.GEODE_1_13_0;
+  public static final String NO_MEMBERS_WITH_VERSION_FOR_REGION =
+      "No members with a version greater than or equal to %s were found for 
region %s";
+  public static final String EXCEPTION_MEMBER_MESSAGE = "Exception occurred on 
member %s: %s";
+
+  @Override
+  public RestoreRedundancyResults perform(Cache cache, 
RestoreRedundancyRequest operation) {
+    return perform(cache, operation, false);
+  }
+
+  public RestoreRedundancyResults perform(Cache cache, 
RestoreRedundancyRequest operation,
+      boolean checkStatus) {
+    List<RebalanceOperationPerformer.MemberPRInfo> membersForEachRegion = new 
ArrayList<>();
+    List<String> includedRegionsWithNoMembers = new ArrayList<>();
+
+    populateLists(membersForEachRegion, includedRegionsWithNoMembers, 
operation.getIncludeRegions(),
+        operation.getExcludeRegions(), (InternalCache) cache);
+
+    for (RebalanceOperationPerformer.MemberPRInfo prInfo : 
membersForEachRegion) {
+      // Filter out any members using older versions of Geode
+      List<DistributedMember> viableMembers = filterViableMembers(prInfo);
+
+      if (viableMembers.size() != 0) {
+        // Update the MemberPRInfo with the viable members
+        prInfo.dsMemberList = viableMembers;
+      } else {
+        RestoreRedundancyResultsImpl results = new 
RestoreRedundancyResultsImpl();
+        
results.setStatusMessage(String.format(NO_MEMBERS_WITH_VERSION_FOR_REGION,
+            ADDED_VERSION.getName(), prInfo.region));
+        results.setSuccess(false);
+        return results;
+      }
+    }
+
+    List<RestoreRedundancyResults> functionResults = new ArrayList<>();
+    Object[] functionArgs = new Object[] {operation, checkStatus};
+    List<DistributedMember> completedMembers = new ArrayList<>();
+    for (RebalanceOperationPerformer.MemberPRInfo memberPRInfo : 
membersForEachRegion) {
+      // Check to see if an earlier function execution has already targeted a 
member hosting this
+      // region. If one has, there is no point sending a function for this 
region as it has already
+      // had redundancy restored
+      if (!Collections.disjoint(completedMembers, memberPRInfo.dsMemberList)) {
+        continue;
+      }
+      // Try the function on the first member for this region
+      DistributedMember targetMember = memberPRInfo.dsMemberList.get(0);
+      RestoreRedundancyResults functionResult = 
executeFunctionAndGetFunctionResult(
+          new RestoreRedundancyFunction(), functionArgs, targetMember);
+      if (!functionResult.getSuccess()) {
+        // Record the error and then give up
+        RestoreRedundancyResultsImpl results = new 
RestoreRedundancyResultsImpl();
+        results.setSuccess(false);
+        String errorString =
+            String.format(EXCEPTION_MEMBER_MESSAGE, targetMember.getName(),
+                functionResult.getStatusMessage());
+        results.setStatusMessage(errorString);
+        results.setSuccess(false);
+        return results;
+      }
+      functionResults.add(functionResult);
+      completedMembers.add(targetMember);
+    }
+
+    RestoreRedundancyResultsImpl finalResult = new 
RestoreRedundancyResultsImpl();
+    finalResult.addIncludedRegionsWithNoMembers(includedRegionsWithNoMembers);
+    for (RestoreRedundancyResults functionResult : functionResults) {
+      finalResult.addRegionResults(functionResult);
+      finalResult.setSuccess(functionResult.getSuccess());
+      finalResult.setStatusMessage(functionResult.getStatusMessage());
+    }
+    return finalResult;
+  }
+
+  // this returns either an Exception or RestoreRedundancyResults
+  public RestoreRedundancyResults 
executeFunctionAndGetFunctionResult(Function<?> function,
+      Object args,
+      final DistributedMember targetMember) {
+    ResultCollector<?, ?> rc =
+        ManagementUtils.executeFunction(function, args, 
Collections.singleton(targetMember));
+    List<RestoreRedundancyResults> results = (List<RestoreRedundancyResults>) 
rc.getResult();
+    return results.size() > 0 ? results.get(0) : null;

Review comment:
       Another `size()` call that can be changed to use `isEmpty()`:
   ```
   return !results.isEmpty() ? results.get(0) : null;
   ```
   Or even better:
   ```
   return results.isEmpty() ? null : results.get(0);
   ```
   IntelliJ has an inspection you can turn on that allows you to automatically 
make this change.

##########
File path: 
geode-core/src/test/java/org/apache/geode/management/internal/operation/RestoreRedundancyPerformerTest.java
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.geode.management.internal.operation;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalCacheForClientAccess;
+import org.apache.geode.internal.serialization.Version;
+import org.apache.geode.management.DistributedRegionMXBean;
+import org.apache.geode.management.DistributedSystemMXBean;
+import org.apache.geode.management.internal.BaseManagementService;
+import 
org.apache.geode.management.internal.functions.RestoreRedundancyFunction;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+import org.apache.geode.management.runtime.RegionRedundancyStatus;
+import org.apache.geode.management.runtime.RestoreRedundancyResults;
+
+public class RestoreRedundancyPerformerTest {
+
+  public static final String DS_MEMBER_NAME_SERVER1 = "server1";

Review comment:
       Please make all the constants in this test `private`.

##########
File path: 
geode-core/src/test/java/org/apache/geode/management/internal/operation/RestoreRedundancyPerformerTest.java
##########
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.geode.management.internal.operation;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.cache.InternalCacheForClientAccess;
+import org.apache.geode.internal.serialization.Version;
+import org.apache.geode.management.DistributedRegionMXBean;
+import org.apache.geode.management.DistributedSystemMXBean;
+import org.apache.geode.management.internal.BaseManagementService;
+import 
org.apache.geode.management.internal.functions.RestoreRedundancyFunction;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+import org.apache.geode.management.runtime.RegionRedundancyStatus;
+import org.apache.geode.management.runtime.RestoreRedundancyResults;
+
+public class RestoreRedundancyPerformerTest {
+
+  public static final String DS_MEMBER_NAME_SERVER1 = "server1";
+  public static final String DS_MEMBER_NAME_SERVER2 = "server2";
+
+  public static final String REGION_1 = "region1";
+  public static final String BOGUS_PASS_MESSAGE = "Bogus pass message";
+  private InternalDistributedMember server1;
+  private InternalDistributedMember server2;
+  private InternalCacheForClientAccess internalCacheForClientAccess;
+  private RestoreRedundancyPerformer restoreRedundancyPerformer;
+
+  @Before
+  public void setup() {
+    BaseManagementService baseManagementService = 
mock(BaseManagementService.class);
+    DistributedSystemMXBean distributedSystemMXBean = 
mock(DistributedSystemMXBean.class);
+    DistributedRegionMXBean distributedRegionMXBean = 
mock(DistributedRegionMXBean.class);
+    server1 = mock(InternalDistributedMember.class);
+    server2 = mock(InternalDistributedMember.class);
+    internalCacheForClientAccess = mock(InternalCacheForClientAccess.class);
+    InternalDistributedSystem internalDistributedSystem = 
mock(InternalDistributedSystem.class);
+    DistributionManager distributionManager = mock(DistributionManager.class);
+    
when(baseManagementService.getDistributedSystemMXBean()).thenReturn(distributedSystemMXBean);
+    when(baseManagementService.getDistributedRegionMXBean(Mockito.anyString()))

Review comment:
       All of the `Mockito.*****` matchers in this class are actually owned by 
`org.mockito.ArgumentMatchers`. I would recommend just converting them all to 
static import which should switch them to importing from ArgumentMatchers.

##########
File path: 
geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/RedundancyCommand.java
##########
@@ -148,81 +126,37 @@ void 
populateLists(List<RebalanceOperationPerformer.MemberPRInfo> membersForEach
     }
   }
 
-  List<CliFunctionResult> executeFunctionOnMembers(String[] includeRegions, 
String[] excludeRegions,
-      boolean reassignPrimaries, boolean isStatusCommand,
-      List<RebalanceOperationPerformer.MemberPRInfo> membersForEachRegion) {
-    List<CliFunctionResult> functionResults = new ArrayList<>();
-    Object[] functionArgs =
-        new Object[] {includeRegions, excludeRegions, reassignPrimaries, 
isStatusCommand};
-    List<DistributedMember> completedMembers = new ArrayList<>();
-    for (RebalanceOperationPerformer.MemberPRInfo memberPRInfo : 
membersForEachRegion) {
-      // Check to see if an earlier function execution has already targeted a 
member hosting this
-      // region. If one has, there is no point sending a function for this 
region as it has already
-      // had redundancy restored
-      if (!Collections.disjoint(completedMembers, memberPRInfo.dsMemberList)) {
-        continue;
-      }
-      // Try the function on the first member for this region
-      DistributedMember targetMember = memberPRInfo.dsMemberList.get(0);
-      CliFunctionResult functionResult = executeFunctionAndGetFunctionResult(
-          new RedundancyCommandFunction(), functionArgs, targetMember);
-      if (functionResult.getStatus().equals(ERROR.name())) {
-        // Record the error and then give up
-        functionResults.add(functionResult);
-        break;
-      }
-      functionResults.add(functionResult);
-      completedMembers.add(targetMember);
-    }
-    return functionResults;
-  }
-
-  ResultModel buildResultModelFromFunctionResults(List<CliFunctionResult> 
functionResults,
-      List<String> includedRegionsWithNoMembers, boolean isStatusCommand) {
+  ResultModel buildResultModelFromFunctionResults(RestoreRedundancyResults 
results,

Review comment:
       buildResultModelFromFunctionResults should be private until something 
outside the class needs to reference it.

##########
File path: 
geode-management/src/main/java/org/apache/geode/management/internal/operation/RegionRedundancyStatusImpl.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.operation;
+
+import org.apache.geode.management.runtime.RegionRedundancyStatus;
+
+/**
+ * result object used by the cms that only needs to be json serializable
+ */
+public class RegionRedundancyStatusImpl implements RegionRedundancyStatus {
+
+  public static final String OUTPUT_STRING =

Review comment:
       OUTPUT_STRING should be `protected` instead of `public`.

##########
File path: 
geode-gfsh/src/main/java/org/apache/geode/management/internal/cli/commands/RedundancyCommand.java
##########
@@ -148,81 +126,37 @@ void 
populateLists(List<RebalanceOperationPerformer.MemberPRInfo> membersForEach
     }
   }
 
-  List<CliFunctionResult> executeFunctionOnMembers(String[] includeRegions, 
String[] excludeRegions,
-      boolean reassignPrimaries, boolean isStatusCommand,
-      List<RebalanceOperationPerformer.MemberPRInfo> membersForEachRegion) {
-    List<CliFunctionResult> functionResults = new ArrayList<>();
-    Object[] functionArgs =
-        new Object[] {includeRegions, excludeRegions, reassignPrimaries, 
isStatusCommand};
-    List<DistributedMember> completedMembers = new ArrayList<>();
-    for (RebalanceOperationPerformer.MemberPRInfo memberPRInfo : 
membersForEachRegion) {
-      // Check to see if an earlier function execution has already targeted a 
member hosting this
-      // region. If one has, there is no point sending a function for this 
region as it has already
-      // had redundancy restored
-      if (!Collections.disjoint(completedMembers, memberPRInfo.dsMemberList)) {
-        continue;
-      }
-      // Try the function on the first member for this region
-      DistributedMember targetMember = memberPRInfo.dsMemberList.get(0);
-      CliFunctionResult functionResult = executeFunctionAndGetFunctionResult(
-          new RedundancyCommandFunction(), functionArgs, targetMember);
-      if (functionResult.getStatus().equals(ERROR.name())) {
-        // Record the error and then give up
-        functionResults.add(functionResult);
-        break;
-      }
-      functionResults.add(functionResult);
-      completedMembers.add(targetMember);
-    }
-    return functionResults;
-  }
-
-  ResultModel buildResultModelFromFunctionResults(List<CliFunctionResult> 
functionResults,
-      List<String> includedRegionsWithNoMembers, boolean isStatusCommand) {
+  ResultModel buildResultModelFromFunctionResults(RestoreRedundancyResults 
results,
+      boolean isStatusCommand) {
     // No members hosting partitioned regions were found, but no regions were 
explicitly included,
     // so return OK status
-    if (functionResults.size() == 0 && includedRegionsWithNoMembers.size() == 
0) {
+    if (results.getRegionResults().size() == 0

Review comment:
       This class has `size()` calls that would be more appropriate as 
`sEmpty()` calls.

##########
File path: 
geode-management/src/main/java/org/apache/geode/management/operation/RestoreRedundancyRequest.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.operation;
+
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import org.apache.geode.annotations.Experimental;
+import org.apache.geode.management.api.ClusterManagementOperation;
+import org.apache.geode.management.runtime.RestoreRedundancyResults;
+
+/**
+ * Defines a distributed system request to optimize bucket allocation across 
members.
+ */
+@Experimental
+public class RestoreRedundancyRequest
+    implements ClusterManagementOperation<RestoreRedundancyResults> {
+
+  /**
+   * see {@link #getEndpoint()}
+   */
+  public static final String RESTORE_REDUNDANCY_REBALANCE_ENDPOINT =
+      "/operations/restoreRedundancy";
+  // null means all regions included

Review comment:
       This is a dangling comment which can be orphaned easily if you move code 
around. I would make it a javadoc so that it "sticks" to the field:
   ```
   /** null means all regions included */
   private List<String> includeRegions;
   ```
   Same thing for the comment above `excludeRegions`.

##########
File path: 
geode-management/src/main/java/org/apache/geode/management/internal/operation/RegionRedundancyStatusImpl.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.operation;
+
+import org.apache.geode.management.runtime.RegionRedundancyStatus;
+
+/**
+ * result object used by the cms that only needs to be json serializable
+ */
+public class RegionRedundancyStatusImpl implements RegionRedundancyStatus {
+
+  public static final String OUTPUT_STRING =
+      "%s redundancy status: %s. Desired redundancy is %s and actual 
redundancy is %s.";
+
+  /**
+   * The name of the region used to create this object.
+   */
+  protected String regionName;
+
+  /**
+   * The configured redundancy of the region used to create this object.
+   */
+  protected int configuredRedundancy;
+
+  /**
+   * The actual redundancy of the region used to create this object at time of 
creation.
+   */
+  protected int actualRedundancy;
+
+  /**
+   * The {@link RedundancyStatus} of the region used to create this object at 
time of creation.
+   */
+  protected RedundancyStatus status;
+
+  /**
+   * Default constructor used for serialization
+   */
+  public RegionRedundancyStatusImpl() {}
+
+  public RegionRedundancyStatusImpl(int configuredRedundancy, int 
actualRedundancy,
+      String regionName, RedundancyStatus status) {
+    this.configuredRedundancy = configuredRedundancy;
+    this.actualRedundancy = actualRedundancy;
+    this.regionName = regionName;
+    this.status = status;
+  }
+
+  @Override
+  public String getRegionName() {
+    return regionName;
+  }
+
+  @Override
+  public int getConfiguredRedundancy() {
+    return configuredRedundancy;
+  }
+
+  @Override
+  public int getActualRedundancy() {
+    return actualRedundancy;
+  }
+
+  @Override
+  public RedundancyStatus getStatus() {
+    return status;
+  }
+
+
+  /**
+   * Determines the {@link RedundancyStatus} for the region. If redundancy is 
not configured (i.e.
+   * configured redundancy = 0), this always returns {@link 
RedundancyStatus#SATISFIED}.
+   *
+   * @param desiredRedundancy The configured redundancy of the region.
+   * @param actualRedundancy The actual redundancy of the region.
+   * @return The {@link RedundancyStatus} for the region.
+   */
+  private RedundancyStatus determineStatus(int desiredRedundancy, int 
actualRedundancy) {

Review comment:
       determineStatus is unused.

##########
File path: 
geode-management/src/main/java/org/apache/geode/management/operation/RestoreRedundancyRequest.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.operation;
+
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import org.apache.geode.annotations.Experimental;
+import org.apache.geode.management.api.ClusterManagementOperation;
+import org.apache.geode.management.runtime.RestoreRedundancyResults;
+
+/**
+ * Defines a distributed system request to optimize bucket allocation across 
members.
+ */
+@Experimental
+public class RestoreRedundancyRequest
+    implements ClusterManagementOperation<RestoreRedundancyResults> {
+
+  /**
+   * see {@link #getEndpoint()}
+   */
+  public static final String RESTORE_REDUNDANCY_REBALANCE_ENDPOINT =

Review comment:
       RESTORE_REDUNDANCY_REBALANCE_ENDPOINT should be private.

##########
File path: 
geode-management/src/main/java/org/apache/geode/management/operation/RestoreRedundancyRequest.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.operation;
+
+import java.util.List;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import org.apache.geode.annotations.Experimental;
+import org.apache.geode.management.api.ClusterManagementOperation;
+import org.apache.geode.management.runtime.RestoreRedundancyResults;
+
+/**
+ * Defines a distributed system request to optimize bucket allocation across 
members.
+ */
+@Experimental
+public class RestoreRedundancyRequest
+    implements ClusterManagementOperation<RestoreRedundancyResults> {
+
+  /**
+   * see {@link #getEndpoint()}
+   */
+  public static final String RESTORE_REDUNDANCY_REBALANCE_ENDPOINT =
+      "/operations/restoreRedundancy";
+  // null means all regions included
+  private List<String> includeRegions;
+  // null means don't exclude any regions
+  private List<String> excludeRegions;
+  private boolean reassignPrimaries = true;
+  private String operator;
+
+  /**
+   * by default, requests all partitioned regions to be rebalanced
+   */
+  public RestoreRedundancyRequest() {}
+
+  /**
+   * copy constructor
+   */
+  public RestoreRedundancyRequest(
+      RestoreRedundancyRequest other) {
+    this.setExcludeRegions(other.getExcludeRegions());

Review comment:
       Lots of unnecessary `this.` qualifiers.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to