This is an automated email from the ASF dual-hosted git repository.

aonishuk pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/ambari.git

commit d08278dde2e5fc6bc779fe8e9ecbb32c94284bd2
Author: Andrew Onishuk <aonis...@hortonworks.com>
AuthorDate: Thu Jun 7 11:28:19 2018 +0300

    AMBARI-24049. Ambari-agent should report when message is received (aonishuk)
---
 .../python/ambari_agent/ClusterTopologyCache.py    | 10 +++---
 .../src/main/python/ambari_agent/Constants.py      |  2 ++
 .../main/python/ambari_agent/HeartbeatThread.py    | 14 ++++----
 .../main/python/ambari_agent/InitializerModule.py  |  2 +-
 ambari-agent/src/main/python/ambari_agent/Utils.py |  5 +++
 .../ambari_agent/listeners/AgentActionsListener.py |  2 +-
 .../listeners/AlertDefinitionsEventListener.py     |  7 ++--
 .../listeners/CommandsEventListener.py             |  5 +--
 .../listeners/ConfigurationEventListener.py        |  9 ++---
 .../listeners/HostLevelParamsEventListener.py      |  7 ++--
 .../listeners/MetadataEventListener.py             |  7 ++--
 .../listeners/ServerResponsesListener.py           |  3 +-
 .../listeners/TopologyEventListener.py             |  5 +--
 .../main/python/ambari_agent/listeners/__init__.py | 42 ++++++++++++++++++++--
 .../src/main/python/ambari_ws4py/websocket.py      |  6 ++--
 15 files changed, 92 insertions(+), 34 deletions(-)

diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py
index f23c7f7..830f202 100644
--- a/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py
+++ b/ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py
@@ -79,10 +79,12 @@ class ClusterTopologyCache(ClusterCache):
         continue
 
       current_host_id = self.current_host_ids_to_cluster[cluster_id]
-      for component_dict in self[cluster_id].components:
-        if 'hostIds' in component_dict and current_host_id in component_dict.hostIds:
-          if current_host_id in component_dict.hostIds:
-            self.cluster_local_components[cluster_id].append(component_dict.componentName)
+
+      if 'components' in self[cluster_id]:
+        for component_dict in self[cluster_id].components:
+          if 'hostIds' in component_dict and current_host_id in component_dict.hostIds:
+            if current_host_id in component_dict.hostIds:
+              self.cluster_local_components[cluster_id].append(component_dict.componentName)
 
 
     self.hosts_to_id = ImmutableDictionary(hosts_to_id)
diff --git a/ambari-agent/src/main/python/ambari_agent/Constants.py b/ambari-agent/src/main/python/ambari_agent/Constants.py
index 367e8c4..09381eb 100644
--- a/ambari-agent/src/main/python/ambari_agent/Constants.py
+++ b/ambari-agent/src/main/python/ambari_agent/Constants.py
@@ -31,6 +31,7 @@ AGENT_ACTIONS_TOPIC = '/user/agent_actions'
 PRE_REGISTRATION_TOPICS_TO_SUBSCRIBE = [SERVER_RESPONSES_TOPIC, AGENT_ACTIONS_TOPIC]
 POST_REGISTRATION_TOPICS_TO_SUBSCRIBE = [COMMANDS_TOPIC]
 
+AGENT_RESPONSES_TOPIC = '/agents/responses'
 TOPOLOGY_REQUEST_ENDPOINT = '/agents/topologies'
 METADATA_REQUEST_ENDPOINT = '/agents/metadata'
 CONFIGURATIONS_REQUEST_ENDPOINT = '/agents/configs'
@@ -45,3 +46,4 @@ HEARTBEAT_ENDPOINT = '/heartbeat'
 REGISTRATION_ENDPOINT = '/register'
 
 CORRELATION_ID_STRING = 'correlationId'
+MESSAGE_ID = 'messageId'
diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
index 179cf6b..49e0ce5 100644
--- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
+++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py
@@ -59,12 +59,12 @@ class HeartbeatThread(threading.Thread):
 
     # listeners
     self.server_responses_listener = initializer_module.server_responses_listener
-    self.commands_events_listener = CommandsEventListener(initializer_module.action_queue)
-    self.metadata_events_listener = MetadataEventListener(initializer_module.metadata_cache, initializer_module.config)
-    self.topology_events_listener = TopologyEventListener(initializer_module.topology_cache)
-    self.configuration_events_listener = ConfigurationEventListener(initializer_module.configurations_cache)
-    self.host_level_params_events_listener = HostLevelParamsEventListener(initializer_module.host_level_params_cache, initializer_module.recovery_manager)
-    self.alert_definitions_events_listener = AlertDefinitionsEventListener(initializer_module.alert_definitions_cache, initializer_module.alert_scheduler_handler)
+    self.commands_events_listener = CommandsEventListener(initializer_module)
+    self.metadata_events_listener = MetadataEventListener(initializer_module)
+    self.topology_events_listener = TopologyEventListener(initializer_module)
+    self.configuration_events_listener = ConfigurationEventListener(initializer_module)
+    self.host_level_params_events_listener = HostLevelParamsEventListener(initializer_module)
+    self.alert_definitions_events_listener = AlertDefinitionsEventListener(initializer_module)
     self.agent_actions_events_listener = AgentActionsListener(initializer_module)
     self.listeners = [self.server_responses_listener, self.commands_events_listener, self.metadata_events_listener, self.topology_events_listener, self.configuration_events_listener, self.host_level_params_events_listener, self.alert_definitions_events_listener, self.agent_actions_events_listener]
 
@@ -125,6 +125,7 @@ class HeartbeatThread(threading.Thread):
     logger.info("Sending registration request")
     logger.debug("Registration request is {0}".format(registration_request))
 
+    self.server_responses_listener.connection = self.connection
     response = self.blocking_request(registration_request, Constants.REGISTRATION_ENDPOINT)
 
     logger.info("Registration response received")
@@ -133,6 +134,7 @@ class HeartbeatThread(threading.Thread):
     self.handle_registration_response(response)
 
     for endpoint, cache, listener, subscribe_to in self.post_registration_requests:
+      listener.connection = self.connection
       # should not hang forever on these requests
       response = self.blocking_request({'hash': cache.hash}, endpoint, log_handler=listener.get_log_message)
       try:
diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
index 787df16..b15ad9b 100644
--- a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
+++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py
@@ -89,7 +89,7 @@ class InitializerModule:
     self.alert_definitions_cache = ClusterAlertDefinitionsCache(self.config.cluster_cache_dir)
     self.configuration_builder = ConfigurationBuilder(self)
     self.stale_alerts_monitor = StaleAlertsMonitor(self)
-    self.server_responses_listener = ServerResponsesListener()
+    self.server_responses_listener = ServerResponsesListener(self)
     self.file_cache = FileCache(self.config)
     self.customServiceOrchestrator = CustomServiceOrchestrator(self)
     self.recovery_manager = RecoveryManager()
diff --git a/ambari-agent/src/main/python/ambari_agent/Utils.py b/ambari-agent/src/main/python/ambari_agent/Utils.py
index 774e957..33851f3 100644
--- a/ambari-agent/src/main/python/ambari_agent/Utils.py
+++ b/ambari-agent/src/main/python/ambari_agent/Utils.py
@@ -21,6 +21,7 @@ import os
 import time
 import threading
 import collections
+import traceback
 from functools import wraps
 from ambari_agent.ExitHelper import ExitHelper
 
@@ -161,6 +162,10 @@ class Utils(object):
     t = threading.Timer( graceful_stop_timeout, ExitHelper().exit, [AGENT_AUTO_RESTART_EXIT_CODE])
     t.start()
 
+  @staticmethod
+  def get_traceback_as_text(ex):
+    return ''.join(traceback.format_exception(etype=type(ex), value=ex, tb=ex.__traceback__))
+
 class ImmutableDictionary(dict):
   def __init__(self, dictionary):
     """
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/AgentActionsListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/AgentActionsListener.py
index 07cf8ed..2d3cb51 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/AgentActionsListener.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/AgentActionsListener.py
@@ -35,7 +35,7 @@ class AgentActionsListener(EventListener):
   RESTART_AGENT_ACTION = 'RESTART_AGENT'
   
   def __init__(self, initializer_module):
-    self.initializer_module = initializer_module
+    super(AgentActionsListener, self).__init__(initializer_module)
     self.stop_event = initializer_module.stop_event
 
   def on_event(self, headers, message):
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/AlertDefinitionsEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/AlertDefinitionsEventListener.py
index 4ef9982..0aa65ef 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/AlertDefinitionsEventListener.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/AlertDefinitionsEventListener.py
@@ -30,9 +30,10 @@ class AlertDefinitionsEventListener(EventListener):
   """
   Listener of Constants.ALERTS_DEFINITIONS_TOPIC events from server.
   """
-  def __init__(self, alert_definitions_cache, alert_scheduler_handler):
-    self.alert_definitions_cache = alert_definitions_cache
-    self.alert_scheduler_handler = alert_scheduler_handler
+  def __init__(self, initializer_module):
+    super(AlertDefinitionsEventListener, self).__init__(initializer_module)
+    self.alert_definitions_cache = initializer_module.alert_definitions_cache
+    self.alert_scheduler_handler = initializer_module.alert_scheduler_handler
 
   def on_event(self, headers, message):
     """
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py
index 49ba20f..b25ec69 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/CommandsEventListener.py
@@ -30,8 +30,9 @@ class CommandsEventListener(EventListener):
   """
   Listener of Constants.CONFIGURATIONS_TOPIC events from server.
   """
-  def __init__(self, action_queue):
-    self.action_queue = action_queue
+  def __init__(self, initializer_module):
+    super(CommandsEventListener, self).__init__(initializer_module)
+    self.action_queue = initializer_module.action_queue
 
   def on_event(self, headers, message):
     """
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
index 880ffd3..9887a8a 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/ConfigurationEventListener.py
@@ -30,8 +30,9 @@ class ConfigurationEventListener(EventListener):
   """
   Listener of Constants.CONFIGURATIONS_TOPIC events from server.
   """
-  def __init__(self, configuration_cache):
-    self.configuration_cache = configuration_cache
+  def __init__(self, initializer_module):
+    super(ConfigurationEventListener, self).__init__(initializer_module)
+    self.configurations_cache = initializer_module.configurations_cache
 
   def on_event(self, headers, message):
     """
@@ -40,13 +41,13 @@ class ConfigurationEventListener(EventListener):
     @param headers: headers dictionary
     @param message: message payload dictionary
     """
-    self.configuration_cache.timestamp = message.pop('timestamp')
+    self.configurations_cache.timestamp = message.pop('timestamp')
 
     # this kind of response is received if hash was identical. And server does not need to change anything
     if message == {}:
       return
 
-    self.configuration_cache.rewrite_cache(message['clusters'], message['hash'])
+    self.configurations_cache.rewrite_cache(message['clusters'], message['hash'])
 
   def get_handled_path(self):
     return Constants.CONFIGURATIONS_TOPIC
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py
index de5cd03..d4b4088 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/HostLevelParamsEventListener.py
@@ -30,9 +30,10 @@ class HostLevelParamsEventListener(EventListener):
   """
   Listener of Constants.HOST_LEVEL_PARAMS_TOPIC events from server.
   """
-  def __init__(self, host_level_params_cache, recovery_manager):
-    self.host_level_params_cache = host_level_params_cache
-    self.recovery_manager = recovery_manager
+  def __init__(self, initializer_module):
+    super(HostLevelParamsEventListener, self).__init__(initializer_module)
+    self.host_level_params_cache = initializer_module.host_level_params_cache
+    self.recovery_manager = initializer_module.recovery_manager
 
   def on_event(self, headers, message):
     """
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
index d80e40b..83f4b23 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/MetadataEventListener.py
@@ -32,9 +32,10 @@ class MetadataEventListener(EventListener):
   """
   Listener of Constants.METADATA_TOPIC events from server.
   """
-  def __init__(self, metadata_cache, config):
-    self.metadata_cache = metadata_cache
-    self.config = config
+  def __init__(self, initializer_module):
+    super(MetadataEventListener, self).__init__(initializer_module)
+    self.metadata_cache = initializer_module.metadata_cache
+    self.config = initializer_module.config
 
   def on_event(self, headers, message):
     """
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py
index 5320ab7..02d60a5 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/ServerResponsesListener.py
@@ -34,7 +34,8 @@ class ServerResponsesListener(EventListener):
   RESPONSE_STATUS_STRING = 'status'
   RESPONSE_STATUS_SUCCESS = 'OK'
 
-  def __init__(self):
+  def __init__(self, initializer_module):
+    super(ServerResponsesListener, self).__init__(initializer_module)
     self.reset_responses()
 
   def on_event(self, headers, message):
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py b/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py
index df4ea03..b695d8a 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/TopologyEventListener.py
@@ -30,8 +30,9 @@ class TopologyEventListener(EventListener):
   """
   Listener of Constants.TOPOLOGIES_TOPIC events from server.
   """
-  def __init__(self, topology_cache):
-    self.topology_cache = topology_cache
+  def __init__(self, initializer_module):
+    super(TopologyEventListener, self).__init__(initializer_module)
+    self.topology_cache = initializer_module.topology_cache
 
   def on_event(self, headers, message):
     """
diff --git a/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
index 1134cb9..7e66197 100644
--- a/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
+++ b/ambari-agent/src/main/python/ambari_agent/listeners/__init__.py
@@ -20,7 +20,11 @@ limitations under the License.
 import ambari_simplejson as json
 import ambari_stomp
 import logging
+import traceback
 import copy
+from ambari_stomp.adapter.websocket import ConnectionIsAlreadyClosed
+from ambari_agent import Constants
+from ambari_agent.Utils import Utils
 
 logger = logging.getLogger(__name__)
 
@@ -28,6 +32,9 @@ class EventListener(ambari_stomp.ConnectionListener):
   """
   Base abstract class for event listeners on specific topics.
   """
+  def __init__(self, initializer_module):
+    self.initializer_module = initializer_module
+
   def on_message(self, headers, message):
     """
     This method is triggered by stomp when message from serve is received.
@@ -42,15 +49,46 @@ class EventListener(ambari_stomp.ConnectionListener):
     if destination.rstrip('/') == self.get_handled_path().rstrip('/'):
       try:
         message_json = json.loads(message)
-      except ValueError:
+      except ValueError as ex:
         logger.exception("Received from server event is not a valid message json. Message is:\n{0}".format(message))
+        self.report_status_to_sender(headers, message, ex)
         return
 
       logger.info("Event from server at {0}{1}".format(destination, self.get_log_message(headers, copy.deepcopy(message_json))))
       try:
         self.on_event(headers, message_json)
-      except:
+      except Exception as ex:
         logger.exception("Exception while handing event from {0} {1}".format(destination, headers, message))
+        self.report_status_to_sender(headers, message, ex)
+      else:
+        self.report_status_to_sender(headers, message)
+
+  def report_status_to_sender(self, headers, message, ex=None):
+    """
+    Reports the status of delivery of the message to a sender
+
+    @param headers: headers dictionary
+    @param message: message payload dictionary
+    @params ex: optional exception object for errors
+    """
+    if not Constants.MESSAGE_ID in headers:
+      return
+
+    if ex:
+      confirmation_of_received = {Constants.MESSAGE_ID:headers[Constants.MESSAGE_ID], 'status':'ERROR', 'reason':Utils.get_traceback_as_text(ex)}
+    else:
+      confirmation_of_received = {Constants.MESSAGE_ID:headers[Constants.MESSAGE_ID], 'status':'OK'}
+
+    try:
+      connection = self.initializer_module.connection
+    except ConnectionIsAlreadyClosed:
+      # access early copy of connection before it is exposed globally
+      connection = self.initializer_module.heartbeat_thread.connection
+
+    try:
+      connection.send(message=confirmation_of_received, destination=Constants.AGENT_RESPONSES_TOPIC)
+    except:
+      logger.exception("Could not send a confirmation '{0}' to server".format(confirmation_of_received))
 
   def on_event(self, headers, message):
     """
diff --git a/ambari-common/src/main/python/ambari_ws4py/websocket.py b/ambari-common/src/main/python/ambari_ws4py/websocket.py
index 936f333..78d233e 100644
--- a/ambari-common/src/main/python/ambari_ws4py/websocket.py
+++ b/ambari-common/src/main/python/ambari_ws4py/websocket.py
@@ -483,7 +483,7 @@ class WebSocket(object):
         self.reading_buffer_size = s.parser.send(bytes) or DEFAULT_READING_SIZE
 
         if s.closing is not None:
-            logger.debug("Closing message received (%d) '%s'" % (s.closing.code, s.closing.reason))
+            logger.info("Closing message received (%d) '%s'" % (s.closing.code, s.closing.reason))
             if not self.server_terminated:
                 self.close(s.closing.code, s.closing.reason)
             else:
@@ -492,7 +492,7 @@ class WebSocket(object):
 
         if s.errors:
             for error in s.errors:
-                logger.debug("Error message received (%d) '%s'" % (error.code, error.reason))
+                logger.warn("Error message received (%d) '%s'" % (error.code, error.reason))
                 self.close(error.code, error.reason)
             s.errors = []
             return False
@@ -548,6 +548,8 @@ class WebSocket(object):
                 while not self.terminated:
                     if not self.once():
                         break
+            except:
+                logger.exception("Websocket connection was closed with an exception")
             finally:
                 self.terminate()
 

-- 
To stop receiving notification emails like this one, please contact
aonis...@apache.org.

Reply via email to