Repository: incubator-eagle Updated Branches: refs/heads/master b12abb38d -> b27998f82
[EAGLE-838] Resolve defunct process in hadoop jmx script by kill -9 https://issues.apache.org/jira/browse/EAGLE-838 - Resolve defunct process in hadoop jmx script by kill -9 - Support configurable log file path - Improve JMX reader to use multiple threads Author: Hao Chen <h...@apache.org> Closes #733 from haoch/FixDefunctProcess. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/b27998f8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/b27998f8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/b27998f8 Branch: refs/heads/master Commit: b27998f822a492c33ed587d729ba894280660ab1 Parents: b12abb3 Author: Hao Chen <h...@apache.org> Authored: Tue Dec 13 10:41:30 2016 +0800 Committer: Hao Chen <h...@apache.org> Committed: Tue Dec 13 10:41:30 2016 +0800 ---------------------------------------------------------------------- .../hadoop_jmx_config-sample.json | 4 +- .../hadoop_jmx_collector/hadoop_jmx_kafka.py | 2 +- .../hbase_jmx_config-sample.json | 4 +- .../hadoop_jmx_collector/metric_collector.py | 143 +++++++++++++------ 4 files changed, 105 insertions(+), 48 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b27998f8/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json b/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json index f5c16fd..a6ddf7d 100755 --- a/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json +++ b/eagle-external/hadoop_jmx_collector/hadoop_jmx_config-sample.json @@ -1,6 +1,8 @@ { "env": { - "site": "sandbox" + "site": "sandbox", + "metric_prefix": "hadoop.", + "log_file": "/tmp/hadoop-jmx-collector.log" }, "input": [ { 
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b27998f8/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py index 799d351..be7b9c7 100644 --- a/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py +++ b/eagle-external/hadoop_jmx_collector/hadoop_jmx_kafka.py @@ -17,7 +17,7 @@ # from metric_collector import JmxMetricCollector,JmxMetricListener,Runner -import json, logging, fnmatch +import json, logging, fnmatch, sys class NNSafeModeMetric(JmxMetricListener): def on_metric(self, metric): http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b27998f8/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json b/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json index 2275ccc..c37a9ae 100644 --- a/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json +++ b/eagle-external/hadoop_jmx_collector/hbase_jmx_config-sample.json @@ -1,6 +1,8 @@ { "env": { - "site": "sandbox" + "site": "sandbox", + "metric_prefix": "hadoop.", + "log_file": "/tmp/hadoop-jmx-collector.log" }, "input": [ { http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/b27998f8/eagle-external/hadoop_jmx_collector/metric_collector.py ---------------------------------------------------------------------- diff --git a/eagle-external/hadoop_jmx_collector/metric_collector.py b/eagle-external/hadoop_jmx_collector/metric_collector.py index c4fc457..6205e1f 100644 --- a/eagle-external/hadoop_jmx_collector/metric_collector.py +++ b/eagle-external/hadoop_jmx_collector/metric_collector.py @@ -16,18 +16,18 @@ # limitations under the License. 
# -import os import re import time import json import urllib2 import sys import socket -import types import httplib import logging import threading import fnmatch +import subprocess +import os # load six sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '', 'lib/six')) @@ -37,11 +37,6 @@ import six sys.path.append(os.path.join(os.path.dirname(os.path.abspath(__file__)), '', 'lib/kafka-python')) from kafka import KafkaClient, SimpleProducer, SimpleConsumer -logging.basicConfig(level=logging.INFO, - format='%(asctime)s %(name)-12s %(levelname)-6s %(message)s', - datefmt='%m-%d %H:%M') - - class Helper: def __init__(self): pass @@ -49,24 +44,32 @@ class Helper: @staticmethod def load_config(config_file="config.json"): """ - :param config_file: :return: """ - abs_file_path = config_file + abs_file_path = config_file if not os.path.isfile(abs_file_path): script_dir = os.path.dirname(__file__) rel_path = "./" + config_file abs_file_path = os.path.join(script_dir, rel_path) if not os.path.isfile(abs_file_path): raise Exception(abs_file_path + " doesn't exist, please rename config-sample.json to config.json") - - logging.info("Using configuration file " + abs_file_path) f = open(abs_file_path, 'r') json_file = f.read() f.close() config = json.loads(json_file) + + if config["env"].has_key("log_file"): + logging.basicConfig(filename=config["env"]["log_file"], filemode='w',level=logging.INFO, + format='%(asctime)s %(name)s %(threadName)s %(levelname)s %(message)s', + datefmt='%m-%d %H:%M') + else: + logging.basicConfig(level=logging.INFO, + format='%(asctime)s %(name)s %(threadName)s %(levelname)s %(message)s', + datefmt='%m-%d %H:%M') + + logging.info("Loaded config from %s", abs_file_path) return config @staticmethod @@ -103,12 +106,12 @@ class Helper: try: if https: logging.info("Reading https://" + str(url) + path) - c = httplib.HTTPSConnection(url, timeout=28) + c = httplib.HTTPSConnection(url, timeout=30) c.request("GET", path) response = 
c.getresponse() else: logging.info("Reading http://" + str(url) + path) - response = urllib2.urlopen("http://" + str(url) + path, timeout=28) + response = urllib2.urlopen("http://" + str(url) + path, timeout=30) logging.debug("Got response") result = response.read() break @@ -197,6 +200,9 @@ class MetricSender(object): class KafkaMetricSender(MetricSender): + start_time = time.time() + end_time = time.time() + def __init__(self, config): super(KafkaMetricSender, self).__init__(config) kafka_config = config["output"]["kafka"] @@ -234,9 +240,11 @@ class KafkaMetricSender(MetricSender): return self.default_topic def open(self): - self.kafka_client = KafkaClient(self.broker_list, timeout=59) + logging.info("Opening kafka connection for producer") + self.kafka_client = KafkaClient(self.broker_list, timeout=55) self.kafka_producer = SimpleProducer(self.kafka_client, batch_send=True, batch_send_every_n=500, batch_send_every_t=30) + self.start_time = time.time() def send(self, msg): if self.debug_enabled: @@ -245,15 +253,19 @@ class KafkaMetricSender(MetricSender): self.kafka_producer.send_messages(self.get_topic_id(msg), json.dumps(msg)) def close(self): - logging.info("Totally sent " + str(self.sent_count) + " metric events") + logging.info("Closing kafka connection and producer") if self.kafka_producer is not None: self.kafka_producer.stop() if self.kafka_client is not None: self.kafka_client.close() + self.end_time = time.time() + logging.info("Totally sent " + str(self.sent_count) + " metric events in "+str(self.end_time - self.start_time)+" sec") + class MetricCollector(threading.Thread): filters = [] config = None + closed = False def __init__(self, config=None): threading.Thread.__init__(self) @@ -303,11 +315,14 @@ class MetricCollector(threading.Thread): def close(self): self.sender.close() + self.closed = True + + def is_closed(self): + return self.closed def run(self): raise Exception("`run` method should be overrode by sub-class before being called") - class 
Runner(object): @staticmethod def run(*collectors): @@ -317,29 +332,44 @@ class Runner(object): :param threads: :return: """ - argv = sys.argv - if len(argv) == 1: - config = Helper.load_config() - elif len(argv) == 2: - config = Helper.load_config(argv[1]) - else: - raise Exception("Usage: " + argv[0] + " CONFIG_FILE_PATH, but given too many arguments: " + str(argv)) - - for collector in collectors: - try: - collector.init(config) - collector.start() - except Exception as e: - logging.exception(e) - - for collector in collectors: - try: - collector.join() - except Exception as e: + current_pid = os.getpid() + try: + argv = sys.argv + if len(argv) == 1: + config = Helper.load_config() + elif len(argv) == 2: + config = Helper.load_config(argv[1]) + else: + raise Exception("Usage: " + argv[0] + " CONFIG_FILE_PATH, but given too many arguments: " + str(argv)) + + logging.info("PID: %s", current_pid) + for collector in collectors: + try: + collector.init(config) + collector.start() + except Exception as e: + logging.exception(e) + + for collector in collectors: + try: + collector.join(timeout = 55) + collector.close() + except BaseException as e: + logging.exception(e) + exit(0) + except BaseException as e: + if isinstance(e, SystemExit): + logging.info("Exit code: %s", e) + else: logging.exception(e) - finally: - collector.close() - + exit(1) + logging.info("Exit code: 1") + finally: + for collector in collectors: + if not collector.is_closed(): + collector.close() + logging.info("Ensuring process stopped: kill -9 %s", current_pid) + subprocess.call(["kill","-9",str(current_pid)]) class JmxMetricCollector(MetricCollector): selected_domain = None @@ -373,13 +403,34 @@ class JmxMetricCollector(MetricCollector): listener.init(self) self.listeners.append(listener) + def jmx_reader(self, source): + host = source["host"] + port=source["port"] + https=source["https"] + protocol = "https" if https else "http" + try: + beans = JmxReader(host, port, 
https).open().get_jmx_beans() + self.on_beans(source, beans) + except Exception as e: + jmx_url = protocol+"://"+str(host) + ":" + str(port) + logging.error("Failed to read jmx for " + jmx_url) + logging.exception(e) + def run(self): - for input in self.input_components: - try: - beans = JmxReader(input["host"], input["port"], input["https"]).open().get_jmx_beans() - self.on_beans(input, beans) - except Exception as e: - logging.exception("Failed to read jmx for " + str(input)) + size=str(len(self.input_components)) + logging.info("Starting jmx reading threads (num: " + size + ")") + reader_threads = [] + for source in self.input_components: + reader_thread=threading.Thread(target=self.jmx_reader, args=[source]) + reader_thread.daemon = True + logging.info(reader_thread.name + " starting") + reader_thread.start() + reader_threads.append(reader_thread) + for reader_thread in reader_threads: + logging.info(reader_thread.name + " stopping") + reader_thread.join(timeout = 55) + + logging.info("Jmx reading threads (num: "+size+") finished") def filter_bean(self, bean, mbean_domain): return mbean_domain in self.selected_domain @@ -436,6 +487,9 @@ class JmxMetricCollector(MetricCollector): metric_prefix_name = '.'.join([i[1] for i in mbean_list]) return (self.metric_prefix + metric_prefix_name).replace(" ", "").lower() + def close(self): + super(JmxMetricCollector, self).close() + # ======================== # Metric Listeners # ======================== @@ -450,7 +504,6 @@ class JmxMetricListener: def on_metric(self, metric): pass - # ======================== # Metric Filters # ========================