Repository: incubator-eagle
Updated Branches:
  refs/heads/master 9ca2cebae -> 880ba738c
[MINOR] Modify timeout settings and support single-process mode

Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/880ba738
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/880ba738
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/880ba738

Branch: refs/heads/master
Commit: 880ba738c54251734da14411cf873d5aebd13d76
Parents: 9ca2ceb
Author: Hao Chen <h...@apache.org>
Authored: Wed Dec 14 16:23:45 2016 +0800
Committer: Hao Chen <h...@apache.org>
Committed: Wed Dec 14 16:23:45 2016 +0800

----------------------------------------------------------------------
 .../hadoop_jmx_collector/metric_collector.py | 32 +++++++++++++++-----
 1 file changed, 25 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/880ba738/eagle-external/hadoop_jmx_collector/metric_collector.py
----------------------------------------------------------------------
diff --git a/eagle-external/hadoop_jmx_collector/metric_collector.py b/eagle-external/hadoop_jmx_collector/metric_collector.py
index c7a5599..0f09a9e 100644
--- a/eagle-external/hadoop_jmx_collector/metric_collector.py
+++ b/eagle-external/hadoop_jmx_collector/metric_collector.py
@@ -241,7 +241,7 @@ class KafkaMetricSender(MetricSender):

     def open(self):
         logging.info("Opening kafka connection for producer")
-        self.kafka_client = KafkaClient(self.broker_list, timeout=55)
+        self.kafka_client = KafkaClient(self.broker_list, timeout=50)
         self.kafka_producer = SimpleProducer(self.kafka_client, batch_send=False, batch_send_every_n=500, batch_send_every_t=30)
         self.start_time = time.time()

@@ -340,18 +340,17 @@ class Runner(object):
                     logging.exception(e)
             for collector in collectors:
                 collector.join(timeout=55)
-            exit(0)
+                collector.close()
         except BaseException as e:
             if not isinstance(e, SystemExit):
                 logging.exception(e)
-            exit(1)
         finally:
             for collector in collectors:
                 if not collector.is_closed():
                     collector.close()

     @staticmethod
-    def run(*collectors):
+    def run_async(*collectors):
         config = None
         argv = sys.argv
         if len(argv) == 1:
@@ -368,7 +367,7 @@ class Runner(object):
             logging.info("Starting %s", sub_process)
             sub_process.start()
             logging.info("Current PID: %s, subprocess PID: %s", current_process.pid, sub_process.pid)
-            sub_process.join(timeout = 55)
+            sub_process.join(timeout = 56)
         except BaseException as e:
             logging.exception(e)
         finally:
@@ -378,6 +377,25 @@ class Runner(object):
             logging.info("%s exit code: %s", sub_process, sub_process.exitcode)
             exit(0)

+    @staticmethod
+    def run(*collectors):
+        config = None
+        argv = sys.argv
+        current_process=multiprocessing.current_process()
+        if len(argv) == 1:
+            config = Helper.load_config()
+        elif len(argv) == 2:
+            config = Helper.load_config(argv[1])
+        else:
+            raise Exception("Usage: " + argv[0] + " CONFIG_FILE_PATH, but given too many arguments: " + str(argv))
+        try:
+            Runner.worker(collectors, config)
+        except BaseException as e:
+            logging.exception(e)
+        finally:
+            logging.info("%s (PID: %s) exit", current_process.name, current_process.pid)
+            exit(0)
+
 class JmxMetricCollector(MetricCollector):
     selected_domain = None
     listeners = []
@@ -429,13 +447,13 @@ class JmxMetricCollector(MetricCollector):
         reader_threads = []
         for source in self.input_components:
             reader_thread=threading.Thread(target=self.jmx_reader, args=[source])
-            reader_thread.daemon = True
+            reader_thread.daemon = False
             logging.info(reader_thread.name + " starting")
             reader_thread.start()
             reader_threads.append(reader_thread)
         for reader_thread in reader_threads:
             logging.info(reader_thread.name + " stopping")
-            reader_thread.join(timeout = 55)
+            reader_thread.join(timeout = 45)

         logging.info("Jmx reading threads (num: "+size+") finished")
