Repository: ambari Updated Branches: refs/heads/branch-3.0-perf ad1264127 -> 79a37e510
AMBARI-21979. Fixes to CommandStatusDict (aonishuk) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/79a37e51 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/79a37e51 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/79a37e51 Branch: refs/heads/branch-3.0-perf Commit: 79a37e51088d84bd09857b02e2d9500be5ffbe73 Parents: ad12641 Author: Andrew Onishuk <aonis...@hortonworks.com> Authored: Tue Sep 19 10:59:13 2017 +0300 Committer: Andrew Onishuk <aonis...@hortonworks.com> Committed: Tue Sep 19 10:59:13 2017 +0300 ---------------------------------------------------------------------- ambari-agent/conf/unix/ambari-agent.ini | 8 +++++--- .../python/ambari_agent/AlertStatusReporter.py | 1 + .../python/ambari_agent/CommandStatusDict.py | 20 ++++++++++++++++---- .../ambari_agent/CommandStatusReporter.py | 1 + .../ambari_agent/ComponentStatusExecutor.py | 9 +++++++-- .../src/main/python/ambari_agent/Constants.py | 4 +--- .../python/ambari_agent/InitializerModule.py | 2 ++ 7 files changed, 33 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/79a37e51/ambari-agent/conf/unix/ambari-agent.ini ---------------------------------------------------------------------- diff --git a/ambari-agent/conf/unix/ambari-agent.ini b/ambari-agent/conf/unix/ambari-agent.ini index e1df1d2..c6cf803 100644 --- a/ambari-agent/conf/unix/ambari-agent.ini +++ b/ambari-agent/conf/unix/ambari-agent.ini @@ -34,11 +34,13 @@ cache_dir=/var/lib/ambari-agent/cache tolerate_download_failures=true run_as_user=root parallel_execution=0 -alert_grace_period=5 -status_command_timeout=5 -alert_reports_interval=5 ; 0 - don't report commands output periodically. Reduces bandwidth on big cluster +command_update_output=1 +alert_reports_interval=5 command_reports_interval=5 +status_commands_run_interval=20 +alert_grace_period=5 +status_command_timeout=5 alert_kinit_timeout=14400000 system_resource_overrides=/etc/resource_overrides ; memory_threshold_soft_mb=400 http://git-wip-us.apache.org/repos/asf/ambari/blob/79a37e51/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py b/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py index 084a342..2bd2383 100644 --- a/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py +++ b/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py @@ -38,6 +38,7 @@ class AlertStatusReporter(threading.Thread): Run an endless loop which reports all the alert statuses got from collector """ if self.alert_reports_interval == 0: + logger.warn("AlertStatusReporter is turned off. Some functionality might not work correctly.") return while not self.stop_event.is_set(): http://git-wip-us.apache.org/repos/asf/ambari/blob/79a37e51/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py index fa71d15..c681550 100644 --- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py +++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py @@ -45,19 +45,31 @@ class CommandStatusDict(): self.current_state = {} # Contains all statuses self.lock = threading.RLock() self.initializer_module = initializer_module + self.command_update_output = initializer_module.command_update_output self.reported_reports = set() - def put_command_status(self, command, new_report): + def put_command_status(self, command, report): """ Stores new version of report for command (replaces previous) """ + from ActionQueue import ActionQueue + key = command['taskId'] - with self.lock: # Synchronized - self.current_state[key] = (command, new_report) + + # delete stale data about this command + with self.lock: self.reported_reports.discard(key) + self.current_state.pop(key, None) + + is_sent = self.force_update_to_server({command['clusterId']: [report]}) + updatable = report['status'] == ActionQueue.IN_PROGRESS_STATUS and self.command_update_output - self.force_update_to_server({command['clusterId']: [new_report]}) + if not is_sent or updatable: + # if sending is not successful send later + with self.lock: + self.current_state[key] = (command, report) + self.reported_reports.discard(key) def force_update_to_server(self, reports_dict): if not self.initializer_module.is_registered: http://git-wip-us.apache.org/repos/asf/ambari/blob/79a37e51/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py b/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py index f46c47a..652574f 100644 --- a/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py +++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py @@ -36,6 +36,7 @@ class CommandStatusReporter(threading.Thread): Run an endless loop which reports all the commands results (IN_PROGRESS, FAILED, COMPLETE) every self.command_reports_interval seconds. """ if self.command_reports_interval == 0: + logger.warn("CommandStatusReporter is turned off. Some functionality might not work correctly.") return while not self.stop_event.is_set(): http://git-wip-us.apache.org/repos/asf/ambari/blob/79a37e51/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py index d074786c..f85ba42 100644 --- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py +++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py @@ -31,6 +31,7 @@ logger = logging.getLogger(__name__) class ComponentStatusExecutor(threading.Thread): def __init__(self, initializer_module): self.initializer_module = initializer_module + self.status_commands_run_interval = initializer_module.status_commands_run_interval self.metadata_cache = initializer_module.metadata_cache self.topology_cache = initializer_module.topology_cache self.customServiceOrchestrator = initializer_module.customServiceOrchestrator @@ -41,8 +42,12 @@ class ComponentStatusExecutor(threading.Thread): def run(self): """ - Run an endless loop which executes all status commands every Constants.STATUS_COMMANDS_PACK_INTERVAL_SECONDS seconds. + Run an endless loop which executes all status commands every 'status_commands_run_interval' seconds. """ + if self.status_commands_run_interval == 0: + logger.warn("ComponentStatusExecutor is turned off. Some functionality might not work correctly.") + return + while not self.stop_event.is_set(): try: self.clean_not_existing_clusters_info() @@ -118,7 +123,7 @@ class ComponentStatusExecutor(threading.Thread): except: logger.exception("Exception in ComponentStatusExecutor. Re-running it") - self.stop_event.wait(Constants.STATUS_COMMANDS_PACK_INTERVAL_SECONDS) + self.stop_event.wait(self.status_commands_run_interval) logger.info("ComponentStatusExecutor has successfully finished") def send_updates_to_server(self, cluster_reports): http://git-wip-us.apache.org/repos/asf/ambari/blob/79a37e51/ambari-agent/src/main/python/ambari_agent/Constants.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Constants.py b/ambari-agent/src/main/python/ambari_agent/Constants.py index e36eda5..6a258d2 100644 --- a/ambari-agent/src/main/python/ambari_agent/Constants.py +++ b/ambari-agent/src/main/python/ambari_agent/Constants.py @@ -44,6 +44,4 @@ HEARTBEAT_ENDPOINT = '/heartbeat' REGISTRATION_ENDPOINT = '/register' AGENT_TMP_DIR = "/var/lib/ambari-agent/tmp" -CORRELATION_ID_STRING = 'correlationId' - -STATUS_COMMANDS_PACK_INTERVAL_SECONDS = 20 \ No newline at end of file +CORRELATION_ID_STRING = 'correlationId' \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/79a37e51/ambari-agent/src/main/python/ambari_agent/InitializerModule.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py index 4fbef65..0126250 100644 --- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py +++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py @@ -61,6 +61,8 @@ class InitializerModule: self.cache_dir = self.config.get('agent', 'cache_dir', default='/var/lib/ambari-agent/cache') self.command_reports_interval = int(self.config.get('agent', 'command_reports_interval', default='5')) self.alert_reports_interval = int(self.config.get('agent', 'alert_reports_interval', default='5')) + self.status_commands_run_interval = int(self.config.get('agent', 'status_commands_run_interval', default='20')) + self.command_update_output = bool(int(self.config.get('agent', 'command_update_output', default='1'))) self.cluster_cache_dir = os.path.join(self.cache_dir, FileCache.CLUSTER_CACHE_DIRECTORY) self.recovery_cache_dir = os.path.join(self.cache_dir, FileCache.RECOVERY_CACHE_DIRECTORY)