Repository: aurora
Updated Branches:
  refs/heads/master b8f72d146 -> 33acb899b


Make Thermos observer resource collection intervals configurable

We have noticed that on hosts with lots of active tasks (~100) the observer UI
is not usable. Thermos fully utilizes one core but does not render any requests.
Dumping `/threads` indicates the observer might be backlogged by the hundred
concurrent `TaskResourceMonitor` threads. Due to the Python GIL only one can make
progress at a time though.

This patch is now adding options to control the resource collection interval,
giving operators a possibility to reduce the CPU pressure.

Testing Done:
./pants test.pytest src/{test,main}/python:: -- -v

Bugs closed: AURORA-1907

Reviewed at https://reviews.apache.org/r/57757/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/33acb899
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/33acb899
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/33acb899

Branch: refs/heads/master
Commit: 33acb899b8cbfd9914f028524cdd9428beeb06e3
Parents: b8f72d1
Author: Stephan Erb <s...@apache.org>
Authored: Tue Mar 21 09:29:41 2017 +0100
Committer: Stephan Erb <s...@apache.org>
Committed: Tue Mar 21 09:29:41 2017 +0100

----------------------------------------------------------------------
 RELEASE-NOTES.md                               | 12 +--
 docs/README.md                                 |  1 +
 docs/reference/observer-configuration.md       | 89 ++++++++++++++++++++
 .../apache/aurora/tools/thermos_observer.py    | 24 +++++-
 .../apache/thermos/monitoring/resource.py      | 25 ++++--
 .../apache/thermos/observer/task_observer.py   | 21 +++--
 .../tools/test_thermos_observer_entry_point.py | 40 ---------
 7 files changed, 149 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/33acb899/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index d58d2bd..5babea5 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -1,12 +1,12 @@
-0.18.0
-=====
+0.18.0 (unreleased)
+===================

 ### New/updated:

 - Add message parameter to `killTasks` RPC.
-- Add prune_tasks endpoint to aurora_admin. See aurora_admin prune_tasks -h for usage information.
+- Add `prune_tasks` endpoint to `aurora_admin`. See `aurora_admin prune_tasks -h` for usage information.
 - Add support for per-task volume mounts for Mesos containers to the Aurora config DSL.
-* Added the `-mesos_driver` flag to the scheduler with three possible options:
+- Added the `-mesos_driver` flag to the scheduler with three possible options:
   `SCHEDULER_DRIVER`, `V0_MESOS`, `V1_MESOS`. The first uses the original driver
   and the latter two use two new drivers from `libmesos`. `V0_MESOS` uses the
   `SCHEDULER_DRIVER` under the hood and `V1_MESOS` uses a new HTTP API aware
@@ -14,7 +14,9 @@
   Performance sensitive users should stick with the `SCHEDULER_DRIVER` or
   `V0_MESOS` drivers.
 - Add support for new MesosContainerizer rolled out in Mesos 1.2.0.
-* Please upgrade Aurora to 0.18 before upgrading Mesos to 1.2.0.
+  Please upgrade Aurora to 0.18 before upgrading Mesos to 1.2.0.
+- Add observer command line options to control the resource collection interval
+  for observed tasks. See [here](docs/reference/observer-configuration.md) for details.
 0.17.0
 ======

http://git-wip-us.apache.org/repos/asf/aurora/blob/33acb899/docs/README.md
----------------------------------------------------------------------
diff --git a/docs/README.md b/docs/README.md
index 1d679e2..dfd3a23 100644
--- a/docs/README.md
+++ b/docs/README.md
@@ -54,6 +54,7 @@ The complete reference of commands, configuration options, and scheduler interna
     - [Client Hooks](reference/client-hooks.md)
     - [Client Cluster Configuration](reference/client-cluster-configuration.md)
  * [Scheduler Configuration](reference/scheduler-configuration.md)
+ * [Observer Configuration](reference/observer-configuration.md)

 ## Additional Resources
  * [Tools integrating with Aurora](additional-resources/tools.md)

http://git-wip-us.apache.org/repos/asf/aurora/blob/33acb899/docs/reference/observer-configuration.md
----------------------------------------------------------------------
diff --git a/docs/reference/observer-configuration.md b/docs/reference/observer-configuration.md
new file mode 100644
index 0000000..8a443c9
--- /dev/null
+++ b/docs/reference/observer-configuration.md
@@ -0,0 +1,89 @@
+# Observer Configuration Reference
+
+The Aurora/Thermos observer can take a variety of configuration options through command-line arguments.
+A list of the available options can be seen by running `thermos_observer --long-help`.
+
+Please refer to the [Operator Configuration Guide](../operations/configuration.md) for details on how
+to properly set the most important options.
+
+```
+$ thermos_observer.pex --long-help
+Options:
+  -h, --help, --short-help
+                        show this help message and exit.
+  --long-help           show options from all registered modules, not just the
+                        __main__ module.
+  --mesos-root=MESOS_ROOT
+                        The mesos root directory to search for Thermos
+                        executor sandboxes [default: /var/lib/mesos]
+  --ip=IP               The IP address the observer will bind to. [default:
+                        0.0.0.0]
+  --port=PORT           The port on which the observer should listen.
+                        [default: 1338]
+  --polling_interval_secs=POLLING_INTERVAL_SECS
+                        The number of seconds between observer refresh
+                        attempts. [default: 5]
+  --task_process_collection_interval_secs=TASK_PROCESS_COLLECTION_INTERVAL_SECS
+                        The number of seconds between per task process
+                        resource collections. [default: 20]
+  --task_disk_collection_interval_secs=TASK_DISK_COLLECTION_INTERVAL_SECS
+                        The number of seconds between per task disk resource
+                        collections. [default: 60]
+
+  From module twitter.common.app:
+    --app_daemonize     Daemonize this application. [default: False]
+    --app_profile_output=FILENAME
+                        Dump the profiling output to a binary profiling
+                        format. [default: None]
+    --app_daemon_stderr=TWITTER_COMMON_APP_DAEMON_STDERR
+                        Direct this app's stderr to this file if daemonized.
+                        [default: /dev/null]
+    --app_debug         Print extra debugging information during application
+                        initialization. [default: False]
+    --app_rc_filename   Print the filename for the rc file and quit. [default:
+                        False]
+    --app_daemon_stdout=TWITTER_COMMON_APP_DAEMON_STDOUT
+                        Direct this app's stdout to this file if daemonized.
+                        [default: /dev/null]
+    --app_profiling     Run profiler on the code while it runs. Note this can
+                        cause slowdowns. [default: False]
+    --app_ignore_rc_file
+                        Ignore default arguments from the rc file. [default:
+                        False]
+    --app_pidfile=TWITTER_COMMON_APP_PIDFILE
+                        The pidfile to use if --app_daemonize is specified.
+                        [default: None]
+
+  From module twitter.common.log.options:
+    --log_to_stdout=[scheme:]LEVEL
+                        OBSOLETE - legacy flag, use --log_to_stderr instead.
+                        [default: ERROR]
+    --log_to_stderr=[scheme:]LEVEL
+                        The level at which logging to stderr [default: ERROR].
+                        Takes either LEVEL or scheme:LEVEL, where LEVEL is one
+                        of ['INFO', 'NONE', 'WARN', 'ERROR', 'DEBUG', 'FATAL']
+                        and scheme is one of ['google', 'plain'].
+    --log_to_disk=[scheme:]LEVEL
+                        The level at which logging to disk [default: INFO].
+                        Takes either LEVEL or scheme:LEVEL, where LEVEL is one
+                        of ['INFO', 'NONE', 'WARN', 'ERROR', 'DEBUG', 'FATAL']
+                        and scheme is one of ['google', 'plain'].
+    --log_dir=DIR       The directory into which log files will be generated
+                        [default: /var/tmp].
+    --log_simple        Write a single log file rather than one log file per
+                        log level [default: False].
+    --log_to_scribe=[scheme:]LEVEL
+                        The level at which logging to scribe [default: NONE].
+                        Takes either LEVEL or scheme:LEVEL, where LEVEL is one
+                        of ['INFO', 'NONE', 'WARN', 'ERROR', 'DEBUG', 'FATAL']
+                        and scheme is one of ['google', 'plain'].
+    --scribe_category=CATEGORY
+                        The category used when logging to the scribe daemon.
+                        [default: python_default].
+    --scribe_buffer     Buffer messages when scribe is unavailable rather than
+                        dropping them. [default: False].
+    --scribe_host=HOST  The host running the scribe daemon. [default:
+                        localhost].
+    --scribe_port=PORT  The port used to connect to the scribe daemon.
+                        [default: 1463].
+```

http://git-wip-us.apache.org/repos/asf/aurora/blob/33acb899/src/main/python/apache/aurora/tools/thermos_observer.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/tools/thermos_observer.py b/src/main/python/apache/aurora/tools/thermos_observer.py
index 4bba019..0318f99 100644
--- a/src/main/python/apache/aurora/tools/thermos_observer.py
+++ b/src/main/python/apache/aurora/tools/thermos_observer.py
@@ -25,6 +25,7 @@ from twitter.common.log.options import LogOptions
 from twitter.common.quantity import Amount, Time

 from apache.aurora.executor.common.path_detector import MesosPathDetector
+from apache.thermos.monitoring.resource import TaskResourceMonitor
 from apache.thermos.observer.http.configure import configure_server
 from apache.thermos.observer.task_observer import TaskObserver

@@ -60,6 +61,22 @@ app.add_option(
     help='The number of seconds between observer refresh attempts.')


+app.add_option(
+    '--task_process_collection_interval_secs',
+    dest='task_process_collection_interval_secs',
+    type='int',
+    default=int(TaskResourceMonitor.PROCESS_COLLECTION_INTERVAL.as_(Time.SECONDS)),
+    help='The number of seconds between per task process resource collections.')
+
+
+app.add_option(
+    '--task_disk_collection_interval_secs',
+    dest='task_disk_collection_interval_secs',
+    type='int',
+    default=int(TaskResourceMonitor.DISK_COLLECTION_INTERVAL.as_(Time.SECONDS)),
+    help='The number of seconds between per task disk resource collections.')
+
+
 # Allow an interruptible sleep so that ^C works.
 def sleep_forever():
   while True:
@@ -68,8 +85,11 @@ def sleep_forever():

 def initialize(options):
   path_detector = MesosPathDetector(options.mesos_root)
-  polling_interval = Amount(options.polling_interval_secs, Time.SECONDS)
-  return TaskObserver(path_detector, interval=polling_interval)
+  return TaskObserver(
+      path_detector,
+      Amount(options.polling_interval_secs, Time.SECONDS),
+      Amount(options.task_process_collection_interval_secs, Time.SECONDS),
+      Amount(options.task_disk_collection_interval_secs, Time.SECONDS))


 def handle_error(exc_type, value, traceback):

http://git-wip-us.apache.org/repos/asf/aurora/blob/33acb899/src/main/python/apache/thermos/monitoring/resource.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/resource.py b/src/main/python/apache/thermos/monitoring/resource.py
index 53d0ff1..4346666 100644
--- a/src/main/python/apache/thermos/monitoring/resource.py
+++ b/src/main/python/apache/thermos/monitoring/resource.py
@@ -118,15 +118,17 @@ class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread):
   """

   MAX_HISTORY = 10000  # magic number
+  PROCESS_COLLECTION_INTERVAL = Amount(20, Time.SECONDS)
+  DISK_COLLECTION_INTERVAL = Amount(60, Time.SECONDS)
+  HISTORY_TIME = Amount(1, Time.HOURS)

   def __init__(self,
                task_id,
                task_monitor,
-               process_collector=ProcessTreeCollector,
                disk_collector=DiskCollector,
-               process_collection_interval=Amount(20, Time.SECONDS),
-               disk_collection_interval=Amount(1, Time.MINUTES),
-               history_time=Amount(1, Time.HOURS)):
+               process_collection_interval=PROCESS_COLLECTION_INTERVAL,
+               disk_collection_interval=DISK_COLLECTION_INTERVAL,
+               history_time=HISTORY_TIME):
     """
       task_monitor: TaskMonitor object specifying the task whose resources should be monitored
       sandbox: Directory for which to monitor disk utilisation
@@ -135,7 +137,6 @@ class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread):
     self._task_id = task_id
     log.debug('Initialising resource collection for task %s' % self._task_id)
     self._process_collectors = dict()  # ProcessStatus => ProcessTreeCollector
-    self._process_collector_factory = process_collector
     self._disk_collector_class = disk_collector
     self._disk_collector = None
     self._process_collection_interval = process_collection_interval.as_(Time.SECONDS)
@@ -167,7 +168,7 @@ class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread):
     else:
       # Since this might be called out of band (before the main loop is aware of the process)
       if process not in self._process_collectors:
-        self._process_collectors[process] = self._process_collector_factory(process.pid)
+        self._process_collectors[process] = ProcessTreeCollector(process.pid)

       self._process_collectors[process].sample()
       return self._process_collectors[process].value
@@ -189,7 +190,6 @@ class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread):
     next_disk_collection = 0

     while not self._kill_signal.is_set():
-
       now = time.time()

       if now > next_process_collection:
@@ -199,7 +199,7 @@ class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread):
         for process in current - actives:
           self._process_collectors.pop(process)
         for process in actives - current:
-          self._process_collectors[process] = self._process_collector_factory(process.pid)
+          self._process_collectors[process] = ProcessTreeCollector(process.pid)

         for process, collector in self._process_collectors.items():
           collector.sample()
@@ -223,6 +223,9 @@ class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread):
       except ValueError as err:
         log.warning("Error recording resource sample: %s" % err)

+      log.debug("TaskResourceMonitor: finished collection of %s in %.2fs" % (
+          self._task_id, (time.time() - now)))
+
       # Sleep until any of the following conditions are met:
       # - it's time for the next disk collection
       # - it's time for the next process collection
@@ -236,6 +239,10 @@ class TaskResourceMonitor(ResourceMonitorBase, ExceptionalThread):
       else:
         waiter = self._kill_signal

-      waiter.wait(timeout=max(0, next_collection))
+      if next_collection > 0:
+        waiter.wait(timeout=next_collection)
+      else:
+        log.warning('Task resource collection is backlogged. Consider increasing '
+                    'process_collection_interval and disk_collection_interval.')

     log.debug('Stopping resource monitoring for task "%s"' % self._task_id)

http://git-wip-us.apache.org/repos/asf/aurora/blob/33acb899/src/main/python/apache/thermos/observer/task_observer.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/task_observer.py b/src/main/python/apache/thermos/observer/task_observer.py
index 1485de8..4bb5d23 100644
--- a/src/main/python/apache/thermos/observer/task_observer.py
+++ b/src/main/python/apache/thermos/observer/task_observer.py
@@ -21,6 +21,7 @@ polls a designated Thermos checkpoint root and collates information about all ta
 """
 import os
 import threading
+import time
 from operator import attrgetter

 from twitter.common import log
@@ -31,7 +32,7 @@ from twitter.common.quantity import Amount, Time
 from apache.thermos.common.path import TaskPath
 from apache.thermos.monitoring.monitor import TaskMonitor
 from apache.thermos.monitoring.process import ProcessSample
-from apache.thermos.monitoring.resource import ResourceMonitorBase, TaskResourceMonitor
+from apache.thermos.monitoring.resource import TaskResourceMonitor

 from .detector import ObserverTaskDetector
 from .observed_task import ActiveObservedTask, FinishedObservedTask
@@ -55,17 +56,17 @@ class TaskObserver(ExceptionalThread, Lockable):

   def __init__(self,
                path_detector,
-               resource_monitor_class=TaskResourceMonitor,
-               interval=POLLING_INTERVAL):
+               interval=POLLING_INTERVAL,
+               task_process_collection_interval=TaskResourceMonitor.PROCESS_COLLECTION_INTERVAL,
+               task_disk_collection_interval=TaskResourceMonitor.DISK_COLLECTION_INTERVAL):
     self._detector = ObserverTaskDetector(
         path_detector,
         self.__on_active,
         self.__on_finished,
         self.__on_removed)
-    if not issubclass(resource_monitor_class, ResourceMonitorBase):
-      raise ValueError("resource monitor class must implement ResourceMonitorBase!")
-    self._resource_monitor_class = resource_monitor_class
     self._interval = interval
+    self._task_process_collection_interval = task_process_collection_interval
+    self._task_disk_collection_interval = task_disk_collection_interval
     self._active_tasks = {}  # task_id => ActiveObservedTask
     self._finished_tasks = {}  # task_id => FinishedObservedTask
     self._stop_event = threading.Event()
@@ -100,7 +101,11 @@ class TaskObserver(ExceptionalThread, Lockable):
       log.error('Found an active task (%s) in finished tasks?' % task_id)
       return
     task_monitor = TaskMonitor(root, task_id)
-    resource_monitor = self._resource_monitor_class(task_id, task_monitor)
+    resource_monitor = TaskResourceMonitor(
+        task_id,
+        task_monitor,
+        process_collection_interval=self._task_process_collection_interval,
+        disk_collection_interval=self._task_disk_collection_interval)
     resource_monitor.start()
     self._active_tasks[task_id] = ActiveObservedTask(
         root,
@@ -132,7 +137,9 @@ class TaskObserver(ExceptionalThread, Lockable):
     while not self._stop_event.is_set():
       self._stop_event.wait(self._interval.as_(Time.SECONDS))
       with self.lock:
+        start = time.time()
         self._detector.refresh()
+        log.debug("TaskObserver: finished checkpoint refresh in %.2fs" % (time.time() - start))

   @Lockable.sync
   def process_from_name(self, task_id, process_id):

http://git-wip-us.apache.org/repos/asf/aurora/blob/33acb899/src/test/python/apache/aurora/tools/test_thermos_observer_entry_point.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/tools/test_thermos_observer_entry_point.py b/src/test/python/apache/aurora/tools/test_thermos_observer_entry_point.py
deleted file mode 100644
index e1c8dec..0000000
--- a/src/test/python/apache/aurora/tools/test_thermos_observer_entry_point.py
+++ /dev/null
@@ -1,40 +0,0 @@
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#  http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-import os
-import unittest
-
-from mock import Mock, create_autospec, patch
-from twitter.common.quantity import Amount, Time
-
-from apache.aurora.tools.thermos_observer import initialize
-from apache.thermos.observer.task_observer import TaskObserver
-
-
-class ThermosObserverMainTest(unittest.TestCase):
-  def test_initialize(self):
-    expected_interval = Amount(15, Time.SECONDS)
-    mock_options = Mock(spec_set=['root', 'mesos_root', 'polling_interval_secs'])
-    mock_options.root = ''
-    mock_options.mesos_root = os.path.abspath('.')
-    mock_options.polling_interval_secs = int(expected_interval.as_(Time.SECONDS))
-    mock_task_observer = create_autospec(spec=TaskObserver)
-    with patch(
-        'apache.aurora.tools.thermos_observer.TaskObserver',
-        return_value=mock_task_observer) as mock_observer:
-
-      initialize(mock_options)
-
-      assert len(mock_observer.mock_calls) == 1
-      args = mock_observer.mock_calls[0][2]
-      assert expected_interval == args['interval']
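
Illustrative note (not part of the commit): the backlog handling that the resource.py hunks introduce can be sketched with the standard library alone. The function name `collect_until_killed`, its parameters, the injectable `clock`, and the way the remaining wait is computed are assumptions made for this sketch; only the two-interval idea and the wording of the warning are taken from the diff above. It is a rough model of the loop, not the actual `TaskResourceMonitor` code.

```python
import itertools
import logging
import threading
import time

log = logging.getLogger(__name__)


def collect_until_killed(kill_signal, process_interval, disk_interval,
                         sample_process, sample_disk, clock=time.time):
    """Run two samplers on independent intervals until kill_signal is set.

    Intervals are plain seconds (hypothetical simplification of Amount).
    Returns how many loop iterations found the collection backlogged.
    """
    next_process = next_disk = 0
    backlogged = 0
    while not kill_signal.is_set():
        now = clock()
        if now > next_process:
            next_process = now + process_interval
            sample_process()
        if now > next_disk:
            next_disk = now + disk_interval
            sample_disk()
        # Assumption: time left until the earliest pending collection.
        remaining = min(next_process, next_disk) - clock()
        if remaining > 0:
            # Interruptible sleep: returns early once kill_signal is set.
            kill_signal.wait(timeout=remaining)
        else:
            # Sampling took longer than the interval, so do not sleep at all.
            backlogged += 1
            log.warning('Task resource collection is backlogged. Consider increasing '
                        'process_collection_interval and disk_collection_interval.')
    return backlogged


if __name__ == "__main__":
    # Fake clock that advances one second per call, so every sample appears
    # to take longer than the 0.5 second process interval.
    ticks = itertools.count(1)
    stop = threading.Event()
    seen = []

    def sample_process():
        seen.append("process")
        if len(seen) >= 3:
            stop.set()

    print(collect_until_killed(stop, 0.5, 100.0, sample_process,
                               lambda: seen.append("disk"),
                               clock=lambda: float(next(ticks))))
```

Raising the process interval above the time a sample takes makes the backlog counter stay at zero, which is the effect operators would be aiming for with the two new observer options.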