This is an automated email from the ASF dual-hosted git repository. aonishuk pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/ambari.git
commit 1aae0de04bb29b9e14603722be18fe8393c4fbd8 Author: Andrew Onishuk <aonis...@hortonworks.com> AuthorDate: Wed Sep 26 11:26:44 2018 +0300 AMBARI-24689. All component statuses should be re-send on registration (aonishuk) --- .../python/ambari_agent/ComponentStatusExecutor.py | 47 +++++++++++++++++----- .../main/python/ambari_agent/HeartbeatThread.py | 6 ++- .../python/resource_management/TestLinkResource.py | 2 +- 3 files changed, 43 insertions(+), 12 deletions(-) diff --git a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py index df72c88..7bf00df 100644 --- a/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py +++ b/ambari-agent/src/main/python/ambari_agent/ComponentStatusExecutor.py @@ -43,6 +43,7 @@ class ComponentStatusExecutor(threading.Thread): self.logger = logging.getLogger(__name__) self.reports_to_discard = [] self.reports_to_discard_lock = threading.RLock() + self.reported_component_status_lock = threading.RLock() threading.Thread.__init__(self) def run(self): @@ -178,7 +179,7 @@ class ComponentStatusExecutor(threading.Thread): 'clusterId': cluster_id, } - if status != self.reported_component_status[cluster_id][component_name][command_name]: + if status != self.reported_component_status[cluster_id]["{0}/{1}".format(service_name, component_name)][command_name]: logging.info("Status for {0} has changed to {1}".format(component_name, status)) self.recovery_manager.handle_status_change(component_name, status) @@ -191,6 +192,29 @@ class ComponentStatusExecutor(threading.Thread): return result return None + def force_send_component_statuses(self): + """ + Forcefully resends all component statuses which are currently in cache. + """ + cluster_reports = defaultdict(lambda:[]) + + with self.reported_component_status_lock: + for cluster_id, component_to_command_dict in self.reported_component_status.iteritems(): + for service_and_component_name, commands_status in component_to_command_dict.iteritems(): + service_name, component_name = service_and_component_name.split("/") + for command_name, status in commands_status.iteritems(): + report = { + 'serviceName': service_name, + 'componentName': component_name, + 'command': command_name, + 'status': status, + 'clusterId': cluster_id, + } + + cluster_reports[cluster_id].append(report) + + self.send_updates_to_server(cluster_reports) + def send_updates_to_server(self, cluster_reports): if not cluster_reports or not self.initializer_module.is_registered: return @@ -199,18 +223,21 @@ class ComponentStatusExecutor(threading.Thread): self.server_responses_listener.listener_functions_on_success[correlation_id] = lambda headers, message: self.save_reported_component_status(cluster_reports) def save_reported_component_status(self, cluster_reports): - for cluster_id, reports in cluster_reports.iteritems(): - for report in reports: - component_name = report['componentName'] - command = report['command'] - status = report['status'] + with self.reported_component_status_lock: + for cluster_id, reports in cluster_reports.iteritems(): + for report in reports: + component_name = report['componentName'] + service_name = report['serviceName'] + command = report['command'] + status = report['status'] - self.reported_component_status[cluster_id][component_name][command] = status + self.reported_component_status[cluster_id]["{0}/{1}".format(service_name, component_name)][command] = status def clean_not_existing_clusters_info(self): """ This needs to be done to remove information about clusters which where deleted (e.g. ambari-server reset) """ - for cluster_id in self.reported_component_status.keys(): - if cluster_id not in self.topology_cache.get_cluster_ids(): - del self.reported_component_status[cluster_id] + with self.reported_component_status_lock: + for cluster_id in self.reported_component_status.keys(): + if cluster_id not in self.topology_cache.get_cluster_ids(): + del self.reported_component_status[cluster_id] diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py index ded5edd..9210e79 100644 --- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py +++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py @@ -67,6 +67,7 @@ class HeartbeatThread(threading.Thread): self.host_level_params_events_listener = HostLevelParamsEventListener(initializer_module) self.alert_definitions_events_listener = AlertDefinitionsEventListener(initializer_module) self.agent_actions_events_listener = AgentActionsListener(initializer_module) + self.component_status_executor = initializer_module.component_status_executor self.listeners = [self.server_responses_listener, self.commands_events_listener, self.metadata_events_listener, self.topology_events_listener, self.configuration_events_listener, self.host_level_params_events_listener, self.alert_definitions_events_listener, self.agent_actions_events_listener] self.post_registration_requests = [ @@ -146,7 +147,6 @@ class HeartbeatThread(threading.Thread): raise finally: with listener.event_queue_lock: - logger.info("Enabling events for listener {0}".format(listener)) listener.enabled = True # Process queued messages if any listener.dequeue_unprocessed_events() @@ -160,6 +160,7 @@ class HeartbeatThread(threading.Thread): self.initializer_module._connection = self.connection self.report_components_initial_versions() + self.force_component_status_update() def run_post_registration_actions(self): for post_registration_action in self.post_registration_actions: @@ -168,6 +169,9 @@ class HeartbeatThread(threading.Thread): def report_components_initial_versions(self): ComponentVersionReporter(self.initializer_module).start() + def force_component_status_update(self): + self.component_status_executor.force_send_component_statuses() + def unregister(self): """ Disconnect and remove connection object from initializer_module so other threads cannot use it diff --git a/ambari-agent/src/test/python/resource_management/TestLinkResource.py b/ambari-agent/src/test/python/resource_management/TestLinkResource.py index 221bf6b..6a1377b 100644 --- a/ambari-agent/src/test/python/resource_management/TestLinkResource.py +++ b/ambari-agent/src/test/python/resource_management/TestLinkResource.py @@ -34,7 +34,7 @@ class TestLinkResource(TestCase): @patch.object(os.path, "realpath") @patch("resource_management.core.sudo.path_lexists") - @patch("resource_management.core.sudo.path_lexists") + @patch("resource_management.core.sudo.path_islink") @patch("resource_management.core.sudo.unlink") @patch("resource_management.core.sudo.symlink") def test_action_create_relink(self, symlink_mock, unlink_mock,