Hello community, here is the log from the commit of package python-kombu for openSUSE:Factory checked in at 2012-11-12 11:37:00 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Comparing /work/SRC/openSUSE:Factory/python-kombu (Old) and /work/SRC/openSUSE:Factory/.python-kombu.new (New) ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-kombu", Maintainer is "cth...@suse.com" Changes: -------- --- /work/SRC/openSUSE:Factory/python-kombu/python-kombu.changes 2012-10-07 19:59:41.000000000 +0200 +++ /work/SRC/openSUSE:Factory/.python-kombu.new/python-kombu.changes 2012-11-12 11:37:03.000000000 +0100 @@ -1,0 +2,18 @@ +Sat Nov 3 02:58:14 UTC 2012 - alexan...@exatati.com.br + +- Update to 2.4.8: + - Redis: Fair queue cyle implementation improved (Issue #166). + Contributed by Kevin McCarthy. + - Redis: Number of messages to restore in one iteration is now + unlimited, but can be configured using the unacked_restore_limit + transport option. + - Redis: A Redis based mutex is now used while restoring messages. + - LamportClock.adjust now returns the new clock value. + - Heartbeats can now be specified in URLs. + Fix contributed by Mher Movsisyan. + - Kombu can now be used with PyDev, PyCharm and other static + analysis tools. + - Fixes problem with msgpack on Python 3 (Issue #162). + Fix contributed by Jasper Bryant-Greene + +------------------------------------------------------------------- Old: ---- kombu-2.4.7.tar.gz New: ---- kombu-2.4.8.tar.gz ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ python-kombu.spec ++++++ --- /var/tmp/diff_new_pack.24EIwi/_old 2012-11-12 11:37:05.000000000 +0100 +++ /var/tmp/diff_new_pack.24EIwi/_new 2012-11-12 11:37:05.000000000 +0100 @@ -16,7 +16,7 @@ # Name: python-kombu -Version: 2.4.7 +Version: 2.4.8 Release: 0 License: BSD-2-Clause Summary: AMQP Messaging Framework for Python ++++++ kombu-2.4.7.tar.gz -> kombu-2.4.8.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-2.4.7/AUTHORS new/kombu-2.4.8/AUTHORS --- old/kombu-2.4.7/AUTHORS 2012-08-29 16:46:46.000000000 +0200 +++ new/kombu-2.4.8/AUTHORS 2012-11-02 17:35:08.000000000 +0100 @@ -38,6 +38,7 @@ John Watson <j...@disqus.com> Joseph Crosland <jcrosl...@flumotion.com> Keith Fitzgerald <ghostroc...@me.com> +Kevin McCarthy <m...@kevinmccarthy.org> Mahendra M <mahendr...@infosys.com> Marcin Lulek (ergo) <i...@webreactor.eu> Mher Movsisyan <mher.movsis...@gmail.com> diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-2.4.7/Changelog new/kombu-2.4.8/Changelog --- old/kombu-2.4.7/Changelog 2012-09-18 16:11:56.000000000 +0200 +++ new/kombu-2.4.8/Changelog 2012-11-02 17:54:07.000000000 +0100 @@ -4,6 +4,36 @@ Change history ================ +.. _version-2.4.8: + +2.4.8 +===== +:release-date: 2012-11-02 05:00 P.M UTC + +- Redis: Fair queue cyle implementation improved (Issue #166). + + Contributed by Kevin McCarthy. + +- Redis: Number of messages to restore in one iteration is now unlimited, + but can be configured using the unacked_restore_limit transport option. + +- Redis: A Redis based mutex is now used while restoring messages. + +- LamportClock.adjust now returns the new clock value. + +- Heartbeats can now be specified in URLs. + + Fix contributed by Mher Movsisyan. + +- Kombu can now be used with PyDev, PyCharm and other static analysis tools. + +- Fixes problem with msgpack on Python 3 (Issue #162). + + Fix contributed by Jasper Bryant-Greene + + + + .. _version-2.4.7: 2.4.7 diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-2.4.7/PKG-INFO new/kombu-2.4.8/PKG-INFO --- old/kombu-2.4.7/PKG-INFO 2012-09-18 16:13:08.000000000 +0200 +++ new/kombu-2.4.8/PKG-INFO 2012-11-02 17:55:55.000000000 +0100 @@ -1,6 +1,6 @@ Metadata-Version: 1.0 Name: kombu -Version: 2.4.7 +Version: 2.4.8 Summary: Messaging Framework for Python Home-page: http://kombu.readthedocs.org Author: Ask Solem @@ -10,7 +10,7 @@ kombu - Messaging Framework for Python ======================================== - :Version: 2.4.7 + :Version: 2.4.8 `Kombu` is a messaging framework for Python. diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-2.4.7/README new/kombu-2.4.8/README --- old/kombu-2.4.7/README 2012-09-18 16:12:13.000000000 +0200 +++ new/kombu-2.4.8/README 2012-11-02 17:54:19.000000000 +0100 @@ -2,7 +2,7 @@ kombu - Messaging Framework for Python ======================================== -:Version: 2.4.7 +:Version: 2.4.8 `Kombu` is a messaging framework for Python. diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-2.4.7/README.rst new/kombu-2.4.8/README.rst --- old/kombu-2.4.7/README.rst 2012-09-18 16:12:13.000000000 +0200 +++ new/kombu-2.4.8/README.rst 2012-11-02 17:54:19.000000000 +0100 @@ -2,7 +2,7 @@ kombu - Messaging Framework for Python ======================================== -:Version: 2.4.7 +:Version: 2.4.8 `Kombu` is a messaging framework for Python. diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-2.4.7/docs/changelog.rst new/kombu-2.4.8/docs/changelog.rst --- old/kombu-2.4.7/docs/changelog.rst 2012-09-18 16:11:56.000000000 +0200 +++ new/kombu-2.4.8/docs/changelog.rst 2012-11-02 17:54:07.000000000 +0100 @@ -4,6 +4,36 @@ Change history ================ +.. _version-2.4.8: + +2.4.8 +===== +:release-date: 2012-11-02 05:00 P.M UTC + +- Redis: Fair queue cyle implementation improved (Issue #166). + + Contributed by Kevin McCarthy. + +- Redis: Number of messages to restore in one iteration is now unlimited, + but can be configured using the unacked_restore_limit transport option. + +- Redis: A Redis based mutex is now used while restoring messages. + +- LamportClock.adjust now returns the new clock value. + +- Heartbeats can now be specified in URLs. + + Fix contributed by Mher Movsisyan. + +- Kombu can now be used with PyDev, PyCharm and other static analysis tools. + +- Fixes problem with msgpack on Python 3 (Issue #162). + + Fix contributed by Jasper Bryant-Greene + + + + .. _version-2.4.7: 2.4.7 diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-2.4.7/docs/introduction.rst new/kombu-2.4.8/docs/introduction.rst --- old/kombu-2.4.7/docs/introduction.rst 2012-09-18 16:12:13.000000000 +0200 +++ new/kombu-2.4.8/docs/introduction.rst 2012-11-02 17:54:19.000000000 +0100 @@ -2,7 +2,7 @@ kombu - Messaging Framework for Python ======================================== -:Version: 2.4.7 +:Version: 2.4.8 `Kombu` is a messaging framework for Python. diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-2.4.7/examples/simple_task_queue/client.py new/kombu-2.4.8/examples/simple_task_queue/client.py --- old/kombu-2.4.7/examples/simple_task_queue/client.py 2012-07-30 15:20:21.000000000 +0200 +++ new/kombu-2.4.8/examples/simple_task_queue/client.py 2012-11-02 15:47:51.000000000 +0100 @@ -18,6 +18,7 @@ maybe_declare(task_exchange, producer.channel) producer.publish(payload, serializer='pickle', compression='bzip2', + exchange=task_exchange, routing_key=routing_key) if __name__ == '__main__': diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-2.4.7/kombu/__init__.py new/kombu-2.4.8/kombu/__init__.py --- old/kombu-2.4.7/kombu/__init__.py 2012-09-18 16:12:13.000000000 +0200 +++ new/kombu-2.4.8/kombu/__init__.py 2012-11-02 17:54:19.000000000 +0100 @@ -1,7 +1,7 @@ """Messaging Framework for Python""" from __future__ import absolute_import -VERSION = (2, 4, 7) +VERSION = (2, 4, 8) __version__ = '.'.join(map(str, VERSION[0:3])) + ''.join(VERSION[3:]) __author__ = 'Ask Solem' __contact__ = 'a...@celeryproject.org' @@ -21,6 +21,19 @@ else: raise Exception('Kombu requires Python versions 2.5 or later.') +STATICA_HACK = True +globals()['kcah_acitats'[::-1].upper()] = False +if STATICA_HACK: + # This is never executed, but tricks static analyzers (PyDev, PyCharm, + # pylint, etc.) into knowing the types of these symbols, and what + # they contain. + from kombu.connection import Connection, BrokerConnection # noqa + from kombu.entitiy import Exchange, Queue, binding # noqa + from kombu.messaging import Consumer, Producer # noqa + from kombu.pools import connections, producers # noqa + from kombu.utils.url import parse_url # noqa + from kombu.common import eventloop, uuid # noqa + # Lazy loading. # - See werkzeug/__init__.py for the rationale behind this. from types import ModuleType diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-2.4.7/kombu/clocks.py new/kombu-2.4.8/kombu/clocks.py --- old/kombu-2.4.7/kombu/clocks.py 2012-08-29 16:46:46.000000000 +0200 +++ new/kombu-2.4.8/kombu/clocks.py 2012-11-02 17:35:09.000000000 +0100 @@ -62,6 +62,7 @@ def adjust(self, other): with self.mutex: self.value = max(self.value, other) + 1 + return self.value def forward(self): with self.mutex: diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-2.4.7/kombu/connection.py new/kombu-2.4.8/kombu/connection.py --- old/kombu-2.4.7/kombu/connection.py 2012-09-18 16:10:42.000000000 +0200 +++ new/kombu-2.4.8/kombu/connection.py 2012-11-02 17:35:09.000000000 +0100 @@ -149,7 +149,7 @@ self.connect_timeout = connect_timeout self.ssl = ssl self.transport_cls = transport - self.heartbeat = heartbeat + self.heartbeat = heartbeat and float(heartbeat) def _debug(self, msg, ident='[Kombu connection:0x%(id)x] ', **kwargs): if self._logger: # pragma: no cover diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-2.4.7/kombu/pidbox.py new/kombu-2.4.8/kombu/pidbox.py --- old/kombu-2.4.7/kombu/pidbox.py 2012-09-18 16:10:42.000000000 +0200 +++ new/kombu-2.4.8/kombu/pidbox.py 2012-11-02 17:35:09.000000000 +0100 @@ -242,14 +242,16 @@ responses.append(body) consumer.register_callback(on_message) - with consumer: - for i in limit and range(limit) or count(): - try: - self.connection.drain_events(timeout=timeout) - except socket.timeout: - break - return responses - chan.after_reply_message_received(queue.name) + try: + with consumer: + for i in limit and range(limit) or count(): + try: + self.connection.drain_events(timeout=timeout) + except socket.timeout: + break + return responses + finally: + chan.after_reply_message_received(queue.name) def _get_exchange(self, namespace, type): return Exchange(self.exchange_fmt % namespace, diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-2.4.7/kombu/serialization.py new/kombu-2.4.8/kombu/serialization.py --- old/kombu-2.4.7/kombu/serialization.py 2012-08-29 16:46:46.000000000 +0200 +++ new/kombu-2.4.8/kombu/serialization.py 2012-11-02 17:35:09.000000000 +0100 @@ -349,7 +349,8 @@ """See http://msgpack.sourceforge.net/""" try: try: - from msgpack import packb as dumps, unpackb as loads + from msgpack import packb as dumps, unpackb + loads = lambda s: unpackb(s, encoding='utf-8') except ImportError: # msgpack < 0.2.0 and Python 2.5 from msgpack import packs as dumps, unpacks as loads # noqa diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-2.4.7/kombu/transport/amqplib.py new/kombu-2.4.8/kombu/transport/amqplib.py --- old/kombu-2.4.7/kombu/transport/amqplib.py 2012-08-29 16:46:46.000000000 +0200 +++ new/kombu-2.4.8/kombu/transport/amqplib.py 2012-11-02 15:47:36.000000000 +0100 @@ -195,6 +195,9 @@ # http://bugs.python.org/issue10272 if 'timed out' in str(exc): raise socket.timeout() + # Non-blocking SSL sockets can throw SSLError + if 'The operation did not complete' in str(exc): + raise socket.timeout() raise finally: if prev != timeout: diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-2.4.7/kombu/transport/redis.py new/kombu-2.4.8/kombu/transport/redis.py --- old/kombu-2.4.7/kombu/transport/redis.py 2012-09-17 11:42:32.000000000 +0200 +++ new/kombu-2.4.8/kombu/transport/redis.py 2012-11-02 17:55:12.000000000 +0100 @@ -12,7 +12,7 @@ from __future__ import with_statement from bisect import bisect -from itertools import cycle, islice +from contextlib import contextmanager from time import time from Queue import Empty @@ -58,6 +58,33 @@ # queues manually. +class MutexHeld(Exception): + pass + + +@contextmanager +def Mutex(client, name, expire): + lock_id = uuid() + if client.setnx(name, lock_id): + client.expire(name, expire) + yield + else: + if not client.ttl(name): + client.expire(name, expire) + raise MutexHeld() + + pipe = client.pipeline(True) + try: + pipe.watch(name) + if pipe.get(name) == lock_id: + pipe.multi() + pipe.delete(name) + pipe.execute() + pipe.unwatch() + except redis.WatchError: + pass + + class QoS(virtual.QoS): restore_at_shutdown = True @@ -96,20 +123,26 @@ self._vrestore_count += 1 if (self._vrestore_count - 1) % interval: return + client = self.client ceil = time() - self.visibility_timeout - visible = self.client.zrevrangebyscore( - self.unacked_index_key, ceil, 0, - start=start, num=num, withscores=True) - for tag, score in visible or []: - self.restore_by_tag(tag) + try: + with Mutex(client, self.unacked_mutex_key, + self.unacked_mutex_expire): + visible = client.zrevrangebyscore( + self.unacked_index_key, ceil, 0, + start=num and start, num=num, withscores=True) + for tag, score in visible or []: + self.restore_by_tag(tag, client) + except MutexHeld: + pass - def restore_by_tag(self, tag): + def restore_by_tag(self, tag, client=None): + client = client or self.client p, _, _ = self._remove_from_indices(tag, - self.client.pipeline().hget(self.unacked_key, tag)) \ - .execute() + client.pipeline().hget(self.unacked_key, tag)).execute() if p: M, EX, RK = loads(p) - self.channel._do_restore_message(M, EX, RK) + self.channel._do_restore_message(M, EX, RK, client) @property def client(self): @@ -124,6 +157,14 @@ return self.channel.unacked_index_key @cached_property + def unacked_mutex_key(self): + return self.channel.unacked_mutex_key + + @cached_property + def unacked_mutex_expire(self): + return self.channel.unacked_mutex_expire + + @cached_property def visibility_timeout(self): return self.channel.visibility_timeout @@ -201,13 +242,17 @@ def on_poll_init(self, poller): self.poller = poller for channel in self._channels: - channel.qos.restore_visible() + return channel.qos.restore_visible( + num=channel.unacked_restore_limit, + ) def on_poll_empty(self): for channel in self._channels: if channel.active_queues: # only need to do this once, as they are not local to channel. - return channel.qos.restore_visible() + return channel.qos.restore_visible( + num=channel.unacked_restore_limit, + ) def handle_event(self, fileno, event): if event & READ: @@ -256,20 +301,26 @@ _fanout_queues = {} unacked_key = 'unacked' unacked_index_key = 'unacked_index' - visibility_timeout = 3600 # 1 hour + unacked_mutex_key = 'unacked_mutex' + unacked_mutex_expire = 300 # 5 minutes + unacked_restore_limit = None + visibility_timeout = 3600 # 1 hour priority_steps = PRIORITY_STEPS from_transport_options = (virtual.Channel.from_transport_options + ('unacked_key', 'unacked_index_key', + 'unacked_mutex_key', + 'unacked_mutex_expire', 'visibility_timeout', + 'unacked_restore_limit', 'priority_steps')) def __init__(self, *args, **kwargs): super_ = super(Channel, self) super_.__init__(*args, **kwargs) - self._queue_cycle = cycle([]) + self._queue_cycle = [] self.Client = self._get_client() self.ResponseError = self._get_response_error() self.active_fanout_queues = set() @@ -285,27 +336,29 @@ # are still waiting for data. self.connection_errors = self.connection.connection_errors - def _do_restore_message(self, payload, exchange, routing_key): + def _do_restore_message(self, payload, exchange, routing_key, client=None): + client = client or self._avail_client try: try: payload['headers']['redelivered'] = True except KeyError: pass for queue in self._lookup(exchange, routing_key): - self._avail_client.lpush(queue, dumps(payload)) + client.lpush(queue, dumps(payload)) except Exception: logger.critical('Could not restore message: %r', payload, exc_info=True) def _restore(self, message, payload=None): tag = message.delivery_tag - P, _ = self._avail_client.pipeline() \ - .hget(self.unacked_key, tag) \ - .hdel(self.unacked_key, tag) \ - .execute() + client = self._avail_client + P, _ = client.pipeline() \ + .hget(self.unacked_key, tag) \ + .hdel(self.unacked_key, tag) \ + .execute() if P: M, EX, RK = loads(P) - self._do_restore_message(M, EX, RK) + self._do_restore_message(M, EX, RK, client) def _next_delivery_tag(self): return uuid() @@ -391,6 +444,7 @@ if dest__item: dest, item = dest__item dest = dest.rsplit(self.sep, 1)[0] + self._rotate_cycle(dest) return loads(item), dest else: raise Empty() @@ -585,13 +639,25 @@ each queue is equally likely to be consumed from, so that a very busy queue will not block others. + This works by using Redis's `BRPOP` command and + by rotating the most recently used queue to the + and of the list. See Kombu github issue #166 for + more discussion of this method. + """ - self._queue_cycle = cycle(self.active_queues) + self._queue_cycle = list(self.active_queues) def _consume_cycle(self): """Get a fresh list of queues from the queue cycle.""" active = len(self.active_queues) - return list(islice(self._queue_cycle, 0, active + 1))[:active] + return self._queue_cycle[0:active] + + def _rotate_cycle(self, used): + """ + Move most recently used queue to end of list + """ + index = self._queue_cycle.index(used) + self._queue_cycle.append(self._queue_cycle.pop(index)) def _get_response_error(self): from redis import exceptions diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/kombu-2.4.7/kombu.egg-info/PKG-INFO new/kombu-2.4.8/kombu.egg-info/PKG-INFO --- old/kombu-2.4.7/kombu.egg-info/PKG-INFO 2012-09-18 16:13:04.000000000 +0200 +++ new/kombu-2.4.8/kombu.egg-info/PKG-INFO 2012-11-02 17:55:47.000000000 +0100 @@ -1,6 +1,6 @@ Metadata-Version: 1.0 Name: kombu -Version: 2.4.7 +Version: 2.4.8 Summary: Messaging Framework for Python Home-page: http://kombu.readthedocs.org Author: Ask Solem @@ -10,7 +10,7 @@ kombu - Messaging Framework for Python ======================================== - :Version: 2.4.7 + :Version: 2.4.8 `Kombu` is a messaging framework for Python. -- To unsubscribe, e-mail: opensuse-commit+unsubscr...@opensuse.org For additional commands, e-mail: opensuse-commit+h...@opensuse.org