This is an automated email from the ASF dual-hosted git repository. adoroszlai pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/ambari.git
The following commit(s) were added to refs/heads/trunk by this push: new 94d1860 AMBARI-24917. Implement complex Add Service request using default configs (#2631) 94d1860 is described below commit 94d18601e8adef6bd43d2ce1e70d3c1e4f14ae3c Author: Doroszlai, Attila <6454655+adorosz...@users.noreply.github.com> AuthorDate: Tue Nov 20 08:14:45 2018 +0100 AMBARI-24917. Implement complex Add Service request using default configs (#2631) --- .../ambari/server/api/handlers/CreateHandler.java | 2 +- .../internal/HostComponentResourceProvider.java | 2 +- .../internal/ServiceResourceProvider.java | 8 +- .../ambari/server/controller/internal/Stack.java | 24 +++ .../server/controller/internal/UnitUpdater.java | 23 ++- .../server/topology/addservice/AddServiceInfo.java | 46 +++++- .../addservice/AddServiceOrchestrator.java | 61 ++++++- .../addservice/ResourceProviderAdapter.java | 183 +++++++++++++++++++-- 8 files changed, 312 insertions(+), 37 deletions(-) diff --git a/ambari-server/src/main/java/org/apache/ambari/server/api/handlers/CreateHandler.java b/ambari-server/src/main/java/org/apache/ambari/server/api/handlers/CreateHandler.java index 484547a..b5dddf8 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/api/handlers/CreateHandler.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/api/handlers/CreateHandler.java @@ -77,7 +77,7 @@ public class CreateHandler extends BaseManagementHandler { } catch (ResourceAlreadyExistsException e) { result = new ResultImpl(new ResultStatus(ResultStatus.STATUS.CONFLICT, e.getMessage())); } catch(IllegalArgumentException e) { - LOG.error("Bad request received: " + e.getMessage()); + LOG.error("Bad request received: " + e.getMessage(), e); result = new ResultImpl(new ResultStatus(ResultStatus.STATUS.BAD_REQUEST, e.getMessage())); } catch (RuntimeException e) { if (LOG.isErrorEnabled()) { diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java index d6f52a4..79441d5 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/HostComponentResourceProvider.java @@ -799,7 +799,7 @@ public class HostComponentResourceProvider extends AbstractControllerResourcePro * @throws NoSuchResourceException the query didn't match any resources * @throws NoSuchParentResourceException a specified parent resource doesn't exist */ - private RequestStageContainer doUpdateResources(final RequestStageContainer stages, final Request request, + public RequestStageContainer doUpdateResources(final RequestStageContainer stages, final Request request, Predicate predicate, boolean performQueryEvaluation, boolean useGeneratedConfigs, boolean useClusterHostInfo) throws UnsupportedPropertyException, diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceResourceProvider.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceResourceProvider.java index a4aaf0b..eaa8cb1 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceResourceProvider.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/ServiceResourceProvider.java @@ -254,9 +254,9 @@ public class ServiceResourceProvider extends AbstractControllerResourceProvider if (request.getProperties().size() == 1) { Map<String, Object> requestProperties = request.getProperties().iterator().next(); if (isAddServiceRequest(requestProperties)) { - processAddServiceRequest(requestProperties, request.getRequestInfoProperties()); + RequestStatusResponse response = processAddServiceRequest(requestProperties, request.getRequestInfoProperties()); notifyCreate(Resource.Type.Service, request); - return getRequestStatus(null); + return getRequestStatus(response); } } @@ -1228,11 +1228,11 @@ public class ServiceResourceProvider extends AbstractControllerResourceProvider return OperationType.ADD_SERVICE.name().equals(properties.get(OPERATION_TYPE)); } - private void processAddServiceRequest(Map<String, Object> requestProperties, Map<String, String> requestInfoProperties) throws NoSuchParentResourceException { + private RequestStatusResponse processAddServiceRequest(Map<String, Object> requestProperties, Map<String, String> requestInfoProperties) throws NoSuchParentResourceException { AddServiceRequest request = createAddServiceRequest(requestProperties, requestInfoProperties); String clusterName = String.valueOf(requestProperties.get(SERVICE_CLUSTER_NAME_PROPERTY_ID)); try { - addServiceOrchestrator.processAddServiceRequest(getManagementController().getClusters().getCluster(clusterName), request); + return addServiceOrchestrator.processAddServiceRequest(getManagementController().getClusters().getCluster(clusterName), request); } catch (AmbariException e) { throw new NoSuchParentResourceException(e.getMessage(), e); } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/Stack.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/Stack.java index 1c85d88..02e8b4e 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/Stack.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/Stack.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; @@ -48,6 +49,7 @@ import org.apache.ambari.server.state.StackId; import org.apache.ambari.server.state.ValueAttributesInfo; import org.apache.ambari.server.topology.Cardinality; import org.apache.ambari.server.topology.Configuration; +import org.apache.ambari.server.topology.validators.UnitValidatedProperty; /** * Encapsulates stack information. @@ -643,6 +645,28 @@ public class Stack { } /** + * @return default configuration of the stack, with some updates necessary so that the config can be applied + * (eg. some properties need a unit to be appended) + */ + public Configuration getValidDefaultConfig() { + Configuration config = getConfiguration(); + + for (UnitValidatedProperty p : UnitValidatedProperty.ALL) { + if (config.isPropertySet(p.getConfigType(), p.getPropertyName())) { + String value = config.getPropertyValue(p.getConfigType(), p.getPropertyName()); + String updatedValue = UnitUpdater.updateForClusterCreate(this, p.getServiceName(), p.getConfigType(), p.getPropertyName(), value); + config.setProperty(p.getConfigType(), p.getPropertyName(), updatedValue); + } + } + + config.getProperties().values().forEach( + each -> each.values().removeIf(Objects::isNull) + ); + + return config; + } + + /** * Parse components for the specified service from the stack definition. * * @param service service name diff --git a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UnitUpdater.java b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UnitUpdater.java index 501e16a..6691519 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UnitUpdater.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/controller/internal/UnitUpdater.java @@ -49,15 +49,20 @@ public class UnitUpdater implements BlueprintConfigurationProcessor.PropertyUpda String origValue, Map<String, Map<String, String>> properties, ClusterTopology topology) { - PropertyUnit stackUnit = PropertyUnit.of(topology.getBlueprint().getStack(), serviceName, configType, propertyName); - PropertyValue value = PropertyValue.of(propertyName, origValue); - if (value.hasUnit(stackUnit)) { - return value.toString(); - } else if (!value.hasAnyUnit()) { - return value.withUnit(stackUnit); - } else { // should not happen because of pre-validation in UnitValidator - throw new IllegalArgumentException("Property " + propertyName + "=" + origValue + " has an unsupported unit. Stack supported unit is: " + stackUnit + " or no unit"); - } + Stack stack = topology.getBlueprint().getStack(); + return updateForClusterCreate(stack, serviceName, configType, propertyName, origValue); + } + + public static String updateForClusterCreate(Stack stack, String serviceName, String configType, String propertyName, String origValue) { + PropertyUnit stackUnit = PropertyUnit.of(stack, serviceName, configType, propertyName); + PropertyValue value = PropertyValue.of(propertyName, origValue); + if (value.hasUnit(stackUnit)) { + return value.toString(); + } else if (!value.hasAnyUnit()) { + return value.withUnit(stackUnit); + } else { // should not happen because of pre-validation in UnitValidator + throw new IllegalArgumentException("Property " + propertyName + "=" + origValue + " has an unsupported unit. Stack supported unit is: " + stackUnit + " or no unit"); + } } /** diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceInfo.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceInfo.java index 24e530d..8662746 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceInfo.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceInfo.java @@ -17,26 +17,47 @@ */ package org.apache.ambari.server.topology.addservice; +import static java.util.stream.Collectors.joining; + import java.util.Map; import java.util.Set; +import org.apache.ambari.server.controller.internal.RequestStageContainer; +import org.apache.ambari.server.controller.internal.Stack; +import org.apache.ambari.server.topology.Configuration; + /** * Processed info for adding new services/components to an existing cluster. */ public final class AddServiceInfo { private final String clusterName; + private final Stack stack; private final Map<String, Map<String, Set<String>>> newServices; + private final RequestStageContainer stages; + private final Configuration config; - public AddServiceInfo(String clusterName, Map<String, Map<String, Set<String>>> newServices) { + public AddServiceInfo(String clusterName, Stack stack, Configuration config, RequestStageContainer stages, Map<String, Map<String, Set<String>>> newServices) { this.clusterName = clusterName; + this.stack = stack; this.newServices = newServices; + this.stages = stages; + this.config = config; + } + + @Override + public String toString() { + return "AddServiceRequest(" + stages.getId() + ")"; } public String clusterName() { return clusterName; } + public RequestStageContainer getStages() { + return stages; + } + /** * New services to be added to the cluster: service -> component -> host * This should include both explicitly requested services, and services of the requested components. @@ -44,4 +65,27 @@ public final class AddServiceInfo { public Map<String, Map<String, Set<String>>> newServices() { return newServices; } + + public Stack getStack() { + return stack; + } + + public Configuration getConfig() { + return config; + } + + /** + * Creates a descriptive label to be displayed in the UI. + */ + public String describe() { + int maxServicesToShow = 3; + StringBuilder sb = new StringBuilder("Add Services: ") + .append(newServices.keySet().stream().sorted().limit(maxServicesToShow).collect(joining(", "))); + if (newServices.size() > maxServicesToShow) { + sb.append(" and ").append(newServices.size() - maxServicesToShow).append(" more"); + } + sb.append(" to cluster ").append(clusterName); + return sb.toString(); + } + } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java index 426c833..e137d4a 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/AddServiceOrchestrator.java @@ -27,12 +27,17 @@ import javax.inject.Inject; import javax.inject.Singleton; import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.actionmanager.ActionManager; +import org.apache.ambari.server.actionmanager.RequestFactory; import org.apache.ambari.server.controller.AddServiceRequest; import org.apache.ambari.server.controller.AmbariManagementController; +import org.apache.ambari.server.controller.RequestStatusResponse; +import org.apache.ambari.server.controller.internal.RequestStageContainer; import org.apache.ambari.server.controller.internal.Stack; import org.apache.ambari.server.state.Cluster; import org.apache.ambari.server.state.StackId; import org.apache.ambari.server.state.State; +import org.apache.ambari.server.topology.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,13 +52,23 @@ public class AddServiceOrchestrator { @Inject private AmbariManagementController controller; - public void processAddServiceRequest(Cluster cluster, AddServiceRequest request) { + @Inject + private ActionManager actionManager; + + @Inject + private RequestFactory requestFactory; + + public RequestStatusResponse processAddServiceRequest(Cluster cluster, AddServiceRequest request) { LOG.info("Received {} request for {}: {}", request.getOperationType(), cluster.getClusterName(), request); AddServiceInfo validatedRequest = validate(cluster, request); AddServiceInfo requestWithLayout = recommendLayout(validatedRequest); - createResources(requestWithLayout); - createHostTasks(requestWithLayout); + AddServiceInfo requestWithConfig = recommendConfiguration(requestWithLayout); + + createResources(requestWithConfig); + createHostTasks(requestWithConfig); + + return requestWithConfig.getStages().getRequestStatusResponse(); } /** @@ -68,8 +83,9 @@ public class AddServiceOrchestrator { Map<String, Map<String, Set<String>>> newServices = new LinkedHashMap<>(); StackId stackId = new StackId(request.getStackName(), request.getStackVersion()); + Stack stack; try { - Stack stack = new Stack(stackId, controller); + stack = new Stack(stackId, controller); Set<String> existingServices = cluster.getServices().keySet(); for (AddServiceRequest.Component requestedComponent : request.getComponents()) { String serviceName = stack.getServiceForComponent(requestedComponent.getName()); @@ -93,7 +109,17 @@ public class AddServiceOrchestrator { throw new IllegalArgumentException(e); } - return new AddServiceInfo(cluster.getClusterName(), newServices); + if (newServices.isEmpty()) { + throw new IllegalArgumentException("No new services to be added"); + } + + Configuration config = stack.getValidDefaultConfig(); + // TODO add user-defined config + + RequestStageContainer stages = new RequestStageContainer(actionManager.getNextRequestId(), null, requestFactory, actionManager); + AddServiceInfo validatedRequest = new AddServiceInfo(cluster.getClusterName(), stack, config, stages, newServices); + stages.setRequestContext(validatedRequest.describe()); + return validatedRequest; } /** @@ -108,20 +134,41 @@ public class AddServiceOrchestrator { } /** + * Requests config recommendation from the stack advisor. + * @return new request, updated with the recommended config + * @throws IllegalArgumentException if the request cannot be satisfied + */ + private AddServiceInfo recommendConfiguration(AddServiceInfo request) { + LOG.info("Recommending configuration for {}", request); + // TODO implement + return request; + } + + /** * Creates the service, component and host component resources for the request. */ private void createResources(AddServiceInfo request) { LOG.info("Creating resources for {}", request); resourceProviders.createServices(request); resourceProviders.createComponents(request); - resourceProviders.createHostComponents(request); + resourceProviders.createConfigs(request); resourceProviders.updateServiceDesiredState(request, State.INSTALLED); resourceProviders.updateServiceDesiredState(request, State.STARTED); + resourceProviders.createHostComponents(request); } private void createHostTasks(AddServiceInfo request) { LOG.info("Creating host tasks for {}", request); - // TODO implement + + resourceProviders.updateHostComponentDesiredState(request, State.INSTALLED); + resourceProviders.updateHostComponentDesiredState(request, State.STARTED); + try { + request.getStages().persist(); + } catch (AmbariException e) { + String msg = String.format("Error creating host tasks for %s", request); + LOG.error(msg, e); + throw new IllegalStateException(msg, e); + } } } diff --git a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/ResourceProviderAdapter.java b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/ResourceProviderAdapter.java index 70b730e..722d5f6 100644 --- a/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/ResourceProviderAdapter.java +++ b/ambari-server/src/main/java/org/apache/ambari/server/topology/addservice/ResourceProviderAdapter.java @@ -17,15 +17,36 @@ */ package org.apache.ambari.server.topology.addservice; +import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; +import javax.inject.Inject; +import javax.inject.Singleton; + +import org.apache.ambari.server.AmbariException; +import org.apache.ambari.server.controller.AmbariManagementController; +import org.apache.ambari.server.controller.ClusterRequest; +import org.apache.ambari.server.controller.ConfigurationRequest; +import org.apache.ambari.server.controller.internal.ClusterResourceProvider; +import org.apache.ambari.server.controller.internal.ComponentResourceProvider; +import org.apache.ambari.server.controller.internal.HostComponentResourceProvider; import org.apache.ambari.server.controller.internal.RequestImpl; +import org.apache.ambari.server.controller.internal.RequestOperationLevel; import org.apache.ambari.server.controller.internal.ServiceResourceProvider; +import org.apache.ambari.server.controller.predicate.AndPredicate; +import org.apache.ambari.server.controller.predicate.EqualsPredicate; +import org.apache.ambari.server.controller.predicate.OrPredicate; import org.apache.ambari.server.controller.spi.ClusterController; import org.apache.ambari.server.controller.spi.NoSuchParentResourceException; +import org.apache.ambari.server.controller.spi.NoSuchResourceException; +import org.apache.ambari.server.controller.spi.Predicate; import org.apache.ambari.server.controller.spi.Request; import org.apache.ambari.server.controller.spi.Resource; import org.apache.ambari.server.controller.spi.ResourceAlreadyExistsException; @@ -33,20 +54,28 @@ import org.apache.ambari.server.controller.spi.ResourceProvider; import org.apache.ambari.server.controller.spi.SystemException; import org.apache.ambari.server.controller.spi.UnsupportedPropertyException; import org.apache.ambari.server.controller.utilities.ClusterControllerHelper; +import org.apache.ambari.server.controller.utilities.PropertyHelper; +import org.apache.ambari.server.security.authorization.AuthorizationException; +import org.apache.ambari.server.state.ConfigHelper; import org.apache.ambari.server.state.State; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; /** * Creates resources using the resource providers. * Translates {@link AddServiceInfo} to internal requests accepted by those. */ +@Singleton public class ResourceProviderAdapter { private static final Logger LOG = LoggerFactory.getLogger(ResourceProviderAdapter.class); + @Inject + private AmbariManagementController controller; + public void createServices(AddServiceInfo request) { LOG.info("Creating service resources for {}", request); @@ -54,16 +83,115 @@ public class ResourceProviderAdapter { .map(service -> createServiceRequestProperties(request, service)) .collect(toSet()); + createResources(properties, Resource.Type.Service); + } + + public void createComponents(AddServiceInfo request) { + LOG.info("Creating component resources for {}", request); + + Set<Map<String, Object>> properties = request.newServices().entrySet().stream() + .flatMap(componentsOfService -> componentsOfService.getValue().keySet().stream() + .map(component -> createComponentRequestProperties(request, componentsOfService.getKey(), component))) + .collect(toSet()); + + createResources(properties, Resource.Type.Component); + } + + public void createHostComponents(AddServiceInfo request) { + LOG.info("Creating host component resources for {}", request); + + Set<Map<String, Object>> properties = request.newServices().entrySet().stream() + .flatMap(componentsOfService -> componentsOfService.getValue().entrySet().stream() + .flatMap(hostsOfComponent -> hostsOfComponent.getValue().stream() + .map(host -> createHostComponentRequestProperties(request, componentsOfService.getKey(), hostsOfComponent.getKey(), host)))) + .collect(toSet()); + + createResources(properties, Resource.Type.HostComponent); + } + + public void createConfigs(AddServiceInfo request) { + LOG.info("Creating configurations for {}", request); + + Set<ClusterRequest> requests = new HashSet<>(); + for (String service : request.newServices().keySet()) { + List<ConfigurationRequest> configRequests = request.getStack().getConfigurationTypes(service).stream() + .filter(configType -> !Objects.equals(configType, ConfigHelper.CLUSTER_ENV)) + .map(configType -> createClusterConfigRequestProperties(request, service, configType)) + .collect(toList()); + ClusterRequest internalRequest = new ClusterRequest(null, request.clusterName(), null, null); + internalRequest.setDesiredConfig(configRequests); + requests.add(internalRequest); + } + + try { + controller.updateClusters(requests, null); + } catch (AmbariException | AuthorizationException e) { + String msg = String.format("Error creating configurations for %s", request); + LOG.error(msg, e); + throw new RuntimeException(msg, e); + } + } + + public void updateServiceDesiredState(AddServiceInfo request, State desiredState) { + LOG.info("Updating service desired state to {} for {}", desiredState, request); + + Set<Map<String, Object>> properties = ImmutableSet.of(ImmutableMap.of( + ServiceResourceProvider.SERVICE_SERVICE_STATE_PROPERTY_ID, desiredState.name() + )); + updateResources(request, properties, Resource.Type.Service, predicateForNewServices(request, "ServiceInfo")); + } + + public void updateHostComponentDesiredState(AddServiceInfo request, State desiredState) { + LOG.info("Updating host component desired state to {} for {}", desiredState, request); + + Set<Map<String, Object>> properties = ImmutableSet.of(ImmutableMap.of( + HostComponentResourceProvider.STATE, desiredState.name(), + "context", String.format("Put new components to %s state", desiredState) + )); + HostComponentResourceProvider rp = (HostComponentResourceProvider) getClusterController().ensureResourceProvider(Resource.Type.HostComponent); + Request internalRequest = createRequest(request, properties, Resource.Type.HostComponent); + try { + rp.doUpdateResources(request.getStages(), internalRequest, predicateForNewServices(request, HostComponentResourceProvider.HOST_ROLES), false, false, false); + } catch (UnsupportedPropertyException | SystemException | NoSuchParentResourceException | NoSuchResourceException e) { + String msg = String.format("Error updating host component desired state for %s", request); + LOG.error(msg, e); + throw new RuntimeException(msg, e); + } + } + + private static void createResources(Set<Map<String, Object>> properties, Resource.Type resourceType) { Request internalRequest = new RequestImpl(null, properties, null, null); - ResourceProvider rp = getClusterController().ensureResourceProvider(Resource.Type.Service); + ResourceProvider rp = getClusterController().ensureResourceProvider(resourceType); try { rp.createResources(internalRequest); } catch (UnsupportedPropertyException | SystemException | ResourceAlreadyExistsException | NoSuchParentResourceException e) { - LOG.error("Error creating services", e); - throw new RuntimeException("Error creating services", e); + String msg = String.format("Error creating resources: %s", resourceType); + LOG.error(msg, e); + throw new RuntimeException(msg, e); + } + } + + private static void updateResources(AddServiceInfo request, Set<Map<String, Object>> properties, Resource.Type resourceType, Predicate predicate) { + Request internalRequest = createRequest(request, properties, resourceType); + ResourceProvider rp = getClusterController().ensureResourceProvider(resourceType); + try { + rp.updateResources(internalRequest, predicate); + } catch (UnsupportedPropertyException | SystemException | NoSuchParentResourceException | NoSuchResourceException e) { + String msg = String.format("Error updating resources: %s", resourceType); + LOG.error(msg, e); + throw new RuntimeException(msg, e); } } + private static Request createRequest(AddServiceInfo request, Set<Map<String, Object>> properties, Resource.Type resourceType) { + Map<String, String> requestInfoProperties = ImmutableMap.of( + + RequestOperationLevel.OPERATION_LEVEL_ID, RequestOperationLevel.getExternalLevelName(resourceType.name()), + RequestOperationLevel.OPERATION_CLUSTER_ID, request.clusterName() + ); + return new RequestImpl(null, properties, requestInfoProperties, null); + } + private static Map<String, Object> createServiceRequestProperties(AddServiceInfo request, String service) { ImmutableMap.Builder<String, Object> properties = ImmutableMap.builder(); @@ -74,22 +202,49 @@ public class ResourceProviderAdapter { return properties.build(); } - private ClusterController getClusterController() { - return ClusterControllerHelper.getClusterController(); + private static Map<String, Object> createComponentRequestProperties(AddServiceInfo request, String service, String component) { + ImmutableMap.Builder<String, Object> properties = ImmutableMap.builder(); + + properties.put(ComponentResourceProvider.CLUSTER_NAME, request.clusterName()); + properties.put(ComponentResourceProvider.SERVICE_NAME, service); + properties.put(ComponentResourceProvider.COMPONENT_NAME, component); + properties.put(ComponentResourceProvider.STATE, State.INIT.name()); + + return properties.build(); } - public void createComponents(AddServiceInfo request) { - LOG.info("Creating component resources for {}", request); - // TODO implement + private static Map<String, Object> createHostComponentRequestProperties(AddServiceInfo request, String service, String component, String host) { + ImmutableMap.Builder<String, Object> properties = ImmutableMap.builder(); + + properties.put(HostComponentResourceProvider.CLUSTER_NAME, request.clusterName()); + properties.put(HostComponentResourceProvider.SERVICE_NAME, service); + properties.put(HostComponentResourceProvider.COMPONENT_NAME, component); + properties.put(HostComponentResourceProvider.HOST_NAME, host); + properties.put(HostComponentResourceProvider.STATE, State.INIT.name()); + + return properties.build(); } - public void createHostComponents(AddServiceInfo request) { - LOG.info("Creating host component resources for {}", request); - // TODO implement + private static ConfigurationRequest createClusterConfigRequestProperties(AddServiceInfo request, String service, String configType) { + LOG.debug("Creating config type {} for service {}", configType, service); + + return new ConfigurationRequest(request.clusterName(), configType, "ADD_SERVICE", + request.getConfig().getProperties().getOrDefault(configType, new HashMap<>(0)), + request.getConfig().getAttributes().getOrDefault(configType, new HashMap<>(0))); } - public void updateServiceDesiredState(AddServiceInfo request, State desiredState) { - LOG.info("Updating service desired state to {} for {}", desiredState, request); - // TODO implement, reuse parts of AmbariContext#createAmbariServiceAndComponentResources + private static Predicate predicateForNewServices(AddServiceInfo request, String category) { + return new AndPredicate( + new EqualsPredicate<>(PropertyHelper.getPropertyId(category, ClusterResourceProvider.CLUSTER_NAME), request.clusterName()), + new OrPredicate( + request.newServices().keySet().stream() + .map(service -> new EqualsPredicate<>(PropertyHelper.getPropertyId(category, "service_name"), service)) + .toArray(Predicate[]::new) + ) + ); + } + + private static ClusterController getClusterController() { + return ClusterControllerHelper.getClusterController(); } }