Hello community,

here is the log from the commit of package python-kombu for openSUSE:Factory 
checked in at 2012-11-12 11:37:00
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Comparing /work/SRC/openSUSE:Factory/python-kombu (Old)
 and      /work/SRC/openSUSE:Factory/.python-kombu.new (New)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Package is "python-kombu", Maintainer is "cth...@suse.com"

Changes:
--------
--- /work/SRC/openSUSE:Factory/python-kombu/python-kombu.changes        
2012-10-07 19:59:41.000000000 +0200
+++ /work/SRC/openSUSE:Factory/.python-kombu.new/python-kombu.changes   
2012-11-12 11:37:03.000000000 +0100
@@ -1,0 +2,18 @@
+Sat Nov  3 02:58:14 UTC 2012 - alexan...@exatati.com.br
+
+- Update to 2.4.8:
+  - Redis: Fair queue cyle implementation improved (Issue #166).
+    Contributed by Kevin McCarthy.
+  - Redis: Number of messages to restore in one iteration is now
+    unlimited, but can be configured using the unacked_restore_limit
+    transport option.
+  - Redis: A Redis based mutex is now used while restoring messages.
+  - LamportClock.adjust now returns the new clock value.
+  - Heartbeats can now be specified in URLs.
+    Fix contributed by Mher Movsisyan.
+  - Kombu can now be used with PyDev, PyCharm and other static
+    analysis tools.
+  - Fixes problem with msgpack on Python 3 (Issue #162).
+    Fix contributed by Jasper Bryant-Greene
+
+-------------------------------------------------------------------

Old:
----
  kombu-2.4.7.tar.gz

New:
----
  kombu-2.4.8.tar.gz

++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

Other differences:
------------------
++++++ python-kombu.spec ++++++
--- /var/tmp/diff_new_pack.24EIwi/_old  2012-11-12 11:37:05.000000000 +0100
+++ /var/tmp/diff_new_pack.24EIwi/_new  2012-11-12 11:37:05.000000000 +0100
@@ -16,7 +16,7 @@
 #
 
 Name:           python-kombu
-Version:        2.4.7
+Version:        2.4.8
 Release:        0
 License:        BSD-2-Clause
 Summary:        AMQP Messaging Framework for Python

++++++ kombu-2.4.7.tar.gz -> kombu-2.4.8.tar.gz ++++++
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-2.4.7/AUTHORS new/kombu-2.4.8/AUTHORS
--- old/kombu-2.4.7/AUTHORS     2012-08-29 16:46:46.000000000 +0200
+++ new/kombu-2.4.8/AUTHORS     2012-11-02 17:35:08.000000000 +0100
@@ -38,6 +38,7 @@
 John Watson <j...@disqus.com>
 Joseph Crosland <jcrosl...@flumotion.com>
 Keith Fitzgerald <ghostroc...@me.com>
+Kevin McCarthy <m...@kevinmccarthy.org>
 Mahendra M <mahendr...@infosys.com>
 Marcin Lulek (ergo) <i...@webreactor.eu>
 Mher Movsisyan <mher.movsis...@gmail.com>
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-2.4.7/Changelog new/kombu-2.4.8/Changelog
--- old/kombu-2.4.7/Changelog   2012-09-18 16:11:56.000000000 +0200
+++ new/kombu-2.4.8/Changelog   2012-11-02 17:54:07.000000000 +0100
@@ -4,6 +4,36 @@
  Change history
 ================
 
+.. _version-2.4.8:
+
+2.4.8
+=====
+:release-date: 2012-11-02 05:00 P.M UTC
+
+- Redis:  Fair queue cyle implementation improved (Issue #166).
+
+    Contributed by Kevin McCarthy.
+
+- Redis: Number of messages to restore in one iteration is now unlimited,
+  but can be configured using the unacked_restore_limit transport option.
+
+- Redis: A Redis based mutex is now used while restoring messages.
+
+- LamportClock.adjust now returns the new clock value.
+
+- Heartbeats can now be specified in URLs.
+
+    Fix contributed by Mher Movsisyan.
+
+- Kombu can now be used with PyDev, PyCharm and other static analysis tools.
+
+- Fixes problem with msgpack on Python 3 (Issue #162).
+
+    Fix contributed by Jasper Bryant-Greene
+
+
+
+
 .. _version-2.4.7:
 
 2.4.7
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-2.4.7/PKG-INFO new/kombu-2.4.8/PKG-INFO
--- old/kombu-2.4.7/PKG-INFO    2012-09-18 16:13:08.000000000 +0200
+++ new/kombu-2.4.8/PKG-INFO    2012-11-02 17:55:55.000000000 +0100
@@ -1,6 +1,6 @@
 Metadata-Version: 1.0
 Name: kombu
-Version: 2.4.7
+Version: 2.4.8
 Summary: Messaging Framework for Python
 Home-page: http://kombu.readthedocs.org
 Author: Ask Solem
@@ -10,7 +10,7 @@
          kombu - Messaging Framework for Python
         ========================================
         
-        :Version: 2.4.7
+        :Version: 2.4.8
         
         `Kombu` is a messaging framework for Python.
         
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-2.4.7/README new/kombu-2.4.8/README
--- old/kombu-2.4.7/README      2012-09-18 16:12:13.000000000 +0200
+++ new/kombu-2.4.8/README      2012-11-02 17:54:19.000000000 +0100
@@ -2,7 +2,7 @@
  kombu - Messaging Framework for Python
 ========================================
 
-:Version: 2.4.7
+:Version: 2.4.8
 
 `Kombu` is a messaging framework for Python.
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-2.4.7/README.rst new/kombu-2.4.8/README.rst
--- old/kombu-2.4.7/README.rst  2012-09-18 16:12:13.000000000 +0200
+++ new/kombu-2.4.8/README.rst  2012-11-02 17:54:19.000000000 +0100
@@ -2,7 +2,7 @@
  kombu - Messaging Framework for Python
 ========================================
 
-:Version: 2.4.7
+:Version: 2.4.8
 
 `Kombu` is a messaging framework for Python.
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-2.4.7/docs/changelog.rst 
new/kombu-2.4.8/docs/changelog.rst
--- old/kombu-2.4.7/docs/changelog.rst  2012-09-18 16:11:56.000000000 +0200
+++ new/kombu-2.4.8/docs/changelog.rst  2012-11-02 17:54:07.000000000 +0100
@@ -4,6 +4,36 @@
  Change history
 ================
 
+.. _version-2.4.8:
+
+2.4.8
+=====
+:release-date: 2012-11-02 05:00 P.M UTC
+
+- Redis:  Fair queue cyle implementation improved (Issue #166).
+
+    Contributed by Kevin McCarthy.
+
+- Redis: Number of messages to restore in one iteration is now unlimited,
+  but can be configured using the unacked_restore_limit transport option.
+
+- Redis: A Redis based mutex is now used while restoring messages.
+
+- LamportClock.adjust now returns the new clock value.
+
+- Heartbeats can now be specified in URLs.
+
+    Fix contributed by Mher Movsisyan.
+
+- Kombu can now be used with PyDev, PyCharm and other static analysis tools.
+
+- Fixes problem with msgpack on Python 3 (Issue #162).
+
+    Fix contributed by Jasper Bryant-Greene
+
+
+
+
 .. _version-2.4.7:
 
 2.4.7
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-2.4.7/docs/introduction.rst 
new/kombu-2.4.8/docs/introduction.rst
--- old/kombu-2.4.7/docs/introduction.rst       2012-09-18 16:12:13.000000000 
+0200
+++ new/kombu-2.4.8/docs/introduction.rst       2012-11-02 17:54:19.000000000 
+0100
@@ -2,7 +2,7 @@
  kombu - Messaging Framework for Python
 ========================================
 
-:Version: 2.4.7
+:Version: 2.4.8
 
 `Kombu` is a messaging framework for Python.
 
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-2.4.7/examples/simple_task_queue/client.py 
new/kombu-2.4.8/examples/simple_task_queue/client.py
--- old/kombu-2.4.7/examples/simple_task_queue/client.py        2012-07-30 
15:20:21.000000000 +0200
+++ new/kombu-2.4.8/examples/simple_task_queue/client.py        2012-11-02 
15:47:51.000000000 +0100
@@ -18,6 +18,7 @@
         maybe_declare(task_exchange, producer.channel)
         producer.publish(payload, serializer='pickle',
                                   compression='bzip2',
+                                  exchange=task_exchange,
                                   routing_key=routing_key)
 
 if __name__ == '__main__':
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-2.4.7/kombu/__init__.py 
new/kombu-2.4.8/kombu/__init__.py
--- old/kombu-2.4.7/kombu/__init__.py   2012-09-18 16:12:13.000000000 +0200
+++ new/kombu-2.4.8/kombu/__init__.py   2012-11-02 17:54:19.000000000 +0100
@@ -1,7 +1,7 @@
 """Messaging Framework for Python"""
 from __future__ import absolute_import
 
-VERSION = (2, 4, 7)
+VERSION = (2, 4, 8)
 __version__ = '.'.join(map(str, VERSION[0:3])) + ''.join(VERSION[3:])
 __author__ = 'Ask Solem'
 __contact__ = 'a...@celeryproject.org'
@@ -21,6 +21,19 @@
     else:
         raise Exception('Kombu requires Python versions 2.5 or later.')
 
+STATICA_HACK = True
+globals()['kcah_acitats'[::-1].upper()] = False
+if STATICA_HACK:
+    # This is never executed, but tricks static analyzers (PyDev, PyCharm,
+    # pylint, etc.) into knowing the types of these symbols, and what
+    # they contain.
+    from kombu.connection import Connection, BrokerConnection   # noqa
+    from kombu.entitiy import Exchange, Queue, binding          # noqa
+    from kombu.messaging import Consumer, Producer              # noqa
+    from kombu.pools import connections, producers              # noqa
+    from kombu.utils.url import parse_url                       # noqa
+    from kombu.common import eventloop, uuid                    # noqa
+
 # Lazy loading.
 # - See werkzeug/__init__.py for the rationale behind this.
 from types import ModuleType
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-2.4.7/kombu/clocks.py 
new/kombu-2.4.8/kombu/clocks.py
--- old/kombu-2.4.7/kombu/clocks.py     2012-08-29 16:46:46.000000000 +0200
+++ new/kombu-2.4.8/kombu/clocks.py     2012-11-02 17:35:09.000000000 +0100
@@ -62,6 +62,7 @@
     def adjust(self, other):
         with self.mutex:
             self.value = max(self.value, other) + 1
+            return self.value
 
     def forward(self):
         with self.mutex:
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-2.4.7/kombu/connection.py 
new/kombu-2.4.8/kombu/connection.py
--- old/kombu-2.4.7/kombu/connection.py 2012-09-18 16:10:42.000000000 +0200
+++ new/kombu-2.4.8/kombu/connection.py 2012-11-02 17:35:09.000000000 +0100
@@ -149,7 +149,7 @@
         self.connect_timeout = connect_timeout
         self.ssl = ssl
         self.transport_cls = transport
-        self.heartbeat = heartbeat
+        self.heartbeat = heartbeat and float(heartbeat)
 
     def _debug(self, msg, ident='[Kombu connection:0x%(id)x] ', **kwargs):
         if self._logger:  # pragma: no cover
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-2.4.7/kombu/pidbox.py 
new/kombu-2.4.8/kombu/pidbox.py
--- old/kombu-2.4.7/kombu/pidbox.py     2012-09-18 16:10:42.000000000 +0200
+++ new/kombu-2.4.8/kombu/pidbox.py     2012-11-02 17:35:09.000000000 +0100
@@ -242,14 +242,16 @@
             responses.append(body)
 
         consumer.register_callback(on_message)
-        with consumer:
-            for i in limit and range(limit) or count():
-                try:
-                    self.connection.drain_events(timeout=timeout)
-                except socket.timeout:
-                    break
-            return responses
-        chan.after_reply_message_received(queue.name)
+        try:
+            with consumer:
+                for i in limit and range(limit) or count():
+                    try:
+                        self.connection.drain_events(timeout=timeout)
+                    except socket.timeout:
+                        break
+                return responses
+        finally:
+            chan.after_reply_message_received(queue.name)
 
     def _get_exchange(self, namespace, type):
         return Exchange(self.exchange_fmt % namespace,
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-2.4.7/kombu/serialization.py 
new/kombu-2.4.8/kombu/serialization.py
--- old/kombu-2.4.7/kombu/serialization.py      2012-08-29 16:46:46.000000000 
+0200
+++ new/kombu-2.4.8/kombu/serialization.py      2012-11-02 17:35:09.000000000 
+0100
@@ -349,7 +349,8 @@
     """See http://msgpack.sourceforge.net/""";
     try:
         try:
-            from msgpack import packb as dumps, unpackb as loads
+            from msgpack import packb as dumps, unpackb
+            loads = lambda s: unpackb(s, encoding='utf-8')
         except ImportError:
             # msgpack < 0.2.0 and Python 2.5
             from msgpack import packs as dumps, unpacks as loads  # noqa
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-2.4.7/kombu/transport/amqplib.py 
new/kombu-2.4.8/kombu/transport/amqplib.py
--- old/kombu-2.4.7/kombu/transport/amqplib.py  2012-08-29 16:46:46.000000000 
+0200
+++ new/kombu-2.4.8/kombu/transport/amqplib.py  2012-11-02 15:47:36.000000000 
+0100
@@ -195,6 +195,9 @@
                 # http://bugs.python.org/issue10272
                 if 'timed out' in str(exc):
                     raise socket.timeout()
+                # Non-blocking SSL sockets can throw SSLError
+                if 'The operation did not complete' in str(exc):
+                    raise socket.timeout()
                 raise
         finally:
             if prev != timeout:
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-2.4.7/kombu/transport/redis.py 
new/kombu-2.4.8/kombu/transport/redis.py
--- old/kombu-2.4.7/kombu/transport/redis.py    2012-09-17 11:42:32.000000000 
+0200
+++ new/kombu-2.4.8/kombu/transport/redis.py    2012-11-02 17:55:12.000000000 
+0100
@@ -12,7 +12,7 @@
 from __future__ import with_statement
 
 from bisect import bisect
-from itertools import cycle, islice
+from contextlib import contextmanager
 from time import time
 from Queue import Empty
 
@@ -58,6 +58,33 @@
 # queues manually.
 
 
+class MutexHeld(Exception):
+    pass
+
+
+@contextmanager
+def Mutex(client, name, expire):
+    lock_id = uuid()
+    if client.setnx(name, lock_id):
+        client.expire(name, expire)
+        yield
+    else:
+        if not client.ttl(name):
+            client.expire(name, expire)
+        raise MutexHeld()
+
+    pipe = client.pipeline(True)
+    try:
+        pipe.watch(name)
+        if pipe.get(name) == lock_id:
+            pipe.multi()
+            pipe.delete(name)
+            pipe.execute()
+        pipe.unwatch()
+    except redis.WatchError:
+        pass
+
+
 class QoS(virtual.QoS):
     restore_at_shutdown = True
 
@@ -96,20 +123,26 @@
         self._vrestore_count += 1
         if (self._vrestore_count - 1) % interval:
             return
+        client = self.client
         ceil = time() - self.visibility_timeout
-        visible = self.client.zrevrangebyscore(
-                self.unacked_index_key, ceil, 0,
-                start=start, num=num, withscores=True)
-        for tag, score in visible or []:
-            self.restore_by_tag(tag)
+        try:
+            with Mutex(client, self.unacked_mutex_key,
+                               self.unacked_mutex_expire):
+                visible = client.zrevrangebyscore(
+                        self.unacked_index_key, ceil, 0,
+                        start=num and start, num=num, withscores=True)
+                for tag, score in visible or []:
+                    self.restore_by_tag(tag, client)
+        except MutexHeld:
+            pass
 
-    def restore_by_tag(self, tag):
+    def restore_by_tag(self, tag, client=None):
+        client = client or self.client
         p, _, _ = self._remove_from_indices(tag,
-                        self.client.pipeline().hget(self.unacked_key, tag)) \
-                            .execute()
+            client.pipeline().hget(self.unacked_key, tag)).execute()
         if p:
             M, EX, RK = loads(p)
-            self.channel._do_restore_message(M, EX, RK)
+            self.channel._do_restore_message(M, EX, RK, client)
 
     @property
     def client(self):
@@ -124,6 +157,14 @@
         return self.channel.unacked_index_key
 
     @cached_property
+    def unacked_mutex_key(self):
+        return self.channel.unacked_mutex_key
+
+    @cached_property
+    def unacked_mutex_expire(self):
+        return self.channel.unacked_mutex_expire
+
+    @cached_property
     def visibility_timeout(self):
         return self.channel.visibility_timeout
 
@@ -201,13 +242,17 @@
     def on_poll_init(self, poller):
         self.poller = poller
         for channel in self._channels:
-            channel.qos.restore_visible()
+            return channel.qos.restore_visible(
+                num=channel.unacked_restore_limit,
+            )
 
     def on_poll_empty(self):
         for channel in self._channels:
             if channel.active_queues:
                 # only need to do this once, as they are not local to channel.
-                return channel.qos.restore_visible()
+                return channel.qos.restore_visible(
+                    num=channel.unacked_restore_limit,
+                )
 
     def handle_event(self, fileno, event):
         if event & READ:
@@ -256,20 +301,26 @@
     _fanout_queues = {}
     unacked_key = 'unacked'
     unacked_index_key = 'unacked_index'
-    visibility_timeout = 3600  # 1 hour
+    unacked_mutex_key = 'unacked_mutex'
+    unacked_mutex_expire = 300  # 5 minutes
+    unacked_restore_limit = None
+    visibility_timeout = 3600   # 1 hour
     priority_steps = PRIORITY_STEPS
 
     from_transport_options = (virtual.Channel.from_transport_options
                             + ('unacked_key',
                                'unacked_index_key',
+                               'unacked_mutex_key',
+                               'unacked_mutex_expire',
                                'visibility_timeout',
+                               'unacked_restore_limit',
                                'priority_steps'))
 
     def __init__(self, *args, **kwargs):
         super_ = super(Channel, self)
         super_.__init__(*args, **kwargs)
 
-        self._queue_cycle = cycle([])
+        self._queue_cycle = []
         self.Client = self._get_client()
         self.ResponseError = self._get_response_error()
         self.active_fanout_queues = set()
@@ -285,27 +336,29 @@
         # are still waiting for data.
         self.connection_errors = self.connection.connection_errors
 
-    def _do_restore_message(self, payload, exchange, routing_key):
+    def _do_restore_message(self, payload, exchange, routing_key, client=None):
+        client = client or self._avail_client
         try:
             try:
                 payload['headers']['redelivered'] = True
             except KeyError:
                 pass
             for queue in self._lookup(exchange, routing_key):
-                self._avail_client.lpush(queue, dumps(payload))
+                client.lpush(queue, dumps(payload))
         except Exception:
             logger.critical('Could not restore message: %r', payload,
                     exc_info=True)
 
     def _restore(self, message, payload=None):
         tag = message.delivery_tag
-        P, _ = self._avail_client.pipeline() \
-                            .hget(self.unacked_key, tag) \
-                            .hdel(self.unacked_key, tag) \
-                         .execute()
+        client = self._avail_client
+        P, _ = client.pipeline() \
+                    .hget(self.unacked_key, tag) \
+                    .hdel(self.unacked_key, tag) \
+                .execute()
         if P:
             M, EX, RK = loads(P)
-            self._do_restore_message(M, EX, RK)
+            self._do_restore_message(M, EX, RK, client)
 
     def _next_delivery_tag(self):
         return uuid()
@@ -391,6 +444,7 @@
             if dest__item:
                 dest, item = dest__item
                 dest = dest.rsplit(self.sep, 1)[0]
+                self._rotate_cycle(dest)
                 return loads(item), dest
             else:
                 raise Empty()
@@ -585,13 +639,25 @@
         each queue is equally likely to be consumed from,
         so that a very busy queue will not block others.
 
+        This works by using Redis's `BRPOP` command and
+        by rotating the most recently used queue to the
+        and of the list.  See Kombu github issue #166 for
+        more discussion of this method.
+
         """
-        self._queue_cycle = cycle(self.active_queues)
+        self._queue_cycle = list(self.active_queues)
 
     def _consume_cycle(self):
         """Get a fresh list of queues from the queue cycle."""
         active = len(self.active_queues)
-        return list(islice(self._queue_cycle, 0, active + 1))[:active]
+        return self._queue_cycle[0:active]
+
+    def _rotate_cycle(self, used):
+        """
+        Move most recently used queue to end of list
+        """
+        index = self._queue_cycle.index(used)
+        self._queue_cycle.append(self._queue_cycle.pop(index))
 
     def _get_response_error(self):
         from redis import exceptions
diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' 
'--exclude=.svnignore' old/kombu-2.4.7/kombu.egg-info/PKG-INFO 
new/kombu-2.4.8/kombu.egg-info/PKG-INFO
--- old/kombu-2.4.7/kombu.egg-info/PKG-INFO     2012-09-18 16:13:04.000000000 
+0200
+++ new/kombu-2.4.8/kombu.egg-info/PKG-INFO     2012-11-02 17:55:47.000000000 
+0100
@@ -1,6 +1,6 @@
 Metadata-Version: 1.0
 Name: kombu
-Version: 2.4.7
+Version: 2.4.8
 Summary: Messaging Framework for Python
 Home-page: http://kombu.readthedocs.org
 Author: Ask Solem
@@ -10,7 +10,7 @@
          kombu - Messaging Framework for Python
         ========================================
         
-        :Version: 2.4.7
+        :Version: 2.4.8
         
         `Kombu` is a messaging framework for Python.
         

-- 
To unsubscribe, e-mail: opensuse-commit+unsubscr...@opensuse.org
For additional commands, e-mail: opensuse-commit+h...@opensuse.org

Reply via email to