This is an automated email from the ASF dual-hosted git repository. adoroszlai pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push: new d459997 AMBARI-24972. Improve Add Service request validation. (#2670) d459997 is described below commit d4599975382e053b7a614fa2aa5531383f8ee98f Author: Doroszlai, Attila <6454655+adorosz...@users.noreply.github.com> AuthorDate: Sat Dec 1 00:26:32 2018 +0100 AMBARI-24972. Improve Add Service request validation. (#2670) --- .../server/controller/AddServiceRequest.java | 15 +- .../ambari/server/controller/ControllerModule.java | 2 + .../ambari/server/controller/internal/Stack.java | 5 + .../apache/ambari/server/state/ConfigHelper.java | 13 +- .../ambari/server/topology/BlueprintFactory.java | 23 -- .../ambari/server/topology/Configuration.java | 17 + .../server/topology/DefaultStackFactory.java | 34 ++ .../ambari/server/topology/StackFactory.java | 26 ++ .../addservice/AddServiceOrchestrator.java | 104 +---- .../topology/addservice/RequestValidator.java | 281 +++++++++++++ .../addservice/RequestValidatorFactory.java | 31 ++ .../server/controller/AddServiceRequestTest.java | 3 +- .../server/topology/BlueprintFactoryTest.java | 4 +- .../topology/addservice/RequestValidatorTest.java | 433 +++++++++++++++++++++ 14 files changed, 862 insertions(+), 129 deletions(-) diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/AddServiceRequest.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/AddServiceRequest.java index 9c0ccb5..4a88a6d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/AddServiceRequest.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/AddServiceRequest.java @@ -18,7 +18,6 @@ package org.apache.ambari.server.controller; -import static com.google.common.base.Preconditions.checkArgument; import static java.util.Collections.emptySet; import static java.util.stream.Collectors.toMap; import static org.apache.ambari.server.controller.internal.BaseClusterRequest.PROVISION_ACTION_PROPERTY; @@ -63,7 +62,7 @@ import io.swagger.annotations.ApiModelProperty; */ @ApiModel @JsonInclude(JsonInclude.Include.NON_EMPTY) -public final class AddServiceRequest { +public class AddServiceRequest { static final String STACK_NAME = "stack_name"; static final String STACK_VERSION = "stack_version"; @@ -130,8 +129,6 @@ public final class AddServiceRequest { this.credentials = null != credentials ? credentials.stream().collect(toMap(Credential::getAlias, Function.identity())) : ImmutableMap.of(); - - checkArgument(!this.services.isEmpty() || !this.components.isEmpty(), "Either services or components must be specified"); } // TODO move to JsonUtils -- pick part of 0252c08d86f @@ -278,6 +275,11 @@ public final class AddServiceRequest { public int hashCode() { return Objects.hash(name, fqdn); } + + @Override + public String toString() { + return name; + } } @ApiModel @@ -315,5 +317,10 @@ public final class AddServiceRequest { public int hashCode() { return Objects.hash(name); } + + @Override + public String toString() { + return name; + } } } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java index 403565a..0f530cb 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/ControllerModule.java @@ -520,9 +520,11 @@ public class ControllerModule extends AbstractModule { install(new FactoryModuleBuilder().build(MetricPropertyProviderFactory.class)); install(new FactoryModuleBuilder().build(UpgradeContextFactory.class)); install(new FactoryModuleBuilder().build(MpackManagerFactory.class)); + install(new FactoryModuleBuilder().build(org.apache.ambari.server.topology.addservice.RequestValidatorFactory.class)); bind(HostRoleCommandFactory.class).to(HostRoleCommandFactoryImpl.class); bind(SecurityHelper.class).toInstance(SecurityHelperImpl.getInstance()); + bind(org.apache.ambari.server.topology.StackFactory.class).to(org.apache.ambari.server.topology.DefaultStackFactory.class); bind(BlueprintFactory.class); install(new FactoryModuleBuilder().implement(AmbariEvent.class, Names.named("userCreated"), UserCreatedEvent.class).build(AmbariEventFactory.class)); diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/Stack.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/Stack.java index b4fec0c..c83999b 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/Stack.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/Stack.java @@ -210,6 +210,11 @@ public class Stack { return new StackId(name, version); } + @Override + public String toString() { + return "stack " + getStackId(); + } + Map<DependencyInfo, String> getDependencyConditionalServiceMap() { return dependencyConditionalServiceMap; } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java index c685c79..6d622ef 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/state/ConfigHelper.java @@ -59,6 +59,7 @@ import org.apache.commons.collections.MapUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.math.NumberUtils; import org.apache.commons.lang3.StringEscapeUtils; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -2147,7 +2148,6 @@ public class ConfigHelper { * @param cluster the cluster * @param hostname a hostname * @return a map of the existing configurations - * @throws AmbariException */ public Map<String, Map<String, String>> calculateExistingConfigurations(AmbariManagementController ambariManagementController, Cluster cluster, String hostname) throws AmbariException { // For a configuration type, both tag and an actual configuration can be stored @@ -2179,4 +2179,15 @@ public class ConfigHelper { return configurations; } + /** + * Determines the existing configurations for the cluster, both properties and attributes. + */ + public Pair<Map<String, Map<String, String>>, Map<String, Map<String, Map<String, String>>>> calculateExistingConfigs(Cluster cluster) throws AmbariException { + Map<String, Map<String, String>> desiredConfigTags = getEffectiveDesiredTags(cluster, null); + return Pair.of( + getEffectiveConfigProperties(cluster, desiredConfigTags), + getEffectiveConfigAttributes(cluster, desiredConfigTags) + ); + } + } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/BlueprintFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/BlueprintFactory.java index 054979f..aaa4087 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/BlueprintFactory.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/BlueprintFactory.java @@ -27,7 +27,6 @@ import java.util.Set; import org.apache.ambari.server.AmbariException; import org.apache.ambari.server.ObjectNotFoundException; -import org.apache.ambari.server.controller.AmbariManagementController; import org.apache.ambari.server.controller.AmbariServer; import org.apache.ambari.server.controller.RootComponent; import org.apache.ambari.server.controller.internal.ProvisionAction; @@ -221,26 +220,4 @@ public class BlueprintFactory { blueprintDAO = dao; } - /** - * Internal interface used to abstract out the process of creating the Stack object. - * - * This is used to simplify unit testing, since a new Factory can be provided to - * simulate various Stack or error conditions. - */ - interface StackFactory { - Stack createStack(String stackName, String stackVersion, AmbariManagementController managementController) throws AmbariException; - } - - /** - * Default implementation of StackFactory. - * - * Calls the Stack constructor to create the Stack instance. - * - */ - private static class DefaultStackFactory implements StackFactory { - @Override - public Stack createStack(String stackName, String stackVersion, AmbariManagementController managementController) throws AmbariException { - return new Stack(stackName, stackVersion, managementController); - } - } } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/Configuration.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/Configuration.java index 8363716..fae5232 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/Configuration.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/Configuration.java @@ -25,6 +25,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import org.apache.commons.lang3.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -472,4 +473,20 @@ public class Configuration { parentConfiguration.removeConfigType(configType); } } + + /** + * Create a new {@code Configuration} based on a pair of maps. + * (This is just a convenience method to be able to avoid local variables in a few places.) + */ + public static Configuration of(Pair<Map<String, Map<String, String>>, Map<String, Map<String, Map<String, String>>>> propertiesAndAttributes) { + return new Configuration(propertiesAndAttributes.getLeft(), propertiesAndAttributes.getRight()); + } + + /** + * @return this configuration's properties and attributes as a pair of maps, + * in order to be able to pass around more easily without polluting non-topology code with the Configuration object + */ + public Pair<Map<String, Map<String, String>>, Map<String, Map<String, Map<String, String>>>> asPair() { + return Pair.of(properties, attributes); + } } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/DefaultStackFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/DefaultStackFactory.java new file mode 100644 index 0000000..1ece0cd --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/DefaultStackFactory.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.topology; + +import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.controller.AmbariManagementController; +import org.apache.ambari.server.controller.internal.Stack; + +/** + * Default implementation of StackFactory. + * + * Calls the Stack constructor to create the Stack instance. + */ +public class DefaultStackFactory implements StackFactory { + @Override + public Stack createStack(String stackName, String stackVersion, AmbariManagementController managementController) throws AmbariException { + return new Stack(stackName, stackVersion, managementController); + } +} diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/StackFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/StackFactory.java new file mode 100644 index 0000000..3a1c056 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/StackFactory.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.topology; + +import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.controller.AmbariManagementController; +import org.apache.ambari.server.controller.internal.Stack; + +public interface StackFactory { + Stack createStack(String stackName, String stackVersion, AmbariManagementController managementController) throws AmbariException; +} diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java index f4bd08a..ed0b71d 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java @@ -17,11 +17,7 @@ */ package org.apache.ambari.server.topology.addservice; -import static com.google.common.base.Preconditions.checkArgument; - import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; @@ -35,14 +31,10 @@ import org.apache.ambari.server.controller.AddServiceRequest; import org.apache.ambari.server.controller.AmbariManagementController; import org.apache.ambari.server.controller.KerberosHelper; import org.apache.ambari.server.controller.RequestStatusResponse; -import org.apache.ambari.server.controller.internal.RequestStageContainer; -import org.apache.ambari.server.controller.internal.Stack; import org.apache.ambari.server.serveraction.kerberos.KerberosInvalidConfigurationException; import org.apache.ambari.server.state.Cluster; -import org.apache.ambari.server.state.ConfigHelper; import org.apache.ambari.server.state.SecurityType; import org.apache.ambari.server.state.Service; -import org.apache.ambari.server.state.StackId; import org.apache.ambari.server.state.State; import org.apache.ambari.server.topology.Configuration; import org.apache.ambari.server.utils.StageUtils; @@ -70,7 +62,7 @@ public class AddServiceOrchestrator { private RequestFactory requestFactory; @Inject - private ConfigHelper configHelper; + private RequestValidatorFactory requestValidatorFactory; @Inject private StackAdvisorAdapter stackAdvisorAdapter; @@ -82,7 +74,7 @@ public class AddServiceOrchestrator { AddServiceInfo requestWithLayout = recommendLayout(validatedRequest); AddServiceInfo requestWithConfig = recommendConfiguration(requestWithLayout); - createResources(requestWithConfig); + createResources(cluster, requestWithConfig); createHostTasks(requestWithConfig); return requestWithConfig.getStages().getRequestStatusResponse(); @@ -97,64 +89,10 @@ public class AddServiceOrchestrator { private AddServiceInfo validate(Cluster cluster, AddServiceRequest request) { LOG.info("Validating {}", request); - request.getSecurity().ifPresent(requestSecurity -> - checkArgument(requestSecurity.getType() == cluster.getSecurityType(), - "Security type in the request (%s), if specified, should match cluster's security type (%s)", - requestSecurity.getType(), cluster.getSecurityType() - ) - ); - - Map<String, Map<String, Set<String>>> newServices = new LinkedHashMap<>(); - - StackId stackId = new StackId(request.getStackName(), request.getStackVersion()); - Stack stack; - try { - stack = new Stack(stackId, controller); - Set<String> existingServices = cluster.getServices().keySet(); - // process service declarations - for (AddServiceRequest.Service service : request.getServices()) { - checkAndLog(!stack.getServices().contains(service.getName()), - "Unknown service %s in stack %s", service, stack.getStackId()); - newServices.computeIfAbsent(service.getName(), __ -> new HashMap<>()); - } - // process component declarations - for (AddServiceRequest.Component requestedComponent : request.getComponents()) { - String serviceName = stack.getServiceForComponent(requestedComponent.getName()); - checkAndLog( serviceName == null, - "No service found for component %s in stack %s", requestedComponent.getName(), stackId); - checkAndLog( existingServices.contains(serviceName), - "Service %s already exists in cluster %s", serviceName, cluster.getClusterName()); - - newServices.computeIfAbsent(serviceName, __ -> new HashMap<>()) - .computeIfAbsent(requestedComponent.getName(), __ -> new HashSet<>()) - .add(requestedComponent.getFqdn()); - } - } catch (AmbariException e) { - LOG.error("Stack {} not found", stackId); - throw new IllegalArgumentException(e); - } - - if (newServices.isEmpty()) { - throw new IllegalArgumentException("No new services to be added"); - } - - Configuration config = request.getConfiguration(); - Configuration clusterConfig = getClusterDesiredConfigs(cluster); - clusterConfig.setParentConfiguration(stack.getValidDefaultConfig()); - config.setParentConfiguration(clusterConfig); + RequestValidator validator = requestValidatorFactory.create(request, cluster); + validator.validate(); - RequestStageContainer stages = new RequestStageContainer(actionManager.getNextRequestId(), null, requestFactory, actionManager); - AddServiceInfo validatedRequest = new AddServiceInfo(request, cluster.getClusterName(), stack, config, stages, newServices); - stages.setRequestContext(validatedRequest.describe()); - return validatedRequest; - } - - private static void checkAndLog(boolean errorCondition, String errorMessage, Object... messageParams) { - if (errorCondition) { - String msg = String.format(errorMessage, messageParams); - LOG.error(msg); - throw new IllegalArgumentException(msg); - } + return validator.createValidServiceInfo(actionManager, requestFactory); } /** @@ -181,10 +119,9 @@ public class AddServiceOrchestrator { /** * Creates the service, component and host component resources for the request. */ - private void createResources(AddServiceInfo request) { + private void createResources(Cluster cluster, AddServiceInfo request) { LOG.info("Creating resources for {}", request); - Cluster cluster = getCluster(request.clusterName()); Set<String> existingServices = cluster.getServices().keySet(); resourceProviders.createCredentials(request); @@ -274,33 +211,4 @@ public class AddServiceOrchestrator { return serviceComponentMap; } - private Configuration getClusterDesiredConfigs(Cluster cluster) { - Map<String, Map<String, String>> desiredConfigTags = getDesiredTags(cluster); - - return new Configuration( - configHelper.getEffectiveConfigProperties(cluster, desiredConfigTags), - configHelper.getEffectiveConfigAttributes(cluster, desiredConfigTags) - ); - } - - private Map<String, Map<String, String>> getDesiredTags(Cluster cluster) { - try { - return configHelper.getEffectiveDesiredTags(cluster, null); - } catch (AmbariException e) { - String msg = String.format("Error getting tags for desired config of cluster %s", cluster.getClusterName()); - LOG.error(msg); - throw new IllegalStateException(msg, e); - } - } - - private Cluster getCluster(String clusterName) { - try { - return controller.getClusters().getCluster(clusterName); - } catch (AmbariException e) { - String msg = String.format("Cannot find cluster %s", clusterName); - LOG.error(msg); - throw new IllegalStateException(msg, e); - } - } - } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/RequestValidator.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/RequestValidator.java new file mode 100644 index 0000000..3106697 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/RequestValidator.java @@ -0,0 +1,281 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.topology.addservice; + +import static java.util.stream.Collectors.toSet; + +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Function; + +import javax.inject.Inject; + +import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.actionmanager.ActionManager; +import org.apache.ambari.server.actionmanager.RequestFactory; +import org.apache.ambari.server.controller.AddServiceRequest; +import org.apache.ambari.server.controller.AmbariManagementController; +import org.apache.ambari.server.controller.internal.RequestStageContainer; +import org.apache.ambari.server.controller.internal.Stack; +import org.apache.ambari.server.state.Cluster; +import org.apache.ambari.server.state.ConfigHelper; +import org.apache.ambari.server.state.StackId; +import org.apache.ambari.server.topology.Configuration; +import org.apache.ambari.server.topology.StackFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Sets; +import com.google.inject.assistedinject.Assisted; + +/** + * Validates a specific {@link AddServiceRequest}. + */ +public class RequestValidator { + + private static final Logger LOG = LoggerFactory.getLogger(RequestValidator.class); + + private final AddServiceRequest request; + private final Cluster cluster; + private final AmbariManagementController controller; + private final ConfigHelper configHelper; + private final StackFactory stackFactory; + private final AtomicBoolean serviceInfoCreated = new AtomicBoolean(); + + private State state; + + @Inject + public RequestValidator( + @Assisted AddServiceRequest request, @Assisted Cluster cluster, + AmbariManagementController controller, ConfigHelper configHelper, + StackFactory stackFactory + ) { + this.state = State.INITIAL; + this.request = request; + this.cluster = cluster; + this.controller = controller; + this.configHelper = configHelper; + this.stackFactory = stackFactory; + } + + /** + * Perform validation of the request. + */ + void validate() { + validateSecurity(); + validateStack(); + validateServicesAndComponents(); + validateHosts(); + validateConfiguration(); + } + + /** + * Create an {@link AddServiceInfo} based on the validated request. + */ + AddServiceInfo createValidServiceInfo(ActionManager actionManager, RequestFactory requestFactory) { + final State state = this.state; + + checkState(state.isValid(), "The request needs to be validated first"); + checkState(!serviceInfoCreated.getAndSet(true), "Can create only one instance for each validated add service request"); + + RequestStageContainer stages = new RequestStageContainer(actionManager.getNextRequestId(), null, requestFactory, actionManager); + AddServiceInfo validatedRequest = new AddServiceInfo(request, cluster.getClusterName(), state.getStack(), state.getConfig(), stages, state.getNewServices()); + stages.setRequestContext(validatedRequest.describe()); + return validatedRequest; + } + + @VisibleForTesting + State getState() { + return state; + } + + @VisibleForTesting + void setState(State state) { + this.state = state; + } + + @VisibleForTesting + void validateSecurity() { + request.getSecurity().ifPresent(requestSecurity -> + checkArgument(requestSecurity.getType() == cluster.getSecurityType(), + "Security type in the request (%s), if specified, should match cluster's security type (%s)", + requestSecurity.getType(), cluster.getSecurityType() + ) + ); + } + + @VisibleForTesting + void validateStack() { + Optional<StackId> requestStackId = request.getStackId(); + StackId stackId = requestStackId.orElseGet(cluster::getCurrentStackVersion); + try { + Stack stack = stackFactory.createStack(stackId.getStackName(), stackId.getStackVersion(), controller); + state = state.with(stack); + } catch (AmbariException e) { + logAndThrow(requestStackId.isPresent() + ? msg -> new IllegalArgumentException(msg, e) + : IllegalStateException::new, + "Stack %s not found", stackId + ); + } + } + + @VisibleForTesting + void validateServicesAndComponents() { + Stack stack = state.getStack(); + Map<String, Map<String, Set<String>>> newServices = new LinkedHashMap<>(); + + Set<String> existingServices = cluster.getServices().keySet(); + + // process service declarations + for (AddServiceRequest.Service service : request.getServices()) { + String serviceName = service.getName(); + + checkArgument(stack.getServices().contains(serviceName), + "Unknown service %s in %s", service, stack); + checkArgument(!existingServices.contains(serviceName), + "Service %s already exists in cluster %s", serviceName, cluster.getClusterName()); + + newServices.computeIfAbsent(serviceName, __ -> new HashMap<>()); + } + + // process component declarations + for (AddServiceRequest.Component requestedComponent : request.getComponents()) { + String componentName = requestedComponent.getName(); + String serviceName = stack.getServiceForComponent(componentName); + + checkArgument(serviceName != null, + "No service found for component %s in %s", componentName, stack); + checkArgument(!existingServices.contains(serviceName), + "Service %s (for component %s) already exists in cluster %s", serviceName, componentName, cluster.getClusterName()); + + newServices.computeIfAbsent(serviceName, __ -> new HashMap<>()) + .computeIfAbsent(componentName, __ -> new HashSet<>()) + .add(requestedComponent.getFqdn()); + } + + checkArgument(!newServices.isEmpty(), "Request should have at least one new service or component to be added"); + + state = state.withNewServices(newServices); + } + + @VisibleForTesting + void validateConfiguration() { + Configuration config = request.getConfiguration(); + Configuration clusterConfig = getClusterDesiredConfigs(); + clusterConfig.setParentConfiguration(state.getStack().getValidDefaultConfig()); + config.setParentConfiguration(clusterConfig); + + // no validation here so far + + state = state.with(config); + } + + @VisibleForTesting + void validateHosts() { + Set<String> clusterHosts = cluster.getHostNames(); + Set<String> requestHosts = state.getNewServices().values().stream() + .flatMap(componentHosts -> componentHosts.values().stream()) + .flatMap(Collection::stream) + .collect(toSet()); + Set<String> unknownHosts = new TreeSet<>(Sets.difference(requestHosts, clusterHosts)); + + checkArgument(unknownHosts.isEmpty(), + "Requested host not associated with cluster %s: %s", cluster.getClusterName(), unknownHosts); + } + + private Configuration getClusterDesiredConfigs() { + try { + return Configuration.of(configHelper.calculateExistingConfigs(cluster)); + } catch (AmbariException e) { + logAndThrow(msg -> new IllegalStateException(msg, e), "Error getting effective configuration of cluster %s", cluster.getClusterName()); + return Configuration.newEmpty(); // unreachable + } + } + + private static void checkArgument(boolean expression, String errorMessage, Object... messageParams) { + if (!expression) { + logAndThrow(IllegalArgumentException::new, errorMessage, messageParams); + } + } + + private static void checkState(boolean expression, String errorMessage, Object... messageParams) { + if (!expression) { + logAndThrow(IllegalStateException::new, errorMessage, messageParams); + } + } + + private static void logAndThrow(Function<String, RuntimeException> exceptionCreator, String errorMessage, Object... messageParams) { + String msg = String.format(errorMessage, messageParams); + LOG.error(msg); + throw exceptionCreator.apply(msg); + } + + @VisibleForTesting + static class State { + + static final State INITIAL = new State(null, null, null); + + private final Stack stack; + private final Map<String, Map<String, Set<String>>> newServices; + private final Configuration config; + + State(Stack stack, Map<String, Map<String, Set<String>>> newServices, Configuration config) { + this.stack = stack; + this.newServices = newServices; + this.config = config; + } + + boolean isValid() { + return stack != null && newServices != null && config != null; + } + + State with(Stack stack) { + return new State(stack, newServices, config); + } + + State withNewServices(Map<String, Map<String, Set<String>>> newServices) { + return new State(stack, newServices, config); + } + + State with(Configuration config) { + return new State(stack, newServices, config); + } + + Stack getStack() { + return stack; + } + + Map<String, Map<String, Set<String>>> getNewServices() { + return newServices; + } + + Configuration getConfig() { + return config; + } + } + +} diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/RequestValidatorFactory.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/RequestValidatorFactory.java new file mode 100644 index 0000000..f23a9f8 --- /dev/null +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/RequestValidatorFactory.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.topology.addservice; + +import org.apache.ambari.server.controller.AddServiceRequest; +import org.apache.ambari.server.state.Cluster; + +/** + * Factory for {@link RequestValidator} objects. + * Implemented by Guice, needed for {@code Assisted} injection. + */ +public interface RequestValidatorFactory { + + RequestValidator create(AddServiceRequest request, Cluster cluster); + +} diff --git a/ambari-server/src/test/java/org/apache/ambari/server/controller/AddServiceRequestTest.java b/ambari-server/src/test/java/org/apache/ambari/server/controller/AddServiceRequestTest.java index 96c3d75..d3b6aa0 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/controller/AddServiceRequestTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/controller/AddServiceRequestTest.java @@ -200,8 +200,9 @@ public class AddServiceRequestTest { assertTrue(request.getServices().isEmpty()); } - @Test(expected = JsonProcessingException.class) + @Test public void testDeserialize_invalid_noServicesAndComponents() throws Exception { + // empty service/component list should be accepted at the JSON level, will be rejected by the request handler mapper.readValue(REQUEST_INVALID_NO_SERVICES_AND_COMPONENTS, AddServiceRequest.class); } diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/BlueprintFactoryTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/BlueprintFactoryTest.java index e7c0fe9..8049dc3 100644 --- a/ambari-server/src/test/java/org/apache/ambari/server/topology/BlueprintFactoryTest.java +++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/BlueprintFactoryTest.java @@ -166,8 +166,8 @@ public class BlueprintFactoryTest { @Test(expected=NoSuchStackException.class) public void testCreateInvalidStack() throws Exception { EasyMockSupport mockSupport = new EasyMockSupport(); - BlueprintFactory.StackFactory mockStackFactory = - mockSupport.createMock(BlueprintFactory.StackFactory.class); + StackFactory mockStackFactory = + mockSupport.createMock(StackFactory.class); // setup mock to throw exception, to simulate invalid stack request expect(mockStackFactory.createStack("null", "null", null)).andThrow(new ObjectNotFoundException("Invalid Stack")); diff --git a/ambari-server/src/test/java/org/apache/ambari/server/topology/addservice/RequestValidatorTest.java b/ambari-server/src/test/java/org/apache/ambari/server/topology/addservice/RequestValidatorTest.java new file mode 100644 index 0000000..041b326 --- /dev/null +++ b/ambari-server/src/test/java/org/apache/ambari/server/topology/addservice/RequestValidatorTest.java @@ -0,0 +1,433 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ambari.server.topology.addservice; + +import static org.easymock.EasyMock.expect; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.actionmanager.ActionManager; +import org.apache.ambari.server.actionmanager.RequestFactory; +import org.apache.ambari.server.controller.AddServiceRequest; +import org.apache.ambari.server.controller.AmbariManagementController; +import org.apache.ambari.server.controller.internal.Stack; +import org.apache.ambari.server.state.Cluster; +import org.apache.ambari.server.state.ConfigHelper; +import org.apache.ambari.server.state.SecurityType; +import org.apache.ambari.server.state.Service; +import org.apache.ambari.server.state.StackId; +import org.apache.ambari.server.topology.Configuration; +import org.apache.ambari.server.topology.SecurityConfiguration; +import org.apache.ambari.server.topology.StackFactory; +import org.easymock.EasyMockSupport; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Sets; + +public class RequestValidatorTest extends EasyMockSupport { + + private final AddServiceRequest request = createNiceMock(AddServiceRequest.class); + private final Cluster cluster = createMock(Cluster.class); + private final AmbariManagementController controller = createNiceMock(AmbariManagementController.class); + private final ConfigHelper configHelper = createMock(ConfigHelper.class); + private final StackFactory stackFactory = createNiceMock(StackFactory.class); + private final RequestValidator validator = new RequestValidator(request, cluster, controller, configHelper, stackFactory); + + @Before + public void setUp() { + validator.setState(RequestValidator.State.INITIAL); + expect(cluster.getClusterName()).andReturn("TEST").anyTimes(); + expect(cluster.getServices()).andStubReturn(ImmutableMap.of()); + expect(request.getServices()).andStubReturn(ImmutableSet.of()); + expect(request.getComponents()).andStubReturn(ImmutableSet.of()); + } + + @After + public void tearDown() { + resetAll(); + } + + @Test + public void cannotConstructInvalidRequestInfo() { + assertThrows(IllegalStateException.class, () -> validator.createValidServiceInfo(null, null)); + + Stack stack = simpleMockStack(); + Map<String, Map<String, Set<String>>> newServices = someNewServices(); + Configuration config = Configuration.newEmpty(); + + validator.setState(RequestValidator.State.INITIAL.with(stack)); + assertThrows(IllegalStateException.class, () -> validator.createValidServiceInfo(null, null)); + validator.setState(validator.getState().with(config)); + assertThrows(IllegalStateException.class, () -> validator.createValidServiceInfo(null, null)); + + validator.setState(RequestValidator.State.INITIAL.withNewServices(newServices)); + assertThrows(IllegalStateException.class, () -> validator.createValidServiceInfo(null, null)); + validator.setState(validator.getState().with(stack)); + assertThrows(IllegalStateException.class, () -> validator.createValidServiceInfo(null, null)); + + validator.setState(RequestValidator.State.INITIAL.with(config)); + assertThrows(IllegalStateException.class, () -> validator.createValidServiceInfo(null, null)); + validator.setState(validator.getState().withNewServices(newServices)); + assertThrows(IllegalStateException.class, () -> validator.createValidServiceInfo(null, null)); + } + + @Test + public void canConstructValidRequestInfo() { + validator.setState( + RequestValidator.State.INITIAL + .withNewServices(someNewServices()) + .with(simpleMockStack()) + .with(Configuration.newEmpty()) + ); + ActionManager actionManager = createNiceMock(ActionManager.class); + RequestFactory requestFactory = createNiceMock(RequestFactory.class); + replayAll(); + + AddServiceInfo addServiceInfo = validator.createValidServiceInfo(actionManager, requestFactory); + assertNotNull(addServiceInfo); + assertSame(request, addServiceInfo.getRequest()); + assertEquals(cluster.getClusterName(), addServiceInfo.clusterName()); + assertSame(validator.getState().getConfig(), addServiceInfo.getConfig()); + assertSame(validator.getState().getStack(), addServiceInfo.getStack()); + assertEquals(validator.getState().getNewServices(), addServiceInfo.newServices()); + } + + @Test + public void cannotConstructTwice() { + ActionManager actionManager = createNiceMock(ActionManager.class); + RequestFactory requestFactory = createNiceMock(RequestFactory.class); + replayAll(); + + validator.setState( + RequestValidator.State.INITIAL + .withNewServices(someNewServices()) + .with(simpleMockStack()) + .with(Configuration.newEmpty()) + ); + validator.createValidServiceInfo(actionManager, requestFactory); + assertThrows(IllegalStateException.class, () -> validator.createValidServiceInfo(actionManager, requestFactory)); + } + + @Test + public void reportsUnknownStackFromRequest() throws Exception { + StackId requestStackId = new StackId("HDP", "123"); + expect(request.getStackId()).andReturn(Optional.of(requestStackId)).anyTimes(); + expect(stackFactory.createStack(requestStackId.getStackName(), requestStackId.getStackVersion(), controller)).andThrow(new AmbariException("Stack not found")); + replayAll(); + + IllegalArgumentException e = assertThrows(IllegalArgumentException.class, validator::validateStack); + assertTrue(e.getMessage().contains(requestStackId.toString())); + assertNull(validator.getState().getStack()); + } + + @Test + public void reportsUnknownStackFromCluster() throws Exception { + StackId clusterStackId = new StackId("CLUSTER", "555"); + expect(request.getStackId()).andReturn(Optional.empty()).anyTimes(); + expect(cluster.getCurrentStackVersion()).andReturn(clusterStackId); + expect(stackFactory.createStack(clusterStackId.getStackName(), clusterStackId.getStackVersion(), controller)).andThrow(new AmbariException("Stack not found")); + replayAll(); + + IllegalStateException e = assertThrows(IllegalStateException.class, validator::validateStack); + assertTrue(e.getMessage().contains(clusterStackId.toString())); + assertNull(validator.getState().getStack()); + } + + @Test + public void useClusterStackIfAbsentInRequest() throws Exception { + StackId clusterStackId = new StackId("CLUSTER", "123"); + Stack expectedStack = createNiceMock(Stack.class); + expect(request.getStackId()).andReturn(Optional.empty()).anyTimes(); + expect(cluster.getCurrentStackVersion()).andReturn(clusterStackId); + expect(stackFactory.createStack(clusterStackId.getStackName(), clusterStackId.getStackVersion(), controller)).andReturn(expectedStack); + replayAll(); + + validator.validateStack(); + + assertSame(expectedStack, validator.getState().getStack()); + } + + @Test + public void acceptsKnownServices() { + expect(request.getServices()).andReturn(ImmutableSet.of(AddServiceRequest.Service.of("KAFKA"))); + validator.setState(RequestValidator.State.INITIAL.with(simpleMockStack())); + replayAll(); + + validator.validateServicesAndComponents(); + + Map<String, Map<String, Set<String>>> expectedNewServices = ImmutableMap.of( + "KAFKA", ImmutableMap.of() + ); + assertEquals(expectedNewServices, validator.getState().getNewServices()); + } + + @Test + public void acceptsKnownComponents() { + expect(request.getComponents()).andReturn(ImmutableSet.of(AddServiceRequest.Component.of("KAFKA_BROKER", "c7401.ambari.apache.org"))); + validator.setState(RequestValidator.State.INITIAL.with(simpleMockStack())); + replayAll(); + + validator.validateServicesAndComponents(); + + Map<String, Map<String, Set<String>>> expectedNewServices = ImmutableMap.of( + "KAFKA", ImmutableMap.of("KAFKA_BROKER", ImmutableSet.of("c7401.ambari.apache.org")) + ); + assertEquals(expectedNewServices, validator.getState().getNewServices()); + } + + @Test + public void rejectsUnknownService() { + String serviceName = "UNKNOWN_SERVICE"; + expect(request.getServices()).andReturn(ImmutableSet.of(AddServiceRequest.Service.of(serviceName))); + validator.setState(RequestValidator.State.INITIAL.with(simpleMockStack())); + replayAll(); + + IllegalArgumentException e = assertThrows(IllegalArgumentException.class, validator::validateServicesAndComponents); + assertTrue(e.getMessage().contains(serviceName)); + assertNull(validator.getState().getNewServices()); + } + + @Test + public void rejectsUnknownComponent() { + String componentName = "UNKNOWN_COMPONENT"; + expect(request.getComponents()).andReturn(ImmutableSet.of(AddServiceRequest.Component.of(componentName, "c7401.ambari.apache.org"))); + validator.setState(RequestValidator.State.INITIAL.with(simpleMockStack())); + replayAll(); + + IllegalArgumentException e = assertThrows(IllegalArgumentException.class, validator::validateServicesAndComponents); + assertTrue(e.getMessage().contains(componentName)); + assertNull(validator.getState().getNewServices()); + } + + @Test + public void rejectsExistingServiceForService() { + String serviceName = "KAFKA"; + expect(cluster.getServices()).andReturn(ImmutableMap.of(serviceName, createNiceMock(Service.class))).anyTimes(); + expect(request.getServices()).andReturn(ImmutableSet.of(AddServiceRequest.Service.of(serviceName))); + validator.setState(RequestValidator.State.INITIAL.with(simpleMockStack())); + replayAll(); + + IllegalArgumentException e = assertThrows(IllegalArgumentException.class, validator::validateServicesAndComponents); + assertTrue(e.getMessage().contains(serviceName)); + assertNull(validator.getState().getNewServices()); + } + + @Test + public void rejectsExistingServiceForComponent() { + String serviceName = "KAFKA"; + String componentName = "KAFKA_BROKER"; + expect(cluster.getServices()).andReturn(ImmutableMap.of(serviceName, createNiceMock(Service.class))).anyTimes(); + expect(request.getComponents()).andReturn(ImmutableSet.of(AddServiceRequest.Component.of(componentName, "c7401.ambari.apache.org"))); + validator.setState(RequestValidator.State.INITIAL.with(simpleMockStack())); + replayAll(); + + IllegalArgumentException e = assertThrows(IllegalArgumentException.class, validator::validateServicesAndComponents); + assertTrue(e.getMessage().contains(serviceName)); + assertTrue(e.getMessage().contains(componentName)); + assertNull(validator.getState().getNewServices()); + } + + @Test + public void rejectsEmptyServiceAndComponentList() { + replayAll(); + + assertThrows(IllegalArgumentException.class, validator::validateServicesAndComponents); + assertNull(validator.getState().getNewServices()); + } + + @Test + public void acceptsKnownHosts() { + Set<String> requestHosts = ImmutableSet.of("c7401.ambari.apache.org", "c7402.ambari.apache.org"); + Set<String> otherHosts = ImmutableSet.of("c7403.ambari.apache.org", "c7404.ambari.apache.org"); + Set<String> clusterHosts = Sets.union(requestHosts, otherHosts); + expect(cluster.getHostNames()).andReturn(clusterHosts).anyTimes(); + validator.setState(RequestValidator.State.INITIAL.withNewServices(ImmutableMap.of( + "KAFKA", ImmutableMap.of("KAFKA_BROKER", requestHosts) + ))); + replayAll(); + + validator.validateHosts(); + } + + @Test + public void rejectsUnknownHosts() { + Set<String> clusterHosts = ImmutableSet.of("c7401.ambari.apache.org", "c7402.ambari.apache.org"); + Set<String> otherHosts = ImmutableSet.of("c7403.ambari.apache.org", "c7404.ambari.apache.org"); + Set<String> requestHosts = ImmutableSet.copyOf(Sets.union(clusterHosts, otherHosts)); + expect(cluster.getHostNames()).andReturn(clusterHosts).anyTimes(); + validator.setState(RequestValidator.State.INITIAL.withNewServices(ImmutableMap.of( + "KAFKA", ImmutableMap.of("KAFKA_BROKER", requestHosts) + ))); + replayAll(); + + IllegalArgumentException e = assertThrows(IllegalArgumentException.class, validator::validateHosts); + assertTrue(e.getMessage(), e.getMessage().contains("host")); + } + + @Test + public void acceptsAbsentSecurityWhenClusterHasKerberos() { + expect(cluster.getSecurityType()).andReturn(SecurityType.KERBEROS).anyTimes(); + expect(request.getSecurity()).andReturn(Optional.empty()).anyTimes(); + replayAll(); + + validator.validateSecurity(); + } + + @Test + public void acceptsAbsentSecurityWhenClusterHasNone() { + expect(cluster.getSecurityType()).andReturn(SecurityType.NONE).anyTimes(); + expect(request.getSecurity()).andReturn(Optional.empty()).anyTimes(); + replayAll(); + + validator.validateSecurity(); + } + + @Test + public void acceptsMatchingKerberosSecurity() { + expect(cluster.getSecurityType()).andReturn(SecurityType.KERBEROS).anyTimes(); + expect(request.getSecurity()).andReturn(Optional.of(new SecurityConfiguration(SecurityType.KERBEROS))).anyTimes(); + replayAll(); + + validator.validateSecurity(); + } + + @Test + public void acceptsMatchingNoneSecurity() { + expect(cluster.getSecurityType()).andReturn(SecurityType.NONE).anyTimes(); + expect(request.getSecurity()).andReturn(Optional.of(SecurityConfiguration.NONE)).anyTimes(); + replayAll(); + + validator.validateSecurity(); + } + + @Test + public void rejectsNoneSecurityWhenClusterHasKerberos() { + expect(cluster.getSecurityType()).andReturn(SecurityType.KERBEROS).anyTimes(); + expect(request.getSecurity()).andReturn(Optional.of(SecurityConfiguration.NONE)).anyTimes(); + replayAll(); + + IllegalArgumentException e = assertThrows(IllegalArgumentException.class, validator::validateSecurity); + assertTrue(e.getMessage().contains("KERBEROS")); + } + + @Test + public void rejectsKerberosSecurityWhenClusterHasNone() { + expect(cluster.getSecurityType()).andReturn(SecurityType.NONE).anyTimes(); + expect(request.getSecurity()).andReturn(Optional.of(new SecurityConfiguration(SecurityType.KERBEROS))).anyTimes(); + replayAll(); + + assertThrows(IllegalArgumentException.class, validator::validateSecurity); + IllegalArgumentException e = assertThrows(IllegalArgumentException.class, validator::validateSecurity); + assertTrue(e.getMessage().contains("KERBEROS")); + } + + @Test + public void combinesRequestConfigWithClusterAndStack() throws AmbariException { + Configuration requestConfig = Configuration.newEmpty(); + requestConfig.setProperty("kafka-broker", "zookeeper.connect", "zookeeper.connect:request"); + requestConfig.setProperty("kafka-env", "custom_property", "custom_property:request"); + expect(request.getConfiguration()).andReturn(requestConfig.copy()).anyTimes(); + + Configuration clusterConfig = Configuration.newEmpty(); + clusterConfig.setProperty("zookeeper-env", "zk_user", "zk_user:cluster_level"); + expect(configHelper.calculateExistingConfigs(cluster)).andReturn(clusterConfig.asPair()).anyTimes(); + + Stack stack = simpleMockStack(); + Configuration stackConfig = Configuration.newEmpty(); + stackConfig.setProperty("zookeeper-env", "zk_user", "zk_user:stack_default"); + stackConfig.setProperty("zookeeper-env", "zk_log_dir", "zk_log_dir:stack_default"); + stackConfig.setProperty("kafka-broker", "zookeeper.connect", "zookeeper.connect:stack_default"); + expect(stack.getValidDefaultConfig()).andReturn(stackConfig).anyTimes(); + + replayAll(); + + validator.setState(RequestValidator.State.INITIAL.with(stack)); + validator.validateConfiguration(); + + Configuration config = validator.getState().getConfig(); + verifyConfigOverrides(requestConfig, clusterConfig, stackConfig, config); + } + + private static void verifyConfigOverrides(Configuration requestConfig, Configuration clusterConfig, Configuration stackConfig, Configuration actualConfig) { + requestConfig.getProperties().forEach( + (type, properties) -> properties.forEach( + (propertyName, propertyValue) -> assertEquals(type + "/" + propertyName, propertyValue, actualConfig.getPropertyValue(type, propertyName)) + ) + ); + clusterConfig.getProperties().forEach( + (type, properties) -> properties.forEach( + (propertyName, propertyValue) -> { + if (!requestConfig.isPropertySet(type, propertyName)) { + assertEquals(type + "/" + propertyName, propertyValue, actualConfig.getPropertyValue(type, propertyName)); + } + } + ) + ); + stackConfig.getProperties().forEach( + (type, properties) -> properties.forEach( + (propertyName, propertyValue) -> { + if (!requestConfig.isPropertySet(type, propertyName) && !clusterConfig.isPropertySet(type, propertyName)) { + assertEquals(type + "/" + propertyName, propertyValue, actualConfig.getPropertyValue(type, propertyName)); + } + } + ) + ); + } + + private Stack simpleMockStack() { + Stack stack = createNiceMock(Stack.class); + Set<String> stackServices = ImmutableSet.of("KAFKA", "ZOOKEEPER"); + expect(stack.getServices()).andReturn(stackServices).anyTimes(); + expect(stack.getServiceForComponent("KAFKA_BROKER")).andReturn("KAFKA").anyTimes(); + expect(stack.getServiceForComponent("ZOOKEEPER_SERVER")).andReturn("ZOOKEEPER").anyTimes(); + expect(stack.getServiceForComponent("ZOOKEEPER_CLIENT")).andReturn("ZOOKEEPER").anyTimes(); + return stack; + } + + private static Map<String, Map<String, Set<String>>> someNewServices() { + return ImmutableMap.of( + "KAFKA", ImmutableMap.of("KAFKA_BROKER", ImmutableSet.of("c7401.ambari.apache.org")) + ); + } + + private static <T extends Throwable> T assertThrows(Class<T> expectedException, Runnable code) { + try { + code.run(); + } catch (Throwable t) { + if (expectedException.isInstance(t)) { + return expectedException.cast(t); + } + throw new AssertionError("Expected exception: " + expectedException + " but " + t.getClass() + " was thrown instead"); + } + + throw new AssertionError("Expected exception: " + expectedException + ", but was not thrown"); + } + +}