Hello community,

here is the log from the commit of package python-kombu for openSUSE:Factory 
checked in at 2020-04-19 21:48:05
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-kombu (Old)
 and      /work/SRC/openSUSE:Factory/.python-kombu.new.2738 (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Package is "python-kombu"

Sun Apr 19 21:48:05 2020 rev:67 rq:789804 version:4.6.8

Changes:
--------
--- /work/SRC/openSUSE:Factory/python-kombu/python-kombu.changes        
2020-03-27 00:20:45.320118758 +0100
+++ /work/SRC/openSUSE:Factory/.python-kombu.new.2738/python-kombu.changes      
2020-04-19 21:48:12.359928629 +0200
@@ -1,0 +2,9 @@
+Mon Mar 30 14:13:32 UTC 2020 - Marketa Calabkova <[email protected]>
+
+- Update to 4.6.8
+  * Add support for health_check_interval option in broker_transport_options.
+  * Adding retry_on_timeout parameter
+  * Support standard values for ssl_cert_reqs query parameter.
+  * enabled ssl certificate verification when amqps is used for pyamqp 
transport
+
+-------------------------------------------------------------------

Old:
----
  kombu-4.6.7.tar.gz

New:
----
  kombu-4.6.8.tar.gz

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Other differences:
------------------
++++++ python-kombu.spec ++++++
--- /var/tmp/diff_new_pack.SQZ6sV/_old  2020-04-19 21:48:14.183932289 +0200
+++ /var/tmp/diff_new_pack.SQZ6sV/_new  2020-04-19 21:48:14.187932296 +0200
@@ -18,7 +18,7 @@
 
 %{?!python_module:%define python_module() python-%{**} python3-%{**}}
 Name:           python-kombu
-Version:        4.6.7
+Version:        4.6.8
 Release:        0
 Summary:        AMQP Messaging Framework for Python
 License:        BSD-3-Clause

++++++ kombu-4.6.7.tar.gz -> kombu-4.6.8.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-4.6.7/AUTHORS new/kombu-4.6.8/AUTHORS
--- old/kombu-4.6.7/AUTHORS     2019-12-07 15:31:27.000000000 +0100
+++ new/kombu-4.6.8/AUTHORS     2020-03-02 11:00:55.000000000 +0100
@@ -78,6 +78,7 @@
 Juan Carlos Ferrer <[email protected]>
 Kai Groner <[email protected]>
 Keith Fitzgerald <[email protected]>
+Kevin Fox <[email protected]>
 Kevin McCarthy <[email protected]>
 Kevin McDonald <[email protected]>
 Latitia M. Haskins <[email protected]>
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-4.6.7/PKG-INFO new/kombu-4.6.8/PKG-INFO
--- old/kombu-4.6.7/PKG-INFO    2019-12-07 15:45:55.000000000 +0100
+++ new/kombu-4.6.8/PKG-INFO    2020-03-02 11:08:47.316780600 +0100
@@ -1,6 +1,6 @@
 Metadata-Version: 2.1
 Name: kombu
-Version: 4.6.7
+Version: 4.6.8
 Summary: Messaging library for Python.
 Home-page: https://kombu.readthedocs.io
 Author: Ask Solem
@@ -28,17 +28,17 @@
 Classifier: Topic :: System :: Networking
 Classifier: Topic :: Software Development :: Libraries :: Python Modules
 Requires-Python: >=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*
-Provides-Extra: redis
-Provides-Extra: yaml
 Provides-Extra: msgpack
-Provides-Extra: zookeeper
+Provides-Extra: yaml
+Provides-Extra: redis
+Provides-Extra: mongodb
 Provides-Extra: sqs
-Provides-Extra: pyro
+Provides-Extra: zookeeper
+Provides-Extra: sqlalchemy
 Provides-Extra: librabbitmq
-Provides-Extra: qpid
+Provides-Extra: pyro
 Provides-Extra: slmq
-Provides-Extra: azureservicebus
 Provides-Extra: azurestoragequeues
-Provides-Extra: mongodb
-Provides-Extra: sqlalchemy
+Provides-Extra: azureservicebus
+Provides-Extra: qpid
 Provides-Extra: consul
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-4.6.7/README.rst new/kombu-4.6.8/README.rst
--- old/kombu-4.6.7/README.rst  2019-12-07 15:44:09.000000000 +0100
+++ new/kombu-4.6.8/README.rst  2020-03-02 11:07:09.000000000 +0100
@@ -4,7 +4,7 @@
 
 |build-status| |coverage| |license| |wheel| |pyversion| |pyimp| 
|[![Downloads](https://pepy.tech/badge/kombu)](https://pepy.tech/project/kombu)|
 
-:Version: 4.6.7
+:Version: 4.6.8
 :Documentation: https://kombu.readthedocs.io/
 :Download: https://pypi.org/project/kombu/
 :Source: https://github.com/celery/kombu/
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-4.6.7/docs/changelog.rst 
new/kombu-4.6.8/docs/changelog.rst
--- old/kombu-4.6.7/docs/changelog.rst  2019-07-12 07:26:52.000000000 +0200
+++ new/kombu-4.6.8/docs/changelog.rst  2020-03-02 11:00:55.000000000 +0100
@@ -1 +1 @@
-.. include:: ../Changelog
+.. include:: ../Changelog.rst
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-4.6.7/docs/includes/introduction.txt 
new/kombu-4.6.8/docs/includes/introduction.txt
--- old/kombu-4.6.7/docs/includes/introduction.txt      2019-12-07 
15:43:40.000000000 +0100
+++ new/kombu-4.6.8/docs/includes/introduction.txt      2020-03-02 
11:06:41.000000000 +0100
@@ -1,4 +1,4 @@
-:Version: 4.6.7
+:Version: 4.6.8
 :Web: https://kombu.readthedocs.io/
 :Download: https://pypi.org/project/kombu/
 :Source: https://github.com/celery/kombu/
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-4.6.7/kombu/__init__.py 
new/kombu-4.6.8/kombu/__init__.py
--- old/kombu-4.6.7/kombu/__init__.py   2019-12-07 15:43:21.000000000 +0100
+++ new/kombu-4.6.8/kombu/__init__.py   2020-03-02 11:05:10.000000000 +0100
@@ -6,11 +6,11 @@
 import sys
 
 if sys.version_info < (2, 7):  # pragma: no cover
-    raise Exception('Kombu 4.0 requires Python versions 2.7 or later.')
+    raise Exception('Kombu 4.6 requires Python versions 2.7 or later.')
 
 from collections import namedtuple  # noqa
 
-__version__ = '4.6.7'
+__version__ = '4.6.8'
 __author__ = 'Ask Solem'
 __contact__ = '[email protected], [email protected]'
 __homepage__ = 'https://kombu.readthedocs.io'
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-4.6.7/kombu/asynchronous/aws/sqs/connection.py 
new/kombu-4.6.8/kombu/asynchronous/aws/sqs/connection.py
--- old/kombu-4.6.7/kombu/asynchronous/aws/sqs/connection.py    2019-07-12 
07:26:52.000000000 +0200
+++ new/kombu-4.6.8/kombu/asynchronous/aws/sqs/connection.py    2020-03-02 
11:00:55.000000000 +0100
@@ -58,7 +58,7 @@
             queue.id, callback=callback,
         )
 
-    def receive_message(self, queue,
+    def receive_message(self, queue, queue_url,
                         number_messages=1, visibility_timeout=None,
                         attributes=None, wait_time_seconds=None,
                         callback=None):
@@ -72,7 +72,6 @@
             params.update(attrs)
         if wait_time_seconds is not None:
             params['WaitTimeSeconds'] = wait_time_seconds
-        queue_url = self.get_queue_url(queue)
         return self.get_list(
             'ReceiveMessage', params, [('Message', AsyncMessage)],
             queue_url, callback=callback, parent=queue,
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-4.6.7/kombu/transport/SQS.py 
new/kombu-4.6.8/kombu/transport/SQS.py
--- old/kombu-4.6.7/kombu/transport/SQS.py      2019-07-12 07:26:52.000000000 
+0200
+++ new/kombu-4.6.8/kombu/transport/SQS.py      2020-03-02 11:00:55.000000000 
+0100
@@ -31,6 +31,33 @@
     up to 'prefetch_count' messages from queueA and work on them all before
     moving on to queueB.  If queueB is empty, it will wait up until
     'polling_interval' expires before moving back and checking on queueA.
+
+Other Features supported by this transport:
+  Predefined Queues:
+    The default behavior of this transport is to use a single AWS credential
+    pair in order to manage all SQS queues (e.g. listing queues, creating
+    queues, polling queues, deleting messages).
+
+    If it is preferable for your environment to use a single AWS credential, 
you
+    can use the 'predefined_queues' setting inside the  'transport_options' 
map.
+    This setting allows you to specify the SQS queue URL and AWS credentials 
for
+    each of your queues. For example, if you have two queues which both already
+    exist in AWS) you can tell this transport about them as follows:
+
+    transport_options = {
+      'predefined_queues': {
+        'queue-1': {
+          'url': 'https://sqs.us-east-1.amazonaws.com/xxx/aaa',
+          'access_key_id': 'a',
+          'secret_access_key': 'b',
+        },
+        'queue-2': {
+          'url': 'https://sqs.us-east-1.amazonaws.com/xxx/bbb',
+          'access_key_id': 'c',
+          'secret_access_key': 'd',
+        },
+      }
+    }
 """  # noqa: E501
 
 from __future__ import absolute_import, unicode_literals
@@ -76,6 +103,10 @@
         return x
 
 
+class UndefinedQueueException(Exception):
+    """Predefined queues are being used and an undefined queue was used."""
+
+
 class Channel(virtual.Channel):
     """SQS Channel."""
 
@@ -84,7 +115,9 @@
     default_wait_time_seconds = 10  # up to 20 seconds max
     domain_format = 'kombu%(vhost)s'
     _asynsqs = None
+    _predefined_queue_async_clients = {}  # A client for each predefined queue
     _sqs = None
+    _predefined_queue_clients = {}  # A client for each predefined queue
     _queue_cache = {}
     _noack_queues = set()
 
@@ -102,7 +135,12 @@
         self.hub = kwargs.get('hub') or get_event_loop()
 
     def _update_queue_cache(self, queue_name_prefix):
-        resp = self.sqs.list_queues(QueueNamePrefix=queue_name_prefix)
+        if self.predefined_queues:
+            for queue_name, q in self.predefined_queues.items():
+                self._queue_cache[queue_name] = q['url']
+            return
+
+        resp = self.sqs().list_queues(QueueNamePrefix=queue_name_prefix)
         for url in resp.get('QueueUrls', []):
             queue_name = url.split('/')[-1]
             self._queue_cache[queue_name] = url
@@ -177,6 +215,12 @@
         try:
             return self._queue_cache[queue]
         except KeyError:
+            if self.predefined_queues:
+                raise UndefinedQueueException((
+                    "Queue with name '{}' must be "
+                    "defined in 'predefined_queues'."
+                ).format(queue))
+
             attributes = {'VisibilityTimeout': str(self.visibility_timeout)}
             if queue.endswith('.fifo'):
                 attributes['FifoQueue'] = 'true'
@@ -189,17 +233,22 @@
         """Create an SQS queue with a given name and nominal attributes."""
         # Allow specifying additional boto create_queue Attributes
         # via transport options
+        if self.predefined_queues:
+            return None
+
         attributes.update(
             self.transport_options.get('sqs-creation-attributes') or {},
         )
 
-        return self.sqs.create_queue(
+        return self.sqs(queue=queue_name).create_queue(
             QueueName=queue_name,
             Attributes=attributes,
         )
 
     def _delete(self, queue, *args, **kwargs):
         """Delete queue by name."""
+        if self.predefined_queues:
+            return
         super(Channel, self)._delete(queue)
         self._queue_cache.pop(queue, None)
 
@@ -219,14 +268,16 @@
                     message['properties']['MessageDeduplicationId']
             else:
                 kwargs['MessageDeduplicationId'] = str(uuid.uuid4())
+
+        c = self.sqs(queue=self.canonical_queue_name(queue))
         if message.get('redelivered'):
-            self.sqs.change_message_visibility(
+            c.change_message_visibility(
                 QueueUrl=q_url,
                 ReceiptHandle=message['properties']['delivery_tag'],
                 VisibilityTimeout=0
             )
         else:
-            self.sqs.send_message(**kwargs)
+            c.send_message(**kwargs)
 
     def _message_to_python(self, message, queue_name, queue):
         try:
@@ -236,7 +287,10 @@
         payload = loads(bytes_to_str(body))
         if queue_name in self._noack_queues:
             queue = self._new_queue(queue_name)
-            self.asynsqs.delete_message(queue, message['ReceiptHandle'])
+            self.asynsqs(queue=queue_name).delete_message(
+                queue,
+                message['ReceiptHandle'],
+            )
         else:
             try:
                 properties = payload['properties']
@@ -303,7 +357,7 @@
         max_count = self._get_message_estimate()
         if max_count:
             q_url = self._new_queue(queue)
-            resp = self.sqs.receive_message(
+            resp = self.sqs(queue=queue).receive_message(
                 QueueUrl=q_url, MaxNumberOfMessages=max_count,
                 WaitTimeSeconds=self.wait_time_seconds)
             if resp.get('Messages'):
@@ -317,7 +371,7 @@
     def _get(self, queue):
         """Try to retrieve a single message off ``queue``."""
         q_url = self._new_queue(queue)
-        resp = self.sqs.receive_message(
+        resp = self.sqs(queue=queue).receive_message(
             QueueUrl=q_url, MaxNumberOfMessages=1,
             WaitTimeSeconds=self.wait_time_seconds)
         if resp.get('Messages'):
@@ -359,7 +413,7 @@
         q = self._new_queue(queue)
         qname = self.canonical_queue_name(queue)
         return self._get_from_sqs(
-            qname, count=count, connection=self.asynsqs,
+            qname, count=count, connection=self.asynsqs(queue=qname),
             callback=transform(self._on_messages_ready, callback, q, queue),
         )
 
@@ -377,8 +431,17 @@
         Uses long polling and returns :class:`~vine.promises.promise`.
         """
         connection = connection if connection is not None else queue.connection
+        if self.predefined_queues:
+            if queue not in self._queue_cache:
+                raise UndefinedQueueException((
+                    "Queue with name '{}' must be defined in "
+                    "'predefined_queues'."
+                ).format(queue))
+            queue_url = self._queue_cache[queue]
+        else:
+            queue_url = connection.get_queue_url(queue)
         return connection.receive_message(
-            queue, number_messages=count,
+            queue, queue_url, number_messages=count,
             wait_time_seconds=self.wait_time_seconds,
             callback=callback,
         )
@@ -397,14 +460,21 @@
         except KeyError:
             pass
         else:
-            self.sqs.delete_message(QueueUrl=message['sqs_queue'],
-                                    ReceiptHandle=sqs_message['ReceiptHandle'])
+            queue = None
+            if 'routing_key' in message:
+                queue = self.canonical_queue_name(message['routing_key'])
+
+            self.sqs(queue=queue).delete_message(
+                QueueUrl=message['sqs_queue'],
+                ReceiptHandle=sqs_message['ReceiptHandle'],
+            )
         super(Channel, self).basic_ack(delivery_tag)
 
     def _size(self, queue):
         """Return the number of messages in a queue."""
         url = self._new_queue(queue)
-        resp = self.sqs.get_queue_attributes(
+        c = self.sqs(queue=self.canonical_queue_name(queue))
+        resp = c.get_queue_attributes(
             QueueUrl=url,
             AttributeNames=['ApproximateNumberOfMessages'])
         return int(resp['Attributes']['ApproximateNumberOfMessages'])
@@ -419,43 +489,83 @@
             size += int(self._size(queue))
             if not size:
                 break
-        self.sqs.purge_queue(QueueUrl=q)
+        self.sqs(queue=queue).purge_queue(QueueUrl=q)
         return size
 
     def close(self):
         super(Channel, self).close()
         # if self._asynsqs:
         #     try:
-        #         self.asynsqs.close()
+        #         self.asynsqs().close()
         #     except AttributeError as exc:  # FIXME ???
         #         if "can't set attribute" not in str(exc):
         #             raise
 
-    @property
-    def sqs(self):
-        if self._sqs is None:
-            session = boto3.session.Session(
-                region_name=self.region,
-                aws_access_key_id=self.conninfo.userid,
-                aws_secret_access_key=self.conninfo.password,
+    def new_sqs_client(self, region, access_key_id, secret_access_key):
+        session = boto3.session.Session(
+            region_name=region,
+            aws_access_key_id=access_key_id,
+            aws_secret_access_key=secret_access_key,
+        )
+        is_secure = self.is_secure if self.is_secure is not None else True
+        client_kwargs = {
+            'use_ssl': is_secure
+        }
+        if self.endpoint_url is not None:
+            client_kwargs['endpoint_url'] = self.endpoint_url
+        return session.client('sqs', **client_kwargs)
+
+    def sqs(self, queue=None):
+        if queue is not None and self.predefined_queues:
+            if queue in self._predefined_queue_clients:
+                return self._predefined_queue_clients[queue]
+            if queue not in self.predefined_queues:
+                raise UndefinedQueueException((
+                    "Queue with name '{}' must be defined in "
+                    "'predefined_queues'."
+                ).format(queue))
+            q = self.predefined_queues[queue]
+            c = self._predefined_queue_clients[queue] = self.new_sqs_client(
+                region=q.get('region', self.region),
+                access_key_id=q['access_key_id'] or self.conninfo.userid,
+                secret_access_key=q['secret_access_key'] or 
self.conninfo.password,  # noqa: E501
             )
-            is_secure = self.is_secure if self.is_secure is not None else True
-            client_kwargs = {
-                'use_ssl': is_secure
-            }
-            if self.endpoint_url is not None:
-                client_kwargs['endpoint_url'] = self.endpoint_url
-            self._sqs = session.client('sqs', **client_kwargs)
-        return self._sqs
+            return c
 
-    @property
-    def asynsqs(self):
-        if self._asynsqs is None:
-            self._asynsqs = AsyncSQSConnection(
-                sqs_connection=self.sqs,
-                region=self.region
+        if self._sqs is not None:
+            return self._sqs
+
+        c = self._sqs = self.new_sqs_client(
+            region=self.region,
+            access_key_id=self.conninfo.userid,
+            secret_access_key=self.conninfo.password,
+        )
+        return c
+
+    def asynsqs(self, queue=None):
+        if queue is not None and self.predefined_queues:
+            if queue in self._predefined_queue_async_clients:
+                return self._predefined_queue_async_clients[queue]
+            if queue not in self.predefined_queues:
+                raise UndefinedQueueException((
+                    "Queue with name '{}' must be defined in "
+                    "'predefined_queues'."
+                ).format(queue))
+            q = self.predefined_queues[queue]
+            c = self._predefined_queue_async_clients[queue] = 
AsyncSQSConnection(  # noqa: E501
+                sqs_connection=self.sqs(queue=queue),
+                region=q.get('region', self.region)
             )
-        return self._asynsqs
+            return c
+
+        if self._asynsqs is not None:
+            return self._asynsqs
+
+        c = self._asynsqs = AsyncSQSConnection(
+            sqs_connection=self.sqs(queue=queue),
+            region=self.region
+        )
+        return c
 
     @property
     def conninfo(self):
@@ -471,6 +581,11 @@
                 self.default_visibility_timeout)
 
     @cached_property
+    def predefined_queues(self):
+        """Map of queue_name to predefined queue settings."""
+        return self.transport_options.get('predefined_queues', None)
+
+    @cached_property
     def queue_name_prefix(self):
         return self.transport_options.get('queue_name_prefix', '')
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-4.6.7/kombu/transport/pyamqp.py 
new/kombu-4.6.8/kombu/transport/pyamqp.py
--- old/kombu-4.6.7/kombu/transport/pyamqp.py   2019-07-12 07:26:52.000000000 
+0200
+++ new/kombu-4.6.8/kombu/transport/pyamqp.py   2020-03-02 11:00:55.000000000 
+0100
@@ -176,4 +176,5 @@
         super(SSLTransport, self).__init__(*args, **kwargs)
 
         # ugh, not exactly pure, but hey, it's python.
-        self.client.ssl = True
+        if not self.client.ssl: # not dict or False
+            self.client.ssl = True
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-4.6.7/kombu/transport/redis.py 
new/kombu-4.6.8/kombu/transport/redis.py
--- old/kombu-4.6.7/kombu/transport/redis.py    2019-12-07 15:31:27.000000000 
+0100
+++ new/kombu-4.6.8/kombu/transport/redis.py    2020-03-02 11:00:55.000000000 
+0100
@@ -124,7 +124,7 @@
             try:
                 with client.pipeline(True) as pipe:
                     pipe.watch(name)
-                    if pipe.get(name) == lock_id:
+                    if bytes_to_str(pipe.get(name)) == lock_id:
                         pipe.multi()
                         pipe.delete(name)
                         pipe.execute()
@@ -349,9 +349,9 @@
         for channel in self._channels:
             # only if subclient property is cached
             client = channel.__dict__.get('subclient')
-            if client is not None:
-                if callable(getattr(client, "check_health", None)):
-                    client.check_health()
+            if client is not None \
+                    and callable(getattr(client, 'check_health', None)):
+                client.check_health()
 
     def on_readable(self, fileno):
         chan, type = self._fd_to_chan[fileno]
@@ -427,7 +427,9 @@
     socket_connect_timeout = None
     socket_keepalive = None
     socket_keepalive_options = None
+    retry_on_timeout = None
     max_connections = 10
+    health_check_interval = DEFAULT_HEALTH_CHECK_INTERVAL
     #: Transport option to disable fanout keyprefix.
     #: Can also be string, in which case it changes the default
     #: prefix ('/{db}.') into to something else.  The prefix must
@@ -491,14 +493,15 @@
          'socket_keepalive_options',
          'queue_order_strategy',
          'max_connections',
+         'health_check_interval',
+         'retry_on_timeout',
          'priority_steps')  # <-- do not add comma here!
     )
 
     connection_class = redis.Connection if redis else None
 
     def __init__(self, *args, **kwargs):
-        super_ = super(Channel, self)
-        super_.__init__(*args, **kwargs)
+        super(Channel, self).__init__(*args, **kwargs)
 
         if not self.ack_emulation:  # disable visibility timeout
             self.QoS = virtual.QoS
@@ -775,7 +778,9 @@
 
     def _q_for_pri(self, queue, pri):
         pri = self.priority(pri)
-        return '%s%s%s' % ((queue, self.sep, pri) if pri else (queue, '', ''))
+        if pri:
+            return "{{{}}}{}{}".format(queue, self.sep, pri)
+        return queue
 
     def priority(self, n):
         steps = self.priority_steps
@@ -905,14 +910,19 @@
             'socket_connect_timeout': self.socket_connect_timeout,
             'socket_keepalive': self.socket_keepalive,
             'socket_keepalive_options': self.socket_keepalive_options,
+            'health_check_interval': self.health_check_interval,
+            'retry_on_timeout': self.retry_on_timeout,
         }
 
         conn_class = self.connection_class
+
+        # If the connection class does not support the `health_check_interval`
+        # argument then remove it.
         if (
             hasattr(conn_class, '__init__') and
-            accepts_argument(conn_class.__init__, 'health_check_interval')
+            not accepts_argument(conn_class.__init__, 'health_check_interval')
         ):
-            connparams['health_check_interval'] = DEFAULT_HEALTH_CHECK_INTERVAL
+            connparams.pop('health_check_interval')
 
         if conninfo.ssl:
             # Connection(ssl={}) must be a dict containing the keys:
@@ -1064,8 +1074,12 @@
             [add_reader(fd, on_readable, fd) for fd in cycle.fds]
         loop.on_tick.add(on_poll_start)
         loop.call_repeatedly(10, cycle.maybe_restore_messages)
+        health_check_interval = connection.client.transport_options.get(
+            'health_check_interval',
+            DEFAULT_HEALTH_CHECK_INTERVAL
+        )
         loop.call_repeatedly(
-            DEFAULT_HEALTH_CHECK_INTERVAL,
+            health_check_interval,
             cycle.maybe_check_subclient_health
         )
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-4.6.7/kombu/utils/url.py 
new/kombu-4.6.8/kombu/utils/url.py
--- old/kombu-4.6.7/kombu/utils/url.py  2019-07-12 07:26:52.000000000 +0200
+++ new/kombu-4.6.8/kombu/utils/url.py  2020-03-02 11:00:55.000000000 +0100
@@ -49,10 +49,8 @@
         keys = [key for key in query.keys() if key.startswith('ssl_')]
         for key in keys:
             if key == 'ssl_cert_reqs':
-                if ssl_available:
-                    query[key] = getattr(ssl, query[key])
-                else:
-                    query[key] = None
+                query[key] = parse_ssl_cert_reqs(query[key])
+                if query[key] is None:
                     logger.warning('Defaulting to insecure SSL behaviour.')
 
             if 'ssl' not in query:
@@ -120,3 +118,20 @@
     if isinstance(url, string_t) and '://' in url:
         return sanitize_url(url, mask)
     return url
+
+
+def parse_ssl_cert_reqs(query_value):
+    # type: (str) -> Any
+    """Given the query parameter for ssl_cert_reqs, return the SSL constant or 
None."""
+    if ssl_available:
+        query_value_to_constant = {
+            'CERT_REQUIRED': ssl.CERT_REQUIRED,
+            'CERT_OPTIONAL': ssl.CERT_OPTIONAL,
+            'CERT_NONE': ssl.CERT_NONE,
+            'required': ssl.CERT_REQUIRED,
+            'optional': ssl.CERT_OPTIONAL,
+            'none': ssl.CERT_NONE,
+        }
+        return query_value_to_constant[query_value]
+    else:
+        return None
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-4.6.7/kombu.egg-info/PKG-INFO 
new/kombu-4.6.8/kombu.egg-info/PKG-INFO
--- old/kombu-4.6.7/kombu.egg-info/PKG-INFO     2019-12-07 15:45:55.000000000 
+0100
+++ new/kombu-4.6.8/kombu.egg-info/PKG-INFO     2020-03-02 11:08:47.000000000 
+0100
@@ -1,6 +1,6 @@
 Metadata-Version: 2.1
 Name: kombu
-Version: 4.6.7
+Version: 4.6.8
 Summary: Messaging library for Python.
 Home-page: https://kombu.readthedocs.io
 Author: Ask Solem
@@ -28,17 +28,17 @@
 Classifier: Topic :: System :: Networking
 Classifier: Topic :: Software Development :: Libraries :: Python Modules
 Requires-Python: >=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*
-Provides-Extra: redis
-Provides-Extra: yaml
 Provides-Extra: msgpack
-Provides-Extra: zookeeper
+Provides-Extra: yaml
+Provides-Extra: redis
+Provides-Extra: mongodb
 Provides-Extra: sqs
-Provides-Extra: pyro
+Provides-Extra: zookeeper
+Provides-Extra: sqlalchemy
 Provides-Extra: librabbitmq
-Provides-Extra: qpid
+Provides-Extra: pyro
 Provides-Extra: slmq
-Provides-Extra: azureservicebus
 Provides-Extra: azurestoragequeues
-Provides-Extra: mongodb
-Provides-Extra: sqlalchemy
+Provides-Extra: azureservicebus
+Provides-Extra: qpid
 Provides-Extra: consul
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' 
old/kombu-4.6.7/t/unit/asynchronous/aws/sqs/test_connection.py 
new/kombu-4.6.8/t/unit/asynchronous/aws/sqs/test_connection.py
--- old/kombu-4.6.7/t/unit/asynchronous/aws/sqs/test_connection.py      
2019-07-12 07:26:52.000000000 +0200
+++ new/kombu-4.6.8/t/unit/asynchronous/aws/sqs/test_connection.py      
2020-03-02 11:00:55.000000000 +0100
@@ -86,7 +86,12 @@
 
     def test_receive_message(self):
         queue = Mock(name='queue')
-        self.x.receive_message(queue, 4, callback=self.callback)
+        self.x.receive_message(
+            queue,
+            self.x.get_queue_url('queue'),
+            4,
+            callback=self.callback,
+        )
         self.x.get_list.assert_called_with(
             'ReceiveMessage', {'MaxNumberOfMessages': 4},
             [('Message', AsyncMessage)],
@@ -96,7 +101,13 @@
 
     def test_receive_message__with_visibility_timeout(self):
         queue = Mock(name='queue')
-        self.x.receive_message(queue, 4, 3666, callback=self.callback)
+        self.x.receive_message(
+            queue,
+            self.x.get_queue_url('queue'),
+            4,
+            3666,
+            callback=self.callback,
+        )
         self.x.get_list.assert_called_with(
             'ReceiveMessage', {
                 'MaxNumberOfMessages': 4,
@@ -110,7 +121,11 @@
     def test_receive_message__with_wait_time_seconds(self):
         queue = Mock(name='queue')
         self.x.receive_message(
-            queue, 4, wait_time_seconds=303, callback=self.callback,
+            queue,
+            self.x.get_queue_url('queue'),
+            4,
+            wait_time_seconds=303,
+            callback=self.callback,
         )
         self.x.get_list.assert_called_with(
             'ReceiveMessage', {
@@ -125,7 +140,11 @@
     def test_receive_message__with_attributes(self):
         queue = Mock(name='queue')
         self.x.receive_message(
-            queue, 4, attributes=['foo', 'bar'], callback=self.callback,
+            queue,
+            self.x.get_queue_url('queue'),
+            4,
+            attributes=['foo', 'bar'],
+            callback=self.callback,
         )
         self.x.get_list.assert_called_with(
             'ReceiveMessage', {
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-4.6.7/t/unit/test_connection.py 
new/kombu-4.6.8/t/unit/test_connection.py
--- old/kombu-4.6.7/t/unit/test_connection.py   2019-12-07 15:31:27.000000000 
+0100
+++ new/kombu-4.6.8/t/unit/test_connection.py   2020-03-02 11:00:55.000000000 
+0100
@@ -117,6 +117,7 @@
         clone = deepcopy(conn)
         assert clone.alt == ['amqp://host']
 
+    @skip.unless_module('sqlalchemy')
     def test_parse_generated_as_uri_pg(self):
         conn = Connection(self.pg_url)
         assert conn.as_uri() == self.pg_nopass
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-4.6.7/t/unit/transport/test_SQS.py 
new/kombu-4.6.8/t/unit/transport/test_SQS.py
--- old/kombu-4.6.7/t/unit/transport/test_SQS.py        2019-07-12 
07:26:52.000000000 +0200
+++ new/kombu-4.6.8/t/unit/transport/test_SQS.py        2020-03-02 
11:00:55.000000000 +0100
@@ -23,6 +23,20 @@
 SQS_Channel_sqs = SQS.Channel.sqs
 
 
+example_predefined_queues = {
+    'queue-1': {
+        'url': 'https://sqs.us-east-1.amazonaws.com/xxx/queue-1',
+        'access_key_id': 'a',
+        'secret_access_key': 'b',
+    },
+    'queue-2': {
+        'url': 'https://sqs.us-east-1.amazonaws.com/xxx/queue-2',
+        'access_key_id': 'c',
+        'secret_access_key': 'd',
+    },
+}
+
+
 class SQSMessageMock(object):
     def __init__(self):
         """
@@ -49,19 +63,14 @@
 
 class SQSClientMock(object):
 
-    def __init__(self):
+    def __init__(self, QueueName='unittest_queue'):
         """
         Imitate the SQS Client from boto3.
         """
         self._receive_messages_calls = 0
         # _queues doesn't exist on the real client, here for testing.
         self._queues = {}
-        for n in range(1):
-            name = 'q_{}'.format(n)
-            url = 'sqs://q_{}'.format(n)
-            self.create_queue(QueueName=name)
-
-        url = self.create_queue(QueueName='unittest_queue')['QueueUrl']
+        url = self.create_queue(QueueName=QueueName)['QueueUrl']
         self.send_message(QueueUrl=url, MessageBody='hello')
 
     def _get_q(self, url):
@@ -73,7 +82,7 @@
 
     def create_queue(self, QueueName=None, Attributes=None):
         q = self._queues[QueueName] = QueueMock(
-            'sqs://' + QueueName,
+            'https://sqs.us-east-1.amazonaws.com/xxx/' + QueueName,
             Attributes,
         )
         return {'QueueUrl': q.url}
@@ -133,10 +142,21 @@
 
         # Mock the sqs() method that returns an SQSConnection object and
         # instead return an SQSConnectionMock() object.
-        self.sqs_conn_mock = SQSClientMock()
+        sqs_conn_mock = SQSClientMock()
+        self.sqs_conn_mock = sqs_conn_mock
+
+        predefined_queues_sqs_conn_mocks = {
+            'queue-1': SQSClientMock(QueueName='queue-1'),
+            'queue-2': SQSClientMock(QueueName='queue-2'),
+        }
 
         def mock_sqs():
-            return self.sqs_conn_mock
+            def sqs(self, queue=None):
+                if queue in predefined_queues_sqs_conn_mocks:
+                    return predefined_queues_sqs_conn_mocks[queue]
+                return sqs_conn_mock
+
+            return sqs
 
         SQS.Channel.sqs = mock_sqs()
 
@@ -218,7 +238,7 @@
         expected_endpoint_url = 'http://localhost:5493'
         assert self.channel.endpoint_url == expected_endpoint_url
         boto3_sqs = SQS_Channel_sqs.__get__(self.channel, SQS.Channel)
-        assert boto3_sqs._endpoint.host == expected_endpoint_url
+        assert boto3_sqs()._endpoint.host == expected_endpoint_url
 
     def test_none_hostname_persists(self):
         conn = Connection(hostname=None, transport=SQS.Transport)
@@ -350,7 +370,7 @@
         assert message == results
 
     def test_redelivered(self):
-        self.channel.sqs.change_message_visibility = \
+        self.channel.sqs().change_message_visibility = \
             Mock(name='change_message_visibility')
         message = {
             'redelivered': True,
@@ -480,9 +500,61 @@
         mock_messages = Mock()
         mock_messages.delivery_info = message
         self.channel.qos.append(mock_messages, 1)
-        self.channel.sqs.delete_message = Mock()
+        self.channel.sqs().delete_message = Mock()
         self.channel.basic_ack(1)
         self.sqs_conn_mock.delete_message.assert_called_with(
             QueueUrl=message['sqs_queue'],
             ReceiptHandle=message['sqs_message']['ReceiptHandle']
         )
+
+    def test_predefined_queues_primes_queue_cache(self):
+        connection = Connection(transport=SQS.Transport, transport_options={
+            'predefined_queues': example_predefined_queues,
+        })
+        channel = connection.channel()
+
+        assert 'queue-1' in channel._queue_cache
+        assert 'queue-2' in channel._queue_cache
+
+    def test_predefined_queues_new_queue_raises_if_queue_not_exists(self):
+        connection = Connection(transport=SQS.Transport, transport_options={
+            'predefined_queues': example_predefined_queues,
+        })
+        channel = connection.channel()
+
+        with pytest.raises(SQS.UndefinedQueueException):
+            channel._new_queue('queue-99')
+
+    def test_predefined_queues_get_from_sqs(self):
+        connection = Connection(transport=SQS.Transport, transport_options={
+            'predefined_queues': example_predefined_queues,
+        })
+        channel = connection.channel()
+
+        def message_to_python(message, queue_name, queue):
+            return message
+
+        channel._message_to_python = Mock(side_effect=message_to_python)
+
+        queue_name = "queue-1"
+
+        exchange = Exchange('test_SQS', type='direct')
+        p = messaging.Producer(channel, exchange, routing_key=queue_name)
+        queue = Queue(queue_name, exchange, queue_name)
+        queue(channel).declare()
+
+        # Getting a single message
+        p.publish('message')
+        result = channel._get(queue_name)
+
+        assert 'Body' in result.keys()
+
+        # Getting many messages
+        for i in range(3):
+            p.publish('message: {0}'.format(i))
+
+        channel.connection._deliver = Mock(name='_deliver')
+        channel._get_bulk(queue_name, max_if_unlimited=3)
+        channel.connection._deliver.assert_called()
+
+        assert len(channel.sqs(queue_name)._queues[queue_name].messages) == 0
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-4.6.7/t/unit/transport/test_pyamqp.py 
new/kombu-4.6.8/t/unit/transport/test_pyamqp.py
--- old/kombu-4.6.7/t/unit/transport/test_pyamqp.py     2019-07-12 
07:26:52.000000000 +0200
+++ new/kombu-4.6.8/t/unit/transport/test_pyamqp.py     2020-03-02 
11:00:55.000000000 +0100
@@ -91,6 +91,15 @@
         self.transport.create_channel(connection)
         connection.channel.assert_called_with()
 
+    def test_ssl_cert_passed(self):
+        ssl_dict={
+            'ca_certs': '/etc/pki/tls/certs/something.crt',
+            'cert_reqs': "ssl.CERT_REQUIRED",
+        }
+        ssl_dict_copy = {k: ssl_dict[k] for k in ssl_dict}
+        connection = Connection('amqps://', ssl=ssl_dict_copy)
+        assert connection.transport.client.ssl == ssl_dict
+
     def test_driver_version(self):
         assert self.transport.driver_version()
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-4.6.7/t/unit/transport/test_redis.py 
new/kombu-4.6.8/t/unit/transport/test_redis.py
--- old/kombu-4.6.7/t/unit/transport/test_redis.py      2019-12-07 
15:31:27.000000000 +0100
+++ new/kombu-4.6.8/t/unit/transport/test_redis.py      2020-03-02 
11:00:55.000000000 +0100
@@ -14,6 +14,7 @@
 from kombu.five import Empty, Queue as _Queue, bytes_if_py2
 from kombu.transport import virtual
 from kombu.utils import eventio  # patch poll
+from kombu.utils.encoding import str_to_bytes
 from kombu.utils.json import dumps
 
 
@@ -311,6 +312,18 @@
             conn.channel()
         pool.disconnect.assert_not_called()
 
+    def test_get_redis_ConnectionError(self):
+        from redis.exceptions import ConnectionError
+        from kombu.transport.redis import get_redis_ConnectionError
+        connection_error = get_redis_ConnectionError()
+        assert connection_error == ConnectionError
+
+    def test_after_fork_cleanup_channel(self):
+        from kombu.transport.redis import _after_fork_cleanup_channel
+        channel = Mock()
+        _after_fork_cleanup_channel(channel)
+        channel._after_fork.assert_called_once()
+
     def test_after_fork(self):
         self.channel._pool = None
         self.channel._after_fork()
@@ -696,6 +709,20 @@
         path = connection_parameters['path']
         assert (password, path) == (None, '/var/run/redis.sock')
 
+    def test_connparams_health_check_interval_not_supported(self):
+        with patch('kombu.transport.redis.Channel._create_client'):
+            with Connection('redis+socket:///tmp/redis.sock') as conn:
+                conn.default_channel.connection_class = \
+                    Mock(name='connection_class')
+                connparams = conn.default_channel._connparams()
+                assert 'health_check_interval' not in connparams
+
+    def test_connparams_health_check_interval_supported(self):
+        with patch('kombu.transport.redis.Channel._create_client'):
+            with Connection('redis+socket:///tmp/redis.sock') as conn:
+                connparams = conn.default_channel._connparams()
+                assert connparams['health_check_interval'] == 25
+
     def test_rotate_cycle_ValueError(self):
         cycle = self.channel._queue_cycle
         cycle.update(['kramer', 'jerry'])
@@ -733,6 +760,7 @@
         transport.cycle = Mock(name='cycle')
         transport.cycle.fds = {12: 'LISTEN', 13: 'BRPOP'}
         conn = Mock(name='conn')
+        conn.client = Mock(name='client', transport_options={})
         loop = Mock(name='loop')
         redis.Transport.register_with_event_loop(transport, conn, loop)
         transport.cycle.on_poll_init.assert_called_with(loop.poller)
@@ -750,6 +778,31 @@
             call(13, transport.on_readable, 13),
         ])
 
+    def test_configurable_health_check(self):
+        transport = self.connection.transport
+        transport.cycle = Mock(name='cycle')
+        transport.cycle.fds = {12: 'LISTEN', 13: 'BRPOP'}
+        conn = Mock(name='conn')
+        conn.client = Mock(name='client', transport_options={
+            'health_check_interval': 15,
+        })
+        loop = Mock(name='loop')
+        redis.Transport.register_with_event_loop(transport, conn, loop)
+        transport.cycle.on_poll_init.assert_called_with(loop.poller)
+        loop.call_repeatedly.assert_has_calls([
+            call(10, transport.cycle.maybe_restore_messages),
+            call(15, transport.cycle.maybe_check_subclient_health),
+        ])
+        loop.on_tick.add.assert_called()
+        on_poll_start = loop.on_tick.add.call_args[0][0]
+
+        on_poll_start()
+        transport.cycle.on_poll_start.assert_called_with()
+        loop.add_reader.assert_has_calls([
+            call(12, transport.on_readable, 12),
+            call(13, transport.on_readable, 13),
+        ])
+
     def test_transport_on_readable(self):
         transport = self.connection.transport
         cycle = transport.cycle = Mock(name='cyle')
@@ -1311,13 +1364,13 @@
             client.setnx.return_value = True
             client.pipeline = ContextMock()
             pipe = client.pipeline.return_value
-            pipe.get.return_value = lock_id
+            pipe.get.return_value = str_to_bytes(lock_id)  # redis gives bytes
             held = False
             with redis.Mutex(client, 'foo1', 100):
                 held = True
             assert held
             client.setnx.assert_called_with('foo1', lock_id)
-            pipe.get.return_value = 'yyy'
+            pipe.get.return_value = b'yyy'
             held = False
             with redis.Mutex(client, 'foo1', 100):
                 held = True
@@ -1325,7 +1378,7 @@
 
             # Did not win
             client.expire.reset_mock()
-            pipe.get.return_value = lock_id
+            pipe.get.return_value = str_to_bytes(lock_id)
             client.setnx.return_value = False
             with pytest.raises(redis.MutexHeld):
                 held = False
@@ -1389,7 +1442,8 @@
                 connection_class=mock.ANY, db=0, max_connections=10,
                 min_other_sentinels=0, password=None, sentinel_kwargs=None,
                 socket_connect_timeout=None, socket_keepalive=None,
-                socket_keepalive_options=None, socket_timeout=None)
+                socket_keepalive_options=None, socket_timeout=None,
+                retry_on_timeout=None)
 
             master_for = patched.return_value.master_for
             master_for.assert_called()
@@ -1411,7 +1465,8 @@
                 connection_class=mock.ANY, db=0, max_connections=10,
                 min_other_sentinels=0, password=None, sentinel_kwargs=None,
                 socket_connect_timeout=None, socket_keepalive=None,
-                socket_keepalive_options=None, socket_timeout=None)
+                socket_keepalive_options=None, socket_timeout=None,
+                retry_on_timeout=None)
 
             master_for = patched.return_value.master_for
             master_for.assert_called()
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-4.6.7/t/unit/utils/test_url.py 
new/kombu-4.6.8/t/unit/utils/test_url.py
--- old/kombu-4.6.7/t/unit/utils/test_url.py    2019-07-12 07:26:52.000000000 
+0200
+++ new/kombu-4.6.8/t/unit/utils/test_url.py    2020-03-02 11:00:55.000000000 
+0100
@@ -2,7 +2,6 @@
 
 try:
     from urllib.parse import urlencode
-
 except ImportError:
     from urllib import urlencode
 
@@ -12,6 +11,7 @@
 
 import kombu.utils.url
 from kombu.utils.url import as_url, parse_url, maybe_sanitize_url
+from kombu.utils.url import parse_ssl_cert_reqs
 
 
 def test_parse_url():
@@ -51,7 +51,7 @@
 def test_ssl_parameters():
     url = 'rediss://user:password@host:6379/0?'
     querystring = urlencode({
-        'ssl_cert_reqs': 'CERT_REQUIRED',
+        'ssl_cert_reqs': 'required',
         'ssl_ca_certs': '/var/ssl/myca.pem',
         'ssl_certfile': '/var/ssl/server-cert.pem',
         'ssl_keyfile': '/var/ssl/priv/worker-key.pem',
@@ -69,3 +69,24 @@
     assert kwargs['ssl']['ssl_cert_reqs'] is None
 
     kombu.utils.url.ssl_available = True
+
+
[email protected]('query_param,ssl_available,expected', [
+    ('CERT_REQUIRED', True, ssl.CERT_REQUIRED),
+    ('CERT_OPTIONAL', True, ssl.CERT_OPTIONAL),
+    ('CERT_NONE', True, ssl.CERT_NONE),
+    ('required', True, ssl.CERT_REQUIRED),
+    ('optional', True, ssl.CERT_OPTIONAL),
+    ('none', True, ssl.CERT_NONE),
+    ('CERT_REQUIRED', None, None),
+])
+def test_parse_ssl_cert_reqs(query_param, ssl_available, expected):
+    kombu.utils.url.ssl_available = ssl_available
+    result = parse_ssl_cert_reqs(query_param)
+    kombu.utils.url.ssl_available = True
+    assert result == expected
+
+
+def test_parse_ssl_cert_reqs_bad_value():
+    with pytest.raises(KeyError):
+        parse_ssl_cert_reqs('badvalue')


Reply via email to