Repository: ambari Updated Branches: refs/heads/branch-3.0-perf 8cc384cae -> 2a493704f
AMBARI-20828. Heartbeat and register with real server instead of mock server (aonishuk) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/2a493704 Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/2a493704 Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/2a493704 Branch: refs/heads/branch-3.0-perf Commit: 2a493704fdf16e8cc38f27c062c9c9bbf374734a Parents: 8cc384c Author: Andrew Onishuk <aonis...@hortonworks.com> Authored: Mon Apr 24 15:02:08 2017 +0300 Committer: Andrew Onishuk <aonis...@hortonworks.com> Committed: Mon Apr 24 15:02:08 2017 +0300 ---------------------------------------------------------------------- .../main/python/ambari_agent/AmbariConfig.py | 2 +- .../main/python/ambari_agent/ClusterCache.py | 8 +- .../src/main/python/ambari_agent/Constants.py | 6 +- .../main/python/ambari_agent/HeartbeatThread.py | 30 ++-- .../python/ambari_agent/InitializerModule.py | 80 +++++++++ .../src/main/python/ambari_agent/Utils.py | 22 ++- .../main/python/ambari_agent/client_example.py | 8 +- .../src/main/python/ambari_agent/security.py | 33 +--- .../ambari_agent/BaseStompServerTestCase.py | 164 ++++++++++--------- .../ambari_agent/TestAgentStompResponses.py | 37 ++--- .../python/ambari_stomp/adapter/websocket.py | 2 +- 11 files changed, 234 insertions(+), 158 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/2a493704/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py index cf48189..fe48870 100644 --- a/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py +++ b/ambari-agent/src/main/python/ambari_agent/AmbariConfig.py @@ -43,7 +43,7 @@ data_cleanup_interval=86400 data_cleanup_max_age=2592000 data_cleanup_max_size_MB = 100 ping_port=8670 -cache_dir={ps}var{ps}lib{ps}ambari-agent{ps}cache +cache_dir={ps}tmp parallel_execution=0 system_resource_overrides={ps}etc{ps}resource_overrides http://git-wip-us.apache.org/repos/asf/ambari/blob/2a493704/ambari-agent/src/main/python/ambari_agent/ClusterCache.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py index f2ac8ed..4bd94ae 100644 --- a/ambari-agent/src/main/python/ambari_agent/ClusterCache.py +++ b/ambari-agent/src/main/python/ambari_agent/ClusterCache.py @@ -46,10 +46,6 @@ class ClusterCache(dict): self._cache_lock = threading.RLock() self.__current_cache_json_file = os.path.join(self.cluster_cache_dir, self.get_cache_name()+'.json') - # ensure that our cache directory exists - if not os.path.exists(cluster_cache_dir): - os.makedirs(cluster_cache_dir) - # if the file exists, then load it cache_dict = {} if os.path.isfile(self.__current_cache_json_file): @@ -84,6 +80,10 @@ class ClusterCache(dict): with self.__file_lock: + # ensure that our cache directory exists + if not os.path.exists(self.cluster_cache_dir): + os.makedirs(self.cluster_cache_dir) + with os.fdopen(os.open(self.__current_cache_json_file, os.O_WRONLY | os.O_CREAT, 0o600), "w") as f: json.dump(self, f, indent=2) http://git-wip-us.apache.org/repos/asf/ambari/blob/2a493704/ambari-agent/src/main/python/ambari_agent/Constants.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Constants.py b/ambari-agent/src/main/python/ambari_agent/Constants.py index 9ad5f2e..50fac9e 100644 --- a/ambari-agent/src/main/python/ambari_agent/Constants.py +++ b/ambari-agent/src/main/python/ambari_agent/Constants.py @@ -23,12 +23,12 @@ COMMANDS_TOPIC = '/events/commands' CONFIGURATIONS_TOPIC = '/events/configurations' METADATA_TOPIC = '/events/metadata' TOPOLOGIES_TOPIC = '/events/topologies' -SERVER_RESPONSES_TOPIC = '/user' +SERVER_RESPONSES_TOPIC = '/user/' TOPICS_TO_SUBSCRIBE = [SERVER_RESPONSES_TOPIC, COMMANDS_TOPIC, CONFIGURATIONS_TOPIC, METADATA_TOPIC, TOPOLOGIES_TOPIC] -HEARTBEAT_ENDPOINT = '/agent/heartbeat' -REGISTRATION_ENDPOINT = '/agent/registration' +HEARTBEAT_ENDPOINT = '/heartbeat' +REGISTRATION_ENDPOINT = '/register' AGENT_TMP_DIR = "/var/lib/ambari-agent/tmp" CORRELATION_ID_STRING = 'correlationId' \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/2a493704/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py index b2469d2..63674d5 100644 --- a/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py +++ b/ambari-agent/src/main/python/ambari_agent/HeartbeatThread.py @@ -27,9 +27,7 @@ import security from collections import defaultdict from ambari_agent import Constants -from ambari_agent.ClusterConfigurationCache import ClusterConfigurationCache -from ambari_agent.ClusterTopologyCache import ClusterTopologyCache -from ambari_agent.ClusterMetadataCache import ClusterMetadataCache +from ambari_agent.InitializerModule import initializer_module from ambari_agent.listeners.ServerResponsesListener import ServerResponsesListener from ambari_agent.listeners.TopologyEventListener import TopologyEventListener from ambari_agent.listeners.ConfigurationEventListener import ConfigurationEventListener @@ -45,25 +43,17 @@ class HeartbeatThread(threading.Thread): """ def __init__(self): threading.Thread.__init__(self) - self.stomp_connector = security.StompConnector() self.is_registered = False self.heartbeat_interval = HEARTBEAT_INTERVAL self._stop = threading.Event() - # TODO STOMP: change this once is integrated with ambari config - cluster_cache_dir = '/tmp' - - # caches - self.metadata_cache = ClusterMetadataCache(cluster_cache_dir) - self.topology_cache = ClusterTopologyCache(cluster_cache_dir) - self.configurations_cache = ClusterConfigurationCache(cluster_cache_dir) - self.caches = [self.metadata_cache, self.topology_cache, self.configurations_cache] + self.caches = [initializer_module.metadata_cache, initializer_module.topology_cache, initializer_module.configurations_cache] # listeners self.server_responses_listener = ServerResponsesListener() - self.metadata_events_listener = MetadataEventListener(self.metadata_cache) - self.topology_events_listener = TopologyEventListener(self.topology_cache) - self.configuration_events_listener = ConfigurationEventListener(self.configurations_cache) + self.metadata_events_listener = MetadataEventListener(initializer_module.metadata_cache) + self.topology_events_listener = TopologyEventListener(initializer_module.topology_cache) + self.configuration_events_listener = ConfigurationEventListener(initializer_module.configurations_cache) self.listeners = [self.server_responses_listener, self.metadata_events_listener, self.topology_events_listener, self.configuration_events_listener] def run(self): @@ -97,7 +87,7 @@ class HeartbeatThread(threading.Thread): self.subscribe_and_listen() registration_request = self.get_registration_request() - logger.info("Registration request received") + logger.info("Sending registration request") logger.debug("Registration request is {0}".format(registration_request)) response = self.blocking_request(registration_request, Constants.REGISTRATION_ENDPOINT) @@ -132,14 +122,14 @@ class HeartbeatThread(threading.Thread): Subscribe to topics and set listener classes. """ for listener in self.listeners: - self.stomp_connector.add_listener(listener) + initializer_module.connection.add_listener(listener) for topic_name in Constants.TOPICS_TO_SUBSCRIBE: - self.stomp_connector.subscribe(destination=topic_name, id='sub', ack='client-individual') + initializer_module.connection.subscribe(destination=topic_name, id='sub', ack='client-individual') def blocking_request(self, body, destination): """ Send a request to server and waits for the response from it. The response it detected by the correspondence of correlation_id. """ - self.stomp_connector.send(body=json.dumps(body), destination=destination) - return self.server_responses_listener.responses.blocking_pop(str(self.stomp_connector.correlation_id)) \ No newline at end of file + initializer_module.connection.send(body=json.dumps(body), destination=destination) + return self.server_responses_listener.responses.blocking_pop(str(initializer_module.connection.correlation_id)) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/2a493704/ambari-agent/src/main/python/ambari_agent/InitializerModule.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/InitializerModule.py b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py new file mode 100644 index 0000000..76d14e5 --- /dev/null +++ b/ambari-agent/src/main/python/ambari_agent/InitializerModule.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python + +''' +Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +''' + +import logging +import os +from ambari_agent.FileCache import FileCache +from ambari_agent.AmbariConfig import AmbariConfig +from ambari_agent.ClusterConfigurationCache import ClusterConfigurationCache +from ambari_agent.ClusterTopologyCache import ClusterTopologyCache +from ambari_agent.ClusterMetadataCache import ClusterMetadataCache +from ambari_agent.Utils import lazy_property +from ambari_agent.security import AmbariStompConnection + +logger = logging.getLogger() + +class InitializerModule: + """ + - Instantiate some singleton classes or widely used instances along with providing their dependencies. + - Reduce cross modules dependencies. + - Make other components code cleaner. + - Provide an easier way to mock some dependencies. + """ + def __init__(self): + self.initConfigs() + self.init() + + def initConfigs(self): + """ + Initialize every property got from ambari-agent.ini + """ + self.ambariConfig = AmbariConfig.get_resolved_config() + + self.server_hostname = self.ambariConfig.get('server', 'hostname') + self.secured_url_port = self.ambariConfig.get('server', 'secured_url_port') + + self.cache_dir = self.ambariConfig.get('agent', 'cache_dir', default='/var/lib/ambari-agent/cache') + self.cluster_cache_dir = os.path.join(self.cache_dir, FileCache.CLUSTER_CACHE_DIRECTORY) + + def init(self): + """ + Initialize properties + """ + self.metadata_cache = ClusterMetadataCache(self.cluster_cache_dir) + self.topology_cache = ClusterTopologyCache(self.cluster_cache_dir) + self.configurations_cache = ClusterConfigurationCache(self.cluster_cache_dir) + + @lazy_property + def connection(self): + """ + Create a stomp connection + """ + # TODO STOMP: handle if agent.ssl=false? + connection_url = 'wss://{0}:{1}/agent/stomp/v1'.format(self.server_hostname, self.secured_url_port) + + logging.info("Connecting to {0}".format(connection_url)) + + conn = AmbariStompConnection(connection_url) + conn.start() + conn.connect(wait=True) + + return conn + +initializer_module = InitializerModule() http://git-wip-us.apache.org/repos/asf/ambari/blob/2a493704/ambari-agent/src/main/python/ambari_agent/Utils.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/Utils.py b/ambari-agent/src/main/python/ambari_agent/Utils.py index 8078ad2..6e919c0 100644 --- a/ambari-agent/src/main/python/ambari_agent/Utils.py +++ b/ambari-agent/src/main/python/ambari_agent/Utils.py @@ -18,6 +18,7 @@ See the License for the specific language governing permissions and limitations under the License. """ import threading +from functools import wraps class BlockingDictionary(): """ @@ -86,4 +87,23 @@ ImmutableDictionary.__setitem__ = raise_immutable_error ImmutableDictionary.__delitem__ = raise_immutable_error ImmutableDictionary.clear = raise_immutable_error ImmutableDictionary.pop = raise_immutable_error -ImmutableDictionary.update = raise_immutable_error \ No newline at end of file +ImmutableDictionary.update = raise_immutable_error + + +def lazy_property(undecorated): + """ + Only run the function decorated once. Next time return cached value. + """ + name = '_' + undecorated.__name__ + + @property + @wraps(undecorated) + def decorated(self): + try: + return getattr(self, name) + except AttributeError: + v = undecorated(self) + setattr(self, name, v) + return v + + return decorated \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/2a493704/ambari-agent/src/main/python/ambari_agent/client_example.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/client_example.py b/ambari-agent/src/main/python/ambari_agent/client_example.py index 96e76be..4be0d47 100644 --- a/ambari-agent/src/main/python/ambari_agent/client_example.py +++ b/ambari-agent/src/main/python/ambari_agent/client_example.py @@ -29,7 +29,6 @@ def get_headers(): global correlationId correlationId += 1 headers = { - "content-type": "text/plain", "correlationId": correlationId } return headers @@ -50,8 +49,9 @@ class MyStatsListener(ambari_stomp.StatsListener): read_messages = [] -conn = websocket.WsConnection('ws://gc6401:8080/api/stomp/v1') -conn.transport.ws.extra_headers = [("Authorization", "Basic " + base64.b64encode('admin:admin'))] +#conn = websocket.WsConnection('ws://gc6401:8080/api/stomp/v1') +#conn.transport.ws.extra_headers = [("Authorization", "Basic " + base64.b64encode('admin:admin'))] +conn = websocket.WsConnection('wss://gc6401:8441/agent/stomp/v1') conn.set_listener('my_listener', MyListener()) conn.set_listener('stats_listener', MyStatsListener()) conn.start() @@ -61,7 +61,7 @@ conn.connect(wait=True, headers=get_headers()) conn.subscribe(destination='/user/', id='sub-0', ack='client-individual') #conn.send(body="", destination='/test/time', headers=get_headers()) -conn.send(body="some message", destination='/test/echo', headers=get_headers()) +conn.send(body="{}", destination='/register', headers=get_headers()) time.sleep(1) for message in read_messages: conn.ack(message['id'], message['subscription']) http://git-wip-us.apache.org/repos/asf/ambari/blob/2a493704/ambari-agent/src/main/python/ambari_agent/security.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/main/python/ambari_agent/security.py b/ambari-agent/src/main/python/ambari_agent/security.py index b1599be..b3cb16e 100644 --- a/ambari-agent/src/main/python/ambari_agent/security.py +++ b/ambari-agent/src/main/python/ambari_agent/security.py @@ -30,6 +30,7 @@ import traceback import hostname import platform import ambari_stomp +from ambari_stomp.adapter.websocket import WsConnection logger = logging.getLogger(__name__) @@ -100,39 +101,17 @@ class VerifiedHTTPSConnection(httplib.HTTPSConnection): return sock -# TODO STOMP: When server part is ready re-write this class by extending WsConnection. -class StompConnector: - def __init__(self): +class AmbariStompConnection(WsConnection): + def __init__(self, url): self.correlation_id = -1 - self._connection = None - - # TODO STOMP: re-init this on_disconnect - def _get_connection(self): - if not self._connection: - self._connection = self._create_new_connection() - return self._connection - - def _create_new_connection(self): - # Connection for unit tests. TODO STOMP: fix this - hosts = [('127.0.0.1', 21613)] - connection = ambari_stomp.Connection(host_and_ports=hosts) - connection.start() - connection.connect(wait=True) - - return connection + WsConnection.__init__(self, url) def send(self, destination, body, content_type=None, headers=None, **keyword_headers): self.correlation_id += 1 - self._get_connection().send(destination, body, content_type=content_type, headers=headers, correlationId=self.correlation_id, **keyword_headers) - - def subscribe(self, *args, **kwargs): - self._get_connection().subscribe(*args, **kwargs) - - def untrack_connection(self): - self.conn = None + WsConnection.send(self, destination, body, content_type=content_type, headers=headers, correlationId=self.correlation_id, **keyword_headers) def add_listener(self, listener): - self._get_connection().set_listener(listener.__class__.__name__, listener) + self.set_listener(listener.__class__.__name__, listener) class CachedHTTPSConnection: """ Caches a ssl socket and uses a single https connection to the server. """ http://git-wip-us.apache.org/repos/asf/ambari/blob/2a493704/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py b/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py index 194002f..671387a 100644 --- a/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py +++ b/ambari-agent/src/test/python/ambari_agent/BaseStompServerTestCase.py @@ -18,6 +18,7 @@ See the License for the specific language governing permissions and limitations under the License. ''' +import ambari_stomp import os import sys import time @@ -42,85 +43,85 @@ from coilmq.scheduler import FavorReliableSubscriberScheduler, RandomQueueSchedu from coilmq.protocol import STOMP10 class BaseStompServerTestCase(unittest.TestCase): + """ + Base class for test cases provides the fixtures for setting up the multi-threaded + unit test infrastructure. + We use a combination of C{threading.Event} and C{Queue.Queue} objects to faciliate + inter-thread communication and lock-stepping the assertions. + """ + + def setUp(self): + + self.clients = [] + self.server = None # This gets set in the server thread. + self.server_address = None # This gets set in the server thread. + self.ready_event = threading.Event() + + addr_bound = threading.Event() + + def start_server(): + self.server = TestStompServer(('127.0.0.1', 21613), + ready_event=self.ready_event, + authenticator=None, + queue_manager=self._queuemanager(), + topic_manager=self._topicmanager()) + self.server_address = self.server.socket.getsockname() + addr_bound.set() + self.server.serve_forever() + + self.server_thread = threading.Thread( + target=start_server, name='server') + self.server_thread.start() + self.ready_event.wait() + addr_bound.wait() + + def _queuemanager(self): """ - Base class for test cases provides the fixtures for setting up the multi-threaded - unit test infrastructure. - We use a combination of C{threading.Event} and C{Queue.Queue} objects to faciliate - inter-thread communication and lock-stepping the assertions. + Returns the configured L{QueueManager} instance to use. + Can be overridden by subclasses that wish to change out any queue mgr parameters. + @rtype: L{QueueManager} """ + return QueueManager(store=MemoryQueue(), + subscriber_scheduler=FavorReliableSubscriberScheduler(), + queue_scheduler=RandomQueueScheduler()) - def setUp(self): - - self.clients = [] - self.server = None # This gets set in the server thread. - self.server_address = None # This gets set in the server thread. - self.ready_event = threading.Event() - - addr_bound = threading.Event() - - def start_server(): - self.server = TestStompServer(('127.0.0.1', 21613), - ready_event=self.ready_event, - authenticator=None, - queue_manager=self._queuemanager(), - topic_manager=self._topicmanager()) - self.server_address = self.server.socket.getsockname() - addr_bound.set() - self.server.serve_forever() - - self.server_thread = threading.Thread( - target=start_server, name='server') - self.server_thread.start() - self.ready_event.wait() - addr_bound.wait() - - def _queuemanager(self): - """ - Returns the configured L{QueueManager} instance to use. - Can be overridden by subclasses that wish to change out any queue mgr parameters. - @rtype: L{QueueManager} - """ - return QueueManager(store=MemoryQueue(), - subscriber_scheduler=FavorReliableSubscriberScheduler(), - queue_scheduler=RandomQueueScheduler()) - - def _topicmanager(self): - """ - Returns the configured L{TopicManager} instance to use. - Can be overridden by subclasses that wish to change out any topic mgr parameters. - @rtype: L{TopicManager} - """ - return TopicManager() + def _topicmanager(self): + """ + Returns the configured L{TopicManager} instance to use. + Can be overridden by subclasses that wish to change out any topic mgr parameters. + @rtype: L{TopicManager} + """ + return TopicManager() - def tearDown(self): - for c in self.clients: - c.close() - self.server.shutdown() # server_close takes too much time - self.server_thread.join() - self.ready_event.clear() - del self.server_thread + def tearDown(self): + for c in self.clients: + c.close() + self.server.shutdown() # server_close takes too much time + self.server_thread.join() + self.ready_event.clear() + del self.server_thread - def _new_client(self, connect=True): - """ - Get a new L{TestStompClient} connected to our test server. - The client will also be registered for close in the tearDown method. - @param connect: Whether to issue the CONNECT command. - @type connect: C{bool} - @rtype: L{TestStompClient} - """ - client = TestStompClient(self.server_address) - self.clients.append(client) - if connect: - client.connect() - res = client.received_frames.get(timeout=1) - self.assertEqual(res.cmd, frames.CONNECTED) - return client + def _new_client(self, connect=True): + """ + Get a new L{TestStompClient} connected to our test server. + The client will also be registered for close in the tearDown method. + @param connect: Whether to issue the CONNECT command. + @type connect: C{bool} + @rtype: L{TestStompClient} + """ + client = TestStompClient(self.server_address) + self.clients.append(client) + if connect: + client.connect() + res = client.received_frames.get(timeout=1) + self.assertEqual(res.cmd, frames.CONNECTED) + return client - def get_json(self, filename): - filepath = os.path.join(os.path.abspath(os.path.dirname(__file__)), "dummy_files", "stomp", filename) + def get_json(self, filename): + filepath = os.path.join(os.path.abspath(os.path.dirname(__file__)), "dummy_files", "stomp", filename) - with open(filepath) as f: - return f.read() + with open(filepath) as f: + return f.read() class TestStompServer(ThreadedStompServer): @@ -151,7 +152,7 @@ class TestStompServer(ThreadedStompServer): class TestStompClient(object): """ A stomp client for use in testing. - This client spawns a listener thread and pushes anything that comes in onto the + This client spawns a listener thread and pushes anything that comes in onto the read_frames queue. @ivar received_frames: A queue of Frame instances that have been received. @type received_frames: C{Queue.Queue} containing any received C{stompclient.frame.Frame} @@ -227,4 +228,19 @@ class TestStompClient(object): raise RuntimeError("Not connected") self.connected = False self.read_stopped.wait(timeout=0.5) - self.sock.close() \ No newline at end of file + self.sock.close() + +class TestCaseTcpConnection(ambari_stomp.Connection): + def __init__(self, url): + self.correlation_id = -1 + ambari_stomp.Connection.__init__(self, host_and_ports=[('127.0.0.1', 21613)]) + + def send(self, destination, body, content_type=None, headers=None, **keyword_headers): + self.correlation_id += 1 + ambari_stomp.Connection.send(self, destination, body, content_type=content_type, headers=headers, correlationId=self.correlation_id, **keyword_headers) + + def add_listener(self, listener): + self.set_listener(listener.__class__.__name__, listener) + +from ambari_agent import security +security.AmbariStompConnection = TestCaseTcpConnection \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ambari/blob/2a493704/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py ---------------------------------------------------------------------- diff --git a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py index d2a83ff..f5caa7b 100644 --- a/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py +++ b/ambari-agent/src/test/python/ambari_agent/TestAgentStompResponses.py @@ -27,6 +27,7 @@ from coilmq.util.frames import Frame from BaseStompServerTestCase import BaseStompServerTestCase from ambari_agent import HeartbeatThread +from ambari_agent.InitializerModule import initializer_module from mock.mock import MagicMock, patch @@ -34,6 +35,9 @@ class TestAgentStompResponses(BaseStompServerTestCase): def test_mock_server_can_start(self): self.init_stdout_logger() + #initializer_module.server_hostname = 'gc6401' + #initializer_module.init() + self.remove(['/tmp/configurations.json', '/tmp/metadata.json', '/tmp/topology.json']) heartbeat_thread = HeartbeatThread.HeartbeatThread() @@ -49,7 +53,7 @@ class TestAgentStompResponses(BaseStompServerTestCase): registration_frame = self.server.frames_queue.get() # server sends registration response - f = Frame(frames.MESSAGE, headers={'destination': '/user', 'correlationId': '0'}, body=self.get_json("registration_response.json")) + f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '0'}, body=self.get_json("registration_response.json")) self.server.topic_manager.send(f) f = Frame(frames.MESSAGE, headers={'destination': '/events/configurations'}, body=self.get_json("configurations_update.json")) @@ -67,26 +71,27 @@ class TestAgentStompResponses(BaseStompServerTestCase): heartbeat_frame = self.server.frames_queue.get() heartbeat_thread._stop.set() - f = Frame(frames.MESSAGE, headers={'destination': '/user', 'correlationId': '1'}, body=json.dumps({'heartbeat-response':'true'})) + f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '1'}, body=json.dumps({'heartbeat-response':'true'})) self.server.topic_manager.send(f) heartbeat_thread.join() - self.assertEquals(heartbeat_thread.topology_cache['cl1']['topology']['hosts'][0]['hostname'], 'c6401.ambari.apache.org') - self.assertEquals(heartbeat_thread.metadata_cache['cl1']['metadata']['status_commands_to_run'], ('STATUS',)) - self.assertEquals(heartbeat_thread.configurations_cache['cl1']['configurations']['zoo.cfg']['clientPort'], '2181') + self.assertEquals(initializer_module.topology_cache['cl1']['topology']['hosts'][0]['hostname'], 'c6401.ambari.apache.org') + self.assertEquals(initializer_module.metadata_cache['cl1']['metadata']['status_commands_to_run'], ('STATUS',)) + self.assertEquals(initializer_module.configurations_cache['cl1']['configurations']['zoo.cfg']['clientPort'], '2181') """ ============================================================================================ ============================================================================================ """ + delattr(initializer_module,'_connection') + self.server.frames_queue.queue.clear() + heartbeat_thread = HeartbeatThread.HeartbeatThread() heartbeat_thread.heartbeat_interval = 0 heartbeat_thread.start() - self.server.frames_queue.queue.clear() - connect_frame = self.server.frames_queue.get() users_subscribe_frame = self.server.frames_queue.get() commands_subscribe_frame = self.server.frames_queue.get() @@ -101,31 +106,17 @@ class TestAgentStompResponses(BaseStompServerTestCase): self.assertEquals(clusters_hashes['topology_hash'], 'd14ca943e4a69ad0dd640f32d713d2b9') # server sends registration response - f = Frame(frames.MESSAGE, headers={'destination': '/user', 'correlationId': '0'}, body=self.get_json("registration_response.json")) + f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '0'}, body=self.get_json("registration_response.json")) self.server.topic_manager.send(f) heartbeat_frame = self.server.frames_queue.get() heartbeat_thread._stop.set() - f = Frame(frames.MESSAGE, headers={'destination': '/user', 'correlationId': '1'}, body=json.dumps({'heartbeat-response':'true'})) + f = Frame(frames.MESSAGE, headers={'destination': '/user/', 'correlationId': '1'}, body=json.dumps({'heartbeat-response':'true'})) self.server.topic_manager.send(f) heartbeat_thread.join() - - def _other(self): - f = Frame(frames.MESSAGE, headers={'destination': '/events/configurations'}, body=self.get_json("configurations_update.json")) - self.server.topic_manager.send(f) - - f = Frame(frames.MESSAGE, headers={'destination': '/events/commands'}, body=self.get_json("execution_commands.json")) - self.server.topic_manager.send(f) - - f = Frame(frames.MESSAGE, headers={'destination': '/events/metadata'}, body=self.get_json("metadata_update.json")) - self.server.topic_manager.send(f) - - f = Frame(frames.MESSAGE, headers={'destination': '/events/topologies'}, body=self.get_json("topology_update.json")) - self.server.topic_manager.send(f) - def remove(self, filepathes): for filepath in filepathes: if os.path.isfile(filepath): http://git-wip-us.apache.org/repos/asf/ambari/blob/2a493704/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py ---------------------------------------------------------------------- diff --git a/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py b/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py index 8416a27..91eaaac 100644 --- a/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py +++ b/ambari-common/src/main/python/ambari_stomp/adapter/websocket.py @@ -95,7 +95,7 @@ class WsTransport(Transport): Transport.stop(self) class WsConnection(BaseConnection, Protocol12): - def __init__(self, url, wait_on_receipt=False): + def __init__(self, url): self.transport = WsTransport(url) self.transport.set_listener('ws-listener', self) self.transactions = {}