DonalEvans commented on a change in pull request #5249:
URL: https://github.com/apache/geode/pull/5249#discussion_r441065594



##########
File path: 
geode-core/src/main/java/org/apache/geode/management/internal/operation/RebalanceOperationPerformer.java
##########
@@ -189,16 +189,16 @@ public static DistributedMember 
getAssociatedMembers(String region, final Intern
 
     String[] membersName = bean.getMembers();
     Set<DistributedMember> dsMembers = ManagementUtils.getAllMembers(cache);
-    Iterator it = dsMembers.iterator();
+    Iterator<DistributedMember> it = dsMembers.iterator();
 
     boolean matchFound = false;
 
     if (membersName.length > 1) {
       while (it.hasNext() && !matchFound) {
-        DistributedMember dsmember = (DistributedMember) it.next();
+        DistributedMember DSMember = it.next();

Review comment:
       I think that variable names should start with a lower-case letter.

##########
File path: 
geode-core/src/main/java/org/apache/geode/management/internal/functions/RestoreRedundancyFunction.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.functions;
+
+import static 
org.apache.geode.management.runtime.RestoreRedundancyResults.Status.ERROR;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.cache.control.RestoreRedundancyOperation;
+import org.apache.geode.cache.execute.FunctionContext;
+import 
org.apache.geode.internal.cache.control.SerializableRestoreRedundancyResultsImpl;
+import org.apache.geode.internal.cache.execute.InternalFunction;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import 
org.apache.geode.management.internal.operation.RestoreRedundancyResultsImpl;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+
+
+public class RestoreRedundancyFunction implements InternalFunction<Object[]> {
+  private static final Logger logger = LogService.getLogger();
+
+  public static final String ID = RestoreRedundancyFunction.class.getName();
+
+
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  // this would return the RestoreRedundancyResults if successful,
+  // it will return an exception to the caller if status is failure or any 
exception happens
+  public void execute(FunctionContext<Object[]> context) {
+    Object[] arguments = context.getArguments();
+    RestoreRedundancyRequest request = (RestoreRedundancyRequest) arguments[0];
+    boolean isStatusCommand = (boolean) arguments[1];
+    RestoreRedundancyOperation redundancyOperation =
+        
context.getCache().getResourceManager().createRestoreRedundancyOperation();
+    Set<String> includeRegionsSet = null;
+    if (request.getIncludeRegions() != null) {
+      includeRegionsSet = new HashSet<>(request.getIncludeRegions());
+    }
+    Set<String> excludeRegionsSet = null;
+    if (request.getExcludeRegions() != null) {
+      excludeRegionsSet = new HashSet<>(request.getExcludeRegions());
+    }
+    redundancyOperation.includeRegions(includeRegionsSet);
+    redundancyOperation.excludeRegions(excludeRegionsSet);
+    RestoreRedundancyResultsImpl results;
+
+    try {
+      if (isStatusCommand) {
+        results = (RestoreRedundancyResultsImpl) 
redundancyOperation.redundancyStatus();
+      } else {
+        
redundancyOperation.shouldReassignPrimaries(request.getReassignPrimaries());
+        results = (RestoreRedundancyResultsImpl) 
redundancyOperation.start().join();
+      }
+      if (results.getRegionOperationStatus().equals(ERROR)) {
+        Exception e = new Exception(results.getRegionOperationMessage());
+        throw e;
+      }
+      results.setSuccess(true);
+      results.setStatusMessage("Success"); // MLH change this

Review comment:
       This comment should be removed.

##########
File path: 
geode-core/src/main/java/org/apache/geode/management/internal/functions/RestoreRedundancyFunction.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.functions;
+
+import static 
org.apache.geode.management.runtime.RestoreRedundancyResults.Status.ERROR;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.cache.control.RestoreRedundancyOperation;
+import org.apache.geode.cache.execute.FunctionContext;
+import 
org.apache.geode.internal.cache.control.SerializableRestoreRedundancyResultsImpl;
+import org.apache.geode.internal.cache.execute.InternalFunction;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import 
org.apache.geode.management.internal.operation.RestoreRedundancyResultsImpl;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+
+
+public class RestoreRedundancyFunction implements InternalFunction<Object[]> {
+  private static final Logger logger = LogService.getLogger();
+
+  public static final String ID = RestoreRedundancyFunction.class.getName();
+
+
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  // this would return the RestoreRedundancyResults if successful,
+  // it will return an exception to the caller if status is failure or any 
exception happens
+  public void execute(FunctionContext<Object[]> context) {
+    Object[] arguments = context.getArguments();
+    RestoreRedundancyRequest request = (RestoreRedundancyRequest) arguments[0];
+    boolean isStatusCommand = (boolean) arguments[1];
+    RestoreRedundancyOperation redundancyOperation =
+        
context.getCache().getResourceManager().createRestoreRedundancyOperation();
+    Set<String> includeRegionsSet = null;
+    if (request.getIncludeRegions() != null) {
+      includeRegionsSet = new HashSet<>(request.getIncludeRegions());
+    }
+    Set<String> excludeRegionsSet = null;
+    if (request.getExcludeRegions() != null) {
+      excludeRegionsSet = new HashSet<>(request.getExcludeRegions());
+    }
+    redundancyOperation.includeRegions(includeRegionsSet);
+    redundancyOperation.excludeRegions(excludeRegionsSet);
+    RestoreRedundancyResultsImpl results;
+
+    try {
+      if (isStatusCommand) {
+        results = (RestoreRedundancyResultsImpl) 
redundancyOperation.redundancyStatus();
+      } else {
+        
redundancyOperation.shouldReassignPrimaries(request.getReassignPrimaries());
+        results = (RestoreRedundancyResultsImpl) 
redundancyOperation.start().join();
+      }
+      if (results.getRegionOperationStatus().equals(ERROR)) {
+        Exception e = new Exception(results.getRegionOperationMessage());
+        throw e;
+      }
+      results.setSuccess(true);
+      results.setStatusMessage("Success"); // MLH change this
+    } catch (Exception e) {
+      results =
+          new SerializableRestoreRedundancyResultsImpl();
+      results.setSuccess(false);
+      results.setStatusMessage(e.getMessage());
+    }

Review comment:
       I'm not sure I understand why in the case that an exception is thrown, a 
`SerializableRestoreRedundancyResultsImpl` is returned here instead of a 
`RestoreRedundancyResultsImpl`.

##########
File path: 
geode-core/src/test/java/org/apache/geode/management/internal/functions/RestoreRedundancyFunctionTest.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.functions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.control.RestoreRedundancyOperation;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.ResultSender;
+import 
org.apache.geode.internal.cache.control.SerializableRestoreRedundancyResultsImpl;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+import org.apache.geode.management.runtime.RestoreRedundancyResults;
+
+public class RestoreRedundancyFunctionTest {
+  @SuppressWarnings("unchecked")
+  private final FunctionContext<Object[]> mockContext = 
mock(FunctionContext.class);
+  private final Cache mockCache = mock(Cache.class, RETURNS_DEEP_STUBS);
+  private final RestoreRedundancyOperation mockOperation =
+      mock(RestoreRedundancyOperation.class, RETURNS_DEEP_STUBS);
+  private final SerializableRestoreRedundancyResultsImpl mockResults =
+      mock(SerializableRestoreRedundancyResultsImpl.class);
+  private final String message = "expected message";
+  private RestoreRedundancyFunction function;
+  private ResultSender resultSender;
+  private ArgumentCaptor<SerializableRestoreRedundancyResultsImpl> 
argumentCaptor;
+  private RestoreRedundancyRequest request;
+
+  @Before
+  public void setUp() throws InterruptedException, ExecutionException {
+    function = new RestoreRedundancyFunction();
+    when(mockContext.getCache()).thenReturn(mockCache);
+    request = new RestoreRedundancyRequest();
+    request.setReassignPrimaries(true);
+
+    when(mockContext.getArguments()).thenReturn(new Object[] {request, false});
+    when(mockCache.getResourceManager().createRestoreRedundancyOperation())
+        .thenReturn(mockOperation);
+    CompletableFuture<RestoreRedundancyResults> future =
+        CompletableFuture.completedFuture(mockResults);
+    when(mockOperation.start()).thenReturn(future);
+    when(mockResults.getRegionOperationMessage()).thenReturn(message);
+    // when(mockResults.getStatusMessage()).thenReturn(message);

Review comment:
       Remove this commented out code.

##########
File path: 
geode-core/src/main/java/org/apache/geode/management/internal/operation/RestoreRedundancyPerformer.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.operation;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.geode.annotations.Immutable;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.distributed.DistributedMember;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.serialization.Version;
+import org.apache.geode.management.ManagementService;
+import 
org.apache.geode.management.internal.functions.RestoreRedundancyFunction;
+import org.apache.geode.management.internal.util.ManagementUtils;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+import org.apache.geode.management.runtime.RestoreRedundancyResults;
+
+public class RestoreRedundancyPerformer
+    implements OperationPerformer<RestoreRedundancyRequest, 
RestoreRedundancyResults> {
+  @Immutable
+  public static final Version ADDED_VERSION = Version.GEODE_1_13_0;
+  public static final String NO_MEMBERS_WITH_VERSION_FOR_REGION =
+      "No members with a version greater than or equal to %s were found for 
region %s";
+  public static final String EXCEPTION_MEMBER_MESSAGE = "Exception occurred on 
member %s: %s";
+
+  @Override
+  public RestoreRedundancyResults perform(Cache cache, 
RestoreRedundancyRequest operation) {
+    return perform(cache, operation, false);
+  }
+
+  public RestoreRedundancyResults perform(Cache cache, 
RestoreRedundancyRequest operation,
+      boolean checkStatus) {
+    List<RebalanceOperationPerformer.MemberPRInfo> membersForEachRegion = new 
ArrayList<>();
+    List<String> includedRegionsWithNoMembers = new ArrayList<>();
+
+    populateLists(membersForEachRegion, includedRegionsWithNoMembers, 
operation.getIncludeRegions(),
+        operation.getExcludeRegions(), (InternalCache) cache);
+
+    for (RebalanceOperationPerformer.MemberPRInfo prInfo : 
membersForEachRegion) {
+      // Filter out any members using older versions of Geode
+      List<DistributedMember> viableMembers = filterViableMembers(prInfo);
+
+      if (viableMembers.size() != 0) {
+        // Update the MemberPRInfo with the viable members
+        prInfo.dsMemberList = viableMembers;
+      } else {
+        RestoreRedundancyResultsImpl results = new 
RestoreRedundancyResultsImpl();
+        
results.setStatusMessage(String.format(NO_MEMBERS_WITH_VERSION_FOR_REGION,
+            ADDED_VERSION.getName(), prInfo.region));
+        results.setSuccess(false);
+        return results;
+      }
+    }
+
+    List<RestoreRedundancyResults> functionResults = new ArrayList<>();
+    Object[] functionArgs = new Object[] {operation, checkStatus};
+    List<DistributedMember> completedMembers = new ArrayList<>();
+    for (RebalanceOperationPerformer.MemberPRInfo memberPRInfo : 
membersForEachRegion) {
+      // Check to see if an earlier function execution has already targeted a 
member hosting this
+      // region. If one has, there is no point sending a function for this 
region as it has already
+      // had redundancy restored
+      if (!Collections.disjoint(completedMembers, memberPRInfo.dsMemberList)) {
+        continue;
+      }
+      // Try the function on the first member for this region
+      DistributedMember targetMember = memberPRInfo.dsMemberList.get(0);
+      RestoreRedundancyResults functionResult = 
executeFunctionAndGetFunctionResult(
+          new RestoreRedundancyFunction(), functionArgs, targetMember);
+      if (!functionResult.getSuccess()) {
+        // Record the error and then give up
+        RestoreRedundancyResultsImpl results = new 
RestoreRedundancyResultsImpl();
+        results.setSuccess(false);
+        String errorString =
+            String.format(EXCEPTION_MEMBER_MESSAGE, targetMember.getName(),
+                functionResult.getStatusMessage());
+        results.setStatusMessage(errorString);
+        results.setSuccess(false);

Review comment:
       There is a redundant call to `setSuccess(false)` here, since it's 
already been called a few lines above.

##########
File path: 
geode-core/src/main/java/org/apache/geode/management/internal/operation/RestoreRedundancyPerformer.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.operation;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.geode.annotations.Immutable;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.distributed.DistributedMember;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.serialization.Version;
+import org.apache.geode.management.ManagementService;
+import 
org.apache.geode.management.internal.functions.RestoreRedundancyFunction;
+import org.apache.geode.management.internal.util.ManagementUtils;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+import org.apache.geode.management.runtime.RestoreRedundancyResults;
+
+public class RestoreRedundancyPerformer
+    implements OperationPerformer<RestoreRedundancyRequest, 
RestoreRedundancyResults> {
+  @Immutable
+  public static final Version ADDED_VERSION = Version.GEODE_1_13_0;
+  public static final String NO_MEMBERS_WITH_VERSION_FOR_REGION =
+      "No members with a version greater than or equal to %s were found for 
region %s";
+  public static final String EXCEPTION_MEMBER_MESSAGE = "Exception occurred on 
member %s: %s";
+
+  @Override
+  public RestoreRedundancyResults perform(Cache cache, 
RestoreRedundancyRequest operation) {
+    return perform(cache, operation, false);
+  }
+
+  public RestoreRedundancyResults perform(Cache cache, 
RestoreRedundancyRequest operation,
+      boolean checkStatus) {
+    List<RebalanceOperationPerformer.MemberPRInfo> membersForEachRegion = new 
ArrayList<>();
+    List<String> includedRegionsWithNoMembers = new ArrayList<>();
+
+    populateLists(membersForEachRegion, includedRegionsWithNoMembers, 
operation.getIncludeRegions(),
+        operation.getExcludeRegions(), (InternalCache) cache);
+
+    for (RebalanceOperationPerformer.MemberPRInfo prInfo : 
membersForEachRegion) {
+      // Filter out any members using older versions of Geode
+      List<DistributedMember> viableMembers = filterViableMembers(prInfo);
+
+      if (viableMembers.size() != 0) {
+        // Update the MemberPRInfo with the viable members
+        prInfo.dsMemberList = viableMembers;
+      } else {
+        RestoreRedundancyResultsImpl results = new 
RestoreRedundancyResultsImpl();
+        
results.setStatusMessage(String.format(NO_MEMBERS_WITH_VERSION_FOR_REGION,
+            ADDED_VERSION.getName(), prInfo.region));
+        results.setSuccess(false);
+        return results;
+      }
+    }
+
+    List<RestoreRedundancyResults> functionResults = new ArrayList<>();
+    Object[] functionArgs = new Object[] {operation, checkStatus};
+    List<DistributedMember> completedMembers = new ArrayList<>();
+    for (RebalanceOperationPerformer.MemberPRInfo memberPRInfo : 
membersForEachRegion) {
+      // Check to see if an earlier function execution has already targeted a 
member hosting this
+      // region. If one has, there is no point sending a function for this 
region as it has already
+      // had redundancy restored
+      if (!Collections.disjoint(completedMembers, memberPRInfo.dsMemberList)) {
+        continue;
+      }
+      // Try the function on the first member for this region
+      DistributedMember targetMember = memberPRInfo.dsMemberList.get(0);
+      RestoreRedundancyResults functionResult = 
executeFunctionAndGetFunctionResult(
+          new RestoreRedundancyFunction(), functionArgs, targetMember);
+      if (!functionResult.getSuccess()) {
+        // Record the error and then give up
+        RestoreRedundancyResultsImpl results = new 
RestoreRedundancyResultsImpl();
+        results.setSuccess(false);
+        String errorString =
+            String.format(EXCEPTION_MEMBER_MESSAGE, targetMember.getName(),
+                functionResult.getStatusMessage());
+        results.setStatusMessage(errorString);
+        results.setSuccess(false);
+        return results;
+      }
+      functionResults.add(functionResult);
+      completedMembers.add(targetMember);
+    }
+
+    RestoreRedundancyResultsImpl finalResult = new 
RestoreRedundancyResultsImpl();
+    finalResult.addIncludedRegionsWithNoMembers(includedRegionsWithNoMembers);
+    for (RestoreRedundancyResults functionResult : functionResults) {
+      finalResult.addRegionResults(functionResult);
+      finalResult.setSuccess(functionResult.getSuccess());
+      finalResult.setStatusMessage(functionResult.getStatusMessage());

Review comment:
       These lines appear to be redundant within the for loop, since in order 
to reach this point, all function results must be successful. These calls to 
`setSuccess()` and `setStatusMessage()` could be moved outside the for loop and 
only called once.

##########
File path: 
geode-core/src/main/java/org/apache/geode/management/internal/functions/RestoreRedundancyFunction.java
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.management.internal.functions;
+
+import static 
org.apache.geode.management.runtime.RestoreRedundancyResults.Status.ERROR;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.logging.log4j.Logger;
+
+import org.apache.geode.cache.control.RestoreRedundancyOperation;
+import org.apache.geode.cache.execute.FunctionContext;
+import 
org.apache.geode.internal.cache.control.SerializableRestoreRedundancyResultsImpl;
+import org.apache.geode.internal.cache.execute.InternalFunction;
+import org.apache.geode.logging.internal.log4j.api.LogService;
+import 
org.apache.geode.management.internal.operation.RestoreRedundancyResultsImpl;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+
+
+public class RestoreRedundancyFunction implements InternalFunction<Object[]> {
+  private static final Logger logger = LogService.getLogger();
+
+  public static final String ID = RestoreRedundancyFunction.class.getName();
+
+
+  private static final long serialVersionUID = 1L;
+
+  @Override
+  // this would return the RestoreRedundancyResults if successful,
+  // it will return an exception to the caller if status is failure or any 
exception happens
+  public void execute(FunctionContext<Object[]> context) {
+    Object[] arguments = context.getArguments();
+    RestoreRedundancyRequest request = (RestoreRedundancyRequest) arguments[0];
+    boolean isStatusCommand = (boolean) arguments[1];

Review comment:
       This variable is set in `RestoreRedundancyPerformer` using the name 
`checkStatus`. It might be best to have consistency between classes in terms of 
naming, for clarity.

##########
File path: 
geode-core/src/test/java/org/apache/geode/management/internal/functions/RestoreRedundancyFunctionTest.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.functions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.control.RestoreRedundancyOperation;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.ResultSender;
+import 
org.apache.geode.internal.cache.control.SerializableRestoreRedundancyResultsImpl;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+import org.apache.geode.management.runtime.RestoreRedundancyResults;
+
+public class RestoreRedundancyFunctionTest {
+  @SuppressWarnings("unchecked")
+  private final FunctionContext<Object[]> mockContext = 
mock(FunctionContext.class);
+  private final Cache mockCache = mock(Cache.class, RETURNS_DEEP_STUBS);
+  private final RestoreRedundancyOperation mockOperation =
+      mock(RestoreRedundancyOperation.class, RETURNS_DEEP_STUBS);
+  private final SerializableRestoreRedundancyResultsImpl mockResults =
+      mock(SerializableRestoreRedundancyResultsImpl.class);
+  private final String message = "expected message";
+  private RestoreRedundancyFunction function;
+  private ResultSender resultSender;
+  private ArgumentCaptor<SerializableRestoreRedundancyResultsImpl> 
argumentCaptor;
+  private RestoreRedundancyRequest request;
+
+  @Before
+  public void setUp() throws InterruptedException, ExecutionException {
+    function = new RestoreRedundancyFunction();
+    when(mockContext.getCache()).thenReturn(mockCache);
+    request = new RestoreRedundancyRequest();

Review comment:
       Can this `RestoreRedundancyRequest` object be replaced with a mock, to 
avoid testing the behaviour of both it and the `RestoreRedundancyFunction` 
class in this unit test?

##########
File path: 
geode-core/src/main/java/org/apache/geode/management/internal/operation/RestoreRedundancyPerformer.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.operation;
+
+import static org.apache.geode.cache.Region.SEPARATOR;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.geode.annotations.Immutable;
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.ResultCollector;
+import org.apache.geode.distributed.DistributedMember;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import org.apache.geode.internal.serialization.Version;
+import org.apache.geode.management.ManagementService;
+import 
org.apache.geode.management.internal.functions.RestoreRedundancyFunction;
+import org.apache.geode.management.internal.util.ManagementUtils;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+import org.apache.geode.management.runtime.RestoreRedundancyResults;
+
+public class RestoreRedundancyPerformer
+    implements OperationPerformer<RestoreRedundancyRequest, 
RestoreRedundancyResults> {
+  @Immutable
+  public static final Version ADDED_VERSION = Version.GEODE_1_13_0;
+  public static final String NO_MEMBERS_WITH_VERSION_FOR_REGION =
+      "No members with a version greater than or equal to %s were found for 
region %s";
+  public static final String EXCEPTION_MEMBER_MESSAGE = "Exception occurred on 
member %s: %s";
+
+  @Override
+  public RestoreRedundancyResults perform(Cache cache, 
RestoreRedundancyRequest operation) {
+    return perform(cache, operation, false);
+  }
+
+  public RestoreRedundancyResults perform(Cache cache, 
RestoreRedundancyRequest operation,
+      boolean checkStatus) {
+    List<RebalanceOperationPerformer.MemberPRInfo> membersForEachRegion = new 
ArrayList<>();
+    List<String> includedRegionsWithNoMembers = new ArrayList<>();
+
+    populateLists(membersForEachRegion, includedRegionsWithNoMembers, 
operation.getIncludeRegions(),
+        operation.getExcludeRegions(), (InternalCache) cache);
+
+    for (RebalanceOperationPerformer.MemberPRInfo prInfo : 
membersForEachRegion) {
+      // Filter out any members using older versions of Geode
+      List<DistributedMember> viableMembers = filterViableMembers(prInfo);
+
+      if (viableMembers.size() != 0) {
+        // Update the MemberPRInfo with the viable members
+        prInfo.dsMemberList = viableMembers;
+      } else {
+        RestoreRedundancyResultsImpl results = new 
RestoreRedundancyResultsImpl();
+        
results.setStatusMessage(String.format(NO_MEMBERS_WITH_VERSION_FOR_REGION,
+            ADDED_VERSION.getName(), prInfo.region));
+        results.setSuccess(false);
+        return results;
+      }
+    }
+
+    List<RestoreRedundancyResults> functionResults = new ArrayList<>();
+    Object[] functionArgs = new Object[] {operation, checkStatus};
+    List<DistributedMember> completedMembers = new ArrayList<>();
+    for (RebalanceOperationPerformer.MemberPRInfo memberPRInfo : 
membersForEachRegion) {
+      // Check to see if an earlier function execution has already targeted a 
member hosting this
+      // region. If one has, there is no point sending a function for this 
region as it has already
+      // had redundancy restored
+      if (!Collections.disjoint(completedMembers, memberPRInfo.dsMemberList)) {
+        continue;
+      }
+      // Try the function on the first member for this region
+      DistributedMember targetMember = memberPRInfo.dsMemberList.get(0);
+      RestoreRedundancyResults functionResult = 
executeFunctionAndGetFunctionResult(
+          new RestoreRedundancyFunction(), functionArgs, targetMember);
+      if (!functionResult.getSuccess()) {
+        // Record the error and then give up
+        RestoreRedundancyResultsImpl results = new 
RestoreRedundancyResultsImpl();
+        results.setSuccess(false);
+        String errorString =
+            String.format(EXCEPTION_MEMBER_MESSAGE, targetMember.getName(),
+                functionResult.getStatusMessage());
+        results.setStatusMessage(errorString);
+        results.setSuccess(false);
+        return results;
+      }
+      functionResults.add(functionResult);
+      completedMembers.add(targetMember);
+    }
+
+    RestoreRedundancyResultsImpl finalResult = new 
RestoreRedundancyResultsImpl();
+    finalResult.addIncludedRegionsWithNoMembers(includedRegionsWithNoMembers);
+    for (RestoreRedundancyResults functionResult : functionResults) {
+      finalResult.addRegionResults(functionResult);
+      finalResult.setSuccess(functionResult.getSuccess());
+      finalResult.setStatusMessage(functionResult.getStatusMessage());
+    }
+    return finalResult;
+  }
+
+  // this returns either an Exception or RestoreRedundancyResults

Review comment:
       This comment does not seem entirely accurate. The method either returns 
`null` or a `RestoreRedundancyResults` object.

##########
File path: 
geode-core/src/test/java/org/apache/geode/management/internal/functions/RestoreRedundancyFunctionTest.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.functions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.control.RestoreRedundancyOperation;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.ResultSender;
+import 
org.apache.geode.internal.cache.control.SerializableRestoreRedundancyResultsImpl;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+import org.apache.geode.management.runtime.RestoreRedundancyResults;
+
+public class RestoreRedundancyFunctionTest {
+  @SuppressWarnings("unchecked")
+  private final FunctionContext<Object[]> mockContext = 
mock(FunctionContext.class);
+  private final Cache mockCache = mock(Cache.class, RETURNS_DEEP_STUBS);
+  private final RestoreRedundancyOperation mockOperation =
+      mock(RestoreRedundancyOperation.class, RETURNS_DEEP_STUBS);
+  private final SerializableRestoreRedundancyResultsImpl mockResults =
+      mock(SerializableRestoreRedundancyResultsImpl.class);
+  private final String message = "expected message";
+  private RestoreRedundancyFunction function;
+  private ResultSender resultSender;
+  private ArgumentCaptor<SerializableRestoreRedundancyResultsImpl> 
argumentCaptor;
+  private RestoreRedundancyRequest request;
+
+  @Before
+  public void setUp() throws InterruptedException, ExecutionException {
+    function = new RestoreRedundancyFunction();
+    when(mockContext.getCache()).thenReturn(mockCache);
+    request = new RestoreRedundancyRequest();
+    request.setReassignPrimaries(true);
+
+    when(mockContext.getArguments()).thenReturn(new Object[] {request, false});
+    when(mockCache.getResourceManager().createRestoreRedundancyOperation())
+        .thenReturn(mockOperation);
+    CompletableFuture<RestoreRedundancyResults> future =
+        CompletableFuture.completedFuture(mockResults);
+    when(mockOperation.start()).thenReturn(future);
+    when(mockResults.getRegionOperationMessage()).thenReturn(message);
+    // when(mockResults.getStatusMessage()).thenReturn(message);
+    resultSender = mock(ResultSender.class);
+    when(mockContext.getResultSender()).thenReturn(resultSender);
+    argumentCaptor = 
ArgumentCaptor.forClass(SerializableRestoreRedundancyResultsImpl.class);
+  }
+
+  @Test
+  public void executeFunctionSetsFieldsOnRestoreRedundancyOperation() {
+    String[] includeRegions = {"includedRegion1", "includedRegion2"};
+    String[] excludeRegions = {"excludedRegion1", "excludedRegion2"};
+    request.setExcludeRegions(Arrays.asList(excludeRegions));
+    request.setIncludeRegions(Arrays.asList(includeRegions));
+
+    function.execute(mockContext);
+
+    verify(mockOperation).includeRegions(new 
HashSet<>(request.getIncludeRegions()));
+    verify(mockOperation).excludeRegions(new 
HashSet<>(request.getExcludeRegions()));
+    
verify(mockOperation).shouldReassignPrimaries(request.getReassignPrimaries());
+  }
+
+  @Test
+  public void 
executeFunctionSetsIncludedAndExcludedRegionsOnRestoreRedundancyOperationWhenNull()
 {
+    function.execute(mockContext);
+
+    verify(mockOperation).includeRegions(null);
+    verify(mockOperation).excludeRegions(null);
+    verify(mockOperation).shouldReassignPrimaries(true);
+  }
+
+  @Test
+  public void executeFunctionUsesStatusMethodWhenIsStatusCommandIsTrue() {
+    when(mockOperation.redundancyStatus()).thenReturn(mockResults);
+    when(mockResults.getRegionOperationStatus())
+        .thenReturn(RestoreRedundancyResults.Status.SUCCESS);
+    // isStatusCommand is the fourth argument passed to the function
+    when(mockContext.getArguments()).thenReturn(new Object[] {request, true});
+
+    function.execute(mockContext);
+
+    verify(mockOperation, times(1)).redundancyStatus();
+    verify(mockOperation, times(0)).start();
+  }
+
+  @Test
+  public void executeFunctionReturnsErrorWhenResultStatusIsError() {
+    
when(mockResults.getRegionOperationStatus()).thenReturn(RestoreRedundancyResults.Status.ERROR);
+    function.execute(mockContext);
+    verify(resultSender).lastResult(argumentCaptor.capture());
+
+    RestoreRedundancyResults result = argumentCaptor.getValue();
+    assertThat(result.getSuccess()).isFalse();
+    assertThat(result.getStatusMessage()).isEqualTo(message);
+  }
+
+  @Test
+  // The function was able to execute successfully but redundancy was not able 
to be established for
+  // at least one region
+  public void executeFunctionReturnsOkWhenResultStatusIsFailure() {
+    when(mockResults.getRegionOperationStatus())
+        .thenReturn(RestoreRedundancyResults.Status.FAILURE);
+    function.execute(mockContext);
+    verify(resultSender).lastResult(argumentCaptor.capture());
+
+    SerializableRestoreRedundancyResultsImpl result = 
argumentCaptor.getValue();
+    verify(result).setSuccess(true);
+    assertThat(result.getRegionOperationStatus())
+        .isEqualTo(RestoreRedundancyResults.Status.FAILURE);
+    assertThat(result).isSameAs(mockResults);
+  }
+
+  @Test
+  public void executeFunctionReturnsOkWhenResultStatusIsSuccess() {
+    when(mockResults.getRegionOperationStatus())
+        .thenReturn(RestoreRedundancyResults.Status.SUCCESS);
+    function.execute(mockContext);
+    verify(resultSender).lastResult(argumentCaptor.capture());
+
+    SerializableRestoreRedundancyResultsImpl result = 
argumentCaptor.getValue();
+    verify(result).setSuccess(true);
+    assertThat(result.getRegionOperationStatus())
+        .isEqualTo(RestoreRedundancyResults.Status.SUCCESS);
+    assertThat(result).isSameAs(mockResults);
+  }
+
+  @Test
+  public void whenFunctionThrowException() throws Exception {

Review comment:
       This test name could be a little more descriptive, saying what the 
expected behaviour is given the test conditions, such as 
"executeFunctionReturnsFailureResultWhenExceptionIsThrownDuringOperation". 
Also, an exception is never thrown from this method, so the `throws Exception` 
can be removed from the method signature.

##########
File path: 
geode-core/src/test/java/org/apache/geode/management/internal/functions/RestoreRedundancyFunctionTest.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+
+package org.apache.geode.management.internal.functions;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.control.RestoreRedundancyOperation;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.ResultSender;
+import 
org.apache.geode.internal.cache.control.SerializableRestoreRedundancyResultsImpl;
+import org.apache.geode.management.operation.RestoreRedundancyRequest;
+import org.apache.geode.management.runtime.RestoreRedundancyResults;
+
+public class RestoreRedundancyFunctionTest {
+  @SuppressWarnings("unchecked")
+  private final FunctionContext<Object[]> mockContext = 
mock(FunctionContext.class);
+  private final Cache mockCache = mock(Cache.class, RETURNS_DEEP_STUBS);
+  private final RestoreRedundancyOperation mockOperation =
+      mock(RestoreRedundancyOperation.class, RETURNS_DEEP_STUBS);
+  private final SerializableRestoreRedundancyResultsImpl mockResults =
+      mock(SerializableRestoreRedundancyResultsImpl.class);
+  private final String message = "expected message";
+  private RestoreRedundancyFunction function;
+  private ResultSender resultSender;
+  private ArgumentCaptor<SerializableRestoreRedundancyResultsImpl> 
argumentCaptor;
+  private RestoreRedundancyRequest request;
+
+  @Before
+  public void setUp() throws InterruptedException, ExecutionException {
+    function = new RestoreRedundancyFunction();
+    when(mockContext.getCache()).thenReturn(mockCache);
+    request = new RestoreRedundancyRequest();
+    request.setReassignPrimaries(true);
+
+    when(mockContext.getArguments()).thenReturn(new Object[] {request, false});
+    when(mockCache.getResourceManager().createRestoreRedundancyOperation())
+        .thenReturn(mockOperation);
+    CompletableFuture<RestoreRedundancyResults> future =
+        CompletableFuture.completedFuture(mockResults);
+    when(mockOperation.start()).thenReturn(future);
+    when(mockResults.getRegionOperationMessage()).thenReturn(message);
+    // when(mockResults.getStatusMessage()).thenReturn(message);
+    resultSender = mock(ResultSender.class);
+    when(mockContext.getResultSender()).thenReturn(resultSender);
+    argumentCaptor = 
ArgumentCaptor.forClass(SerializableRestoreRedundancyResultsImpl.class);
+  }
+
+  @Test
+  public void executeFunctionSetsFieldsOnRestoreRedundancyOperation() {
+    String[] includeRegions = {"includedRegion1", "includedRegion2"};
+    String[] excludeRegions = {"excludedRegion1", "excludedRegion2"};
+    request.setExcludeRegions(Arrays.asList(excludeRegions));
+    request.setIncludeRegions(Arrays.asList(includeRegions));
+
+    function.execute(mockContext);
+
+    verify(mockOperation).includeRegions(new 
HashSet<>(request.getIncludeRegions()));
+    verify(mockOperation).excludeRegions(new 
HashSet<>(request.getExcludeRegions()));
+    
verify(mockOperation).shouldReassignPrimaries(request.getReassignPrimaries());
+  }
+
+  @Test
+  public void 
executeFunctionSetsIncludedAndExcludedRegionsOnRestoreRedundancyOperationWhenNull()
 {
+    function.execute(mockContext);
+
+    verify(mockOperation).includeRegions(null);
+    verify(mockOperation).excludeRegions(null);
+    verify(mockOperation).shouldReassignPrimaries(true);
+  }
+
+  @Test
+  public void executeFunctionUsesStatusMethodWhenIsStatusCommandIsTrue() {
+    when(mockOperation.redundancyStatus()).thenReturn(mockResults);
+    when(mockResults.getRegionOperationStatus())
+        .thenReturn(RestoreRedundancyResults.Status.SUCCESS);
+    // isStatusCommand is the fourth argument passed to the function

Review comment:
       This comment is no longer correct. The argument that controls whether or 
not the function should restore redundancy or just check the redundancy status 
is now the second argument. Also, see the comment in 
`RestoreRedundancyFunction` regarding the name of this variable.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to