AMBARI-17457 : Modify the AMS stack scripts to support distributed collector (dsen via avijayan)
Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/875f1efb
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/875f1efb
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/875f1efb
Branch: refs/heads/branch-dev-patch-upgrade
Commit: 875f1efb63cbbe81259a79c0b1530eba7ef2a3ec
Parents: 3b30de0
Author: Aravindan Vijayan <avija...@hortonworks.com>
Authored: Tue Sep 20 15:54:32 2016 -0700
Committer: Aravindan Vijayan <avija...@hortonworks.com>
Committed: Tue Sep 20 15:54:32 2016 -0700
----------------------------------------------------------------------
.../ambari_commons/ambari_metrics_helper.py | 5 +-
.../ambari_commons/parallel_processing.py | 95 ++++++++
.../package/scripts/metrics_grafana_util.py | 50 ++++-
.../0.1.0/package/scripts/params.py | 3 +
.../0.1.0/package/scripts/service_check.py | 225 ++++++++++---------
.../metrics_grafana_datasource.json.j2 | 2 +-
6 files changed, 266 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ambari/blob/875f1efb/ambari-common/src/main/python/ambari_commons/ambari_metrics_helper.py
----------------------------------------------------------------------
diff --git a/ambari-common/src/main/python/ambari_commons/ambari_metrics_helper.py b/ambari-common/src/main/python/ambari_commons/ambari_metrics_helper.py
index 7b4e8f5..2eb0b6d 100644
--- a/ambari-common/src/main/python/ambari_commons/ambari_metrics_helper.py
+++ b/ambari-common/src/main/python/ambari_commons/ambari_metrics_helper.py
@@ -33,6 +33,9 @@ def select_metric_collector_for_sink(sink_name):
return select_metric_collector_hosts_from_hostnames(all_collectors_list) def select_metric_collector_hosts_from_hostnames(hosts): + return get_random_host(hosts) + +def get_random_host(hosts): return random.choice(hosts) def get_metric_collectors_from_properties_file(sink_name):
@@ -53,4 +56,4 @@ def
load_properties_from_file(filepath, sep='=', comment_char='#'): key = key_value[0].strip() value = sep.join(key_value[1:]).strip('" \t') props[key] = value - return props \ No newline at end of file + return props http://git-wip-us.apache.org/repos/asf/ambari/blob/875f1efb/ambari-common/src/main/python/ambari_commons/parallel_processing.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/ambari_commons/parallel_processing.py b/ambari-common/src/main/python/ambari_commons/parallel_processing.py new file mode 100644 index 0000000..c5a95de --- /dev/null +++ b/ambari-common/src/main/python/ambari_commons/parallel_processing.py @@ -0,0 +1,95 @@ +#!/usr/bin/env python + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+''' + +import logging +from multiprocessing import Process, Queue + +logger = logging.getLogger() + +SUCCESS = "SUCCESS" +FAILED = "FAILED" + +class PrallelProcessResult(object): + def __init__(self, element, status, result): + self.result = result + self.status = status + self.element = element + +class ParallelProcess(Process): + + + def __init__(self, function, element, params, queue): + self.function = function + self.element = element + self.params = params + self.queue = queue + super(ParallelProcess, self).__init__() + + def return_name(self): + ## NOTE: self.name is an attribute of multiprocessing.Process + return "Process running function '%s' for element '%s'" % (self.function, self.element) + + def run(self): + try: + result = self.function(self.element, self.params) + self.queue.put(PrallelProcessResult(self.element, SUCCESS, result)) + except Exception as e: + self.queue.put(PrallelProcessResult(self.element, FAILED, + "Exception while running function '%s' for '%s'. Reason : %s" % (self.function, self.element, str(e)))) + return + +def execute_in_parallel(function, array, params, wait_for_all = False): + logger.info("Started running %s for %s" % (function, array)) + processs = [] + q = Queue() + counter = len(array) + results = {} + + for element in array: + process = ParallelProcess(function, element, params, q) + process.start() + processs.append(process) + + while counter > 0: + tmp = q.get() + counter-=1 + results[tmp.element] = tmp + if tmp.status == SUCCESS and not wait_for_all: + counter = 0 + + for process in processs: + process.terminate() + + logger.info("Finished running %s for %s" % (function, array)) + + return results + +def func (elem, params): + if elem == 'S': + return "lalala" + else : + raise Exception('Exception') + +if __name__ == "__main__": + results = execute_in_parallel(func, ['F', 'BF', 'S'], None) + for result in results: + print results[result].element + print results[result].status + print results[result].result \ No 
newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/875f1efb/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana_util.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana_util.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana_util.py index e8c12ed..b98dc1d 100644 --- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana_util.py +++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/metrics_grafana_util.py @@ -18,16 +18,21 @@ limitations under the License. """ import httplib + +from ambari_commons.parallel_processing import PrallelProcessResult, execute_in_parallel, SUCCESS +from service_check import post_metrics_to_collector from resource_management.core.logger import Logger from resource_management.core.base import Fail from resource_management import Template from collections import namedtuple from urlparse import urlparse from base64 import b64encode +import random import time import socket import ambari_simplejson as json import network +import os GRAFANA_CONNECT_TRIES = 5 GRAFANA_CONNECT_TIMEOUT = 10 @@ -171,20 +176,32 @@ def perform_grafana_delete_call(url, server): return response -def is_unchanged_datasource_url(datasource_url): +def is_unchanged_datasource_url(grafana_datasource_url, new_datasource_host): import params - parsed_url = urlparse(datasource_url) + parsed_url = urlparse(grafana_datasource_url) Logger.debug("parsed url: scheme = %s, host = %s, port = %s" % ( parsed_url.scheme, parsed_url.hostname, parsed_url.port)) Logger.debug("collector: scheme = %s, host = %s, port = %s" % - (params.metric_collector_protocol, params.metric_collector_host, + (params.metric_collector_protocol, new_datasource_host, 
params.metric_collector_port)) return parsed_url.scheme.strip() == params.metric_collector_protocol.strip() and \ - parsed_url.hostname.strip() == params.metric_collector_host.strip() and \ + parsed_url.hostname.strip() == new_datasource_host.strip() and \ str(parsed_url.port) == params.metric_collector_port +def do_ams_collector_post(metric_collector_host, params): + ams_metrics_post_url = "/ws/v1/timeline/metrics/" + random_value1 = random.random() + headers = {"Content-type": "application/json"} + ca_certs = os.path.join(params.ams_collector_conf_dir, + params.metric_truststore_ca_certs) + + current_time = int(time.time()) * 1000 + metric_json = Template('smoketest_metrics.json.j2', hostname=params.hostname, random1=random_value1, + current_time=current_time).get_content() + post_metrics_to_collector(ams_metrics_post_url, metric_collector_host, params.metric_collector_port, params.metric_collector_https_enabled, + metric_json, headers, ca_certs) def create_ams_datasource(): import params server = Server(protocol = params.ams_grafana_protocol.strip(), @@ -196,11 +213,28 @@ def create_ams_datasource(): """ Create AMS datasource in Grafana, if exsists make sure the collector url is accurate """ - ams_datasource_json = Template('metrics_grafana_datasource.json.j2', - ams_datasource_name=METRICS_GRAFANA_DATASOURCE_NAME).get_content() + Logger.info("Trying to find working metric collector") + results = execute_in_parallel(do_ams_collector_post, params.ams_collector_hosts, params) + new_datasource_host = "" + + for host in params.ams_collector_hosts: + if host in results: + if results[host].status == SUCCESS: + new_datasource_host = host + Logger.info("Found working collector on host %s" % new_datasource_host) + break + else: + Logger.warning(results[host].result) - Logger.info("Checking if AMS Grafana datasource already exists") + if new_datasource_host == "": + Logger.warning("All metric collectors are unavailable. 
Will use random collector as datasource host.") + new_datasource_host = params.random_metric_collector_host + + Logger.info("New datasource host will be %s" % new_datasource_host) + ams_datasource_json = Template('metrics_grafana_datasource.json.j2', + ams_datasource_name=METRICS_GRAFANA_DATASOURCE_NAME, ams_datasource_host=new_datasource_host).get_content() + Logger.info("Checking if AMS Grafana datasource already exists") response = perform_grafana_get_call(GRAFANA_DATASOURCE_URL, server) create_datasource = True @@ -215,7 +249,7 @@ def create_ams_datasource(): Logger.info("Ambari Metrics Grafana datasource already present. Checking Metrics Collector URL") datasource_url = datasources_json[i]["url"] - if is_unchanged_datasource_url(datasource_url): + if is_unchanged_datasource_url(datasource_url, new_datasource_host): Logger.info("Metrics Collector URL validation succeeded.") return else: # Metrics datasource present, but collector host is wrong. http://git-wip-us.apache.org/repos/asf/ambari/blob/875f1efb/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py index 22024bb..6934924 100644 --- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py +++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/params.py @@ -120,6 +120,9 @@ if 'cluster-env' in config['configurations'] and \ metric_collector_host = config['configurations']['cluster-env']['metrics_collector_vip_host'] else: metric_collector_host = select_metric_collector_hosts_from_hostnames(ams_collector_hosts) + +random_metric_collector_host = select_metric_collector_hosts_from_hostnames(ams_collector_hosts) + if 
'cluster-env' in config['configurations'] and \ 'metrics_collector_vip_port' in config['configurations']['cluster-env']: metric_collector_port = config['configurations']['cluster-env']['metrics_collector_vip_port'] http://git-wip-us.apache.org/repos/asf/ambari/blob/875f1efb/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py index ddd3e42..56ca4a1 100644 --- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py +++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/scripts/service_check.py @@ -25,6 +25,7 @@ from resource_management import Template from ambari_commons import OSConst from ambari_commons.os_family_impl import OsFamilyFuncImpl, OsFamilyImpl +from ambari_commons.parallel_processing import PrallelProcessResult, execute_in_parallel, SUCCESS import httplib import network @@ -39,10 +40,10 @@ import socket class AMSServiceCheck(Script): AMS_METRICS_POST_URL = "/ws/v1/timeline/metrics/" AMS_METRICS_GET_URL = "/ws/v1/timeline/metrics?%s" - AMS_CONNECT_TRIES = 30 - AMS_CONNECT_TIMEOUT = 15 - AMS_READ_TRIES = 10 - AMS_READ_TIMEOUT = 5 + AMS_CONNECT_TRIES = 10 + AMS_CONNECT_TIMEOUT = 10 + AMS_READ_TRIES = 5 + AMS_READ_TIMEOUT = 10 @OsFamilyFuncImpl(os_family=OSConst.WINSRV_FAMILY) def service_check(self, env): @@ -62,124 +63,140 @@ class AMSServiceCheck(Script): if not check_windows_service_exists(params.ams_collector_win_service_name): raise Fail("Metrics Collector service was not properly installed. 
Check the logs and retry the installation.") - @OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT) - def service_check(self, env): - import params - - Logger.info("Ambari Metrics service check was started.") - env.set_params(params) - + def service_check_for_single_host(self, metric_collector_host, params): random_value1 = random.random() headers = {"Content-type": "application/json"} ca_certs = os.path.join(params.ams_collector_conf_dir, params.metric_truststore_ca_certs) - for i in xrange(0, self.AMS_CONNECT_TRIES): - try: - current_time = int(time.time()) * 1000 - metric_json = Template('smoketest_metrics.json.j2', hostname=params.hostname, random1=random_value1, + current_time = int(time.time()) * 1000 + metric_json = Template('smoketest_metrics.json.j2', hostname=params.hostname, random1=random_value1, current_time=current_time).get_content() - Logger.info("Generated metrics:\n%s" % metric_json) - - Logger.info("Connecting (POST) to %s:%s%s" % (params.metric_collector_host, - params.metric_collector_port, - self.AMS_METRICS_POST_URL)) - conn = network.get_http_connection(params.metric_collector_host, + try: + post_metrics_to_collector(self.AMS_METRICS_POST_URL, metric_collector_host, params.metric_collector_port, params.metric_collector_https_enabled, + metric_json, headers, ca_certs, self.AMS_CONNECT_TRIES, self.AMS_CONNECT_TIMEOUT) + + get_metrics_parameters = { + "metricNames": "AMBARI_METRICS.SmokeTest.FakeMetric", + "appId": "amssmoketestfake", + "hostname": params.hostname, + "startTime": current_time - 60000, + "endTime": current_time + 61000, + "precision": "seconds", + "grouped": "false", + } + encoded_get_metrics_parameters = urllib.urlencode(get_metrics_parameters) + + Logger.info("Connecting (GET) to %s:%s%s" % (metric_collector_host, + params.metric_collector_port, + self.AMS_METRICS_GET_URL % encoded_get_metrics_parameters)) + for i in xrange(0, self.AMS_READ_TRIES): + conn = network.get_http_connection(metric_collector_host, 
int(params.metric_collector_port), params.metric_collector_https_enabled, ca_certs) - conn.request("POST", self.AMS_METRICS_POST_URL, metric_json, headers) - + conn.request("GET", self.AMS_METRICS_GET_URL % encoded_get_metrics_parameters) response = conn.getresponse() - Logger.info("Http response: %s %s" % (response.status, response.reason)) - except (httplib.HTTPException, socket.error) as ex: - if i < self.AMS_CONNECT_TRIES - 1: #range/xrange returns items from start to end-1 - time.sleep(self.AMS_CONNECT_TIMEOUT) - Logger.info("Connection failed. Next retry in %s seconds." - % (self.AMS_CONNECT_TIMEOUT)) - continue - else: - raise Fail("Metrics were not saved. Service check has failed. " - "\nConnection failed.") + Logger.info("Http response for host %s : %s %s" % (metric_collector_host, response.status, response.reason)) - data = response.read() - Logger.info("Http data: %s" % data) - conn.close() + data = response.read() + Logger.info("Http data: %s" % data) + conn.close() - if response.status == 200: - Logger.info("Metrics were saved.") - break - else: - Logger.info("Metrics were not saved. Service check has failed.") - if i < self.AMS_CONNECT_TRIES - 1: #range/xrange returns items from start to end-1 - time.sleep(self.AMS_CONNECT_TIMEOUT) - Logger.info("Next retry in %s seconds." - % (self.AMS_CONNECT_TIMEOUT)) + if response.status == 200: + Logger.info("Metrics were retrieved from host %s" % metric_collector_host) + else: + raise Fail("Metrics were not retrieved from host %s. 
GET request status: %s %s \n%s" % + (metric_collector_host, response.status, response.reason, data)) + data_json = json.loads(data) + + def floats_eq(f1, f2, delta): + return abs(f1-f2) < delta + + values_are_present = False + for metrics_data in data_json["metrics"]: + if (str(current_time) in metrics_data["metrics"] and str(current_time + 1000) in metrics_data["metrics"] + and floats_eq(metrics_data["metrics"][str(current_time)], random_value1, 0.0000001) + and floats_eq(metrics_data["metrics"][str(current_time + 1000)], current_time, 1)): + Logger.info("Values %s and %s were found in the response from host %s." % (metric_collector_host, random_value1, current_time)) + values_are_present = True + break + pass + + if not values_are_present: + if i < self.AMS_READ_TRIES - 1: #range/xrange returns items from start to end-1 + Logger.info("Values weren't stored yet. Retrying in %s seconds." + % (self.AMS_READ_TIMEOUT)) + time.sleep(self.AMS_READ_TIMEOUT) + else: + raise Fail("Values %s and %s were not found in the response." % (random_value1, current_time)) else: - raise Fail("Metrics were not saved. Service check has failed. 
POST request status: %s %s \n%s" % - (response.status, response.reason, data)) - - get_metrics_parameters = { - "metricNames": "AMBARI_METRICS.SmokeTest.FakeMetric", - "appId": "amssmoketestfake", - "hostname": params.hostname, - "startTime": current_time - 60000, - "endTime": current_time + 61000, - "precision": "seconds", - "grouped": "false", - } - encoded_get_metrics_parameters = urllib.urlencode(get_metrics_parameters) - - Logger.info("Connecting (GET) to %s:%s%s" % (params.metric_collector_host, - params.metric_collector_port, - self.AMS_METRICS_GET_URL % encoded_get_metrics_parameters)) - for i in xrange(0, self.AMS_READ_TRIES): - conn = network.get_http_connection(params.metric_collector_host, - int(params.metric_collector_port), - params.metric_collector_https_enabled, - ca_certs) - conn.request("GET", self.AMS_METRICS_GET_URL % encoded_get_metrics_parameters) - response = conn.getresponse() - Logger.info("Http response: %s %s" % (response.status, response.reason)) - - data = response.read() - Logger.info("Http data: %s" % data) - conn.close() - - if response.status == 200: - Logger.info("Metrics were retrieved.") - else: - Logger.info("Metrics were not retrieved. Service check has failed.") - raise Fail("Metrics were not retrieved. Service check has failed. GET request status: %s %s \n%s" % - (response.status, response.reason, data)) - data_json = json.loads(data) - - def floats_eq(f1, f2, delta): - return abs(f1-f2) < delta - - values_are_present = False - for metrics_data in data_json["metrics"]: - if (str(current_time) in metrics_data["metrics"] and str(current_time + 1000) in metrics_data["metrics"] - and floats_eq(metrics_data["metrics"][str(current_time)], random_value1, 0.0000001) - and floats_eq(metrics_data["metrics"][str(current_time + 1000)], current_time, 1)): - Logger.info("Values %s and %s were found in the response." 
% (random_value1, current_time)) - values_are_present = True break pass + except Fail as ex: + Logger.warning("Ambari Metrics service check failed on collector host %s. Reason : %s" % (metric_collector_host, str(ex))) + raise Fail("Ambari Metrics service check failed on collector host %s. Reason : %s" % (metric_collector_host, str(ex))) + + @OsFamilyFuncImpl(os_family=OsFamilyImpl.DEFAULT) + def service_check(self, env): + import params + + Logger.info("Ambari Metrics service check was started.") + env.set_params(params) + + results = execute_in_parallel(self.service_check_for_single_host, params.ams_collector_hosts, params) - if not values_are_present: - if i < self.AMS_READ_TRIES - 1: #range/xrange returns items from start to end-1 - Logger.info("Values weren't stored yet. Retrying in %s seconds." - % (self.AMS_READ_TIMEOUT)) - time.sleep(self.AMS_READ_TIMEOUT) + for host in params.ams_collector_hosts: + if host in results: + if results[host].status == SUCCESS: + Logger.info("Ambari Metrics service check passed on host " + host) + return else: - Logger.info("Values %s and %s were not found in the response." % (random_value1, current_time)) - raise Fail("Values %s and %s were not found in the response." 
% (random_value1, current_time)) - else: - break - pass - Logger.info("Ambari Metrics service check is finished.") + Logger.warning(results[host].result) + raise Fail("All metrics collectors are unavailable.") + +def post_metrics_to_collector(ams_metrics_post_url, metric_collector_host, metric_collector_port, metric_collector_https_enabled, + metric_json, headers, ca_certs, tries = 1, connect_timeout = 10): + for i in xrange(0, tries): + try: + Logger.info("Generated metrics for host %s :\n%s" % (metric_collector_host, metric_json)) + + Logger.info("Connecting (POST) to %s:%s%s" % (metric_collector_host, + metric_collector_port, + ams_metrics_post_url)) + conn = network.get_http_connection(metric_collector_host, + int(metric_collector_port), + metric_collector_https_enabled, + ca_certs) + conn.request("POST", ams_metrics_post_url, metric_json, headers) + response = conn.getresponse() + Logger.info("Http response for host %s: %s %s" % (metric_collector_host, response.status, response.reason)) + except (httplib.HTTPException, socket.error) as ex: + if i < tries - 1: #range/xrange returns items from start to end-1 + time.sleep(connect_timeout) + Logger.info("Connection failed for host %s. Next retry in %s seconds." + % (metric_collector_host, connect_timeout)) + continue + else: + raise Fail("Metrics were not saved. Connection failed.") + + data = response.read() + Logger.info("Http data: %s" % data) + conn.close() + + if response.status == 200: + Logger.info("Metrics were saved.") + break + else: + Logger.info("Metrics were not saved.") + if i < tries - 1: #range/xrange returns items from start to end-1 + time.sleep(tries) + Logger.info("Next retry in %s seconds." + % (tries)) + else: + raise Fail("Metrics were not saved. 
POST request status: %s %s \n%s" % + (response.status, response.reason, data)) if __name__ == "__main__": AMSServiceCheck().execute() http://git-wip-us.apache.org/repos/asf/ambari/blob/875f1efb/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metrics_grafana_datasource.json.j2 ---------------------------------------------------------------------- diff --git a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metrics_grafana_datasource.json.j2 b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metrics_grafana_datasource.json.j2 index 678d769..30870c4 100644 --- a/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metrics_grafana_datasource.json.j2 +++ b/ambari-server/src/main/resources/common-services/AMBARI_METRICS/0.1.0/package/templates/metrics_grafana_datasource.json.j2 @@ -20,7 +20,7 @@ "name": "{{ams_datasource_name}}", "type": "ambarimetrics", "access": "proxy", - "url": "{{metric_collector_protocol}}://{{metric_collector_host}}:{{metric_collector_port}}", + "url": "{{metric_collector_protocol}}://{{ams_datasource_host}}:{{metric_collector_port}}", "password": "", "user": "", "database": "",