Tushar Gupta has proposed merging ~tushar5526/lp-codeimport:move-to-kombu-from-amqp into lp-codeimport:focal with ~tushar5526/lp-codeimport:deprecate-bzr as a prerequisite.
Requested reviews:
  Launchpad code reviewers (launchpad-reviewers)

For more details, see:
https://code.launchpad.net/~tushar5526/lp-codeimport/+git/lp-codeimport/+merge/488162
--
Your team Launchpad code reviewers is requested to review the proposed merge of ~tushar5526/lp-codeimport:move-to-kombu-from-amqp into lp-codeimport:focal.
diff --git a/configs/development/codeimport-lazr.conf b/configs/development/codeimport-lazr.conf index 30dc168..0696766 100644 --- a/configs/development/codeimport-lazr.conf +++ b/configs/development/codeimport-lazr.conf @@ -23,7 +23,5 @@ error_dir: /var/tmp/lperr ca_certificates_path: /etc/ssl/certs/ca-certificates.crt [rabbitmq] -host: localhost:56720 -userid: guest -password: guest -virtual_host: / +launch: True +broker_urls: amqp://guest:guest@localhost:56720// diff --git a/configs/testrunner/codeimport-lazr.conf b/configs/testrunner/codeimport-lazr.conf index c91cff0..07c8580 100644 --- a/configs/testrunner/codeimport-lazr.conf +++ b/configs/testrunner/codeimport-lazr.conf @@ -10,7 +10,5 @@ oops_prefix: T error_dir: /var/tmp/lperr.test [rabbitmq] -host: none -userid: none -password: none -virtual_host: none +launch: False +broker_urls: none diff --git a/lib/lp/services/config/schema-lazr.conf b/lib/lp/services/config/schema-lazr.conf index 3041d6c..6d2d380 100644 --- a/lib/lp/services/config/schema-lazr.conf +++ b/lib/lp/services/config/schema-lazr.conf @@ -114,12 +114,26 @@ urlfetch_timeout: 30 [rabbitmq] -# The host:port at which RabbitMQ is listening. +# Should RabbitMQ be launched by default? +# datatype: boolean +launch: False +# The URL at which RabbitMQ is listening (in the form +# amqp://USERNAME:PASSWORD@HOSTNAME:PORT/VIRTUAL_HOST), or a space-separated +# list of such URLs to use round-robin failover between them. +broker_urls: none +# The host:port at which RabbitMQ is listening (ignored if broker_urls is +# set). # datatype: string host: none +# The username to use when connecting to RabbitMQ (ignored if broker_urls is +# set). # datatype: string userid: none +# The password to use when connecting to RabbitMQ (ignored if broker_urls is +# set). # datatype: string password: none +# The virtual host to use when connecting to RabbitMQ (ignored if +# broker_urls is set). 
# datatype: string virtual_host: none diff --git a/lib/lp/services/messaging/interfaces.py b/lib/lp/services/messaging/interfaces.py index c3fa0e3..c73efac 100644 --- a/lib/lp/services/messaging/interfaces.py +++ b/lib/lp/services/messaging/interfaces.py @@ -3,20 +3,10 @@ """Messaging interfaces.""" -__metaclass__ = type __all__ = [ - 'IMessageConsumer', - 'IMessageProducer', - 'IMessageSession', - 'MessagingException', - 'MessagingUnavailable', - 'QueueEmpty', - 'QueueNotFound', - ] - - -from zope.interface import Interface -from zope.schema import Bool + "MessagingException", + "MessagingUnavailable", +] class MessagingException(Exception): @@ -25,77 +15,3 @@ class MessagingException(Exception): class MessagingUnavailable(MessagingException): """Messaging systems are not available.""" - - -class QueueNotFound(MessagingException): - """Raised if the queue was not found.""" - - -class QueueEmpty(MessagingException): - """Raised if there are no queued messages on a non-blocking read.""" - - -class IMessageSession(Interface): - - is_connected = Bool( - u"Whether the session is connected to the messaging system.") - - def connect(): - """Connect to the messaging system. - - If the session is already connected this should be a no-op. - """ - - def disconnect(): - """Disconnect from the messaging system. - - If the session is already disconnected this should be a no-op. - """ - - def flush(): - """Run deferred tasks.""" - - def finish(): - """Flush the session and reset.""" - - def reset(): - """Reset the session.""" - - def defer(func, *args, **kwargs): - """Schedule something to happen when this session is finished.""" - - def getProducer(name): - """Get a `IMessageProducer` associated with this session.""" - - def getConsumer(name): - """Get a `IMessageConsumer` associated with this session.""" - - -class IMessageConsumer(Interface): - - def receive(blocking=True): - """Receive data from the queue. - - :raises EmptyQueue: If non-blocking and the queue is empty. 
- """ - - -class IMessageProducer(Interface): - - def send(data): - """Serialize `data` into JSON and send it to the queue on commit.""" - - def sendNow(data): - """Serialize `data` into JSON and send it to the queue immediately.""" - - def associateConsumer(consumer): - """Make the consumer receive messages from this producer on commit. - - :param consumer: An `IMessageConsumer` - """ - - def associateConsumerNow(consumer): - """Make the consumer receive messages from this producer. - - :param consumer: An `IMessageConsumer` - """ diff --git a/lib/lp/services/messaging/rabbit.py b/lib/lp/services/messaging/rabbit.py index 417d3a6..bfdf60f 100644 --- a/lib/lp/services/messaging/rabbit.py +++ b/lib/lp/services/messaging/rabbit.py @@ -3,67 +3,28 @@ """An API for messaging systems in Launchpad, e.g. RabbitMQ.""" -__metaclass__ = type __all__ = [ "connect", "is_configured", - "session", - "unreliable_session", - ] +] -from collections import deque -from functools import partial -import json -import socket -import sys -import threading -import time - -import amqp -import transaction -from transaction._transaction import Status as TransactionStatus -from zope.interface import implementer +import kombu +from lazr.config import as_host_port from lp.services.config import config -from lp.services.messaging.interfaces import ( - IMessageConsumer, - IMessageProducer, - IMessageSession, - MessagingUnavailable, - QueueEmpty, - QueueNotFound, - ) - - -LAUNCHPAD_EXCHANGE = "launchpad-exchange" - - -@implementer(transaction.interfaces.ISynchronizer) -class RabbitSessionTransactionSync: - - def __init__(self, session): - self.session = session - - def newTransaction(self, txn): - pass - - def beforeCompletion(self, txn): - pass - - def afterCompletion(self, txn): - if txn.status == TransactionStatus.COMMITTED: - self.session.finish() - else: - self.session.reset() +from lp.services.messaging.interfaces import MessagingUnavailable def is_configured(): """Return True if rabbit looks 
to be configured.""" + if config.rabbitmq.broker_urls is not None: + return True return not ( - config.rabbitmq.host is None or - config.rabbitmq.userid is None or - config.rabbitmq.password is None or - config.rabbitmq.virtual_host is None) + config.rabbitmq.host is None + or config.rabbitmq.userid is None + or config.rabbitmq.password is None + or config.rabbitmq.virtual_host is None + ) def connect(): @@ -73,216 +34,16 @@ def connect(): """ if not is_configured(): raise MessagingUnavailable("Incomplete configuration") - connection = amqp.Connection( - host=config.rabbitmq.host, userid=config.rabbitmq.userid, - password=config.rabbitmq.password, - virtual_host=config.rabbitmq.virtual_host) + if config.rabbitmq.broker_urls is not None: + connection = kombu.Connection(config.rabbitmq.broker_urls.split()) + else: + hostname, port = as_host_port(config.rabbitmq.host, default_port=5672) + connection = kombu.Connection( + hostname=hostname, + userid=config.rabbitmq.userid, + password=config.rabbitmq.password, + virtual_host=config.rabbitmq.virtual_host, + port=port, + ) connection.connect() return connection - - -@implementer(IMessageSession) -class RabbitSession(threading.local): - - exchange = LAUNCHPAD_EXCHANGE - - def __init__(self): - super(RabbitSession, self).__init__() - self._connection = None - self._deferred = deque() - # Maintain sessions according to transaction boundaries. Keep a strong - # reference to the sync because the transaction manager does not. We - # need one per thread (definining it here is enough to ensure that). - self._sync = RabbitSessionTransactionSync(self) - transaction.manager.registerSynch(self._sync) - - @property - def is_connected(self): - """See `IMessageSession`.""" - return self._connection is not None and self._connection.connected - - def connect(self): - """See `IMessageSession`. - - Open a connection for this thread if necessary. Connections cannot be - shared between threads. 
- """ - if self._connection is None or not self._connection.connected: - self._connection = connect() - return self._connection - - def disconnect(self): - """See `IMessageSession`.""" - if self._connection is not None: - try: - self._connection.close() - except socket.error: - # Socket error is fine; the connection is still closed. - pass - finally: - self._connection = None - - def flush(self): - """See `IMessageSession`.""" - tasks = self._deferred - while len(tasks) != 0: - tasks.popleft()() - - def finish(self): - """See `IMessageSession`.""" - try: - self.flush() - finally: - self.reset() - - def reset(self): - """See `IMessageSession`.""" - self._deferred.clear() - self.disconnect() - - def defer(self, func, *args, **kwargs): - """See `IMessageSession`.""" - self._deferred.append(partial(func, *args, **kwargs)) - - def getProducer(self, name): - """See `IMessageSession`.""" - return RabbitRoutingKey(self, name) - - def getConsumer(self, name): - """See `IMessageSession`.""" - return RabbitQueue(self, name) - - -# Per-thread sessions. -session = RabbitSession() -session_finish_handler = ( - lambda event: session.finish()) - - -class RabbitUnreliableSession(RabbitSession): - """An "unreliable" `RabbitSession`. - - Unreliable in this case means that certain errors in deferred tasks are - silently suppressed. This means that services can continue to function - even in the absence of a running and fully functional message queue. - - Other types of errors are also caught because we don't want this - subsystem to destabilise other parts of Launchpad but we nonetheless - record OOPses for these. - - XXX: We only suppress MessagingUnavailable for now because we want to - monitor this closely before we add more exceptions to the - suppressed_errors list. Potential candidates are `MessagingException`, - `IOError` or `amqp.AMQPException`. - """ - - suppressed_errors = ( - MessagingUnavailable, - ) - - def finish(self): - """See `IMessageSession`. 
- - Suppresses errors listed in `suppressed_errors`. Also suppresses - other errors but files an oops report for these. - """ - try: - super(RabbitUnreliableSession, self).finish() - except self.suppressed_errors: - pass - except Exception: - from lp.services.webapp import errorlog - errorlog.globalErrorUtility.raising(sys.exc_info()) - - -# Per-thread "unreliable" sessions. -unreliable_session = RabbitUnreliableSession() -unreliable_session_finish_handler = ( - lambda event: unreliable_session.finish()) - - -class RabbitMessageBase: - """Base class for all RabbitMQ messaging.""" - - def __init__(self, session): - self.session = IMessageSession(session) - self._channel = None - - @property - def channel(self): - if self._channel is None or not self._channel.is_open: - connection = self.session.connect() - self._channel = connection.channel() - self._channel.exchange_declare( - self.session.exchange, "direct", durable=False, - auto_delete=False, nowait=False) - return self._channel - - -@implementer(IMessageProducer) -class RabbitRoutingKey(RabbitMessageBase): - """A RabbitMQ data origination point.""" - - def __init__(self, session, routing_key): - super(RabbitRoutingKey, self).__init__(session) - self.key = routing_key - - def associateConsumer(self, consumer): - """Only receive messages for requested routing key.""" - self.session.defer(self.associateConsumerNow, consumer) - - def associateConsumerNow(self, consumer): - """Only receive messages for requested routing key.""" - # The queue will be auto-deleted 5 minutes after its last use. - # http://www.rabbitmq.com/extensions.html#queue-leases - self.channel.queue_declare( - consumer.name, nowait=False, auto_delete=False, - arguments={"x-expires": 300000}) # 5 minutes. 
- self.channel.queue_bind( - queue=consumer.name, exchange=self.session.exchange, - routing_key=self.key, nowait=False) - - def send(self, data): - """See `IMessageProducer`.""" - self.session.defer(self.sendNow, data) - - def sendNow(self, data): - """Immediately send a message to the broker.""" - json_data = json.dumps(data) - msg = amqp.Message(json_data) - self.channel.basic_publish( - exchange=self.session.exchange, - routing_key=self.key, msg=msg) - - -@implementer(IMessageConsumer) -class RabbitQueue(RabbitMessageBase): - """A RabbitMQ Queue.""" - - def __init__(self, session, name): - super(RabbitQueue, self).__init__(session) - self.name = name - - def receive(self, timeout=0.0): - """Pull a message from the queue. - - :param timeout: Wait a maximum of `timeout` seconds before giving up, - trying at least once. - :raises QueueEmpty: if the timeout passes. - """ - endtime = time.time() + timeout - while True: - try: - message = self.channel.basic_get(self.name) - if message is None: - if time.time() > endtime: - raise QueueEmpty() - time.sleep(0.1) - else: - self.channel.basic_ack(message.delivery_tag) - return json.loads(message.body) - except amqp.ChannelError as error: - if error.reply_code == 404: - raise QueueNotFound() - else: - raise diff --git a/lib/lp/services/messaging/tests/test_rabbit.py b/lib/lp/services/messaging/tests/test_rabbit.py index b25c073..f4d8f7c 100644 --- a/lib/lp/services/messaging/tests/test_rabbit.py +++ b/lib/lp/services/messaging/tests/test_rabbit.py @@ -1,417 +1,121 @@ -# Copyright 2011 Canonical Ltd. This software is licensed under the +# Copyright 2022 Canonical Ltd. This software is licensed under the # GNU Affero General Public License version 3 (see the file LICENSE). 
-"""Messaging utility tests.""" +from kombu.utils.url import parse_url +from testtools.matchers import MatchesStructure -__metaclass__ = type +from lp.services.config import config +from lp.services.messaging import rabbit +from lp.testing import TestCase +from lp.testing.layers import BaseLayer, RabbitMQLayer -from functools import partial -from itertools import count -import socket -import six -from testtools.testcase import ExpectedException -import transaction -from transaction._transaction import Status as TransactionStatus +class TestIsConfigured(TestCase): + layer = BaseLayer -from lp.services.messaging.interfaces import ( - IMessageConsumer, - IMessageProducer, - IMessageSession, - MessagingUnavailable, - QueueEmpty, - QueueNotFound, - ) -from lp.services.messaging.rabbit import ( - RabbitMessageBase, - RabbitQueue, - RabbitRoutingKey, - RabbitSession, - RabbitSessionTransactionSync, - RabbitUnreliableSession, - session as global_session, - unreliable_session as global_unreliable_session, - ) -from lp.testing import ( - monkey_patch, - TestCase, - ) -from lp.testing.fakemethod import FakeMethod -from lp.testing.faketransaction import FakeTransaction -from lp.testing.layers import RabbitMQLayer -from lp.testing.matchers import Provides + def test_unconfigured(self): + self.assertFalse(rabbit.is_configured()) + def test_broker_url(self): + self.pushConfig( + "rabbitmq", broker_urls="amqp://guest:guest@rabbitmq.example//" + ) + self.assertTrue(rabbit.is_configured()) -# RabbitMQ is not (yet) torn down or reset between tests, so here are sources -# of distinct names. 
-queue_names = ("queue.%d" % num for num in count(1)) -key_names = ("key.%d" % num for num in count(1)) + def test_partial_compat(self): + self.pushConfig("rabbitmq", host="rabbitmq.example") + self.assertFalse(rabbit.is_configured()) + def test_full_compat(self): + self.pushConfig( + "rabbitmq", + host="rabbitmq.example", + userid="guest", + password="guest", + virtual_host="/", + ) + self.assertTrue(rabbit.is_configured()) -class FakeRabbitSession: - def __init__(self): - self.log = [] - - def finish(self): - self.log.append("finish") - - def reset(self): - self.log.append("reset") - - -class TestRabbitSessionTransactionSync(TestCase): - - def test_interface(self): - self.assertThat( - RabbitSessionTransactionSync(None), - Provides(transaction.interfaces.ISynchronizer)) - - def test_afterCompletion_COMMITTED(self): - txn = FakeTransaction() - txn.status = TransactionStatus.COMMITTED - fake_session = FakeRabbitSession() - sync = RabbitSessionTransactionSync(fake_session) - sync.afterCompletion(txn) - self.assertEqual(["finish"], fake_session.log) - - def test_afterCompletion_ACTIVE(self): - txn = FakeTransaction() - txn.status = TransactionStatus.ACTIVE - fake_session = FakeRabbitSession() - sync = RabbitSessionTransactionSync(fake_session) - sync.afterCompletion(txn) - self.assertEqual(["reset"], fake_session.log) - - -class RabbitTestCase(TestCase): - - layer = RabbitMQLayer - - def tearDown(self): - super(RabbitTestCase, self).tearDown() - global_session.reset() - global_unreliable_session.reset() - - -class TestRabbitSession(RabbitTestCase): - - session_factory = RabbitSession - - def test_interface(self): - session = self.session_factory() - self.assertThat(session, Provides(IMessageSession)) - - def test_connect(self): - session = self.session_factory() - self.assertFalse(session.is_connected) - connection = session.connect() - self.assertTrue(session.is_connected) - self.assertIs(connection, session._connection) - - def 
test_connect_with_incomplete_configuration(self): - self.pushConfig("rabbitmq", host="none") - session = self.session_factory() - with ExpectedException( - MessagingUnavailable, "Incomplete configuration"): - session.connect() - - def test_disconnect(self): - session = self.session_factory() - session.connect() - session.disconnect() - self.assertFalse(session.is_connected) - - def test_disconnect_with_error(self): - session = self.session_factory() - session.connect() - old_close = session._connection.close - - def new_close(*args, **kwargs): - old_close(*args, **kwargs) - raise socket.error - - with monkey_patch(session._connection, close=new_close): - session.disconnect() - self.assertFalse(session.is_connected) - - def test_is_connected(self): - # is_connected is False once a connection has been closed. - session = self.session_factory() - session.connect() - # Close the connection without using disconnect(). - session._connection.close() - self.assertFalse(session.is_connected) - - def test_defer(self): - task = lambda foo, bar: None - session = self.session_factory() - session.defer(task, "foo", bar="baz") - self.assertEqual(1, len(session._deferred)) - [deferred_task] = session._deferred - self.assertIsInstance(deferred_task, partial) - self.assertIs(task, deferred_task.func) - self.assertEqual(("foo",), deferred_task.args) - self.assertEqual({"bar": "baz"}, deferred_task.keywords) - - def test_flush(self): - # RabbitSession.flush() runs deferred tasks. - log = [] - task = lambda: log.append("task") - session = self.session_factory() - session.defer(task) - session.connect() - session.flush() - self.assertEqual(["task"], log) - self.assertEqual([], list(session._deferred)) - self.assertTrue(session.is_connected) - - def test_reset(self): - # RabbitSession.reset() resets session variables and does not run - # deferred tasks. 
- log = [] - task = lambda: log.append("task") - session = self.session_factory() - session.defer(task) - session.connect() - session.reset() - self.assertEqual([], log) - self.assertEqual([], list(session._deferred)) - self.assertFalse(session.is_connected) - - def test_finish(self): - # RabbitSession.finish() resets session variables after running - # deferred tasks. - log = [] - task = lambda: log.append("task") - session = self.session_factory() - session.defer(task) - session.connect() - session.finish() - self.assertEqual(["task"], log) - self.assertEqual([], list(session._deferred)) - self.assertFalse(session.is_connected) - - def test_getProducer(self): - session = self.session_factory() - producer = session.getProducer("foo") - self.assertIsInstance(producer, RabbitRoutingKey) - self.assertIs(session, producer.session) - self.assertEqual("foo", producer.key) - - def test_getConsumer(self): - session = self.session_factory() - consumer = session.getConsumer("foo") - self.assertIsInstance(consumer, RabbitQueue) - self.assertIs(session, consumer.session) - self.assertEqual("foo", consumer.name) - - -class TestRabbitUnreliableSession(TestRabbitSession): - - session_factory = RabbitUnreliableSession +class TestConnect(TestCase): layer = RabbitMQLayer - def setUp(self): - super(TestRabbitUnreliableSession, self).setUp() - self.prev_oops = self.getOops() - - def getOops(self): - try: - self.oops_capture.sync() - return self.oopses[-1] - except IndexError: - return None - - def assertNoOops(self): - oops_report = self.getOops() - self.assertEqual(repr(self.prev_oops), repr(oops_report)) - - def assertOops(self, text_in_oops): - oops_report = self.getOops() - self.assertNotEqual( - repr(self.prev_oops), repr(oops_report), 'No OOPS reported!') - self.assertIn(text_in_oops, str(oops_report)) - - def _test_finish_suppresses_exception(self, exception): - # Simple helper to test that the given exception is suppressed - # when raised by finish(). 
- session = self.session_factory() - session.defer(FakeMethod(failure=exception)) - session.finish() # Look, no exceptions! - - def test_finish_suppresses_MessagingUnavailable(self): - self._test_finish_suppresses_exception( - MessagingUnavailable('Messaging borked.')) - self.assertNoOops() - - def test_finish_suppresses_other_errors_with_oopses(self): - exception = Exception("That hent worked.") - self._test_finish_suppresses_exception(exception) - self.assertOops(str(exception)) - - -class TestRabbitMessageBase(RabbitTestCase): - - def test_session(self): - base = RabbitMessageBase(global_session) - self.assertIs(global_session, base.session) - - def test_channel(self): - # Referencing the channel property causes the session to connect. - base = RabbitMessageBase(global_session) - self.assertFalse(base.session.is_connected) - channel = base.channel - self.assertTrue(base.session.is_connected) - self.assertIsNot(None, channel) - # The same channel is returned every time. - self.assertIs(channel, base.channel) - - def test_channel_session_closed(self): - # When the session is disconnected the channel is thrown away too. - base = RabbitMessageBase(global_session) - channel1 = base.channel - base.session.disconnect() - channel2 = base.channel - self.assertNotEqual(channel1, channel2) - - -class TestRabbitRoutingKey(RabbitTestCase): - - def test_interface(self): - routing_key = RabbitRoutingKey(global_session, next(key_names)) - self.assertThat(routing_key, Provides(IMessageProducer)) - - def test_associateConsumer(self): - # associateConsumer() only associates the consumer at transaction - # commit time. However, order is preserved. - consumer = RabbitQueue(global_session, next(queue_names)) - routing_key = RabbitRoutingKey(global_session, next(key_names)) - routing_key.associateConsumer(consumer) - # The session is still not connected. 
- self.assertFalse(global_session.is_connected) - routing_key.sendNow('now') - routing_key.send('later') - # The queue is not found because the consumer has not yet been - # associated with the routing key and the queue declared. - self.assertRaises(QueueNotFound, consumer.receive, timeout=2) - transaction.commit() - # Now that the transaction has been committed, the consumer is - # associated, and receives the deferred message. - self.assertEqual('later', consumer.receive(timeout=2)) - - def test_associateConsumerNow(self): - # associateConsumerNow() associates the consumer right away. - consumer = RabbitQueue(global_session, next(queue_names)) - routing_key = RabbitRoutingKey(global_session, next(key_names)) - routing_key.associateConsumerNow(consumer) - routing_key.sendNow('now') - routing_key.send('later') - # There is already something in the queue. - self.assertEqual('now', consumer.receive(timeout=2)) - transaction.commit() - # Now that the transaction has been committed there is another item in - # the queue. - self.assertEqual('later', consumer.receive(timeout=2)) - - def test_send(self): - consumer = RabbitQueue(global_session, next(queue_names)) - routing_key = RabbitRoutingKey(global_session, next(key_names)) - routing_key.associateConsumerNow(consumer) - - for data in range(90, 100): - routing_key.send(data) - - routing_key.sendNow('sync') - # There is nothing in the queue except the sync we just sent. - self.assertEqual('sync', consumer.receive(timeout=2)) - - # Messages get sent on commit - transaction.commit() - for data in range(90, 100): - self.assertEqual(data, consumer.receive()) - - # There are no more messages. They have all been consumed. 
- routing_key.sendNow('sync') - self.assertEqual('sync', consumer.receive(timeout=2)) - - def test_sendNow(self): - consumer = RabbitQueue(global_session, next(queue_names)) - routing_key = RabbitRoutingKey(global_session, next(key_names)) - routing_key.associateConsumerNow(consumer) - - for data in range(50, 60): - routing_key.sendNow(data) - received_data = consumer.receive(timeout=2) - self.assertEqual(data, received_data) - - def test_does_not_connect_session_immediately(self): - # RabbitRoutingKey does not connect the session until necessary. - RabbitRoutingKey(global_session, next(key_names)) - self.assertFalse(global_session.is_connected) - - -class TestRabbitQueue(RabbitTestCase): - - def test_interface(self): - consumer = RabbitQueue(global_session, next(queue_names)) - self.assertThat(consumer, Provides(IMessageConsumer)) - - def test_receive(self): - consumer = RabbitQueue(global_session, next(queue_names)) - routing_key = RabbitRoutingKey(global_session, next(key_names)) - routing_key.associateConsumerNow(consumer) - - for data in range(55, 65): - routing_key.sendNow(data) - self.assertEqual(data, consumer.receive(timeout=2)) - - # All the messages received were consumed. - self.assertRaises(QueueEmpty, consumer.receive, timeout=2) - - # New connections to the queue see an empty queue too. - consumer.session.disconnect() - consumer = RabbitQueue(global_session, next(queue_names)) - routing_key = RabbitRoutingKey(global_session, next(key_names)) - routing_key.associateConsumerNow(consumer) - self.assertRaises(QueueEmpty, consumer.receive, timeout=2) - - def test_does_not_connect_session_immediately(self): - # RabbitQueue does not connect the session until necessary. 
- RabbitQueue(global_session, next(queue_names)) - self.assertFalse(global_session.is_connected) - - -class TestRabbit(RabbitTestCase): - """Integration-like tests for the RabbitMQ messaging abstractions.""" - - def get_synced_sessions(self): - try: - syncs_set = transaction.manager.manager._synchs - except KeyError: - return set() - else: - return set( - sync.session for sync in six.itervalues(syncs_set.data) - if isinstance(sync, RabbitSessionTransactionSync)) - - def test_global_session(self): - self.assertIsInstance(global_session, RabbitSession) - self.assertIn(global_session, self.get_synced_sessions()) - - def test_global_unreliable_session(self): - self.assertIsInstance( - global_unreliable_session, RabbitUnreliableSession) - self.assertIn(global_unreliable_session, self.get_synced_sessions()) - - def test_abort(self): - consumer = RabbitQueue(global_session, next(queue_names)) - routing_key = RabbitRoutingKey(global_session, next(key_names)) - routing_key.associateConsumerNow(consumer) - - for data in range(90, 100): - routing_key.send(data) - - # Messages sent using send() are forgotten on abort. - transaction.abort() - self.assertRaises(QueueEmpty, consumer.receive, timeout=2) + def test_unconfigured(self): + self.pushConfig("rabbitmq", broker_urls="none") + self.assertRaisesWithContent( + rabbit.MessagingUnavailable, + "Incomplete configuration", + rabbit.connect, + ) + + def test_single_broker_url(self): + self.assertIsNotNone(config.rabbitmq.broker_urls) + [broker_url] = config.rabbitmq.broker_urls.split() + parsed_url = parse_url(broker_url) + with rabbit.connect() as connection: + self.assertThat( + connection, + MatchesStructure.byEquality( + # kombu.transport.pyamqp forces "localhost" to "127.0.0.1". 
+ hostname="127.0.0.1", + userid=parsed_url["userid"], + password=parsed_url["password"], + virtual_host=parsed_url["virtual_host"], + port=int(parsed_url["port"]), + alt=[broker_url], + ), + ) + + def test_multiple_broker_urls(self): + self.assertIsNotNone(config.rabbitmq.broker_urls) + [broker_url] = config.rabbitmq.broker_urls.split() + parsed_url = parse_url(broker_url) + self.assertEqual("localhost", parsed_url["hostname"]) + self.pushConfig( + "rabbitmq", + broker_urls=( + "%s amqp://guest:guest@alternate.example//" % broker_url + ), + ) + with rabbit.connect() as connection: + self.assertThat( + connection, + MatchesStructure.byEquality( + # kombu.transport.pyamqp forces "localhost" to "127.0.0.1". + hostname="127.0.0.1", + userid=parsed_url["userid"], + password=parsed_url["password"], + virtual_host=parsed_url["virtual_host"], + port=int(parsed_url["port"]), + alt=[broker_url, "amqp://guest:guest@alternate.example//"], + ), + ) + + def test_compat_config(self): + # The old-style host/userid/password/virtual_host configuration + # format still works. + self.assertIsNotNone(config.rabbitmq.broker_urls) + [broker_url] = config.rabbitmq.broker_urls.split() + parsed_url = parse_url(broker_url) + self.assertEqual("localhost", parsed_url["hostname"]) + self.pushConfig( + "rabbitmq", + broker_urls="none", + host="%s:%s" % (parsed_url["hostname"], parsed_url["port"]), + userid=parsed_url["userid"], + password=parsed_url["password"], + virtual_host=parsed_url["virtual_host"], + ) + with rabbit.connect() as connection: + self.assertThat( + connection, + MatchesStructure.byEquality( + # kombu.transport.pyamqp forces "localhost" to "127.0.0.1". 
+ hostname="127.0.0.1", + userid=parsed_url["userid"], + password=parsed_url["password"], + virtual_host=parsed_url["virtual_host"], + port=int(parsed_url["port"]), + alt=[], + ), + ) diff --git a/lib/lp/services/rabbit/server.py b/lib/lp/services/rabbit/server.py index 9cd3434..0cd8f6d 100644 --- a/lib/lp/services/rabbit/server.py +++ b/lib/lp/services/rabbit/server.py @@ -3,17 +3,9 @@ """RabbitMQ server fixture.""" -from __future__ import ( - absolute_import, - print_function, - unicode_literals, - ) - - -__metaclass__ = type __all__ = [ - 'RabbitServer', - ] + "RabbitServer", +] from textwrap import dedent @@ -28,11 +20,14 @@ class RabbitServer(rabbitfixture.server.RabbitServer): """ def setUp(self): - super(RabbitServer, self).setUp() - self.config.service_config = dedent("""\ + super().setUp() + # The two trailing slashes here are deliberate: this has the effect + # of setting the virtual host to "/" rather than to the empty + # string. + self.config.service_config = dedent( + """\ [rabbitmq] - host: localhost:%d - userid: guest - password: guest - virtual_host: / - """ % self.config.port) + broker_urls: amqp://guest:guest@localhost:%d// + """ + % self.config.port + ) diff --git a/lib/lp/services/rabbit/tests/test_server.py b/lib/lp/services/rabbit/tests/test_server.py index ed93e2d..ec200d2 100644 --- a/lib/lp/services/rabbit/tests/test_server.py +++ b/lib/lp/services/rabbit/tests/test_server.py @@ -3,45 +3,36 @@ """Tests for lp.services.rabbit.RabbitServer.""" -from __future__ import ( - absolute_import, - print_function, - unicode_literals, - ) - - -__metaclass__ = type - import io +from configparser import ConfigParser from fixtures import EnvironmentVariableFixture -from lp.services.compat import SafeConfigParser from lp.services.rabbit.server import RabbitServer from lp.testing import TestCase from lp.testing.layers import BaseLayer class TestRabbitServer(TestCase): - layer = BaseLayer def test_service_config(self): # Rabbit needs to fully isolate 
itself: an existing per user # .erlang.cookie has to be ignored, and ditto bogus HOME if other # tests fail to cleanup. - self.useFixture(EnvironmentVariableFixture('HOME', '/nonsense/value')) + self.useFixture(EnvironmentVariableFixture("HOME", "/nonsense/value")) + # The default timeout is 15 seconds, but increase this a bit to + # allow some more leeway for slow test environments. + fixture = self.useFixture(RabbitServer(ctltimeout=120)) # RabbitServer pokes some .ini configuration into its config. - fixture = self.useFixture(RabbitServer()) - service_config = SafeConfigParser() - service_config.readfp(io.StringIO(fixture.config.service_config)) + service_config = ConfigParser() + service_config.read_file(io.StringIO(fixture.config.service_config)) self.assertEqual(["rabbitmq"], service_config.sections()) expected = { - "host": "localhost:%d" % fixture.config.port, - "userid": "guest", - "password": "guest", - "virtual_host": "/", - } + "broker_urls": ( + "amqp://guest:guest@localhost:%d//" % fixture.config.port + ), + } observed = dict(service_config.items("rabbitmq")) self.assertEqual(expected, observed) diff --git a/lib/lp/testing/__init__.py b/lib/lp/testing/__init__.py index 68b2fcc..7ff3940 100644 --- a/lib/lp/testing/__init__.py +++ b/lib/lp/testing/__init__.py @@ -167,7 +167,18 @@ class TestCase(testtools.TestCase, fixtures.TestWithFixtures): if msg is None: msg = "%r is %r" % (expected, observed) self.assertTrue(expected is not observed, msg) - + + def assertRaisesWithContent( + self, exception, exception_content, func, *args, **kwargs + ): + """Check if the given exception is raised with given content. + + If the exception isn't raised or the exception_content doesn't + match what was raised an AssertionError is raised. 
+ """ + err = self.assertRaises(exception, func, *args, **kwargs) + self.assertEqual(exception_content, str(err)) + def assertContentEqual(self, iter1, iter2): """Assert that 'iter1' has the same content as 'iter2'.""" self.assertThat(iter1, MatchesSetwise(*(map(Equals, iter2)))) diff --git a/lib/lp/testing/tests/test_layers_functional.py b/lib/lp/testing/tests/test_layers_functional.py index b891061..6938f27 100644 --- a/lib/lp/testing/tests/test_layers_functional.py +++ b/lib/lp/testing/tests/test_layers_functional.py @@ -15,13 +15,13 @@ __metaclass__ = type import os import uuid -import amqp from fixtures import ( EnvironmentVariableFixture, Fixture, ) from lp.services.config import config +from lp.services.messaging import rabbit from lp.testing import TestCase from lp.testing.layers import ( BaseLayer, @@ -97,18 +97,10 @@ class BaseTestCase(TestCase): self.assertEqual(BaseLayer.isSetUp, True) def testRabbitWorking(self): - rabbitmq = config.rabbitmq if not self.want_rabbitmq: - self.assertEqual(None, rabbitmq.host) + self.assertFalse(rabbit.is_configured()) else: - self.assertNotEqual(None, rabbitmq.host) - conn = amqp.Connection( - host=rabbitmq.host, - userid=rabbitmq.userid, - password=rabbitmq.password, - virtual_host=rabbitmq.virtual_host) - conn.connect() - conn.close() + rabbit.connect().close() class RabbitMQTestCase(BaseTestCase): diff --git a/requirements/lp-codeimport.txt b/requirements/lp-codeimport.txt index 8f3c399..cbf253f 100644 --- a/requirements/lp-codeimport.txt +++ b/requirements/lp-codeimport.txt @@ -5,7 +5,7 @@ # Don't list entries from ztk-versions.cfg here unless overriding their # versions; they will be included automatically. 
-amqp==2.4.2 +amqp==2.6.1 appdirs==1.4.3 asn1crypto==0.23.0 attrs==19.1.0 @@ -29,6 +29,7 @@ incremental==17.5.0 ipaddress==1.0.18 iso8601==0.1.12 keyring==0.6.2 +kombu==4.6.11 launchpadlib==1.10.9 lazr.config==2.2.2 lazr.delegates==2.0.4 @@ -40,7 +41,7 @@ mistune==0.8.3 mock==1.0.1 oauthlib==3.1.0 oops==0.0.14 -oops-amqp==0.1.0 +oops-amqp==0.2.0 oops-datedir-repo==0.0.24 oops-datedir2amqp==0.1.0 oops-timeline==0.0.3 @@ -54,7 +55,7 @@ pymacaroons==0.13.0 PyNaCl==1.3.0 pyOpenSSL==17.5.0 python-dateutil==2.8.1 -rabbitfixture==0.4.2 +rabbitfixture==0.5.3 responses==0.9.0 scandir==1.7 service-identity==18.1.0 @@ -62,7 +63,6 @@ setuptools-git==1.2 setuptools-scm==3.4.3 six==1.15.0 subprocess32==3.2.6 -subvertpy==0.11.0 testresources==0.2.7 testscenarios==0.4 timeline==0.0.7 diff --git a/setup.py b/setup.py index 8b4b377..fb9dede 100644 --- a/setup.py +++ b/setup.py @@ -162,11 +162,10 @@ setup( # entirely, but we need to retain it until codeimport has been # ported to Breezy. 'breezy', - 'bzr; python_version < "3"', - 'contextlib2; python_version < "3.3"', 'defusedxml', 'dulwich', 'fixtures', + 'kombu', 'lazr.config', 'lazr.enum', 'lazr.uri',
_______________________________________________ Mailing list: https://launchpad.net/~launchpad-reviewers Post to : launchpad-reviewers@lists.launchpad.net Unsubscribe : https://launchpad.net/~launchpad-reviewers More help : https://help.launchpad.net/ListHelp