Hello community,

here is the log from the commit of package python-amqpstorm for 
openSUSE:Factory checked in at 2019-06-12 13:15:43
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-amqpstorm (Old)
 and      /work/SRC/openSUSE:Factory/.python-amqpstorm.new.4811 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Package is "python-amqpstorm"

Wed Jun 12 13:15:43 2019 rev:6 rq:708980 version:2.7.0

Changes:
--------
--- /work/SRC/openSUSE:Factory/python-amqpstorm/python-amqpstorm.changes        2019-05-07 23:19:54.413044115 +0200
+++ /work/SRC/openSUSE:Factory/.python-amqpstorm.new.4811/python-amqpstorm.changes      2019-06-12 13:15:47.256723032 +0200
@@ -1,0 +2,8 @@
+Fri Jun  7 14:46:15 UTC 2019 - Marketa Calabkova <[email protected]>
+
+- Update to 2.7.0
+  * Added support for passing your own ssl context 
+  * Improved logging verbosity on connection failures
+  * Fixed occasional error message when closing a SSL connection
+
+-------------------------------------------------------------------

Old:
----
  AMQPStorm-2.6.2.tar.gz

New:
----
  AMQPStorm-2.7.0.tar.gz

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Other differences:
------------------
++++++ python-amqpstorm.spec ++++++
--- /var/tmp/diff_new_pack.It172d/_old  2019-06-12 13:15:48.308721930 +0200
+++ /var/tmp/diff_new_pack.It172d/_new  2019-06-12 13:15:48.344721892 +0200
@@ -18,7 +18,7 @@
 
 %{?!python_module:%define python_module() python-%{**} python3-%{**}}
 Name:           python-amqpstorm
-Version:        2.6.2
+Version:        2.7.0
 Release:        0
 Summary:        Thread-safe Python RabbitMQ Client & Management library
 License:        MIT

++++++ AMQPStorm-2.6.2.tar.gz -> AMQPStorm-2.7.0.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/AMQPStorm.egg-info/PKG-INFO new/AMQPStorm-2.7.0/AMQPStorm.egg-info/PKG-INFO
--- old/AMQPStorm-2.6.2/AMQPStorm.egg-info/PKG-INFO     2019-02-03 00:19:22.000000000 +0100
+++ new/AMQPStorm-2.7.0/AMQPStorm.egg-info/PKG-INFO     2019-04-20 08:26:18.000000000 +0200
@@ -1,12 +1,11 @@
-Metadata-Version: 1.1
+Metadata-Version: 2.1
 Name: AMQPStorm
-Version: 2.6.2
+Version: 2.7.0
 Summary: Thread-safe Python RabbitMQ Client & Management library.
 Home-page: https://www.amqpstorm.io
 Author: Erik Olof Gunnar Andersson
 Author-email: [email protected]
 License: MIT License
-Description-Content-Type: UNKNOWN
 Description: AMQPStorm
         =========
         Thread-safe Python RabbitMQ Client & Management library.
@@ -29,6 +28,12 @@
         Changelog
         =========
         
+        Version 2.7.0
+        -------------
+        - Added support for passing your own ssl context [#71] - Thanks troglas.
+        - Improved logging verbosity on connection failures [#72] - Thanks troglas.
+        - Fixed occasional error message when closing a SSL connection [#68] - Thanks troglas.
+        
         Version 2.6.2
         -------------
         - Set default TCP Timeout to 10s on UriConnection to match Connection [#67] - Thanks josemonteiro.
@@ -53,21 +58,6 @@
         - Properly wait until the inbound queue is empty when break_on_empty is set [#63] - Thanks TomGudman.
         - Fixed issue with Management queue/exchange declare when the passive flag was set to True.
         
-        Version 2.4.2
-        -------------
-        - Added support for External Authentication - Thanks Bernd Höhl.
-        - Fixed typo in setup.py extra requirements - Thanks Bernd Höhl.
-        - LICENSE file now included in package - Thanks Tomáš Chvátal.
-        
-        Version 2.4.1
-        -------------
-        - Added client/server negotiation to better determine the maximum supported channels and maximum allowed frame size [#52] - Thanks gastlich.
-        - We now raise an exception if the maximum allowed channel count is reached.
-        
-        Version 2.4.0
-        -------------
-        - basic.consume now allows for multiple callbacks [#48].
-        
         Credits
         =======
         Special thanks to gmr (Gavin M. Roy) for creating pamqp, and in addition amqpstorm is heavily influenced by his pika and rabbitpy libraries.
@@ -105,3 +95,4 @@
 Classifier: Topic :: Software Development :: Libraries
 Classifier: Topic :: Software Development :: Libraries :: Python Modules
 Classifier: Topic :: System :: Networking
+Provides-Extra: management
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/AMQPStorm.egg-info/SOURCES.txt new/AMQPStorm-2.7.0/AMQPStorm.egg-info/SOURCES.txt
--- old/AMQPStorm-2.6.2/AMQPStorm.egg-info/SOURCES.txt  2019-02-03 00:19:22.000000000 +0100
+++ new/AMQPStorm-2.7.0/AMQPStorm.egg-info/SOURCES.txt  2019-04-20 08:26:18.000000000 +0200
@@ -59,6 +59,8 @@
 amqpstorm/tests/functional/management/queue_tests.py
 amqpstorm/tests/functional/management/user_tests.py
 amqpstorm/tests/functional/management/virtual_host_tests.py
+amqpstorm/tests/functional/ssl/__init__.py
+amqpstorm/tests/functional/ssl/reliability_tests.py
 amqpstorm/tests/unit/__init__.py
 amqpstorm/tests/unit/compatiblity_tests.py
 amqpstorm/tests/unit/exception_tests.py
@@ -102,7 +104,9 @@
 amqpstorm/tests/unit/uri_connection/uri_connection_tests.py
 examples/__init__.py
 examples/consume_queue_until_empty.py
+examples/create_queue_with_a_ttl_on_messages.py
 examples/flask_threaded_rpc_client.py
+examples/publish_message_with_expiration.py
 examples/robust_consumer.py
 examples/scalable_consumer.py
 examples/scalable_rpc_server.py
@@ -112,6 +116,7 @@
 examples/simple_rpc_client.py
 examples/simple_rpc_server.py
 examples/simple_transaction_publisher.py
+examples/ssl_with_context.py
 examples/management_api/__init__.py
 examples/management_api/aliveness_test.py
 examples/management_api/create_user.py
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/CHANGELOG.rst new/AMQPStorm-2.7.0/CHANGELOG.rst
--- old/AMQPStorm-2.6.2/CHANGELOG.rst   2019-02-03 00:17:55.000000000 +0100
+++ new/AMQPStorm-2.7.0/CHANGELOG.rst   2019-04-20 08:25:42.000000000 +0200
@@ -1,6 +1,12 @@
 Changelog
 =========
 
+Version 2.7.0
+-------------
+- Added support for passing your own ssl context [#71] - Thanks troglas.
+- Improved logging verbosity on connection failures [#72] - Thanks troglas.
+- Fixed occasional error message when closing a SSL connection [#68] - Thanks troglas.
+
 Version 2.6.2
 -------------
 - Set default TCP Timeout to 10s on UriConnection to match Connection [#67] - Thanks josemonteiro.
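
A note on the changelog entry about connection failures [#72]: the io.py hunk in this diff keeps the reason from the last failed connect attempt and appends it to the exception message. Below is a minimal standalone sketch of that idea using only the standard library. `ConnectError` and `connect_first_available` are hypothetical names chosen for illustration and are not part of amqpstorm.

```python
import socket


class ConnectError(Exception):
    """Hypothetical stand-in for amqpstorm's AMQPConnectionError."""


def connect_first_available(hostname, port, timeout=1.0):
    """Try each resolved address and return the first socket that connects."""
    error_message = None
    for family, socktype, proto, _, sockaddr in socket.getaddrinfo(
            hostname, port, type=socket.SOCK_STREAM):
        sock = socket.socket(family, socktype, proto)
        sock.settimeout(timeout)
        try:
            sock.connect(sockaddr)
        except OSError as why:
            # Remember why this attempt failed so the final error can say so.
            # strerror can be None (e.g. on a timeout), so fall back to str().
            error_message = why.strerror or str(why)
            sock.close()
            continue
        return sock
    raise ConnectError(
        'Could not connect to %s:%d error: %s' % (
            hostname, port, error_message
        )
    )
```

The exact wording of the reason comes from the operating system, so code that matches on it (as the updated unit test in this diff does with "Connection refused") is tied to the platform's error strings.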
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/PKG-INFO new/AMQPStorm-2.7.0/PKG-INFO
--- old/AMQPStorm-2.6.2/PKG-INFO        2019-02-03 00:19:22.000000000 +0100
+++ new/AMQPStorm-2.7.0/PKG-INFO        2019-04-20 08:26:18.000000000 +0200
@@ -1,12 +1,11 @@
-Metadata-Version: 1.1
+Metadata-Version: 2.1
 Name: AMQPStorm
-Version: 2.6.2
+Version: 2.7.0
 Summary: Thread-safe Python RabbitMQ Client & Management library.
 Home-page: https://www.amqpstorm.io
 Author: Erik Olof Gunnar Andersson
 Author-email: [email protected]
 License: MIT License
-Description-Content-Type: UNKNOWN
 Description: AMQPStorm
         =========
         Thread-safe Python RabbitMQ Client & Management library.
@@ -29,6 +28,12 @@
         Changelog
         =========
         
+        Version 2.7.0
+        -------------
+        - Added support for passing your own ssl context [#71] - Thanks troglas.
+        - Improved logging verbosity on connection failures [#72] - Thanks troglas.
+        - Fixed occasional error message when closing a SSL connection [#68] - Thanks troglas.
+        
         Version 2.6.2
         -------------
         - Set default TCP Timeout to 10s on UriConnection to match Connection [#67] - Thanks josemonteiro.
@@ -53,21 +58,6 @@
         - Properly wait until the inbound queue is empty when break_on_empty is set [#63] - Thanks TomGudman.
         - Fixed issue with Management queue/exchange declare when the passive flag was set to True.
         
-        Version 2.4.2
-        -------------
-        - Added support for External Authentication - Thanks Bernd Höhl.
-        - Fixed typo in setup.py extra requirements - Thanks Bernd Höhl.
-        - LICENSE file now included in package - Thanks Tomáš Chvátal.
-        
-        Version 2.4.1
-        -------------
-        - Added client/server negotiation to better determine the maximum supported channels and maximum allowed frame size [#52] - Thanks gastlich.
-        - We now raise an exception if the maximum allowed channel count is reached.
-        
-        Version 2.4.0
-        -------------
-        - basic.consume now allows for multiple callbacks [#48].
-        
         Credits
         =======
         Special thanks to gmr (Gavin M. Roy) for creating pamqp, and in addition amqpstorm is heavily influenced by his pika and rabbitpy libraries.
@@ -105,3 +95,4 @@
 Classifier: Topic :: Software Development :: Libraries
 Classifier: Topic :: Software Development :: Libraries :: Python Modules
 Classifier: Topic :: System :: Networking
+Provides-Extra: management
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/README.rst new/AMQPStorm-2.7.0/README.rst
--- old/AMQPStorm-2.6.2/README.rst      2019-02-03 00:17:55.000000000 +0100
+++ new/AMQPStorm-2.7.0/README.rst      2019-04-20 08:25:42.000000000 +0200
@@ -20,6 +20,12 @@
 Changelog
 =========
 
+Version 2.7.0
+-------------
+- Added support for passing your own ssl context [#71] - Thanks troglas.
+- Improved logging verbosity on connection failures [#72] - Thanks troglas.
+- Fixed occasional error message when closing a SSL connection [#68] - Thanks troglas.
+
 Version 2.6.2
 -------------
 - Set default TCP Timeout to 10s on UriConnection to match Connection [#67] - Thanks josemonteiro.
@@ -44,21 +50,6 @@
 - Properly wait until the inbound queue is empty when break_on_empty is set [#63] - Thanks TomGudman.
 - Fixed issue with Management queue/exchange declare when the passive flag was set to True.
 
-Version 2.4.2
--------------
-- Added support for External Authentication - Thanks Bernd Höhl.
-- Fixed typo in setup.py extra requirements - Thanks Bernd Höhl.
-- LICENSE file now included in package - Thanks Tomáš Chvátal.
-
-Version 2.4.1
--------------
-- Added client/server negotiation to better determine the maximum supported channels and maximum allowed frame size [#52] - Thanks gastlich.
-- We now raise an exception if the maximum allowed channel count is reached.
-
-Version 2.4.0
--------------
-- basic.consume now allows for multiple callbacks [#48].
-
 Credits
 =======
 Special thanks to gmr (Gavin M. Roy) for creating pamqp, and in addition amqpstorm is heavily influenced by his pika and rabbitpy libraries.
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/amqpstorm/__init__.py new/AMQPStorm-2.7.0/amqpstorm/__init__.py
--- old/AMQPStorm-2.6.2/amqpstorm/__init__.py   2019-02-03 00:17:55.000000000 +0100
+++ new/AMQPStorm-2.7.0/amqpstorm/__init__.py   2019-04-20 08:25:42.000000000 +0200
@@ -1,5 +1,5 @@
 """AMQPStorm."""
-__version__ = '2.6.2'  # noqa
+__version__ = '2.7.0'  # noqa
 __author__ = 'eandersson'  # noqa
 
 import logging
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/amqpstorm/compatibility.py new/AMQPStorm-2.7.0/amqpstorm/compatibility.py
--- old/AMQPStorm-2.6.2/amqpstorm/compatibility.py      2017-02-23 05:17:41.000000000 +0100
+++ new/AMQPStorm-2.7.0/amqpstorm/compatibility.py      2019-04-20 08:25:42.000000000 +0200
@@ -36,7 +36,8 @@
     'certfile',
     'cert_reqs',
     'ssl_version',
-    'ca_certs'
+    'ca_certs',
+    'server_hostname',
 ]
 
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/amqpstorm/connection.py new/AMQPStorm-2.7.0/amqpstorm/connection.py
--- old/AMQPStorm-2.6.2/amqpstorm/connection.py 2019-02-03 00:17:55.000000000 +0100
+++ new/AMQPStorm-2.7.0/amqpstorm/connection.py 2019-04-20 08:25:42.000000000 +0200
@@ -317,7 +317,7 @@
             self.heartbeat.register_read()
             if channel_id == 0:
                 self._channel0.on_frame(frame_in)
-            else:
+            elif channel_id in self._channels:
                 self._channels[channel_id].on_frame(frame_in)
 
         return data_in
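
The connection.py hunk above replaces a bare `else` with `elif channel_id in self._channels`, so a frame that arrives for a channel no longer in the map is skipped instead of triggering a `KeyError`. The following is a reduced sketch of that routing pattern; `FrameDispatcher` and its methods are hypothetical names for illustration, not amqpstorm classes.

```python
class FrameDispatcher(object):
    """Hypothetical, reduced model of per-channel frame routing."""

    def __init__(self):
        self.control_frames = []  # frames addressed to channel 0
        self.channels = {}        # channel_id -> list of delivered frames

    def open_channel(self, channel_id):
        self.channels[channel_id] = []

    def close_channel(self, channel_id):
        self.channels.pop(channel_id, None)

    def dispatch(self, channel_id, frame):
        """Route a frame; return False if it had to be dropped."""
        if channel_id == 0:
            self.control_frames.append(frame)
        elif channel_id in self.channels:
            self.channels[channel_id].append(frame)
        else:
            # The channel is gone already: drop the frame rather than
            # raise KeyError from the reader loop.
            return False
        return True
```

Returning a flag for dropped frames is a choice made for this sketch only; the hunk itself simply does nothing for unknown channel ids.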
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/amqpstorm/io.py new/AMQPStorm-2.7.0/amqpstorm/io.py
--- old/AMQPStorm-2.6.2/amqpstorm/io.py 2018-12-30 00:45:01.000000000 +0100
+++ new/AMQPStorm-2.7.0/amqpstorm/io.py 2019-04-20 08:25:42.000000000 +0200
@@ -56,7 +56,8 @@
 
     def __init__(self, parameters, exceptions=None, on_read_impl=None):
         self._exceptions = exceptions
-        self._lock = threading.Lock()
+        self._wr_lock = threading.Lock()
+        self._rd_lock = threading.Lock()
         self._inbound_thread = None
         self._on_read_impl = on_read_impl
         self._running = threading.Event()
@@ -71,18 +72,20 @@
 
         :return:
         """
-        self._lock.acquire()
+        self._wr_lock.acquire()
+        self._rd_lock.acquire()
         try:
             self._running.clear()
             if self.socket:
-                self.socket.close()
+                self._close_socket()
             if self._inbound_thread:
                 self._inbound_thread.join(timeout=self._parameters['timeout'])
             self._inbound_thread = None
             self.poller = None
             self.socket = None
         finally:
-            self._lock.release()
+            self._wr_lock.release()
+            self._rd_lock.release()
 
     def open(self):
         """Open Socket and establish a connection.
@@ -91,7 +94,8 @@
                                      encountered an error.
         :return:
         """
-        self._lock.acquire()
+        self._wr_lock.acquire()
+        self._rd_lock.acquire()
         try:
             self.data_in = EMPTY_BUFFER
             self._running.set()
@@ -101,7 +105,8 @@
                                  timeout=self._parameters['timeout'])
             self._inbound_thread = self._create_inbound_thread()
         finally:
-            self._lock.release()
+            self._wr_lock.release()
+            self._rd_lock.release()
 
     def write_to_socket(self, frame_data):
         """Write data to the socket.
@@ -109,7 +114,7 @@
         :param str frame_data:
         :return:
         """
-        self._lock.acquire()
+        self._wr_lock.acquire()
         try:
             total_bytes_written = 0
             bytes_to_send = len(frame_data)
@@ -131,7 +136,18 @@
                     self._exceptions.append(AMQPConnectionError(why))
                     return
         finally:
-            self._lock.release()
+            self._wr_lock.release()
+
+    def _close_socket(self):
+        """Shutdown and close the Socket.
+
+        :return:
+        """
+        try:
+            self.socket.shutdown(socket.SHUT_RDWR)
+        except (OSError, socket.error):
+            pass
+        self.socket.close()
 
     def _get_socket_addresses(self):
         """Get Socket address information.
@@ -159,16 +175,19 @@
 
         :rtype: socket.socket
         """
+        error_message = None
         for address in addresses:
             sock = self._create_socket(socket_family=address[0])
             try:
                 sock.connect(address[4])
-            except (IOError, OSError):
+            except (IOError, OSError) as why:
+                error_message = why.strerror
                 continue
             return sock
         raise AMQPConnectionError(
-            'Could not connect to %s:%d' % (
-                self._parameters['hostname'], self._parameters['port']
+            'Could not connect to %s:%d error: %s' % (
+                self._parameters['hostname'], self._parameters['port'],
+                error_message
             )
         )
 
@@ -194,6 +213,13 @@
         :param socket.socket sock:
         :rtype: SSLSocket
         """
+        context = self._parameters['ssl_options'].get('context')
+        if context is not None:
+            hostname = self._parameters['ssl_options'].get('server_hostname')
+            return context.wrap_socket(
+                sock, do_handshake_on_connect=True,
+                server_hostname=hostname
+            )
         if 'ssl_version' not in self._parameters['ssl_options']:
             self._parameters['ssl_options']['ssl_version'] = (
                 compatibility.DEFAULT_SSL_VERSION
@@ -234,8 +260,6 @@
         """
         data_in = EMPTY_BUFFER
         try:
-            if not self.socket:
-                raise socket.error('connection/socket error')
             data_in = self._read_from_socket()
         except socket.timeout:
             pass
@@ -250,6 +274,12 @@
 
         :rtype: bytes
         """
-        if self.use_ssl:
+        if not self.use_ssl:
+            if not self.socket:
+                raise socket.error('connection/socket error')
+            return self.socket.recv(MAX_FRAME_SIZE)
+
+        with self._rd_lock:
+            if not self.socket:
+                raise socket.error('connection/socket error')
             return self.socket.read(MAX_FRAME_SIZE)
-        return self.socket.recv(MAX_FRAME_SIZE)
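
Among the io.py changes above is a new `_close_socket` helper that calls `shutdown(SHUT_RDWR)` before `close()` and ignores errors from the shutdown step. Here is a small standalone sketch of that pattern, assuming Python 3, where `socket.error` is an alias of `OSError`. `close_socket` is a hypothetical free function used for illustration.

```python
import socket


def close_socket(sock):
    """Shut down both directions, then close; ignore shutdown errors."""
    try:
        sock.shutdown(socket.SHUT_RDWR)
    except OSError:
        # The socket may never have connected, or the peer may be gone.
        pass
    sock.close()


# Usage: the peer of a cleanly shut down socket observes end-of-stream.
left, right = socket.socketpair()
close_socket(left)
peer_data = right.recv(1)
right.close()
```

Shutting down first lets a thread blocked in a read on the same socket wake up promptly, which is one plausible reason for the change; the diff itself does not state the motivation.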
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/amqpstorm/tests/__init__.py new/AMQPStorm-2.7.0/amqpstorm/tests/__init__.py
--- old/AMQPStorm-2.6.2/amqpstorm/tests/__init__.py     2018-12-30 00:45:01.000000000 +0100
+++ new/AMQPStorm-2.7.0/amqpstorm/tests/__init__.py     2019-04-20 08:25:42.000000000 +0200
@@ -1,7 +1,15 @@
 import os
 
+CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
+
 HOST = os.environ.get('AMQP_HOST', '127.0.0.1')
 USERNAME = os.environ.get('AMQP_USERNAME', 'guest')
 PASSWORD = os.environ.get('AMQP_PASSWORD', 'guest')
 URI = os.environ.get('AMQP_URI', 'amqp://guest:[email protected]:5672/%2F')
 HTTP_URL = os.environ.get('AMQP_HTTP_URL', 'http://127.0.0.1:15672')
+
+SSL_URI = os.environ.get('AMQP_SSL_URI',
+                         'amqps://guest:[email protected]:5671/%2F')
+SSL_HOST = os.environ.get('AMQP_SSL_HOST', 'rmq.eandersson.net')
+CAFILE = os.environ.get('AMQP_CAFILE',
+                        '{0}/resources/cacert.pem'.format(CURRENT_DIR))
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/amqpstorm/tests/functional/reliability_tests.py new/AMQPStorm-2.7.0/amqpstorm/tests/functional/reliability_tests.py
--- old/AMQPStorm-2.6.2/amqpstorm/tests/functional/reliability_tests.py 2018-12-30 00:45:01.000000000 +0100
+++ new/AMQPStorm-2.7.0/amqpstorm/tests/functional/reliability_tests.py 2019-04-20 08:25:42.000000000 +0200
@@ -42,6 +42,7 @@
             self.assertIsNone(self.connection._io.socket)
             self.assertIsNone(self.connection._io.poller)
             self.assertFalse(self.connection._io._running.is_set())
+            self.assertFalse(self.connection.exceptions)
 
     @setup(new_connection=False, queue=True)
     def test_functional_open_close_connection_loop(self):
@@ -69,6 +70,7 @@
             self.assertIsNone(self.connection._io.socket)
             self.assertIsNone(self.connection._io.poller)
             self.assertFalse(self.connection._io._running.is_set())
+            self.assertFalse(self.connection.exceptions)
 
     @setup(new_connection=True, new_channel=False, queue=True)
     def test_functional_close_gracefully_after_publish_mandatory_fails(self):
@@ -129,14 +131,11 @@
 
     @setup(new_connection=False, queue=False)
     def test_functional_close_performance(self):
-        """Make sure closing a connection never takes longer than ~3 seconds.
-
-            In general closing a connection should take about ~1s, but we
-            try to close it faster, if-possible.
+        """Make sure closing a connection never takes longer than ~1 seconds.
 
         :return:
         """
-        for _ in range(5):
+        for _ in range(100):
             self.connection = self.connection = Connection(HOST, USERNAME,
                                                            PASSWORD)
             start_time = time.time()
@@ -165,8 +164,8 @@
             imp.reload(compatibility)
 
 
-class PublishAndConsume5kTest(TestFunctionalFramework):
-    messages_to_send = 5000
+class PublishAndConsume1kTest(TestFunctionalFramework):
+    messages_to_send = 1000
     messages_consumed = 0
     lock = threading.Lock()
 
@@ -194,7 +193,7 @@
             self.messages_consumed += 1
 
     @setup(queue=True)
-    def test_functional_publish_and_consume_5k_messages(self):
+    def test_functional_publish_and_consume_1k_messages(self):
         self.channel.queue.declare(self.queue_name)
 
         publish_thread = threading.Thread(target=self.publish_messages, )
@@ -220,7 +219,7 @@
                          'test took too long')
 
 
-class PublishAndConsumeUntilEmptyTest(TestFunctionalFramework):
+class Consume1kUntilEmpty(TestFunctionalFramework):
     messages_to_send = 1000
 
     def configure(self):
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/amqpstorm/tests/functional/ssl/reliability_tests.py new/AMQPStorm-2.7.0/amqpstorm/tests/functional/ssl/reliability_tests.py
--- old/AMQPStorm-2.6.2/amqpstorm/tests/functional/ssl/reliability_tests.py     1970-01-01 01:00:00.000000000 +0100
+++ new/AMQPStorm-2.7.0/amqpstorm/tests/functional/ssl/reliability_tests.py     2019-04-20 08:25:42.000000000 +0200
@@ -0,0 +1,273 @@
+import ssl
+import threading
+import time
+
+from amqpstorm import Connection
+from amqpstorm import UriConnection
+from amqpstorm.tests import CAFILE
+from amqpstorm.tests import PASSWORD
+from amqpstorm.tests import SSL_HOST
+from amqpstorm.tests import SSL_URI
+from amqpstorm.tests import USERNAME
+from amqpstorm.tests.utility import TestFunctionalFramework
+from amqpstorm.tests.utility import setup
+
+
+class SSLReliabilityFunctionalTests(TestFunctionalFramework):
+    @setup(new_connection=False, queue=True)
+    def test_functional_ssl_open_new_connection_loop(self):
+        ssl_options = {
+            'context': ssl.create_default_context(cafile=CAFILE),
+            'server_hostname': SSL_HOST
+        }
+
+        for _ in range(5):
+            self.connection = self.connection = Connection(
+                SSL_HOST, USERNAME, PASSWORD, port=5671, ssl=True,
+                ssl_options=ssl_options, timeout=1)
+            self.channel = self.connection.channel()
+
+            # Make sure that it's a new channel.
+            self.assertEqual(int(self.channel), 1)
+
+            self.channel.queue.declare(self.queue_name)
+
+            # Verify that the Connection/Channel has been opened properly.
+            self.assertIsNotNone(self.connection._io.socket)
+            self.assertIsNotNone(self.connection._io.poller)
+            self.assertTrue(self.connection.is_open)
+
+            self.channel.close()
+            self.connection.close()
+
+            # Verify that the Connection has been closed properly.
+            self.assertTrue(self.connection.is_closed)
+            self.assertIsNone(self.connection._io.socket)
+            self.assertIsNone(self.connection._io.poller)
+            self.assertFalse(self.connection._io._running.is_set())
+            self.assertFalse(self.connection.exceptions)
+
+    @setup(new_connection=False, queue=True)
+    def test_functional_ssl_open_close_connection_loop(self):
+        ssl_options = {
+            'context': ssl.create_default_context(cafile=CAFILE),
+            'server_hostname': SSL_HOST
+        }
+        self.connection = self.connection = Connection(
+            SSL_HOST, USERNAME, PASSWORD, port=5671, ssl=True,
+            ssl_options=ssl_options, timeout=1, lazy=True)
+
+        for _ in range(5):
+            self.connection.open()
+            channel = self.connection.channel()
+
+            # Make sure that it's a new channel.
+            self.assertEqual(int(channel), 1)
+
+            channel.queue.declare(self.queue_name)
+
+            channel.close()
+
+            # Verify that the Connection/Channel has been opened properly.
+            self.assertIsNotNone(self.connection._io.socket)
+            self.assertIsNotNone(self.connection._io.poller)
+            self.assertTrue(self.connection.is_open)
+
+            self.connection.close()
+
+            # Verify that the Connection has been closed properly.
+            self.assertTrue(self.connection.is_closed)
+            self.assertIsNone(self.connection._io.socket)
+            self.assertIsNone(self.connection._io.poller)
+            self.assertFalse(self.connection._io._running.is_set())
+            self.assertFalse(self.connection.exceptions)
+
+    @setup(new_connection=False, queue=False)
+    def test_functional_ssl_open_close_channel_loop(self):
+        ssl_options = {
+            'context': ssl.create_default_context(cafile=CAFILE),
+            'server_hostname': SSL_HOST
+        }
+        self.connection = self.connection = Connection(
+            SSL_HOST, USERNAME, PASSWORD, port=5671, ssl=True,
+            ssl_options=ssl_options)
+
+        for _ in range(25):
+            channel = self.connection.channel()
+
+            # Verify that the Channel has been opened properly.
+            self.assertTrue(self.connection.is_open)
+            self.assertTrue(channel.is_open)
+
+            # Channel id should be staying at 1.
+            self.assertEqual(int(channel), 1)
+
+            channel.close()
+
+            # Verify that the Channel has been closed properly.
+            self.assertTrue(self.connection.is_open)
+            self.assertTrue(channel.is_closed)
+
+    @setup(new_connection=False, queue=True)
+    def test_functional_ssl_open_multiple_channels(self):
+        ssl_options = {
+            'context': ssl.create_default_context(cafile=CAFILE),
+            'server_hostname': SSL_HOST
+        }
+        self.connection = self.connection = Connection(
+            SSL_HOST, USERNAME, PASSWORD, port=5671, ssl=True,
+            ssl_options=ssl_options, timeout=1, lazy=True)
+
+        for _ in range(5):
+            channels = []
+            self.connection.open()
+            for index in range(3):
+                channel = self.connection.channel()
+                channels.append(channel)
+
+                # Verify that the Channel has been opened properly.
+                self.assertTrue(channel.is_open)
+                self.assertEqual(int(channel), len(channels))
+            self.connection.close()
+
+    @setup(new_connection=False, queue=False)
+    def test_functional_ssl_close_performance(self):
+        """Make sure closing a connection never takes longer than ~1 seconds.
+
+        :return:
+        """
+        for _ in range(100):
+            ssl_options = {
+                'context': ssl.create_default_context(cafile=CAFILE),
+                'server_hostname': SSL_HOST
+            }
+            self.connection = self.connection = Connection(
+                SSL_HOST, USERNAME, PASSWORD, port=5671, ssl=True,
+                ssl_options=ssl_options, lazy=True)
+
+            start_time = time.time()
+            self.connection.close()
+            self.assertLess(time.time() - start_time, 3)
+
+    @setup(new_connection=False)
+    def test_functional_ssl_uri_connection(self):
+        self.connection = UriConnection(SSL_URI)
+        self.channel = self.connection.channel()
+        self.assertTrue(self.connection.is_open)
+
+    @setup(new_connection=False)
+    def test_functional_ssl_uri_connection_with_context(self):
+        ssl_options = {
+            'context': ssl.create_default_context(cafile=CAFILE),
+            'server_hostname': SSL_HOST
+        }
+
+        self.connection = UriConnection(SSL_URI, ssl_options=ssl_options)
+        self.channel = self.connection.channel()
+        self.assertTrue(self.connection.is_open)
+
+
+class PublishAndConsume1kWithSSLTest(TestFunctionalFramework):
+    messages_to_send = 1000
+    messages_consumed = 0
+    lock = threading.Lock()
+
+    def configure(self):
+        self.disable_logging_validation()
+
+    def publish_messages(self):
+        for _ in range(self.messages_to_send):
+            self.channel.basic.publish(body=self.message,
+                                       routing_key=self.queue_name)
+
+    def consume_messages(self):
+        channel = self.connection.channel()
+        channel.basic.consume(queue=self.queue_name,
+                              no_ack=False)
+        for message in channel.build_inbound_messages(
+                break_on_empty=False):
+            self.increment_message_count()
+            message.ack()
+            if self.messages_consumed == self.messages_to_send:
+                break
+
+    def increment_message_count(self):
+        with self.lock:
+            self.messages_consumed += 1
+
+    @setup(new_connection=False, queue=False)
+    def test_functional_publish_1k_with_ssl(self):
+        ssl_options = {
+            'context': ssl.create_default_context(cafile=CAFILE),
+            'server_hostname': SSL_HOST
+        }
+        self.connection = self.connection = Connection(
+            SSL_HOST, USERNAME, PASSWORD, port=5671, ssl=True,
+            ssl_options=ssl_options)
+
+        self.channel = self.connection.channel()
+        self.channel.queue.declare(self.queue_name)
+
+        publish_thread = threading.Thread(target=self.publish_messages, )
+        publish_thread.daemon = True
+        publish_thread.start()
+
+        for _ in range(4):
+            consumer_thread = threading.Thread(target=self.consume_messages, )
+            consumer_thread.daemon = True
+            consumer_thread.start()
+
+        start_time = time.time()
+        while self.messages_consumed != self.messages_to_send:
+            if time.time() - start_time >= 60:
+                break
+            time.sleep(0.1)
+
+        for channel in list(self.connection.channels.values()):
+            channel.stop_consuming()
+            channel.close()
+
+        self.assertEqual(self.messages_consumed, self.messages_to_send,
+                         'test took too long')
+
+
+class Consume1kWithSSLUntilEmpty(TestFunctionalFramework):
+    messages_to_send = 1000
+
+    def configure(self):
+        self.disable_logging_validation()
+
+    def publish_messages(self):
+        for _ in range(self.messages_to_send):
+            self.channel.basic.publish(body=self.message,
+                                       routing_key=self.queue_name)
+
+    @setup(new_connection=False, queue=True)
+    def test_functional_consume_with_ssl_until_empty(self):
+        ssl_options = {
+            'context': ssl.create_default_context(cafile=CAFILE),
+            'server_hostname': SSL_HOST
+        }
+        self.connection = self.connection = Connection(
+            SSL_HOST, USERNAME, PASSWORD, port=5671, ssl=True,
+            ssl_options=ssl_options)
+
+        self.channel = self.connection.channel()
+        self.channel.queue.declare(self.queue_name)
+        self.channel.confirm_deliveries()
+        self.publish_messages()
+
+        channel = self.connection.channel()
+        channel.basic.consume(queue=self.queue_name,
+                              no_ack=False)
+        message_count = 0
+        for message in channel.build_inbound_messages(break_on_empty=True):
+            message_count += 1
+            message.ack()
+
+        result = channel.queue.declare(self.queue_name, passive=True)
+        self.assertEqual(result['message_count'], 0)
+        self.assertEqual(message_count, self.messages_to_send,
+                         'not all messages consumed')
+
+        channel.close()
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/amqpstorm/tests/unit/io/io_exception_tests.py new/AMQPStorm-2.7.0/amqpstorm/tests/unit/io/io_exception_tests.py
--- old/AMQPStorm-2.6.2/amqpstorm/tests/unit/io/io_exception_tests.py   2017-02-23 05:17:41.000000000 +0100
+++ new/AMQPStorm-2.7.0/amqpstorm/tests/unit/io/io_exception_tests.py   2019-04-20 08:25:42.000000000 +0200
@@ -14,6 +14,15 @@
 
 
 class IOExceptionTests(TestFramework):
+    def test_io_shutdown_with_io_error(self):
+        connection = FakeConnection()
+
+        io = IO(connection.parameters)
+        io._exceptions = []
+        io.socket = mock.Mock(name='socket', spec=socket.socket)
+        io.socket.shutdown.side_effect = OSError()
+        io._close_socket()
+
     def test_io_receive_raises_socket_error(self):
         connection = FakeConnection()
 
@@ -126,7 +135,7 @@
         io = IO(parameters)
         self.assertRaisesRegexp(
             AMQPConnectionError,
-            'Could not connect to localhost:1234',
+            'Could not connect to localhost:1234 error: Connection refused',
             io.open
         )
 
@@ -171,3 +180,31 @@
             'connection/socket error',
             connection.check_for_errors
         )
+
+    def test_io_socket_read_fails(self):
+        connection = FakeConnection()
+        parameters = FakeConnection().parameters
+        parameters['ssl'] = False
+        io = IO(parameters, exceptions=connection.exceptions)
+
+        self.assertFalse(io.use_ssl)
+
+        self.assertRaisesRegexp(
+            socket.error,
+            'connection/socket error',
+            io._read_from_socket
+        )
+
+    def test_io_socket_read_fails_with_ssl(self):
+        connection = FakeConnection()
+        parameters = FakeConnection().parameters
+        parameters['ssl'] = True
+        io = IO(parameters, exceptions=connection.exceptions)
+
+        self.assertTrue(io.use_ssl)
+
+        self.assertRaisesRegexp(
+            socket.error,
+            'connection/socket error',
+            io._read_from_socket
+        )
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/amqpstorm/tests/unit/io/io_tests.py new/AMQPStorm-2.7.0/amqpstorm/tests/unit/io/io_tests.py
--- old/AMQPStorm-2.6.2/amqpstorm/tests/unit/io/io_tests.py     2018-01-17 03:51:54.000000000 +0100
+++ new/AMQPStorm-2.7.0/amqpstorm/tests/unit/io/io_tests.py     2019-04-20 08:25:42.000000000 +0200
@@ -141,6 +141,25 @@
         self.assertEqual(connection.parameters['ssl_options']['ssl_version'],
                          'travis-ci')
 
+    def test_io_set_ssl_context(self):
+        connection = FakeConnection()
+        connection.parameters['ssl_options'] = {
+            'context': ssl.create_default_context(),
+            'server_hostname': 'localhost',
+        }
+
+        io = IO(connection.parameters)
+        self.assertTrue(io._ssl_wrap_socket(socket.socket()))
+
+    def test_io_set_ssl_context_no_hostname_provided(self):
+        connection = FakeConnection()
+        connection.parameters['ssl_options'] = {
+            'context': ssl.create_default_context(),
+        }
+
+        io = IO(connection.parameters)
+        self.assertRaises(ValueError, io._ssl_wrap_socket, socket.socket())
+
     def test_io_has_ipv6(self):
         restore_func = socket.getaddrinfo
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/amqpstorm/tests/unit/uri_connection/uri_connection_exception_tests.py new/AMQPStorm-2.7.0/amqpstorm/tests/unit/uri_connection/uri_connection_exception_tests.py
--- old/AMQPStorm-2.6.2/amqpstorm/tests/unit/uri_connection/uri_connection_exception_tests.py   2018-01-17 03:51:54.000000000 +0100
+++ new/AMQPStorm-2.7.0/amqpstorm/tests/unit/uri_connection/uri_connection_exception_tests.py   2019-04-20 08:25:42.000000000 +0200
@@ -12,11 +12,11 @@
 class UriConnectionExceptionTests(TestFramework):
     @unittest.skipIf(sys.version_info < (3, 3), 'Python 3.x test')
     def test_uri_py3_raises_on_invalid_uri(self):
-        self.assertRaises(ValueError, UriConnection, 'amqp://a:b', True)
+        self.assertRaises(ValueError, UriConnection, 'amqp://a:b', {}, True)
 
     @unittest.skipIf(sys.version_info[0] == 3, 'Python 2.x test')
     def test_uri_py2_raises_on_invalid_uri(self):
-        self.assertRaises(ValueError, UriConnection, 'amqp://a:b', True)
+        self.assertRaises(ValueError, UriConnection, 'amqp://a:b', {}, True)
 
     def test_uri_raises_on_invalid_object(self):
         self.assertRaises(AttributeError, UriConnection, None)
@@ -26,7 +26,7 @@
 
     def test_uri_invalid_ssl_options(self):
         connection = UriConnection(
-            'amqps://guest:guest@localhost:5672/%2F', True
+            'amqps://guest:guest@localhost:5672/%2F', lazy=True
         )
         ssl_kwargs = {
             'unit_test': ['not_required'],
@@ -39,7 +39,7 @@
 
     def test_uri_get_invalid_ssl_version(self):
         connection = UriConnection(
-            'amqps://guest:guest@localhost:5672/%2F', True
+            'amqps://guest:guest@localhost:5672/%2F', lazy=True
         )
 
         self.assertEqual(connection._get_ssl_version('protocol_test'),
@@ -50,7 +50,7 @@
 
     def test_uri_get_invalid_ssl_validation(self):
         connection = UriConnection(
-            'amqps://guest:guest@localhost:5672/%2F', True
+            'amqps://guest:guest@localhost:5672/%2F', lazy=True
         )
 
         self.assertEqual(ssl.CERT_NONE,
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/amqpstorm/tests/unit/uri_connection/uri_connection_tests.py new/AMQPStorm-2.7.0/amqpstorm/tests/unit/uri_connection/uri_connection_tests.py
--- old/AMQPStorm-2.6.2/amqpstorm/tests/unit/uri_connection/uri_connection_tests.py     2019-02-03 00:17:55.000000000 +0100
+++ new/AMQPStorm-2.7.0/amqpstorm/tests/unit/uri_connection/uri_connection_tests.py     2019-04-20 08:25:42.000000000 +0200
@@ -10,7 +10,7 @@
 class UriConnectionTests(TestFramework):
     def test_uri_default(self):
         connection = UriConnection(
-            'amqp://guest:guest@localhost:5672/%2F', True
+            'amqp://guest:guest@localhost:5672/%2F', lazy=True
         )
 
         self.assertEqual(connection.parameters['hostname'], 'localhost')
@@ -27,14 +27,14 @@
 
     def test_uri_ssl(self):
         connection = UriConnection(
-            'amqps://guest:guest@localhost:5672/%2F', True
+            'amqps://guest:guest@localhost:5672/%2F', lazy=True
         )
 
         self.assertTrue(connection.parameters['ssl'])
 
     def test_uri_simple(self):
         connection = UriConnection(
-            'amqps://localhost:5672/%2F', True
+            'amqps://localhost:5672/%2F', lazy=True
         )
 
         self.assertEqual(connection.parameters['hostname'], 'localhost')
@@ -44,7 +44,7 @@
     def test_uri_set_hostname(self):
         connection = UriConnection(
             'amqps://guest:guest@my-server:5672/%2F?'
-            'heartbeat=360', True
+            'heartbeat=360', lazy=True
         )
 
         self.assertIsInstance(connection.parameters['hostname'], str)
@@ -53,7 +53,7 @@
     def test_uri_set_username(self):
         connection = UriConnection(
             'amqps://username:guest@localhost:5672/%2F?'
-            'heartbeat=360', True
+            'heartbeat=360', lazy=True
         )
 
         self.assertIsInstance(connection.parameters['username'], str)
@@ -62,7 +62,7 @@
     def test_uri_set_password(self):
         connection = UriConnection(
             'amqps://guest:password@localhost:5672/%2F?'
-            'heartbeat=360', True
+            'heartbeat=360', lazy=True
         )
 
         self.assertIsInstance(connection.parameters['password'], str)
@@ -70,7 +70,7 @@
 
     def test_uri_set_port(self):
         connection = UriConnection(
-            'amqps://guest:guest@localhost:5672/%2F', True
+            'amqps://guest:guest@localhost:5672/%2F', lazy=True
         )
 
         self.assertIsInstance(connection.parameters['port'], int)
@@ -78,8 +78,8 @@
 
     def test_uri_set_heartbeat(self):
         connection = UriConnection(
-            'amqps://guest:guest@localhost:5672/%2F?'
-            'heartbeat=360', True
+            'amqps://guest:lazy=True@localhost:5672/%2F?'
+            'heartbeat=360', lazy=True
         )
 
         self.assertIsInstance(connection.parameters['heartbeat'], int)
@@ -88,7 +88,7 @@
     def test_uri_set_timeout(self):
         connection = UriConnection(
             'amqps://guest:guest@localhost:5672/%2F?'
-            'timeout=360', True
+            'timeout=360', lazy=True
         )
 
         self.assertIsInstance(connection.parameters['timeout'], int)
@@ -96,7 +96,7 @@
 
     def test_uri_set_virtual_host(self):
         connection = UriConnection(
-            'amqps://guest:guest@localhost:5672/travis', True
+            'amqps://guest:guest@localhost:5672/travis', lazy=True
         )
 
         self.assertIsInstance(connection.parameters['virtual_host'], str)
@@ -107,7 +107,7 @@
             'amqps://guest:guest@localhost:5671/%2F?'
             'ssl_version=protocol_tlsv1&cert_reqs=cert_required&'
             'keyfile=file.key&certfile=file.crt&'
-            'ca_certs=travis-ci', True
+            'ca_certs=travis-ci', lazy=True
         )
 
         self.assertTrue(connection.parameters['ssl'])
@@ -124,7 +124,7 @@
 
     def test_uri_get_ssl_version(self):
         connection = UriConnection(
-            'amqp://guest:guest@localhost:5672/%2F', True
+            'amqp://guest:guest@localhost:5672/%2F', lazy=True
         )
 
         self.assertEqual(ssl.PROTOCOL_TLSv1,
@@ -132,7 +132,7 @@
 
     def test_uri_get_ssl_validation(self):
         connection = UriConnection(
-            'amqps://guest:guest@localhost:5672/%2F', True
+            'amqps://guest:guest@localhost:5672/%2F', lazy=True
         )
 
         self.assertEqual(ssl.CERT_REQUIRED,
@@ -140,7 +140,7 @@
 
     def test_uri_get_ssl_options(self):
         connection = UriConnection(
-            'amqps://guest:guest@localhost:5672/%2F', True
+            'amqps://guest:guest@localhost:5671/%2F', lazy=True
         )
         ssl_kwargs = {
             'cert_reqs': ['cert_required'],
@@ -153,4 +153,26 @@
         self.assertEqual(ssl_options['cert_reqs'], ssl.CERT_REQUIRED)
         self.assertEqual(ssl_options['ssl_version'], ssl.PROTOCOL_TLSv1)
         self.assertEqual(ssl_options['keyfile'], 'file.key')
+        self.assertEqual(ssl_options['certfile'], 'file.crt')
+
+    def test_uri_get_ssl_options_new_method(self):
+        ssl_kwargs = {
+            'cert_reqs': ssl.CERT_REQUIRED,
+            'ssl_version': ssl.PROTOCOL_TLSv1,
+            'keyfile': 'file.key',
+            'certfile': 'file.crt'
+        }
+        connection = UriConnection(
+            'amqps://guest:guest@localhost:5671/%2F?'
+            'server_hostname=rmq.eandersson.net&certfile=file.crt',
+            ssl_options=ssl_kwargs,
+            lazy=True
+        )
+
+        ssl_options = connection.parameters.get('ssl_options')
+
+        self.assertEqual(ssl_options['server_hostname'], 'rmq.eandersson.net')
+        self.assertEqual(ssl_options['cert_reqs'], ssl.CERT_REQUIRED)
+        self.assertEqual(ssl_options['ssl_version'], ssl.PROTOCOL_TLSv1)
+        self.assertEqual(ssl_options['keyfile'], 'file.key')
         self.assertEqual(ssl_options['certfile'], 'file.crt')
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/amqpstorm/uri_connection.py new/AMQPStorm-2.7.0/amqpstorm/uri_connection.py
--- old/AMQPStorm-2.6.2/amqpstorm/uri_connection.py     2019-02-03 00:17:55.000000000 +0100
+++ new/AMQPStorm-2.7.0/amqpstorm/uri_connection.py     2019-04-20 08:25:42.000000000 +0200
@@ -25,7 +25,7 @@
     """
     __slots__ = []
 
-    def __init__(self, uri, lazy=False):
+    def __init__(self, uri, ssl_options=None, lazy=False):
         """
         :param str uri: AMQP Connection string
 
@@ -42,19 +42,20 @@
         port = parsed_uri.port or 5672
         username = urlparse.unquote(parsed_uri.username or 'guest')
         password = urlparse.unquote(parsed_uri.password or 'guest')
-        kwargs = self._parse_uri_options(parsed_uri, use_ssl)
+        kwargs = self._parse_uri_options(parsed_uri, use_ssl, ssl_options)
         super(UriConnection, self).__init__(hostname, username,
                                             password, port,
                                             lazy=lazy,
                                             **kwargs)
 
-    def _parse_uri_options(self, parsed_uri, use_ssl):
+    def _parse_uri_options(self, parsed_uri, use_ssl=False, ssl_options=None):
         """Parse the uri options.
 
         :param parsed_uri:
         :param bool use_ssl:
         :return:
         """
+        ssl_options = ssl_options or {}
         kwargs = urlparse.parse_qs(parsed_uri.query)
         vhost = urlparse.unquote(parsed_uri.path[1:]) or DEFAULT_VIRTUAL_HOST
         options = {
@@ -71,7 +72,8 @@
                     'Python not compiled with support '
                     'for TLSv1 or higher'
                 )
-            options['ssl_options'] = self._parse_ssl_options(kwargs)
+            ssl_options.update(self._parse_ssl_options(kwargs))
+            options['ssl_options'] = ssl_options
         return options
 
     def _parse_ssl_options(self, ssl_kwargs):
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/examples/consume_queue_until_empty.py new/AMQPStorm-2.7.0/examples/consume_queue_until_empty.py
--- old/AMQPStorm-2.6.2/examples/consume_queue_until_empty.py   2018-11-26 02:01:27.000000000 +0100
+++ new/AMQPStorm-2.7.0/examples/consume_queue_until_empty.py   2019-04-20 08:25:42.000000000 +0200
@@ -2,7 +2,7 @@
 
 from amqpstorm import Connection
 
-logging.basicConfig(level=logging.DEBUG)
+logging.basicConfig(level=logging.INFO)
 
 
 def consume_until_queue_is_empty():
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/examples/create_queue_with_a_ttl_on_messages.py new/AMQPStorm-2.7.0/examples/create_queue_with_a_ttl_on_messages.py
--- old/AMQPStorm-2.6.2/examples/create_queue_with_a_ttl_on_messages.py 1970-01-01 01:00:00.000000000 +0100
+++ new/AMQPStorm-2.7.0/examples/create_queue_with_a_ttl_on_messages.py 2019-04-20 08:25:42.000000000 +0200
@@ -0,0 +1,27 @@
+import logging
+
+from amqpstorm import Connection
+from amqpstorm import Message
+
+logging.basicConfig(level=logging.INFO)
+
+
+def publish_message(channel, body, queue):
+    # Create a message.
+    message = Message.create(channel, body)
+
+    # Publish the message to a queue.
+    message.publish(queue)
+
+
+if __name__ == '__main__':
+    with Connection('127.0.0.1', 'guest', 'guest') as CONNECTION:
+        with CONNECTION.channel() as CHANNEL:
+            # Declare the Queue, 'simple_queue' with a message ttl of 6000ms.
+            CHANNEL.queue.declare('simple_ttl_queue', arguments={
+                'x-message-ttl': 6000,
+            })
+
+            # Publish the message to a queue called, 'simple_queue' with an
+            # expiration set to 6000ms.
+            publish_message(CHANNEL, 'Hello World', 'simple_ttl_queue')
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/examples/publish_message_with_expiration.py new/AMQPStorm-2.7.0/examples/publish_message_with_expiration.py
--- old/AMQPStorm-2.6.2/examples/publish_message_with_expiration.py     1970-01-01 01:00:00.000000000 +0100
+++ new/AMQPStorm-2.7.0/examples/publish_message_with_expiration.py     2019-04-20 08:25:42.000000000 +0200
@@ -0,0 +1,26 @@
+import logging
+
+from amqpstorm import Connection
+from amqpstorm import Message
+
+logging.basicConfig(level=logging.INFO)
+
+
+def publish_message(channel, body, queue, expiration="600"):
+    # Create the message with a expiration (time to live).
+    message = Message.create(channel, body,
+                             properties={"expiration": expiration})
+
+    # Publish the message to a queue.
+    message.publish(queue)
+
+
+if __name__ == '__main__':
+    with Connection('127.0.0.1', 'guest', 'guest') as CONNECTION:
+        with CONNECTION.channel() as CHANNEL:
+            # Declare the Queue, 'simple_queue'.
+            CHANNEL.queue.declare('simple_queue')
+
+            # Publish the message to a queue called, 'simple_queue' with an
+            # expiration set to 6000ms.
+            publish_message(CHANNEL, 'Hello World', 'simple_queue', "6000")
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/examples/robust_consumer.py new/AMQPStorm-2.7.0/examples/robust_consumer.py
--- old/AMQPStorm-2.6.2/examples/robust_consumer.py     2018-01-17 03:51:54.000000000 +0100
+++ new/AMQPStorm-2.7.0/examples/robust_consumer.py     2019-04-20 08:25:42.000000000 +0200
@@ -7,7 +7,7 @@
 import amqpstorm
 from amqpstorm import Connection
 
-logging.basicConfig(level=logging.DEBUG)
+logging.basicConfig(level=logging.INFO)
 
 LOGGER = logging.getLogger()
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/examples/scalable_consumer.py new/AMQPStorm-2.7.0/examples/scalable_consumer.py
--- old/AMQPStorm-2.6.2/examples/scalable_consumer.py   2018-09-10 08:18:55.000000000 +0200
+++ new/AMQPStorm-2.7.0/examples/scalable_consumer.py   2019-04-20 08:25:42.000000000 +0200
@@ -8,7 +8,7 @@
 import amqpstorm
 from amqpstorm import Connection
 
-logging.basicConfig(level=logging.DEBUG)
+logging.basicConfig(level=logging.INFO)
 LOGGER = logging.getLogger()
 
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/examples/scalable_rpc_server.py new/AMQPStorm-2.7.0/examples/scalable_rpc_server.py
--- old/AMQPStorm-2.6.2/examples/scalable_rpc_server.py 2018-09-10 08:18:55.000000000 +0200
+++ new/AMQPStorm-2.7.0/examples/scalable_rpc_server.py 2019-04-20 08:25:42.000000000 +0200
@@ -9,7 +9,7 @@
 from amqpstorm import Connection
 from amqpstorm import Message
 
-logging.basicConfig(level=logging.DEBUG)
+logging.basicConfig(level=logging.INFO)
 
 LOGGER = logging.getLogger()
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/examples/simple_consumer.py new/AMQPStorm-2.7.0/examples/simple_consumer.py
--- old/AMQPStorm-2.6.2/examples/simple_consumer.py     2018-01-17 03:51:54.000000000 +0100
+++ new/AMQPStorm-2.7.0/examples/simple_consumer.py     2019-04-20 08:25:42.000000000 +0200
@@ -2,7 +2,7 @@
 
 from amqpstorm import Connection
 
-logging.basicConfig(level=logging.DEBUG)
+logging.basicConfig(level=logging.INFO)
 
 
 def on_message(message):
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/examples/simple_generator_consumer.py new/AMQPStorm-2.7.0/examples/simple_generator_consumer.py
--- old/AMQPStorm-2.6.2/examples/simple_generator_consumer.py   2018-01-17 03:51:54.000000000 +0100
+++ new/AMQPStorm-2.7.0/examples/simple_generator_consumer.py   2019-04-20 08:25:42.000000000 +0200
@@ -2,7 +2,7 @@
 
 from amqpstorm import Connection
 
-logging.basicConfig(level=logging.DEBUG)
+logging.basicConfig(level=logging.INFO)
 
 
 def start_consumer():
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/examples/simple_publisher.py new/AMQPStorm-2.7.0/examples/simple_publisher.py
--- old/AMQPStorm-2.6.2/examples/simple_publisher.py    2018-01-17 03:51:54.000000000 +0100
+++ new/AMQPStorm-2.7.0/examples/simple_publisher.py    2019-04-20 08:25:42.000000000 +0200
@@ -3,7 +3,7 @@
 from amqpstorm import Connection
 from amqpstorm import Message
 
-logging.basicConfig(level=logging.DEBUG)
+logging.basicConfig(level=logging.INFO)
 
 
 def publish_message():
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/examples/simple_transaction_publisher.py new/AMQPStorm-2.7.0/examples/simple_transaction_publisher.py
--- old/AMQPStorm-2.6.2/examples/simple_transaction_publisher.py        2018-01-17 03:51:54.000000000 +0100
+++ new/AMQPStorm-2.7.0/examples/simple_transaction_publisher.py        2019-04-20 08:25:42.000000000 +0200
@@ -3,7 +3,7 @@
 from amqpstorm import Connection
 from amqpstorm import Message
 
-logging.basicConfig(level=logging.DEBUG)
+logging.basicConfig(level=logging.INFO)
 
 
 def publish_messages():
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/examples/ssl_with_context.py new/AMQPStorm-2.7.0/examples/ssl_with_context.py
--- old/AMQPStorm-2.6.2/examples/ssl_with_context.py    1970-01-01 01:00:00.000000000 +0100
+++ new/AMQPStorm-2.7.0/examples/ssl_with_context.py    2019-04-20 08:25:42.000000000 +0200
@@ -0,0 +1,58 @@
+import logging
+import ssl
+
+from amqpstorm import Connection
+
+logging.basicConfig(level=logging.INFO)
+
+
+def on_message(message):
+    """This function is called on message received.
+
+    :param message:
+    :return:
+    """
+    print("Message:", message.body)
+
+    # Acknowledge that we handled the message without any issues.
+    message.ack()
+
+    # Reject the message.
+    # message.reject()
+
+    # Reject the message, and put it back in the queue.
+    # message.reject(requeue=True)
+
+
+def start_consumer():
+    ssl_options = {
+        'context': ssl.create_default_context(cafile='cacert.pem'),
+        'server_hostname': 'rmq.eandersson.net'
+    }
+
+    with Connection('rmq.eandersson.net', 'guest', 'guest', port=5671,
+                    ssl=True, ssl_options=ssl_options) as connection:
+        with connection.channel() as channel:
+            # Declare the Queue, 'simple_queue'.
+            channel.queue.declare('simple_queue')
+
+            # Set QoS to 100.
+            # This will limit the consumer to only prefetch a 100 messages.
+
+            # This is a recommended setting, as it prevents the
+            # consumer from keeping all of the messages in a queue to itself.
+            channel.basic.qos(100)
+
+            # Start consuming the queue 'simple_queue' using the callback
+            # 'on_message' and last require the message to be acknowledged.
+            channel.basic.consume(on_message, 'simple_queue', no_ack=False)
+
+            try:
+                # Start consuming messages.
+                channel.start_consuming()
+            except KeyboardInterrupt:
+                channel.close()
+
+
+if __name__ == '__main__':
+    start_consumer()
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/AMQPStorm-2.6.2/setup.py new/AMQPStorm-2.7.0/setup.py
--- old/AMQPStorm-2.6.2/setup.py        2019-02-03 00:17:55.000000000 +0100
+++ new/AMQPStorm-2.7.0/setup.py        2019-04-20 08:25:42.000000000 +0200
@@ -3,7 +3,7 @@
 
 setup(
     name='AMQPStorm',
-    version='2.6.2',
+    version='2.7.0',
     description='Thread-safe Python RabbitMQ Client & Management library.',
     long_description=open('README.rst').read(),
     author='Erik Olof Gunnar Andersson',

