This is an automated email from the ASF dual-hosted git repository. bschuchardt pushed a commit to branch feature/GEODE-4362 in repository https://gitbox.apache.org/repos/asf/geode.git
commit d7260c190774f966acd61f121b029345c814f6f4 Author: Bruce Schuchardt <bschucha...@pivotal.io> AuthorDate: Fri Feb 9 13:24:50 2018 -0800 GEODE-3643 Add function execution on specific member Refactored the Function operation handlers and added an OnGroups version of function execution. --- .../src/main/proto/v1/clientProtocol.proto | 3 + .../src/main/proto/v1/function_API.proto | 10 + ...> AbstractFunctionRequestOperationHandler.java} | 87 +++--- ...cuteFunctionOnGroupRequestOperationHandler.java | 132 ++++++++ ...uteFunctionOnMemberRequestOperationHandler.java | 144 +++++---- ...uteFunctionOnRegionRequestOperationHandler.java | 140 ++++----- .../registry/ProtobufOperationContextRegistry.java | 10 + .../v1/ExecuteFunctionOnGroupIntegrationTest.java | 332 +++++++++++++++++++++ ...ionOnGroupRequestOperationHandlerJUnitTest.java | 201 +++++++++++++ 9 files changed, 863 insertions(+), 196 deletions(-) diff --git a/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto b/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto index 13498d4..e4181c7 100644 --- a/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto +++ b/geode-protobuf-messages/src/main/proto/v1/clientProtocol.proto @@ -63,6 +63,9 @@ message Message { AuthenticationRequest authenticationRequest = 22; AuthenticationResponse authenticationResponse = 23; + + ExecuteFunctionOnGroupRequest executeFunctionOnGroupRequest = 24; + ExecuteFunctionOnGroupResponse executeFunctionOnGroupResponse= 25; } } diff --git a/geode-protobuf-messages/src/main/proto/v1/function_API.proto b/geode-protobuf-messages/src/main/proto/v1/function_API.proto index 9558e72..6758739 100644 --- a/geode-protobuf-messages/src/main/proto/v1/function_API.proto +++ b/geode-protobuf-messages/src/main/proto/v1/function_API.proto @@ -38,3 +38,13 @@ message ExecuteFunctionOnMemberResponse { repeated EncodedValue results = 1; // some functions don't return results. } +message ExecuteFunctionOnGroupRequest { + string functionID = 1; + repeated string groupName = 2; + EncodedValue arguments = 3; +} + +message ExecuteFunctionOnGroupResponse { + repeated EncodedValue results = 1; // some functions don't return results. +} + diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnRegionRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/AbstractFunctionRequestOperationHandler.java similarity index 64% copy from geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnRegionRequestOperationHandler.java copy to geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/AbstractFunctionRequestOperationHandler.java index 17e9f09..3a738c1 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnRegionRequestOperationHandler.java +++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/AbstractFunctionRequestOperationHandler.java @@ -14,11 +14,11 @@ */ package org.apache.geode.internal.protocol.protobuf.v1.operations; -import java.util.HashSet; import java.util.List; import java.util.Set; -import org.apache.geode.cache.Region; +import com.google.protobuf.AbstractMessage; + import org.apache.geode.cache.execute.Execution; import org.apache.geode.cache.execute.Function; import org.apache.geode.cache.execute.FunctionException; @@ -26,29 +26,23 @@ import org.apache.geode.cache.execute.FunctionService; import org.apache.geode.cache.execute.ResultCollector; import org.apache.geode.internal.exception.InvalidExecutionContextException; import org.apache.geode.internal.i18n.LocalizedStrings; -import org.apache.geode.internal.protocol.operations.ProtobufOperationHandler; import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes; import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol; import org.apache.geode.internal.protocol.protobuf.v1.Failure; -import org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI; import org.apache.geode.internal.protocol.protobuf.v1.MessageExecutionContext; import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService; import org.apache.geode.internal.protocol.protobuf.v1.Result; -import org.apache.geode.internal.protocol.protobuf.v1.Success; import org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.EncodingException; import org.apache.geode.internal.security.SecurityService; import org.apache.geode.security.NotAuthorizedException; -public class ExecuteFunctionOnRegionRequestOperationHandler implements - ProtobufOperationHandler<FunctionAPI.ExecuteFunctionOnRegionRequest, FunctionAPI.ExecuteFunctionOnRegionResponse> { - @Override - public Result<FunctionAPI.ExecuteFunctionOnRegionResponse, ClientProtocol.ErrorResponse> process( - ProtobufSerializationService serializationService, - FunctionAPI.ExecuteFunctionOnRegionRequest request, +public abstract class AbstractFunctionRequestOperationHandler { + + + public Result process(ProtobufSerializationService serializationService, AbstractMessage request, MessageExecutionContext messageExecutionContext) throws InvalidExecutionContextException { - final String functionID = request.getFunctionID(); - final String regionName = request.getRegion(); + final String functionID = getFunctionID(request); final Function<?> function = FunctionService.getFunction(functionID); if (function == null) { @@ -60,15 +54,8 @@ public class ExecuteFunctionOnRegionRequestOperationHandler implements .build()); } - final Region<Object, Object> region = messageExecutionContext.getCache().getRegion(regionName); - if (region == null) { - return Failure.of(ClientProtocol.ErrorResponse.newBuilder() - .setError(BasicTypes.Error.newBuilder().setErrorCode(BasicTypes.ErrorCode.INVALID_REQUEST) - .setMessage("Region \"" + regionName + "\" not found")) - .build()); - } - final SecurityService securityService = messageExecutionContext.getCache().getSecurityService(); + final String regionName = getRegionName(request); try { // check security for function. @@ -81,31 +68,34 @@ public class ExecuteFunctionOnRegionRequestOperationHandler implements .build()); } + Object executionTarget = getExecutionTarget(request, regionName, messageExecutionContext); + if (executionTarget instanceof Failure) { + return (Failure) executionTarget; + } + try { - Execution execution = FunctionService.onRegion(region); + Execution execution = getFunctionExecutionObject(executionTarget); - final Object arguments = serializationService.decode(request.getArguments()); + Object arguments = getFunctionArguments(request, serializationService); if (arguments != null) { execution = execution.setArguments(arguments); } - execution = execution.withFilter(parseFilter(serializationService, request)); + Set<?> parseFilter = parseFilter(serializationService, request); + if (parseFilter != null) { + execution = execution.withFilter(parseFilter); + } final ResultCollector<Object, List<Object>> resultCollector = execution.execute(functionID); if (function.hasResult()) { List<Object> results = resultCollector.getResult(); - final FunctionAPI.ExecuteFunctionOnRegionResponse.Builder responseMessage = - FunctionAPI.ExecuteFunctionOnRegionResponse.newBuilder(); - for (Object result : results) { - responseMessage.addResults(serializationService.encode(result)); - } - return Success.of(responseMessage.build()); + return buildResultMessage(serializationService, results); } else { // This is fire and forget. - return Success.of(FunctionAPI.ExecuteFunctionOnRegionResponse.newBuilder().build()); + return buildResultMessage(serializationService); } } catch (FunctionException ex) { return Failure.of(ClientProtocol.ErrorResponse.newBuilder() @@ -120,14 +110,31 @@ public class ExecuteFunctionOnRegionRequestOperationHandler implements } } - private Set<Object> parseFilter(ProtobufSerializationService serializationService, - FunctionAPI.ExecuteFunctionOnRegionRequest request) throws EncodingException { - List<BasicTypes.EncodedValue> encodedFilter = request.getKeyFilterList(); - Set<Object> filter = new HashSet<>(); + protected abstract Set<?> parseFilter(ProtobufSerializationService serializationService, + AbstractMessage request) throws EncodingException; + + protected abstract String getFunctionID(AbstractMessage request); + + /** the result of this may be null, which is used by the security service to mean "no region" */ + protected abstract String getRegionName(AbstractMessage request); + + /** region, list of members, etc */ + protected abstract Object getExecutionTarget(AbstractMessage request, String regionName, + MessageExecutionContext executionContext) throws InvalidExecutionContextException; + + /** arguments for the function */ + protected abstract Object getFunctionArguments(AbstractMessage request, + ProtobufSerializationService serializationService) throws EncodingException; + + protected abstract Execution getFunctionExecutionObject(Object executionTarget) + throws InvalidExecutionContextException; + + protected abstract Result buildResultMessage(ProtobufSerializationService serializationService) + throws EncodingException; + + protected abstract Result buildResultMessage(ProtobufSerializationService serializationService, + List<Object> results) throws EncodingException; + + - for (BasicTypes.EncodedValue filterKey : encodedFilter) { - filter.add(serializationService.decode(filterKey)); - } - return filter; - } } diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnGroupRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnGroupRequestOperationHandler.java new file mode 100644 index 0000000..4d6faa0 --- /dev/null +++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnGroupRequestOperationHandler.java @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.protocol.protobuf.v1.operations; + +import java.util.List; +import java.util.Set; + +import com.google.protobuf.AbstractMessage; +import com.google.protobuf.ProtocolStringList; + +import org.apache.geode.cache.execute.Execution; +import org.apache.geode.cache.execute.FunctionService; +import org.apache.geode.distributed.DistributedSystem; +import org.apache.geode.internal.exception.InvalidExecutionContextException; +import org.apache.geode.internal.protocol.operations.ProtobufOperationHandler; +import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes; +import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol; +import org.apache.geode.internal.protocol.protobuf.v1.Failure; +import org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI.ExecuteFunctionOnGroupRequest; +import org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI.ExecuteFunctionOnGroupResponse; +import org.apache.geode.internal.protocol.protobuf.v1.MessageExecutionContext; +import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService; +import org.apache.geode.internal.protocol.protobuf.v1.Result; +import org.apache.geode.internal.protocol.protobuf.v1.Success; +import org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.EncodingException; + +public class ExecuteFunctionOnGroupRequestOperationHandler + extends AbstractFunctionRequestOperationHandler implements + ProtobufOperationHandler<ExecuteFunctionOnGroupRequest, ExecuteFunctionOnGroupResponse> { + + + @Override + public Result<ExecuteFunctionOnGroupResponse, ClientProtocol.ErrorResponse> process( + ProtobufSerializationService serializationService, ExecuteFunctionOnGroupRequest request, + MessageExecutionContext messageExecutionContext) throws InvalidExecutionContextException { + + return (Result<ExecuteFunctionOnGroupResponse, ClientProtocol.ErrorResponse>) super.process( + serializationService, request, messageExecutionContext); + } + + @Override + protected Set<?> parseFilter(ProtobufSerializationService serializationService, + AbstractMessage request) throws EncodingException { + // filters are not allowed on functions not associated with regions + return null; + } + + @Override + protected String getFunctionID(AbstractMessage request) { + return ((ExecuteFunctionOnGroupRequest) request).getFunctionID(); + } + + @Override + protected String getRegionName(AbstractMessage request) { + // region name is not allowed in onMember invocation + return null; + } + + @Override + protected Object getExecutionTarget(AbstractMessage abstractRequest, String regionName, + MessageExecutionContext executionContext) throws InvalidExecutionContextException { + + ExecuteFunctionOnGroupRequest request = ((ExecuteFunctionOnGroupRequest) abstractRequest); + ProtocolStringList groupList = request.getGroupNameList(); + + // unfortunately FunctionServiceManager throws a FunctionException if there are no + // servers matching any of the given groups. In order to distinguish between + // function execution failure and this condition we have to preprocess the groups + // and ensure that there is at least one server that has one of the given groups + DistributedSystem distributedSystem = + executionContext.getCache().getDistributionManager().getSystem(); + boolean foundMatch = false; + for (String group : groupList) { + if (distributedSystem.getGroupMembers(group).size() > 0) { + foundMatch = true; + break; + } + } + if (!foundMatch) { + return Failure.of(ClientProtocol.ErrorResponse.newBuilder() + .setError(BasicTypes.Error.newBuilder() + .setMessage("No server in groups " + groupList + " could be found to execute \"" + + request.getFunctionID() + "\"") + .setErrorCode(BasicTypes.ErrorCode.NO_AVAILABLE_SERVER)) + .build()); + } + return groupList; + } + + @Override + protected Object getFunctionArguments(AbstractMessage request, + ProtobufSerializationService serializationService) throws EncodingException { + return serializationService.decode(((ExecuteFunctionOnGroupRequest) request).getArguments()); + } + + @Override + protected Execution getFunctionExecutionObject(Object executionTarget) + throws InvalidExecutionContextException { + ProtocolStringList groupList = (ProtocolStringList) executionTarget; + return FunctionService.onMember(groupList.toArray(new String[0])); + } + + @Override + protected Result buildResultMessage(ProtobufSerializationService serializationService, + List<Object> results) throws EncodingException { + final ExecuteFunctionOnGroupResponse.Builder responseMessage = + ExecuteFunctionOnGroupResponse.newBuilder(); + for (Object result : results) { + responseMessage.addResults(serializationService.encode(result)); + } + return Success.of(responseMessage.build()); + } + + @Override + protected Result buildResultMessage(ProtobufSerializationService serializationService) + throws EncodingException { + return Success.of(ExecuteFunctionOnGroupResponse.newBuilder().build()); + } + +} diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnMemberRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnMemberRequestOperationHandler.java index 4519f5a..cc608c4 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnMemberRequestOperationHandler.java +++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnMemberRequestOperationHandler.java @@ -18,130 +18,120 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import com.google.protobuf.AbstractMessage; import com.google.protobuf.ProtocolStringList; import org.apache.geode.cache.execute.Execution; -import org.apache.geode.cache.execute.Function; -import org.apache.geode.cache.execute.FunctionException; import org.apache.geode.cache.execute.FunctionService; -import org.apache.geode.cache.execute.ResultCollector; import org.apache.geode.distributed.DistributedMember; import org.apache.geode.distributed.internal.DistributionManager; import org.apache.geode.internal.exception.InvalidExecutionContextException; -import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.protocol.operations.ProtobufOperationHandler; import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes; import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol; import org.apache.geode.internal.protocol.protobuf.v1.Failure; -import org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI; +import org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI.ExecuteFunctionOnMemberRequest; +import org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI.ExecuteFunctionOnMemberResponse; import org.apache.geode.internal.protocol.protobuf.v1.MessageExecutionContext; import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService; import org.apache.geode.internal.protocol.protobuf.v1.Result; import org.apache.geode.internal.protocol.protobuf.v1.Success; import org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.EncodingException; -import org.apache.geode.internal.protocol.protobuf.v1.state.exception.ConnectionStateException; -import org.apache.geode.internal.security.SecurityService; -import org.apache.geode.security.NotAuthorizedException; -public class ExecuteFunctionOnMemberRequestOperationHandler implements - ProtobufOperationHandler<FunctionAPI.ExecuteFunctionOnMemberRequest, FunctionAPI.ExecuteFunctionOnMemberResponse> { +public class ExecuteFunctionOnMemberRequestOperationHandler + extends AbstractFunctionRequestOperationHandler implements + ProtobufOperationHandler<ExecuteFunctionOnMemberRequest, ExecuteFunctionOnMemberResponse> { + + @Override - public Result<FunctionAPI.ExecuteFunctionOnMemberResponse, ClientProtocol.ErrorResponse> process( - ProtobufSerializationService serializationService, - FunctionAPI.ExecuteFunctionOnMemberRequest request, + public Result<ExecuteFunctionOnMemberResponse, ClientProtocol.ErrorResponse> process( + ProtobufSerializationService serializationService, ExecuteFunctionOnMemberRequest request, MessageExecutionContext messageExecutionContext) throws InvalidExecutionContextException { - final String functionID = request.getFunctionID(); + return (Result<ExecuteFunctionOnMemberResponse, ClientProtocol.ErrorResponse>) super.process( + serializationService, request, messageExecutionContext); + } - final Function<?> function = FunctionService.getFunction(functionID); - if (function == null) { - return Failure.of(ClientProtocol.ErrorResponse.newBuilder() - .setError(BasicTypes.Error.newBuilder().setErrorCode(BasicTypes.ErrorCode.INVALID_REQUEST) - .setMessage(LocalizedStrings.ExecuteFunction_FUNCTION_NAMED_0_IS_NOT_REGISTERED - .toLocalizedString(functionID)) - .build()) - .build()); - } + @Override + protected Set<?> parseFilter(ProtobufSerializationService serializationService, + AbstractMessage request) throws EncodingException { + // filters are not allowed on functions not associated with regions + return null; + } + + @Override + protected String getFunctionID(AbstractMessage request) { + return ((ExecuteFunctionOnMemberRequest) request).getFunctionID(); + } - final SecurityService securityService = messageExecutionContext.getCache().getSecurityService(); + @Override + protected String getRegionName(AbstractMessage request) { + // region name is not allowed in onMember invocation + return null; + } - try { - // check security for function. - final String noRegion = null; - function.getRequiredPermissions(noRegion).forEach(securityService::authorize); - } catch (NotAuthorizedException ex) { - return Failure.of(ClientProtocol.ErrorResponse.newBuilder() - .setError(BasicTypes.Error.newBuilder() - .setMessage("Authorization failed for function \"" + functionID + "\"") - .setErrorCode(BasicTypes.ErrorCode.AUTHORIZATION_FAILED)) - .build()); - } + @Override + protected Object getExecutionTarget(AbstractMessage abstractRequest, String regionName, + MessageExecutionContext executionContext) throws InvalidExecutionContextException { + ExecuteFunctionOnMemberRequest request = (ExecuteFunctionOnMemberRequest) abstractRequest; ProtocolStringList memberNameList = request.getMemberNameList(); Set<DistributedMember> memberIds = new HashSet<>(memberNameList.size()); - DistributionManager distributionManager = - messageExecutionContext.getCache().getDistributionManager(); + DistributionManager distributionManager = executionContext.getCache().getDistributionManager(); for (String name : memberNameList) { DistributedMember member = distributionManager.getMemberWithName(name); if (member == null) { return Failure.of(ClientProtocol.ErrorResponse.newBuilder() .setError(BasicTypes.Error.newBuilder() - .setMessage("Member " + name + " not found to execute \"" + functionID + "\"") + .setMessage( + "Member " + name + " not found to execute \"" + request.getFunctionID() + "\"") .setErrorCode(BasicTypes.ErrorCode.NO_AVAILABLE_SERVER)) .build()); } memberIds.add(member); } - if (memberIds.isEmpty()) { return Failure.of(ClientProtocol.ErrorResponse.newBuilder() .setError(BasicTypes.Error.newBuilder() - .setMessage("No members found to execute \"" + functionID + "\"") + .setMessage("No members found to execute \"" + request.getFunctionID() + "\"") .setErrorCode(BasicTypes.ErrorCode.NO_AVAILABLE_SERVER)) .build()); } + return memberIds; + } - try { - Execution execution; - if (memberIds.size() == 1) { - execution = FunctionService.onMember(memberIds.iterator().next()); - } else { - execution = FunctionService.onMembers(memberIds); - } - - final Object arguments = serializationService.decode(request.getArguments()); - - if (arguments != null) { - execution = execution.setArguments(arguments); - } - - final ResultCollector<Object, List<Object>> resultCollector = execution.execute(functionID); + @Override + protected Object getFunctionArguments(AbstractMessage request, + ProtobufSerializationService serializationService) throws EncodingException { + return serializationService.decode(((ExecuteFunctionOnMemberRequest) request).getArguments()); + } - if (function.hasResult()) { - List<Object> results = resultCollector.getResult(); + @Override + protected Execution getFunctionExecutionObject(Object executionTarget) { + Set<DistributedMember> memberIds = (Set<DistributedMember>) executionTarget; + if (memberIds.size() == 1) { + return FunctionService.onMember(memberIds.iterator().next()); + } else { + return FunctionService.onMembers(memberIds); + } + } - final FunctionAPI.ExecuteFunctionOnMemberResponse.Builder responseMessage = - FunctionAPI.ExecuteFunctionOnMemberResponse.newBuilder(); - for (Object result : results) { - responseMessage.addResults(serializationService.encode(result)); - } - return Success.of(responseMessage.build()); - } else { - // This is fire and forget. - return Success.of(FunctionAPI.ExecuteFunctionOnMemberResponse.newBuilder().build()); - } - } catch (FunctionException ex) { - return Failure.of(ClientProtocol.ErrorResponse.newBuilder() - .setError(BasicTypes.Error.newBuilder().setErrorCode(BasicTypes.ErrorCode.SERVER_ERROR) - .setMessage("Function execution failed: " + ex.toString())) - .build()); - } catch (EncodingException ex) { - return Failure.of(ClientProtocol.ErrorResponse.newBuilder() - .setError(BasicTypes.Error.newBuilder().setErrorCode(BasicTypes.ErrorCode.SERVER_ERROR) - .setMessage("Encoding failed: " + ex.toString())) - .build()); + @Override + protected Result buildResultMessage(ProtobufSerializationService serializationService, + List<Object> results) throws EncodingException { + final ExecuteFunctionOnMemberResponse.Builder responseMessage = + ExecuteFunctionOnMemberResponse.newBuilder(); + for (Object result : results) { + responseMessage.addResults(serializationService.encode(result)); } + return Success.of(responseMessage.build()); + } + + @Override + protected Result buildResultMessage(ProtobufSerializationService serializationService) + throws EncodingException { + return Success.of(ExecuteFunctionOnMemberResponse.newBuilder().build()); } } diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnRegionRequestOperationHandler.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnRegionRequestOperationHandler.java index 17e9f09..b0798ce 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnRegionRequestOperationHandler.java +++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnRegionRequestOperationHandler.java @@ -18,116 +18,98 @@ import java.util.HashSet; import java.util.List; import java.util.Set; +import com.google.protobuf.AbstractMessage; + import org.apache.geode.cache.Region; import org.apache.geode.cache.execute.Execution; -import org.apache.geode.cache.execute.Function; -import org.apache.geode.cache.execute.FunctionException; import org.apache.geode.cache.execute.FunctionService; -import org.apache.geode.cache.execute.ResultCollector; import org.apache.geode.internal.exception.InvalidExecutionContextException; -import org.apache.geode.internal.i18n.LocalizedStrings; import org.apache.geode.internal.protocol.operations.ProtobufOperationHandler; import org.apache.geode.internal.protocol.protobuf.v1.BasicTypes; import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol; import org.apache.geode.internal.protocol.protobuf.v1.Failure; -import org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI; +import org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI.ExecuteFunctionOnRegionRequest; +import org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI.ExecuteFunctionOnRegionResponse; import org.apache.geode.internal.protocol.protobuf.v1.MessageExecutionContext; import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService; import org.apache.geode.internal.protocol.protobuf.v1.Result; import org.apache.geode.internal.protocol.protobuf.v1.Success; import org.apache.geode.internal.protocol.protobuf.v1.serialization.exception.EncodingException; -import org.apache.geode.internal.security.SecurityService; -import org.apache.geode.security.NotAuthorizedException; -public class ExecuteFunctionOnRegionRequestOperationHandler implements - ProtobufOperationHandler<FunctionAPI.ExecuteFunctionOnRegionRequest, FunctionAPI.ExecuteFunctionOnRegionResponse> { +public class ExecuteFunctionOnRegionRequestOperationHandler + extends AbstractFunctionRequestOperationHandler implements + ProtobufOperationHandler<ExecuteFunctionOnRegionRequest, ExecuteFunctionOnRegionResponse> { + @Override - public Result<FunctionAPI.ExecuteFunctionOnRegionResponse, ClientProtocol.ErrorResponse> process( - ProtobufSerializationService serializationService, - FunctionAPI.ExecuteFunctionOnRegionRequest request, + public Result<ExecuteFunctionOnRegionResponse, ClientProtocol.ErrorResponse> process( + ProtobufSerializationService serializationService, ExecuteFunctionOnRegionRequest request, MessageExecutionContext messageExecutionContext) throws InvalidExecutionContextException { - final String functionID = request.getFunctionID(); - final String regionName = request.getRegion(); + return (Result<ExecuteFunctionOnRegionResponse, ClientProtocol.ErrorResponse>) super.process( + serializationService, request, messageExecutionContext); + } - final Function<?> function = FunctionService.getFunction(functionID); - if (function == null) { - return Failure.of(ClientProtocol.ErrorResponse.newBuilder() - .setError(BasicTypes.Error.newBuilder().setErrorCode(BasicTypes.ErrorCode.INVALID_REQUEST) - .setMessage(LocalizedStrings.ExecuteFunction_FUNCTION_NAMED_0_IS_NOT_REGISTERED - .toLocalizedString(functionID)) - .build()) - .build()); + protected Set<Object> parseFilter(ProtobufSerializationService serializationService, + AbstractMessage request) throws EncodingException { + List<BasicTypes.EncodedValue> encodedFilter = + ((ExecuteFunctionOnRegionRequest) request).getKeyFilterList(); + Set<Object> filter = new HashSet<>(); + + for (BasicTypes.EncodedValue filterKey : encodedFilter) { + filter.add(serializationService.decode(filterKey)); } + return filter; + } - final Region<Object, Object> region = messageExecutionContext.getCache().getRegion(regionName); + @Override + protected String getFunctionID(AbstractMessage request) { + return ((ExecuteFunctionOnRegionRequest) request).getFunctionID(); + } + + @Override + protected String getRegionName(AbstractMessage request) { + return ((ExecuteFunctionOnRegionRequest) request).getRegion(); + } + + @Override + protected Object getExecutionTarget(AbstractMessage request, String regionName, + MessageExecutionContext executionContext) throws InvalidExecutionContextException { + final Region<Object, Object> region = executionContext.getCache().getRegion(regionName); if (region == null) { return Failure.of(ClientProtocol.ErrorResponse.newBuilder() .setError(BasicTypes.Error.newBuilder().setErrorCode(BasicTypes.ErrorCode.INVALID_REQUEST) .setMessage("Region \"" + regionName + "\" not found")) .build()); } + return region; + } - final SecurityService securityService = messageExecutionContext.getCache().getSecurityService(); - - try { - // check security for function. - function.getRequiredPermissions(regionName).forEach(securityService::authorize); - } catch (NotAuthorizedException ex) { - return Failure.of(ClientProtocol.ErrorResponse.newBuilder() - .setError(BasicTypes.Error.newBuilder() - .setMessage("Authorization failed for function \"" + functionID + "\"") - .setErrorCode(BasicTypes.ErrorCode.AUTHORIZATION_FAILED)) - .build()); - } - - try { - Execution execution = FunctionService.onRegion(region); - - final Object arguments = serializationService.decode(request.getArguments()); - - if (arguments != null) { - execution = execution.setArguments(arguments); - } - - execution = execution.withFilter(parseFilter(serializationService, request)); - - final ResultCollector<Object, List<Object>> resultCollector = execution.execute(functionID); + @Override + protected Object getFunctionArguments(AbstractMessage request, + ProtobufSerializationService serializationService) throws EncodingException { + return serializationService.decode(((ExecuteFunctionOnRegionRequest) request).getArguments()); + } - if (function.hasResult()) { - List<Object> results = resultCollector.getResult(); + @Override + protected Execution getFunctionExecutionObject(Object executionTarget) { + return FunctionService.onRegion((Region) executionTarget); + } - final FunctionAPI.ExecuteFunctionOnRegionResponse.Builder responseMessage = - FunctionAPI.ExecuteFunctionOnRegionResponse.newBuilder(); - for (Object result : results) { - responseMessage.addResults(serializationService.encode(result)); - } - return Success.of(responseMessage.build()); - } else { - // This is fire and forget. - return Success.of(FunctionAPI.ExecuteFunctionOnRegionResponse.newBuilder().build()); - } - } catch (FunctionException ex) { - return Failure.of(ClientProtocol.ErrorResponse.newBuilder() - .setError(BasicTypes.Error.newBuilder().setErrorCode(BasicTypes.ErrorCode.SERVER_ERROR) - .setMessage("Function execution failed: " + ex.toString())) - .build()); - } catch (EncodingException ex) { - return Failure.of(ClientProtocol.ErrorResponse.newBuilder() - .setError(BasicTypes.Error.newBuilder().setErrorCode(BasicTypes.ErrorCode.SERVER_ERROR) - .setMessage("Encoding failed: " + ex.toString())) - .build()); + @Override + protected Result buildResultMessage(ProtobufSerializationService serializationService, + List<Object> results) throws EncodingException { + final ExecuteFunctionOnRegionResponse.Builder responseMessage = + ExecuteFunctionOnRegionResponse.newBuilder(); + for (Object result : results) { + responseMessage.addResults(serializationService.encode(result)); } + return Success.of(responseMessage.build()); } - private Set<Object> parseFilter(ProtobufSerializationService serializationService, - FunctionAPI.ExecuteFunctionOnRegionRequest request) throws EncodingException { - List<BasicTypes.EncodedValue> encodedFilter = request.getKeyFilterList(); - Set<Object> filter = new HashSet<>(); - - for (BasicTypes.EncodedValue filterKey : encodedFilter) { - filter.add(serializationService.decode(filterKey)); - } - return filter; + @Override + protected Result buildResultMessage(ProtobufSerializationService serializationService) + throws EncodingException { + return Success.of(ExecuteFunctionOnRegionResponse.newBuilder().build()); } + } diff --git a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java index f9bbba1..6a88e41 100644 --- a/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java +++ b/geode-protobuf/src/main/java/org/apache/geode/internal/protocol/protobuf/v1/registry/ProtobufOperationContextRegistry.java @@ -21,6 +21,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.geode.annotations.Experimental; import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol; import org.apache.geode.internal.protocol.protobuf.v1.ProtobufOperationContext; +import org.apache.geode.internal.protocol.protobuf.v1.operations.ExecuteFunctionOnGroupRequestOperationHandler; import org.apache.geode.internal.protocol.protobuf.v1.operations.ExecuteFunctionOnMemberRequestOperationHandler; import org.apache.geode.internal.protocol.protobuf.v1.operations.ExecuteFunctionOnRegionRequestOperationHandler; import org.apache.geode.internal.protocol.protobuf.v1.operations.GetAllRequestOperationHandler; @@ -128,5 +129,14 @@ public class ProtobufOperationContextRegistry { // Resource permissions get handled per-function, since they have varying permission // requirements. new ResourcePermission(ResourcePermission.NULL, ResourcePermission.NULL))); + + operationContexts.put(ClientProtocol.Message.MessageTypeCase.EXECUTEFUNCTIONONGROUPREQUEST, + new ProtobufOperationContext<>(ClientProtocol.Message::getExecuteFunctionOnGroupRequest, + new ExecuteFunctionOnGroupRequestOperationHandler(), + opsResp -> ClientProtocol.Message.newBuilder() + .setExecuteFunctionOnGroupResponse(opsResp), + // Resource permissions get handled per-function, since they have varying permission + // requirements. + new ResourcePermission(ResourcePermission.NULL, ResourcePermission.NULL))); } } diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/ExecuteFunctionOnGroupIntegrationTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/ExecuteFunctionOnGroupIntegrationTest.java new file mode 100644 index 0000000..9746af8 --- /dev/null +++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/ExecuteFunctionOnGroupIntegrationTest.java @@ -0,0 +1,332 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.protocol.protobuf.v1; + +import static org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol.Message.MessageTypeCase.EXECUTEFUNCTIONONGROUPRESPONSE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.net.Socket; +import java.util.Arrays; +import java.util.Collection; +import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.awaitility.Awaitility; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.contrib.java.lang.system.RestoreSystemProperties; +import org.junit.experimental.categories.Category; + +import org.apache.geode.cache.Cache; +import org.apache.geode.cache.CacheFactory; +import org.apache.geode.cache.DataPolicy; +import org.apache.geode.cache.RegionFactory; +import org.apache.geode.cache.execute.Function; +import org.apache.geode.cache.execute.FunctionContext; +import org.apache.geode.cache.execute.FunctionException; +import org.apache.geode.cache.execute.FunctionService; +import org.apache.geode.cache.server.CacheServer; +import org.apache.geode.distributed.ConfigurationProperties; +import org.apache.geode.internal.AvailablePortHelper; +import org.apache.geode.management.internal.security.ResourceConstants; +import org.apache.geode.security.ResourcePermission; +import org.apache.geode.security.SecurityManager; +import org.apache.geode.test.junit.categories.IntegrationTest; + +@Category(IntegrationTest.class) +public class ExecuteFunctionOnGroupIntegrationTest { + private static final String TEST_REGION = "testRegion"; + private static final String TEST_FUNCTION_ID = "testFunction"; + private static final String SECURITY_PRINCIPAL = "principle"; + private static final String COMPUTE_SERVERS = "computeServers"; + private ProtobufSerializationService serializationService; + private Socket socket; + private Cache cache; + private SecurityManager securityManager; + + @Rule + public RestoreSystemProperties restoreSystemProperties = new RestoreSystemProperties(); + + @Before + public void setUp() throws Exception { + CacheFactory cacheFactory = new CacheFactory(new Properties()); + cacheFactory.set(ConfigurationProperties.MCAST_PORT, "0"); + cacheFactory.set(ConfigurationProperties.ENABLE_CLUSTER_CONFIGURATION, "false"); + cacheFactory.set(ConfigurationProperties.USE_CLUSTER_CONFIGURATION, "false"); + cacheFactory.set(ConfigurationProperties.GROUPS, COMPUTE_SERVERS); + + securityManager = mock(SecurityManager.class); + cacheFactory.setSecurityManager(securityManager); + when(securityManager.authenticate(any())).thenReturn(SECURITY_PRINCIPAL); + when(securityManager.authorize(eq(SECURITY_PRINCIPAL), any())).thenReturn(true); + + cache = cacheFactory.create(); + + CacheServer cacheServer = cache.addCacheServer(); + int cacheServerPort = AvailablePortHelper.getRandomAvailableTCPPort(); + cacheServer.setPort(cacheServerPort); + cacheServer.start(); + + RegionFactory<Object, Object> regionFactory = cache.createRegionFactory(); + regionFactory.setDataPolicy(DataPolicy.PARTITION); + regionFactory.create(TEST_REGION); + + + System.setProperty("geode.feature-protobuf-protocol", "true"); + + socket = new Socket("localhost", cacheServerPort); + + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(socket::isConnected); + + MessageUtil.performAndVerifyHandshake(socket); + + serializationService = new ProtobufSerializationService(); + } + + private static class TestFunction<T> implements Function<T> { + private final java.util.function.Function<FunctionContext<T>, Object> executeFunction; + // non-null iff function has been executed. + private final AtomicReference<FunctionContext> context = new AtomicReference<>(); + private final boolean hasResult; + + TestFunction() { + this.executeFunction = arg -> null; + this.hasResult = true; + } + + TestFunction(java.util.function.Function<FunctionContext<T>, Object> executeFunction, + boolean hasResult) { + this.executeFunction = executeFunction; + this.hasResult = hasResult; + } + + @Override + public String getId() { + return TEST_FUNCTION_ID; + } + + @Override + public void execute(FunctionContext<T> context) { + this.context.set(context); + context.getResultSender().lastResult(executeFunction.apply(context)); + } + + @Override + public boolean hasResult() { + return hasResult; + } + + @Override + public boolean isHA() { + // set for testing; we shouldn't need to test with isHA true because that's function service + // details. + return false; + } + + FunctionContext getContext() { + return context.get(); + } + } + + @After + public void tearDown() { + cache.close(); + try { + socket.close(); + } catch (IOException ignore) { + } + FunctionService.unregisterFunction(TEST_FUNCTION_ID); + } + + @Test + public void handlesNoResultFunction() throws IOException { + TestFunction<Object> testFunction = new TestFunction<>(context -> null, false); + FunctionService.registerFunction(testFunction); + final ClientProtocol.Message responseMessage = authenticateAndSendMessage(); + + assertNotNull(responseMessage); + assertEquals(EXECUTEFUNCTIONONGROUPRESPONSE, responseMessage.getMessageTypeCase()); + final FunctionAPI.ExecuteFunctionOnGroupResponse executeFunctionOnGroupResponse = + responseMessage.getExecuteFunctionOnGroupResponse(); + + assertEquals(0, executeFunctionOnGroupResponse.getResultsCount()); + + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> testFunction.getContext() != null); + } + + @Test + public void handlesResultFunction() throws Exception { + final TestFunction<Object> testFunction = + new TestFunction<>(functionContext -> Integer.valueOf(22), true); + FunctionService.registerFunction(testFunction); + final ClientProtocol.Message responseMessage = authenticateAndSendMessage(); + + final FunctionAPI.ExecuteFunctionOnGroupResponse executeFunctionOnGroupResponse = + getFunctionResponse(responseMessage); + + assertEquals(1, executeFunctionOnGroupResponse.getResultsCount()); + + final Object responseValue = + serializationService.decode(executeFunctionOnGroupResponse.getResults(0)); + assertTrue(responseValue instanceof Integer); + assertEquals(22, responseValue); + } + + @Test + public void handlesException() throws IOException { + final TestFunction<Object> testFunction = new TestFunction<>(context -> { + throw new FunctionException(); + }, true); + FunctionService.registerFunction(testFunction); + + final ClientProtocol.Message message = authenticateAndSendMessage(); + + assertEquals(ClientProtocol.Message.MessageTypeCase.ERRORRESPONSE, + message.getMessageTypeCase()); + final BasicTypes.Error error = message.getErrorResponse().getError(); + assertEquals(BasicTypes.ErrorCode.SERVER_ERROR, error.getErrorCode()); + } + + @Test + public void handlesObjectThatCannotBeDecoded() throws IOException { + final TestFunction<Object> testFunction = new TestFunction<>(context -> { + return new Object(); + }, true); + FunctionService.registerFunction(testFunction); + + final ClientProtocol.Message message = authenticateAndSendMessage(); + + + assertEquals(ClientProtocol.Message.MessageTypeCase.ERRORRESPONSE, + message.getMessageTypeCase()); + final BasicTypes.Error error = message.getErrorResponse().getError(); + + assertEquals(BasicTypes.ErrorCode.SERVER_ERROR, error.getErrorCode()); + + } + + @Test + public void handlesNullReturnValues() throws Exception { + final TestFunction<Object> testFunction = new TestFunction<>(functionContext -> null, true); + FunctionService.registerFunction(testFunction); + final ClientProtocol.Message responseMessage = authenticateAndSendMessage(); + + final FunctionAPI.ExecuteFunctionOnGroupResponse executeFunctionOnGroupResponse = + getFunctionResponse(responseMessage); + + assertEquals(1, executeFunctionOnGroupResponse.getResultsCount()); + + final Object responseValue = + serializationService.decode(executeFunctionOnGroupResponse.getResults(0)); + assertNull(responseValue); + } + + @Test + public void argumentsArePassedToFunction() throws Exception { + final TestFunction<Object> testFunction = + new TestFunction<>(functionContext -> functionContext.getArguments(), true); + FunctionService.registerFunction(testFunction); + ClientProtocol.Message.Builder message = createRequestMessageBuilder( + FunctionAPI.ExecuteFunctionOnGroupRequest.newBuilder().setFunctionID(TEST_FUNCTION_ID) + .addGroupName(COMPUTE_SERVERS).setArguments(serializationService.encode("hello"))); + + authenticateWithServer(); + final ClientProtocol.Message responseMessage = writeMessage(message.build()); + + FunctionAPI.ExecuteFunctionOnGroupResponse response = getFunctionResponse(responseMessage); + + assertEquals("hello", serializationService.decode(response.getResults(0))); + } + + @Test + public void permissionsAreRequiredToExecute() throws IOException { + final ResourcePermission requiredPermission = new ResourcePermission( + ResourcePermission.Resource.DATA, ResourcePermission.Operation.MANAGE); + + final TestFunction<Object> testFunction = new TestFunction<Object>() { + @Override + public Collection<ResourcePermission> getRequiredPermissions(String regionName) { + return Arrays.asList(requiredPermission); + } + }; + FunctionService.registerFunction(testFunction); + + when(securityManager.authenticate(any())).thenReturn(SECURITY_PRINCIPAL); + + when(securityManager.authorize(eq(SECURITY_PRINCIPAL), eq(requiredPermission))) + .thenReturn(false); + + final ClientProtocol.Message message = authenticateAndSendMessage(); + assertEquals("message=" + message, BasicTypes.ErrorCode.AUTHORIZATION_FAILED, + message.getErrorResponse().getError().getErrorCode()); + + verify(securityManager).authorize(eq(SECURITY_PRINCIPAL), eq(requiredPermission)); + } + + private FunctionAPI.ExecuteFunctionOnGroupResponse getFunctionResponse( + ClientProtocol.Message responseMessage) { + assertEquals(responseMessage.toString(), + ClientProtocol.Message.MessageTypeCase.EXECUTEFUNCTIONONGROUPRESPONSE, + responseMessage.getMessageTypeCase()); + return responseMessage.getExecuteFunctionOnGroupResponse(); + } + + private void authenticateWithServer() throws IOException { + ClientProtocol.Message.Builder request = ClientProtocol.Message.newBuilder() + .setAuthenticationRequest(ConnectionAPI.AuthenticationRequest.newBuilder() + .putCredentials(ResourceConstants.USER_NAME, "someuser") + .putCredentials(ResourceConstants.PASSWORD, "somepassword")); + + ClientProtocol.Message response = writeMessage(request.build()); + assertEquals(response.toString(), true, + response.getAuthenticationResponse().getAuthenticated()); + } + + + private ClientProtocol.Message authenticateAndSendMessage() throws IOException { + authenticateWithServer(); + + final ClientProtocol.Message request = + createRequestMessageBuilder(FunctionAPI.ExecuteFunctionOnGroupRequest.newBuilder() + .setFunctionID(TEST_FUNCTION_ID).addGroupName(COMPUTE_SERVERS)).build(); + + return writeMessage(request); + } + + + private ClientProtocol.Message.Builder createRequestMessageBuilder( + FunctionAPI.ExecuteFunctionOnGroupRequest.Builder functionRequest) { + return ClientProtocol.Message.newBuilder().setExecuteFunctionOnGroupRequest(functionRequest); + } + + private ClientProtocol.Message writeMessage(ClientProtocol.Message request) throws IOException { + request.writeDelimitedTo(socket.getOutputStream()); + + return ClientProtocol.Message.parseDelimitedFrom(socket.getInputStream()); + } + +} diff --git a/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnGroupRequestOperationHandlerJUnitTest.java b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnGroupRequestOperationHandlerJUnitTest.java new file mode 100644 index 0000000..6449c3d --- /dev/null +++ b/geode-protobuf/src/test/java/org/apache/geode/internal/protocol/protobuf/v1/operations/ExecuteFunctionOnGroupRequestOperationHandlerJUnitTest.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.protocol.protobuf.v1.operations; + +import static org.apache.geode.internal.protocol.protobuf.v1.BasicTypes.ErrorCode.AUTHORIZATION_FAILED; +import static org.apache.geode.internal.protocol.protobuf.v1.BasicTypes.ErrorCode.INVALID_REQUEST; +import static org.apache.geode.internal.protocol.protobuf.v1.BasicTypes.ErrorCode.NO_AVAILABLE_SERVER; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.geode.cache.execute.Function; +import org.apache.geode.cache.execute.FunctionContext; +import org.apache.geode.cache.execute.FunctionService; +import org.apache.geode.distributed.DistributedMember; +import org.apache.geode.distributed.DistributedSystemDisconnectedException; +import org.apache.geode.distributed.internal.DistributionManager; +import org.apache.geode.distributed.internal.InternalDistributedSystem; +import org.apache.geode.distributed.internal.membership.InternalDistributedMember; +import org.apache.geode.internal.cache.InternalCache; +import org.apache.geode.internal.protocol.protobuf.statistics.ProtobufClientStatistics; +import org.apache.geode.internal.protocol.protobuf.v1.ClientProtocol; +import org.apache.geode.internal.protocol.protobuf.v1.Failure; +import org.apache.geode.internal.protocol.protobuf.v1.FunctionAPI; +import org.apache.geode.internal.protocol.protobuf.v1.ProtobufSerializationService; +import org.apache.geode.internal.protocol.protobuf.v1.Result; +import org.apache.geode.internal.protocol.protobuf.v1.ServerMessageExecutionContext; +import org.apache.geode.internal.security.SecurityService; +import org.apache.geode.management.internal.security.ResourcePermissions; +import org.apache.geode.security.NotAuthorizedException; +import org.apache.geode.test.junit.categories.UnitTest; + +@Category(UnitTest.class) +public class ExecuteFunctionOnGroupRequestOperationHandlerJUnitTest { + private static final String TEST_GROUP1 = "group1"; + private static final String TEST_GROUP2 = "group2"; + private static final String TEST_FUNCTION_ID = "testFunction"; + public static final String NOT_A_GROUP = "notAGroup"; + private InternalCache cacheStub; + private DistributionManager distributionManager; + private ExecuteFunctionOnGroupRequestOperationHandler operationHandler; + private ProtobufSerializationService serializationService; + private TestFunction function; + private InternalDistributedSystem distributedSystem; + + private static class TestFunction implements Function { + // non-null iff function has been executed. + private AtomicReference<FunctionContext> context = new AtomicReference<>(); + + @Override + public String getId() { + return TEST_FUNCTION_ID; + } + + @Override + public void execute(FunctionContext context) { + this.context.set(context); + context.getResultSender().lastResult("result"); + } + + FunctionContext getContext() { + return context.get(); + } + } + + @Before + public void setUp() throws Exception { + cacheStub = mock(InternalCache.class); + serializationService = new ProtobufSerializationService(); + when(cacheStub.getSecurityService()).thenReturn(mock(SecurityService.class)); + + distributionManager = mock(DistributionManager.class); + when(cacheStub.getDistributionManager()).thenReturn(distributionManager); + + distributedSystem = mock(InternalDistributedSystem.class); + when(distributionManager.getSystem()).thenReturn(distributedSystem); + + InternalDistributedMember localhost = new InternalDistributedMember("localhost", 0); + Set<DistributedMember> members = new HashSet<>(); + members.add(localhost); + when(distributedSystem.getGroupMembers(TEST_GROUP1)).thenReturn(members); + + operationHandler = new ExecuteFunctionOnGroupRequestOperationHandler(); + + function = new TestFunction(); + FunctionService.registerFunction(function); + } + + @After + public void tearDown() throws Exception { + FunctionService.unregisterFunction(TEST_FUNCTION_ID); + } + + @Test + public void failsOnUnknownGroup() throws Exception { + final FunctionAPI.ExecuteFunctionOnGroupRequest request = + FunctionAPI.ExecuteFunctionOnGroupRequest.newBuilder().setFunctionID(TEST_FUNCTION_ID) + .addGroupName(NOT_A_GROUP).build(); + + final Result<FunctionAPI.ExecuteFunctionOnGroupResponse, ClientProtocol.ErrorResponse> result = + operationHandler.process(serializationService, request, mockedMessageExecutionContext()); + + assertTrue(result instanceof Failure); + assertEquals(NO_AVAILABLE_SERVER, result.getErrorMessage().getError().getErrorCode()); + } + + @Test + public void failsIfNoGroupSpecified() throws Exception { + final FunctionAPI.ExecuteFunctionOnGroupRequest request = + FunctionAPI.ExecuteFunctionOnGroupRequest.newBuilder().setFunctionID(TEST_FUNCTION_ID) + .build(); + + final Result<FunctionAPI.ExecuteFunctionOnGroupResponse, ClientProtocol.ErrorResponse> result = + operationHandler.process(serializationService, request, mockedMessageExecutionContext()); + + assertTrue(result instanceof Failure); + assertEquals(NO_AVAILABLE_SERVER, result.getErrorMessage().getError().getErrorCode()); + } + + @Test(expected = DistributedSystemDisconnectedException.class) + public void succeedsWithValidMembers() throws Exception { + when(distributionManager.getMemberWithName(any(String.class))).thenReturn( + new InternalDistributedMember("localhost", 0), + new InternalDistributedMember("localhost", 1), null); + + final FunctionAPI.ExecuteFunctionOnGroupRequest request = + FunctionAPI.ExecuteFunctionOnGroupRequest.newBuilder().setFunctionID(TEST_FUNCTION_ID) + .addGroupName(TEST_GROUP1).addGroupName(TEST_GROUP2).build(); + + final Result<FunctionAPI.ExecuteFunctionOnGroupResponse, ClientProtocol.ErrorResponse> result = + operationHandler.process(serializationService, request, mockedMessageExecutionContext()); + + // unfortunately FunctionService fishes for a DistributedSystem and throws an exception + // if it can't find one. It uses a static method on InternalDistributedSystem, so no + // mocking is possible. If the test throws DistributedSystemDisconnectedException it + // means that the operation handler got to the point of trying get an execution + // context + } + + @Test + public void requiresPermissions() throws Exception { + final SecurityService securityService = mock(SecurityService.class); + doThrow(new NotAuthorizedException("we should catch this")).when(securityService) + .authorize(ResourcePermissions.DATA_WRITE); + when(cacheStub.getSecurityService()).thenReturn(securityService); + + final FunctionAPI.ExecuteFunctionOnGroupRequest request = + FunctionAPI.ExecuteFunctionOnGroupRequest.newBuilder().setFunctionID(TEST_FUNCTION_ID) + .addGroupName(TEST_GROUP1).build(); + + final Result<FunctionAPI.ExecuteFunctionOnGroupResponse, ClientProtocol.ErrorResponse> result = + operationHandler.process(serializationService, request, mockedMessageExecutionContext()); + + assertTrue(result instanceof Failure); + + assertEquals(AUTHORIZATION_FAILED, result.getErrorMessage().getError().getErrorCode()); + + } + + @Test + public void functionNotFound() throws Exception { + final FunctionAPI.ExecuteFunctionOnGroupRequest request = + FunctionAPI.ExecuteFunctionOnGroupRequest.newBuilder() + .setFunctionID("I am not a function, I am a human").addGroupName(TEST_GROUP1).build(); + + final Result<FunctionAPI.ExecuteFunctionOnGroupResponse, ClientProtocol.ErrorResponse> result = + operationHandler.process(serializationService, request, mockedMessageExecutionContext()); + + final ClientProtocol.ErrorResponse errorMessage = result.getErrorMessage(); + + assertEquals(INVALID_REQUEST, errorMessage.getError().getErrorCode()); + } + + private ServerMessageExecutionContext mockedMessageExecutionContext() { + return new ServerMessageExecutionContext(cacheStub, mock(ProtobufClientStatistics.class), null); + } +} -- To stop receiving notification emails like this one, please contact bschucha...@apache.org.