This is an automated email from the ASF dual-hosted git repository.

aonishuk pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git

commit 1aae0de04bb29b9e14603722be18fe8393c4fbd8
Author: Andrew Onishuk <aonis...@hortonworks.com>
AuthorDate: Wed Sep 26 11:26:44 2018 +0300

    AMBARI-24689. All component statuses should be re-send on registration (aonishuk)
---
 .../python/ambari_agent/ComponentStatusExecutor.py | 47 +++++++++++++++++-----
 .../main/python/ambari_agent/HeartbeatThread.py    |  6 ++-
 .../python/resource_management/TestLinkResource.py |  2 +-
 3 files changed, 43 insertions(+), 12 deletions(-)

diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
index df72c88..7bf00df 100644
--- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
+++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py
@@ -43,6 +43,7 @@ class ComponentStatusExecutor(threading.Thread):
     self.logger = logging.getLogger(__name__)
     self.reports_to_discard = []
     self.reports_to_discard_lock = threading.RLock()
+    self.reported_component_status_lock = threading.RLock()
     threading.Thread.__init__(self)
 
   def run(self):
@@ -178,7 +179,7 @@ class ComponentStatusExecutor(threading.Thread):
       'clusterId': cluster_id,
     }
 
-    if status != self.reported_component_status[cluster_id][component_name][command_name]:
+    if status != self.reported_component_status[cluster_id]["{0}/{1}".format(service_name, component_name)][command_name]:
       logging.info("Status for {0} has changed to {1}".format(component_name, status))
       self.recovery_manager.handle_status_change(component_name, status)
 
@@ -191,6 +192,29 @@ class ComponentStatusExecutor(threading.Thread):
       return result
     return None
 
+  def force_send_component_statuses(self):
+    """
+    Forcefully resends all component statuses which are currently in cache.
+    """
+    cluster_reports = defaultdict(lambda:[])
+
+    with self.reported_component_status_lock:
+      for cluster_id, component_to_command_dict in self.reported_component_status.iteritems():
+        for service_and_component_name, commands_status in component_to_command_dict.iteritems():
+          service_name, component_name = service_and_component_name.split("/")
+          for command_name, status in commands_status.iteritems():
+            report = {
+              'serviceName': service_name,
+              'componentName': component_name,
+              'command': command_name,
+              'status': status,
+              'clusterId': cluster_id,
+            }
+
+            cluster_reports[cluster_id].append(report)
+
+    self.send_updates_to_server(cluster_reports)
+
   def send_updates_to_server(self, cluster_reports):
     if not cluster_reports or not self.initializer_module.is_registered:
       return
@@ -199,18 +223,21 @@ class ComponentStatusExecutor(threading.Thread):
     
self.server_responses_listener.listener_functions_on_success[correlation_id] = 
lambda headers, message: self.save_reported_component_status(cluster_reports)
 
   def save_reported_component_status(self, cluster_reports):
-    for cluster_id, reports in cluster_reports.iteritems():
-      for report in reports:
-        component_name = report['componentName']
-        command = report['command']
-        status = report['status']
+    with self.reported_component_status_lock:
+      for cluster_id, reports in cluster_reports.iteritems():
+        for report in reports:
+          component_name = report['componentName']
+          service_name = report['serviceName']
+          command = report['command']
+          status = report['status']
 
-        self.reported_component_status[cluster_id][component_name][command] = status
+          self.reported_component_status[cluster_id]["{0}/{1}".format(service_name, component_name)][command] = status
 
   def clean_not_existing_clusters_info(self):
     """
     This needs to be done to remove information about clusters which where deleted (e.g. ambari-server reset)
     """
-    for cluster_id in self.reported_component_status.keys():
-      if cluster_id not in self.topology_cache.get_cluster_ids():
-        del self.reported_component_status[cluster_id]
+    with self.reported_component_status_lock:
+      for cluster_id in self.reported_component_status.keys():
+        if cluster_id not in self.topology_cache.get_cluster_ids():
+          del self.reported_component_status[cluster_id]
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index ded5edd..9210e79 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -67,6 +67,7 @@ class HeartbeatThread(threading.Thread):
     self.host_level_params_events_listener = HostLevelParamsEventListener(initializer_module)
     self.alert_definitions_events_listener = AlertDefinitionsEventListener(initializer_module)
     self.agent_actions_events_listener = AgentActionsListener(initializer_module)
+    self.component_status_executor = initializer_module.component_status_executor
     self.listeners = [self.server_responses_listener, self.commands_events_listener, self.metadata_events_listener, self.topology_events_listener, self.configuration_events_listener, self.host_level_params_events_listener, self.alert_definitions_events_listener, self.agent_actions_events_listener]
 
     self.post_registration_requests = [
@@ -146,7 +147,6 @@ class HeartbeatThread(threading.Thread):
           raise
       finally:
         with listener.event_queue_lock:
-          logger.info("Enabling events for listener {0}".format(listener))
           listener.enabled = True
           # Process queued messages if any
           listener.dequeue_unprocessed_events()
@@ -160,6 +160,7 @@ class HeartbeatThread(threading.Thread):
     self.initializer_module._connection = self.connection
 
     self.report_components_initial_versions()
+    self.force_component_status_update()
 
   def run_post_registration_actions(self):
     for post_registration_action in self.post_registration_actions:
@@ -168,6 +169,9 @@ class HeartbeatThread(threading.Thread):
   def report_components_initial_versions(self):
     ComponentVersionReporter(self.initializer_module).start()
 
+  def force_component_status_update(self):
+    self.component_status_executor.force_send_component_statuses()
+
   def unregister(self):
     """
     Disconnect and remove connection object from initializer_module so other threads cannot use it
diff --git a/ambari-agent/src/test/python/resource_management/TestLinkResource.py b/ambari-agent/src/test/python/resource_management/TestLinkResource.py
index 221bf6b..6a1377b 100644
--- a/ambari-agent/src/test/python/resource_management/TestLinkResource.py
+++ b/ambari-agent/src/test/python/resource_management/TestLinkResource.py
@@ -34,7 +34,7 @@ class TestLinkResource(TestCase):
 
   @patch.object(os.path, "realpath")
   @patch("resource_management.core.sudo.path_lexists")
-  @patch("resource_management.core.sudo.path_lexists")
+  @patch("resource_management.core.sudo.path_islink")
   @patch("resource_management.core.sudo.unlink")
   @patch("resource_management.core.sudo.symlink")
   def test_action_create_relink(self, symlink_mock, unlink_mock, 

Reply via email to