AMBARI-20910. HDP 3.0 TP - Unable to install Spark, cannot find package/scripts dir (alejandro)
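For context: the Ambari agent distributes and runs a service's command scripts from the service's package directory (the script paths declared in the service's metainfo.xml are resolved relative to it), so scripts sitting under SPARK/2.2.0/scripts instead of SPARK/2.2.0/package/scripts could not be found at install time; this commit fixes that by moving them. Below is a minimal sketch of the command-script pattern every moved file follows -- the file and class names are illustrative, not part of this commit:

#!/usr/bin/python
# Minimal sketch, assuming only the resource_management library that ships
# with the Ambari agent; ExampleComponent is a hypothetical component.
from resource_management.libraries.script.script import Script

class ExampleComponent(Script):
  def install(self, env):
    import params               # params.py is looked up next to this script, in package/scripts
    env.set_params(params)
    self.install_packages(env)

  def configure(self, env, upgrade_type=None, config_dir=None):
    import params
    env.set_params(params)

if __name__ == "__main__":
  ExampleComponent().execute()   # dispatches whichever command the agent invokes (install, configure, ...)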
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/4b588a92
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/4b588a92
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/4b588a92

Branch: refs/heads/trunk
Commit: 4b588a9237a72465f3ca83c207a8d4234d9c4c12
Parents: b3f7d9e
Author: Alejandro Fernandez <afernan...@hortonworks.com>
Authored: Mon May 1 19:24:22 2017 -0700
Committer: Alejandro Fernandez <afernan...@hortonworks.com>
Committed: Tue May 2 13:51:13 2017 -0700

----------------------------------------------------------------------
 .../2.2.0/package/scripts/job_history_server.py | 108 ++++++++
 .../SPARK/2.2.0/package/scripts/livy_server.py  | 151 +++++++++++
 .../SPARK/2.2.0/package/scripts/livy_service.py |  48 ++++
 .../SPARK/2.2.0/package/scripts/params.py       | 268 +++++++++++++++++++
 .../2.2.0/package/scripts/service_check.py      |  62 +++++
 .../SPARK/2.2.0/package/scripts/setup_livy.py   |  88 ++++++
 .../SPARK/2.2.0/package/scripts/setup_spark.py  | 116 ++++++++
 .../SPARK/2.2.0/package/scripts/spark_client.py |  62 +++++
 .../2.2.0/package/scripts/spark_service.py      | 146 ++++++++++
 .../package/scripts/spark_thrift_server.py      |  91 +++++++
 .../2.2.0/package/scripts/status_params.py      |  45 ++++
 .../SPARK/2.2.0/scripts/job_history_server.py   | 108 --------
 .../SPARK/2.2.0/scripts/livy_server.py          | 151 -----------
 .../SPARK/2.2.0/scripts/livy_service.py         |  48 ----
 .../SPARK/2.2.0/scripts/params.py               | 268 -------------------
 .../SPARK/2.2.0/scripts/service_check.py        |  62 -----
 .../SPARK/2.2.0/scripts/setup_livy.py           |  88 ------
 .../SPARK/2.2.0/scripts/setup_spark.py          | 116 --------
 .../SPARK/2.2.0/scripts/spark_client.py         |  62 -----
 .../SPARK/2.2.0/scripts/spark_service.py        | 146 ----------
 .../SPARK/2.2.0/scripts/spark_thrift_server.py  |  91 -------
 .../SPARK/2.2.0/scripts/status_params.py        |  45 ----
 22 files changed, 1185 insertions(+), 1185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/4b588a92/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/job_history_server.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/job_history_server.py b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/job_history_server.py
new file mode 100644
index 0000000..3937c88
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/job_history_server.py
@@ -0,0 +1,108 @@
+#!/usr/bin/python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+ +""" + +import sys +import os + +from resource_management.libraries.script.script import Script +from resource_management.libraries.functions import conf_select, stack_select +from resource_management.libraries.functions.copy_tarball import copy_to_hdfs +from resource_management.libraries.functions.check_process_status import check_process_status +from resource_management.libraries.functions.stack_features import check_stack_feature +from resource_management.libraries.functions.constants import StackFeature +from resource_management.core.logger import Logger +from resource_management.core import shell +from setup_spark import * +from spark_service import spark_service + + +class JobHistoryServer(Script): + + def install(self, env): + import params + env.set_params(params) + + self.install_packages(env) + + def configure(self, env, upgrade_type=None, config_dir=None): + import params + env.set_params(params) + + setup_spark(env, 'server', upgrade_type=upgrade_type, action = 'config') + + def start(self, env, upgrade_type=None): + import params + env.set_params(params) + + self.configure(env) + spark_service('jobhistoryserver', upgrade_type=upgrade_type, action='start') + + def stop(self, env, upgrade_type=None): + import params + env.set_params(params) + + spark_service('jobhistoryserver', upgrade_type=upgrade_type, action='stop') + + def status(self, env): + import status_params + env.set_params(status_params) + + check_process_status(status_params.spark_history_server_pid_file) + + + def get_component_name(self): + # TODO, change to "spark" after RPM switches the name + return "spark2-historyserver" + + def pre_upgrade_restart(self, env, upgrade_type=None): + import params + + env.set_params(params) + if params.version and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version): + Logger.info("Executing Spark Job History Server Stack Upgrade pre-restart") + # TODO, change to "spark" after RPM switches the name + conf_select.select(params.stack_name, "spark2", params.version) + stack_select.select("spark2-historyserver", params.version) + + # Spark 1.3.1.2.3, and higher, which was included in HDP 2.3, does not have a dependency on Tez, so it does not + # need to copy the tarball, otherwise, copy it. 
+      if params.version and check_stack_feature(StackFeature.TEZ_FOR_SPARK, params.version):
+        resource_created = copy_to_hdfs(
+          "tez",
+          params.user_group,
+          params.hdfs_user,
+          skip=params.sysprep_skip_copy_tarballs_hdfs)
+        if resource_created:
+          params.HdfsResource(None, action="execute")
+
+  def get_log_folder(self):
+    import params
+    return params.spark_log_dir
+
+  def get_user(self):
+    import params
+    return params.spark_user
+
+  def get_pid_files(self):
+    import status_params
+    return [status_params.spark_history_server_pid_file]
+
+if __name__ == "__main__":
+  JobHistoryServer().execute()

http://git-wip-us.apache.org/repos/asf/ambari/blob/4b588a92/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/livy_server.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/livy_server.py b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/livy_server.py
new file mode 100644
index 0000000..269c97d
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/livy_server.py
@@ -0,0 +1,151 @@
+#!/usr/bin/python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+ +""" + +from resource_management.libraries.script.script import Script +from resource_management.libraries.functions.check_process_status import check_process_status +from resource_management.libraries.functions.stack_features import check_stack_feature +from resource_management.libraries.functions.constants import StackFeature +from resource_management.core.exceptions import Fail +from resource_management.core.resources.system import Execute +from resource_management.libraries.providers.hdfs_resource import WebHDFSUtil +from resource_management.libraries.providers.hdfs_resource import HdfsResourceProvider +from resource_management import is_empty +from resource_management import shell +from resource_management.libraries.functions.decorator import retry +from resource_management.core.logger import Logger +from resource_management.libraries.functions.format import format +from resource_management.libraries.functions import conf_select, stack_select + +from livy_service import livy_service +from setup_livy import setup_livy + +class LivyServer(Script): + + def install(self, env): + import params + env.set_params(params) + + self.install_packages(env) + + def configure(self, env, upgrade_type=None, config_dir=None): + import params + env.set_params(params) + + setup_livy(env, 'server', upgrade_type=upgrade_type, action = 'config') + + def start(self, env, upgrade_type=None): + import params + env.set_params(params) + + if params.has_ats and params.has_livyserver: + Logger.info("Verifying DFS directories where ATS stores time line data for active and completed applications.") + self.wait_for_dfs_directories_created([params.entity_groupfs_store_dir, params.entity_groupfs_active_dir]) + + self.configure(env) + livy_service('server', upgrade_type=upgrade_type, action='start') + + def stop(self, env, upgrade_type=None): + import params + env.set_params(params) + + livy_service('server', upgrade_type=upgrade_type, action='stop') + + def status(self, env): + import status_params + env.set_params(status_params) + + check_process_status(status_params.livy_server_pid_file) + + # TODO move out and compose with similar method in resourcemanager.py + def wait_for_dfs_directories_created(self, dirs): + import params + + ignored_dfs_dirs = HdfsResourceProvider.get_ignored_resources_list(params.hdfs_resource_ignore_file) + + if params.security_enabled: + Execute(format("{kinit_path_local} -kt {livy_kerberos_keytab} {livy_principal}"), + user=params.livy_user + ) + Execute(format("{kinit_path_local} -kt {hdfs_user_keytab} {hdfs_principal_name}"), + user=params.hdfs_user + ) + + for dir_path in dirs: + self.wait_for_dfs_directory_created(dir_path, ignored_dfs_dirs) + + def get_pid_files(self): + import status_params + return [status_params.livy_server_pid_file] + + + @retry(times=8, sleep_time=20, backoff_factor=1, err_class=Fail) + def wait_for_dfs_directory_created(self, dir_path, ignored_dfs_dirs): + import params + + if not is_empty(dir_path): + dir_path = HdfsResourceProvider.parse_path(dir_path) + + if dir_path in ignored_dfs_dirs: + Logger.info("Skipping DFS directory '" + dir_path + "' as it's marked to be ignored.") + return + + Logger.info("Verifying if DFS directory '" + dir_path + "' exists.") + + dir_exists = None + + if WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.default_fs): + # check with webhdfs is much faster than executing hdfs dfs -test + util = WebHDFSUtil(params.hdfs_site, params.hdfs_user, params.security_enabled) + list_status = util.run_command(dir_path, 
'GETFILESTATUS', method='GET', ignore_status_codes=['404'], assertable_result=False) + dir_exists = ('FileStatus' in list_status) + else: + # have to do time expensive hdfs dfs -d check. + dfs_ret_code = shell.call(format("hdfs --config {hadoop_conf_dir} dfs -test -d " + dir_path), user=params.livy_user)[0] + dir_exists = not dfs_ret_code #dfs -test -d returns 0 in case the dir exists + + if not dir_exists: + raise Fail("DFS directory '" + dir_path + "' does not exist !") + else: + Logger.info("DFS directory '" + dir_path + "' exists.") + + def get_component_name(self): + # TODO, change to "livy" after RPM switches the name + return "livy2-server" + + def pre_upgrade_restart(self, env, upgrade_type=None): + import params + + env.set_params(params) + if params.version and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version): + Logger.info("Executing Livy Server Stack Upgrade pre-restart") + # TODO, change to "spark" and "livy" after RPM switches the name + conf_select.select(params.stack_name, "spark2", params.version) + stack_select.select("livy2-server", params.version) + + def get_log_folder(self): + import params + return params.livy_log_dir + + def get_user(self): + import params + return params.livy_user +if __name__ == "__main__": + LivyServer().execute() + http://git-wip-us.apache.org/repos/asf/ambari/blob/4b588a92/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/livy_service.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/livy_service.py b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/livy_service.py new file mode 100644 index 0000000..45201db --- /dev/null +++ b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/livy_service.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+'''
+
+from resource_management.libraries.functions import format
+from resource_management.core.resources.system import File, Execute
+import threading
+
+def livy_service(name, upgrade_type=None, action=None):
+  import params
+
+  if action == 'start':
+    livyserver_no_op_test = format(
+      'ls {livy_server_pid_file} >/dev/null 2>&1 && ps -p `cat {livy_server_pid_file}` >/dev/null 2>&1')
+    Execute(format('{livy_server_start}'),
+            user=params.livy_user,
+            environment={'JAVA_HOME': params.java_home},
+            not_if=livyserver_no_op_test
+            )
+
+  elif action == 'stop':
+    Execute(format('{livy_server_stop}'),
+            user=params.livy_user,
+            environment={'JAVA_HOME': params.java_home}
+            )
+    File(params.livy_server_pid_file,
+         action="delete"
+         )
+
+
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/4b588a92/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/params.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/params.py
new file mode 100644
index 0000000..e60cab5
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/params.py
@@ -0,0 +1,268 @@
+#!/usr/bin/python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+ +""" + +import socket + +import status_params +from resource_management.libraries.functions.stack_features import check_stack_feature +from resource_management.libraries.functions.constants import StackFeature +from resource_management.libraries.functions import conf_select, stack_select +from resource_management.libraries.functions.version import format_stack_version +from resource_management.libraries.functions.copy_tarball import get_sysprep_skip_copy_tarballs_hdfs +from resource_management.libraries.functions.format import format +from resource_management.libraries.functions.default import default +from resource_management.libraries.functions import get_kinit_path +from resource_management.libraries.functions.get_not_managed_resources import get_not_managed_resources +from resource_management.libraries.resources.hdfs_resource import HdfsResource +from resource_management.libraries.script.script import Script + +# a map of the Ambari role to the component name +# for use with <stack-root>/current/<component> +# TODO, change to "spark" and "livy" after RPM switches the name +SERVER_ROLE_DIRECTORY_MAP = { + 'SPARK_JOBHISTORYSERVER' : 'spark2-historyserver', + 'SPARK_CLIENT' : 'spark2-client', + 'SPARK_THRIFTSERVER' : 'spark2-thriftserver', + 'LIVY_SERVER' : 'livy2-server', + 'LIVY_CLIENT' : 'livy2-client' + +} + +component_directory = Script.get_component_from_role(SERVER_ROLE_DIRECTORY_MAP, "SPARK_CLIENT") + +config = Script.get_config() +tmp_dir = Script.get_tmp_dir() + +stack_name = status_params.stack_name +stack_root = Script.get_stack_root() +stack_version_unformatted = config['hostLevelParams']['stack_version'] +stack_version_formatted = format_stack_version(stack_version_unformatted) + +sysprep_skip_copy_tarballs_hdfs = get_sysprep_skip_copy_tarballs_hdfs() + +# New Cluster Stack Version that is defined during the RESTART of a Stack Upgrade +version = default("/commandParams/version", None) + +# TODO, change to "spark" after RPM switches the name +spark_conf = '/etc/spark2/conf' +hadoop_conf_dir = conf_select.get_hadoop_conf_dir() +hadoop_bin_dir = stack_select.get_hadoop_dir("bin") + +if stack_version_formatted and check_stack_feature(StackFeature.ROLLING_UPGRADE, stack_version_formatted): + hadoop_home = stack_select.get_hadoop_dir("home") + spark_conf = format("{stack_root}/current/{component_directory}/conf") + spark_log_dir = config['configurations']['spark-env']['spark_log_dir'] + spark_pid_dir = status_params.spark_pid_dir + spark_home = format("{stack_root}/current/{component_directory}") + +spark_daemon_memory = config['configurations']['spark-env']['spark_daemon_memory'] +spark_thrift_server_conf_file = spark_conf + "/spark-thrift-sparkconf.conf" +java_home = config['hostLevelParams']['java_home'] + +hdfs_user = config['configurations']['hadoop-env']['hdfs_user'] +hdfs_principal_name = config['configurations']['hadoop-env']['hdfs_principal_name'] +hdfs_user_keytab = config['configurations']['hadoop-env']['hdfs_user_keytab'] +user_group = config['configurations']['cluster-env']['user_group'] + +spark_user = status_params.spark_user +hive_user = status_params.hive_user +spark_group = status_params.spark_group +user_group = status_params.user_group +spark_hdfs_user_dir = format("/user/{spark_user}") +spark_history_dir = default('/configurations/spark-defaults/spark.history.fs.logDirectory', "hdfs:///spark-history") + +spark_history_server_pid_file = status_params.spark_history_server_pid_file +spark_thrift_server_pid_file = status_params.spark_thrift_server_pid_file + 
+spark_history_server_start = format("{spark_home}/sbin/start-history-server.sh")
+spark_history_server_stop = format("{spark_home}/sbin/stop-history-server.sh")
+
+spark_thrift_server_start = format("{spark_home}/sbin/start-thriftserver.sh")
+spark_thrift_server_stop = format("{spark_home}/sbin/stop-thriftserver.sh")
+spark_hadoop_lib_native = format("{stack_root}/current/hadoop-client/lib/native:{stack_root}/current/hadoop-client/lib/native/Linux-amd64-64")
+
+run_example_cmd = format("{spark_home}/bin/run-example")
+spark_smoke_example = "SparkPi"
+spark_service_check_cmd = format(
+  "{run_example_cmd} --master yarn --deploy-mode cluster --num-executors 1 --driver-memory 256m --executor-memory 256m --executor-cores 1 {spark_smoke_example} 1")
+
+spark_jobhistoryserver_hosts = default("/clusterHostInfo/spark_jobhistoryserver_hosts", [])
+
+if len(spark_jobhistoryserver_hosts) > 0:
+  spark_history_server_host = spark_jobhistoryserver_hosts[0]
+else:
+  spark_history_server_host = "localhost"
+
+# spark-defaults params
+ui_ssl_enabled = default("configurations/spark-defaults/spark.ssl.enabled", False)
+
+spark_yarn_historyServer_address = default(spark_history_server_host, "localhost")
+spark_history_scheme = "http"
+spark_history_ui_port = config['configurations']['spark-defaults']['spark.history.ui.port']
+
+if ui_ssl_enabled:
+  spark_history_ui_port = str(int(spark_history_ui_port) + 400)
+  spark_history_scheme = "https"
+
+
+spark_env_sh = config['configurations']['spark-env']['content']
+spark_log4j_properties = config['configurations']['spark-log4j-properties']['content']
+spark_metrics_properties = config['configurations']['spark-metrics-properties']['content']
+
+hive_server_host = default("/clusterHostInfo/hive_server_host", [])
+is_hive_installed = not len(hive_server_host) == 0
+
+security_enabled = config['configurations']['cluster-env']['security_enabled']
+kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', None))
+spark_kerberos_keytab = config['configurations']['spark-defaults']['spark.history.kerberos.keytab']
+spark_kerberos_principal = config['configurations']['spark-defaults']['spark.history.kerberos.principal']
+smoke_user_keytab = config['configurations']['cluster-env']['smokeuser_keytab']
+smokeuser_principal = config['configurations']['cluster-env']['smokeuser_principal_name']
+
+spark_thriftserver_hosts = default("/clusterHostInfo/spark_thriftserver_hosts", [])
+has_spark_thriftserver = not len(spark_thriftserver_hosts) == 0
+
+# hive-site params
+spark_hive_properties = {
+  'hive.metastore.uris': default('/configurations/hive-site/hive.metastore.uris', '')
+}
+
+# security settings
+if security_enabled:
+  spark_principal = spark_kerberos_principal.replace('_HOST', spark_history_server_host.lower())
+
+  if is_hive_installed:
+    spark_hive_properties.update({
+      'hive.metastore.sasl.enabled': str(config['configurations']['hive-site']['hive.metastore.sasl.enabled']).lower(),
+      'hive.metastore.kerberos.keytab.file': config['configurations']['hive-site']['hive.metastore.kerberos.keytab.file'],
+      'hive.server2.authentication.spnego.principal': config['configurations']['hive-site']['hive.server2.authentication.spnego.principal'],
+      'hive.server2.authentication.spnego.keytab': config['configurations']['hive-site']['hive.server2.authentication.spnego.keytab'],
+      'hive.metastore.kerberos.principal': config['configurations']['hive-site']['hive.metastore.kerberos.principal'],
+      'hive.server2.authentication.kerberos.principal': config['configurations']['hive-site']['hive.server2.authentication.kerberos.principal'],
+      'hive.server2.authentication.kerberos.keytab': config['configurations']['hive-site']['hive.server2.authentication.kerberos.keytab'],
+      'hive.server2.authentication': config['configurations']['hive-site']['hive.server2.authentication'],
+    })
+
+    hive_kerberos_keytab = config['configurations']['hive-site']['hive.server2.authentication.kerberos.keytab']
+    hive_kerberos_principal = config['configurations']['hive-site']['hive.server2.authentication.kerberos.principal'].replace('_HOST', socket.getfqdn().lower())
+
+# thrift server support - available on HDP 2.3 or higher
+spark_thrift_sparkconf = None
+spark_thrift_cmd_opts_properties = ''
+spark_thrift_fairscheduler_content = None
+spark_thrift_master = "yarn-client"
+if 'nm_hosts' in config['clusterHostInfo'] and len(config['clusterHostInfo']['nm_hosts']) == 1:
+  # use local mode when there's only one nodemanager
+  spark_thrift_master = "local[4]"
+
+if has_spark_thriftserver and 'spark-thrift-sparkconf' in config['configurations']:
+  spark_thrift_sparkconf = config['configurations']['spark-thrift-sparkconf']
+  spark_thrift_cmd_opts_properties = config['configurations']['spark-env']['spark_thrift_cmd_opts']
+  if is_hive_installed:
+    # update default metastore client properties (async wait for metastore component); useful in case of
+    # blueprint provisioning when hive-metastore and spark-thriftserver are not on the same host.
+    spark_hive_properties.update({
+      'hive.metastore.client.socket.timeout' : config['configurations']['hive-site']['hive.metastore.client.socket.timeout']
+    })
+    spark_hive_properties.update(config['configurations']['spark-hive-site-override'])
+
+  if 'spark-thrift-fairscheduler' in config['configurations'] and 'fairscheduler_content' in config['configurations']['spark-thrift-fairscheduler']:
+    spark_thrift_fairscheduler_content = config['configurations']['spark-thrift-fairscheduler']['fairscheduler_content']
+
+default_fs = config['configurations']['core-site']['fs.defaultFS']
+hdfs_site = config['configurations']['hdfs-site']
+hdfs_resource_ignore_file = "/var/lib/ambari-agent/data/.hdfs_resource_ignore"
+
+ats_host = set(default("/clusterHostInfo/app_timeline_server_hosts", []))
+has_ats = len(ats_host) > 0
+
+dfs_type = default("/commandParams/dfs_type", "")
+
+# livy related config
+
+# livy for spark2 is only supported from HDP 2.6
+has_livyserver = False
+
+if stack_version_formatted and check_stack_feature(StackFeature.SPARK_LIVY, stack_version_formatted):
+  livy_component_directory = Script.get_component_from_role(SERVER_ROLE_DIRECTORY_MAP, "LIVY_SERVER")
+  livy_conf = format("{stack_root}/current/{livy_component_directory}/conf")
+  livy_log_dir = config['configurations']['livy-env']['livy_log_dir']
+  livy_pid_dir = status_params.livy_pid_dir
+  livy_home = format("{stack_root}/current/{livy_component_directory}")
+  livy_user = status_params.livy_user
+  livy_group = status_params.livy_group
+  user_group = status_params.user_group
+  livy_hdfs_user_dir = format("/user/{livy_user}")
+  livy_server_pid_file = status_params.livy_server_pid_file
+  livy_recovery_dir = default("/configurations/livy-conf/livy.server.recovery.state-store.url", "/livy-recovery")
+
+  livy_server_start = format("{livy_home}/bin/livy-server start")
+  livy_server_stop = format("{livy_home}/bin/livy-server stop")
+  livy_logs_dir = format("{livy_home}/logs")
+
+  livy_env_sh = config['configurations']['livy-env']['content']
+  livy_log4j_properties = config['configurations']['livy-log4j-properties']['content']
+  livy_spark_blacklist_properties = config['configurations']['livy-spark-blacklist']['content']
+
+  if 'livy.server.kerberos.keytab' in config['configurations']['livy-conf']:
+    livy_kerberos_keytab = config['configurations']['livy-conf']['livy.server.kerberos.keytab']
+  else:
+    livy_kerberos_keytab = config['configurations']['livy-conf']['livy.server.launch.kerberos.keytab']
+  if 'livy.server.kerberos.principal' in config['configurations']['livy-conf']:
+    livy_kerberos_principal = config['configurations']['livy-conf']['livy.server.kerberos.principal']
+  else:
+    livy_kerberos_principal = config['configurations']['livy-conf']['livy.server.launch.kerberos.principal']
+
+  livy_livyserver_hosts = default("/clusterHostInfo/livy_server_hosts", [])
+
+  # ats 1.5 properties
+  entity_groupfs_active_dir = config['configurations']['yarn-site']['yarn.timeline-service.entity-group-fs-store.active-dir']
+  entity_groupfs_active_dir_mode = 01777
+  entity_groupfs_store_dir = config['configurations']['yarn-site']['yarn.timeline-service.entity-group-fs-store.done-dir']
+  entity_groupfs_store_dir_mode = 0700
+  is_webhdfs_enabled = hdfs_site['dfs.webhdfs.enabled']
+
+  if len(livy_livyserver_hosts) > 0:
+    has_livyserver = True
+    if security_enabled:
+      livy_principal = livy_kerberos_principal.replace('_HOST', config['hostname'].lower())
+
+  livy_livyserver_port = default('configurations/livy-conf/livy.server.port', 8999)
+
+
+import functools
+# create partial functions with common arguments for every HdfsResource call;
+# to create/delete an HDFS directory/file or copy from local, call params.HdfsResource in code
+HdfsResource = functools.partial(
+  HdfsResource,
+  user=hdfs_user,
+  hdfs_resource_ignore_file = hdfs_resource_ignore_file,
+  security_enabled = security_enabled,
+  keytab = hdfs_user_keytab,
+  kinit_path_local = kinit_path_local,
+  hadoop_bin_dir = hadoop_bin_dir,
+  hadoop_conf_dir = hadoop_conf_dir,
+  principal_name = hdfs_principal_name,
+  hdfs_site = hdfs_site,
+  default_fs = default_fs,
+  immutable_paths = get_not_managed_resources(),
+  dfs_type = dfs_type
+)
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/4b588a92/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/service_check.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/service_check.py b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/service_check.py
new file mode 100644
index 0000000..518c624
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/service_check.py
@@ -0,0 +1,62 @@
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+""" +import subprocess +import time + +from resource_management.libraries.script.script import Script +from resource_management.libraries.functions.format import format +from resource_management.core.resources.system import Execute +from resource_management.core.logger import Logger + +class SparkServiceCheck(Script): + def service_check(self, env): + import params + env.set_params(params) + + if params.security_enabled: + spark_kinit_cmd = format("{kinit_path_local} -kt {spark_kerberos_keytab} {spark_principal}; ") + Execute(spark_kinit_cmd, user=params.spark_user) + if params.has_livyserver: + livy_kinit_cmd = format("{kinit_path_local} -kt {smoke_user_keytab} {smokeuser_principal}; ") + Execute(livy_kinit_cmd, user=params.livy_user) + + Execute(format("curl -s -o /dev/null -w'%{{http_code}}' --negotiate -u: -k {spark_history_scheme}://{spark_history_server_host}:{spark_history_ui_port} | grep 200"), + tries=5, + try_sleep=3, + logoutput=True + ) + if params.has_livyserver: + live_livyserver_host = "" + for livyserver_host in params.livy_livyserver_hosts: + try: + Execute(format("curl -s -o /dev/null -w'%{{http_code}}' --negotiate -u: -k http://{livyserver_host}:{livy_livyserver_port}/sessions | grep 200"), + tries=3, + try_sleep=1, + logoutput=True, + user=params.livy_user + ) + live_livyserver_host = livyserver_host + break + except: + pass + if len(params.livy_livyserver_hosts) > 0 and live_livyserver_host == "": + raise Fail(format("Connection to all Livy servers failed")) + +if __name__ == "__main__": + SparkServiceCheck().execute() + http://git-wip-us.apache.org/repos/asf/ambari/blob/4b588a92/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/setup_livy.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/setup_livy.py b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/setup_livy.py new file mode 100644 index 0000000..adaca87 --- /dev/null +++ b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/setup_livy.py @@ -0,0 +1,88 @@ +#!/usr/bin/python +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+ +""" + +import os +from resource_management import Directory, File, PropertiesFile, InlineTemplate, format + + +def setup_livy(env, type, upgrade_type = None, action = None): + import params + + Directory([params.livy_pid_dir, params.livy_log_dir], + owner=params.livy_user, + group=params.user_group, + mode=0775, + create_parents = True + ) + if type == 'server' and action == 'config': + params.HdfsResource(params.livy_hdfs_user_dir, + type="directory", + action="create_on_execute", + owner=params.livy_user, + mode=0775 + ) + params.HdfsResource(None, action="execute") + + params.HdfsResource(params.livy_recovery_dir, + type="directory", + action="create_on_execute", + owner=params.livy_user, + mode=0700 + ) + params.HdfsResource(None, action="execute") + + # create livy-env.sh in etc/conf dir + File(os.path.join(params.livy_conf, 'livy-env.sh'), + owner=params.livy_user, + group=params.livy_group, + content=InlineTemplate(params.livy_env_sh), + mode=0644, + ) + + # create livy.conf in etc/conf dir + PropertiesFile(format("{livy_conf}/livy.conf"), + properties = params.config['configurations']['livy-conf'], + key_value_delimiter = " ", + owner=params.livy_user, + group=params.livy_group, + ) + + # create log4j.properties in etc/conf dir + File(os.path.join(params.livy_conf, 'log4j.properties'), + owner=params.livy_user, + group=params.livy_group, + content=params.livy_log4j_properties, + mode=0644, + ) + + # create spark-blacklist.properties in etc/conf dir + File(os.path.join(params.livy_conf, 'spark-blacklist.conf'), + owner=params.livy_user, + group=params.livy_group, + content=params.livy_spark_blacklist_properties, + mode=0644, + ) + + Directory(params.livy_logs_dir, + owner=params.livy_user, + group=params.livy_group, + mode=0755, + ) + http://git-wip-us.apache.org/repos/asf/ambari/blob/4b588a92/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/setup_spark.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/setup_spark.py b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/setup_spark.py new file mode 100644 index 0000000..9329ce0 --- /dev/null +++ b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/setup_spark.py @@ -0,0 +1,116 @@ +#!/usr/bin/python +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+ +""" + +import sys +import fileinput +import shutil +import os + +from resource_management.core.exceptions import ComponentIsNotRunning +from resource_management.core.logger import Logger +from resource_management.core import shell +from resource_management.core.source import InlineTemplate +from resource_management.core.resources.system import Directory, File +from resource_management.libraries.resources.properties_file import PropertiesFile +from resource_management.libraries.functions.version import format_stack_version +from resource_management.libraries.functions.stack_features import check_stack_feature +from resource_management.libraries.functions.constants import StackFeature +from resource_management.libraries.functions.format import format +from resource_management.libraries.resources.xml_config import XmlConfig + +def setup_spark(env, type, upgrade_type = None, action = None): + import params + + Directory([params.spark_pid_dir, params.spark_log_dir], + owner=params.spark_user, + group=params.user_group, + mode=0775, + create_parents = True + ) + if type == 'server' and action == 'config': + params.HdfsResource(params.spark_hdfs_user_dir, + type="directory", + action="create_on_execute", + owner=params.spark_user, + mode=0775 + ) + params.HdfsResource(None, action="execute") + + PropertiesFile(format("{spark_conf}/spark-defaults.conf"), + properties = params.config['configurations']['spark-defaults'], + key_value_delimiter = " ", + owner=params.spark_user, + group=params.spark_group, + mode=0644 + ) + + # create spark-env.sh in etc/conf dir + File(os.path.join(params.spark_conf, 'spark-env.sh'), + owner=params.spark_user, + group=params.spark_group, + content=InlineTemplate(params.spark_env_sh), + mode=0644, + ) + + #create log4j.properties in etc/conf dir + File(os.path.join(params.spark_conf, 'log4j.properties'), + owner=params.spark_user, + group=params.spark_group, + content=params.spark_log4j_properties, + mode=0644, + ) + + #create metrics.properties in etc/conf dir + File(os.path.join(params.spark_conf, 'metrics.properties'), + owner=params.spark_user, + group=params.spark_group, + content=InlineTemplate(params.spark_metrics_properties), + mode=0644 + ) + + if params.is_hive_installed: + XmlConfig("hive-site.xml", + conf_dir=params.spark_conf, + configurations=params.spark_hive_properties, + owner=params.spark_user, + group=params.spark_group, + mode=0644) + + if params.has_spark_thriftserver: + PropertiesFile(params.spark_thrift_server_conf_file, + properties = params.config['configurations']['spark-thrift-sparkconf'], + owner = params.hive_user, + group = params.user_group, + key_value_delimiter = " ", + mode=0644 + ) + + effective_version = params.version if upgrade_type is not None else params.stack_version_formatted + if effective_version: + effective_version = format_stack_version(effective_version) + + if params.spark_thrift_fairscheduler_content and effective_version and check_stack_feature(StackFeature.SPARK_16PLUS, effective_version): + # create spark-thrift-fairscheduler.xml + File(os.path.join(params.spark_conf,"spark-thrift-fairscheduler.xml"), + owner=params.spark_user, + group=params.spark_group, + mode=0755, + content=InlineTemplate(params.spark_thrift_fairscheduler_content) + ) http://git-wip-us.apache.org/repos/asf/ambari/blob/4b588a92/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/spark_client.py ---------------------------------------------------------------------- diff --git 
a/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/spark_client.py b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/spark_client.py new file mode 100644 index 0000000..3acde4e --- /dev/null +++ b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/spark_client.py @@ -0,0 +1,62 @@ +#!/usr/bin/python +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +""" + +import sys +from resource_management.libraries.script.script import Script +from resource_management.libraries.functions import conf_select, stack_select +from resource_management.libraries.functions.stack_features import check_stack_feature +from resource_management.libraries.functions.constants import StackFeature +from resource_management.core.exceptions import ClientComponentHasNoStatus +from resource_management.core.logger import Logger +from resource_management.core import shell +from setup_spark import setup_spark + + +class SparkClient(Script): + def install(self, env): + self.install_packages(env) + self.configure(env) + + def configure(self, env, upgrade_type=None, config_dir=None): + import params + env.set_params(params) + + setup_spark(env, 'client', upgrade_type=upgrade_type, action = 'config') + + def status(self, env): + raise ClientComponentHasNoStatus() + + def get_component_name(self): + # TODO, change to "spark" after RPM switches the name + return "spark2-client" + + def pre_upgrade_restart(self, env, upgrade_type=None): + import params + + env.set_params(params) + if params.version and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version): + Logger.info("Executing Spark Client Stack Upgrade pre-restart") + # TODO, change to "spark" after RPM switches the name + conf_select.select(params.stack_name, "spark2", params.version) + stack_select.select("spark2-client", params.version) + +if __name__ == "__main__": + SparkClient().execute() + http://git-wip-us.apache.org/repos/asf/ambari/blob/4b588a92/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/spark_service.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/spark_service.py b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/spark_service.py new file mode 100644 index 0000000..536d798 --- /dev/null +++ b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/spark_service.py @@ -0,0 +1,146 @@ +#!/usr/bin/env python + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. 
The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' +import socket +import tarfile +import os +from contextlib import closing + +from resource_management.libraries.script.script import Script +from resource_management.libraries.resources.hdfs_resource import HdfsResource +from resource_management.libraries.functions.copy_tarball import copy_to_hdfs, get_tarball_paths +from resource_management.libraries.functions import format +from resource_management.core.resources.system import File, Execute +from resource_management.libraries.functions.version import format_stack_version +from resource_management.libraries.functions.stack_features import check_stack_feature +from resource_management.libraries.functions.constants import StackFeature +from resource_management.libraries.functions.show_logs import show_logs + + +def make_tarfile(output_filename, source_dir): + try: + os.remove(output_filename) + except OSError: + pass + parent_dir=os.path.dirname(output_filename) + if not os.path.exists(parent_dir): + os.makedirs(parent_dir) + os.chmod(parent_dir, 0711) + with closing(tarfile.open(output_filename, "w:gz")) as tar: + for file in os.listdir(source_dir): + tar.add(os.path.join(source_dir,file),arcname=file) + os.chmod(output_filename, 0644) + + +def spark_service(name, upgrade_type=None, action=None): + import params + + if action == 'start': + + effective_version = params.version if upgrade_type is not None else params.stack_version_formatted + if effective_version: + effective_version = format_stack_version(effective_version) + + if name == 'jobhistoryserver' and effective_version and check_stack_feature(StackFeature.SPARK_16PLUS, effective_version): + # TODO, change to "spark" after RPM switches the name + # create & copy spark2-hdp-yarn-archive.tar.gz to hdfs + if not params.sysprep_skip_copy_tarballs_hdfs: + source_dir=params.spark_home+"/jars" + tmp_archive_file=get_tarball_paths("spark2")[1] + make_tarfile(tmp_archive_file, source_dir) + copy_to_hdfs("spark2", params.user_group, params.hdfs_user, skip=params.sysprep_skip_copy_tarballs_hdfs, replace_existing_files=True) + # create spark history directory + params.HdfsResource(params.spark_history_dir, + type="directory", + action="create_on_execute", + owner=params.spark_user, + group=params.user_group, + mode=0777, + recursive_chmod=True + ) + params.HdfsResource(None, action="execute") + + if params.security_enabled: + spark_kinit_cmd = format("{kinit_path_local} -kt {spark_kerberos_keytab} {spark_principal}; ") + Execute(spark_kinit_cmd, user=params.spark_user) + + # Spark 1.3.1.2.3, and higher, which was included in HDP 2.3, does not have a dependency on Tez, so it does not + # need to copy the tarball, otherwise, copy it. 
+    if params.stack_version_formatted and check_stack_feature(StackFeature.TEZ_FOR_SPARK, params.stack_version_formatted):
+      resource_created = copy_to_hdfs("tez", params.user_group, params.hdfs_user, skip=params.sysprep_skip_copy_tarballs_hdfs)
+      if resource_created:
+        params.HdfsResource(None, action="execute")
+
+    if name == 'jobhistoryserver':
+      historyserver_no_op_test = format(
+        'ls {spark_history_server_pid_file} >/dev/null 2>&1 && ps -p `cat {spark_history_server_pid_file}` >/dev/null 2>&1')
+      try:
+        Execute(format('{spark_history_server_start}'),
+                user=params.spark_user,
+                environment={'JAVA_HOME': params.java_home},
+                not_if=historyserver_no_op_test)
+      except:
+        show_logs(params.spark_log_dir, user=params.spark_user)
+        raise
+
+    elif name == 'sparkthriftserver':
+      if params.security_enabled:
+        hive_principal = params.hive_kerberos_principal
+        hive_kinit_cmd = format("{kinit_path_local} -kt {hive_kerberos_keytab} {hive_principal}; ")
+        Execute(hive_kinit_cmd, user=params.hive_user)
+
+      thriftserver_no_op_test = format(
+        'ls {spark_thrift_server_pid_file} >/dev/null 2>&1 && ps -p `cat {spark_thrift_server_pid_file}` >/dev/null 2>&1')
+      try:
+        Execute(format('{spark_thrift_server_start} --properties-file {spark_thrift_server_conf_file} {spark_thrift_cmd_opts_properties}'),
+                user=params.hive_user,
+                environment={'JAVA_HOME': params.java_home},
+                not_if=thriftserver_no_op_test
+        )
+      except:
+        show_logs(params.spark_log_dir, user=params.hive_user)
+        raise
+  elif action == 'stop':
+    if name == 'jobhistoryserver':
+      try:
+        Execute(format('{spark_history_server_stop}'),
+                user=params.spark_user,
+                environment={'JAVA_HOME': params.java_home}
+        )
+      except:
+        show_logs(params.spark_log_dir, user=params.spark_user)
+        raise
+      File(params.spark_history_server_pid_file,
+           action="delete"
+      )
+
+    elif name == 'sparkthriftserver':
+      try:
+        Execute(format('{spark_thrift_server_stop}'),
+                user=params.hive_user,
+                environment={'JAVA_HOME': params.java_home}
+        )
+      except:
+        show_logs(params.spark_log_dir, user=params.hive_user)
+        raise
+      File(params.spark_thrift_server_pid_file,
+           action="delete"
+      )
+
+

http://git-wip-us.apache.org/repos/asf/ambari/blob/4b588a92/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/spark_thrift_server.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/spark_thrift_server.py b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/spark_thrift_server.py
new file mode 100644
index 0000000..8953b35
--- /dev/null
+++ b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/spark_thrift_server.py
@@ -0,0 +1,91 @@
+#!/usr/bin/python
+"""
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+ +""" + +import sys +import os + +from resource_management.libraries.script.script import Script +from resource_management.libraries.functions import conf_select, stack_select +from resource_management.libraries.functions.stack_features import check_stack_feature +from resource_management.libraries.functions.constants import StackFeature +from resource_management.libraries.functions.check_process_status import check_process_status +from resource_management.core.logger import Logger +from resource_management.core import shell +from setup_spark import setup_spark +from spark_service import spark_service + + +class SparkThriftServer(Script): + + def install(self, env): + import params + env.set_params(params) + + self.install_packages(env) + + def configure(self, env, upgrade_type=None, config_dir=None): + import params + env.set_params(params) + setup_spark(env, 'server', upgrade_type = upgrade_type, action = 'config') + + def start(self, env, upgrade_type=None): + import params + env.set_params(params) + + self.configure(env) + spark_service('sparkthriftserver', upgrade_type=upgrade_type, action='start') + + def stop(self, env, upgrade_type=None): + import params + env.set_params(params) + spark_service('sparkthriftserver', upgrade_type=upgrade_type, action='stop') + + def status(self, env): + import status_params + env.set_params(status_params) + check_process_status(status_params.spark_thrift_server_pid_file) + + def get_component_name(self): + # TODO, change to "spark" after RPM switches the name + return "spark2-thriftserver" + + def pre_upgrade_restart(self, env, upgrade_type=None): + import params + + env.set_params(params) + Logger.info("Executing Spark Thrift Server Stack Upgrade pre-restart") + # TODO, change to "spark" after RPM switches the name + conf_select.select(params.stack_name, "spark2", params.version) + stack_select.select("spark2-thriftserver", params.version) + + def get_log_folder(self): + import params + return params.spark_log_dir + + def get_user(self): + import params + return params.hive_user + + def get_pid_files(self): + import status_params + return [status_params.spark_thrift_server_pid_file] + +if __name__ == "__main__": + SparkThriftServer().execute() http://git-wip-us.apache.org/repos/asf/ambari/blob/4b588a92/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/status_params.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/status_params.py b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/status_params.py new file mode 100644 index 0000000..07dcc47 --- /dev/null +++ b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/package/scripts/status_params.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python +""" +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and
+limitations under the License.
+
+"""
+
+from resource_management.libraries.functions.format import format
+from resource_management.libraries.script.script import Script
+from resource_management.libraries.functions.default import default
+
+config = Script.get_config()
+
+spark_user = config['configurations']['spark-env']['spark_user']
+spark_group = config['configurations']['spark-env']['spark_group']
+user_group = config['configurations']['cluster-env']['user_group']
+
+if 'hive-env' in config['configurations']:
+  hive_user = config['configurations']['hive-env']['hive_user']
+else:
+  hive_user = "hive"
+
+spark_pid_dir = config['configurations']['spark-env']['spark_pid_dir']
+spark_history_server_pid_file = format("{spark_pid_dir}/spark-{spark_user}-org.apache.spark.deploy.history.HistoryServer-1.pid")
+spark_thrift_server_pid_file = format("{spark_pid_dir}/spark-{hive_user}-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1.pid")
+stack_name = default("/hostLevelParams/stack_name", None)
+
+if "livy-env" in config['configurations']:
+  livy_user = config['configurations']['livy-env']['livy_user']
+  livy_group = config['configurations']['livy-env']['livy_group']
+  livy_pid_dir = config['configurations']['livy-env']['livy_pid_dir']
+  livy_server_pid_file = format("{livy_pid_dir}/livy-{livy_user}-server.pid")
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/4b588a92/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/job_history_server.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/job_history_server.py b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/job_history_server.py
deleted file mode 100644
index 3937c88..0000000
--- a/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/job_history_server.py
+++ /dev/null
@@ -1,108 +0,0 @@
-#!/usr/bin/python
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
- -""" - -import sys -import os - -from resource_management.libraries.script.script import Script -from resource_management.libraries.functions import conf_select, stack_select -from resource_management.libraries.functions.copy_tarball import copy_to_hdfs -from resource_management.libraries.functions.check_process_status import check_process_status -from resource_management.libraries.functions.stack_features import check_stack_feature -from resource_management.libraries.functions.constants import StackFeature -from resource_management.core.logger import Logger -from resource_management.core import shell -from setup_spark import * -from spark_service import spark_service - - -class JobHistoryServer(Script): - - def install(self, env): - import params - env.set_params(params) - - self.install_packages(env) - - def configure(self, env, upgrade_type=None, config_dir=None): - import params - env.set_params(params) - - setup_spark(env, 'server', upgrade_type=upgrade_type, action = 'config') - - def start(self, env, upgrade_type=None): - import params - env.set_params(params) - - self.configure(env) - spark_service('jobhistoryserver', upgrade_type=upgrade_type, action='start') - - def stop(self, env, upgrade_type=None): - import params - env.set_params(params) - - spark_service('jobhistoryserver', upgrade_type=upgrade_type, action='stop') - - def status(self, env): - import status_params - env.set_params(status_params) - - check_process_status(status_params.spark_history_server_pid_file) - - - def get_component_name(self): - # TODO, change to "spark" after RPM switches the name - return "spark2-historyserver" - - def pre_upgrade_restart(self, env, upgrade_type=None): - import params - - env.set_params(params) - if params.version and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version): - Logger.info("Executing Spark Job History Server Stack Upgrade pre-restart") - # TODO, change to "spark" after RPM switches the name - conf_select.select(params.stack_name, "spark2", params.version) - stack_select.select("spark2-historyserver", params.version) - - # Spark 1.3.1.2.3, and higher, which was included in HDP 2.3, does not have a dependency on Tez, so it does not - # need to copy the tarball, otherwise, copy it. - if params.version and check_stack_feature(StackFeature.TEZ_FOR_SPARK, params.version): - resource_created = copy_to_hdfs( - "tez", - params.user_group, - params.hdfs_user, - skip=params.sysprep_skip_copy_tarballs_hdfs) - if resource_created: - params.HdfsResource(None, action="execute") - - def get_log_folder(self): - import params - return params.spark_log_dir - - def get_user(self): - import params - return params.spark_user - - def get_pid_files(self): - import status_params - return [status_params.spark_history_server_pid_file] - -if __name__ == "__main__": - JobHistoryServer().execute() http://git-wip-us.apache.org/repos/asf/ambari/blob/4b588a92/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/livy_server.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/livy_server.py b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/livy_server.py deleted file mode 100644 index 269c97d..0000000 --- a/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/livy_server.py +++ /dev/null @@ -1,151 +0,0 @@ -#!/usr/bin/python -""" -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. 
See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. - -""" - -from resource_management.libraries.script.script import Script -from resource_management.libraries.functions.check_process_status import check_process_status -from resource_management.libraries.functions.stack_features import check_stack_feature -from resource_management.libraries.functions.constants import StackFeature -from resource_management.core.exceptions import Fail -from resource_management.core.resources.system import Execute -from resource_management.libraries.providers.hdfs_resource import WebHDFSUtil -from resource_management.libraries.providers.hdfs_resource import HdfsResourceProvider -from resource_management import is_empty -from resource_management import shell -from resource_management.libraries.functions.decorator import retry -from resource_management.core.logger import Logger -from resource_management.libraries.functions.format import format -from resource_management.libraries.functions import conf_select, stack_select - -from livy_service import livy_service -from setup_livy import setup_livy - -class LivyServer(Script): - - def install(self, env): - import params - env.set_params(params) - - self.install_packages(env) - - def configure(self, env, upgrade_type=None, config_dir=None): - import params - env.set_params(params) - - setup_livy(env, 'server', upgrade_type=upgrade_type, action = 'config') - - def start(self, env, upgrade_type=None): - import params - env.set_params(params) - - if params.has_ats and params.has_livyserver: - Logger.info("Verifying DFS directories where ATS stores time line data for active and completed applications.") - self.wait_for_dfs_directories_created([params.entity_groupfs_store_dir, params.entity_groupfs_active_dir]) - - self.configure(env) - livy_service('server', upgrade_type=upgrade_type, action='start') - - def stop(self, env, upgrade_type=None): - import params - env.set_params(params) - - livy_service('server', upgrade_type=upgrade_type, action='stop') - - def status(self, env): - import status_params - env.set_params(status_params) - - check_process_status(status_params.livy_server_pid_file) - - # TODO move out and compose with similar method in resourcemanager.py - def wait_for_dfs_directories_created(self, dirs): - import params - - ignored_dfs_dirs = HdfsResourceProvider.get_ignored_resources_list(params.hdfs_resource_ignore_file) - - if params.security_enabled: - Execute(format("{kinit_path_local} -kt {livy_kerberos_keytab} {livy_principal}"), - user=params.livy_user - ) - Execute(format("{kinit_path_local} -kt {hdfs_user_keytab} {hdfs_principal_name}"), - user=params.hdfs_user - ) - - for dir_path in dirs: - self.wait_for_dfs_directory_created(dir_path, ignored_dfs_dirs) - - def get_pid_files(self): - import status_params - return [status_params.livy_server_pid_file] - - - @retry(times=8, sleep_time=20, backoff_factor=1, err_class=Fail) - def 
wait_for_dfs_directory_created(self, dir_path, ignored_dfs_dirs):
-    import params
-
-    if not is_empty(dir_path):
-      dir_path = HdfsResourceProvider.parse_path(dir_path)
-
-      if dir_path in ignored_dfs_dirs:
-        Logger.info("Skipping DFS directory '" + dir_path + "' as it's marked to be ignored.")
-        return
-
-      Logger.info("Verifying if DFS directory '" + dir_path + "' exists.")
-
-      dir_exists = None
-
-      if WebHDFSUtil.is_webhdfs_available(params.is_webhdfs_enabled, params.default_fs):
-        # checking via WebHDFS is much faster than shelling out to hdfs dfs -test
-        util = WebHDFSUtil(params.hdfs_site, params.hdfs_user, params.security_enabled)
-        list_status = util.run_command(dir_path, 'GETFILESTATUS', method='GET', ignore_status_codes=['404'], assertable_result=False)
-        dir_exists = ('FileStatus' in list_status)
-      else:
-        # fall back to the time-expensive hdfs dfs -test -d check
-        dfs_ret_code = shell.call(format("hdfs --config {hadoop_conf_dir} dfs -test -d " + dir_path), user=params.livy_user)[0]
-        dir_exists = not dfs_ret_code  # dfs -test -d returns 0 when the dir exists
-
-      if not dir_exists:
-        raise Fail("DFS directory '" + dir_path + "' does not exist!")
-      else:
-        Logger.info("DFS directory '" + dir_path + "' exists.")
-
-  def get_component_name(self):
-    # TODO, change to "livy" after RPM switches the name
-    return "livy2-server"
-
-  def pre_upgrade_restart(self, env, upgrade_type=None):
-    import params
-
-    env.set_params(params)
-    if params.version and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version):
-      Logger.info("Executing Livy Server Stack Upgrade pre-restart")
-      # TODO, change to "spark" and "livy" after RPM switches the name
-      conf_select.select(params.stack_name, "spark2", params.version)
-      stack_select.select("livy2-server", params.version)
-
-  def get_log_folder(self):
-    import params
-    return params.livy_log_dir
-
-  def get_user(self):
-    import params
-    return params.livy_user
-if __name__ == "__main__":
-  LivyServer().execute()
-
http://git-wip-us.apache.org/repos/asf/ambari/blob/4b588a92/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/livy_service.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/livy_service.py b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/livy_service.py
deleted file mode 100644
index 45201db..0000000
--- a/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/livy_service.py
+++ /dev/null
@@ -1,48 +0,0 @@
-#!/usr/bin/env python
-
-'''
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
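The wait_for_dfs_directory_created method above leans on the library's retry decorator: with times=8, sleep_time=20 and backoff_factor=1, a slow-starting ATS gets roughly 160 seconds to create its DFS directories before the Fail propagates. The same pattern in isolation, with a hypothetical local-filesystem probe standing in for the WebHDFS / hdfs dfs -test checks:

import os

from resource_management.core.exceptions import Fail
from resource_management.libraries.functions.decorator import retry

def dir_exists(path):
  return os.path.isdir(path)  # stand-in probe for illustration

@retry(times=8, sleep_time=20, backoff_factor=1, err_class=Fail)
def wait_for_dir(path):
  # re-invoked up to 8 times, sleeping 20s between attempts, whenever a Fail
  # escapes; any other exception type propagates immediately
  if not dir_exists(path):
    raise Fail("DFS directory '" + path + "' does not exist!")
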
-''' - -from resource_management.libraries.functions import format -from resource_management.core.resources.system import File, Execute -import threading - -def livy_service(name, upgrade_type=None, action=None): - import params - - if action == 'start': - livyserver_no_op_test = format( - 'ls {livy_server_pid_file} >/dev/null 2>&1 && ps -p `cat {livy_server_pid_file}` >/dev/null 2>&1') - Execute(format('{livy_server_start}'), - user=params.livy_user, - environment={'JAVA_HOME': params.java_home}, - not_if=livyserver_no_op_test - ) - - elif action == 'stop': - Execute(format('{livy_server_stop}'), - user=params.livy_user, - environment={'JAVA_HOME': params.java_home} - ) - File(params.livy_server_pid_file, - action="delete" - ) - - - - http://git-wip-us.apache.org/repos/asf/ambari/blob/4b588a92/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/params.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/params.py b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/params.py deleted file mode 100644 index e60cab5..0000000 --- a/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/params.py +++ /dev/null @@ -1,268 +0,0 @@ -#!/usr/bin/python -""" -Licensed to the Apache Software Foundation (ASF) under one -or more contributor license agreements. See the NOTICE file -distributed with this work for additional information -regarding copyright ownership. The ASF licenses this file -to you under the Apache License, Version 2.0 (the -"License"); you may not use this file except in compliance -with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
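livy_service.py above makes the start action idempotent through Execute's not_if guard: the command is skipped whenever the guard shell test exits 0, i.e. when the pid file exists and names a live process. The same pattern in isolation; the paths, user and JAVA_HOME below are hypothetical:

from resource_management.core.resources.system import Execute

pid_file = '/var/run/livy2/livy-livy-server.pid'  # hypothetical path

# exits 0 only if the pid file exists AND the pid it contains is running
no_op_test = 'ls %s >/dev/null 2>&1 && ps -p `cat %s` >/dev/null 2>&1' % (pid_file, pid_file)

Execute('/usr/hdp/current/livy2-server/bin/livy-server start',  # hypothetical install prefix
        user='livy',
        environment={'JAVA_HOME': '/usr/lib/jvm/java'},
        not_if=no_op_test)
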
- -""" - -import socket - -import status_params -from resource_management.libraries.functions.stack_features import check_stack_feature -from resource_management.libraries.functions.constants import StackFeature -from resource_management.libraries.functions import conf_select, stack_select -from resource_management.libraries.functions.version import format_stack_version -from resource_management.libraries.functions.copy_tarball import get_sysprep_skip_copy_tarballs_hdfs -from resource_management.libraries.functions.format import format -from resource_management.libraries.functions.default import default -from resource_management.libraries.functions import get_kinit_path -from resource_management.libraries.functions.get_not_managed_resources import get_not_managed_resources -from resource_management.libraries.resources.hdfs_resource import HdfsResource -from resource_management.libraries.script.script import Script - -# a map of the Ambari role to the component name -# for use with <stack-root>/current/<component> -# TODO, change to "spark" and "livy" after RPM switches the name -SERVER_ROLE_DIRECTORY_MAP = { - 'SPARK_JOBHISTORYSERVER' : 'spark2-historyserver', - 'SPARK_CLIENT' : 'spark2-client', - 'SPARK_THRIFTSERVER' : 'spark2-thriftserver', - 'LIVY_SERVER' : 'livy2-server', - 'LIVY_CLIENT' : 'livy2-client' - -} - -component_directory = Script.get_component_from_role(SERVER_ROLE_DIRECTORY_MAP, "SPARK_CLIENT") - -config = Script.get_config() -tmp_dir = Script.get_tmp_dir() - -stack_name = status_params.stack_name -stack_root = Script.get_stack_root() -stack_version_unformatted = config['hostLevelParams']['stack_version'] -stack_version_formatted = format_stack_version(stack_version_unformatted) - -sysprep_skip_copy_tarballs_hdfs = get_sysprep_skip_copy_tarballs_hdfs() - -# New Cluster Stack Version that is defined during the RESTART of a Stack Upgrade -version = default("/commandParams/version", None) - -# TODO, change to "spark" after RPM switches the name -spark_conf = '/etc/spark2/conf' -hadoop_conf_dir = conf_select.get_hadoop_conf_dir() -hadoop_bin_dir = stack_select.get_hadoop_dir("bin") - -if stack_version_formatted and check_stack_feature(StackFeature.ROLLING_UPGRADE, stack_version_formatted): - hadoop_home = stack_select.get_hadoop_dir("home") - spark_conf = format("{stack_root}/current/{component_directory}/conf") - spark_log_dir = config['configurations']['spark-env']['spark_log_dir'] - spark_pid_dir = status_params.spark_pid_dir - spark_home = format("{stack_root}/current/{component_directory}") - -spark_daemon_memory = config['configurations']['spark-env']['spark_daemon_memory'] -spark_thrift_server_conf_file = spark_conf + "/spark-thrift-sparkconf.conf" -java_home = config['hostLevelParams']['java_home'] - -hdfs_user = config['configurations']['hadoop-env']['hdfs_user'] -hdfs_principal_name = config['configurations']['hadoop-env']['hdfs_principal_name'] -hdfs_user_keytab = config['configurations']['hadoop-env']['hdfs_user_keytab'] -user_group = config['configurations']['cluster-env']['user_group'] - -spark_user = status_params.spark_user -hive_user = status_params.hive_user -spark_group = status_params.spark_group -user_group = status_params.user_group -spark_hdfs_user_dir = format("/user/{spark_user}") -spark_history_dir = default('/configurations/spark-defaults/spark.history.fs.logDirectory', "hdfs:///spark-history") - -spark_history_server_pid_file = status_params.spark_history_server_pid_file -spark_thrift_server_pid_file = status_params.spark_thrift_server_pid_file - 
-spark_history_server_start = format("{spark_home}/sbin/start-history-server.sh") -spark_history_server_stop = format("{spark_home}/sbin/stop-history-server.sh") - -spark_thrift_server_start = format("{spark_home}/sbin/start-thriftserver.sh") -spark_thrift_server_stop = format("{spark_home}/sbin/stop-thriftserver.sh") -spark_hadoop_lib_native = format("{stack_root}/current/hadoop-client/lib/native:{stack_root}/current/hadoop-client/lib/native/Linux-amd64-64") - -run_example_cmd = format("{spark_home}/bin/run-example") -spark_smoke_example = "SparkPi" -spark_service_check_cmd = format( - "{run_example_cmd} --master yarn --deploy-mode cluster --num-executors 1 --driver-memory 256m --executor-memory 256m --executor-cores 1 {spark_smoke_example} 1") - -spark_jobhistoryserver_hosts = default("/clusterHostInfo/spark_jobhistoryserver_hosts", []) - -if len(spark_jobhistoryserver_hosts) > 0: - spark_history_server_host = spark_jobhistoryserver_hosts[0] -else: - spark_history_server_host = "localhost" - -# spark-defaults params -ui_ssl_enabled = default("configurations/spark-defaults/spark.ssl.enabled", False) - -spark_yarn_historyServer_address = default(spark_history_server_host, "localhost") -spark_history_scheme = "http" -spark_history_ui_port = config['configurations']['spark-defaults']['spark.history.ui.port'] - -if ui_ssl_enabled: - spark_history_ui_port = str(int(spark_history_ui_port) + 400) - spark_history_scheme = "https" - - -spark_env_sh = config['configurations']['spark-env']['content'] -spark_log4j_properties = config['configurations']['spark-log4j-properties']['content'] -spark_metrics_properties = config['configurations']['spark-metrics-properties']['content'] - -hive_server_host = default("/clusterHostInfo/hive_server_host", []) -is_hive_installed = not len(hive_server_host) == 0 - -security_enabled = config['configurations']['cluster-env']['security_enabled'] -kinit_path_local = get_kinit_path(default('/configurations/kerberos-env/executable_search_paths', None)) -spark_kerberos_keytab = config['configurations']['spark-defaults']['spark.history.kerberos.keytab'] -spark_kerberos_principal = config['configurations']['spark-defaults']['spark.history.kerberos.principal'] -smoke_user_keytab = config['configurations']['cluster-env']['smokeuser_keytab'] -smokeuser_principal = config['configurations']['cluster-env']['smokeuser_principal_name'] - -spark_thriftserver_hosts = default("/clusterHostInfo/spark_thriftserver_hosts", []) -has_spark_thriftserver = not len(spark_thriftserver_hosts) == 0 - -# hive-site params -spark_hive_properties = { - 'hive.metastore.uris': default('/configurations/hive-site/hive.metastore.uris', '') -} - -# security settings -if security_enabled: - spark_principal = spark_kerberos_principal.replace('_HOST',spark_history_server_host.lower()) - - if is_hive_installed: - spark_hive_properties.update({ - 'hive.metastore.sasl.enabled': str(config['configurations']['hive-site']['hive.metastore.sasl.enabled']).lower(), - 'hive.metastore.kerberos.keytab.file': config['configurations']['hive-site']['hive.metastore.kerberos.keytab.file'], - 'hive.server2.authentication.spnego.principal': config['configurations']['hive-site']['hive.server2.authentication.spnego.principal'], - 'hive.server2.authentication.spnego.keytab': config['configurations']['hive-site']['hive.server2.authentication.spnego.keytab'], - 'hive.metastore.kerberos.principal': config['configurations']['hive-site']['hive.metastore.kerberos.principal'], - 'hive.server2.authentication.kerberos.principal': 
config['configurations']['hive-site']['hive.server2.authentication.kerberos.principal'],
-      'hive.server2.authentication.kerberos.keytab': config['configurations']['hive-site']['hive.server2.authentication.kerberos.keytab'],
-      'hive.server2.authentication': config['configurations']['hive-site']['hive.server2.authentication'],
-    })
-
-  hive_kerberos_keytab = config['configurations']['hive-site']['hive.server2.authentication.kerberos.keytab']
-  hive_kerberos_principal = config['configurations']['hive-site']['hive.server2.authentication.kerberos.principal'].replace('_HOST', socket.getfqdn().lower())
-
-# thrift server support - available on HDP 2.3 or higher
-spark_thrift_sparkconf = None
-spark_thrift_cmd_opts_properties = ''
-spark_thrift_fairscheduler_content = None
-spark_thrift_master = "yarn-client"
-if 'nm_hosts' in config['clusterHostInfo'] and len(config['clusterHostInfo']['nm_hosts']) == 1:
-  # use local mode when there's only one nodemanager
-  spark_thrift_master = "local[4]"
-
-if has_spark_thriftserver and 'spark-thrift-sparkconf' in config['configurations']:
-  spark_thrift_sparkconf = config['configurations']['spark-thrift-sparkconf']
-  spark_thrift_cmd_opts_properties = config['configurations']['spark-env']['spark_thrift_cmd_opts']
-  if is_hive_installed:
-    # Update the default metastore client properties (async wait for the metastore component); this helps
-    # with blueprint provisioning, when hive-metastore and spark-thriftserver are not on the same host.
-    spark_hive_properties.update({
-      'hive.metastore.client.socket.timeout' : config['configurations']['hive-site']['hive.metastore.client.socket.timeout']
-    })
-    spark_hive_properties.update(config['configurations']['spark-hive-site-override'])
-
-  if 'spark-thrift-fairscheduler' in config['configurations'] and 'fairscheduler_content' in config['configurations']['spark-thrift-fairscheduler']:
-    spark_thrift_fairscheduler_content = config['configurations']['spark-thrift-fairscheduler']['fairscheduler_content']
-
-default_fs = config['configurations']['core-site']['fs.defaultFS']
-hdfs_site = config['configurations']['hdfs-site']
-hdfs_resource_ignore_file = "/var/lib/ambari-agent/data/.hdfs_resource_ignore"
-
-ats_host = set(default("/clusterHostInfo/app_timeline_server_hosts", []))
-has_ats = len(ats_host) > 0
-
-dfs_type = default("/commandParams/dfs_type", "")
-
-# livy related config
-
-# livy for spark2 is only supported from HDP 2.6
-has_livyserver = False
-
-if stack_version_formatted and check_stack_feature(StackFeature.SPARK_LIVY, stack_version_formatted):
-  livy_component_directory = Script.get_component_from_role(SERVER_ROLE_DIRECTORY_MAP, "LIVY_SERVER")
-  livy_conf = format("{stack_root}/current/{livy_component_directory}/conf")
-  livy_log_dir = config['configurations']['livy-env']['livy_log_dir']
-  livy_pid_dir = status_params.livy_pid_dir
-  livy_home = format("{stack_root}/current/{livy_component_directory}")
-  livy_user = status_params.livy_user
-  livy_group = status_params.livy_group
-  user_group = status_params.user_group
-  livy_hdfs_user_dir = format("/user/{livy_user}")
-  livy_server_pid_file = status_params.livy_server_pid_file
-  livy_recovery_dir = default("/configurations/livy-conf/livy.server.recovery.state-store.url", "/livy-recovery")
-
-  livy_server_start = format("{livy_home}/bin/livy-server start")
-  livy_server_stop = format("{livy_home}/bin/livy-server stop")
-  livy_logs_dir = format("{livy_home}/logs")
-
-  livy_env_sh = config['configurations']['livy-env']['content']
-  livy_log4j_properties = config['configurations']['livy-log4j-properties']['content']
-  livy_spark_blacklist_properties = config['configurations']['livy-spark-blacklist']['content']
-
-  if 'livy.server.kerberos.keytab' in config['configurations']['livy-conf']:
-    livy_kerberos_keytab = config['configurations']['livy-conf']['livy.server.kerberos.keytab']
-  else:
-    livy_kerberos_keytab = config['configurations']['livy-conf']['livy.server.launch.kerberos.keytab']
-  if 'livy.server.kerberos.principal' in config['configurations']['livy-conf']:
-    livy_kerberos_principal = config['configurations']['livy-conf']['livy.server.kerberos.principal']
-  else:
-    livy_kerberos_principal = config['configurations']['livy-conf']['livy.server.launch.kerberos.principal']
-
-  livy_livyserver_hosts = default("/clusterHostInfo/livy_server_hosts", [])
-
-  # ats 1.5 properties
-  entity_groupfs_active_dir = config['configurations']['yarn-site']['yarn.timeline-service.entity-group-fs-store.active-dir']
-  entity_groupfs_active_dir_mode = 01777
-  entity_groupfs_store_dir = config['configurations']['yarn-site']['yarn.timeline-service.entity-group-fs-store.done-dir']
-  entity_groupfs_store_dir_mode = 0700
-  is_webhdfs_enabled = hdfs_site['dfs.webhdfs.enabled']
-
-  if len(livy_livyserver_hosts) > 0:
-    has_livyserver = True
-    if security_enabled:
-      livy_principal = livy_kerberos_principal.replace('_HOST', config['hostname'].lower())
-
-  livy_livyserver_port = default('configurations/livy-conf/livy.server.port',8999)
-
-
-import functools
-# Create a partial function that pre-binds the arguments common to every HdfsResource call;
-# to create/delete an HDFS directory/file or copy from local, code then calls params.HdfsResource.
-HdfsResource = functools.partial(
-  HdfsResource,
-  user=hdfs_user,
-  hdfs_resource_ignore_file = hdfs_resource_ignore_file,
-  security_enabled = security_enabled,
-  keytab = hdfs_user_keytab,
-  kinit_path_local = kinit_path_local,
-  hadoop_bin_dir = hadoop_bin_dir,
-  hadoop_conf_dir = hadoop_conf_dir,
-  principal_name = hdfs_principal_name,
-  hdfs_site = hdfs_site,
-  default_fs = default_fs,
-  immutable_paths = get_not_managed_resources(),
-  dfs_type = dfs_type
-)
-
http://git-wip-us.apache.org/repos/asf/ambari/blob/4b588a92/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/service_check.py
----------------------------------------------------------------------
diff --git a/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/service_check.py b/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/service_check.py
deleted file mode 100644
index 518c624..0000000
--- a/ambari-server/src/main/resources/common-services/SPARK/2.2.0/scripts/service_check.py
+++ /dev/null
@@ -1,63 +0,0 @@
-"""
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
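The functools.partial idiom closing params.py above binds the dozen cluster-wide keyword arguments to HdfsResource once, so the service scripts only pass what varies per call. The mechanics in miniature, with a plain function standing in for the real resource; all names and values here are illustrative:

import functools

def hdfs_resource(path, action=None, user=None, security_enabled=False, default_fs=None):
  # stand-in for resource_management's HdfsResource
  print(path, action, user, security_enabled, default_fs)

# bind the cluster-wide arguments once...
HdfsResource = functools.partial(hdfs_resource,
                                 user="hdfs",
                                 security_enabled=True,
                                 default_fs="hdfs://nn1:8020")

# ...so call sites read like the ones in the service scripts:
HdfsResource("/spark-history", action="create_on_execute")
HdfsResource(None, action="execute")  # the real resource flushes queued operations here
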
-""" -import subprocess -import time - -from resource_management.libraries.script.script import Script -from resource_management.libraries.functions.format import format -from resource_management.core.resources.system import Execute -from resource_management.core.logger import Logger - -class SparkServiceCheck(Script): - def service_check(self, env): - import params - env.set_params(params) - - if params.security_enabled: - spark_kinit_cmd = format("{kinit_path_local} -kt {spark_kerberos_keytab} {spark_principal}; ") - Execute(spark_kinit_cmd, user=params.spark_user) - if params.has_livyserver: - livy_kinit_cmd = format("{kinit_path_local} -kt {smoke_user_keytab} {smokeuser_principal}; ") - Execute(livy_kinit_cmd, user=params.livy_user) - - Execute(format("curl -s -o /dev/null -w'%{{http_code}}' --negotiate -u: -k {spark_history_scheme}://{spark_history_server_host}:{spark_history_ui_port} | grep 200"), - tries=5, - try_sleep=3, - logoutput=True - ) - if params.has_livyserver: - live_livyserver_host = "" - for livyserver_host in params.livy_livyserver_hosts: - try: - Execute(format("curl -s -o /dev/null -w'%{{http_code}}' --negotiate -u: -k http://{livyserver_host}:{livy_livyserver_port}/sessions | grep 200"), - tries=3, - try_sleep=1, - logoutput=True, - user=params.livy_user - ) - live_livyserver_host = livyserver_host - break - except: - pass - if len(params.livy_livyserver_hosts) > 0 and live_livyserver_host == "": - raise Fail(format("Connection to all Livy servers failed")) - -if __name__ == "__main__": - SparkServiceCheck().execute() -