Repository: ambari
Updated Branches:
  refs/heads/branch-3.0-perf ad1264127 -> 79a37e510


AMBARI-21979. Fixes to CommandStatusDict (aonishuk)


Project: http://git-wip-us.apache.org/repos/asf/ambari/repo
Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/79a37e51
Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/79a37e51
Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/79a37e51

Branch: refs/heads/branch-3.0-perf
Commit: 79a37e51088d84bd09857b02e2d9500be5ffbe73
Parents: ad12641
Author: Andrew Onishuk <aonis...@hortonworks.com>
Authored: Tue Sep 19 10:59:13 2017 +0300
Committer: Andrew Onishuk <aonis...@hortonworks.com>
Committed: Tue Sep 19 10:59:13 2017 +0300

----------------------------------------------------------------------
 ambari-agent/conf/unix/ambari-agent.ini         |  8 +++++---
 .../python/ambari_agent/AlertStatusReporter.py  |  1 +
 .../python/ambari_agent/CommandStatusDict.py    | 20 ++++++++++++++++----
 .../ambari_agent/CommandStatusReporter.py       |  1 +
 .../ambari_agent/ComponentStatusExecutor.py     |  9 +++++++--
 .../src/main/python/ambari_agent/Constants.py   |  4 +---
 .../python/ambari_agent/InitializerModule.py    |  2 ++
 7 files changed, 33 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ambari/blob/79a37e51/ambari-agent/conf/unix/ambari-agent.ini
----------------------------------------------------------------------
diff --git a/ambari-agent/conf/unix/ambari-agent.ini 
b/ambari-agent/conf/unix/ambari-agent.ini
index e1df1d2..c6cf803 100644
--- a/ambari-agent/conf/unix/ambari-agent.ini
+++ b/ambari-agent/conf/unix/ambari-agent.ini
@@ -34,11 +34,13 @@ cache_dir=/var/lib/ambari-agent/cache
 tolerate_download_failures=true
 run_as_user=root
 parallel_execution=0
-alert_grace_period=5
-status_command_timeout=5
-alert_reports_interval=5
 ; 0 - don't report commands output periodically. Reduces bandwidth on big 
cluster
+command_update_output=1
+alert_reports_interval=5
 command_reports_interval=5
+status_commands_run_interval=20
+alert_grace_period=5
+status_command_timeout=5
 alert_kinit_timeout=14400000
 system_resource_overrides=/etc/resource_overrides
 ; memory_threshold_soft_mb=400

http://git-wip-us.apache.org/repos/asf/ambari/blob/79a37e51/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py 
b/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py
index 084a342..2bd2383 100644
--- a/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py
+++ b/ambari-agent/src/main/python/ambari_agent/AlertStatusReporter.py
@@ -38,6 +38,7 @@ class AlertStatusReporter(threading.Thread):
     Run an endless loop which reports all the alert statuses got from collector
     """
     if self.alert_reports_interval == 0:
+      logger.warn("AlertStatusReporter is turned off. Some functionality might 
not work correctly.")
       return
 
     while not self.stop_event.is_set():

http://git-wip-us.apache.org/repos/asf/ambari/blob/79a37e51/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py 
b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
index fa71d15..c681550 100644
--- a/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
+++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusDict.py
@@ -45,19 +45,31 @@ class CommandStatusDict():
     self.current_state = {} # Contains all statuses
     self.lock = threading.RLock()
     self.initializer_module = initializer_module
+    self.command_update_output = initializer_module.command_update_output
     self.reported_reports = set()
 
 
-  def put_command_status(self, command, new_report):
+  def put_command_status(self, command, report):
     """
     Stores new version of report for command (replaces previous)
     """
+    from ActionQueue import ActionQueue
+
     key = command['taskId']
-    with self.lock: # Synchronized
-      self.current_state[key] = (command, new_report)
+
+    # delete stale data about this command
+    with self.lock:
       self.reported_reports.discard(key)
+      self.current_state.pop(key, None)
+
+    is_sent = self.force_update_to_server({command['clusterId']: [report]})
+    updatable = report['status'] == ActionQueue.IN_PROGRESS_STATUS and 
self.command_update_output
 
-    self.force_update_to_server({command['clusterId']: [new_report]})
+    if not is_sent or updatable:
+      # if sending is not successful send later
+      with self.lock:
+        self.current_state[key] = (command, report)
+        self.reported_reports.discard(key)
 
   def force_update_to_server(self, reports_dict):
     if not self.initializer_module.is_registered:

http://git-wip-us.apache.org/repos/asf/ambari/blob/79a37e51/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py 
b/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py
index f46c47a..652574f 100644
--- a/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py
+++ b/ambari-agent/src/main/python/ambari_agent/CommandStatusReporter.py
@@ -36,6 +36,7 @@ class CommandStatusReporter(threading.Thread):
     Run an endless loop which reports all the commands results (IN_PROGRESS, 
FAILED, COMPLETE) every self.command_reports_interval seconds.
     """
     if self.command_reports_interval == 0:
+      logger.warn("CommandStatusReporter is turned off. Some functionality 
might not work correctly.")
       return
 
     while not self.stop_event.is_set():

http://git-wip-us.apache.org/repos/asf/ambari/blob/79a37e51/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
----------------------------------------------------------------------
diff --git 
a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py 
b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
index d074786c..f85ba42 100644
--- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
@@ -31,6 +31,7 @@ logger = logging.getLogger(__name__)
 class ComponentStatusExecutor(threading.Thread):
   def __init__(self, initializer_module):
     self.initializer_module = initializer_module
+    self.status_commands_run_interval = 
initializer_module.status_commands_run_interval
     self.metadata_cache = initializer_module.metadata_cache
     self.topology_cache = initializer_module.topology_cache
     self.customServiceOrchestrator = 
initializer_module.customServiceOrchestrator
@@ -41,8 +42,12 @@ class ComponentStatusExecutor(threading.Thread):
 
   def run(self):
     """
-    Run an endless loop which executes all status commands every 
Constants.STATUS_COMMANDS_PACK_INTERVAL_SECONDS seconds.
+    Run an endless loop which executes all status commands every 
'status_commands_run_interval' seconds.
     """
+    if self.status_commands_run_interval == 0:
+      logger.warn("ComponentStatusExecutor is turned off. Some functionality 
might not work correctly.")
+      return
+
     while not self.stop_event.is_set():
       try:
         self.clean_not_existing_clusters_info()
@@ -118,7 +123,7 @@ class ComponentStatusExecutor(threading.Thread):
       except:
         logger.exception("Exception in ComponentStatusExecutor. Re-running it")
 
-      self.stop_event.wait(Constants.STATUS_COMMANDS_PACK_INTERVAL_SECONDS)
+      self.stop_event.wait(self.status_commands_run_interval)
     logger.info("ComponentStatusExecutor has successfully finished")
 
   def send_updates_to_server(self, cluster_reports):

http://git-wip-us.apache.org/repos/asf/ambari/blob/79a37e51/ambari-agent/src/main/python/ambari_agent/Constants.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/Constants.py 
b/ambari-agent/src/main/python/ambari_agent/Constants.py
index e36eda5..6a258d2 100644
--- a/ambari-agent/src/main/python/ambari_agent/Constants.py
+++ b/ambari-agent/src/main/python/ambari_agent/Constants.py
@@ -44,6 +44,4 @@ HEARTBEAT_ENDPOINT = '/heartbeat'
 REGISTRATION_ENDPOINT = '/register'
 
 AGENT_TMP_DIR = "/var/lib/ambari-agent/tmp"
-CORRELATION_ID_STRING = 'correlationId'
-
-STATUS_COMMANDS_PACK_INTERVAL_SECONDS = 20
\ No newline at end of file
+CORRELATION_ID_STRING = 'correlationId'
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ambari/blob/79a37e51/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
----------------------------------------------------------------------
diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py 
b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
index 4fbef65..0126250 100644
--- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
+++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
@@ -61,6 +61,8 @@ class InitializerModule:
     self.cache_dir = self.config.get('agent', 'cache_dir', 
default='/var/lib/ambari-agent/cache')
     self.command_reports_interval = int(self.config.get('agent', 
'command_reports_interval', default='5'))
     self.alert_reports_interval = int(self.config.get('agent', 
'alert_reports_interval', default='5'))
+    self.status_commands_run_interval = int(self.config.get('agent', 
'status_commands_run_interval', default='20'))
+    self.command_update_output = bool(int(self.config.get('agent', 
'command_update_output', default='1')))
 
     self.cluster_cache_dir = os.path.join(self.cache_dir, 
FileCache.CLUSTER_CACHE_DIRECTORY)
     self.recovery_cache_dir = os.path.join(self.cache_dir, 
FileCache.RECOVERY_CACHE_DIRECTORY)

Reply via email to