This is an automated email from the ASF dual-hosted git repository. aonishuk pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/ambari.git
commit d08278dde2e5fc6bc779fe8e9ecbb32c94284bd2 Author: Andrew Onishuk <aonis...@hortonworks.com> AuthorDate: Thu Jun 7 11:28:19 2018 +0300 AMBARI-24049. Ambari-agent should report when message is received (aonishuk) --- .../python/ambari_agent/ClusterTopologyCache.py | 10 +++--- .../src/main/python/ambari_agent/Constants.py | 2 ++ .../main/python/ambari_agent/HeartbeatThread.py | 14 ++++---- .../main/python/ambari_agent/InitializerModule.py | 2 +- ambari-agent/src/main/python/ambari_agent/Utils.py | 5 +++ .../ambari_agent/listeners/AgentActionsListener.py | 2 +- .../listeners/AlertDefinitionsEventListener.py | 7 ++-- .../listeners/CommandsEventListener.py | 5 +-- .../listeners/ConfigurationEventListener.py | 9 ++--- .../listeners/HostLevelParamsEventListener.py | 7 ++-- .../listeners/MetadataEventListener.py | 7 ++-- .../listeners/ServerResponsesListener.py | 3 +- .../listeners/TopologyEventListener.py | 5 +-- .../main/python/ambari_agent/listeners/__init__.py | 42 ++++++++++++++++++++-- .../src/main/python/ambari_ws4py/websocket.py | 6 ++-- 15 files changed, 92 insertions(+), 34 deletions(-) diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py index f23c7f7..830f202 100644 --- a/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py +++ b/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py @@ -79,10 +79,12 @@ class ClusterTopologyCache(ClusterCache): continue current_host_id = self.current_host_ids_to_cluster[cluster_id] - for component_dict in self[cluster_id].components: - if 'hostIds' in component_dict and current_host_id in component_dict.hostIds: - if current_host_id in component_dict.hostIds: - self.cluster_local_components[cluster_id].append(component_dict.componentName) + + if 'components' in self[cluster_id]: + for component_dict in self[cluster_id].components: + if 'hostIds' in component_dict and current_host_id in component_dict.hostIds: + if current_host_id in component_dict.hostIds: + self.cluster_local_components[cluster_id].append(component_dict.componentName) self.hosts_to_id = ImmutableDictionary(hosts_to_id) diff --git a/ambari-agent/src/main/python/ambari_agent/Constants.py b/ambari-agent/src/main/python/ambari_agent/Constants.py index 367e8c4..09381eb 100644 --- a/ambari-agent/src/main/python/ambari_agent/Constants.py +++ b/ambari-agent/src/main/python/ambari_agent/Constants.py @@ -31,6 +31,7 @@ AGENT_ACTIONS_TOPIC = '/user/agent_actions' PRE_REGISTRATION_TOPICS_TO_SUBSCRIBE = [SERVER_RESPONSES_TOPIC, AGENT_ACTIONS_TOPIC] POST_REGISTRATION_TOPICS_TO_SUBSCRIBE = [COMMANDS_TOPIC] +AGENT_RESPONSES_TOPIC = '/agents/responses' TOPOLOGY_REQUEST_ENDPOINT = '/agents/topologies' METADATA_REQUEST_ENDPOINT = '/agents/metadata' CONFIGURATIONS_REQUEST_ENDPOINT = '/agents/configs' @@ -45,3 +46,4 @@ HEARTBEAT_ENDPOINT = '/heartbeat' REGISTRATION_ENDPOINT = '/register' CORRELATION_ID_STRING = 'correlationId' +MESSAGE_ID = 'messageId' diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py index 179cf6b..49e0ce5 100644 --- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py +++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py @@ -59,12 +59,12 @@ class HeartbeatThread(threading.Thread): # listeners self.server_responses_listener = initializer_module.server_responses_listener - self.commands_events_listener = CommandsEventListener(initializer_module.action_queue) - self.metadata_events_listener = MetadataEventListener(initializer_module.metadata_cache, initializer_module.config) - self.topology_events_listener = TopologyEventListener(initializer_module.topology_cache) - self.configuration_events_listener = ConfigurationEventListener(initializer_module.configurations_cache) - self.host_level_params_events_listener = HostLevelParamsEventListener(initializer_module.host_level_params_cache, initializer_module.recovery_manager) - self.alert_definitions_events_listener = AlertDefinitionsEventListener(initializer_module.alert_definitions_cache, initializer_module.alert_scheduler_handler) + self.commands_events_listener = CommandsEventListener(initializer_module) + self.metadata_events_listener = MetadataEventListener(initializer_module) + self.topology_events_listener = TopologyEventListener(initializer_module) + self.configuration_events_listener = ConfigurationEventListener(initializer_module) + self.host_level_params_events_listener = HostLevelParamsEventListener(initializer_module) + self.alert_definitions_events_listener = AlertDefinitionsEventListener(initializer_module) self.agent_actions_events_listener = AgentActionsListener(initializer_module) self.listeners = [self.server_responses_listener, self.commands_events_listener, self.metadata_events_listener, self.topology_events_listener, self.configuration_events_listener, self.host_level_params_events_listener, self.alert_definitions_events_listener, self.agent_actions_events_listener] @@ -125,6 +125,7 @@ class HeartbeatThread(threading.Thread): logger.info("Sending registration request") logger.debug("Registration request is {0}".format(registration_request)) + self.server_responses_listener.connection = self.connection response = self.blocking_request(registration_request, Constants.REGISTRATION_ENDPOINT) logger.info("Registration response received") @@ -133,6 +134,7 @@ class HeartbeatThread(threading.Thread): self.handle_registration_response(response) for endpoint, cache, listener, subscribe_to in self.post_registration_requests: + listener.connection = self.connection # should not hang forever on these requests response = self.blocking_request({'hash': cache.hash}, endpoint, log_handler=listener.get_log_message) try: diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py index 787df16..b15ad9b 100644 --- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py +++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py @@ -89,7 +89,7 @@ class InitializerModule: self.alert_definitions_cache = ClusterAlertDefinitionsCache(self.config.cluster_cache_dir) self.configuration_builder = ConfigurationBuilder(self) self.stale_alerts_monitor = StaleAlertsMonitor(self) - self.server_responses_listener = ServerResponsesListener() + self.server_responses_listener = ServerResponsesListener(self) self.file_cache = FileCache(self.config) self.customServiceOrchestrator = CustomServiceOrchestrator(self) self.recovery_manager = RecoveryManager() diff --git a/ambari-agent/src/main/python/ambari_agent/Utils.py b/ambari-agent/src/main/python/ambari_agent/Utils.py index 774e957..33851f3 100644 --- a/ambari-agent/src/main/python/ambari_agent/Utils.py +++ b/ambari-agent/src/main/python/ambari_agent/Utils.py @@ -21,6 +21,7 @@ import os import time import threading import collections +import traceback from functools import wraps from ambari_agent.ExitHelper import ExitHelper @@ -161,6 +162,10 @@ class Utils(object): t = threading.Timer( graceful_stop_timeout, ExitHelper().exit, [AGENT_AUTO_RESTART_EXIT_CODE]) t.start() + @staticmethod + def get_traceback_as_text(ex): + return ''.join(traceback.format_exception(etype=type(ex), value=ex, tb=ex.__traceback__)) + class ImmutableDictionary(dict): def __init__(self, dictionary): """ diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/AgentActionsListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/AgentActionsListener.py index 07cf8ed..2d3cb51 100644 --- a/ambari-agent/src/main/python/ambari_agent/listeners/AgentActionsListener.py +++ b/ambari-agent/src/main/python/ambari_agent/listeners/AgentActionsListener.py @@ -35,7 +35,7 @@ class AgentActionsListener(EventListener): RESTART_AGENT_ACTION = 'RESTART_AGENT' def __init__(self, initializer_module): - self.initializer_module = initializer_module + super(AgentActionsListener, self).__init__(initializer_module) self.stop_event = initializer_module.stop_event def on_event(self, headers, message): diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/AlertDefinitionsEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/AlertDefinitionsEventListener.py index 4ef9982..0aa65ef 100644 --- a/ambari-agent/src/main/python/ambari_agent/listeners/AlertDefinitionsEventListener.py +++ b/ambari-agent/src/main/python/ambari_agent/listeners/AlertDefinitionsEventListener.py @@ -30,9 +30,10 @@ class AlertDefinitionsEventListener(EventListener): """ Listener of Constants.ALERTS_DEFINITIONS_TOPIC events from server. """ - def __init__(self, alert_definitions_cache, alert_scheduler_handler): - self.alert_definitions_cache = alert_definitions_cache - self.alert_scheduler_handler = alert_scheduler_handler + def __init__(self, initializer_module): + super(AlertDefinitionsEventListener, self).__init__(initializer_module) + self.alert_definitions_cache = initializer_module.alert_definitions_cache + self.alert_scheduler_handler = initializer_module.alert_scheduler_handler def on_event(self, headers, message): """ diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py index 49ba20f..b25ec69 100644 --- a/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py +++ b/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py @@ -30,8 +30,9 @@ class CommandsEventListener(EventListener): """ Listener of Constants.CONFIGURATIONS_TOPIC events from server. """ - def __init__(self, action_queue): - self.action_queue = action_queue + def __init__(self, initializer_module): + super(CommandsEventListener, self).__init__(initializer_module) + self.action_queue = initializer_module.action_queue def on_event(self, headers, message): """ diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py index 880ffd3..9887a8a 100644 --- a/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py +++ b/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py @@ -30,8 +30,9 @@ class ConfigurationEventListener(EventListener): """ Listener of Constants.CONFIGURATIONS_TOPIC events from server. """ - def __init__(self, configuration_cache): - self.configuration_cache = configuration_cache + def __init__(self, initializer_module): + super(ConfigurationEventListener, self).__init__(initializer_module) + self.configurations_cache = initializer_module.configurations_cache def on_event(self, headers, message): """ @@ -40,13 +41,13 @@ class ConfigurationEventListener(EventListener): @param headers: headers dictionary @param message: message payload dictionary """ - self.configuration_cache.timestamp = message.pop('timestamp') + self.configurations_cache.timestamp = message.pop('timestamp') # this kind of response is received if hash was identical. And server does not need to change anything if message == {}: return - self.configuration_cache.rewrite_cache(message['clusters'], message['hash']) + self.configurations_cache.rewrite_cache(message['clusters'], message['hash']) def get_handled_path(self): return Constants.CONFIGURATIONS_TOPIC diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py index de5cd03..d4b4088 100644 --- a/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py +++ b/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py @@ -30,9 +30,10 @@ class HostLevelParamsEventListener(EventListener): """ Listener of Constants.HOST_LEVEL_PARAMS_TOPIC events from server. """ - def __init__(self, host_level_params_cache, recovery_manager): - self.host_level_params_cache = host_level_params_cache - self.recovery_manager = recovery_manager + def __init__(self, initializer_module): + super(HostLevelParamsEventListener, self).__init__(initializer_module) + self.host_level_params_cache = initializer_module.host_level_params_cache + self.recovery_manager = initializer_module.recovery_manager def on_event(self, headers, message): """ diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py index d80e40b..83f4b23 100644 --- a/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py +++ b/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py @@ -32,9 +32,10 @@ class MetadataEventListener(EventListener): """ Listener of Constants.METADATA_TOPIC events from server. """ - def __init__(self, metadata_cache, config): - self.metadata_cache = metadata_cache - self.config = config + def __init__(self, initializer_module): + super(MetadataEventListener, self).__init__(initializer_module) + self.metadata_cache = initializer_module.metadata_cache + self.config = initializer_module.config def on_event(self, headers, message): """ diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py index 5320ab7..02d60a5 100644 --- a/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py +++ b/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py @@ -34,7 +34,8 @@ class ServerResponsesListener(EventListener): RESPONSE_STATUS_STRING = 'status' RESPONSE_STATUS_SUCCESS = 'OK' - def __init__(self): + def __init__(self, initializer_module): + super(ServerResponsesListener, self).__init__(initializer_module) self.reset_responses() def on_event(self, headers, message): diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py index df4ea03..b695d8a 100644 --- a/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py +++ b/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py @@ -30,8 +30,9 @@ class TopologyEventListener(EventListener): """ Listener of Constants.TOPOLOGIES_TOPIC events from server. """ - def __init__(self, topology_cache): - self.topology_cache = topology_cache + def __init__(self, initializer_module): + super(TopologyEventListener, self).__init__(initializer_module) + self.topology_cache = initializer_module.topology_cache def on_event(self, headers, message): """ diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py index 1134cb9..7e66197 100644 --- a/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py +++ b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py @@ -20,7 +20,11 @@ limitations under the License. import ambari_simplejson as json import ambari_stomp import logging +import traceback import copy +from ambari_stomp.adapter.websocket import ConnectionIsAlreadyClosed +from ambari_agent import Constants +from ambari_agent.Utils import Utils logger = logging.getLogger(__name__) @@ -28,6 +32,9 @@ class EventListener(ambari_stomp.ConnectionListener): """ Base abstract class for event listeners on specific topics. """ + def __init__(self, initializer_module): + self.initializer_module = initializer_module + def on_message(self, headers, message): """ This method is triggered by stomp when message from serve is received. @@ -42,15 +49,46 @@ class EventListener(ambari_stomp.ConnectionListener): if destination.rstrip('/') == self.get_handled_path().rstrip('/'): try: message_json = json.loads(message) - except ValueError: + except ValueError as ex: logger.exception("Received from server event is not a valid message json. Message is:\n{0}".format(message)) + self.report_status_to_sender(headers, message, ex) return logger.info("Event from server at {0}{1}".format(destination, self.get_log_message(headers, copy.deepcopy(message_json)))) try: self.on_event(headers, message_json) - except: + except Exception as ex: logger.exception("Exception while handing event from {0} {1}".format(destination, headers, message)) + self.report_status_to_sender(headers, message, ex) + else: + self.report_status_to_sender(headers, message) + + def report_status_to_sender(self, headers, message, ex=None): + """ + Reports the status of delivery of the message to a sender + + @param headers: headers dictionary + @param message: message payload dictionary + @params ex: optional exception object for errors + """ + if not Constants.MESSAGE_ID in headers: + return + + if ex: + confirmation_of_received = {Constants.MESSAGE_ID:headers[Constants.MESSAGE_ID], 'status':'ERROR', 'reason':Utils.get_traceback_as_text(ex)} + else: + confirmation_of_received = {Constants.MESSAGE_ID:headers[Constants.MESSAGE_ID], 'status':'OK'} + + try: + connection = self.initializer_module.connection + except ConnectionIsAlreadyClosed: + # access early copy of connection before it is exposed globally + connection = self.initializer_module.heartbeat_thread.connection + + try: + connection.send(message=confirmation_of_received, destination=Constants.AGENT_RESPONSES_TOPIC) + except: + logger.exception("Could not send a confirmation '{0}' to server".format(confirmation_of_received)) def on_event(self, headers, message): """ diff --git a/ambari-common/src/main/python/ambari_ws4py/websocket.py b/ambari-common/src/main/python/ambari_ws4py/websocket.py index 936f333..78d233e 100644 --- a/ambari-common/src/main/python/ambari_ws4py/websocket.py +++ b/ambari-common/src/main/python/ambari_ws4py/websocket.py @@ -483,7 +483,7 @@ class WebSocket(object): self.reading_buffer_size = s.parser.send(bytes) or DEFAULT_READING_SIZE if s.closing is not None: - logger.debug("Closing message received (%d) '%s'" % (s.closing.code, s.closing.reason)) + logger.info("Closing message received (%d) '%s'" % (s.closing.code, s.closing.reason)) if not self.server_terminated: self.close(s.closing.code, s.closing.reason) else: @@ -492,7 +492,7 @@ class WebSocket(object): if s.errors: for error in s.errors: - logger.debug("Error message received (%d) '%s'" % (error.code, error.reason)) + logger.warn("Error message received (%d) '%s'" % (error.code, error.reason)) self.close(error.code, error.reason) s.errors = [] return False @@ -548,6 +548,8 @@ class WebSocket(object): while not self.terminated: if not self.once(): break + except: + logger.exception("Websocket connection was closed with an exception") finally: self.terminate() -- To stop receiving notification emails like this one, please contact aonis...@apache.org.