This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch feature/GEODE-4362
in repository https://gitbox.apache.org/repos/asf/geode.git

commit d7260c190774f966acd61f121b029345c814f6f4
Author: Bruce Schuchardt <bschucha...@pivotal.io>
AuthorDate: Fri Feb 9 13:24:50 2018 -0800

    GEODE-3643 Add function execution on specific member
    
    Refactored the Function operation handlers and added an OnGroups version
    of function execution.
---
 .../src/main/proto/v1/clientProtocol.proto         |   3 +
 .../src/main/proto/v1/function_API.proto           |  10 +
 ...> AbstractFunctionRequestOperationHandler.java} |  87 +++---
 ...cuteFunctionOnGroupRequestOperationHandler.java | 132 ++++++++
 ...uteFunctionOnMemberRequestOperationHandler.java | 144 +++++----
 ...uteFunctionOnRegionRequestOperationHandler.java | 140 ++++-----
 .../registry/ProtobufOperationContextRegistry.java |  10 +
 .../v1/ExecuteFunctionOnGroupIntegrationTest.java  | 332 +++++++++++++++++++++
 ...ionOnGroupRequestOperationHandlerJUnitTest.java | 201 +++++++++++++
 9 files changed, 863 insertions(+), 196 deletions(-)

diff --git a/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto 
b/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto
index 13498d4..e4181c7 100644
--- a/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto
+++ b/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto
@@ -63,6 +63,9 @@ message Message {
 
         AuthenticationRequest authenticationRequest = 22;
         AuthenticationResponse authenticationResponse = 23;
+
+        ExecuteFunctionOnGroupRequest executeFunctionOnGroupRequest = 24;
+        ExecuteFunctionOnGroupResponse executeFunctionOnGroupResponse= 25;
     }
 }
 
diff --git a/geode-protobuf-messages/src/main/proto/v1/function_API.proto 
b/geode-protobuf-messages/src/main/proto/v1/function_API.proto
index 9558e72..6758739 100644
--- a/geode-protobuf-messages/src/main/proto/v1/function_API.proto
+++ b/geode-protobuf-messages/src/main/proto/v1/function_API.proto
@@ -38,3 +38,13 @@ message ExecuteFunctionOnMemberResponse {
     repeated EncodedValue results = 1; // some functions don't return results.
 }
 
+message ExecuteFunctionOnGroupRequest {
+    string functionID = 1;
+    repeated string groupName = 2;
+    EncodedValue arguments = 3;
+}
+
+message ExecuteFunctionOnGroupResponse {
+    repeated EncodedValue results = 1; // some functions don't return results.
+}
+
diff --git 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnRegionRequestOperationHandler.java
 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/AbstractFunctionRequestOperationHandler.java
similarity index 64%
copy from 
geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnRegionRequestOperationHandler.java
copy to 
geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/AbstractFunctionRequestOperationHandler.java
index 17e9f09..3a738c1 100644
--- 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnRegionRequestOperationHandler.java
+++ 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/AbstractFunctionRequestOperationHandler.java
@@ -14,11 +14,11 @@
  */
 package org.apache.geode.internal.protocol.protobuf.v1.operations;
 
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.geode.cache.Region;
+import com.google.protobuf.AbstractMessage;
+
 import org.apache.geode.cache.execute.Execution;
 import org.apache.geode.cache.execute.Function;
 import org.apache.geode.cache.execute.FunctionException;
@@ -26,29 +26,23 @@ import org.apache.geode.cache.execute.FunctionService;
 import org.apache.geode.cache.execute.ResultCollector;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
 import org.apache.geode.internal.i18n.LocalizedStrings;
-import org.apache.geode.internal.protocol.operations.ProtobufOperationHandler;
 import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
 import org.apache.geode.internal.protocol.protobuf.v1.Failure;
-import org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI;
 import org.apache.geode.internal.protocol.protobuf.v1.MessageExecutionContext;
 import 
org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
 import org.apache.geode.internal.protocol.protobuf.v1.Result;
-import org.apache.geode.internal.protocol.protobuf.v1.Success;
 import 
org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.EncodingException;
 import org.apache.geode.internal.security.SecurityService;
 import org.apache.geode.security.NotAuthorizedException;
 
-public class ExecuteFunctionOnRegionRequestOperationHandler implements
-    ProtobufOperationHandler<FunctionAPI.ExecuteFunctionOnRegionRequest, 
FunctionAPI.ExecuteFunctionOnRegionResponse> {
-  @Override
-  public Result<FunctionAPI.ExecuteFunctionOnRegionResponse, 
ClientProtocol.ErrorResponse> process(
-      ProtobufSerializationService serializationService,
-      FunctionAPI.ExecuteFunctionOnRegionRequest request,
+public abstract class AbstractFunctionRequestOperationHandler {
+
+
+  public Result process(ProtobufSerializationService serializationService, 
AbstractMessage request,
       MessageExecutionContext messageExecutionContext) throws 
InvalidExecutionContextException {
 
-    final String functionID = request.getFunctionID();
-    final String regionName = request.getRegion();
+    final String functionID = getFunctionID(request);
 
     final Function<?> function = FunctionService.getFunction(functionID);
     if (function == null) {
@@ -60,15 +54,8 @@ public class ExecuteFunctionOnRegionRequestOperationHandler 
implements
           .build());
     }
 
-    final Region<Object, Object> region = 
messageExecutionContext.getCache().getRegion(regionName);
-    if (region == null) {
-      return Failure.of(ClientProtocol.ErrorResponse.newBuilder()
-          
.setError(BasicTypes.Error.newBuilder().setErrorCode(BasicTypes.ErrorCode.INVALID_REQUEST)
-              .setMessage("Region \"" + regionName + "\" not found"))
-          .build());
-    }
-
     final SecurityService securityService = 
messageExecutionContext.getCache().getSecurityService();
+    final String regionName = getRegionName(request);
 
     try {
       // check security for function.
@@ -81,31 +68,34 @@ public class ExecuteFunctionOnRegionRequestOperationHandler 
implements
           .build());
     }
 
+    Object executionTarget = getExecutionTarget(request, regionName, 
messageExecutionContext);
+    if (executionTarget instanceof Failure) {
+      return (Failure) executionTarget;
+    }
+
     try {
-      Execution execution = FunctionService.onRegion(region);
+      Execution execution = getFunctionExecutionObject(executionTarget);
 
-      final Object arguments = 
serializationService.decode(request.getArguments());
+      Object arguments = getFunctionArguments(request, serializationService);
 
       if (arguments != null) {
         execution = execution.setArguments(arguments);
       }
 
-      execution = execution.withFilter(parseFilter(serializationService, 
request));
+      Set<?> parseFilter = parseFilter(serializationService, request);
+      if (parseFilter != null) {
+        execution = execution.withFilter(parseFilter);
+      }
 
       final ResultCollector<Object, List<Object>> resultCollector = 
execution.execute(functionID);
 
       if (function.hasResult()) {
         List<Object> results = resultCollector.getResult();
 
-        final FunctionAPI.ExecuteFunctionOnRegionResponse.Builder 
responseMessage =
-            FunctionAPI.ExecuteFunctionOnRegionResponse.newBuilder();
-        for (Object result : results) {
-          responseMessage.addResults(serializationService.encode(result));
-        }
-        return Success.of(responseMessage.build());
+        return buildResultMessage(serializationService, results);
       } else {
         // This is fire and forget.
-        return 
Success.of(FunctionAPI.ExecuteFunctionOnRegionResponse.newBuilder().build());
+        return buildResultMessage(serializationService);
       }
     } catch (FunctionException ex) {
       return Failure.of(ClientProtocol.ErrorResponse.newBuilder()
@@ -120,14 +110,31 @@ public class 
ExecuteFunctionOnRegionRequestOperationHandler implements
     }
   }
 
-  private Set<Object> parseFilter(ProtobufSerializationService 
serializationService,
-      FunctionAPI.ExecuteFunctionOnRegionRequest request) throws 
EncodingException {
-    List<BasicTypes.EncodedValue> encodedFilter = request.getKeyFilterList();
-    Set<Object> filter = new HashSet<>();
+  protected abstract Set<?> parseFilter(ProtobufSerializationService 
serializationService,
+      AbstractMessage request) throws EncodingException;
+
+  protected abstract String getFunctionID(AbstractMessage request);
+
+  /** the result of this may be null, which is used by the security service to 
mean "no region" */
+  protected abstract String getRegionName(AbstractMessage request);
+
+  /** region, set of members, list of groups, etc., or a Failure if no target can be resolved */
+  protected abstract Object getExecutionTarget(AbstractMessage request, String 
regionName,
+      MessageExecutionContext executionContext) throws 
InvalidExecutionContextException;
+
+  /** arguments for the function */
+  protected abstract Object getFunctionArguments(AbstractMessage request,
+      ProtobufSerializationService serializationService) throws 
EncodingException;
+
+  protected abstract Execution getFunctionExecutionObject(Object 
executionTarget)
+      throws InvalidExecutionContextException;
+
+  protected abstract Result buildResultMessage(ProtobufSerializationService 
serializationService)
+      throws EncodingException;
+
+  protected abstract Result buildResultMessage(ProtobufSerializationService 
serializationService,
+      List<Object> results) throws EncodingException;
+
+
 
-    for (BasicTypes.EncodedValue filterKey : encodedFilter) {
-      filter.add(serializationService.decode(filterKey));
-    }
-    return filter;
-  }
 }
diff --git 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnGroupRequestOperationHandler.java
 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnGroupRequestOperationHandler.java
new file mode 100644
index 0000000..4d6faa0
--- /dev/null
+++ 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnGroupRequestOperationHandler.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol.protobuf.v1.operations;
+
+import java.util.List;
+import java.util.Set;
+
+import com.google.protobuf.AbstractMessage;
+import com.google.protobuf.ProtocolStringList;
+
+import org.apache.geode.cache.execute.Execution;
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.distributed.DistributedSystem;
+import org.apache.geode.internal.exception.InvalidExecutionContextException;
+import org.apache.geode.internal.protocol.operations.ProtobufOperationHandler;
+import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
+import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
+import org.apache.geode.internal.protocol.protobuf.v1.Failure;
+import 
org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI.ExecuteFunctionOnGroupRequest;
+import 
org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI.ExecuteFunctionOnGroupResponse;
+import org.apache.geode.internal.protocol.protobuf.v1.MessageExecutionContext;
+import 
org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
+import org.apache.geode.internal.protocol.protobuf.v1.Result;
+import org.apache.geode.internal.protocol.protobuf.v1.Success;
+import 
org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.EncodingException;
+
+public class ExecuteFunctionOnGroupRequestOperationHandler
+    extends AbstractFunctionRequestOperationHandler implements
+    ProtobufOperationHandler<ExecuteFunctionOnGroupRequest, 
ExecuteFunctionOnGroupResponse> {
+
+
+  @Override
+  public Result<ExecuteFunctionOnGroupResponse, ClientProtocol.ErrorResponse> 
process(
+      ProtobufSerializationService serializationService, 
ExecuteFunctionOnGroupRequest request,
+      MessageExecutionContext messageExecutionContext) throws 
InvalidExecutionContextException {
+
+    return (Result<ExecuteFunctionOnGroupResponse, 
ClientProtocol.ErrorResponse>) super.process(
+        serializationService, request, messageExecutionContext);
+  }
+
+  @Override
+  protected Set<?> parseFilter(ProtobufSerializationService 
serializationService,
+      AbstractMessage request) throws EncodingException {
+    // filters are not allowed on functions not associated with regions
+    return null;
+  }
+
+  @Override
+  protected String getFunctionID(AbstractMessage request) {
+    return ((ExecuteFunctionOnGroupRequest) request).getFunctionID();
+  }
+
+  @Override
+  protected String getRegionName(AbstractMessage request) {
+    // region name is not allowed in onGroup invocation
+    return null;
+  }
+
+  @Override
+  protected Object getExecutionTarget(AbstractMessage abstractRequest, String 
regionName,
+      MessageExecutionContext executionContext) throws 
InvalidExecutionContextException {
+
+    ExecuteFunctionOnGroupRequest request = ((ExecuteFunctionOnGroupRequest) 
abstractRequest);
+    ProtocolStringList groupList = request.getGroupNameList();
+
+    // unfortunately FunctionServiceManager throws a FunctionException if 
there are no
+    // servers matching any of the given groups. In order to distinguish 
between
+    // function execution failure and this condition we have to preprocess the 
groups
+    // and ensure that there is at least one server that has one of the given 
groups
+    DistributedSystem distributedSystem =
+        executionContext.getCache().getDistributionManager().getSystem();
+    boolean foundMatch = false;
+    for (String group : groupList) {
+      if (distributedSystem.getGroupMembers(group).size() > 0) {
+        foundMatch = true;
+        break;
+      }
+    }
+    if (!foundMatch) {
+      return Failure.of(ClientProtocol.ErrorResponse.newBuilder()
+          .setError(BasicTypes.Error.newBuilder()
+              .setMessage("No server  in groups " + groupList + " could be 
found to execute \""
+                  + request.getFunctionID() + "\"")
+              .setErrorCode(BasicTypes.ErrorCode.NO_AVAILABLE_SERVER))
+          .build());
+    }
+    return groupList;
+  }
+
+  @Override
+  protected Object getFunctionArguments(AbstractMessage request,
+      ProtobufSerializationService serializationService) throws 
EncodingException {
+    return serializationService.decode(((ExecuteFunctionOnGroupRequest) 
request).getArguments());
+  }
+
+  @Override
+  protected Execution getFunctionExecutionObject(Object executionTarget)
+      throws InvalidExecutionContextException {
+    ProtocolStringList groupList = (ProtocolStringList) executionTarget;
+    return FunctionService.onMember(groupList.toArray(new String[0]));
+  }
+
+  @Override
+  protected Result buildResultMessage(ProtobufSerializationService 
serializationService,
+      List<Object> results) throws EncodingException {
+    final ExecuteFunctionOnGroupResponse.Builder responseMessage =
+        ExecuteFunctionOnGroupResponse.newBuilder();
+    for (Object result : results) {
+      responseMessage.addResults(serializationService.encode(result));
+    }
+    return Success.of(responseMessage.build());
+  }
+
+  @Override
+  protected Result buildResultMessage(ProtobufSerializationService 
serializationService)
+      throws EncodingException {
+    return Success.of(ExecuteFunctionOnGroupResponse.newBuilder().build());
+  }
+
+}
diff --git 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnMemberRequestOperationHandler.java
 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnMemberRequestOperationHandler.java
index 4519f5a..cc608c4 100644
--- 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnMemberRequestOperationHandler.java
+++ 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnMemberRequestOperationHandler.java
@@ -18,130 +18,120 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import com.google.protobuf.AbstractMessage;
 import com.google.protobuf.ProtocolStringList;
 
 import org.apache.geode.cache.execute.Execution;
-import org.apache.geode.cache.execute.Function;
-import org.apache.geode.cache.execute.FunctionException;
 import org.apache.geode.cache.execute.FunctionService;
-import org.apache.geode.cache.execute.ResultCollector;
 import org.apache.geode.distributed.DistributedMember;
 import org.apache.geode.distributed.internal.DistributionManager;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
-import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.protocol.operations.ProtobufOperationHandler;
 import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
 import org.apache.geode.internal.protocol.protobuf.v1.Failure;
-import org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI;
+import 
org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI.ExecuteFunctionOnMemberRequest;
+import 
org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI.ExecuteFunctionOnMemberResponse;
 import org.apache.geode.internal.protocol.protobuf.v1.MessageExecutionContext;
 import 
org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
 import org.apache.geode.internal.protocol.protobuf.v1.Result;
 import org.apache.geode.internal.protocol.protobuf.v1.Success;
 import 
org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.EncodingException;
-import 
org.apache.geode.internal.protocol.protobuf.v1.state.exception.ConnectionStateException;
-import org.apache.geode.internal.security.SecurityService;
-import org.apache.geode.security.NotAuthorizedException;
 
-public class ExecuteFunctionOnMemberRequestOperationHandler implements
-    ProtobufOperationHandler<FunctionAPI.ExecuteFunctionOnMemberRequest, 
FunctionAPI.ExecuteFunctionOnMemberResponse> {
+public class ExecuteFunctionOnMemberRequestOperationHandler
+    extends AbstractFunctionRequestOperationHandler implements
+    ProtobufOperationHandler<ExecuteFunctionOnMemberRequest, 
ExecuteFunctionOnMemberResponse> {
+
+
   @Override
-  public Result<FunctionAPI.ExecuteFunctionOnMemberResponse, 
ClientProtocol.ErrorResponse> process(
-      ProtobufSerializationService serializationService,
-      FunctionAPI.ExecuteFunctionOnMemberRequest request,
+  public Result<ExecuteFunctionOnMemberResponse, ClientProtocol.ErrorResponse> 
process(
+      ProtobufSerializationService serializationService, 
ExecuteFunctionOnMemberRequest request,
       MessageExecutionContext messageExecutionContext) throws 
InvalidExecutionContextException {
 
-    final String functionID = request.getFunctionID();
+    return (Result<ExecuteFunctionOnMemberResponse, 
ClientProtocol.ErrorResponse>) super.process(
+        serializationService, request, messageExecutionContext);
+  }
 
-    final Function<?> function = FunctionService.getFunction(functionID);
-    if (function == null) {
-      return Failure.of(ClientProtocol.ErrorResponse.newBuilder()
-          
.setError(BasicTypes.Error.newBuilder().setErrorCode(BasicTypes.ErrorCode.INVALID_REQUEST)
-              
.setMessage(LocalizedStrings.ExecuteFunction_FUNCTION_NAMED_0_IS_NOT_REGISTERED
-                  .toLocalizedString(functionID))
-              .build())
-          .build());
-    }
+  @Override
+  protected Set<?> parseFilter(ProtobufSerializationService 
serializationService,
+      AbstractMessage request) throws EncodingException {
+    // filters are not allowed on functions not associated with regions
+    return null;
+  }
+
+  @Override
+  protected String getFunctionID(AbstractMessage request) {
+    return ((ExecuteFunctionOnMemberRequest) request).getFunctionID();
+  }
 
-    final SecurityService securityService = 
messageExecutionContext.getCache().getSecurityService();
+  @Override
+  protected String getRegionName(AbstractMessage request) {
+    // region name is not allowed in onMember invocation
+    return null;
+  }
 
-    try {
-      // check security for function.
-      final String noRegion = null;
-      
function.getRequiredPermissions(noRegion).forEach(securityService::authorize);
-    } catch (NotAuthorizedException ex) {
-      return Failure.of(ClientProtocol.ErrorResponse.newBuilder()
-          .setError(BasicTypes.Error.newBuilder()
-              .setMessage("Authorization failed for function \"" + functionID 
+ "\"")
-              .setErrorCode(BasicTypes.ErrorCode.AUTHORIZATION_FAILED))
-          .build());
-    }
+  @Override
+  protected Object getExecutionTarget(AbstractMessage abstractRequest, String 
regionName,
+      MessageExecutionContext executionContext) throws 
InvalidExecutionContextException {
+    ExecuteFunctionOnMemberRequest request = (ExecuteFunctionOnMemberRequest) 
abstractRequest;
 
     ProtocolStringList memberNameList = request.getMemberNameList();
 
     Set<DistributedMember> memberIds = new HashSet<>(memberNameList.size());
-    DistributionManager distributionManager =
-        messageExecutionContext.getCache().getDistributionManager();
+    DistributionManager distributionManager = 
executionContext.getCache().getDistributionManager();
     for (String name : memberNameList) {
       DistributedMember member = distributionManager.getMemberWithName(name);
       if (member == null) {
         return Failure.of(ClientProtocol.ErrorResponse.newBuilder()
             .setError(BasicTypes.Error.newBuilder()
-                .setMessage("Member " + name + " not found to execute \"" + 
functionID + "\"")
+                .setMessage(
+                    "Member " + name + " not found to execute \"" + 
request.getFunctionID() + "\"")
                 .setErrorCode(BasicTypes.ErrorCode.NO_AVAILABLE_SERVER))
             .build());
       }
       memberIds.add(member);
     }
-
     if (memberIds.isEmpty()) {
       return Failure.of(ClientProtocol.ErrorResponse.newBuilder()
           .setError(BasicTypes.Error.newBuilder()
-              .setMessage("No members found to execute \"" + functionID + "\"")
+              .setMessage("No members found to execute \"" + 
request.getFunctionID() + "\"")
               .setErrorCode(BasicTypes.ErrorCode.NO_AVAILABLE_SERVER))
           .build());
     }
+    return memberIds;
+  }
 
-    try {
-      Execution execution;
-      if (memberIds.size() == 1) {
-        execution = FunctionService.onMember(memberIds.iterator().next());
-      } else {
-        execution = FunctionService.onMembers(memberIds);
-      }
-
-      final Object arguments = 
serializationService.decode(request.getArguments());
-
-      if (arguments != null) {
-        execution = execution.setArguments(arguments);
-      }
-
-      final ResultCollector<Object, List<Object>> resultCollector = 
execution.execute(functionID);
+  @Override
+  protected Object getFunctionArguments(AbstractMessage request,
+      ProtobufSerializationService serializationService) throws 
EncodingException {
+    return serializationService.decode(((ExecuteFunctionOnMemberRequest) 
request).getArguments());
+  }
 
-      if (function.hasResult()) {
-        List<Object> results = resultCollector.getResult();
+  @Override
+  protected Execution getFunctionExecutionObject(Object executionTarget) {
+    Set<DistributedMember> memberIds = (Set<DistributedMember>) 
executionTarget;
+    if (memberIds.size() == 1) {
+      return FunctionService.onMember(memberIds.iterator().next());
+    } else {
+      return FunctionService.onMembers(memberIds);
+    }
+  }
 
-        final FunctionAPI.ExecuteFunctionOnMemberResponse.Builder 
responseMessage =
-            FunctionAPI.ExecuteFunctionOnMemberResponse.newBuilder();
-        for (Object result : results) {
-          responseMessage.addResults(serializationService.encode(result));
-        }
-        return Success.of(responseMessage.build());
-      } else {
-        // This is fire and forget.
-        return 
Success.of(FunctionAPI.ExecuteFunctionOnMemberResponse.newBuilder().build());
-      }
-    } catch (FunctionException ex) {
-      return Failure.of(ClientProtocol.ErrorResponse.newBuilder()
-          
.setError(BasicTypes.Error.newBuilder().setErrorCode(BasicTypes.ErrorCode.SERVER_ERROR)
-              .setMessage("Function execution failed: " + ex.toString()))
-          .build());
-    } catch (EncodingException ex) {
-      return Failure.of(ClientProtocol.ErrorResponse.newBuilder()
-          
.setError(BasicTypes.Error.newBuilder().setErrorCode(BasicTypes.ErrorCode.SERVER_ERROR)
-              .setMessage("Encoding failed: " + ex.toString()))
-          .build());
+  @Override
+  protected Result buildResultMessage(ProtobufSerializationService 
serializationService,
+      List<Object> results) throws EncodingException {
+    final ExecuteFunctionOnMemberResponse.Builder responseMessage =
+        ExecuteFunctionOnMemberResponse.newBuilder();
+    for (Object result : results) {
+      responseMessage.addResults(serializationService.encode(result));
     }
+    return Success.of(responseMessage.build());
+  }
+
+  @Override
+  protected Result buildResultMessage(ProtobufSerializationService 
serializationService)
+      throws EncodingException {
+    return Success.of(ExecuteFunctionOnMemberResponse.newBuilder().build());
   }
 
 }
diff --git 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnRegionRequestOperationHandler.java
 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnRegionRequestOperationHandler.java
index 17e9f09..b0798ce 100644
--- 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnRegionRequestOperationHandler.java
+++ 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnRegionRequestOperationHandler.java
@@ -18,116 +18,98 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
+import com.google.protobuf.AbstractMessage;
+
 import org.apache.geode.cache.Region;
 import org.apache.geode.cache.execute.Execution;
-import org.apache.geode.cache.execute.Function;
-import org.apache.geode.cache.execute.FunctionException;
 import org.apache.geode.cache.execute.FunctionService;
-import org.apache.geode.cache.execute.ResultCollector;
 import org.apache.geode.internal.exception.InvalidExecutionContextException;
-import org.apache.geode.internal.i18n.LocalizedStrings;
 import org.apache.geode.internal.protocol.operations.ProtobufOperationHandler;
 import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes;
 import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
 import org.apache.geode.internal.protocol.protobuf.v1.Failure;
-import org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI;
+import 
org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI.ExecuteFunctionOnRegionRequest;
+import 
org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI.ExecuteFunctionOnRegionResponse;
 import org.apache.geode.internal.protocol.protobuf.v1.MessageExecutionContext;
 import 
org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
 import org.apache.geode.internal.protocol.protobuf.v1.Result;
 import org.apache.geode.internal.protocol.protobuf.v1.Success;
 import 
org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.EncodingException;
-import org.apache.geode.internal.security.SecurityService;
-import org.apache.geode.security.NotAuthorizedException;
 
-public class ExecuteFunctionOnRegionRequestOperationHandler implements
-    ProtobufOperationHandler<FunctionAPI.ExecuteFunctionOnRegionRequest, 
FunctionAPI.ExecuteFunctionOnRegionResponse> {
+public class ExecuteFunctionOnRegionRequestOperationHandler
+    extends AbstractFunctionRequestOperationHandler implements
+    ProtobufOperationHandler<ExecuteFunctionOnRegionRequest, 
ExecuteFunctionOnRegionResponse> {
+
   @Override
-  public Result<FunctionAPI.ExecuteFunctionOnRegionResponse, 
ClientProtocol.ErrorResponse> process(
-      ProtobufSerializationService serializationService,
-      FunctionAPI.ExecuteFunctionOnRegionRequest request,
+  public Result<ExecuteFunctionOnRegionResponse, ClientProtocol.ErrorResponse> 
process(
+      ProtobufSerializationService serializationService, 
ExecuteFunctionOnRegionRequest request,
       MessageExecutionContext messageExecutionContext) throws 
InvalidExecutionContextException {
 
-    final String functionID = request.getFunctionID();
-    final String regionName = request.getRegion();
+    return (Result<ExecuteFunctionOnRegionResponse, 
ClientProtocol.ErrorResponse>) super.process(
+        serializationService, request, messageExecutionContext);
+  }
 
-    final Function<?> function = FunctionService.getFunction(functionID);
-    if (function == null) {
-      return Failure.of(ClientProtocol.ErrorResponse.newBuilder()
-          
.setError(BasicTypes.Error.newBuilder().setErrorCode(BasicTypes.ErrorCode.INVALID_REQUEST)
-              
.setMessage(LocalizedStrings.ExecuteFunction_FUNCTION_NAMED_0_IS_NOT_REGISTERED
-                  .toLocalizedString(functionID))
-              .build())
-          .build());
+  protected Set<Object> parseFilter(ProtobufSerializationService 
serializationService,
+      AbstractMessage request) throws EncodingException {
+    List<BasicTypes.EncodedValue> encodedFilter =
+        ((ExecuteFunctionOnRegionRequest) request).getKeyFilterList();
+    Set<Object> filter = new HashSet<>();
+
+    for (BasicTypes.EncodedValue filterKey : encodedFilter) {
+      filter.add(serializationService.decode(filterKey));
     }
+    return filter;
+  }
 
-    final Region<Object, Object> region = 
messageExecutionContext.getCache().getRegion(regionName);
+  @Override
+  protected String getFunctionID(AbstractMessage request) {
+    return ((ExecuteFunctionOnRegionRequest) request).getFunctionID();
+  }
+
+  @Override
+  protected String getRegionName(AbstractMessage request) {
+    return ((ExecuteFunctionOnRegionRequest) request).getRegion();
+  }
+
+  @Override
+  protected Object getExecutionTarget(AbstractMessage request, String 
regionName,
+      MessageExecutionContext executionContext) throws 
InvalidExecutionContextException {
+    final Region<Object, Object> region = 
executionContext.getCache().getRegion(regionName);
     if (region == null) {
       return Failure.of(ClientProtocol.ErrorResponse.newBuilder()
           
.setError(BasicTypes.Error.newBuilder().setErrorCode(BasicTypes.ErrorCode.INVALID_REQUEST)
               .setMessage("Region \"" + regionName + "\" not found"))
           .build());
     }
+    return region;
+  }
 
-    final SecurityService securityService = 
messageExecutionContext.getCache().getSecurityService();
-
-    try {
-      // check security for function.
-      
function.getRequiredPermissions(regionName).forEach(securityService::authorize);
-    } catch (NotAuthorizedException ex) {
-      return Failure.of(ClientProtocol.ErrorResponse.newBuilder()
-          .setError(BasicTypes.Error.newBuilder()
-              .setMessage("Authorization failed for function \"" + functionID 
+ "\"")
-              .setErrorCode(BasicTypes.ErrorCode.AUTHORIZATION_FAILED))
-          .build());
-    }
-
-    try {
-      Execution execution = FunctionService.onRegion(region);
-
-      final Object arguments = 
serializationService.decode(request.getArguments());
-
-      if (arguments != null) {
-        execution = execution.setArguments(arguments);
-      }
-
-      execution = execution.withFilter(parseFilter(serializationService, 
request));
-
-      final ResultCollector<Object, List<Object>> resultCollector = 
execution.execute(functionID);
+  @Override
+  protected Object getFunctionArguments(AbstractMessage request,
+      ProtobufSerializationService serializationService) throws 
EncodingException {
+    return serializationService.decode(((ExecuteFunctionOnRegionRequest) 
request).getArguments());
+  }
 
-      if (function.hasResult()) {
-        List<Object> results = resultCollector.getResult();
+  @Override
+  protected Execution getFunctionExecutionObject(Object executionTarget) {
+    return FunctionService.onRegion((Region) executionTarget);
+  }
 
-        final FunctionAPI.ExecuteFunctionOnRegionResponse.Builder 
responseMessage =
-            FunctionAPI.ExecuteFunctionOnRegionResponse.newBuilder();
-        for (Object result : results) {
-          responseMessage.addResults(serializationService.encode(result));
-        }
-        return Success.of(responseMessage.build());
-      } else {
-        // This is fire and forget.
-        return 
Success.of(FunctionAPI.ExecuteFunctionOnRegionResponse.newBuilder().build());
-      }
-    } catch (FunctionException ex) {
-      return Failure.of(ClientProtocol.ErrorResponse.newBuilder()
-          
.setError(BasicTypes.Error.newBuilder().setErrorCode(BasicTypes.ErrorCode.SERVER_ERROR)
-              .setMessage("Function execution failed: " + ex.toString()))
-          .build());
-    } catch (EncodingException ex) {
-      return Failure.of(ClientProtocol.ErrorResponse.newBuilder()
-          
.setError(BasicTypes.Error.newBuilder().setErrorCode(BasicTypes.ErrorCode.SERVER_ERROR)
-              .setMessage("Encoding failed: " + ex.toString()))
-          .build());
+  @Override
+  protected Result buildResultMessage(ProtobufSerializationService 
serializationService,
+      List<Object> results) throws EncodingException {
+    final ExecuteFunctionOnRegionResponse.Builder responseMessage =
+        ExecuteFunctionOnRegionResponse.newBuilder();
+    for (Object result : results) {
+      responseMessage.addResults(serializationService.encode(result));
     }
+    return Success.of(responseMessage.build());
   }
 
-  private Set<Object> parseFilter(ProtobufSerializationService 
serializationService,
-      FunctionAPI.ExecuteFunctionOnRegionRequest request) throws 
EncodingException {
-    List<BasicTypes.EncodedValue> encodedFilter = request.getKeyFilterList();
-    Set<Object> filter = new HashSet<>();
-
-    for (BasicTypes.EncodedValue filterKey : encodedFilter) {
-      filter.add(serializationService.decode(filterKey));
-    }
-    return filter;
+  @Override
+  protected Result buildResultMessage(ProtobufSerializationService 
serializationService)
+      throws EncodingException {
+    return Success.of(ExecuteFunctionOnRegionResponse.newBuilder().build());
   }
+
 }
diff --git 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java
 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java
index f9bbba1..6a88e41 100644
--- 
a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java
+++ 
b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java
@@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.geode.annotations.Experimental;
 import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
 import org.apache.geode.internal.protocol.protobuf.v1.ProtobufOperationContext;
+import 
org.apache.geode.internal.protocol.protobuf.v1.operations.ExecuteFunctionOnGroupRequestOperationHandler;
 import 
org.apache.geode.internal.protocol.protobuf.v1.operations.ExecuteFunctionOnMemberRequestOperationHandler;
 import 
org.apache.geode.internal.protocol.protobuf.v1.operations.ExecuteFunctionOnRegionRequestOperationHandler;
 import 
org.apache.geode.internal.protocol.protobuf.v1.operations.GetAllRequestOperationHandler;
@@ -128,5 +129,14 @@ public class ProtobufOperationContextRegistry {
             // Resource permissions get handled per-function, since they have 
varying permission
             // requirements.
             new ResourcePermission(ResourcePermission.NULL, 
ResourcePermission.NULL)));
+
+    
operationContexts.put(ClientProtocol.Message.MessageTypeCase.EXECUTEFUNCTIONONGROUPREQUEST,
+        new 
ProtobufOperationContext<>(ClientProtocol.Message::getExecuteFunctionOnGroupRequest,
+            new ExecuteFunctionOnGroupRequestOperationHandler(),
+            opsResp -> ClientProtocol.Message.newBuilder()
+                .setExecuteFunctionOnGroupResponse(opsResp),
+            // Resource permissions get handled per-function, since they have 
varying permission
+            // requirements.
+            new ResourcePermission(ResourcePermission.NULL, 
ResourcePermission.NULL)));
   }
 }
diff --git 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/ExecuteFunctionOnGroupIntegrationTest.java
 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/ExecuteFunctionOnGroupIntegrationTest.java
new file mode 100644
index 0000000..9746af8
--- /dev/null
+++ 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/ExecuteFunctionOnGroupIntegrationTest.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol.protobuf.v1;
+
+import static 
org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.Message.MessageTypeCase.EXECUTEFUNCTIONONGROUPRESPONSE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.contrib.java.lang.system.RestoreSystemProperties;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.Cache;
+import org.apache.geode.cache.CacheFactory;
+import org.apache.geode.cache.DataPolicy;
+import org.apache.geode.cache.RegionFactory;
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.FunctionException;
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.cache.server.CacheServer;
+import org.apache.geode.distributed.ConfigurationProperties;
+import org.apache.geode.internal.AvailablePortHelper;
+import org.apache.geode.management.internal.security.ResourceConstants;
+import org.apache.geode.security.ResourcePermission;
+import org.apache.geode.security.SecurityManager;
+import org.apache.geode.test.junit.categories.IntegrationTest;
+
+@Category(IntegrationTest.class)
+public class ExecuteFunctionOnGroupIntegrationTest {
+  private static final String TEST_REGION = "testRegion";
+  private static final String TEST_FUNCTION_ID = "testFunction";
+  private static final String SECURITY_PRINCIPAL = "principle";
+  private static final String COMPUTE_SERVERS = "computeServers";
+  private ProtobufSerializationService serializationService;
+  private Socket socket;
+  private Cache cache;
+  private SecurityManager securityManager;
+
+  @Rule
+  public RestoreSystemProperties restoreSystemProperties = new 
RestoreSystemProperties();
+
+  @Before
+  public void setUp() throws Exception {
+    CacheFactory cacheFactory = new CacheFactory(new Properties());
+    cacheFactory.set(ConfigurationProperties.MCAST_PORT, "0");
+    cacheFactory.set(ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION, 
"false");
+    cacheFactory.set(ConfigurationProperties.USE_CLUSTER_CONFIGURATION, 
"false");
+    cacheFactory.set(ConfigurationProperties.GROUPS, COMPUTE_SERVERS);
+
+    securityManager = mock(SecurityManager.class);
+    cacheFactory.setSecurityManager(securityManager);
+    when(securityManager.authenticate(any())).thenReturn(SECURITY_PRINCIPAL);
+    when(securityManager.authorize(eq(SECURITY_PRINCIPAL), 
any())).thenReturn(true);
+
+    cache = cacheFactory.create();
+
+    CacheServer cacheServer = cache.addCacheServer();
+    int cacheServerPort = AvailablePortHelper.getRandomAvailableTCPPort();
+    cacheServer.setPort(cacheServerPort);
+    cacheServer.start();
+
+    RegionFactory<Object, Object> regionFactory = cache.createRegionFactory();
+    regionFactory.setDataPolicy(DataPolicy.PARTITION);
+    regionFactory.create(TEST_REGION);
+
+
+    System.setProperty("geode.feature-protobuf-protocol", "true");
+
+    socket = new Socket("localhost", cacheServerPort);
+
+    Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected);
+
+    MessageUtil.performAndVerifyHandshake(socket);
+
+    serializationService = new ProtobufSerializationService();
+  }
+
+  private static class TestFunction<T> implements Function<T> {
+    private final java.util.function.Function<FunctionContext<T>, Object> 
executeFunction;
+    // non-null iff function has been executed.
+    private final AtomicReference<FunctionContext> context = new 
AtomicReference<>();
+    private final boolean hasResult;
+
+    TestFunction() {
+      this.executeFunction = arg -> null;
+      this.hasResult = true;
+    }
+
+    TestFunction(java.util.function.Function<FunctionContext<T>, Object> 
executeFunction,
+        boolean hasResult) {
+      this.executeFunction = executeFunction;
+      this.hasResult = hasResult;
+    }
+
+    @Override
+    public String getId() {
+      return TEST_FUNCTION_ID;
+    }
+
+    @Override
+    public void execute(FunctionContext<T> context) {
+      this.context.set(context);
+      context.getResultSender().lastResult(executeFunction.apply(context));
+    }
+
+    @Override
+    public boolean hasResult() {
+      return hasResult;
+    }
+
+    @Override
+    public boolean isHA() {
+      // set for testing; we shouldn't need to test with isHA true because 
that is a function service
+      // detail.
+      return false;
+    }
+
+    FunctionContext getContext() {
+      return context.get();
+    }
+  }
+
+  @After
+  public void tearDown() {
+    cache.close();
+    try {
+      socket.close();
+    } catch (IOException ignore) {
+    }
+    FunctionService.unregisterFunction(TEST_FUNCTION_ID);
+  }
+
+  @Test
+  public void handlesNoResultFunction() throws IOException {
+    TestFunction<Object> testFunction = new TestFunction<>(context -> null, 
false);
+    FunctionService.registerFunction(testFunction);
+    final ClientProtocol.Message responseMessage = 
authenticateAndSendMessage();
+
+    assertNotNull(responseMessage);
+    assertEquals(EXECUTEFUNCTIONONGROUPRESPONSE, 
responseMessage.getMessageTypeCase());
+    final FunctionAPI.ExecuteFunctionOnGroupResponse 
executeFunctionOnGroupResponse =
+        responseMessage.getExecuteFunctionOnGroupResponse();
+
+    assertEquals(0, executeFunctionOnGroupResponse.getResultsCount());
+
+    Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> 
testFunction.getContext() != null);
+  }
+
+  @Test
+  public void handlesResultFunction() throws Exception {
+    final TestFunction<Object> testFunction =
+        new TestFunction<>(functionContext -> Integer.valueOf(22), true);
+    FunctionService.registerFunction(testFunction);
+    final ClientProtocol.Message responseMessage = 
authenticateAndSendMessage();
+
+    final FunctionAPI.ExecuteFunctionOnGroupResponse 
executeFunctionOnGroupResponse =
+        getFunctionResponse(responseMessage);
+
+    assertEquals(1, executeFunctionOnGroupResponse.getResultsCount());
+
+    final Object responseValue =
+        
serializationService.decode(executeFunctionOnGroupResponse.getResults(0));
+    assertTrue(responseValue instanceof Integer);
+    assertEquals(22, responseValue);
+  }
+
+  @Test
+  public void handlesException() throws IOException {
+    final TestFunction<Object> testFunction = new TestFunction<>(context -> {
+      throw new FunctionException();
+    }, true);
+    FunctionService.registerFunction(testFunction);
+
+    final ClientProtocol.Message message = authenticateAndSendMessage();
+
+    assertEquals(ClientProtocol.Message.MessageTypeCase.ERRORRESPONSE,
+        message.getMessageTypeCase());
+    final BasicTypes.Error error = message.getErrorResponse().getError();
+    assertEquals(BasicTypes.ErrorCode.SERVER_ERROR, error.getErrorCode());
+  }
+
+  @Test
+  public void handlesObjectThatCannotBeDecoded() throws IOException {
+    final TestFunction<Object> testFunction = new TestFunction<>(context -> {
+      return new Object();
+    }, true);
+    FunctionService.registerFunction(testFunction);
+
+    final ClientProtocol.Message message = authenticateAndSendMessage();
+
+
+    assertEquals(ClientProtocol.Message.MessageTypeCase.ERRORRESPONSE,
+        message.getMessageTypeCase());
+    final BasicTypes.Error error = message.getErrorResponse().getError();
+
+    assertEquals(BasicTypes.ErrorCode.SERVER_ERROR, error.getErrorCode());
+
+  }
+
+  @Test
+  public void handlesNullReturnValues() throws Exception {
+    final TestFunction<Object> testFunction = new 
TestFunction<>(functionContext -> null, true);
+    FunctionService.registerFunction(testFunction);
+    final ClientProtocol.Message responseMessage = 
authenticateAndSendMessage();
+
+    final FunctionAPI.ExecuteFunctionOnGroupResponse 
executeFunctionOnGroupResponse =
+        getFunctionResponse(responseMessage);
+
+    assertEquals(1, executeFunctionOnGroupResponse.getResultsCount());
+
+    final Object responseValue =
+        
serializationService.decode(executeFunctionOnGroupResponse.getResults(0));
+    assertNull(responseValue);
+  }
+
+  @Test
+  public void argumentsArePassedToFunction() throws Exception {
+    final TestFunction<Object> testFunction =
+        new TestFunction<>(functionContext -> functionContext.getArguments(), 
true);
+    FunctionService.registerFunction(testFunction);
+    ClientProtocol.Message.Builder message = createRequestMessageBuilder(
+        
FunctionAPI.ExecuteFunctionOnGroupRequest.newBuilder().setFunctionID(TEST_FUNCTION_ID)
+            
.addGroupName(COMPUTE_SERVERS).setArguments(serializationService.encode("hello")));
+
+    authenticateWithServer();
+    final ClientProtocol.Message responseMessage = 
writeMessage(message.build());
+
+    FunctionAPI.ExecuteFunctionOnGroupResponse response = 
getFunctionResponse(responseMessage);
+
+    assertEquals("hello", serializationService.decode(response.getResults(0)));
+  }
+
+  @Test
+  public void permissionsAreRequiredToExecute() throws IOException {
+    final ResourcePermission requiredPermission = new ResourcePermission(
+        ResourcePermission.Resource.DATA, ResourcePermission.Operation.MANAGE);
+
+    final TestFunction<Object> testFunction = new TestFunction<Object>() {
+      @Override
+      public Collection<ResourcePermission> getRequiredPermissions(String 
regionName) {
+        return Arrays.asList(requiredPermission);
+      }
+    };
+    FunctionService.registerFunction(testFunction);
+
+    when(securityManager.authenticate(any())).thenReturn(SECURITY_PRINCIPAL);
+
+    when(securityManager.authorize(eq(SECURITY_PRINCIPAL), 
eq(requiredPermission)))
+        .thenReturn(false);
+
+    final ClientProtocol.Message message = authenticateAndSendMessage();
+    assertEquals("message=" + message, 
BasicTypes.ErrorCode.AUTHORIZATION_FAILED,
+        message.getErrorResponse().getError().getErrorCode());
+
+    verify(securityManager).authorize(eq(SECURITY_PRINCIPAL), 
eq(requiredPermission));
+  }
+
+  private FunctionAPI.ExecuteFunctionOnGroupResponse getFunctionResponse(
+      ClientProtocol.Message responseMessage) {
+    assertEquals(responseMessage.toString(),
+        ClientProtocol.Message.MessageTypeCase.EXECUTEFUNCTIONONGROUPRESPONSE,
+        responseMessage.getMessageTypeCase());
+    return responseMessage.getExecuteFunctionOnGroupResponse();
+  }
+
+  private void authenticateWithServer() throws IOException {
+    ClientProtocol.Message.Builder request = 
ClientProtocol.Message.newBuilder()
+        
.setAuthenticationRequest(ConnectionAPI.AuthenticationRequest.newBuilder()
+            .putCredentials(ResourceConstants.USER_NAME, "someuser")
+            .putCredentials(ResourceConstants.PASSWORD, "somepassword"));
+
+    ClientProtocol.Message response = writeMessage(request.build());
+    assertEquals(response.toString(), true,
+        response.getAuthenticationResponse().getAuthenticated());
+  }
+
+
+  private ClientProtocol.Message authenticateAndSendMessage() throws 
IOException {
+    authenticateWithServer();
+
+    final ClientProtocol.Message request =
+        
createRequestMessageBuilder(FunctionAPI.ExecuteFunctionOnGroupRequest.newBuilder()
+            
.setFunctionID(TEST_FUNCTION_ID).addGroupName(COMPUTE_SERVERS)).build();
+
+    return writeMessage(request);
+  }
+
+
+  private ClientProtocol.Message.Builder createRequestMessageBuilder(
+      FunctionAPI.ExecuteFunctionOnGroupRequest.Builder functionRequest) {
+    return 
ClientProtocol.Message.newBuilder().setExecuteFunctionOnGroupRequest(functionRequest);
+  }
+
+  private ClientProtocol.Message writeMessage(ClientProtocol.Message request) 
throws IOException {
+    request.writeDelimitedTo(socket.getOutputStream());
+
+    return ClientProtocol.Message.parseDelimitedFrom(socket.getInputStream());
+  }
+
+}
diff --git 
a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnGroupRequestOperationHandlerJUnitTest.java
 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnGroupRequestOperationHandlerJUnitTest.java
new file mode 100644
index 0000000..6449c3d
--- /dev/null
+++ 
b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnGroupRequestOperationHandlerJUnitTest.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.protocol.protobuf.v1.operations;
+
+import static 
org.apache.geode.internal.protocol.protobuf.v1.BasicTypes.ErrorCode.AUTHORIZATION_FAILED;
+import static 
org.apache.geode.internal.protocol.protobuf.v1.BasicTypes.ErrorCode.INVALID_REQUEST;
+import static 
org.apache.geode.internal.protocol.protobuf.v1.BasicTypes.ErrorCode.NO_AVAILABLE_SERVER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.geode.cache.execute.Function;
+import org.apache.geode.cache.execute.FunctionContext;
+import org.apache.geode.cache.execute.FunctionService;
+import org.apache.geode.distributed.DistributedMember;
+import org.apache.geode.distributed.DistributedSystemDisconnectedException;
+import org.apache.geode.distributed.internal.DistributionManager;
+import org.apache.geode.distributed.internal.InternalDistributedSystem;
+import 
org.apache.geode.distributed.internal.membership.InternalDistributedMember;
+import org.apache.geode.internal.cache.InternalCache;
+import 
org.apache.geode.internal.protocol.protobuf.statistics.ProtobufClientStatistics;
+import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol;
+import org.apache.geode.internal.protocol.protobuf.v1.Failure;
+import org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI;
+import 
org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService;
+import org.apache.geode.internal.protocol.protobuf.v1.Result;
+import 
org.apache.geode.internal.protocol.protobuf.v1.ServerMessageExecutionContext;
+import org.apache.geode.internal.security.SecurityService;
+import org.apache.geode.management.internal.security.ResourcePermissions;
+import org.apache.geode.security.NotAuthorizedException;
+import org.apache.geode.test.junit.categories.UnitTest;
+
+@Category(UnitTest.class)
+public class ExecuteFunctionOnGroupRequestOperationHandlerJUnitTest {
+  private static final String TEST_GROUP1 = "group1";
+  private static final String TEST_GROUP2 = "group2";
+  private static final String TEST_FUNCTION_ID = "testFunction";
+  public static final String NOT_A_GROUP = "notAGroup";
+  private InternalCache cacheStub;
+  private DistributionManager distributionManager;
+  private ExecuteFunctionOnGroupRequestOperationHandler operationHandler;
+  private ProtobufSerializationService serializationService;
+  private TestFunction function;
+  private InternalDistributedSystem distributedSystem;
+
+  private static class TestFunction implements Function {
+    // non-null iff function has been executed.
+    private AtomicReference<FunctionContext> context = new AtomicReference<>();
+
+    @Override
+    public String getId() {
+      return TEST_FUNCTION_ID;
+    }
+
+    @Override
+    public void execute(FunctionContext context) {
+      this.context.set(context);
+      context.getResultSender().lastResult("result");
+    }
+
+    FunctionContext getContext() {
+      return context.get();
+    }
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    cacheStub = mock(InternalCache.class);
+    serializationService = new ProtobufSerializationService();
+    
when(cacheStub.getSecurityService()).thenReturn(mock(SecurityService.class));
+
+    distributionManager = mock(DistributionManager.class);
+    when(cacheStub.getDistributionManager()).thenReturn(distributionManager);
+
+    distributedSystem = mock(InternalDistributedSystem.class);
+    when(distributionManager.getSystem()).thenReturn(distributedSystem);
+
+    InternalDistributedMember localhost = new 
InternalDistributedMember("localhost", 0);
+    Set<DistributedMember> members = new HashSet<>();
+    members.add(localhost);
+    when(distributedSystem.getGroupMembers(TEST_GROUP1)).thenReturn(members);
+
+    operationHandler = new ExecuteFunctionOnGroupRequestOperationHandler();
+
+    function = new TestFunction();
+    FunctionService.registerFunction(function);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    FunctionService.unregisterFunction(TEST_FUNCTION_ID);
+  }
+
+  @Test
+  public void failsOnUnknownGroup() throws Exception {
+    final FunctionAPI.ExecuteFunctionOnGroupRequest request =
+        
FunctionAPI.ExecuteFunctionOnGroupRequest.newBuilder().setFunctionID(TEST_FUNCTION_ID)
+            .addGroupName(NOT_A_GROUP).build();
+
+    final Result<FunctionAPI.ExecuteFunctionOnGroupResponse, 
ClientProtocol.ErrorResponse> result =
+        operationHandler.process(serializationService, request, 
mockedMessageExecutionContext());
+
+    assertTrue(result instanceof Failure);
+    assertEquals(NO_AVAILABLE_SERVER, 
result.getErrorMessage().getError().getErrorCode());
+  }
+
+  @Test
+  public void failsIfNoGroupSpecified() throws Exception {
+    final FunctionAPI.ExecuteFunctionOnGroupRequest request =
+        
FunctionAPI.ExecuteFunctionOnGroupRequest.newBuilder().setFunctionID(TEST_FUNCTION_ID)
+            .build();
+
+    final Result<FunctionAPI.ExecuteFunctionOnGroupResponse, 
ClientProtocol.ErrorResponse> result =
+        operationHandler.process(serializationService, request, 
mockedMessageExecutionContext());
+
+    assertTrue(result instanceof Failure);
+    assertEquals(NO_AVAILABLE_SERVER, 
result.getErrorMessage().getError().getErrorCode());
+  }
+
+  // Passes only when process() throws DistributedSystemDisconnectedException; the
+  // comment at the end of the method explains why that exception counts as success.
+  @Test(expected = DistributedSystemDisconnectedException.class)
+  public void succeedsWithValidMembers() throws Exception {
+    // Successive getMemberWithName calls are stubbed to yield two members, then null.
+    // NOTE(review): it is not visible here whether the group handler ever consults
+    // getMemberWithName -- confirm this stubbing is still needed.
+    when(distributionManager.getMemberWithName(any(String.class))).thenReturn(
+        new InternalDistributedMember("localhost", 0),
+        new InternalDistributedMember("localhost", 1), null);
+
+    final FunctionAPI.ExecuteFunctionOnGroupRequest request =
+        
FunctionAPI.ExecuteFunctionOnGroupRequest.newBuilder().setFunctionID(TEST_FUNCTION_ID)
+            .addGroupName(TEST_GROUP1).addGroupName(TEST_GROUP2).build();
+
+    // The result is never inspected: the call is expected to throw before returning.
+    final Result<FunctionAPI.ExecuteFunctionOnGroupResponse, 
ClientProtocol.ErrorResponse> result =
+        operationHandler.process(serializationService, request, 
mockedMessageExecutionContext());
+
+    // Unfortunately FunctionService fishes for a DistributedSystem and throws an
+    // exception if it can't find one. It uses a static method on
+    // InternalDistributedSystem, so no mocking is possible. If the test throws
+    // DistributedSystemDisconnectedException, it means that the operation handler
+    // got to the point of trying to get an execution context.
+  }
+
+  // Installs a SecurityService mock that rejects DATA_WRITE and expects the handler
+  // to turn the NotAuthorizedException into a Failure with AUTHORIZATION_FAILED.
+  @Test
+  public void requiresPermissions() throws Exception {
+    final SecurityService securityService = mock(SecurityService.class);
+    doThrow(new NotAuthorizedException("we should catch 
this")).when(securityService)
+        .authorize(ResourcePermissions.DATA_WRITE);
+    // Make the cache stub hand out the rejecting security service.
+    when(cacheStub.getSecurityService()).thenReturn(securityService);
+
+    final FunctionAPI.ExecuteFunctionOnGroupRequest request =
+        
FunctionAPI.ExecuteFunctionOnGroupRequest.newBuilder().setFunctionID(TEST_FUNCTION_ID)
+            .addGroupName(TEST_GROUP1).build();
+
+    final Result<FunctionAPI.ExecuteFunctionOnGroupResponse, 
ClientProtocol.ErrorResponse> result =
+        operationHandler.process(serializationService, request, 
mockedMessageExecutionContext());
+
+    assertTrue(result instanceof Failure);
+
+    assertEquals(AUTHORIZATION_FAILED, 
result.getErrorMessage().getError().getErrorCode());
+
+  }
+
+  // Requests a function ID other than TEST_FUNCTION_ID on TEST_GROUP1 and expects
+  // the error response to carry INVALID_REQUEST.
+  // NOTE(review): unlike the sibling tests, this one does not first assert that the
+  // result is a Failure.
+  @Test
+  public void functionNotFound() throws Exception {
+    final FunctionAPI.ExecuteFunctionOnGroupRequest request =
+        FunctionAPI.ExecuteFunctionOnGroupRequest.newBuilder()
+            .setFunctionID("I am not a function, I am a 
human").addGroupName(TEST_GROUP1).build();
+
+    final Result<FunctionAPI.ExecuteFunctionOnGroupResponse, 
ClientProtocol.ErrorResponse> result =
+        operationHandler.process(serializationService, request, 
mockedMessageExecutionContext());
+
+    final ClientProtocol.ErrorResponse errorMessage = result.getErrorMessage();
+
+    assertEquals(INVALID_REQUEST, errorMessage.getError().getErrorCode());
+  }
+
+  // Builds a fresh execution context for each process() call from the shared cacheStub
+  // and a new ProtobufClientStatistics mock. The third constructor argument is null;
+  // its meaning is not visible in this file -- TODO confirm against the constructor.
+  private ServerMessageExecutionContext mockedMessageExecutionContext() {
+    return new ServerMessageExecutionContext(cacheStub, 
mock(ProtobufClientStatistics.class), null);
+  }
+}

-- 
To stop receiving notification emails like this one, please contact
bschucha...@apache.org.

Reply via email to