Hello community, here is the log from the commit of package python-kombu for openSUSE:Factory checked in at 2020-04-19 21:48:05 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Comparing /work/SRC/openSUSE:Factory/python-kombu (Old) and /work/SRC/openSUSE:Factory/.python-kombu.new.2738 (New) ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-kombu" Sun Apr 19 21:48:05 2020 rev:67 rq:789804 version:4.6.8 Changes: -------- --- /work/SRC/openSUSE:Factory/python-kombu/python-kombu.changes 2020-03-27 00:20:45.320118758 +0100 +++ /work/SRC/openSUSE:Factory/.python-kombu.new.2738/python-kombu.changes 2020-04-19 21:48:12.359928629 +0200 @@ -1,0 +2,9 @@ +Mon Mar 30 14:13:32 UTC 2020 - Marketa Calabkova <[email protected]> + +- Update to 4.6.8 + * Add support for health_check_interval option in broker_transport_options. + * Adding retry_on_timeout parameter + * Support standard values for ssl_cert_reqs query parameter. + * enabled ssl certificate verification when amqps is used for pyamqp transport + +------------------------------------------------------------------- Old: ---- kombu-4.6.7.tar.gz New: ---- kombu-4.6.8.tar.gz ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ python-kombu.spec ++++++ --- /var/tmp/diff_new_pack.SQZ6sV/_old 2020-04-19 21:48:14.183932289 +0200 +++ /var/tmp/diff_new_pack.SQZ6sV/_new 2020-04-19 21:48:14.187932296 +0200 @@ -18,7 +18,7 @@ %{?!python_module:%define python_module() python-%{**} python3-%{**}} Name: python-kombu -Version: 4.6.7 +Version: 4.6.8 Release: 0 Summary: AMQP Messaging Framework for Python License: BSD-3-Clause ++++++ kombu-4.6.7.tar.gz -> kombu-4.6.8.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-4.6.7/AUTHORS new/kombu-4.6.8/AUTHORS --- old/kombu-4.6.7/AUTHORS 2019-12-07 15:31:27.000000000 +0100 +++ new/kombu-4.6.8/AUTHORS 2020-03-02 11:00:55.000000000 +0100 @@ -78,6 +78,7 @@ Juan Carlos Ferrer <[email protected]> Kai Groner <[email protected]> Keith Fitzgerald <[email protected]> +Kevin Fox <[email protected]> Kevin McCarthy <[email protected]> Kevin McDonald <[email protected]> Latitia M. Haskins <[email protected]> diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-4.6.7/PKG-INFO new/kombu-4.6.8/PKG-INFO --- old/kombu-4.6.7/PKG-INFO 2019-12-07 15:45:55.000000000 +0100 +++ new/kombu-4.6.8/PKG-INFO 2020-03-02 11:08:47.316780600 +0100 @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: kombu -Version: 4.6.7 +Version: 4.6.8 Summary: Messaging library for Python. Home-page: https://kombu.readthedocs.io Author: Ask Solem @@ -28,17 +28,17 @@ Classifier: Topic :: System :: Networking Classifier: Topic :: Software Development :: Libraries :: Python Modules Requires-Python: >=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.* -Provides-Extra: redis -Provides-Extra: yaml Provides-Extra: msgpack -Provides-Extra: zookeeper +Provides-Extra: yaml +Provides-Extra: redis +Provides-Extra: mongodb Provides-Extra: sqs -Provides-Extra: pyro +Provides-Extra: zookeeper +Provides-Extra: sqlalchemy Provides-Extra: librabbitmq -Provides-Extra: qpid +Provides-Extra: pyro Provides-Extra: slmq -Provides-Extra: azureservicebus Provides-Extra: azurestoragequeues -Provides-Extra: mongodb -Provides-Extra: sqlalchemy +Provides-Extra: azureservicebus +Provides-Extra: qpid Provides-Extra: consul diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-4.6.7/README.rst new/kombu-4.6.8/README.rst --- old/kombu-4.6.7/README.rst 2019-12-07 15:44:09.000000000 +0100 +++ new/kombu-4.6.8/README.rst 2020-03-02 11:07:09.000000000 +0100 @@ -4,7 +4,7 @@ |build-status| |coverage| |license| |wheel| |pyversion| |pyimp| |[](https://pepy.tech/project/kombu)| -:Version: 4.6.7 +:Version: 4.6.8 :Documentation: https://kombu.readthedocs.io/ :Download: https://pypi.org/project/kombu/ :Source: https://github.com/celery/kombu/ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-4.6.7/docs/changelog.rst new/kombu-4.6.8/docs/changelog.rst --- old/kombu-4.6.7/docs/changelog.rst 2019-07-12 07:26:52.000000000 +0200 +++ new/kombu-4.6.8/docs/changelog.rst 2020-03-02 11:00:55.000000000 +0100 @@ -1 +1 @@ -.. include:: ../Changelog +.. include:: ../Changelog.rst diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-4.6.7/docs/includes/introduction.txt new/kombu-4.6.8/docs/includes/introduction.txt --- old/kombu-4.6.7/docs/includes/introduction.txt 2019-12-07 15:43:40.000000000 +0100 +++ new/kombu-4.6.8/docs/includes/introduction.txt 2020-03-02 11:06:41.000000000 +0100 @@ -1,4 +1,4 @@ -:Version: 4.6.7 +:Version: 4.6.8 :Web: https://kombu.readthedocs.io/ :Download: https://pypi.org/project/kombu/ :Source: https://github.com/celery/kombu/ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-4.6.7/kombu/__init__.py new/kombu-4.6.8/kombu/__init__.py --- old/kombu-4.6.7/kombu/__init__.py 2019-12-07 15:43:21.000000000 +0100 +++ new/kombu-4.6.8/kombu/__init__.py 2020-03-02 11:05:10.000000000 +0100 @@ -6,11 +6,11 @@ import sys if sys.version_info < (2, 7): # pragma: no cover - raise Exception('Kombu 4.0 requires Python versions 2.7 or later.') + raise Exception('Kombu 4.6 requires Python versions 2.7 or later.') from collections import namedtuple # noqa -__version__ = '4.6.7' +__version__ = '4.6.8' __author__ = 'Ask Solem' __contact__ = '[email protected], [email protected]' __homepage__ = 'https://kombu.readthedocs.io' diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-4.6.7/kombu/asynchronous/aws/sqs/connection.py new/kombu-4.6.8/kombu/asynchronous/aws/sqs/connection.py --- old/kombu-4.6.7/kombu/asynchronous/aws/sqs/connection.py 2019-07-12 07:26:52.000000000 +0200 +++ new/kombu-4.6.8/kombu/asynchronous/aws/sqs/connection.py 2020-03-02 11:00:55.000000000 +0100 @@ -58,7 +58,7 @@ queue.id, callback=callback, ) - def receive_message(self, queue, + def receive_message(self, queue, queue_url, number_messages=1, visibility_timeout=None, attributes=None, wait_time_seconds=None, callback=None): @@ -72,7 +72,6 @@ params.update(attrs) if wait_time_seconds is not None: params['WaitTimeSeconds'] = wait_time_seconds - queue_url = self.get_queue_url(queue) return self.get_list( 'ReceiveMessage', params, [('Message', AsyncMessage)], queue_url, callback=callback, parent=queue, diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-4.6.7/kombu/transport/SQS.py new/kombu-4.6.8/kombu/transport/SQS.py --- old/kombu-4.6.7/kombu/transport/SQS.py 2019-07-12 07:26:52.000000000 +0200 +++ new/kombu-4.6.8/kombu/transport/SQS.py 2020-03-02 11:00:55.000000000 +0100 @@ -31,6 +31,33 @@ up to 'prefetch_count' messages from queueA and work on them all before moving on to queueB. If queueB is empty, it will wait up until 'polling_interval' expires before moving back and checking on queueA. + +Other Features supported by this transport: + Predefined Queues: + The default behavior of this transport is to use a single AWS credential + pair in order to manage all SQS queues (e.g. listing queues, creating + queues, polling queues, deleting messages). + + If it is preferable for your environment to use a single AWS credential, you + can use the 'predefined_queues' setting inside the 'transport_options' map. + This setting allows you to specify the SQS queue URL and AWS credentials for + each of your queues. For example, if you have two queues which both already + exist in AWS) you can tell this transport about them as follows: + + transport_options = { + 'predefined_queues': { + 'queue-1': { + 'url': 'https://sqs.us-east-1.amazonaws.com/xxx/aaa', + 'access_key_id': 'a', + 'secret_access_key': 'b', + }, + 'queue-2': { + 'url': 'https://sqs.us-east-1.amazonaws.com/xxx/bbb', + 'access_key_id': 'c', + 'secret_access_key': 'd', + }, + } + } """ # noqa: E501 from __future__ import absolute_import, unicode_literals @@ -76,6 +103,10 @@ return x +class UndefinedQueueException(Exception): + """Predefined queues are being used and an undefined queue was used.""" + + class Channel(virtual.Channel): """SQS Channel.""" @@ -84,7 +115,9 @@ default_wait_time_seconds = 10 # up to 20 seconds max domain_format = 'kombu%(vhost)s' _asynsqs = None + _predefined_queue_async_clients = {} # A client for each predefined queue _sqs = None + _predefined_queue_clients = {} # A client for each predefined queue _queue_cache = {} _noack_queues = set() @@ -102,7 +135,12 @@ self.hub = kwargs.get('hub') or get_event_loop() def _update_queue_cache(self, queue_name_prefix): - resp = self.sqs.list_queues(QueueNamePrefix=queue_name_prefix) + if self.predefined_queues: + for queue_name, q in self.predefined_queues.items(): + self._queue_cache[queue_name] = q['url'] + return + + resp = self.sqs().list_queues(QueueNamePrefix=queue_name_prefix) for url in resp.get('QueueUrls', []): queue_name = url.split('/')[-1] self._queue_cache[queue_name] = url @@ -177,6 +215,12 @@ try: return self._queue_cache[queue] except KeyError: + if self.predefined_queues: + raise UndefinedQueueException(( + "Queue with name '{}' must be " + "defined in 'predefined_queues'." + ).format(queue)) + attributes = {'VisibilityTimeout': str(self.visibility_timeout)} if queue.endswith('.fifo'): attributes['FifoQueue'] = 'true' @@ -189,17 +233,22 @@ """Create an SQS queue with a given name and nominal attributes.""" # Allow specifying additional boto create_queue Attributes # via transport options + if self.predefined_queues: + return None + attributes.update( self.transport_options.get('sqs-creation-attributes') or {}, ) - return self.sqs.create_queue( + return self.sqs(queue=queue_name).create_queue( QueueName=queue_name, Attributes=attributes, ) def _delete(self, queue, *args, **kwargs): """Delete queue by name.""" + if self.predefined_queues: + return super(Channel, self)._delete(queue) self._queue_cache.pop(queue, None) @@ -219,14 +268,16 @@ message['properties']['MessageDeduplicationId'] else: kwargs['MessageDeduplicationId'] = str(uuid.uuid4()) + + c = self.sqs(queue=self.canonical_queue_name(queue)) if message.get('redelivered'): - self.sqs.change_message_visibility( + c.change_message_visibility( QueueUrl=q_url, ReceiptHandle=message['properties']['delivery_tag'], VisibilityTimeout=0 ) else: - self.sqs.send_message(**kwargs) + c.send_message(**kwargs) def _message_to_python(self, message, queue_name, queue): try: @@ -236,7 +287,10 @@ payload = loads(bytes_to_str(body)) if queue_name in self._noack_queues: queue = self._new_queue(queue_name) - self.asynsqs.delete_message(queue, message['ReceiptHandle']) + self.asynsqs(queue=queue_name).delete_message( + queue, + message['ReceiptHandle'], + ) else: try: properties = payload['properties'] @@ -303,7 +357,7 @@ max_count = self._get_message_estimate() if max_count: q_url = self._new_queue(queue) - resp = self.sqs.receive_message( + resp = self.sqs(queue=queue).receive_message( QueueUrl=q_url, MaxNumberOfMessages=max_count, WaitTimeSeconds=self.wait_time_seconds) if resp.get('Messages'): @@ -317,7 +371,7 @@ def _get(self, queue): """Try to retrieve a single message off ``queue``.""" q_url = self._new_queue(queue) - resp = self.sqs.receive_message( + resp = self.sqs(queue=queue).receive_message( QueueUrl=q_url, MaxNumberOfMessages=1, WaitTimeSeconds=self.wait_time_seconds) if resp.get('Messages'): @@ -359,7 +413,7 @@ q = self._new_queue(queue) qname = self.canonical_queue_name(queue) return self._get_from_sqs( - qname, count=count, connection=self.asynsqs, + qname, count=count, connection=self.asynsqs(queue=qname), callback=transform(self._on_messages_ready, callback, q, queue), ) @@ -377,8 +431,17 @@ Uses long polling and returns :class:`~vine.promises.promise`. """ connection = connection if connection is not None else queue.connection + if self.predefined_queues: + if queue not in self._queue_cache: + raise UndefinedQueueException(( + "Queue with name '{}' must be defined in " + "'predefined_queues'." + ).format(queue)) + queue_url = self._queue_cache[queue] + else: + queue_url = connection.get_queue_url(queue) return connection.receive_message( - queue, number_messages=count, + queue, queue_url, number_messages=count, wait_time_seconds=self.wait_time_seconds, callback=callback, ) @@ -397,14 +460,21 @@ except KeyError: pass else: - self.sqs.delete_message(QueueUrl=message['sqs_queue'], - ReceiptHandle=sqs_message['ReceiptHandle']) + queue = None + if 'routing_key' in message: + queue = self.canonical_queue_name(message['routing_key']) + + self.sqs(queue=queue).delete_message( + QueueUrl=message['sqs_queue'], + ReceiptHandle=sqs_message['ReceiptHandle'], + ) super(Channel, self).basic_ack(delivery_tag) def _size(self, queue): """Return the number of messages in a queue.""" url = self._new_queue(queue) - resp = self.sqs.get_queue_attributes( + c = self.sqs(queue=self.canonical_queue_name(queue)) + resp = c.get_queue_attributes( QueueUrl=url, AttributeNames=['ApproximateNumberOfMessages']) return int(resp['Attributes']['ApproximateNumberOfMessages']) @@ -419,43 +489,83 @@ size += int(self._size(queue)) if not size: break - self.sqs.purge_queue(QueueUrl=q) + self.sqs(queue=queue).purge_queue(QueueUrl=q) return size def close(self): super(Channel, self).close() # if self._asynsqs: # try: - # self.asynsqs.close() + # self.asynsqs().close() # except AttributeError as exc: # FIXME ??? # if "can't set attribute" not in str(exc): # raise - @property - def sqs(self): - if self._sqs is None: - session = boto3.session.Session( - region_name=self.region, - aws_access_key_id=self.conninfo.userid, - aws_secret_access_key=self.conninfo.password, + def new_sqs_client(self, region, access_key_id, secret_access_key): + session = boto3.session.Session( + region_name=region, + aws_access_key_id=access_key_id, + aws_secret_access_key=secret_access_key, + ) + is_secure = self.is_secure if self.is_secure is not None else True + client_kwargs = { + 'use_ssl': is_secure + } + if self.endpoint_url is not None: + client_kwargs['endpoint_url'] = self.endpoint_url + return session.client('sqs', **client_kwargs) + + def sqs(self, queue=None): + if queue is not None and self.predefined_queues: + if queue in self._predefined_queue_clients: + return self._predefined_queue_clients[queue] + if queue not in self.predefined_queues: + raise UndefinedQueueException(( + "Queue with name '{}' must be defined in " + "'predefined_queues'." + ).format(queue)) + q = self.predefined_queues[queue] + c = self._predefined_queue_clients[queue] = self.new_sqs_client( + region=q.get('region', self.region), + access_key_id=q['access_key_id'] or self.conninfo.userid, + secret_access_key=q['secret_access_key'] or self.conninfo.password, # noqa: E501 ) - is_secure = self.is_secure if self.is_secure is not None else True - client_kwargs = { - 'use_ssl': is_secure - } - if self.endpoint_url is not None: - client_kwargs['endpoint_url'] = self.endpoint_url - self._sqs = session.client('sqs', **client_kwargs) - return self._sqs + return c - @property - def asynsqs(self): - if self._asynsqs is None: - self._asynsqs = AsyncSQSConnection( - sqs_connection=self.sqs, - region=self.region + if self._sqs is not None: + return self._sqs + + c = self._sqs = self.new_sqs_client( + region=self.region, + access_key_id=self.conninfo.userid, + secret_access_key=self.conninfo.password, + ) + return c + + def asynsqs(self, queue=None): + if queue is not None and self.predefined_queues: + if queue in self._predefined_queue_async_clients: + return self._predefined_queue_async_clients[queue] + if queue not in self.predefined_queues: + raise UndefinedQueueException(( + "Queue with name '{}' must be defined in " + "'predefined_queues'." + ).format(queue)) + q = self.predefined_queues[queue] + c = self._predefined_queue_async_clients[queue] = AsyncSQSConnection( # noqa: E501 + sqs_connection=self.sqs(queue=queue), + region=q.get('region', self.region) ) - return self._asynsqs + return c + + if self._asynsqs is not None: + return self._asynsqs + + c = self._asynsqs = AsyncSQSConnection( + sqs_connection=self.sqs(queue=queue), + region=self.region + ) + return c @property def conninfo(self): @@ -471,6 +581,11 @@ self.default_visibility_timeout) @cached_property + def predefined_queues(self): + """Map of queue_name to predefined queue settings.""" + return self.transport_options.get('predefined_queues', None) + + @cached_property def queue_name_prefix(self): return self.transport_options.get('queue_name_prefix', '') diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-4.6.7/kombu/transport/pyamqp.py new/kombu-4.6.8/kombu/transport/pyamqp.py --- old/kombu-4.6.7/kombu/transport/pyamqp.py 2019-07-12 07:26:52.000000000 +0200 +++ new/kombu-4.6.8/kombu/transport/pyamqp.py 2020-03-02 11:00:55.000000000 +0100 @@ -176,4 +176,5 @@ super(SSLTransport, self).__init__(*args, **kwargs) # ugh, not exactly pure, but hey, it's python. - self.client.ssl = True + if not self.client.ssl: # not dict or False + self.client.ssl = True diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-4.6.7/kombu/transport/redis.py new/kombu-4.6.8/kombu/transport/redis.py --- old/kombu-4.6.7/kombu/transport/redis.py 2019-12-07 15:31:27.000000000 +0100 +++ new/kombu-4.6.8/kombu/transport/redis.py 2020-03-02 11:00:55.000000000 +0100 @@ -124,7 +124,7 @@ try: with client.pipeline(True) as pipe: pipe.watch(name) - if pipe.get(name) == lock_id: + if bytes_to_str(pipe.get(name)) == lock_id: pipe.multi() pipe.delete(name) pipe.execute() @@ -349,9 +349,9 @@ for channel in self._channels: # only if subclient property is cached client = channel.__dict__.get('subclient') - if client is not None: - if callable(getattr(client, "check_health", None)): - client.check_health() + if client is not None \ + and callable(getattr(client, 'check_health', None)): + client.check_health() def on_readable(self, fileno): chan, type = self._fd_to_chan[fileno] @@ -427,7 +427,9 @@ socket_connect_timeout = None socket_keepalive = None socket_keepalive_options = None + retry_on_timeout = None max_connections = 10 + health_check_interval = DEFAULT_HEALTH_CHECK_INTERVAL #: Transport option to disable fanout keyprefix. #: Can also be string, in which case it changes the default #: prefix ('/{db}.') into to something else. The prefix must @@ -491,14 +493,15 @@ 'socket_keepalive_options', 'queue_order_strategy', 'max_connections', + 'health_check_interval', + 'retry_on_timeout', 'priority_steps') # <-- do not add comma here! ) connection_class = redis.Connection if redis else None def __init__(self, *args, **kwargs): - super_ = super(Channel, self) - super_.__init__(*args, **kwargs) + super(Channel, self).__init__(*args, **kwargs) if not self.ack_emulation: # disable visibility timeout self.QoS = virtual.QoS @@ -775,7 +778,9 @@ def _q_for_pri(self, queue, pri): pri = self.priority(pri) - return '%s%s%s' % ((queue, self.sep, pri) if pri else (queue, '', '')) + if pri: + return "{{{}}}{}{}".format(queue, self.sep, pri) + return queue def priority(self, n): steps = self.priority_steps @@ -905,14 +910,19 @@ 'socket_connect_timeout': self.socket_connect_timeout, 'socket_keepalive': self.socket_keepalive, 'socket_keepalive_options': self.socket_keepalive_options, + 'health_check_interval': self.health_check_interval, + 'retry_on_timeout': self.retry_on_timeout, } conn_class = self.connection_class + + # If the connection class does not support the `health_check_interval` + # argument then remove it. if ( hasattr(conn_class, '__init__') and - accepts_argument(conn_class.__init__, 'health_check_interval') + not accepts_argument(conn_class.__init__, 'health_check_interval') ): - connparams['health_check_interval'] = DEFAULT_HEALTH_CHECK_INTERVAL + connparams.pop('health_check_interval') if conninfo.ssl: # Connection(ssl={}) must be a dict containing the keys: @@ -1064,8 +1074,12 @@ [add_reader(fd, on_readable, fd) for fd in cycle.fds] loop.on_tick.add(on_poll_start) loop.call_repeatedly(10, cycle.maybe_restore_messages) + health_check_interval = connection.client.transport_options.get( + 'health_check_interval', + DEFAULT_HEALTH_CHECK_INTERVAL + ) loop.call_repeatedly( - DEFAULT_HEALTH_CHECK_INTERVAL, + health_check_interval, cycle.maybe_check_subclient_health ) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-4.6.7/kombu/utils/url.py new/kombu-4.6.8/kombu/utils/url.py --- old/kombu-4.6.7/kombu/utils/url.py 2019-07-12 07:26:52.000000000 +0200 +++ new/kombu-4.6.8/kombu/utils/url.py 2020-03-02 11:00:55.000000000 +0100 @@ -49,10 +49,8 @@ keys = [key for key in query.keys() if key.startswith('ssl_')] for key in keys: if key == 'ssl_cert_reqs': - if ssl_available: - query[key] = getattr(ssl, query[key]) - else: - query[key] = None + query[key] = parse_ssl_cert_reqs(query[key]) + if query[key] is None: logger.warning('Defaulting to insecure SSL behaviour.') if 'ssl' not in query: @@ -120,3 +118,20 @@ if isinstance(url, string_t) and '://' in url: return sanitize_url(url, mask) return url + + +def parse_ssl_cert_reqs(query_value): + # type: (str) -> Any + """Given the query parameter for ssl_cert_reqs, return the SSL constant or None.""" + if ssl_available: + query_value_to_constant = { + 'CERT_REQUIRED': ssl.CERT_REQUIRED, + 'CERT_OPTIONAL': ssl.CERT_OPTIONAL, + 'CERT_NONE': ssl.CERT_NONE, + 'required': ssl.CERT_REQUIRED, + 'optional': ssl.CERT_OPTIONAL, + 'none': ssl.CERT_NONE, + } + return query_value_to_constant[query_value] + else: + return None diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-4.6.7/kombu.egg-info/PKG-INFO new/kombu-4.6.8/kombu.egg-info/PKG-INFO --- old/kombu-4.6.7/kombu.egg-info/PKG-INFO 2019-12-07 15:45:55.000000000 +0100 +++ new/kombu-4.6.8/kombu.egg-info/PKG-INFO 2020-03-02 11:08:47.000000000 +0100 @@ -1,6 +1,6 @@ Metadata-Version: 2.1 Name: kombu -Version: 4.6.7 +Version: 4.6.8 Summary: Messaging library for Python. Home-page: https://kombu.readthedocs.io Author: Ask Solem @@ -28,17 +28,17 @@ Classifier: Topic :: System :: Networking Classifier: Topic :: Software Development :: Libraries :: Python Modules Requires-Python: >=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.* -Provides-Extra: redis -Provides-Extra: yaml Provides-Extra: msgpack -Provides-Extra: zookeeper +Provides-Extra: yaml +Provides-Extra: redis +Provides-Extra: mongodb Provides-Extra: sqs -Provides-Extra: pyro +Provides-Extra: zookeeper +Provides-Extra: sqlalchemy Provides-Extra: librabbitmq -Provides-Extra: qpid +Provides-Extra: pyro Provides-Extra: slmq -Provides-Extra: azureservicebus Provides-Extra: azurestoragequeues -Provides-Extra: mongodb -Provides-Extra: sqlalchemy +Provides-Extra: azureservicebus +Provides-Extra: qpid Provides-Extra: consul diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-4.6.7/t/unit/asynchronous/aws/sqs/test_connection.py new/kombu-4.6.8/t/unit/asynchronous/aws/sqs/test_connection.py --- old/kombu-4.6.7/t/unit/asynchronous/aws/sqs/test_connection.py 2019-07-12 07:26:52.000000000 +0200 +++ new/kombu-4.6.8/t/unit/asynchronous/aws/sqs/test_connection.py 2020-03-02 11:00:55.000000000 +0100 @@ -86,7 +86,12 @@ def test_receive_message(self): queue = Mock(name='queue') - self.x.receive_message(queue, 4, callback=self.callback) + self.x.receive_message( + queue, + self.x.get_queue_url('queue'), + 4, + callback=self.callback, + ) self.x.get_list.assert_called_with( 'ReceiveMessage', {'MaxNumberOfMessages': 4}, [('Message', AsyncMessage)], @@ -96,7 +101,13 @@ def test_receive_message__with_visibility_timeout(self): queue = Mock(name='queue') - self.x.receive_message(queue, 4, 3666, callback=self.callback) + self.x.receive_message( + queue, + self.x.get_queue_url('queue'), + 4, + 3666, + callback=self.callback, + ) self.x.get_list.assert_called_with( 'ReceiveMessage', { 'MaxNumberOfMessages': 4, @@ -110,7 +121,11 @@ def test_receive_message__with_wait_time_seconds(self): queue = Mock(name='queue') self.x.receive_message( - queue, 4, wait_time_seconds=303, callback=self.callback, + queue, + self.x.get_queue_url('queue'), + 4, + wait_time_seconds=303, + callback=self.callback, ) self.x.get_list.assert_called_with( 'ReceiveMessage', { @@ -125,7 +140,11 @@ def test_receive_message__with_attributes(self): queue = Mock(name='queue') self.x.receive_message( - queue, 4, attributes=['foo', 'bar'], callback=self.callback, + queue, + self.x.get_queue_url('queue'), + 4, + attributes=['foo', 'bar'], + callback=self.callback, ) self.x.get_list.assert_called_with( 'ReceiveMessage', { diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-4.6.7/t/unit/test_connection.py new/kombu-4.6.8/t/unit/test_connection.py --- old/kombu-4.6.7/t/unit/test_connection.py 2019-12-07 15:31:27.000000000 +0100 +++ new/kombu-4.6.8/t/unit/test_connection.py 2020-03-02 11:00:55.000000000 +0100 @@ -117,6 +117,7 @@ clone = deepcopy(conn) assert clone.alt == ['amqp://host'] + @skip.unless_module('sqlalchemy') def test_parse_generated_as_uri_pg(self): conn = Connection(self.pg_url) assert conn.as_uri() == self.pg_nopass diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-4.6.7/t/unit/transport/test_SQS.py new/kombu-4.6.8/t/unit/transport/test_SQS.py --- old/kombu-4.6.7/t/unit/transport/test_SQS.py 2019-07-12 07:26:52.000000000 +0200 +++ new/kombu-4.6.8/t/unit/transport/test_SQS.py 2020-03-02 11:00:55.000000000 +0100 @@ -23,6 +23,20 @@ SQS_Channel_sqs = SQS.Channel.sqs +example_predefined_queues = { + 'queue-1': { + 'url': 'https://sqs.us-east-1.amazonaws.com/xxx/queue-1', + 'access_key_id': 'a', + 'secret_access_key': 'b', + }, + 'queue-2': { + 'url': 'https://sqs.us-east-1.amazonaws.com/xxx/queue-2', + 'access_key_id': 'c', + 'secret_access_key': 'd', + }, +} + + class SQSMessageMock(object): def __init__(self): """ @@ -49,19 +63,14 @@ class SQSClientMock(object): - def __init__(self): + def __init__(self, QueueName='unittest_queue'): """ Imitate the SQS Client from boto3. """ self._receive_messages_calls = 0 # _queues doesn't exist on the real client, here for testing. self._queues = {} - for n in range(1): - name = 'q_{}'.format(n) - url = 'sqs://q_{}'.format(n) - self.create_queue(QueueName=name) - - url = self.create_queue(QueueName='unittest_queue')['QueueUrl'] + url = self.create_queue(QueueName=QueueName)['QueueUrl'] self.send_message(QueueUrl=url, MessageBody='hello') def _get_q(self, url): @@ -73,7 +82,7 @@ def create_queue(self, QueueName=None, Attributes=None): q = self._queues[QueueName] = QueueMock( - 'sqs://' + QueueName, + 'https://sqs.us-east-1.amazonaws.com/xxx/' + QueueName, Attributes, ) return {'QueueUrl': q.url} @@ -133,10 +142,21 @@ # Mock the sqs() method that returns an SQSConnection object and # instead return an SQSConnectionMock() object. - self.sqs_conn_mock = SQSClientMock() + sqs_conn_mock = SQSClientMock() + self.sqs_conn_mock = sqs_conn_mock + + predefined_queues_sqs_conn_mocks = { + 'queue-1': SQSClientMock(QueueName='queue-1'), + 'queue-2': SQSClientMock(QueueName='queue-2'), + } def mock_sqs(): - return self.sqs_conn_mock + def sqs(self, queue=None): + if queue in predefined_queues_sqs_conn_mocks: + return predefined_queues_sqs_conn_mocks[queue] + return sqs_conn_mock + + return sqs SQS.Channel.sqs = mock_sqs() @@ -218,7 +238,7 @@ expected_endpoint_url = 'http://localhost:5493' assert self.channel.endpoint_url == expected_endpoint_url boto3_sqs = SQS_Channel_sqs.__get__(self.channel, SQS.Channel) - assert boto3_sqs._endpoint.host == expected_endpoint_url + assert boto3_sqs()._endpoint.host == expected_endpoint_url def test_none_hostname_persists(self): conn = Connection(hostname=None, transport=SQS.Transport) @@ -350,7 +370,7 @@ assert message == results def test_redelivered(self): - self.channel.sqs.change_message_visibility = \ + self.channel.sqs().change_message_visibility = \ Mock(name='change_message_visibility') message = { 'redelivered': True, @@ -480,9 +500,61 @@ mock_messages = Mock() mock_messages.delivery_info = message self.channel.qos.append(mock_messages, 1) - self.channel.sqs.delete_message = Mock() + self.channel.sqs().delete_message = Mock() self.channel.basic_ack(1) self.sqs_conn_mock.delete_message.assert_called_with( QueueUrl=message['sqs_queue'], ReceiptHandle=message['sqs_message']['ReceiptHandle'] ) + + def test_predefined_queues_primes_queue_cache(self): + connection = Connection(transport=SQS.Transport, transport_options={ + 'predefined_queues': example_predefined_queues, + }) + channel = connection.channel() + + assert 'queue-1' in channel._queue_cache + assert 'queue-2' in channel._queue_cache + + def test_predefined_queues_new_queue_raises_if_queue_not_exists(self): + connection = Connection(transport=SQS.Transport, transport_options={ + 'predefined_queues': example_predefined_queues, + }) + channel = connection.channel() + + with pytest.raises(SQS.UndefinedQueueException): + channel._new_queue('queue-99') + + def test_predefined_queues_get_from_sqs(self): + connection = Connection(transport=SQS.Transport, transport_options={ + 'predefined_queues': example_predefined_queues, + }) + channel = connection.channel() + + def message_to_python(message, queue_name, queue): + return message + + channel._message_to_python = Mock(side_effect=message_to_python) + + queue_name = "queue-1" + + exchange = Exchange('test_SQS', type='direct') + p = messaging.Producer(channel, exchange, routing_key=queue_name) + queue = Queue(queue_name, exchange, queue_name) + queue(channel).declare() + + # Getting a single message + p.publish('message') + result = channel._get(queue_name) + + assert 'Body' in result.keys() + + # Getting many messages + for i in range(3): + p.publish('message: {0}'.format(i)) + + channel.connection._deliver = Mock(name='_deliver') + channel._get_bulk(queue_name, max_if_unlimited=3) + channel.connection._deliver.assert_called() + + assert len(channel.sqs(queue_name)._queues[queue_name].messages) == 0 diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-4.6.7/t/unit/transport/test_pyamqp.py new/kombu-4.6.8/t/unit/transport/test_pyamqp.py --- old/kombu-4.6.7/t/unit/transport/test_pyamqp.py 2019-07-12 07:26:52.000000000 +0200 +++ new/kombu-4.6.8/t/unit/transport/test_pyamqp.py 2020-03-02 11:00:55.000000000 +0100 @@ -91,6 +91,15 @@ self.transport.create_channel(connection) connection.channel.assert_called_with() + def test_ssl_cert_passed(self): + ssl_dict={ + 'ca_certs': '/etc/pki/tls/certs/something.crt', + 'cert_reqs': "ssl.CERT_REQUIRED", + } + ssl_dict_copy = {k: ssl_dict[k] for k in ssl_dict} + connection = Connection('amqps://', ssl=ssl_dict_copy) + assert connection.transport.client.ssl == ssl_dict + def test_driver_version(self): assert self.transport.driver_version() diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-4.6.7/t/unit/transport/test_redis.py new/kombu-4.6.8/t/unit/transport/test_redis.py --- old/kombu-4.6.7/t/unit/transport/test_redis.py 2019-12-07 15:31:27.000000000 +0100 +++ new/kombu-4.6.8/t/unit/transport/test_redis.py 2020-03-02 11:00:55.000000000 +0100 @@ -14,6 +14,7 @@ from kombu.five import Empty, Queue as _Queue, bytes_if_py2 from kombu.transport import virtual from kombu.utils import eventio # patch poll +from kombu.utils.encoding import str_to_bytes from kombu.utils.json import dumps @@ -311,6 +312,18 @@ conn.channel() pool.disconnect.assert_not_called() + def test_get_redis_ConnectionError(self): + from redis.exceptions import ConnectionError + from kombu.transport.redis import get_redis_ConnectionError + connection_error = get_redis_ConnectionError() + assert connection_error == ConnectionError + + def test_after_fork_cleanup_channel(self): + from kombu.transport.redis import _after_fork_cleanup_channel + channel = Mock() + _after_fork_cleanup_channel(channel) + channel._after_fork.assert_called_once() + def test_after_fork(self): self.channel._pool = None self.channel._after_fork() @@ -696,6 +709,20 @@ path = connection_parameters['path'] assert (password, path) == (None, '/var/run/redis.sock') + def test_connparams_health_check_interval_not_supported(self): + with patch('kombu.transport.redis.Channel._create_client'): + with Connection('redis+socket:///tmp/redis.sock') as conn: + conn.default_channel.connection_class = \ + Mock(name='connection_class') + connparams = conn.default_channel._connparams() + assert 'health_check_interval' not in connparams + + def test_connparams_health_check_interval_supported(self): + with patch('kombu.transport.redis.Channel._create_client'): + with Connection('redis+socket:///tmp/redis.sock') as conn: + connparams = conn.default_channel._connparams() + assert connparams['health_check_interval'] == 25 + def test_rotate_cycle_ValueError(self): cycle = self.channel._queue_cycle cycle.update(['kramer', 'jerry']) @@ -733,6 +760,7 @@ transport.cycle = Mock(name='cycle') transport.cycle.fds = {12: 'LISTEN', 13: 'BRPOP'} conn = Mock(name='conn') + conn.client = Mock(name='client', transport_options={}) loop = Mock(name='loop') redis.Transport.register_with_event_loop(transport, conn, loop) transport.cycle.on_poll_init.assert_called_with(loop.poller) @@ -750,6 +778,31 @@ call(13, transport.on_readable, 13), ]) + def test_configurable_health_check(self): + transport = self.connection.transport + transport.cycle = Mock(name='cycle') + transport.cycle.fds = {12: 'LISTEN', 13: 'BRPOP'} + conn = Mock(name='conn') + conn.client = Mock(name='client', transport_options={ + 'health_check_interval': 15, + }) + loop = Mock(name='loop') + redis.Transport.register_with_event_loop(transport, conn, loop) + transport.cycle.on_poll_init.assert_called_with(loop.poller) + loop.call_repeatedly.assert_has_calls([ + call(10, transport.cycle.maybe_restore_messages), + call(15, transport.cycle.maybe_check_subclient_health), + ]) + loop.on_tick.add.assert_called() + on_poll_start = loop.on_tick.add.call_args[0][0] + + on_poll_start() + transport.cycle.on_poll_start.assert_called_with() + loop.add_reader.assert_has_calls([ + call(12, transport.on_readable, 12), + call(13, transport.on_readable, 13), + ]) + def test_transport_on_readable(self): transport = self.connection.transport cycle = transport.cycle = Mock(name='cyle') @@ -1311,13 +1364,13 @@ client.setnx.return_value = True client.pipeline = ContextMock() pipe = client.pipeline.return_value - pipe.get.return_value = lock_id + pipe.get.return_value = str_to_bytes(lock_id) # redis gives bytes held = False with redis.Mutex(client, 'foo1', 100): held = True assert held client.setnx.assert_called_with('foo1', lock_id) - pipe.get.return_value = 'yyy' + pipe.get.return_value = b'yyy' held = False with redis.Mutex(client, 'foo1', 100): held = True @@ -1325,7 +1378,7 @@ # Did not win client.expire.reset_mock() - pipe.get.return_value = lock_id + pipe.get.return_value = str_to_bytes(lock_id) client.setnx.return_value = False with pytest.raises(redis.MutexHeld): held = False @@ -1389,7 +1442,8 @@ connection_class=mock.ANY, db=0, max_connections=10, min_other_sentinels=0, password=None, sentinel_kwargs=None, socket_connect_timeout=None, socket_keepalive=None, - socket_keepalive_options=None, socket_timeout=None) + socket_keepalive_options=None, socket_timeout=None, + retry_on_timeout=None) master_for = patched.return_value.master_for master_for.assert_called() @@ -1411,7 +1465,8 @@ connection_class=mock.ANY, db=0, max_connections=10, min_other_sentinels=0, password=None, sentinel_kwargs=None, socket_connect_timeout=None, socket_keepalive=None, - socket_keepalive_options=None, socket_timeout=None) + socket_keepalive_options=None, socket_timeout=None, + retry_on_timeout=None) master_for = patched.return_value.master_for master_for.assert_called() diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-4.6.7/t/unit/utils/test_url.py new/kombu-4.6.8/t/unit/utils/test_url.py --- old/kombu-4.6.7/t/unit/utils/test_url.py 2019-07-12 07:26:52.000000000 +0200 +++ new/kombu-4.6.8/t/unit/utils/test_url.py 2020-03-02 11:00:55.000000000 +0100 @@ -2,7 +2,6 @@ try: from urllib.parse import urlencode - except ImportError: from urllib import urlencode @@ -12,6 +11,7 @@ import kombu.utils.url from kombu.utils.url import as_url, parse_url, maybe_sanitize_url +from kombu.utils.url import parse_ssl_cert_reqs def test_parse_url(): @@ -51,7 +51,7 @@ def test_ssl_parameters(): url = 'rediss://user:password@host:6379/0?' querystring = urlencode({ - 'ssl_cert_reqs': 'CERT_REQUIRED', + 'ssl_cert_reqs': 'required', 'ssl_ca_certs': '/var/ssl/myca.pem', 'ssl_certfile': '/var/ssl/server-cert.pem', 'ssl_keyfile': '/var/ssl/priv/worker-key.pem', @@ -69,3 +69,24 @@ assert kwargs['ssl']['ssl_cert_reqs'] is None kombu.utils.url.ssl_available = True + + [email protected]('query_param,ssl_available,expected', [ + ('CERT_REQUIRED', True, ssl.CERT_REQUIRED), + ('CERT_OPTIONAL', True, ssl.CERT_OPTIONAL), + ('CERT_NONE', True, ssl.CERT_NONE), + ('required', True, ssl.CERT_REQUIRED), + ('optional', True, ssl.CERT_OPTIONAL), + ('none', True, ssl.CERT_NONE), + ('CERT_REQUIRED', None, None), +]) +def test_parse_ssl_cert_reqs(query_param, ssl_available, expected): + kombu.utils.url.ssl_available = ssl_available + result = parse_ssl_cert_reqs(query_param) + kombu.utils.url.ssl_available = True + assert result == expected + + +def test_parse_ssl_cert_reqs_bad_value(): + with pytest.raises(KeyError): + parse_ssl_cert_reqs('badvalue')
