http://git-wip-us.apache.org/repos/asf/ambari/blob/1863c3b9/ambari-server/src/main/resources/stacks/BigInsights/4.0/stack-advisor/stack_advisor_25.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/stacks/BigInsights/4.0/stack-advisor/stack_advisor_25.py b/ambari-server/src/main/resources/stacks/BigInsights/4.0/stack-advisor/stack_advisor_25.py new file mode 100755 index 0000000..1f0ae18 --- /dev/null +++ b/ambari-server/src/main/resources/stacks/BigInsights/4.0/stack-advisor/stack_advisor_25.py @@ -0,0 +1,1940 @@ +#!/usr/bin/env ambari-python-wrap +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import math +import traceback + +from ambari_commons.str_utils import string_set_equals +from resource_management.core.logger import Logger +from resource_management.core.exceptions import Fail +from resource_management.libraries.functions.get_bare_principal import get_bare_principal + +try: + from stack_advisor_24 import * +except ImportError: + #Ignore ImportError + print("stack_advisor_24 not found") + +class HDP25StackAdvisor(HDP24StackAdvisor): + + def __init__(self): + super(HDP25StackAdvisor, self).__init__() + Logger.initialize_logger() + self.HIVE_INTERACTIVE_SITE = 'hive-interactive-site' + self.YARN_ROOT_DEFAULT_QUEUE_NAME = 'default' + self.AMBARI_MANAGED_LLAP_QUEUE_NAME = 'llap' + self.RANGER_TAGSYNC_SITE = 'ranger-tagsync-site'; + + def recommendOozieConfigurations(self, configurations, clusterData, services, hosts): + super(HDP25StackAdvisor,self).recommendOozieConfigurations(configurations, clusterData, services, hosts) + putOozieEnvProperty = self.putProperty(configurations, "oozie-env", services) + + if not "oozie-env" in services["configurations"] : + Logger.info("No oozie configurations available") + return + + if not "FALCON_SERVER" in clusterData["components"] : + Logger.info("Falcon is not part of the installation") + return + + falconUser = 'falcon' + + if "falcon-env" in services["configurations"] : + if "falcon_user" in services["configurations"]["falcon-env"]["properties"] : + falconUser = services["configurations"]["falcon-env"]["properties"]["falcon_user"] + Logger.info("Falcon user from configuration: %s " % falconUser) + + Logger.info("Falcon user : %s" % falconUser) + + oozieUser = 'oozie' + + if "oozie_user" \ + in services["configurations"]["oozie-env"]["properties"] : + oozieUser = services["configurations"]["oozie-env"]["properties"]["oozie_user"] + Logger.info("Oozie user from configuration %s" % oozieUser) + + Logger.info("Oozie user %s" % oozieUser) + + if "oozie_admin_users" \ + in services["configurations"]["oozie-env"]["properties"] : + currentAdminUsers = services["configurations"]["oozie-env"]["properties"]["oozie_admin_users"] + Logger.info("Oozie admin users from 
configuration %s" % currentAdminUsers) + else : + currentAdminUsers = "{0}, oozie-admin".format(oozieUser) + Logger.info("Setting default oozie admin users to %s" % currentAdminUsers) + + + if falconUser in currentAdminUsers : + Logger.info("Falcon user %s already member of oozie admin users " % falconUser) + return + + newAdminUsers = "{0},{1}".format(currentAdminUsers, falconUser) + + Logger.info("new oozie admin users : %s" % newAdminUsers) + + services["forced-configurations"].append({"type" : "oozie-env", "name" : "oozie_admin_users"}) + putOozieEnvProperty("oozie_admin_users", newAdminUsers) + + def createComponentLayoutRecommendations(self, services, hosts): + parentComponentLayoutRecommendations = super(HDP25StackAdvisor, self).createComponentLayoutRecommendations( + services, hosts) + return parentComponentLayoutRecommendations + + def getComponentLayoutValidations(self, services, hosts): + parentItems = super(HDP25StackAdvisor, self).getComponentLayoutValidations(services, hosts) + childItems = [] + if self.HIVE_INTERACTIVE_SITE in services['configurations']: + hsi_hosts = self.__getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE") + if len(hsi_hosts) > 1: + message = "Only one host can install HIVE_SERVER_INTERACTIVE. " + childItems.append( + {"type": 'host-component', "level": 'ERROR', "message": message, "component-name": 'HIVE_SERVER_INTERACTIVE'}) + + parentItems.extend(childItems) + return parentItems + + def getServiceConfigurationValidators(self): + parentValidators = super(HDP25StackAdvisor, self).getServiceConfigurationValidators() + childValidators = { + "ATLAS": {"application-properties": self.validateAtlasConfigurations}, + "HIVE": {"hive-interactive-env": self.validateHiveInteractiveEnvConfigurations, + "hive-interactive-site": self.validateHiveInteractiveSiteConfigurations, + "hive-env": self.validateHiveConfigurationsEnv}, + "YARN": {"yarn-site": self.validateYARNConfigurations}, + "RANGER": {"ranger-tagsync-site": self.validateRangerTagsyncConfigurations}, + "SPARK2": {"spark2-defaults": self.validateSpark2Defaults, + "spark2-thrift-sparkconf": self.validateSpark2ThriftSparkConf} + } + self.mergeValidators(parentValidators, childValidators) + return parentValidators + + def validateAtlasConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): + application_properties = getSiteProperties(configurations, "application-properties") + validationItems = [] + + #<editor-fold desc="LDAP and AD"> + auth_type = application_properties['atlas.authentication.method.ldap.type'] + Logger.info("Validating Atlas configs, authentication type: %s" % str(auth_type)) + + # Required props + ldap_props = {"atlas.authentication.method.ldap.url": "", + "atlas.authentication.method.ldap.userDNpattern": "uid=", + "atlas.authentication.method.ldap.groupSearchBase": "", + "atlas.authentication.method.ldap.groupSearchFilter": "", + "atlas.authentication.method.ldap.groupRoleAttribute": "cn", + "atlas.authentication.method.ldap.base.dn": "", + "atlas.authentication.method.ldap.bind.dn": "", + "atlas.authentication.method.ldap.bind.password": "", + "atlas.authentication.method.ldap.user.searchfilter": "" + } + ad_props = {"atlas.authentication.method.ldap.ad.domain": "", + "atlas.authentication.method.ldap.ad.url": "", + "atlas.authentication.method.ldap.ad.base.dn": "", + "atlas.authentication.method.ldap.ad.bind.dn": "", + "atlas.authentication.method.ldap.ad.bind.password": "", + "atlas.authentication.method.ldap.ad.user.searchfilter": 
"(sAMAccountName={0})" + } + + props_to_require = set() + if auth_type.lower() == "ldap": + props_to_require = set(ldap_props.keys()) + elif auth_type.lower() == "ad": + props_to_require = set(ad_props.keys()) + elif auth_type.lower() == "none": + pass + + for prop in props_to_require: + if prop not in application_properties or application_properties[prop] is None or application_properties[prop].strip() == "": + validationItems.append({"config-name": prop, + "item": self.getErrorItem("If authentication type is %s, this property is required." % auth_type)}) + #</editor-fold> + + if application_properties['atlas.graph.index.search.backend'] == 'solr5' and \ + not application_properties['atlas.graph.index.search.solr.zookeeper-url']: + validationItems.append({"config-name": "atlas.graph.index.search.solr.zookeeper-url", + "item": self.getErrorItem( + "If AMBARI_INFRA is not installed then the SOLR zookeeper url configuration must be specified.")}) + + if not application_properties['atlas.kafka.bootstrap.servers']: + validationItems.append({"config-name": "atlas.kafka.bootstrap.servers", + "item": self.getErrorItem( + "If KAFKA is not installed then the Kafka bootstrap servers configuration must be specified.")}) + + if not application_properties['atlas.kafka.zookeeper.connect']: + validationItems.append({"config-name": "atlas.kafka.zookeeper.connect", + "item": self.getErrorItem( + "If KAFKA is not installed then the Kafka zookeeper quorum configuration must be specified.")}) + + if application_properties['atlas.graph.storage.backend'] == 'hbase' and 'hbase-site' in services['configurations']: + hbase_zookeeper_quorum = services['configurations']['hbase-site']['properties']['hbase.zookeeper.quorum'] + + if not application_properties['atlas.graph.storage.hostname']: + validationItems.append({"config-name": "atlas.graph.storage.hostname", + "item": self.getErrorItem( + "If HBASE is not installed then the hbase zookeeper quorum configuration must be specified.")}) + elif string_set_equals(application_properties['atlas.graph.storage.hostname'], hbase_zookeeper_quorum): + validationItems.append({"config-name": "atlas.graph.storage.hostname", + "item": self.getWarnItem( + "Atlas is configured to use the HBase installed in this cluster. If you would like Atlas to use another HBase instance, please configure this property and HBASE_CONF_DIR variable in atlas-env appropriately.")}) + + if not application_properties['atlas.audit.hbase.zookeeper.quorum']: + validationItems.append({"config-name": "atlas.audit.hbase.zookeeper.quorum", + "item": self.getErrorItem( + "If HBASE is not installed then the audit hbase zookeeper quorum configuration must be specified.")}) + + elif application_properties['atlas.graph.storage.backend'] == 'hbase' and 'hbase-site' not in services[ + 'configurations']: + if not application_properties['atlas.graph.storage.hostname']: + validationItems.append({"config-name": "atlas.graph.storage.hostname", + "item": self.getErrorItem( + "Atlas is not configured to use the HBase installed in this cluster. 
If you would like Atlas to use another HBase instance, please configure this property and HBASE_CONF_DIR variable in atlas-env appropriately.")})
+      if not application_properties['atlas.audit.hbase.zookeeper.quorum']:
+        validationItems.append({"config-name": "atlas.audit.hbase.zookeeper.quorum",
+                                "item": self.getErrorItem(
+                                  "If HBASE is not installed then the audit hbase zookeeper quorum configuration must be specified.")})
+
+    validationProblems = self.toConfigurationValidationProblems(validationItems, "application-properties")
+    return validationProblems
+
+  def validateSpark2Defaults(self, properties, recommendedDefaults, configurations, services, hosts):
+    validationItems = [
+      {
+        "config-name": 'spark.yarn.queue',
+        "item": self.validatorYarnQueue(properties, recommendedDefaults, 'spark.yarn.queue', services)
+      }
+    ]
+    return self.toConfigurationValidationProblems(validationItems, "spark2-defaults")
+
+  def validateSpark2ThriftSparkConf(self, properties, recommendedDefaults, configurations, services, hosts):
+    validationItems = [
+      {
+        "config-name": 'spark.yarn.queue',
+        "item": self.validatorYarnQueue(properties, recommendedDefaults, 'spark.yarn.queue', services)
+      }
+    ]
+    return self.toConfigurationValidationProblems(validationItems, "spark2-thrift-sparkconf")
+
+  def validateYARNConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
+    parentValidationProblems = super(HDP25StackAdvisor, self).validateYARNConfigurations(properties, recommendedDefaults, configurations, services, hosts)
+    yarn_site_properties = getSiteProperties(configurations, "yarn-site")
+    servicesList = [service["StackServices"]["service_name"] for service in services["services"]]
+    componentsListList = [service["components"] for service in services["services"]]
+    componentsList = [item["StackServiceComponents"] for sublist in componentsListList for item in sublist]
+    validationItems = []
+    if self.HIVE_INTERACTIVE_SITE in services['configurations']:
+      hsi_hosts = self.__getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE")
+      if len(hsi_hosts) > 0:
+        # HIVE_SERVER_INTERACTIVE is mapped to a host
+        if 'yarn.resourcemanager.work-preserving-recovery.enabled' not in yarn_site_properties or \
+            'true' != yarn_site_properties['yarn.resourcemanager.work-preserving-recovery.enabled']:
+          validationItems.append({"config-name": "yarn.resourcemanager.work-preserving-recovery.enabled",
+                                  "item": self.getWarnItem(
+                                    "While enabling HIVE_SERVER_INTERACTIVE it is recommended that you enable work-preserving restart in YARN.")})
+
+    validationProblems = self.toConfigurationValidationProblems(validationItems, "yarn-site")
+    validationProblems.extend(parentValidationProblems)
+    return validationProblems
+
+  """
+  Does the following validation checks for HIVE_SERVER_INTERACTIVE's hive-interactive-site configs.
+    1. The queue selected in the 'hive.llap.daemon.queue.name' config should be sized >= the minimum required
+       to run the LLAP and Hive2 apps.
+    2. The queue selected in the 'hive.llap.daemon.queue.name' config should not be in the 'STOPPED' state.
+    3. The 'hive.server2.enable.doAs' config should be set to 'false' for Hive2.
+    4. 'Maximum Total Concurrent Queries' (hive.server2.tez.sessions.per.default.queue) should not consume
+       more than 50% of the queue selected for LLAP.
+    5. If the 'llap' queue is selected, the 'remaining available capacity' in the cluster must be at least 512 MB,
+       in order to run Service Checks.
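+
+  As an illustration of check (4), with assumed example numbers rather than values read from a
+  cluster: for a 10240 MB queue selected for LLAP and a normalized Tez AM container size of 512 MB,
+
+      queue_cap_mb = 10240
+      tez_am_mb = 512
+      num_tez_sessions = 12     # hive.server2.tez.sessions.per.default.queue
+      remaining = queue_cap_mb - tez_am_mb * num_tez_sessions   # 10240 - 6144 = 4096
+      # remaining (4096) <= queue_cap_mb / 2 (5120), so the 50% warning would be raised.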
+ """ + def validateHiveInteractiveSiteConfigurations(self, properties, recommendedDefaults, configurations, services, hosts): + validationItems = [] + hsi_hosts = self.__getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE") + curr_selected_queue_for_llap = None + curr_selected_queue_for_llap_cap_perc = None + MIN_ASSUMED_CAP_REQUIRED_FOR_SERVICE_CHECKS = 512 + current_selected_queue_for_llap_cap = None + if self.HIVE_INTERACTIVE_SITE in services['configurations']: + if len(hsi_hosts) > 0: + # Get total cluster capacity + node_manager_host_list = self.get_node_manager_hosts(services, hosts) + node_manager_cnt = len(node_manager_host_list) + yarn_nm_mem_in_mb = self.get_yarn_nm_mem_in_mb(services, configurations) + total_cluster_capacity = node_manager_cnt * yarn_nm_mem_in_mb + + capacity_scheduler_properties, received_as_key_value_pair = self.getCapacitySchedulerProperties(services) + if capacity_scheduler_properties: + if self.HIVE_INTERACTIVE_SITE in services['configurations'] and \ + 'hive.llap.daemon.queue.name' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']: + curr_selected_queue_for_llap = services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']['hive.llap.daemon.queue.name'] + if curr_selected_queue_for_llap: + current_selected_queue_for_llap_cap = self.__getSelectedQueueTotalCap(capacity_scheduler_properties, + curr_selected_queue_for_llap, total_cluster_capacity) + if current_selected_queue_for_llap_cap: + curr_selected_queue_for_llap_cap_perc = int(current_selected_queue_for_llap_cap * 100 / total_cluster_capacity) + min_reqd_queue_cap_perc = self.min_queue_perc_reqd_for_llap_and_hive_app(services, hosts, configurations) + + # Validate that the selected queue in 'hive.llap.daemon.queue.name' should be sized >= to minimum required + # to run LLAP and Hive2 app. + if curr_selected_queue_for_llap_cap_perc < min_reqd_queue_cap_perc: + errMsg1 = "Selected queue '{0}' capacity ({1}%) is less than minimum required capacity ({2}%) for LLAP " \ + "app to run".format(curr_selected_queue_for_llap, curr_selected_queue_for_llap_cap_perc, min_reqd_queue_cap_perc) + validationItems.append({"config-name": "hive.llap.daemon.queue.name","item": self.getErrorItem(errMsg1)}) + else: + Logger.error("Couldn't retrieve '{0}' queue's capacity from 'capacity-scheduler' while doing validation checks for " + "Hive Server Interactive.".format(curr_selected_queue_for_llap)) + + # Validate that current selected queue in 'hive.llap.daemon.queue.name' state is not STOPPED. + llap_selected_queue_state = self.__getQueueStateFromCapacityScheduler(capacity_scheduler_properties, curr_selected_queue_for_llap) + if llap_selected_queue_state: + if llap_selected_queue_state == "STOPPED": + errMsg2 = "Selected queue '{0}' current state is : '{1}'. 
It is required to be in 'RUNNING' state for LLAP to run"\ + .format(curr_selected_queue_for_llap, llap_selected_queue_state) + validationItems.append({"config-name": "hive.llap.daemon.queue.name","item": self.getErrorItem(errMsg2)}) + else: + Logger.error("Couldn't retrieve '{0}' queue's state from 'capacity-scheduler' while doing validation checks for " + "Hive Server Interactive.".format(curr_selected_queue_for_llap)) + else: + Logger.error("Couldn't retrieve current selection for 'hive.llap.daemon.queue.name' while doing validation " + "checks for Hive Server Interactive.") + else: + Logger.error("Couldn't retrieve 'hive.llap.daemon.queue.name' config from 'hive-interactive-site' while doing " + "validation checks for Hive Server Interactive.") + pass + else: + Logger.error("Couldn't retrieve 'capacity-scheduler' properties while doing validation checks for Hive Server Interactive.") + pass + + if self.HIVE_INTERACTIVE_SITE in services['configurations']: + # Validate that 'hive.server2.enable.doAs' config is not set to 'true' for Hive2. + if 'hive.server2.enable.doAs' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']: + hive2_enable_do_as = services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']['hive.server2.enable.doAs'] + if hive2_enable_do_as == 'true': + validationItems.append({"config-name": "hive.server2.enable.doAs","item": self.getErrorItem("Value should be set to 'false' for Hive2.")}) + + # Validate that 'Maximum Total Concurrent Queries'(hive.server2.tez.sessions.per.default.queue) is not consuming more that + # 50% of selected queue for LLAP. + if current_selected_queue_for_llap_cap and 'hive.server2.tez.sessions.per.default.queue' in \ + services['configurations']['hive-interactive-site']['properties']: + num_tez_sessions = services['configurations']['hive-interactive-site']['properties']['hive.server2.tez.sessions.per.default.queue'] + if num_tez_sessions: + num_tez_sessions = long(num_tez_sessions) + yarn_min_container_size = self.get_yarn_min_container_size(services, configurations) + tez_am_container_size = self.calculate_tez_am_container_size(long(total_cluster_capacity)) + normalized_tez_am_container_size = self._normalizeUp(tez_am_container_size, yarn_min_container_size) + llap_selected_queue_cap_remaining = current_selected_queue_for_llap_cap - (normalized_tez_am_container_size * num_tez_sessions) + if llap_selected_queue_cap_remaining <= current_selected_queue_for_llap_cap/2: + errMsg3 = " Reducing the 'Maximum Total Concurrent Queries' (value: {0}) is advisable as it is consuming more than 50% of " \ + "'{1}' queue for LLAP.".format(num_tez_sessions, curr_selected_queue_for_llap) + validationItems.append({"config-name": "hive.server2.tez.sessions.per.default.queue","item": self.getWarnItem(errMsg3)}) + + # Validate that 'remaining available capacity' in cluster is atleast 512 MB, after 'llap' queue is selected, + # in order to run Service Checks. + if curr_selected_queue_for_llap and curr_selected_queue_for_llap_cap_perc and \ + curr_selected_queue_for_llap == self.AMBARI_MANAGED_LLAP_QUEUE_NAME: + curr_selected_queue_for_llap_cap = float(curr_selected_queue_for_llap_cap_perc) / 100 * total_cluster_capacity + available_cap_in_cluster = total_cluster_capacity - curr_selected_queue_for_llap_cap + if available_cap_in_cluster < MIN_ASSUMED_CAP_REQUIRED_FOR_SERVICE_CHECKS: + errMsg4 = "Capacity used by '{0}' queue is '{1}'. 
Service checks may not run as remaining available capacity " \
+                      "({2}) in cluster is less than 512 MB.".format(self.AMBARI_MANAGED_LLAP_QUEUE_NAME, curr_selected_queue_for_llap_cap, available_cap_in_cluster)
+          validationItems.append({"config-name": "hive.llap.daemon.queue.name", "item": self.getWarnItem(errMsg4)})
+
+    validationProblems = self.toConfigurationValidationProblems(validationItems, "hive-interactive-site")
+    return validationProblems
+
+  def validateHiveConfigurationsEnv(self, properties, recommendedDefaults, configurations, services, hosts):
+    parentValidationProblems = super(HDP25StackAdvisor, self).validateHiveConfigurationsEnv(properties, recommendedDefaults, configurations, services, hosts)
+    hive_site_properties = self.getSiteProperties(configurations, "hive-site")
+    hive_env_properties = self.getSiteProperties(configurations, "hive-env")
+    validationItems = []
+
+    if 'hive.server2.authentication' in hive_site_properties and hive_site_properties['hive.server2.authentication'] in ["LDAP", "PAM"]:
+      if 'alert_ldap_username' not in hive_env_properties or hive_env_properties['alert_ldap_username'] == "":
+        validationItems.append({"config-name": "alert_ldap_username",
+                                "item": self.getWarnItem(
+                                  "Provide a user to be used for alerts. The Hive authentication types LDAP and PAM require valid credentials for the alerts.")})
+      if 'alert_ldap_password' not in hive_env_properties or hive_env_properties['alert_ldap_password'] == "":
+        validationItems.append({"config-name": "alert_ldap_password",
+                                "item": self.getWarnItem(
+                                  "Provide the password for the alert user. The Hive authentication types LDAP and PAM require valid credentials for the alerts.")})
+
+    validationProblems = self.toConfigurationValidationProblems(validationItems, "hive-env")
+    validationProblems.extend(parentValidationProblems)
+    return validationProblems
+
+  def validateHiveInteractiveEnvConfigurations(self, properties, recommendedDefaults, configurations, services, hosts):
+    hive_site_env_properties = getSiteProperties(configurations, "hive-interactive-env")
+    validationItems = []
+    if self.HIVE_INTERACTIVE_SITE in services['configurations']:
+      hsi_hosts = self.__getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE")
+      if len(hsi_hosts) > 0:
+        # HIVE_SERVER_INTERACTIVE is mapped to a host
+        if 'enable_hive_interactive' not in hive_site_env_properties or (
+            'enable_hive_interactive' in hive_site_env_properties and hive_site_env_properties[
+              'enable_hive_interactive'].lower() != 'true'):
+          validationItems.append({"config-name": "enable_hive_interactive",
+                                  "item": self.getErrorItem(
+                                    "HIVE_SERVER_INTERACTIVE requires enable_hive_interactive in hive-interactive-env set to true.")})
+        if 'hive_server_interactive_host' in hive_site_env_properties:
+          hsi_host = hsi_hosts[0]
+          if hive_site_env_properties['hive_server_interactive_host'].lower() != hsi_host.lower():
+            validationItems.append({"config-name": "hive_server_interactive_host",
+                                    "item": self.getErrorItem(
+                                      "HIVE_SERVER_INTERACTIVE requires hive_server_interactive_host in hive-interactive-env set to its host name.")})
+        if 'hive_server_interactive_host' not in hive_site_env_properties:
+          validationItems.append({"config-name": "hive_server_interactive_host",
+                                  "item": self.getErrorItem(
+                                    "HIVE_SERVER_INTERACTIVE requires hive_server_interactive_host in hive-interactive-env set to its host name.")})
+
+      else:
+        # no HIVE_SERVER_INTERACTIVE
+        if 'enable_hive_interactive' in hive_site_env_properties and hive_site_env_properties[
'enable_hive_interactive'].lower() != 'false': + validationItems.append({"config-name": "enable_hive_interactive", + "item": self.getErrorItem( + "enable_hive_interactive in hive-interactive-env should be set to false.")}) + pass + pass + + validationProblems = self.toConfigurationValidationProblems(validationItems, "hive-interactive-env") + return validationProblems + + def getServiceConfigurationRecommenderDict(self): + parentRecommendConfDict = super(HDP25StackAdvisor, self).getServiceConfigurationRecommenderDict() + childRecommendConfDict = { + "RANGER": self.recommendRangerConfigurations, + "HBASE": self.recommendHBASEConfigurations, + "HIVE": self.recommendHIVEConfigurations, + "ATLAS": self.recommendAtlasConfigurations, + "RANGER_KMS": self.recommendRangerKMSConfigurations, + "STORM": self.recommendStormConfigurations, + "OOZIE": self.recommendOozieConfigurations, + "SPARK2": self.recommendSpark2Configurations + } + parentRecommendConfDict.update(childRecommendConfDict) + return parentRecommendConfDict + + def recommendSpark2Configurations(self, configurations, clusterData, services, hosts): + """ + :type configurations dict + :type clusterData dict + :type services dict + :type hosts dict + """ + putSparkProperty = self.putProperty(configurations, "spark2-defaults", services) + putSparkThriftSparkConf = self.putProperty(configurations, "spark2-thrift-sparkconf", services) + + spark_queue = self.recommendYarnQueue(services, "spark2-defaults", "spark.yarn.queue") + if spark_queue is not None: + putSparkProperty("spark.yarn.queue", spark_queue) + + spart_thrift_queue = self.recommendYarnQueue(services, "spark2-thrift-sparkconf", "spark.yarn.queue") + if spart_thrift_queue is not None: + putSparkThriftSparkConf("spark.yarn.queue", spart_thrift_queue) + + def recommendStormConfigurations(self, configurations, clusterData, services, hosts): + super(HDP25StackAdvisor, self).recommendStormConfigurations(configurations, clusterData, services, hosts) + storm_site = getServicesSiteProperties(services, "storm-site") + putStormSiteProperty = self.putProperty(configurations, "storm-site", services) + putStormSiteAttributes = self.putPropertyAttribute(configurations, "storm-site") + security_enabled = (storm_site is not None and "storm.zookeeper.superACL" in storm_site) + + if security_enabled: + _storm_principal_name = services['configurations']['storm-env']['properties']['storm_principal_name'] + storm_bare_jaas_principal = get_bare_principal(_storm_principal_name) + if 'nimbus.impersonation.acl' in storm_site: + storm_nimbus_impersonation_acl = storm_site["nimbus.impersonation.acl"] + storm_nimbus_impersonation_acl.replace('{{storm_bare_jaas_principal}}', storm_bare_jaas_principal) + putStormSiteProperty('nimbus.impersonation.acl', storm_nimbus_impersonation_acl) + rangerPluginEnabled = '' + if 'ranger-storm-plugin-properties' in configurations and 'ranger-storm-plugin-enabled' in configurations['ranger-storm-plugin-properties']['properties']: + rangerPluginEnabled = configurations['ranger-storm-plugin-properties']['properties']['ranger-storm-plugin-enabled'] + elif 'ranger-storm-plugin-properties' in services['configurations'] and 'ranger-storm-plugin-enabled' in services['configurations']['ranger-storm-plugin-properties']['properties']: + rangerPluginEnabled = services['configurations']['ranger-storm-plugin-properties']['properties']['ranger-storm-plugin-enabled'] + + storm_authorizer_class = 'org.apache.storm.security.auth.authorizer.SimpleACLAuthorizer' + ranger_authorizer_class = 
'org.apache.ranger.authorization.storm.authorizer.RangerStormAuthorizer' + # Cluster is kerberized + if security_enabled: + if rangerPluginEnabled and (rangerPluginEnabled.lower() == 'Yes'.lower()): + putStormSiteProperty('nimbus.authorizer',ranger_authorizer_class) + elif rangerPluginEnabled and (rangerPluginEnabled.lower() == 'No'.lower()) and (services["configurations"]["storm-site"]["properties"]["nimbus.authorizer"] == ranger_authorizer_class): + putStormSiteProperty('nimbus.authorizer', storm_authorizer_class) + else: + putStormSiteAttributes('nimbus.authorizer', 'delete', 'true') + + def constructAtlasRestAddress(self, services, hosts): + """ + :param services: Collection of services in the cluster with configs + :param hosts: Collection of hosts in the cluster + :return: The suggested property for atlas.rest.address if it is valid, otherwise, None + """ + atlas_rest_address = None + services_list = [service["StackServices"]["service_name"] for service in services["services"]] + is_atlas_in_cluster = "ATLAS" in services_list + + atlas_server_hosts_info = self.getHostsWithComponent("ATLAS", "ATLAS_SERVER", services, hosts) + if is_atlas_in_cluster and atlas_server_hosts_info and len(atlas_server_hosts_info) > 0: + # Multiple Atlas Servers can exist, so sort by hostname to create deterministic csv + atlas_host_names = [e['Hosts']['host_name'] for e in atlas_server_hosts_info] + if len(atlas_host_names) > 1: + atlas_host_names = sorted(atlas_host_names) + + scheme = "http" + metadata_port = "21000" + atlas_server_default_https_port = "21443" + tls_enabled = "false" + if 'application-properties' in services['configurations']: + if 'atlas.enableTLS' in services['configurations']['application-properties']['properties']: + tls_enabled = services['configurations']['application-properties']['properties']['atlas.enableTLS'] + if 'atlas.server.http.port' in services['configurations']['application-properties']['properties']: + metadata_port = str(services['configurations']['application-properties']['properties']['atlas.server.http.port']) + + if str(tls_enabled).lower() == "true": + scheme = "https" + if 'atlas.server.https.port' in services['configurations']['application-properties']['properties']: + metadata_port = str(services['configurations']['application-properties']['properties']['atlas.server.https.port']) + else: + metadata_port = atlas_server_default_https_port + + atlas_rest_address_list = ["{0}://{1}:{2}".format(scheme, hostname, metadata_port) for hostname in atlas_host_names] + atlas_rest_address = ",".join(atlas_rest_address_list) + Logger.info("Constructing atlas.rest.address=%s" % atlas_rest_address) + return atlas_rest_address + + def recommendAtlasConfigurations(self, configurations, clusterData, services, hosts): + putAtlasApplicationProperty = self.putProperty(configurations, "application-properties", services) + putAtlasRangerPluginProperty = self.putProperty(configurations, "ranger-atlas-plugin-properties", services) + putAtlasEnvProperty = self.putProperty(configurations, "atlas-env", services) + + servicesList = [service["StackServices"]["service_name"] for service in services["services"]] + + # Generate atlas.rest.address since the value is always computed + atlas_rest_address = self.constructAtlasRestAddress(services, hosts) + if atlas_rest_address is not None: + putAtlasApplicationProperty("atlas.rest.address", atlas_rest_address) + + if "AMBARI_INFRA" in servicesList and 'infra-solr-env' in services['configurations']: + if 'infra_solr_znode' in 
services['configurations']['infra-solr-env']['properties']: + infra_solr_znode = services['configurations']['infra-solr-env']['properties']['infra_solr_znode'] + else: + infra_solr_znode = None + + zookeeper_hosts = self.getHostNamesWithComponent("ZOOKEEPER", "ZOOKEEPER_SERVER", services) + zookeeper_host_arr = [] + + zookeeper_port = self.getZKPort(services) + for i in range(len(zookeeper_hosts)): + zookeeper_host = zookeeper_hosts[i] + ':' + zookeeper_port + if infra_solr_znode is not None: + zookeeper_host += infra_solr_znode + zookeeper_host_arr.append(zookeeper_host) + + solr_zookeeper_url = ",".join(zookeeper_host_arr) + + putAtlasApplicationProperty('atlas.graph.index.search.solr.zookeeper-url', solr_zookeeper_url) + else: + putAtlasApplicationProperty('atlas.graph.index.search.solr.zookeeper-url', "") + + # Kafka section + if "KAFKA" in servicesList and 'kafka-broker' in services['configurations']: + kafka_hosts = self.getHostNamesWithComponent("KAFKA", "KAFKA_BROKER", services) + + if 'port' in services['configurations']['kafka-broker']['properties']: + kafka_broker_port = services['configurations']['kafka-broker']['properties']['port'] + else: + kafka_broker_port = '6667' + + kafka_host_arr = [] + for i in range(len(kafka_hosts)): + kafka_host_arr.append(kafka_hosts[i] + ':' + kafka_broker_port) + + kafka_bootstrap_servers = ",".join(kafka_host_arr) + + if 'zookeeper.connect' in services['configurations']['kafka-broker']['properties']: + kafka_zookeeper_connect = services['configurations']['kafka-broker']['properties']['zookeeper.connect'] + else: + kafka_zookeeper_connect = None + + putAtlasApplicationProperty('atlas.kafka.bootstrap.servers', kafka_bootstrap_servers) + putAtlasApplicationProperty('atlas.kafka.zookeeper.connect', kafka_zookeeper_connect) + else: + putAtlasApplicationProperty('atlas.kafka.bootstrap.servers', "") + putAtlasApplicationProperty('atlas.kafka.zookeeper.connect', "") + + if "HBASE" in servicesList and 'hbase-site' in services['configurations']: + if 'hbase.zookeeper.quorum' in services['configurations']['hbase-site']['properties']: + hbase_zookeeper_quorum = services['configurations']['hbase-site']['properties']['hbase.zookeeper.quorum'] + else: + hbase_zookeeper_quorum = "" + + putAtlasApplicationProperty('atlas.graph.storage.hostname', hbase_zookeeper_quorum) + putAtlasApplicationProperty('atlas.audit.hbase.zookeeper.quorum', hbase_zookeeper_quorum) + else: + putAtlasApplicationProperty('atlas.graph.storage.hostname', "") + putAtlasApplicationProperty('atlas.audit.hbase.zookeeper.quorum', "") + + if "ranger-env" in services["configurations"] and "ranger-atlas-plugin-properties" in services["configurations"] and \ + "ranger-atlas-plugin-enabled" in services["configurations"]["ranger-env"]["properties"]: + ranger_atlas_plugin_enabled = services["configurations"]["ranger-env"]["properties"]["ranger-atlas-plugin-enabled"] + putAtlasRangerPluginProperty('ranger-atlas-plugin-enabled', ranger_atlas_plugin_enabled) + + ranger_atlas_plugin_enabled = '' + if 'ranger-atlas-plugin-properties' in configurations and 'ranger-atlas-plugin-enabled' in configurations['ranger-atlas-plugin-properties']['properties']: + ranger_atlas_plugin_enabled = configurations['ranger-atlas-plugin-properties']['properties']['ranger-atlas-plugin-enabled'] + elif 'ranger-atlas-plugin-properties' in services['configurations'] and 'ranger-atlas-plugin-enabled' in services['configurations']['ranger-atlas-plugin-properties']['properties']: + ranger_atlas_plugin_enabled = 
services['configurations']['ranger-atlas-plugin-properties']['properties']['ranger-atlas-plugin-enabled'] + + if ranger_atlas_plugin_enabled and (ranger_atlas_plugin_enabled.lower() == 'Yes'.lower()): + putAtlasApplicationProperty('atlas.authorizer.impl','ranger') + else: + putAtlasApplicationProperty('atlas.authorizer.impl','simple') + + #atlas server memory settings + if 'atlas-env' in services['configurations']: + atlas_server_metadata_size = 50000 + if 'atlas_server_metadata_size' in services['configurations']['atlas-env']['properties']: + atlas_server_metadata_size = int(services['configurations']['atlas-env']['properties']['atlas_server_metadata_size']) + + atlas_server_xmx = 2048 + + if 300000 <= atlas_server_metadata_size < 500000: + atlas_server_xmx = 1024*5 + if 500000 <= atlas_server_metadata_size < 1000000: + atlas_server_xmx = 1024*10 + if atlas_server_metadata_size >= 1000000: + atlas_server_xmx = 1024*16 + + atlas_server_max_new_size = (atlas_server_xmx / 100) * 30 + + putAtlasEnvProperty("atlas_server_xmx", atlas_server_xmx) + putAtlasEnvProperty("atlas_server_max_new_size", atlas_server_max_new_size) + + def recommendHBASEConfigurations(self, configurations, clusterData, services, hosts): + super(HDP25StackAdvisor, self).recommendHBASEConfigurations(configurations, clusterData, services, hosts) + putHbaseSiteProperty = self.putProperty(configurations, "hbase-site", services) + appendCoreSiteProperty = self.updateProperty(configurations, "core-site", services) + + if "cluster-env" in services["configurations"] \ + and "security_enabled" in services["configurations"]["cluster-env"]["properties"] \ + and services["configurations"]["cluster-env"]["properties"]["security_enabled"].lower() == "true": + # Set the master's UI to readonly + putHbaseSiteProperty('hbase.master.ui.readonly', 'true') + + phoenix_query_server_hosts = self.get_phoenix_query_server_hosts(services, hosts) + Logger.debug("Calculated Phoenix Query Server hosts: %s" % str(phoenix_query_server_hosts)) + if phoenix_query_server_hosts: + Logger.debug("Attempting to update hadoop.proxyuser.HTTP.hosts with %s" % str(phoenix_query_server_hosts)) + # The PQS hosts we want to ensure are set + new_value = ','.join(phoenix_query_server_hosts) + # Compute the unique set of hosts for the property + def updateCallback(originalValue, newValue): + Logger.debug("Original hadoop.proxyuser.HTTP.hosts value %s, appending %s" % (originalValue, newValue)) + # Only update the original value if it's not whitespace only + if originalValue and not originalValue.isspace(): + hosts = originalValue.split(',') + # Add in the new hosts if we have some + if newValue and not newValue.isspace(): + hosts.extend(newValue.split(',')) + # Return the combined (uniqued) list of hosts + result = ','.join(set(hosts)) + Logger.debug("Setting final to %s" % result) + return result + else: + Logger.debug("Setting final value to %s" % newValue) + return newValue + # Update the proxyuser setting, deferring to out callback to merge results together + appendCoreSiteProperty('hadoop.proxyuser.HTTP.hosts', new_value, updateCallback) + else: + Logger.debug("No phoenix query server hosts to update") + else: + putHbaseSiteProperty('hbase.master.ui.readonly', 'false') + + """ + Returns the list of Phoenix Query Server host names, or None. 
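+
+  For illustration, the hadoop.proxyuser.HTTP.hosts merge callback defined above unions the
+  existing property value with the calculated PQS hosts; with hypothetical host names:
+
+      existing = "host1.example.com,host2.example.com"
+      pqs_hosts = ["host2.example.com", "host3.example.com"]
+      hosts = existing.split(',')
+      hosts.extend(pqs_hosts)
+      merged = ','.join(set(hosts))   # three unique hosts, order unspecified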
+ """ + def get_phoenix_query_server_hosts(self, services, hosts): + if len(hosts['items']) > 0: + phoenix_query_server_hosts = self.getHostsWithComponent("HBASE", "PHOENIX_QUERY_SERVER", services, hosts) + if phoenix_query_server_hosts is None: + return [] + return [host['Hosts']['host_name'] for host in phoenix_query_server_hosts] + + def recommendHIVEConfigurations(self, configurations, clusterData, services, hosts): + super(HDP25StackAdvisor, self).recommendHIVEConfigurations(configurations, clusterData, services, hosts) + putHiveInteractiveEnvProperty = self.putProperty(configurations, "hive-interactive-env", services) + putHiveInteractiveSiteProperty = self.putProperty(configurations, self.HIVE_INTERACTIVE_SITE, services) + putHiveInteractiveEnvPropertyAttribute = self.putPropertyAttribute(configurations, "hive-interactive-env") + + # For 'Hive Server Interactive', if the component exists. + hsi_hosts = self.__getHostsForComponent(services, "HIVE", "HIVE_SERVER_INTERACTIVE") + if len(hsi_hosts) > 0: + hsi_host = hsi_hosts[0] + putHiveInteractiveEnvProperty('enable_hive_interactive', 'true') + putHiveInteractiveEnvProperty('hive_server_interactive_host', hsi_host) + + # Update 'hive.llap.daemon.queue.name' property attributes if capacity scheduler is changed. + if self.HIVE_INTERACTIVE_SITE in services['configurations']: + if 'hive.llap.daemon.queue.name' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']: + self.setLlapDaemonQueuePropAttributesAndCapSliderVisibility(services, configurations) + + # Update 'hive.server2.tez.default.queues' value + hive_tez_default_queue = None + if 'hive-interactive-site' in configurations and \ + 'hive.llap.daemon.queue.name' in configurations[self.HIVE_INTERACTIVE_SITE]['properties']: + hive_tez_default_queue = configurations[self.HIVE_INTERACTIVE_SITE]['properties']['hive.llap.daemon.queue.name'] + Logger.info("'hive.llap.daemon.queue.name' value from configurations : '{0}'".format(hive_tez_default_queue)) + if not hive_tez_default_queue: + hive_tez_default_queue = services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']['hive.llap.daemon.queue.name'] + Logger.info("'hive.llap.daemon.queue.name' value from services : '{0}'".format(hive_tez_default_queue)) + if hive_tez_default_queue: + putHiveInteractiveSiteProperty("hive.server2.tez.default.queues", hive_tez_default_queue) + Logger.info("Updated 'hive.server2.tez.default.queues' config : '{0}'".format(hive_tez_default_queue)) + else: + putHiveInteractiveEnvProperty('enable_hive_interactive', 'false') + putHiveInteractiveEnvPropertyAttribute("llap_queue_capacity", "visible", "false") + + if self.HIVE_INTERACTIVE_SITE in services['configurations'] and \ + 'hive.llap.zk.sm.connectionString' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']: + # Fill the property 'hive.llap.zk.sm.connectionString' required by Hive Server Interactive (HiveServer2) + zookeeper_host_port = self.getZKHostPortString(services) + if zookeeper_host_port: + putHiveInteractiveSiteProperty("hive.llap.zk.sm.connectionString", zookeeper_host_port) + pass + + def recommendYARNConfigurations(self, configurations, clusterData, services, hosts): + super(HDP25StackAdvisor, self).recommendYARNConfigurations(configurations, clusterData, services, hosts) + + # Queue 'llap' creation/removal logic (Used by Hive Interactive server and associated LLAP) + if 'hive-interactive-env' in services['configurations'] and \ + 'enable_hive_interactive' in 
services['configurations']['hive-interactive-env']['properties']:
+      enable_hive_interactive = services['configurations']['hive-interactive-env']['properties']['enable_hive_interactive']
+      LLAP_QUEUE_NAME = 'llap'
+
+      # Hive Server Interactive is already added or getting added
+      if enable_hive_interactive == 'true':
+        self.checkAndManageLlapQueue(services, configurations, hosts, LLAP_QUEUE_NAME)
+        self.updateLlapConfigs(configurations, services, hosts, LLAP_QUEUE_NAME)
+      else:  # When Hive Server Interactive is in 'off/removed' state.
+        self.checkAndStopLlapQueue(services, configurations, LLAP_QUEUE_NAME)
+
+    putYarnSiteProperty = self.putProperty(configurations, "yarn-site", services)
+    stack_root = "/usr/hdp"
+    if "cluster-env" in services["configurations"] and "stack_root" in services["configurations"]["cluster-env"]["properties"]:
+      stack_root = services["configurations"]["cluster-env"]["properties"]["stack_root"]
+
+    timeline_plugin_classes_values = []
+    timeline_plugin_classpath_values = []
+
+    if self.__isServiceDeployed(services, "TEZ"):
+      timeline_plugin_classes_values.append('org.apache.tez.dag.history.logging.ats.TimelineCachePluginImpl')
+
+    if self.__isServiceDeployed(services, "SPARK"):
+      timeline_plugin_classes_values.append('org.apache.spark.deploy.history.yarn.plugin.SparkATSPlugin')
+      timeline_plugin_classpath_values.append(stack_root + "/${hdp.version}/spark/hdpLib/*")
+
+    putYarnSiteProperty('yarn.timeline-service.entity-group-fs-store.group-id-plugin-classes', ",".join(timeline_plugin_classes_values))
+    putYarnSiteProperty('yarn.timeline-service.entity-group-fs-store.group-id-plugin-classpath', ":".join(timeline_plugin_classpath_values))
+
+  """
+  Entry point for updating Hive's 'LLAP app' configs, namely: (1). num_llap_nodes (2). hive.llap.daemon.yarn.container.mb
+  (3). hive.llap.daemon.num.executors (4). hive.llap.io.memory.size (5). llap_heap_size (6). slider_am_container_mb,
+  and (7). hive.server2.tez.sessions.per.default.queue
+
+  The trigger point for updating the LLAP configs (mentioned above) is a change in the value of any of the following:
+  (1). 'enable_hive_interactive' set to 'true' (2). 'llap_queue_capacity' (3). 'hive.server2.tez.sessions.per.default.queue'
+  (4). Change in queue selection for the 'hive.llap.daemon.queue.name' config.
+
+  If a change in value for 'llap_queue_capacity' or 'hive.server2.tez.sessions.per.default.queue' is detected, that config
+  value is not calculated, but read and used in the calculation of the dependent configs.
+  """
+  def updateLlapConfigs(self, configurations, services, hosts, llap_queue_name):
+    putHiveInteractiveSiteProperty = self.putProperty(configurations, self.HIVE_INTERACTIVE_SITE, services)
+    putHiveInteractiveSitePropertyAttribute = self.putPropertyAttribute(configurations, self.HIVE_INTERACTIVE_SITE)
+
+    putHiveInteractiveEnvProperty = self.putProperty(configurations, "hive-interactive-env", services)
+    putHiveInteractiveEnvPropertyAttribute = self.putPropertyAttribute(configurations, "hive-interactive-env")
+
+    putTezInteractiveSiteProperty = self.putProperty(configurations, "tez-interactive-site", services)
+
+    llap_daemon_selected_queue_name = None
+    llap_queue_selected_in_current_call = None
+    LLAP_MAX_CONCURRENCY = 32  # Allow a max concurrency of 32.
+
+    # Update 'hive.llap.daemon.queue.name' prop combo entries and llap capacity slider visibility.
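+    # Illustration with assumed example numbers (not values read from a cluster): the concurrency
+    # formula applied further below,
+    #   llap_concurrency = max(long(normalized_queue_cap * 0.25 / normalized_tez_am_size), 1)
+    # yields 10 Tez AMs for a 40960 MB normalized queue with 1024 MB Tez AM containers
+    # (40960 * 0.25 / 1024 = 10), and is then capped at LLAP_MAX_CONCURRENCY (32).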
+ self.setLlapDaemonQueuePropAttributesAndCapSliderVisibility(services, configurations) + + if not services["changed-configurations"]: + read_llap_daemon_yarn_cont_mb = long(self.get_yarn_min_container_size(services, configurations)) + putHiveInteractiveSiteProperty('hive.llap.daemon.yarn.container.mb', read_llap_daemon_yarn_cont_mb) + # initial memory setting to make sure hive.llap.daemon.yarn.container.mb >= yarn.scheduler.minimum-allocation-mb + Logger.info("Adjusted 'hive.llap.daemon.yarn.container.mb' to yarn min container size as initial size " + "(" + str(self.get_yarn_min_container_size(services, configurations)) + " MB).") + + try: + if self.HIVE_INTERACTIVE_SITE in services['configurations'] and \ + 'hive.llap.daemon.queue.name' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']: + llap_daemon_selected_queue_name = services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']['hive.llap.daemon.queue.name'] + + if 'hive.llap.daemon.queue.name' in configurations[self.HIVE_INTERACTIVE_SITE]['properties']: + llap_queue_selected_in_current_call = configurations[self.HIVE_INTERACTIVE_SITE]['properties']['hive.llap.daemon.queue.name'] + + # Update Visibility of 'llap_queue_capacity' slider. + capacity_scheduler_properties, received_as_key_value_pair = self.getCapacitySchedulerProperties(services) + if capacity_scheduler_properties: + # Get all leaf queues. + leafQueueNames = self.getAllYarnLeafQueues(capacity_scheduler_properties) + if len(leafQueueNames) == 2 and \ + (llap_daemon_selected_queue_name != None and llap_daemon_selected_queue_name == llap_queue_name) or \ + (llap_queue_selected_in_current_call != None and llap_queue_selected_in_current_call == llap_queue_name): + putHiveInteractiveEnvPropertyAttribute("llap_queue_capacity", "visible", "true") + Logger.info("Selected YARN queue is '{0}'. Setting LLAP queue capacity slider visibility to 'True'".format(llap_queue_name)) + else: + putHiveInteractiveEnvPropertyAttribute("llap_queue_capacity", "visible", "false") + Logger.info("Queue selected for LLAP app is : '{0}'. Current YARN queues : {1}. Setting '{2}' queue capacity slider " + "visibility to 'False'.".format(llap_daemon_selected_queue_name, list(leafQueueNames), llap_queue_name)) + if llap_daemon_selected_queue_name: + llap_selected_queue_state = self.__getQueueStateFromCapacityScheduler(capacity_scheduler_properties, llap_daemon_selected_queue_name) + if llap_selected_queue_state == None or llap_selected_queue_state == "STOPPED": + putHiveInteractiveEnvPropertyAttribute("llap_queue_capacity", "visible", "false") + raise Fail("Selected LLAP app queue '{0}' current state is : '{1}'. Setting LLAP configs to default values " + "and 'llap' queue capacity slider visibility to 'False'." + .format(llap_daemon_selected_queue_name, llap_selected_queue_state)) + else: + raise Fail("Retrieved LLAP app queue name is : '{0}'. Setting LLAP configs to default values." + .format(llap_daemon_selected_queue_name)) + else: + Logger.error("Couldn't retrieve 'capacity-scheduler' properties while doing YARN queue adjustment for Hive Server Interactive." 
+ " Not calculating LLAP configs.") + return + + changed_configs_in_hive_int_env = None + llap_concurrency_in_changed_configs = None + llap_daemon_queue_in_changed_configs = None + # Calculations are triggered only if there is change in any one of the following props : + # 'llap_queue_capacity', 'enable_hive_interactive', 'hive.server2.tez.sessions.per.default.queue' + # or 'hive.llap.daemon.queue.name' has change in value selection. + # OR + # services['changed-configurations'] is empty implying that this is the Blueprint call. (1st invocation) + if 'changed-configurations' in services.keys(): + config_names_to_be_checked = set(['llap_queue_capacity', 'enable_hive_interactive']) + changed_configs_in_hive_int_env = self.are_config_props_in_changed_configs(services, "hive-interactive-env", + config_names_to_be_checked, False) + + # Determine if there is change detected in "hive-interactive-site's" configs based on which we calculate llap configs. + llap_concurrency_in_changed_configs = self.are_config_props_in_changed_configs(services, "hive-interactive-site", + set(['hive.server2.tez.sessions.per.default.queue']), False) + llap_daemon_queue_in_changed_configs = self.are_config_props_in_changed_configs(services, "hive-interactive-site", + set(['hive.llap.daemon.queue.name']), False) + + if not changed_configs_in_hive_int_env and \ + not llap_concurrency_in_changed_configs and \ + not llap_daemon_queue_in_changed_configs and \ + services["changed-configurations"]: + Logger.info("LLAP parameters not modified. Not adjusting LLAP configs.") + Logger.info("Current 'changed-configuration' received is : {0}".format(services["changed-configurations"])) + return + + node_manager_host_list = self.get_node_manager_hosts(services, hosts) + node_manager_cnt = len(node_manager_host_list) + yarn_nm_mem_in_mb = self.get_yarn_nm_mem_in_mb(services, configurations) + total_cluster_capacity = node_manager_cnt * yarn_nm_mem_in_mb + Logger.info("\n\nCalculated total_cluster_capacity : {0}, using following : node_manager_cnt : {1}, " + "yarn_nm_mem_in_mb : {2}".format(total_cluster_capacity, node_manager_cnt, yarn_nm_mem_in_mb)) + + # Check which queue is selected in 'hive.llap.daemon.queue.name', to determine current queue capacity + current_selected_queue_for_llap_cap = None + yarn_root_queues = capacity_scheduler_properties.get("yarn.scheduler.capacity.root.queues") + if llap_queue_selected_in_current_call == llap_queue_name \ + or llap_daemon_selected_queue_name == llap_queue_name \ + and (llap_queue_name in yarn_root_queues and len(leafQueueNames) == 2): + current_selected_queue_for_llap_cap_perc = self.get_llap_cap_percent_slider(services, configurations) + current_selected_queue_for_llap_cap = current_selected_queue_for_llap_cap_perc / 100 * total_cluster_capacity + else: # any queue other than 'llap' + current_selected_queue_for_llap_cap = self.__getSelectedQueueTotalCap(capacity_scheduler_properties, + llap_daemon_selected_queue_name, total_cluster_capacity) + assert (current_selected_queue_for_llap_cap >= 1), "Current selected queue '{0}' capacity value : {1}. 
Expected value : >= 1" \ + .format(llap_daemon_selected_queue_name, current_selected_queue_for_llap_cap) + yarn_min_container_size = self.get_yarn_min_container_size(services, configurations) + tez_am_container_size = self.calculate_tez_am_container_size(long(total_cluster_capacity)) + normalized_tez_am_container_size = self._normalizeUp(tez_am_container_size, yarn_min_container_size) + Logger.info("Calculated normalized_tez_am_container_size : {0}, using following : tez_am_container_size : {1}, " + "total_cluster_capacity : {2}".format(normalized_tez_am_container_size, tez_am_container_size, + total_cluster_capacity)) + normalized_selected_queue_for_llap_cap = long(self._normalizeDown(current_selected_queue_for_llap_cap, yarn_min_container_size)) + + # Get calculated value for Slider AM container Size + slider_am_container_size = self._normalizeUp(self.calculate_slider_am_size(yarn_min_container_size), + yarn_min_container_size) + + # Read 'hive.server2.tez.sessions.per.default.queue' prop if it's in changed-configs, else calculate it. + if not llap_concurrency_in_changed_configs: + # Calculate llap concurrency (i.e. Number of Tez AM's) + llap_concurrency = float(normalized_selected_queue_for_llap_cap * 0.25 / normalized_tez_am_container_size) + llap_concurrency = max(long(llap_concurrency), 1) + Logger.info("Calculated llap_concurrency : {0}, using following : normalized_selected_queue_for_llap_cap : {1}, " + "normalized_tez_am_container_size : {2}".format(llap_concurrency, normalized_selected_queue_for_llap_cap, + normalized_tez_am_container_size)) + # Limit 'llap_concurrency' to reach a max. of 32. + if llap_concurrency > LLAP_MAX_CONCURRENCY: + llap_concurrency = LLAP_MAX_CONCURRENCY + else: + # Read current value + if 'hive.server2.tez.sessions.per.default.queue' in services['configurations'][self.HIVE_INTERACTIVE_SITE][ + 'properties']: + llap_concurrency = long(services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties'][ + 'hive.server2.tez.sessions.per.default.queue']) + assert ( + llap_concurrency >= 1), "'hive.server2.tez.sessions.per.default.queue' current value : {0}. Expected value : >= 1" \ + .format(llap_concurrency) + else: + raise Fail( + "Couldn't retrieve Hive Server interactive's 'hive.server2.tez.sessions.per.default.queue' config.") + + + # Calculate 'total memory available for llap daemons' across cluster + total_am_capacity_required = normalized_tez_am_container_size * llap_concurrency + slider_am_container_size + cap_available_for_daemons = normalized_selected_queue_for_llap_cap - total_am_capacity_required + Logger.info( + "Calculated cap_available_for_daemons : {0}, using following : current_selected_queue_for_llap_cap : {1}, " + "yarn_nm_mem_in_mb : {2}, total_cluster_capacity : {3}, normalized_selected_queue_for_llap_cap : {4}, normalized_tez_am_container_size" + " : {5}, yarn_min_container_size : {6}, llap_concurrency : {7}, total_am_capacity_required : {8}" + .format(cap_available_for_daemons, current_selected_queue_for_llap_cap, yarn_nm_mem_in_mb, + total_cluster_capacity, + normalized_selected_queue_for_llap_cap, normalized_tez_am_container_size, yarn_min_container_size, llap_concurrency, + total_am_capacity_required)) + if cap_available_for_daemons < yarn_min_container_size: + raise Fail( + "'Capacity available for LLAP daemons'({0}) < 'YARN minimum container size'({1}). Invalid configuration detected. 
" + "Increase LLAP queue size.".format(cap_available_for_daemons, yarn_min_container_size)) + + + + # Calculate value for 'num_llap_nodes', an across cluster config. + # Also, get calculated value for 'hive.llap.daemon.yarn.container.mb' based on 'num_llap_nodes' value, a per node config. + num_llap_nodes_raw = cap_available_for_daemons / yarn_nm_mem_in_mb + if num_llap_nodes_raw < 1.00: + # Set the llap nodes to min. value of 1 and 'llap_container_size' to min. YARN allocation. + num_llap_nodes = 1 + llap_container_size = self._normalizeUp(cap_available_for_daemons, yarn_min_container_size) + Logger.info("Calculated llap_container_size : {0}, using following : cap_available_for_daemons : {1}, " + "yarn_min_container_size : {2}".format(llap_container_size, cap_available_for_daemons, + yarn_min_container_size)) + else: + num_llap_nodes = math.floor(num_llap_nodes_raw) + llap_container_size = self._normalizeDown(yarn_nm_mem_in_mb, yarn_min_container_size) + Logger.info("Calculated llap_container_size : {0}, using following : yarn_nm_mem_in_mb : {1}, " + "yarn_min_container_size : {2}".format(llap_container_size, yarn_nm_mem_in_mb, + yarn_min_container_size)) + Logger.info( + "Calculated num_llap_nodes : {0} using following : yarn_nm_mem_in_mb : {1}, cap_available_for_daemons : {2} " \ + .format(num_llap_nodes, yarn_nm_mem_in_mb, cap_available_for_daemons)) + + + # Calculate value for 'hive.llap.daemon.num.executors', a per node config. + hive_tez_container_size = self.get_hive_tez_container_size(services, configurations) + if 'yarn.nodemanager.resource.cpu-vcores' in services['configurations']['yarn-site']['properties']: + cpu_per_nm_host = float(services['configurations']['yarn-site']['properties'][ + 'yarn.nodemanager.resource.cpu-vcores']) + assert (cpu_per_nm_host > 0), "'yarn.nodemanager.resource.cpu-vcores' current value : {0}. Expected value : > 0" \ + .format(cpu_per_nm_host) + else: + raise Fail("Couldn't retrieve YARN's 'yarn.nodemanager.resource.cpu-vcores' config.") + + num_executors_per_node_raw = math.floor(llap_container_size / hive_tez_container_size) + num_executors_per_node = min(num_executors_per_node_raw, cpu_per_nm_host) + Logger.info("calculated num_executors_per_node: {0}, using following : hive_tez_container_size : {1}, " + "cpu_per_nm_host : {2}, num_executors_per_node_raw : {3}, llap_container_size : {4}" + .format(num_executors_per_node, hive_tez_container_size, cpu_per_nm_host, num_executors_per_node_raw, + llap_container_size)) + assert (num_executors_per_node >= 0), "'Number of executors per node' : {0}. Expected value : > 0".format( + num_executors_per_node) + + total_mem_for_executors = num_executors_per_node * hive_tez_container_size + + # Calculate value for 'cache' (hive.llap.io.memory.size), a per node config. + cache_size_per_node = llap_container_size - total_mem_for_executors + Logger.info( + "Calculated cache_size_per_node : {0} using following : hive_container_size : {1}, llap_container_size" + " : {2}, num_executors_per_node : {3}" + .format(cache_size_per_node, hive_tez_container_size, llap_container_size, num_executors_per_node)) + if cache_size_per_node < 0: # Run with '0' cache. + Logger.info( + "Calculated 'cache_size_per_node' : {0}. 
+
+      # Calculate the value for the 'llap_heap_size' prop.
+      llap_xmx = max(total_mem_for_executors * 0.8,
+                     total_mem_for_executors - self.get_llap_headroom_space(services, configurations))
+      Logger.info("Calculated llap_heap_size (llap_xmx) : {0}, using following : hive_tez_container_size : {1}, "
+                  "total_mem_for_executors : {2}".format(llap_xmx, hive_tez_container_size, total_mem_for_executors))
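+
+      # Hypothetical heap-sizing example: with total_mem_for_executors = 4096 and the default headroom of 6144,
+      # llap_xmx = max(4096 * 0.8, 4096 - 6144) = 3276.8; the 80% rule wins whenever the headroom would leave
+      # less than 80% of the executor memory.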
Current: {0}".format(slider_am_container_size)) + + except Exception as e: + # Set default values, if caught an Exception. The 'llap queue capacity' is left untouched, as it can be increased, + # triggerring recalculation. + Logger.info(e.message+" Skipping calculating LLAP configs. Setting them to minimum values.") + traceback.print_exc() + + try: + yarn_min_container_size = long(self.get_yarn_min_container_size(services, configurations)) + slider_am_container_size = long(self.calculate_slider_am_size(yarn_min_container_size)) + + node_manager_host_list = self.get_node_manager_hosts(services, hosts) + node_manager_cnt = len(node_manager_host_list) + + putHiveInteractiveSiteProperty('hive.server2.tez.sessions.per.default.queue', 1) + putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "minimum", 1) + putHiveInteractiveSitePropertyAttribute('hive.server2.tez.sessions.per.default.queue', "maximum", 32) + + putHiveInteractiveEnvProperty('num_llap_nodes', 0) + putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "minimum", 1) + putHiveInteractiveEnvPropertyAttribute('num_llap_nodes', "maximum", node_manager_cnt) + + putHiveInteractiveSiteProperty('hive.llap.daemon.yarn.container.mb', yarn_min_container_size) + putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.yarn.container.mb', "minimum", yarn_min_container_size) + + putHiveInteractiveSiteProperty('hive.llap.daemon.num.executors', 0) + putHiveInteractiveSitePropertyAttribute('hive.llap.daemon.num.executors', "minimum", 1) + + putHiveInteractiveSiteProperty('hive.llap.io.threadpool.size', 0) + + putHiveInteractiveSiteProperty('hive.llap.io.threadpool.size', 0) + + putHiveInteractiveSiteProperty('hive.llap.io.memory.size', 0) + + putHiveInteractiveEnvProperty('llap_heap_size', 0) + + putHiveInteractiveEnvProperty('slider_am_container_mb', slider_am_container_size) + + except Exception as e: + Logger.info("Problem setting minimum values for LLAP configs in Exception code.") + traceback.print_exc() + + """ + Checks for the presence of passed-in configuration properties in a given config, if they are changed. + Reads from services["changed-configurations"]. + Parameters: + services: Configuration information for the cluster + config_type : Type of the configuration + config_names_set : Set of configuration properties to be checked if they are changed. + all_exists: If True : returns True only if all properties mentioned in 'config_names_set' we found + in services["changed-configurations"]. + Otherwise, returns False. + If False : return True, if any of the properties mentioned in config_names_set we found in + services["changed-configurations"]. + Otherwise, returns False. + """ + def are_config_props_in_changed_configs(self, services, config_type, config_names_set, all_exists): + changedConfigs = services["changed-configurations"] + changed_config_names_set = set() + for changedConfig in changedConfigs: + if changedConfig['type'] == config_type: + changed_config_names_set.update([changedConfig['name']]) + + if changed_config_names_set is None: + return False + else: + configs_intersection = changed_config_names_set.intersection(config_names_set) + if all_exists: + if configs_intersection == config_names_set: + return True + else: + if len(configs_intersection) > 0 : + return True + return False + + """ + Returns all the Node Manager configs in cluster. 
+ """ + def get_node_manager_hosts(self, services, hosts): + if len(hosts["items"]) > 0: + node_manager_hosts = self.getHostsWithComponent("YARN", "NODEMANAGER", services, hosts) + assert (node_manager_hosts is not None), "Information about NODEMANAGER not found in cluster." + return node_manager_hosts + + """ + Returns the current LLAP queue capacity percentage value. (llap_queue_capacity) + """ + def get_llap_cap_percent_slider(self, services, configurations): + llap_slider_cap_percentage = 0 + if 'llap_queue_capacity' in services['configurations']['hive-interactive-env']['properties']: + llap_slider_cap_percentage = float( + services['configurations']['hive-interactive-env']['properties']['llap_queue_capacity']) + Logger.error("'llap_queue_capacity' not present in services['configurations']['hive-interactive-env']['properties'].") + if llap_slider_cap_percentage <= 0 : + if 'hive-interactive-env' in configurations and \ + 'llap_queue_capacity' in configurations["hive-interactive-env"]["properties"]: + llap_slider_cap_percentage = float(configurations["hive-interactive-env"]["properties"]["llap_queue_capacity"]) + assert (llap_slider_cap_percentage > 0), "'llap_queue_capacity' is set to : {0}. Should be > 0.".format(llap_slider_cap_percentage) + return llap_slider_cap_percentage + + + """ + Returns current value of number of LLAP nodes in cluster (num_llap_nodes) + """ + def get_num_llap_nodes(self, services): + if 'num_llap_nodes' in services['configurations']['hive-interactive-env']['properties']: + num_llap_nodes = float( + services['configurations']['hive-interactive-env']['properties']['num_llap_nodes']) + assert (num_llap_nodes > 0), "Number of LLAP nodes read : {0}. Expected value : > 0".format( + num_llap_nodes) + return num_llap_nodes + else: + raise Fail("Couldn't retrieve Hive Server interactive's 'num_llap_nodes' config.") + + """ + Gets HIVE Tez container size (hive.tez.container.size). Takes into account if it has been calculated as part of current + Stack Advisor invocation. + """ + def get_hive_tez_container_size(self, services, configurations): + hive_container_size = None + # Check if 'hive.tez.container.size' is modified in current ST invocation. + if 'hive-site' in configurations and 'hive.tez.container.size' in configurations['hive-site']['properties']: + hive_container_size = float(configurations['hive-site']['properties']['hive.tez.container.size']) + Logger.info("'hive.tez.container.size' read from configurations as : {0}".format(hive_container_size)) + + if not hive_container_size: + # Check if 'hive.tez.container.size' is input in services array. + if 'hive.tez.container.size' in services['configurations']['hive-site']['properties']: + hive_container_size = float(services['configurations']['hive-site']['properties']['hive.tez.container.size']) + Logger.info("'hive.tez.container.size' read from services as : {0}".format(hive_container_size)) + if not hive_container_size: + raise Fail("Couldn't retrieve Hive Server 'hive.tez.container.size' config.") + + assert (hive_container_size > 0), "'hive.tez.container.size' current value : {0}. Expected value : > 0".format( + hive_container_size) + + return hive_container_size + + """ + Gets HIVE Server Interactive's 'llap_headroom_space' config. (Default value set to 6144 bytes). + """ + def get_llap_headroom_space(self, services, configurations): + llap_headroom_space = None + # Check if 'llap_headroom_space' is modified in current SA invocation. 
+
+  """
+  Gets Hive Server Interactive's 'llap_headroom_space' config (defaults to 6144 MB).
+  """
+  def get_llap_headroom_space(self, services, configurations):
+    llap_headroom_space = None
+    # Check if 'llap_headroom_space' was modified in the current Stack Advisor invocation.
+    if 'hive-interactive-env' in configurations and 'llap_headroom_space' in configurations['hive-interactive-env']['properties']:
+      llap_headroom_space = float(configurations['hive-interactive-env']['properties']['llap_headroom_space'])
+      Logger.info("'llap_headroom_space' read from configurations as : {0}".format(llap_headroom_space))
+
+    if not llap_headroom_space:
+      # Check if 'llap_headroom_space' is present in the services (input) array.
+      if 'llap_headroom_space' in services['configurations']['hive-interactive-env']['properties']:
+        llap_headroom_space = float(services['configurations']['hive-interactive-env']['properties']['llap_headroom_space'])
+        Logger.info("'llap_headroom_space' read from services as : {0}".format(llap_headroom_space))
+    if not llap_headroom_space or llap_headroom_space < 1:
+      llap_headroom_space = 6144  # 6 GB
+      Logger.info("Couldn't read 'llap_headroom_space' from services or configurations. Returning default value : 6144 MB.")
+
+    return llap_headroom_space
+
+  """
+  Gets YARN's minimum container size (yarn.scheduler.minimum-allocation-mb).
+  Reads from:
+    - configurations (if changed as part of the current Stack Advisor invocation, i.e. the output), when
+      services["changed-configurations"] is empty, else
+    - services['configurations'] (the input).
+
+  services["changed-configurations"] is empty when the Stack Advisor call is made from Blueprints (1st invocation).
+  Subsequent Stack Advisor calls have it non-empty. We do this because in subsequent invocations, even if Stack
+  Advisor calculates this value (configurations), it is ultimately not recommended, so the 'input' value survives.
+  """
+  def get_yarn_min_container_size(self, services, configurations):
+    yarn_min_container_size = None
+    # Check if services["changed-configurations"] is empty and 'yarn.scheduler.minimum-allocation-mb' was modified
+    # in the current Stack Advisor invocation.
+    if not services["changed-configurations"] and \
+        'yarn-site' in configurations and 'yarn.scheduler.minimum-allocation-mb' in configurations['yarn-site']['properties']:
+      yarn_min_container_size = float(configurations['yarn-site']['properties']['yarn.scheduler.minimum-allocation-mb'])
+      Logger.info("'yarn.scheduler.minimum-allocation-mb' read from configurations as : {0}".format(yarn_min_container_size))
+
+    if not yarn_min_container_size:
+      # Check if 'yarn.scheduler.minimum-allocation-mb' is present in the services (input) array.
+      if 'yarn-site' in services['configurations'] and \
+          'yarn.scheduler.minimum-allocation-mb' in services['configurations']['yarn-site']['properties']:
+        yarn_min_container_size = float(services['configurations']['yarn-site']['properties']['yarn.scheduler.minimum-allocation-mb'])
+        Logger.info("'yarn.scheduler.minimum-allocation-mb' read from services as : {0}".format(yarn_min_container_size))
+
+    if not yarn_min_container_size:
+      raise Fail("Couldn't retrieve YARN's 'yarn.scheduler.minimum-allocation-mb' config.")
+
+    assert (yarn_min_container_size > 0), "'yarn.scheduler.minimum-allocation-mb' current value : {0}. " \
+                                          "Expected value : > 0".format(yarn_min_container_size)
+    return yarn_min_container_size
+
+  """
+  Calculates the Slider App Master size based on YARN's minimum container size.
+  """
+  def calculate_slider_am_size(self, yarn_min_container_size):
+    if yarn_min_container_size > 1024:
+      return 1024
+    if 256 <= yarn_min_container_size <= 1024:
+      return yarn_min_container_size
+    if yarn_min_container_size < 256:
+      return 256
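+
+  # Illustrative results (hypothetical inputs):
+  #   calculate_slider_am_size(2048) -> 1024  (capped at 1024 MB)
+  #   calculate_slider_am_size(512)  -> 512   (passed through unchanged)
+  #   calculate_slider_am_size(128)  -> 256   (raised to the 256 MB floor)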
+
+  """
+  Gets YARN NodeManager memory in MB (yarn.nodemanager.resource.memory-mb).
+  Reads from:
+    - configurations (if changed as part of the current Stack Advisor invocation, i.e. the output), when
+      services["changed-configurations"] is empty, else
+    - services['configurations'] (the input).
+
+  services["changed-configurations"] is empty when the Stack Advisor call is made from Blueprints (1st invocation).
+  Subsequent Stack Advisor calls have it non-empty. We do this because in subsequent invocations, even if Stack
+  Advisor calculates this value (configurations), it is ultimately not recommended, so the 'input' value survives.
+  """
+  def get_yarn_nm_mem_in_mb(self, services, configurations):
+    yarn_nm_mem_in_mb = None
+
+    # Check if services["changed-configurations"] is empty and 'yarn.nodemanager.resource.memory-mb' was modified
+    # in the current Stack Advisor invocation.
+    if not services["changed-configurations"] and \
+        'yarn-site' in configurations and 'yarn.nodemanager.resource.memory-mb' in configurations['yarn-site']['properties']:
+      yarn_nm_mem_in_mb = float(configurations['yarn-site']['properties']['yarn.nodemanager.resource.memory-mb'])
+      Logger.info("'yarn.nodemanager.resource.memory-mb' read from configurations as : {0}".format(yarn_nm_mem_in_mb))
+
+    if not yarn_nm_mem_in_mb:
+      # Check if 'yarn.nodemanager.resource.memory-mb' is present in the services (input) array.
+      if 'yarn-site' in services['configurations'] and \
+          'yarn.nodemanager.resource.memory-mb' in services['configurations']['yarn-site']['properties']:
+        yarn_nm_mem_in_mb = float(services['configurations']['yarn-site']['properties']['yarn.nodemanager.resource.memory-mb'])
+        Logger.info("'yarn.nodemanager.resource.memory-mb' read from services as : {0}".format(yarn_nm_mem_in_mb))
+
+    if not yarn_nm_mem_in_mb:
+      raise Fail("Couldn't retrieve YARN's 'yarn.nodemanager.resource.memory-mb' config.")
+
+    assert (yarn_nm_mem_in_mb > 0.0), "'yarn.nodemanager.resource.memory-mb' current value : {0}. " \
+                                      "Expected value : > 0".format(yarn_nm_mem_in_mb)
+
+    return yarn_nm_mem_in_mb
+
+  """
+  Determines the Tez App Master container size (tez.am.resource.memory.mb) for tez_hive2/tez-site, based on the
+  total cluster capacity.
+  """
+  def calculate_tez_am_container_size(self, total_cluster_capacity):
+    if total_cluster_capacity is None or not isinstance(total_cluster_capacity, long):
+      raise Fail("Passed-in 'Total Cluster Capacity' is : '{0}'".format(total_cluster_capacity))
+    if total_cluster_capacity <= 0:
+      raise Fail("Passed-in 'Total Cluster Capacity' ({0}) is invalid.".format(total_cluster_capacity))
+
+    if total_cluster_capacity <= 4096:
+      return 256
+    elif 4096 < total_cluster_capacity <= 73728:
+      return 512
+    else:  # total_cluster_capacity > 73728
+      return 1536
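+
+  # Illustrative bracket boundaries (hypothetical capacities, in MB):
+  #   calculate_tez_am_container_size(long(4096))   -> 256   (small cluster, <= 4 GB total)
+  #   calculate_tez_am_container_size(long(40960))  -> 512   (mid-size, 4 GB < capacity <= 72 GB)
+  #   calculate_tez_am_container_size(long(102400)) -> 1536  (large, > 72 GB)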
+
+  """
+  Calculates the minimum queue capacity (in percent) required to get the LLAP and HIVE2 apps into a running state.
+  """
+  def min_queue_perc_reqd_for_llap_and_hive_app(self, services, hosts, configurations):
+    # Get the queue size if sized at 20%.
+    node_manager_hosts = self.get_node_manager_hosts(services, hosts)
+    yarn_nm_mem_in_mb = self.get_yarn_nm_mem_in_mb(services, configurations)
+    total_cluster_cap = len(node_manager_hosts) * yarn_nm_mem_in_mb
+    total_queue_size_at_20_perc = 20.0 / 100 * total_cluster_cap
+
+    # Calculate the minimum size required by the containers.
+    yarn_min_container_size = self.get_yarn_min_container_size(services, configurations)
+    slider_am_size = self.calculate_slider_am_size(yarn_min_container_size)
+    hive_tez_container_size = self.get_hive_tez_container_size(services, configurations)
+    tez_am_container_size = self.calculate_tez_am_container_size(long(total_cluster_cap))
+    normalized_val = self._normalizeUp(slider_am_size, yarn_min_container_size) + \
+                     self._normalizeUp(hive_tez_container_size, yarn_min_container_size) + \
+                     self._normalizeUp(tez_am_container_size, yarn_min_container_size)
+
+    min_required = max(total_queue_size_at_20_perc, normalized_val)
+
+    min_required_perc = min_required * 100 / total_cluster_cap
+    Logger.info("Calculated 'min_queue_perc_required_in_cluster' : {0}% and 'min_queue_cap_required_in_cluster' : {1} "
+                "for the LLAP and HIVE2 apps, using following : yarn_min_container_size : {2}, slider_am_size : {3}, "
+                "hive_tez_container_size : {4}, tez_am_container_size : {5}, total_cluster_cap : {6}, "
+                "yarn_nm_mem_in_mb : {7}".format(min_required_perc, min_required, yarn_min_container_size,
+                                                 slider_am_size, hive_tez_container_size, tez_am_container_size,
+                                                 total_cluster_cap, yarn_nm_mem_in_mb))
+    return int(math.ceil(min_required_perc))
+
+  """
+  Normalizes 'val1' DOWN to the nearest multiple of 'val2'. Returns 'val1' unchanged if it is smaller than 'val2'.
+  Note : callers pass 'val2' as a float (e.g. yarn_min_container_size), so '/' is true division under Python 2.
+  """
+  def _normalizeDown(self, val1, val2):
+    tmp = math.floor(val1 / val2)
+    if tmp < 1.00:
+      return val1
+    return tmp * val2
+
+  """
+  Normalizes 'val1' UP to the nearest multiple of 'val2'.
+  """
+  def _normalizeUp(self, val1, val2):
+    tmp = math.ceil(val1 / val2)
+    return tmp * val2
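+
+  # Illustrative results (hypothetical values; note the float divisor):
+  #   _normalizeUp(512, 1024.0)     -> 1024.0  (rounded up to one container)
+  #   _normalizeDown(20480, 1024.0) -> 20480.0 (already a multiple)
+  #   _normalizeDown(500, 1024.0)   -> 500     (smaller than 'val2', returned unchanged)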
+
+  """
+  Checks and (1) creates the 'llap' queue if only the 'default' queue exists at leaf level and is consuming 100%
+  capacity, OR (2) updates the 'llap' queue capacity and state if the currently selected queue is 'llap' and only
+  two queues, 'default' and 'llap', exist at root level.
+  """
+  def checkAndManageLlapQueue(self, services, configurations, hosts, llap_queue_name):
+    Logger.info("Determining creation/adjustment of 'capacity-scheduler' for the 'llap' queue.")
+    putHiveInteractiveEnvProperty = self.putProperty(configurations, "hive-interactive-env", services)
+    putHiveInteractiveSiteProperty = self.putProperty(configurations, self.HIVE_INTERACTIVE_SITE, services)
+    putHiveInteractiveEnvPropertyAttribute = self.putPropertyAttribute(configurations, "hive-interactive-env")
+    putCapSchedProperty = self.putProperty(configurations, "capacity-scheduler", services)
+    leafQueueNames = None
+
+    capacity_scheduler_properties, received_as_key_value_pair = self.getCapacitySchedulerProperties(services)
+    if capacity_scheduler_properties:
+      leafQueueNames = self.getAllYarnLeafQueues(capacity_scheduler_properties)
+      # Get the cluster percentage used for the 'llap' queue creation.
+      if 'llap_queue_capacity' in services['configurations']['hive-interactive-env']['properties']:
+        llap_slider_cap_percentage = int(
+          services['configurations']['hive-interactive-env']['properties']['llap_queue_capacity'])
+        min_reqd_queue_cap_perc = self.min_queue_perc_reqd_for_llap_and_hive_app(services, hosts, configurations)
+        if min_reqd_queue_cap_perc > 100:
+          min_reqd_queue_cap_perc = 100
+          Logger.info("Received 'Minimum Required LLAP queue capacity' : {0}% (out of bounds), adjusted it to : "
+                      "100%".format(min_reqd_queue_cap_perc))
+
+        # Adjust the 'llap' queue capacity slider value to the minimum required, if it is out of the expected bounds.
+        if llap_slider_cap_percentage <= 0 or llap_slider_cap_percentage > 100:
+          Logger.info("Adjusting HIVE 'llap_queue_capacity' from {0}% (invalid size) to {1}%"
+                      .format(llap_slider_cap_percentage, min_reqd_queue_cap_perc))
+          putHiveInteractiveEnvProperty('llap_queue_capacity', min_reqd_queue_cap_perc)
+          llap_slider_cap_percentage = min_reqd_queue_cap_perc
+      else:
+        Logger.error("Problem retrieving the LLAP queue capacity. Skipping creation of the {0} queue.".format(llap_queue_name))
+        return
+
+      cap_sched_config_keys = capacity_scheduler_properties.keys()
+
+      yarn_default_queue_capacity = -1
+      if 'yarn.scheduler.capacity.root.default.capacity' in cap_sched_config_keys:
+        yarn_default_queue_capacity = capacity_scheduler_properties.get('yarn.scheduler.capacity.root.default.capacity')
+
+      # Get the 'llap' queue state.
+      currLlapQueueState = ''
+      if 'yarn.scheduler.capacity.root.' + llap_queue_name + '.state' in cap_sched_config_keys:
+        currLlapQueueState = capacity_scheduler_properties.get('yarn.scheduler.capacity.root.' + llap_queue_name + '.state')
+
+      # Get the 'llap' queue capacity.
+      currLlapQueueCap = -1
+      if 'yarn.scheduler.capacity.root.' + llap_queue_name + '.capacity' in cap_sched_config_keys:
+        currLlapQueueCap = int(float(capacity_scheduler_properties.get('yarn.scheduler.capacity.root.' + llap_queue_name + '.capacity')))
+
+      if self.HIVE_INTERACTIVE_SITE in services['configurations'] and \
+          'hive.llap.daemon.queue.name' in services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']:
+        llap_daemon_selected_queue_name = services['configurations'][self.HIVE_INTERACTIVE_SITE]['properties']['hive.llap.daemon.queue.name']
+      else:
+        Logger.error("Couldn't retrieve the 'hive.llap.daemon.queue.name' property. Skipping queue adjustment.")
+        return
+      updated_cap_sched_configs_str = ''
+
+      enabled_hive_int_in_changed_configs = self.are_config_props_in_changed_configs(services, "hive-interactive-env",
+                                                                                     set(['enable_hive_interactive']), False)
+      """
+      We create OR modify the 'llap' queue's state and/or capacity based on the following conditions:
+        - if only one leaf queue exists at root level, it is the 'default' queue, and it has 100% capacity ->
+          create the 'llap' queue, OR
+        - if two queues exist at root level ('default' and 'llap'):
+          - the selected queue is 'llap' and its state is STOPPED -> set the 'llap' queue state to RUNNING and
+            adjust its capacity, OR
+          - the selected queue is 'llap', its state is RUNNING, and the 'llap_queue_capacity' property differs from
+            the 'llap' queue's current running capacity -> set the 'llap' queue capacity to 'llap_queue_capacity'.
+      """
+      if 'default' in leafQueueNames and \
+          ((len(leafQueueNames) == 1 and int(yarn_default_queue_capacity) == 100) or
+           ((len(leafQueueNames) == 2 and llap_queue_name in leafQueueNames) and
+            ((currLlapQueueState == 'STOPPED' and enabled_hive_int_in_changed_configs) or
+             (currLlapQueueState == 'RUNNING' and currLlapQueueCap != llap_slider_cap_percentage)))):
+        adjusted_default_queue_cap = str(100 - llap_slider_cap_percentage)
+
+        hive_user = '*'  # Open to all.
+        if 'hive_user' in services['configurations']['hive-env']['properties']:
+          hive_user = services['configurations']['hive-env']['properties']['hive_user']
+
+        llap_slider_cap_percentage = str(llap_slider_cap_percentage)
<TRUNCATED>