Hello community,
here is the log from the commit of package python-amqpstorm for
openSUSE:Factory checked in at 2019-06-12 13:15:43
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-amqpstorm (Old)
and /work/SRC/openSUSE:Factory/.python-amqpstorm.new.4811 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-amqpstorm"
Wed Jun 12 13:15:43 2019 rev:6 rq:708980 version:2.7.0
Changes:
--------
--- /work/SRC/openSUSE:Factory/python-amqpstorm/python-amqpstorm.changes 2019-05-07 23:19:54.413044115 +0200
+++ /work/SRC/openSUSE:Factory/.python-amqpstorm.new.4811/python-amqpstorm.changes 2019-06-12 13:15:47.256723032 +0200
@@ -1,0 +2,8 @@
+Fri Jun 7 14:46:15 UTC 2019 - Marketa Calabkova <[email protected]>
+
+- Update to 2.7.0
+ * Added support for passing your own ssl context
+ * Improved logging verbosity on connection failures
+ * Fixed occasional error message when closing a SSL connection
+
+-------------------------------------------------------------------
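The first changelog entry (passing your own ssl context) can be sketched roughly as follows. The `context` and `server_hostname` keys and the `Connection(..., port=5671, ssl=True, ssl_options=...)` call mirror the functional tests added in this release; the helper names `build_ssl_options` and `open_ssl_connection` are assumptions made for illustration only.

```python
import ssl


def build_ssl_options(server_hostname, cafile=None):
    """Build an ssl_options dict carrying a caller-supplied SSLContext.

    Hypothetical helper; only the 'context' and 'server_hostname' keys
    are taken from the 2.7.0 diff.
    """
    # With cafile=None the system's default CA certificates are used.
    context = ssl.create_default_context(cafile=cafile)
    return {'context': context, 'server_hostname': server_hostname}


def open_ssl_connection(hostname, username, password, ssl_options):
    """Open an AMQPS connection (needs amqpstorm >= 2.7.0 and a broker)."""
    # Imported lazily so the helper above works without amqpstorm installed.
    from amqpstorm import Connection
    return Connection(hostname, username, password, port=5671, ssl=True,
                      ssl_options=ssl_options)
```

Calling `open_ssl_connection` requires a reachable broker with TLS enabled on port 5671, so it is not exercised here. As the unit tests in this release indicate, a default context with no `server_hostname` raises `ValueError` when the socket is wrapped.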
Old:
----
AMQPStorm-2.6.2.tar.gz
New:
----
AMQPStorm-2.7.0.tar.gz
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Other differences:
------------------
++++++ python-amqpstorm.spec ++++++
--- /var/tmp/diff_new_pack.It172d/_old 2019-06-12 13:15:48.308721930 +0200
+++ /var/tmp/diff_new_pack.It172d/_new 2019-06-12 13:15:48.344721892 +0200
@@ -18,7 +18,7 @@
%{?!python_module:%define python_module() python-%{**} python3-%{**}}
Name: python-amqpstorm
-Version: 2.6.2
+Version: 2.7.0
Release: 0
Summary: Thread-safe Python RabbitMQ Client & Management library
License: MIT
++++++ AMQPStorm-2.6.2.tar.gz -> AMQPStorm-2.7.0.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/AMQPStorm.egg-info/PKG-INFO new/AMQPStorm-2.7.0/AMQPStorm.egg-info/PKG-INFO
--- old/AMQPStorm-2.6.2/AMQPStorm.egg-info/PKG-INFO 2019-02-03 00:19:22.000000000 +0100
+++ new/AMQPStorm-2.7.0/AMQPStorm.egg-info/PKG-INFO 2019-04-20 08:26:18.000000000 +0200
@@ -1,12 +1,11 @@
-Metadata-Version: 1.1
+Metadata-Version: 2.1
Name: AMQPStorm
-Version: 2.6.2
+Version: 2.7.0
Summary: Thread-safe Python RabbitMQ Client & Management library.
Home-page: https://www.amqpstorm.io
Author: Erik Olof Gunnar Andersson
Author-email: [email protected]
License: MIT License
-Description-Content-Type: UNKNOWN
Description: AMQPStorm
=========
Thread-safe Python RabbitMQ Client & Management library.
@@ -29,6 +28,12 @@
Changelog
=========
+ Version 2.7.0
+ -------------
+ - Added support for passing your own ssl context [#71] - Thanks troglas.
+ - Improved logging verbosity on connection failures [#72] - Thanks troglas.
+ - Fixed occasional error message when closing a SSL connection [#68] - Thanks troglas.
+
Version 2.6.2
-------------
- Set default TCP Timeout to 10s on UriConnection to match Connection [#67] - Thanks josemonteiro.
@@ -53,21 +58,6 @@
- Properly wait until the inbound queue is empty when break_on_empty is set [#63] - Thanks TomGudman.
- Fixed issue with Management queue/exchange declare when the passive flag was set to True.
- Version 2.4.2
- -------------
- - Added support for External Authentication - Thanks Bernd Höhl.
- - Fixed typo in setup.py extra requirements - Thanks Bernd Höhl.
- - LICENSE file now included in package - Thanks Tomáš Chvátal.
-
- Version 2.4.1
- -------------
- - Added client/server negotiation to better determine the maximum supported channels and maximum allowed frame size [#52] - Thanks gastlich.
- - We now raise an exception if the maximum allowed channel count is reached.
-
- Version 2.4.0
- -------------
- - basic.consume now allows for multiple callbacks [#48].
-
Credits
=======
Special thanks to gmr (Gavin M. Roy) for creating pamqp, and in addition amqpstorm is heavily influenced by his pika and rabbitpy libraries.
@@ -105,3 +95,4 @@
Classifier: Topic :: Software Development :: Libraries
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System :: Networking
+Provides-Extra: management
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/AMQPStorm.egg-info/SOURCES.txt new/AMQPStorm-2.7.0/AMQPStorm.egg-info/SOURCES.txt
--- old/AMQPStorm-2.6.2/AMQPStorm.egg-info/SOURCES.txt 2019-02-03 00:19:22.000000000 +0100
+++ new/AMQPStorm-2.7.0/AMQPStorm.egg-info/SOURCES.txt 2019-04-20 08:26:18.000000000 +0200
@@ -59,6 +59,8 @@
amqpstorm/tests/functional/management/queue_tests.py
amqpstorm/tests/functional/management/user_tests.py
amqpstorm/tests/functional/management/virtual_host_tests.py
+amqpstorm/tests/functional/ssl/__init__.py
+amqpstorm/tests/functional/ssl/reliability_tests.py
amqpstorm/tests/unit/__init__.py
amqpstorm/tests/unit/compatiblity_tests.py
amqpstorm/tests/unit/exception_tests.py
@@ -102,7 +104,9 @@
amqpstorm/tests/unit/uri_connection/uri_connection_tests.py
examples/__init__.py
examples/consume_queue_until_empty.py
+examples/create_queue_with_a_ttl_on_messages.py
examples/flask_threaded_rpc_client.py
+examples/publish_message_with_expiration.py
examples/robust_consumer.py
examples/scalable_consumer.py
examples/scalable_rpc_server.py
@@ -112,6 +116,7 @@
examples/simple_rpc_client.py
examples/simple_rpc_server.py
examples/simple_transaction_publisher.py
+examples/ssl_with_context.py
examples/management_api/__init__.py
examples/management_api/aliveness_test.py
examples/management_api/create_user.py
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/CHANGELOG.rst new/AMQPStorm-2.7.0/CHANGELOG.rst
--- old/AMQPStorm-2.6.2/CHANGELOG.rst 2019-02-03 00:17:55.000000000 +0100
+++ new/AMQPStorm-2.7.0/CHANGELOG.rst 2019-04-20 08:25:42.000000000 +0200
@@ -1,6 +1,12 @@
Changelog
=========
+Version 2.7.0
+-------------
+- Added support for passing your own ssl context [#71] - Thanks troglas.
+- Improved logging verbosity on connection failures [#72] - Thanks troglas.
+- Fixed occasional error message when closing a SSL connection [#68] - Thanks troglas.
+
Version 2.6.2
-------------
- Set default TCP Timeout to 10s on UriConnection to match Connection [#67] - Thanks josemonteiro.
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/PKG-INFO new/AMQPStorm-2.7.0/PKG-INFO
--- old/AMQPStorm-2.6.2/PKG-INFO 2019-02-03 00:19:22.000000000 +0100
+++ new/AMQPStorm-2.7.0/PKG-INFO 2019-04-20 08:26:18.000000000 +0200
@@ -1,12 +1,11 @@
-Metadata-Version: 1.1
+Metadata-Version: 2.1
Name: AMQPStorm
-Version: 2.6.2
+Version: 2.7.0
Summary: Thread-safe Python RabbitMQ Client & Management library.
Home-page: https://www.amqpstorm.io
Author: Erik Olof Gunnar Andersson
Author-email: [email protected]
License: MIT License
-Description-Content-Type: UNKNOWN
Description: AMQPStorm
=========
Thread-safe Python RabbitMQ Client & Management library.
@@ -29,6 +28,12 @@
Changelog
=========
+ Version 2.7.0
+ -------------
+ - Added support for passing your own ssl context [#71] - Thanks troglas.
+ - Improved logging verbosity on connection failures [#72] - Thanks troglas.
+ - Fixed occasional error message when closing a SSL connection [#68] - Thanks troglas.
+
Version 2.6.2
-------------
- Set default TCP Timeout to 10s on UriConnection to match Connection [#67] - Thanks josemonteiro.
@@ -53,21 +58,6 @@
- Properly wait until the inbound queue is empty when break_on_empty is set [#63] - Thanks TomGudman.
- Fixed issue with Management queue/exchange declare when the passive flag was set to True.
- Version 2.4.2
- -------------
- - Added support for External Authentication - Thanks Bernd Höhl.
- - Fixed typo in setup.py extra requirements - Thanks Bernd Höhl.
- - LICENSE file now included in package - Thanks Tomáš Chvátal.
-
- Version 2.4.1
- -------------
- - Added client/server negotiation to better determine the maximum supported channels and maximum allowed frame size [#52] - Thanks gastlich.
- - We now raise an exception if the maximum allowed channel count is reached.
-
- Version 2.4.0
- -------------
- - basic.consume now allows for multiple callbacks [#48].
-
Credits
=======
Special thanks to gmr (Gavin M. Roy) for creating pamqp, and in addition amqpstorm is heavily influenced by his pika and rabbitpy libraries.
@@ -105,3 +95,4 @@
Classifier: Topic :: Software Development :: Libraries
Classifier: Topic :: Software Development :: Libraries :: Python Modules
Classifier: Topic :: System :: Networking
+Provides-Extra: management
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/README.rst new/AMQPStorm-2.7.0/README.rst
--- old/AMQPStorm-2.6.2/README.rst 2019-02-03 00:17:55.000000000 +0100
+++ new/AMQPStorm-2.7.0/README.rst 2019-04-20 08:25:42.000000000 +0200
@@ -20,6 +20,12 @@
Changelog
=========
+Version 2.7.0
+-------------
+- Added support for passing your own ssl context [#71] - Thanks troglas.
+- Improved logging verbosity on connection failures [#72] - Thanks troglas.
+- Fixed occasional error message when closing a SSL connection [#68] - Thanks troglas.
+
Version 2.6.2
-------------
- Set default TCP Timeout to 10s on UriConnection to match Connection [#67] - Thanks josemonteiro.
@@ -44,21 +50,6 @@
- Properly wait until the inbound queue is empty when break_on_empty is set [#63] - Thanks TomGudman.
- Fixed issue with Management queue/exchange declare when the passive flag was set to True.
-Version 2.4.2
--------------
-- Added support for External Authentication - Thanks Bernd Höhl.
-- Fixed typo in setup.py extra requirements - Thanks Bernd Höhl.
-- LICENSE file now included in package - Thanks Tomáš Chvátal.
-
-Version 2.4.1
--------------
-- Added client/server negotiation to better determine the maximum supported channels and maximum allowed frame size [#52] - Thanks gastlich.
-- We now raise an exception if the maximum allowed channel count is reached.
-
-Version 2.4.0
--------------
-- basic.consume now allows for multiple callbacks [#48].
-
Credits
=======
Special thanks to gmr (Gavin M. Roy) for creating pamqp, and in addition amqpstorm is heavily influenced by his pika and rabbitpy libraries.
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/amqpstorm/__init__.py new/AMQPStorm-2.7.0/amqpstorm/__init__.py
--- old/AMQPStorm-2.6.2/amqpstorm/__init__.py 2019-02-03 00:17:55.000000000 +0100
+++ new/AMQPStorm-2.7.0/amqpstorm/__init__.py 2019-04-20 08:25:42.000000000 +0200
@@ -1,5 +1,5 @@
"""AMQPStorm."""
-__version__ = '2.6.2' # noqa
+__version__ = '2.7.0' # noqa
__author__ = 'eandersson' # noqa
import logging
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/amqpstorm/compatibility.py new/AMQPStorm-2.7.0/amqpstorm/compatibility.py
--- old/AMQPStorm-2.6.2/amqpstorm/compatibility.py 2017-02-23 05:17:41.000000000 +0100
+++ new/AMQPStorm-2.7.0/amqpstorm/compatibility.py 2019-04-20 08:25:42.000000000 +0200
@@ -36,7 +36,8 @@
'certfile',
'cert_reqs',
'ssl_version',
- 'ca_certs'
+ 'ca_certs',
+ 'server_hostname',
]
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/amqpstorm/connection.py new/AMQPStorm-2.7.0/amqpstorm/connection.py
--- old/AMQPStorm-2.6.2/amqpstorm/connection.py 2019-02-03 00:17:55.000000000 +0100
+++ new/AMQPStorm-2.7.0/amqpstorm/connection.py 2019-04-20 08:25:42.000000000 +0200
@@ -317,7 +317,7 @@
self.heartbeat.register_read()
if channel_id == 0:
self._channel0.on_frame(frame_in)
- else:
+ elif channel_id in self._channels:
self._channels[channel_id].on_frame(frame_in)
return data_in
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/amqpstorm/io.py new/AMQPStorm-2.7.0/amqpstorm/io.py
--- old/AMQPStorm-2.6.2/amqpstorm/io.py 2018-12-30 00:45:01.000000000 +0100
+++ new/AMQPStorm-2.7.0/amqpstorm/io.py 2019-04-20 08:25:42.000000000 +0200
@@ -56,7 +56,8 @@
def __init__(self, parameters, exceptions=None, on_read_impl=None):
self._exceptions = exceptions
- self._lock = threading.Lock()
+ self._wr_lock = threading.Lock()
+ self._rd_lock = threading.Lock()
self._inbound_thread = None
self._on_read_impl = on_read_impl
self._running = threading.Event()
@@ -71,18 +72,20 @@
:return:
"""
- self._lock.acquire()
+ self._wr_lock.acquire()
+ self._rd_lock.acquire()
try:
self._running.clear()
if self.socket:
- self.socket.close()
+ self._close_socket()
if self._inbound_thread:
self._inbound_thread.join(timeout=self._parameters['timeout'])
self._inbound_thread = None
self.poller = None
self.socket = None
finally:
- self._lock.release()
+ self._wr_lock.release()
+ self._rd_lock.release()
def open(self):
"""Open Socket and establish a connection.
@@ -91,7 +94,8 @@
encountered an error.
:return:
"""
- self._lock.acquire()
+ self._wr_lock.acquire()
+ self._rd_lock.acquire()
try:
self.data_in = EMPTY_BUFFER
self._running.set()
@@ -101,7 +105,8 @@
timeout=self._parameters['timeout'])
self._inbound_thread = self._create_inbound_thread()
finally:
- self._lock.release()
+ self._wr_lock.release()
+ self._rd_lock.release()
def write_to_socket(self, frame_data):
"""Write data to the socket.
@@ -109,7 +114,7 @@
:param str frame_data:
:return:
"""
- self._lock.acquire()
+ self._wr_lock.acquire()
try:
total_bytes_written = 0
bytes_to_send = len(frame_data)
@@ -131,7 +136,18 @@
self._exceptions.append(AMQPConnectionError(why))
return
finally:
- self._lock.release()
+ self._wr_lock.release()
+
+ def _close_socket(self):
+ """Shutdown and close the Socket.
+
+ :return:
+ """
+ try:
+ self.socket.shutdown(socket.SHUT_RDWR)
+ except (OSError, socket.error):
+ pass
+ self.socket.close()
def _get_socket_addresses(self):
"""Get Socket address information.
@@ -159,16 +175,19 @@
:rtype: socket.socket
"""
+ error_message = None
for address in addresses:
sock = self._create_socket(socket_family=address[0])
try:
sock.connect(address[4])
- except (IOError, OSError):
+ except (IOError, OSError) as why:
+ error_message = why.strerror
continue
return sock
raise AMQPConnectionError(
- 'Could not connect to %s:%d' % (
- self._parameters['hostname'], self._parameters['port']
+ 'Could not connect to %s:%d error: %s' % (
+ self._parameters['hostname'], self._parameters['port'],
+ error_message
)
)
@@ -194,6 +213,13 @@
:param socket.socket sock:
:rtype: SSLSocket
"""
+ context = self._parameters['ssl_options'].get('context')
+ if context is not None:
+ hostname = self._parameters['ssl_options'].get('server_hostname')
+ return context.wrap_socket(
+ sock, do_handshake_on_connect=True,
+ server_hostname=hostname
+ )
if 'ssl_version' not in self._parameters['ssl_options']:
self._parameters['ssl_options']['ssl_version'] = (
compatibility.DEFAULT_SSL_VERSION
@@ -234,8 +260,6 @@
"""
data_in = EMPTY_BUFFER
try:
- if not self.socket:
- raise socket.error('connection/socket error')
data_in = self._read_from_socket()
except socket.timeout:
pass
@@ -250,6 +274,12 @@
:rtype: bytes
"""
- if self.use_ssl:
+ if not self.use_ssl:
+ if not self.socket:
+ raise socket.error('connection/socket error')
+ return self.socket.recv(MAX_FRAME_SIZE)
+
+ with self._rd_lock:
+ if not self.socket:
+ raise socket.error('connection/socket error')
return self.socket.read(MAX_FRAME_SIZE)
- return self.socket.recv(MAX_FRAME_SIZE)
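The `_close_socket` method added in the io.py diff above follows a common shutdown-then-close pattern, which might be sketched in isolation as below. The standalone function name `close_socket` is an assumption for illustration and is not part of amqpstorm.

```python
import socket


def close_socket(sock):
    """Shut down both directions of a socket, then close it.

    shutdown() raises OSError when the socket is not connected (or the
    peer has already gone away), so that error is ignored before close().
    """
    try:
        sock.shutdown(socket.SHUT_RDWR)
    except OSError:  # socket.error is an alias of OSError on Python 3
        pass
    sock.close()
```

With a connected pair from `socket.socketpair()`, closing one end this way makes a `recv()` on the other end return an empty bytes object, signalling end of stream. Shutting down first presumably lets a reader blocked on the same socket return promptly, which would fit the "occasional error message when closing a SSL connection" fix, though the diff itself does not state the motivation.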
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/amqpstorm/tests/__init__.py new/AMQPStorm-2.7.0/amqpstorm/tests/__init__.py
--- old/AMQPStorm-2.6.2/amqpstorm/tests/__init__.py 2018-12-30 00:45:01.000000000 +0100
+++ new/AMQPStorm-2.7.0/amqpstorm/tests/__init__.py 2019-04-20 08:25:42.000000000 +0200
@@ -1,7 +1,15 @@
import os
+CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
+
HOST = os.environ.get('AMQP_HOST', '127.0.0.1')
USERNAME = os.environ.get('AMQP_USERNAME', 'guest')
PASSWORD = os.environ.get('AMQP_PASSWORD', 'guest')
URI = os.environ.get('AMQP_URI', 'amqp://guest:[email protected]:5672/%2F')
HTTP_URL = os.environ.get('AMQP_HTTP_URL', 'http://127.0.0.1:15672')
+
+SSL_URI = os.environ.get('AMQP_SSL_URI',
+ 'amqps://guest:[email protected]:5671/%2F')
+SSL_HOST = os.environ.get('AMQP_SSL_HOST', 'rmq.eandersson.net')
+CAFILE = os.environ.get('AMQP_CAFILE',
+ '{0}/resources/cacert.pem'.format(CURRENT_DIR))
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/amqpstorm/tests/functional/reliability_tests.py new/AMQPStorm-2.7.0/amqpstorm/tests/functional/reliability_tests.py
--- old/AMQPStorm-2.6.2/amqpstorm/tests/functional/reliability_tests.py 2018-12-30 00:45:01.000000000 +0100
+++ new/AMQPStorm-2.7.0/amqpstorm/tests/functional/reliability_tests.py 2019-04-20 08:25:42.000000000 +0200
@@ -42,6 +42,7 @@
self.assertIsNone(self.connection._io.socket)
self.assertIsNone(self.connection._io.poller)
self.assertFalse(self.connection._io._running.is_set())
+ self.assertFalse(self.connection.exceptions)
@setup(new_connection=False, queue=True)
def test_functional_open_close_connection_loop(self):
@@ -69,6 +70,7 @@
self.assertIsNone(self.connection._io.socket)
self.assertIsNone(self.connection._io.poller)
self.assertFalse(self.connection._io._running.is_set())
+ self.assertFalse(self.connection.exceptions)
@setup(new_connection=True, new_channel=False, queue=True)
def test_functional_close_gracefully_after_publish_mandatory_fails(self):
@@ -129,14 +131,11 @@
@setup(new_connection=False, queue=False)
def test_functional_close_performance(self):
- """Make sure closing a connection never takes longer than ~3 seconds.
-
- In general closing a connection should take about ~1s, but we
- try to close it faster, if-possible.
+ """Make sure closing a connection never takes longer than ~1 seconds.
:return:
"""
- for _ in range(5):
+ for _ in range(100):
self.connection = self.connection = Connection(HOST, USERNAME,
PASSWORD)
start_time = time.time()
@@ -165,8 +164,8 @@
imp.reload(compatibility)
-class PublishAndConsume5kTest(TestFunctionalFramework):
- messages_to_send = 5000
+class PublishAndConsume1kTest(TestFunctionalFramework):
+ messages_to_send = 1000
messages_consumed = 0
lock = threading.Lock()
@@ -194,7 +193,7 @@
self.messages_consumed += 1
@setup(queue=True)
- def test_functional_publish_and_consume_5k_messages(self):
+ def test_functional_publish_and_consume_1k_messages(self):
self.channel.queue.declare(self.queue_name)
publish_thread = threading.Thread(target=self.publish_messages, )
@@ -220,7 +219,7 @@
'test took too long')
-class PublishAndConsumeUntilEmptyTest(TestFunctionalFramework):
+class Consume1kUntilEmpty(TestFunctionalFramework):
messages_to_send = 1000
def configure(self):
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/amqpstorm/tests/functional/ssl/reliability_tests.py new/AMQPStorm-2.7.0/amqpstorm/tests/functional/ssl/reliability_tests.py
--- old/AMQPStorm-2.6.2/amqpstorm/tests/functional/ssl/reliability_tests.py 1970-01-01 01:00:00.000000000 +0100
+++ new/AMQPStorm-2.7.0/amqpstorm/tests/functional/ssl/reliability_tests.py 2019-04-20 08:25:42.000000000 +0200
@@ -0,0 +1,273 @@
+import ssl
+import threading
+import time
+
+from amqpstorm import Connection
+from amqpstorm import UriConnection
+from amqpstorm.tests import CAFILE
+from amqpstorm.tests import PASSWORD
+from amqpstorm.tests import SSL_HOST
+from amqpstorm.tests import SSL_URI
+from amqpstorm.tests import USERNAME
+from amqpstorm.tests.utility import TestFunctionalFramework
+from amqpstorm.tests.utility import setup
+
+
+class SSLReliabilityFunctionalTests(TestFunctionalFramework):
+ @setup(new_connection=False, queue=True)
+ def test_functional_ssl_open_new_connection_loop(self):
+ ssl_options = {
+ 'context': ssl.create_default_context(cafile=CAFILE),
+ 'server_hostname': SSL_HOST
+ }
+
+ for _ in range(5):
+ self.connection = self.connection = Connection(
+ SSL_HOST, USERNAME, PASSWORD, port=5671, ssl=True,
+ ssl_options=ssl_options, timeout=1)
+ self.channel = self.connection.channel()
+
+ # Make sure that it's a new channel.
+ self.assertEqual(int(self.channel), 1)
+
+ self.channel.queue.declare(self.queue_name)
+
+ # Verify that the Connection/Channel has been opened properly.
+ self.assertIsNotNone(self.connection._io.socket)
+ self.assertIsNotNone(self.connection._io.poller)
+ self.assertTrue(self.connection.is_open)
+
+ self.channel.close()
+ self.connection.close()
+
+ # Verify that the Connection has been closed properly.
+ self.assertTrue(self.connection.is_closed)
+ self.assertIsNone(self.connection._io.socket)
+ self.assertIsNone(self.connection._io.poller)
+ self.assertFalse(self.connection._io._running.is_set())
+ self.assertFalse(self.connection.exceptions)
+
+ @setup(new_connection=False, queue=True)
+ def test_functional_ssl_open_close_connection_loop(self):
+ ssl_options = {
+ 'context': ssl.create_default_context(cafile=CAFILE),
+ 'server_hostname': SSL_HOST
+ }
+ self.connection = self.connection = Connection(
+ SSL_HOST, USERNAME, PASSWORD, port=5671, ssl=True,
+ ssl_options=ssl_options, timeout=1, lazy=True)
+
+ for _ in range(5):
+ self.connection.open()
+ channel = self.connection.channel()
+
+ # Make sure that it's a new channel.
+ self.assertEqual(int(channel), 1)
+
+ channel.queue.declare(self.queue_name)
+
+ channel.close()
+
+ # Verify that the Connection/Channel has been opened properly.
+ self.assertIsNotNone(self.connection._io.socket)
+ self.assertIsNotNone(self.connection._io.poller)
+ self.assertTrue(self.connection.is_open)
+
+ self.connection.close()
+
+ # Verify that the Connection has been closed properly.
+ self.assertTrue(self.connection.is_closed)
+ self.assertIsNone(self.connection._io.socket)
+ self.assertIsNone(self.connection._io.poller)
+ self.assertFalse(self.connection._io._running.is_set())
+ self.assertFalse(self.connection.exceptions)
+
+ @setup(new_connection=False, queue=False)
+ def test_functional_ssl_open_close_channel_loop(self):
+ ssl_options = {
+ 'context': ssl.create_default_context(cafile=CAFILE),
+ 'server_hostname': SSL_HOST
+ }
+ self.connection = self.connection = Connection(
+ SSL_HOST, USERNAME, PASSWORD, port=5671, ssl=True,
+ ssl_options=ssl_options)
+
+ for _ in range(25):
+ channel = self.connection.channel()
+
+ # Verify that the Channel has been opened properly.
+ self.assertTrue(self.connection.is_open)
+ self.assertTrue(channel.is_open)
+
+ # Channel id should be staying at 1.
+ self.assertEqual(int(channel), 1)
+
+ channel.close()
+
+ # Verify that theChannel has been closed properly.
+ self.assertTrue(self.connection.is_open)
+ self.assertTrue(channel.is_closed)
+
+ @setup(new_connection=False, queue=True)
+ def test_functional_ssl_open_multiple_channels(self):
+ ssl_options = {
+ 'context': ssl.create_default_context(cafile=CAFILE),
+ 'server_hostname': SSL_HOST
+ }
+ self.connection = self.connection = Connection(
+ SSL_HOST, USERNAME, PASSWORD, port=5671, ssl=True,
+ ssl_options=ssl_options, timeout=1, lazy=True)
+
+ for _ in range(5):
+ channels = []
+ self.connection.open()
+ for index in range(3):
+ channel = self.connection.channel()
+ channels.append(channel)
+
+ # Verify that the Channel has been opened properly.
+ self.assertTrue(channel.is_open)
+ self.assertEqual(int(channel), len(channels))
+ self.connection.close()
+
+ @setup(new_connection=False, queue=False)
+ def test_functional_ssl_close_performance(self):
+ """Make sure closing a connection never takes longer than ~1 seconds.
+
+ :return:
+ """
+ for _ in range(100):
+ ssl_options = {
+ 'context': ssl.create_default_context(cafile=CAFILE),
+ 'server_hostname': SSL_HOST
+ }
+ self.connection = self.connection = Connection(
+ SSL_HOST, USERNAME, PASSWORD, port=5671, ssl=True,
+ ssl_options=ssl_options, lazy=True)
+
+ start_time = time.time()
+ self.connection.close()
+ self.assertLess(time.time() - start_time, 3)
+
+ @setup(new_connection=False)
+ def test_functional_ssl_uri_connection(self):
+ self.connection = UriConnection(SSL_URI)
+ self.channel = self.connection.channel()
+ self.assertTrue(self.connection.is_open)
+
+ @setup(new_connection=False)
+ def test_functional_ssl_uri_connection_with_context(self):
+ ssl_options = {
+ 'context': ssl.create_default_context(cafile=CAFILE),
+ 'server_hostname': SSL_HOST
+ }
+
+ self.connection = UriConnection(SSL_URI, ssl_options=ssl_options)
+ self.channel = self.connection.channel()
+ self.assertTrue(self.connection.is_open)
+
+
+class PublishAndConsume1kWithSSLTest(TestFunctionalFramework):
+ messages_to_send = 1000
+ messages_consumed = 0
+ lock = threading.Lock()
+
+ def configure(self):
+ self.disable_logging_validation()
+
+ def publish_messages(self):
+ for _ in range(self.messages_to_send):
+ self.channel.basic.publish(body=self.message,
+ routing_key=self.queue_name)
+
+ def consume_messages(self):
+ channel = self.connection.channel()
+ channel.basic.consume(queue=self.queue_name,
+ no_ack=False)
+ for message in channel.build_inbound_messages(
+ break_on_empty=False):
+ self.increment_message_count()
+ message.ack()
+ if self.messages_consumed == self.messages_to_send:
+ break
+
+ def increment_message_count(self):
+ with self.lock:
+ self.messages_consumed += 1
+
+ @setup(new_connection=False, queue=False)
+ def test_functional_publish_1k_with_ssl(self):
+ ssl_options = {
+ 'context': ssl.create_default_context(cafile=CAFILE),
+ 'server_hostname': SSL_HOST
+ }
+ self.connection = self.connection = Connection(
+ SSL_HOST, USERNAME, PASSWORD, port=5671, ssl=True,
+ ssl_options=ssl_options)
+
+ self.channel = self.connection.channel()
+ self.channel.queue.declare(self.queue_name)
+
+ publish_thread = threading.Thread(target=self.publish_messages, )
+ publish_thread.daemon = True
+ publish_thread.start()
+
+ for _ in range(4):
+ consumer_thread = threading.Thread(target=self.consume_messages, )
+ consumer_thread.daemon = True
+ consumer_thread.start()
+
+ start_time = time.time()
+ while self.messages_consumed != self.messages_to_send:
+ if time.time() - start_time >= 60:
+ break
+ time.sleep(0.1)
+
+ for channel in list(self.connection.channels.values()):
+ channel.stop_consuming()
+ channel.close()
+
+ self.assertEqual(self.messages_consumed, self.messages_to_send,
+ 'test took too long')
+
+
+class Consume1kWithSSLUntilEmpty(TestFunctionalFramework):
+ messages_to_send = 1000
+
+ def configure(self):
+ self.disable_logging_validation()
+
+ def publish_messages(self):
+ for _ in range(self.messages_to_send):
+ self.channel.basic.publish(body=self.message,
+ routing_key=self.queue_name)
+
+ @setup(new_connection=False, queue=True)
+ def test_functional_consume_with_ssl_until_empty(self):
+ ssl_options = {
+ 'context': ssl.create_default_context(cafile=CAFILE),
+ 'server_hostname': SSL_HOST
+ }
+ self.connection = self.connection = Connection(
+ SSL_HOST, USERNAME, PASSWORD, port=5671, ssl=True,
+ ssl_options=ssl_options)
+
+ self.channel = self.connection.channel()
+ self.channel.queue.declare(self.queue_name)
+ self.channel.confirm_deliveries()
+ self.publish_messages()
+
+ channel = self.connection.channel()
+ channel.basic.consume(queue=self.queue_name,
+ no_ack=False)
+ message_count = 0
+ for message in channel.build_inbound_messages(break_on_empty=True):
+ message_count += 1
+ message.ack()
+
+ result = channel.queue.declare(self.queue_name, passive=True)
+ self.assertEqual(result['message_count'], 0)
+ self.assertEqual(message_count, self.messages_to_send,
+ 'not all messages consumed')
+
+ channel.close()
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/amqpstorm/tests/unit/io/io_exception_tests.py new/AMQPStorm-2.7.0/amqpstorm/tests/unit/io/io_exception_tests.py
--- old/AMQPStorm-2.6.2/amqpstorm/tests/unit/io/io_exception_tests.py 2017-02-23 05:17:41.000000000 +0100
+++ new/AMQPStorm-2.7.0/amqpstorm/tests/unit/io/io_exception_tests.py 2019-04-20 08:25:42.000000000 +0200
@@ -14,6 +14,15 @@
class IOExceptionTests(TestFramework):
+ def test_io_shutdown_with_io_error(self):
+ connection = FakeConnection()
+
+ io = IO(connection.parameters)
+ io._exceptions = []
+ io.socket = mock.Mock(name='socket', spec=socket.socket)
+ io.socket.shutdown.side_effect = OSError()
+ io._close_socket()
+
def test_io_receive_raises_socket_error(self):
connection = FakeConnection()
@@ -126,7 +135,7 @@
io = IO(parameters)
self.assertRaisesRegexp(
AMQPConnectionError,
- 'Could not connect to localhost:1234',
+ 'Could not connect to localhost:1234 error: Connection refused',
io.open
)
@@ -171,3 +180,31 @@
'connection/socket error',
connection.check_for_errors
)
+
+ def test_io_socket_read_fails(self):
+ connection = FakeConnection()
+ parameters = FakeConnection().parameters
+ parameters['ssl'] = False
+ io = IO(parameters, exceptions=connection.exceptions)
+
+ self.assertFalse(io.use_ssl)
+
+ self.assertRaisesRegexp(
+ socket.error,
+ 'connection/socket error',
+ io._read_from_socket
+ )
+
+ def test_io_socket_read_fails_with_ssl(self):
+ connection = FakeConnection()
+ parameters = FakeConnection().parameters
+ parameters['ssl'] = True
+ io = IO(parameters, exceptions=connection.exceptions)
+
+ self.assertTrue(io.use_ssl)
+
+ self.assertRaisesRegexp(
+ socket.error,
+ 'connection/socket error',
+ io._read_from_socket
+ )
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/amqpstorm/tests/unit/io/io_tests.py new/AMQPStorm-2.7.0/amqpstorm/tests/unit/io/io_tests.py
--- old/AMQPStorm-2.6.2/amqpstorm/tests/unit/io/io_tests.py 2018-01-17 03:51:54.000000000 +0100
+++ new/AMQPStorm-2.7.0/amqpstorm/tests/unit/io/io_tests.py 2019-04-20 08:25:42.000000000 +0200
@@ -141,6 +141,25 @@
self.assertEqual(connection.parameters['ssl_options']['ssl_version'],
'travis-ci')
+ def test_io_set_ssl_context(self):
+ connection = FakeConnection()
+ connection.parameters['ssl_options'] = {
+ 'context': ssl.create_default_context(),
+ 'server_hostname': 'localhost',
+ }
+
+ io = IO(connection.parameters)
+ self.assertTrue(io._ssl_wrap_socket(socket.socket()))
+
+ def test_io_set_ssl_context_no_hostname_provided(self):
+ connection = FakeConnection()
+ connection.parameters['ssl_options'] = {
+ 'context': ssl.create_default_context(),
+ }
+
+ io = IO(connection.parameters)
+ self.assertRaises(ValueError, io._ssl_wrap_socket, socket.socket())
+
def test_io_has_ipv6(self):
restore_func = socket.getaddrinfo
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/amqpstorm/tests/unit/uri_connection/uri_connection_exception_tests.py new/AMQPStorm-2.7.0/amqpstorm/tests/unit/uri_connection/uri_connection_exception_tests.py
--- old/AMQPStorm-2.6.2/amqpstorm/tests/unit/uri_connection/uri_connection_exception_tests.py 2018-01-17 03:51:54.000000000 +0100
+++ new/AMQPStorm-2.7.0/amqpstorm/tests/unit/uri_connection/uri_connection_exception_tests.py 2019-04-20 08:25:42.000000000 +0200
@@ -12,11 +12,11 @@
class UriConnectionExceptionTests(TestFramework):
@unittest.skipIf(sys.version_info < (3, 3), 'Python 3.x test')
def test_uri_py3_raises_on_invalid_uri(self):
- self.assertRaises(ValueError, UriConnection, 'amqp://a:b', True)
+ self.assertRaises(ValueError, UriConnection, 'amqp://a:b', {}, True)
@unittest.skipIf(sys.version_info[0] == 3, 'Python 2.x test')
def test_uri_py2_raises_on_invalid_uri(self):
- self.assertRaises(ValueError, UriConnection, 'amqp://a:b', True)
+ self.assertRaises(ValueError, UriConnection, 'amqp://a:b', {}, True)
def test_uri_raises_on_invalid_object(self):
self.assertRaises(AttributeError, UriConnection, None)
@@ -26,7 +26,7 @@
def test_uri_invalid_ssl_options(self):
connection = UriConnection(
- 'amqps://guest:guest@localhost:5672/%2F', True
+ 'amqps://guest:guest@localhost:5672/%2F', lazy=True
)
ssl_kwargs = {
'unit_test': ['not_required'],
@@ -39,7 +39,7 @@
def test_uri_get_invalid_ssl_version(self):
connection = UriConnection(
- 'amqps://guest:guest@localhost:5672/%2F', True
+ 'amqps://guest:guest@localhost:5672/%2F', lazy=True
)
self.assertEqual(connection._get_ssl_version('protocol_test'),
@@ -50,7 +50,7 @@
def test_uri_get_invalid_ssl_validation(self):
connection = UriConnection(
- 'amqps://guest:guest@localhost:5672/%2F', True
+ 'amqps://guest:guest@localhost:5672/%2F', lazy=True
)
self.assertEqual(ssl.CERT_NONE,
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/AMQPStorm-2.6.2/amqpstorm/tests/unit/uri_connection/uri_connection_tests.py
new/AMQPStorm-2.7.0/amqpstorm/tests/unit/uri_connection/uri_connection_tests.py
---
old/AMQPStorm-2.6.2/amqpstorm/tests/unit/uri_connection/uri_connection_tests.py
2019-02-03 00:17:55.000000000 +0100
+++
new/AMQPStorm-2.7.0/amqpstorm/tests/unit/uri_connection/uri_connection_tests.py
2019-04-20 08:25:42.000000000 +0200
@@ -10,7 +10,7 @@
class UriConnectionTests(TestFramework):
def test_uri_default(self):
connection = UriConnection(
- 'amqp://guest:guest@localhost:5672/%2F', True
+ 'amqp://guest:guest@localhost:5672/%2F', lazy=True
)
self.assertEqual(connection.parameters['hostname'], 'localhost')
@@ -27,14 +27,14 @@
def test_uri_ssl(self):
connection = UriConnection(
- 'amqps://guest:guest@localhost:5672/%2F', True
+ 'amqps://guest:guest@localhost:5672/%2F', lazy=True
)
self.assertTrue(connection.parameters['ssl'])
def test_uri_simple(self):
connection = UriConnection(
- 'amqps://localhost:5672/%2F', True
+ 'amqps://localhost:5672/%2F', lazy=True
)
self.assertEqual(connection.parameters['hostname'], 'localhost')
@@ -44,7 +44,7 @@
def test_uri_set_hostname(self):
connection = UriConnection(
'amqps://guest:guest@my-server:5672/%2F?'
- 'heartbeat=360', True
+ 'heartbeat=360', lazy=True
)
self.assertIsInstance(connection.parameters['hostname'], str)
@@ -53,7 +53,7 @@
def test_uri_set_username(self):
connection = UriConnection(
'amqps://username:guest@localhost:5672/%2F?'
- 'heartbeat=360', True
+ 'heartbeat=360', lazy=True
)
self.assertIsInstance(connection.parameters['username'], str)
@@ -62,7 +62,7 @@
def test_uri_set_password(self):
connection = UriConnection(
'amqps://guest:password@localhost:5672/%2F?'
- 'heartbeat=360', True
+ 'heartbeat=360', lazy=True
)
self.assertIsInstance(connection.parameters['password'], str)
@@ -70,7 +70,7 @@
def test_uri_set_port(self):
connection = UriConnection(
- 'amqps://guest:guest@localhost:5672/%2F', True
+ 'amqps://guest:guest@localhost:5672/%2F', lazy=True
)
self.assertIsInstance(connection.parameters['port'], int)
@@ -78,8 +78,8 @@
def test_uri_set_heartbeat(self):
connection = UriConnection(
- 'amqps://guest:guest@localhost:5672/%2F?'
- 'heartbeat=360', True
+ 'amqps://guest:lazy=True@localhost:5672/%2F?'
+ 'heartbeat=360', lazy=True
)
self.assertIsInstance(connection.parameters['heartbeat'], int)
@@ -88,7 +88,7 @@
def test_uri_set_timeout(self):
connection = UriConnection(
'amqps://guest:guest@localhost:5672/%2F?'
- 'timeout=360', True
+ 'timeout=360', lazy=True
)
self.assertIsInstance(connection.parameters['timeout'], int)
@@ -96,7 +96,7 @@
def test_uri_set_virtual_host(self):
connection = UriConnection(
- 'amqps://guest:guest@localhost:5672/travis', True
+ 'amqps://guest:guest@localhost:5672/travis', lazy=True
)
self.assertIsInstance(connection.parameters['virtual_host'], str)
@@ -107,7 +107,7 @@
'amqps://guest:guest@localhost:5671/%2F?'
'ssl_version=protocol_tlsv1&cert_reqs=cert_required&'
'keyfile=file.key&certfile=file.crt&'
- 'ca_certs=travis-ci', True
+ 'ca_certs=travis-ci', lazy=True
)
self.assertTrue(connection.parameters['ssl'])
@@ -124,7 +124,7 @@
def test_uri_get_ssl_version(self):
connection = UriConnection(
- 'amqp://guest:guest@localhost:5672/%2F', True
+ 'amqp://guest:guest@localhost:5672/%2F', lazy=True
)
self.assertEqual(ssl.PROTOCOL_TLSv1,
@@ -132,7 +132,7 @@
def test_uri_get_ssl_validation(self):
connection = UriConnection(
- 'amqps://guest:guest@localhost:5672/%2F', True
+ 'amqps://guest:guest@localhost:5672/%2F', lazy=True
)
self.assertEqual(ssl.CERT_REQUIRED,
@@ -140,7 +140,7 @@
def test_uri_get_ssl_options(self):
connection = UriConnection(
- 'amqps://guest:guest@localhost:5672/%2F', True
+ 'amqps://guest:guest@localhost:5671/%2F', lazy=True
)
ssl_kwargs = {
'cert_reqs': ['cert_required'],
@@ -153,4 +153,26 @@
self.assertEqual(ssl_options['cert_reqs'], ssl.CERT_REQUIRED)
self.assertEqual(ssl_options['ssl_version'], ssl.PROTOCOL_TLSv1)
self.assertEqual(ssl_options['keyfile'], 'file.key')
+ self.assertEqual(ssl_options['certfile'], 'file.crt')
+
+ def test_uri_get_ssl_options_new_method(self):
+ ssl_kwargs = {
+ 'cert_reqs': ssl.CERT_REQUIRED,
+ 'ssl_version': ssl.PROTOCOL_TLSv1,
+ 'keyfile': 'file.key',
+ 'certfile': 'file.crt'
+ }
+ connection = UriConnection(
+ 'amqps://guest:guest@localhost:5671/%2F?'
+ 'server_hostname=rmq.eandersson.net&certfile=file.crt',
+ ssl_options=ssl_kwargs,
+ lazy=True
+ )
+
+ ssl_options = connection.parameters.get('ssl_options')
+
+ self.assertEqual(ssl_options['server_hostname'], 'rmq.eandersson.net')
+ self.assertEqual(ssl_options['cert_reqs'], ssl.CERT_REQUIRED)
+ self.assertEqual(ssl_options['ssl_version'], ssl.PROTOCOL_TLSv1)
+ self.assertEqual(ssl_options['keyfile'], 'file.key')
self.assertEqual(ssl_options['certfile'], 'file.crt')
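Most of the test changes above replace a positional `True` with `lazy=True`. The reason is the signature change to `UriConnection.__init__` in the `uri_connection.py` part of this diff: `ssl_options` is now the second positional parameter, so a positional `True` no longer reaches `lazy`. A sketch with hypothetical stand-in functions (not the library code) that only mirror the two signatures:

```python
def old_init(uri, lazy=False):
    """Stand-in for the 2.6.2 signature: (uri, lazy=False)."""
    return {'ssl_options': None, 'lazy': lazy}


def new_init(uri, ssl_options=None, lazy=False):
    """Stand-in for the 2.7.0 signature: (uri, ssl_options=None, lazy=False)."""
    return {'ssl_options': ssl_options, 'lazy': lazy}


URI = 'amqp://guest:guest@localhost:5672/%2F'

# Under the old signature the second positional argument is `lazy`.
old_call = old_init(URI, True)

# Under the new signature the same call binds True to `ssl_options`
# and leaves `lazy` at its default of False.
positional_call = new_init(URI, True)

# Passing `lazy` by keyword behaves the same under both signatures.
keyword_call = new_init(URI, lazy=True)
```

Code written against 2.6.2 that passes `lazy` positionally would likely need the keyword form after this update.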
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/AMQPStorm-2.6.2/amqpstorm/uri_connection.py
new/AMQPStorm-2.7.0/amqpstorm/uri_connection.py
--- old/AMQPStorm-2.6.2/amqpstorm/uri_connection.py 2019-02-03
00:17:55.000000000 +0100
+++ new/AMQPStorm-2.7.0/amqpstorm/uri_connection.py 2019-04-20
08:25:42.000000000 +0200
@@ -25,7 +25,7 @@
"""
__slots__ = []
- def __init__(self, uri, lazy=False):
+ def __init__(self, uri, ssl_options=None, lazy=False):
"""
:param str uri: AMQP Connection string
@@ -42,19 +42,20 @@
port = parsed_uri.port or 5672
username = urlparse.unquote(parsed_uri.username or 'guest')
password = urlparse.unquote(parsed_uri.password or 'guest')
- kwargs = self._parse_uri_options(parsed_uri, use_ssl)
+ kwargs = self._parse_uri_options(parsed_uri, use_ssl, ssl_options)
super(UriConnection, self).__init__(hostname, username,
password, port,
lazy=lazy,
**kwargs)
- def _parse_uri_options(self, parsed_uri, use_ssl):
+ def _parse_uri_options(self, parsed_uri, use_ssl=False, ssl_options=None):
"""Parse the uri options.
:param parsed_uri:
:param bool use_ssl:
:return:
"""
+ ssl_options = ssl_options or {}
kwargs = urlparse.parse_qs(parsed_uri.query)
vhost = urlparse.unquote(parsed_uri.path[1:]) or DEFAULT_VIRTUAL_HOST
options = {
@@ -71,7 +72,8 @@
'Python not compiled with support '
'for TLSv1 or higher'
)
- options['ssl_options'] = self._parse_ssl_options(kwargs)
+ ssl_options.update(self._parse_ssl_options(kwargs))
+ options['ssl_options'] = ssl_options
return options
def _parse_ssl_options(self, ssl_kwargs):
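The change above merges the options parsed from the URI query string into the caller-supplied `ssl_options` with `dict.update`, so a key present in both takes its value from the URI. A simplified sketch of that precedence, assuming a hypothetical helper `merge_ssl_options` that takes the first value of every query parameter (the real `_parse_ssl_options` is not shown in full in this diff and may filter or convert keys):

```python
from urllib.parse import parse_qs, urlparse


def merge_ssl_options(uri, ssl_options=None):
    """Merge URI query parameters over caller-supplied SSL options."""
    # Copy so the caller's dict is left untouched. This differs from the
    # diff, which updates a non-empty dict passed by the caller in place.
    merged = dict(ssl_options or {})
    query = parse_qs(urlparse(uri).query)
    # Values from the URI are applied last, so they win on conflicts.
    merged.update({key: values[0] for key, values in query.items()})
    return merged


caller_options = {'keyfile': 'file.key', 'certfile': 'other.crt'}
merged = merge_ssl_options(
    'amqps://guest:guest@localhost:5671/%2F?'
    'server_hostname=rmq.eandersson.net&certfile=file.crt',
    caller_options,
)
```

Here `merged` keeps `keyfile` from the caller, gains `server_hostname` from the URI, and has `certfile` overridden by the URI value.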
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/AMQPStorm-2.6.2/examples/consume_queue_until_empty.py
new/AMQPStorm-2.7.0/examples/consume_queue_until_empty.py
--- old/AMQPStorm-2.6.2/examples/consume_queue_until_empty.py 2018-11-26
02:01:27.000000000 +0100
+++ new/AMQPStorm-2.7.0/examples/consume_queue_until_empty.py 2019-04-20
08:25:42.000000000 +0200
@@ -2,7 +2,7 @@
from amqpstorm import Connection
-logging.basicConfig(level=logging.DEBUG)
+logging.basicConfig(level=logging.INFO)
def consume_until_queue_is_empty():
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/AMQPStorm-2.6.2/examples/create_queue_with_a_ttl_on_messages.py
new/AMQPStorm-2.7.0/examples/create_queue_with_a_ttl_on_messages.py
--- old/AMQPStorm-2.6.2/examples/create_queue_with_a_ttl_on_messages.py
1970-01-01 01:00:00.000000000 +0100
+++ new/AMQPStorm-2.7.0/examples/create_queue_with_a_ttl_on_messages.py
2019-04-20 08:25:42.000000000 +0200
@@ -0,0 +1,27 @@
+import logging
+
+from amqpstorm import Connection
+from amqpstorm import Message
+
+logging.basicConfig(level=logging.INFO)
+
+
+def publish_message(channel, body, queue):
+ # Create a message.
+ message = Message.create(channel, body)
+
+ # Publish the message to a queue.
+ message.publish(queue)
+
+
+if __name__ == '__main__':
+ with Connection('127.0.0.1', 'guest', 'guest') as CONNECTION:
+ with CONNECTION.channel() as CHANNEL:
+ # Declare the queue 'simple_ttl_queue' with a message TTL of 6000ms.
+ CHANNEL.queue.declare('simple_ttl_queue', arguments={
+ 'x-message-ttl': 6000,
+ })
+
+ # Publish the message to the queue 'simple_ttl_queue'; the queue's
+ # x-message-ttl of 6000ms applies to it.
+ publish_message(CHANNEL, 'Hello World', 'simple_ttl_queue')
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/AMQPStorm-2.6.2/examples/publish_message_with_expiration.py
new/AMQPStorm-2.7.0/examples/publish_message_with_expiration.py
--- old/AMQPStorm-2.6.2/examples/publish_message_with_expiration.py
1970-01-01 01:00:00.000000000 +0100
+++ new/AMQPStorm-2.7.0/examples/publish_message_with_expiration.py
2019-04-20 08:25:42.000000000 +0200
@@ -0,0 +1,26 @@
+import logging
+
+from amqpstorm import Connection
+from amqpstorm import Message
+
+logging.basicConfig(level=logging.INFO)
+
+
+def publish_message(channel, body, queue, expiration="600"):
+ # Create the message with an expiration (time to live).
+ message = Message.create(channel, body,
+ properties={"expiration": expiration})
+
+ # Publish the message to a queue.
+ message.publish(queue)
+
+
+if __name__ == '__main__':
+ with Connection('127.0.0.1', 'guest', 'guest') as CONNECTION:
+ with CONNECTION.channel() as CHANNEL:
+ # Declare the Queue, 'simple_queue'.
+ CHANNEL.queue.declare('simple_queue')
+
+ # Publish the message to a queue called, 'simple_queue' with an
+ # expiration set to 6000ms.
+ publish_message(CHANNEL, 'Hello World', 'simple_queue', "6000")
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/AMQPStorm-2.6.2/examples/robust_consumer.py
new/AMQPStorm-2.7.0/examples/robust_consumer.py
--- old/AMQPStorm-2.6.2/examples/robust_consumer.py 2018-01-17
03:51:54.000000000 +0100
+++ new/AMQPStorm-2.7.0/examples/robust_consumer.py 2019-04-20
08:25:42.000000000 +0200
@@ -7,7 +7,7 @@
import amqpstorm
from amqpstorm import Connection
-logging.basicConfig(level=logging.DEBUG)
+logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger()
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/AMQPStorm-2.6.2/examples/scalable_consumer.py
new/AMQPStorm-2.7.0/examples/scalable_consumer.py
--- old/AMQPStorm-2.6.2/examples/scalable_consumer.py 2018-09-10
08:18:55.000000000 +0200
+++ new/AMQPStorm-2.7.0/examples/scalable_consumer.py 2019-04-20
08:25:42.000000000 +0200
@@ -8,7 +8,7 @@
import amqpstorm
from amqpstorm import Connection
-logging.basicConfig(level=logging.DEBUG)
+logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger()
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/AMQPStorm-2.6.2/examples/scalable_rpc_server.py
new/AMQPStorm-2.7.0/examples/scalable_rpc_server.py
--- old/AMQPStorm-2.6.2/examples/scalable_rpc_server.py 2018-09-10
08:18:55.000000000 +0200
+++ new/AMQPStorm-2.7.0/examples/scalable_rpc_server.py 2019-04-20
08:25:42.000000000 +0200
@@ -9,7 +9,7 @@
from amqpstorm import Connection
from amqpstorm import Message
-logging.basicConfig(level=logging.DEBUG)
+logging.basicConfig(level=logging.INFO)
LOGGER = logging.getLogger()
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/AMQPStorm-2.6.2/examples/simple_consumer.py
new/AMQPStorm-2.7.0/examples/simple_consumer.py
--- old/AMQPStorm-2.6.2/examples/simple_consumer.py 2018-01-17
03:51:54.000000000 +0100
+++ new/AMQPStorm-2.7.0/examples/simple_consumer.py 2019-04-20
08:25:42.000000000 +0200
@@ -2,7 +2,7 @@
from amqpstorm import Connection
-logging.basicConfig(level=logging.DEBUG)
+logging.basicConfig(level=logging.INFO)
def on_message(message):
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/AMQPStorm-2.6.2/examples/simple_generator_consumer.py
new/AMQPStorm-2.7.0/examples/simple_generator_consumer.py
--- old/AMQPStorm-2.6.2/examples/simple_generator_consumer.py 2018-01-17
03:51:54.000000000 +0100
+++ new/AMQPStorm-2.7.0/examples/simple_generator_consumer.py 2019-04-20
08:25:42.000000000 +0200
@@ -2,7 +2,7 @@
from amqpstorm import Connection
-logging.basicConfig(level=logging.DEBUG)
+logging.basicConfig(level=logging.INFO)
def start_consumer():
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/AMQPStorm-2.6.2/examples/simple_publisher.py
new/AMQPStorm-2.7.0/examples/simple_publisher.py
--- old/AMQPStorm-2.6.2/examples/simple_publisher.py 2018-01-17
03:51:54.000000000 +0100
+++ new/AMQPStorm-2.7.0/examples/simple_publisher.py 2019-04-20
08:25:42.000000000 +0200
@@ -3,7 +3,7 @@
from amqpstorm import Connection
from amqpstorm import Message
-logging.basicConfig(level=logging.DEBUG)
+logging.basicConfig(level=logging.INFO)
def publish_message():
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore'
old/AMQPStorm-2.6.2/examples/simple_transaction_publisher.py
new/AMQPStorm-2.7.0/examples/simple_transaction_publisher.py
--- old/AMQPStorm-2.6.2/examples/simple_transaction_publisher.py
2018-01-17 03:51:54.000000000 +0100
+++ new/AMQPStorm-2.7.0/examples/simple_transaction_publisher.py
2019-04-20 08:25:42.000000000 +0200
@@ -3,7 +3,7 @@
from amqpstorm import Connection
from amqpstorm import Message
-logging.basicConfig(level=logging.DEBUG)
+logging.basicConfig(level=logging.INFO)
def publish_messages():
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/AMQPStorm-2.6.2/examples/ssl_with_context.py
new/AMQPStorm-2.7.0/examples/ssl_with_context.py
--- old/AMQPStorm-2.6.2/examples/ssl_with_context.py 1970-01-01
01:00:00.000000000 +0100
+++ new/AMQPStorm-2.7.0/examples/ssl_with_context.py 2019-04-20
08:25:42.000000000 +0200
@@ -0,0 +1,58 @@
+import logging
+import ssl
+
+from amqpstorm import Connection
+
+logging.basicConfig(level=logging.INFO)
+
+
+def on_message(message):
+ """This function is called on message received.
+
+ :param message:
+ :return:
+ """
+ print("Message:", message.body)
+
+ # Acknowledge that we handled the message without any issues.
+ message.ack()
+
+ # Reject the message.
+ # message.reject()
+
+ # Reject the message, and put it back in the queue.
+ # message.reject(requeue=True)
+
+
+def start_consumer():
+ ssl_options = {
+ 'context': ssl.create_default_context(cafile='cacert.pem'),
+ 'server_hostname': 'rmq.eandersson.net'
+ }
+
+ with Connection('rmq.eandersson.net', 'guest', 'guest', port=5671,
+ ssl=True, ssl_options=ssl_options) as connection:
+ with connection.channel() as channel:
+ # Declare the Queue, 'simple_queue'.
+ channel.queue.declare('simple_queue')
+
+ # Set QoS to 100.
+ # This limits the consumer to prefetching at most 100 messages.
+
+ # This is a recommended setting, as it prevents the
+ # consumer from keeping all of the messages in a queue to itself.
+ channel.basic.qos(100)
+
+ # Start consuming the queue 'simple_queue' using the callback
+ # 'on_message', and require each message to be acknowledged.
+ channel.basic.consume(on_message, 'simple_queue', no_ack=False)
+
+ try:
+ # Start consuming messages.
+ channel.start_consuming()
+ except KeyboardInterrupt:
+ channel.close()
+
+
+if __name__ == '__main__':
+ start_consumer()
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn'
'--exclude=.svnignore' old/AMQPStorm-2.6.2/setup.py new/AMQPStorm-2.7.0/setup.py
--- old/AMQPStorm-2.6.2/setup.py 2019-02-03 00:17:55.000000000 +0100
+++ new/AMQPStorm-2.7.0/setup.py 2019-04-20 08:25:42.000000000 +0200
@@ -3,7 +3,7 @@
setup(
name='AMQPStorm',
- version='2.6.2',
+ version='2.7.0',
description='Thread-safe Python RabbitMQ Client & Management library.',
long_description=open('README.rst').read(),
author='Erik Olof Gunnar Andersson',