Hello community, here is the log from the commit of package python-qpid for openSUSE:Factory checked in at 2018-01-22 16:21:34 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Comparing /work/SRC/openSUSE:Factory/python-qpid (Old) and /work/SRC/openSUSE:Factory/.python-qpid.new (New) ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Package is "python-qpid" Mon Jan 22 16:21:34 2018 rev:4 rq:568186 version:1.37.0 Changes: -------- --- /work/SRC/openSUSE:Factory/python-qpid/python-qpid.changes 2017-03-13 15:33:42.288399767 +0100 +++ /work/SRC/openSUSE:Factory/.python-qpid.new/python-qpid.changes 2018-01-22 16:22:29.642777104 +0100 @@ -1,0 +2,24 @@ +Fri Jan 19 12:28:14 UTC 2018 - [email protected] + +- Upgrade to upstream version 1.37.0 + * Bugs fixed + + QPID-2524 - Fails loading dtd in Python 2.6 on Ubuntu 9.10 + (Karmic) with "ValueError: unknown url type: + /.../specs/amqp.0-10.dtd" + + QPID-7809 - Python 0-10 messaging driver does not handle + heartbeat timeouts, "assert rcv.received < rcv.impending" + occurs + + QPID-7833 - Batch file for Windows is missing from source + distribution + + QPID-7884 - Python client should not raise exception on + close() after stop. + + QPID-7317 - Deadlock on publish + + QPID-7423 - [0-8...0-91] Chunk message content that exceeds + the capacity of a single frame into multiple frames + + QPID-7424 - [0-8..0-91] Consuming python client application + is not notified of remotely closed connection + + QPID-7588 - [Python Client 0-8..0-91] The received message + ocasionally might not be dispatched into the application + queue in timely manner + +------------------------------------------------------------------- Old: ---- qpid-python-1.35.0.tar.gz New: ---- qpid-python-1.37.0.tar.gz ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ Other differences: ------------------ ++++++ python-qpid.spec ++++++ --- /var/tmp/diff_new_pack.Wph3K9/_old 2018-01-22 16:22:30.430740251 +0100 +++ /var/tmp/diff_new_pack.Wph3K9/_new 2018-01-22 16:22:30.434740064 +0100 @@ -1,7 +1,7 @@ # # spec file for package python-qpid # -# Copyright (c) 2017 SUSE LINUX GmbH, Nuernberg, Germany. +# Copyright (c) 2018 SUSE LINUX GmbH, Nuernberg, Germany. # # All modifications and additions to the file contributed by third parties # remain the property of their copyright owners, unless otherwise agreed @@ -17,7 +17,7 @@ Name: python-qpid -Version: 1.35.0 +Version: 1.37.0 Release: 0 Summary: Apache Qpid Python client library for AMQP License: Apache-2.0 @@ -29,7 +29,6 @@ BuildRequires: python-xml Provides: %{name}-common = %{version} Obsoletes: %{name}-common < %{version} -BuildRoot: %{_tmppath}/%{name}-%{version}-build %if 0%{?suse_version} >= 1200 BuildArch: noarch %endif @@ -38,7 +37,6 @@ The Apache Qpid Python client library for AMQP. %files -%defattr(-,root,root,-) %doc LICENSE.txt %doc NOTICE.txt README.md examples %dir %{python_sitelib}/qpid @@ -49,6 +47,7 @@ %{python_sitelib}/qpid/specs %{python_sitelib}/qpid/tests %{_bindir}/qpid-python-test +%exclude %{_bindir}/qpid-python-test.bat %if "%{python_version}" >= "2.6" %{python_sitelib}/qpid_python-*.egg-info %endif @@ -63,7 +62,6 @@ Conformance tests for Apache Qpid. %files -n qpid-tests -%defattr(-,root,root,-) %{python_sitelib}/qpid_tests/ %doc NOTICE.txt %doc LICENSE.txt ++++++ qpid-python-1.35.0.tar.gz -> qpid-python-1.37.0.tar.gz ++++++ diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/qpid-python-1.35.0/NOTICE.txt new/qpid-python-1.37.0/NOTICE.txt --- old/qpid-python-1.35.0/NOTICE.txt 2016-08-17 15:11:51.000000000 +0200 +++ new/qpid-python-1.37.0/NOTICE.txt 2017-11-22 19:26:38.000000000 +0100 @@ -1,5 +1,5 @@ Apache Qpid Python Client -Copyright 2006-2016 The Apache Software Foundation +Copyright 2006-2017 The Apache Software Foundation This product includes software developed at The Apache Software Foundation (http://www.apache.org/). diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/qpid-python-1.35.0/PKG-INFO new/qpid-python-1.37.0/PKG-INFO --- old/qpid-python-1.35.0/PKG-INFO 2016-08-17 15:21:22.000000000 +0200 +++ new/qpid-python-1.37.0/PKG-INFO 2017-11-22 20:13:45.000000000 +0100 @@ -1,6 +1,6 @@ Metadata-Version: 1.0 Name: qpid-python -Version: 1.35.0 +Version: 1.37.0 Summary: Python client implementation and AMQP conformance tests for Apache Qpid Home-page: http://qpid.apache.org/ Author: Apache Qpid diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/qpid-python-1.35.0/RELEASE.md new/qpid-python-1.37.0/RELEASE.md --- old/qpid-python-1.35.0/RELEASE.md 1970-01-01 01:00:00.000000000 +0100 +++ new/qpid-python-1.37.0/RELEASE.md 2017-11-22 19:55:09.000000000 +0100 @@ -0,0 +1,29 @@ +### Building a release for vote: + +1. Grab a clean checkout for safety. +2. Run: "git checkout ${BRANCH}" if needed to switch to branch of the intended release point. +3. Update the versions etc: + - setup.py +4. Commit the changes, tag them. + - Run: "git add ." + - Run: 'git commit -m "update versions for ${TAG}"' + - Run: 'git tag -m "tag ${TAG}" ${TAG}' +5. Run: "python setup.py sdist" to create the qpid-python-${VERSION}.tar.gz release archive in the dist/ subdir. +6. Create signature and checksums for the archive: + - e.g "gpg --detach-sign --armor qpid-python-${VERSION}.tar.gz" + - e.g "sha512sum qpid-python-${VERSION}.tar.gz > qpid-python-${VERSION}.tar.gz.sha512" + - e.g "md5sum qpid-python-${VERSION}.tar.gz > qpid-python-${VERSION}.tar.gz.md5" +7. Push branch changes and tag. + - Also update versions to the applicable snapshot version for future work on it. +8. Commit artifacts to dist dev repo in https://dist.apache.org/repos/dist/dev/qpid/python/${TAG} dir. +9. Send vote email, provide links to dist dev repo and JIRA release notes. + +### After a vote succeeds: + +1. If needed, tag the RC bits with the final name/version. +2. Commit the artifacts to dist release repo in https://dist.apache.org/repos/dist/release/qpid/python/${VERSION} dir: + - e.g: svn cp -m "add files for qpid-python-${VERSION}" https://dist.apache.org/repos/dist/dev/qpid/python/${TAG}/ https://dist.apache.org/repos/dist/release/qpid/python/${VERSION}/ +3. Give the mirrors some time to distribute things. Usually 24hrs to be safe, less if needed. + - https://www.apache.org/mirrors/ gives stats on mirror age + last check etc. +4. Update the website with release content. +5. Send release announcement email. diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/qpid-python-1.35.0/mllib/__init__.py new/qpid-python-1.37.0/mllib/__init__.py --- old/qpid-python-1.35.0/mllib/__init__.py 2016-08-17 15:11:51.000000000 +0200 +++ new/qpid-python-1.37.0/mllib/__init__.py 2017-11-22 19:26:38.000000000 +0100 @@ -66,12 +66,7 @@ return InputSource(systemId) def xml_parse(filename, path=()): - if sys.version_info[0:2] == (2,3): - # XXX: this is for older versions of python - from urllib import pathname2url - source = "file:%s" % pathname2url( os.path.abspath( filename ) ) - else: - source = filename + source = "file://%s" % os.path.abspath(filename) h = parsers.XMLParser() p = xml.sax.make_parser() p.setContentHandler(h) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/qpid-python-1.35.0/qpid/client.py new/qpid-python-1.37.0/qpid/client.py --- old/qpid-python-1.35.0/qpid/client.py 2016-08-17 15:11:51.000000000 +0200 +++ new/qpid-python-1.37.0/qpid/client.py 2017-11-22 19:26:38.000000000 +0100 @@ -184,12 +184,11 @@ msg.secure_ok(response=self.client.sasl.response(msg.challenge)) def connection_tune(self, ch, msg): + tune_params = dict(zip(('channel_max', 'frame_max', 'heartbeat'), (msg.frame.args))) if self.client.tune_params: - #todo: just override the params, i.e. don't require them - # all to be included in tune_params - msg.tune_ok(**self.client.tune_params) - else: - msg.tune_ok(*msg.frame.args) + tune_params.update(self.client.tune_params) + msg.tune_ok(**tune_params) + self.client.tune_params = tune_params self.client.started.set() def message_transfer(self, ch, msg): @@ -247,6 +246,9 @@ self.client.closed = True self.client.reason = reason self.client.started.set() + with self.client.lock: + for queue in self.client.queues.values(): + queue.close(reason) class StructFactory: diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/qpid-python-1.35.0/qpid/connection08.py new/qpid-python-1.37.0/qpid/connection08.py --- old/qpid-python-1.35.0/qpid/connection08.py 2016-08-17 15:11:51.000000000 +0200 +++ new/qpid-python-1.37.0/qpid/connection08.py 2017-11-22 19:26:38.000000000 +0100 @@ -157,10 +157,13 @@ class Connection: + AMQP_HEADER_SIZE = 8 + def __init__(self, io, spec): self.codec = codec.Codec(io, spec) self.spec = spec self.FRAME_END = self.spec.constants.byname["frame_end"].id + self.FRAME_MIN_SIZE = spec.constants.byname['frame_min_size'].id self.write = getattr(self, "write_%s_%s" % (self.spec.major, self.spec.minor)) self.read = getattr(self, "read_%s_%s" % (self.spec.major, self.spec.minor)) self.io = io diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/qpid-python-1.35.0/qpid/exceptions.py new/qpid-python-1.37.0/qpid/exceptions.py --- old/qpid-python-1.35.0/qpid/exceptions.py 2016-08-17 15:11:51.000000000 +0200 +++ new/qpid-python-1.37.0/qpid/exceptions.py 2017-11-22 19:26:38.000000000 +0100 @@ -20,3 +20,4 @@ class Closed(Exception): pass class Timeout(Exception): pass class VersionError(Exception): pass +class ContentError(Exception): pass diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/qpid-python-1.35.0/qpid/messaging/driver.py new/qpid-python-1.37.0/qpid/messaging/driver.py --- old/qpid-python-1.35.0/qpid/messaging/driver.py 2016-08-17 15:11:51.000000000 +0200 +++ new/qpid-python-1.37.0/qpid/messaging/driver.py 2017-11-22 19:26:38.000000000 +0100 @@ -402,6 +402,7 @@ waiting for output bandwidth (call the self.writeable() callback) """ return self._transport is not None and \ + self.engine is not None and \ self._transport.writing(self.engine.pending()) @synchronized @@ -431,7 +432,7 @@ reconnect the transport, declare the reconnect ok, then fail again after 2 missed heartbeats and so on. """ - if self._retrying and self.engine._connected: # Means we have received open-ok. + if self._retrying and self.engine is not None and self.engine._connected: # Means we have received open-ok. if self._reconnect_log: log.warn("reconnect succeeded: %s:%s", *self._last_host) self._next_retry = None @@ -441,6 +442,8 @@ @synchronized def readable(self): + if self.engine is None: + return try: data = self._transport.recv(64*1024) if data is None: @@ -486,6 +489,7 @@ self.engine.close() else: self.engine.close(e) + self.st_closed() self.schedule() @@ -507,6 +511,8 @@ @synchronized def writeable(self): + if self.engine is None: + return notify = False try: n = self._transport.send(self.engine.peek()) @@ -543,9 +549,10 @@ if self._transport is None: if self.connection._connected and not self.connection.error: self.connect() - else: + elif self.engine is not None: self.engine.dispatch() except HeartbeatTimeout, e: + log.warn("Heartbeat timeout") self.close_engine(e) except ContentError, e: msg = compat.format_exc() diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/qpid-python-1.35.0/qpid/peer.py new/qpid-python-1.37.0/qpid/peer.py --- old/qpid-python-1.35.0/qpid/peer.py 2016-08-17 15:11:51.000000000 +0200 +++ new/qpid-python-1.37.0/qpid/peer.py 2017-11-22 19:26:38.000000000 +0100 @@ -31,7 +31,7 @@ from content import Content from cStringIO import StringIO from time import time -from exceptions import Closed, Timeout +from exceptions import Closed, Timeout, ContentError from logging import getLogger log = getLogger("qpid.peer") @@ -105,7 +105,7 @@ try: frame = self.conn.read() except EOF, e: - self.work.close() + self.work.close("Connection lost") break ch = self.channel(frame.channel) ch.receive(frame, self.work) @@ -121,6 +121,7 @@ self.delegate.closed(reason) for ch in self.channels.values(): ch.closed(reason) + self.outgoing.close() def writer(self): try: @@ -149,8 +150,8 @@ content = None self.delegate(channel, Message(channel, frame, content)) - except QueueClosed: - self.closed("worker closed") + except QueueClosed, e: + self.closed(str(e) or "worker closed") except: self.fatal() @@ -264,9 +265,14 @@ self.write(header) for child in content.children: self.write_content(klass, child) - # should split up if content.body exceeds max frame size if content.body: - self.write(Body(content.body)) + if not isinstance(content.body, (basestring, buffer)): + # The 0-8..0-91 client does not support the messages bodies apart from string/buffer - fail early + # if other type + raise ContentError("Content body must be string or buffer, not a %s" % type(content.body)) + frame_max = self.client.tune_params['frame_max'] - self.client.conn.AMQP_HEADER_SIZE + for chunk in (content.body[i:i + frame_max] for i in xrange(0, len(content.body), frame_max)): + self.write(Body(chunk)) def receive(self, frame, work): if isinstance(frame, Method): diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/qpid-python-1.35.0/qpid/queue.py new/qpid-python-1.37.0/qpid/queue.py --- old/qpid-python-1.35.0/qpid/queue.py 2016-08-17 15:11:51.000000000 +0200 +++ new/qpid-python-1.37.0/qpid/queue.py 2017-11-22 19:26:38.000000000 +0100 @@ -40,7 +40,8 @@ self.thread = None def close(self, error = None): - self.error = error + if error and self.error is None: + self.error = error self.put(Queue.END) if self.thread is not None: self.thread.join() diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/qpid-python-1.35.0/qpid/selector.py new/qpid-python-1.37.0/qpid/selector.py --- old/qpid-python-1.35.0/qpid/selector.py 2016-08-17 15:11:51.000000000 +0200 +++ new/qpid-python-1.37.0/qpid/selector.py 2017-11-22 19:26:38.000000000 +0100 @@ -6,9 +6,9 @@ # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at -# +# # http://www.apache.org/licenses/LICENSE-2.0 -# +# # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -16,10 +16,27 @@ # specific language governing permissions and limitations # under the License. # -import atexit, time, errno, os +import time, errno, os, atexit, traceback from compat import select, SelectError, set, selectable_waiter, format_exc from threading import Thread, Lock from logging import getLogger +from qpid.messaging import InternalError + +def _stack(skip=0): + return ("".join(traceback.format_stack()[:-(1+skip)])).strip() + +class SelectorStopped(InternalError): + def __init__(self, msg, where=None): + InternalError.__init__(self, text=msg) + self.where = _stack(1) + +def _check(ex, skip=0): + if ex: + log.error("illegal use of qpid.messaging at:\n%s\n%s" % (_stack(1), ex)) + where = getattr(ex, 'where') + if where: + log.error("qpid.messaging was previously stopped at:\n%s\n%s" % (where, ex)) + raise ex log = getLogger("qpid.messaging") @@ -53,9 +70,13 @@ Selector.lock.acquire() try: if Selector.DEFAULT is None or Selector._current_pid != os.getpid(): + # If we forked, mark the existing Selector dead. + if Selector.DEFAULT is not None: + log.warning("process forked, child must not use parent qpid.messaging") + Selector.DEFAULT.dead(SelectorStopped("forked child using parent qpid.messaging")) sel = Selector() - atexit.register(sel.stop) sel.start() + atexit.register(sel.stop) Selector.DEFAULT = sel Selector._current_pid = os.getpid() return Selector.DEFAULT @@ -69,10 +90,10 @@ self.waiter = selectable_waiter() self.reading.add(self.waiter) self.stopped = False - self.thread = None self.exception = None def wakeup(self): + _check(self.exception) self.waiter.wakeup() def register(self, selectable): @@ -101,14 +122,14 @@ self.wakeup() def start(self): - self.stopped = False + _check(self.exception) self.thread = Thread(target=self.run) self.thread.setDaemon(True) self.thread.start(); def run(self): try: - while not self.stopped: + while not self.stopped and not self.exception: wakeup = None for sel in self.selectables.copy(): t = self._update(sel) @@ -152,16 +173,47 @@ if w is not None and now > w: sel.timeout() except Exception, e: - self.exception = e - info = format_exc() - log.error("qpid.messaging I/O thread has died: %s" % str(e)) - for sel in self.selectables.copy(): - if hasattr(sel, "abort"): - sel.abort(e, info) - raise + log.error("qpid.messaging thread died: %s" % e) + self.exception = SelectorStopped(str(e)) + self.exception = self.exception or self.stopped + self.dead(self.exception or SelectorStopped("qpid.messaging thread died: reason unknown")) def stop(self, timeout=None): - self.stopped = True - self.wakeup() - self.thread.join(timeout) - self.thread = None + """Stop the selector and wait for it's thread to exit. It cannot be re-started""" + if self.thread and not self.stopped: + self.stopped = SelectorStopped("qpid.messaging thread has been stopped") + self.wakeup() + self.thread.join(timeout) + + def dead(self, e): + """Mark the Selector as dead if it is stopped for any reason. Ensure there any future + attempt to use the selector or any of its connections will throw an exception. + """ + self.exception = e + try: + for sel in self.selectables.copy(): + c = sel.connection + for ssn in c.sessions.values(): + for l in ssn.senders + ssn.receivers: + disable(l, self.exception) + disable(ssn, self.exception) + disable(c, self.exception) + except Exception, e: + log.error("error stopping qpid.messaging (%s)\n%s", self.exception, format_exc()) + try: + self.waiter.close() + except Exception, e: + log.error("error stopping qpid.messaging (%s)\n%s", self.exception, format_exc()) + +# Disable an object to avoid hangs due to forked mutex locks or a stopped selector thread +import inspect +def disable(obj, exception): + assert(exception) + # Replace methods to raise exception or be a no-op + for m in inspect.getmembers( + obj, predicate=lambda m: inspect.ismethod(m) and not inspect.isbuiltin(m)): + name = m[0] + if name in ["close", "detach", "detach_all"]: # No-ops for these + setattr(obj, name, lambda *args, **kwargs: None) + else: # Raise exception for all others + setattr(obj, name, lambda *args, **kwargs: _check(exception, 1)) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/qpid-python-1.35.0/qpid/testlib.py new/qpid-python-1.37.0/qpid/testlib.py --- old/qpid-python-1.35.0/qpid/testlib.py 2016-08-17 15:11:51.000000000 +0200 +++ new/qpid-python-1.37.0/qpid/testlib.py 2017-11-22 19:26:38.000000000 +0100 @@ -46,6 +46,9 @@ ...) which are wrappers for the Channel functions that note resources to clean up later. """ + DEFAULT_USERNAME = "guest" + DEFAULT_PASSWORD = "guest" + DEFAULT_PORT = 5672 def configure(self, config): self.config = config @@ -73,12 +76,20 @@ self.client.close() + def recv_timeout(self): + """Timeout used when a message is anticipated.""" + return float(self.config.defines.get("qpid.recv_timeout", "1")) + + def recv_timeout_negative(self): + """Timeout used when a message is NOT expected.""" + return float(self.config.defines.get("qpid.recv_timeout_negative", "0.5")) + def connect(self, host=None, port=None, user=None, password=None, tune_params=None, client_properties=None, channel_options=None): """Create a new connction, return the Client object""" host = host or self.config.broker.host - port = port or self.config.broker.port or 5672 - user = user or self.config.broker.user or "guest" - password = password or self.config.broker.password or "guest" + port = port or self.config.broker.port or self.DEFAULT_PORT + user = user or self.config.broker.user or self.DEFAULT_USERNAME + password = password or self.config.broker.password or self.DEFAULT_PASSWORD client = qpid.client.Client(host, port) try: client.start(username = user, password=password, tune_params=tune_params, client_properties=client_properties, channel_options=channel_options) @@ -182,6 +193,10 @@ """ Base class for Qpid test cases. using the final 0-10 spec """ + DEFAULT_USERNAME = "guest" + DEFAULT_PASSWORD = "guest" + DEFAULT_PORT = 5672 + DEFAULT_PORT_TLS = 5671 def configure(self, config): self.config = config @@ -210,17 +225,17 @@ def connect(self, host=None, port=None): url = self.broker if url.scheme == URL.AMQPS: - default_port = 5671 + default_port = self.DEFAULT_PORT_TLS else: - default_port = 5672 + default_port = self.DEFAULT_PORT try: sock = connect(host or url.host, port or url.port or default_port) except socket.error, e: raise Skipped(e) if url.scheme == URL.AMQPS: sock = ssl(sock) - conn = Connection(sock, username=url.user or "guest", - password=url.password or "guest") + conn = Connection(sock, username=url.user or self.DEFAULT_USERNAME, + password=url.password or self.DEFAULT_PASSWORD) try: conn.start(timeout=10) except VersionError, e: diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/qpid-python-1.35.0/qpid/tests/messaging/__init__.py new/qpid-python-1.37.0/qpid/tests/messaging/__init__.py --- old/qpid-python-1.35.0/qpid/tests/messaging/__init__.py 2016-08-17 15:11:51.000000000 +0200 +++ new/qpid-python-1.37.0/qpid/tests/messaging/__init__.py 2017-11-22 19:26:38.000000000 +0100 @@ -233,4 +233,4 @@ s += ":%s" % self.port return s -import address, endpoints, message +import address, endpoints, message, selector diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/qpid-python-1.35.0/qpid/tests/messaging/selector.py new/qpid-python-1.37.0/qpid/tests/messaging/selector.py --- old/qpid-python-1.35.0/qpid/tests/messaging/selector.py 1970-01-01 01:00:00.000000000 +0100 +++ new/qpid-python-1.37.0/qpid/tests/messaging/selector.py 2017-11-22 19:26:38.000000000 +0100 @@ -0,0 +1,92 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import sys, os +from logging import getLogger +from unittest import TestCase +from qpid.selector import Selector, SelectorStopped +from qpid.messaging import * + +class SelectorTests(TestCase): + """Make sure that using a connection after a selector stops raises and doesn't hang""" + + def setUp(self): + self.log = getLogger("qpid.messaging") + self.propagate = self.log.propagate + self.log.propagate = False # Disable for tests, expected log output is noisy + + def tearDown(self): + # Clear out any broken selector so next test can function + Selector.DEFAULT = None + self.log.propagate = self.propagate # Restore setting + + def configure(self, config): + self.broker = config.broker + + def test_use_after_stop(self): + """Create endpoints, stop the selector, try to use them""" + c = Connection.establish(self.broker) + cstr = str(c) + ssn = c.session() + ssnrepr = repr(ssn) + r = ssn.receiver("foo;{create:always,delete:always}") + rstr = str(r) + s = ssn.sender("foo;{create:always,delete:always}") + srepr = str(s) + + Selector.DEFAULT.stop() + + # The following should be no-ops + c.close() + c.detach("foo") + ssn.close() + s.close() + r.close() + + # str/repr should return the same result + self.assertEqual(cstr, str(c)) + self.assertEqual(ssnrepr, repr(ssn)) + self.assertEqual(rstr, str(r)) + self.assertEqual(srepr, repr(s)) + + # Other functions should raise exceptions + self.assertRaises(SelectorStopped, c.session) + self.assertRaises(SelectorStopped, ssn.sender, "foo") + self.assertRaises(SelectorStopped, s.send, "foo") + self.assertRaises(SelectorStopped, r.fetch) + self.assertRaises(SelectorStopped, Connection.establish, self.broker) + + def test_use_after_fork(self): + c = Connection.establish(self.broker) + pid = os.fork() + if pid: # Parent + self.assertEqual((pid, 0), os.waitpid(pid, 0)) + self.assertEqual("child", c.session().receiver("child;{create:always}").fetch().content) + else: # Child + try: + # Can establish new connections + s = Connection.establish(self.broker).session().sender("child;{create:always}") + self.assertRaises(SelectorStopped, c.session) # But can't use parent connection + s.send("child") + os._exit(0) + except Exception, e: + print >>sys.stderr, "test child process error: %s" % e + os.exit(1) + finally: + os._exit(1) # Hard exit from child to stop remaining tests running twice diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/qpid-python-1.35.0/qpid-python-test.bat new/qpid-python-1.37.0/qpid-python-test.bat --- old/qpid-python-1.35.0/qpid-python-test.bat 1970-01-01 01:00:00.000000000 +0100 +++ new/qpid-python-1.37.0/qpid-python-test.bat 2017-11-22 19:26:38.000000000 +0100 @@ -0,0 +1,2 @@ +@echo off +python %~dp0\qpid-python-test %* diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/qpid-python-1.35.0/qpid_tests/broker_0_8/basic.py new/qpid-python-1.37.0/qpid_tests/broker_0_8/basic.py --- old/qpid-python-1.35.0/qpid_tests/broker_0_8/basic.py 2016-08-17 15:11:51.000000000 +0200 +++ new/qpid-python-1.37.0/qpid_tests/broker_0_8/basic.py 2017-11-22 19:26:38.000000000 +0100 @@ -43,10 +43,10 @@ #check the queues of the two consumers excluded = self.client.queue("local_excluded") included = self.client.queue("local_included") - msg = included.get(timeout=1) + msg = included.get(timeout=self.recv_timeout()) self.assertEqual("consume_no_local", msg.content.body) try: - excluded.get(timeout=1) + excluded.get(timeout=self.recv_timeout()) self.fail("Received locally published message though no_local=true") except Empty: None @@ -113,7 +113,7 @@ queue = durable_subscription_client.queue(subscription.consumer_tag) # consume and verify message content - msg = queue.get(timeout=1) + msg = queue.get(timeout=self.recv_timeout()) self.assertEqual(test_message, msg.content.body) consumerchannel.basic_ack(delivery_tag=msg.delivery_tag) finally: @@ -172,14 +172,14 @@ channel.basic_publish(routing_key="test-queue-4", content=Content("One")) myqueue = self.client.queue("my-consumer") - msg = myqueue.get(timeout=1) + msg = myqueue.get(timeout=self.recv_timeout()) self.assertEqual("One", msg.content.body) #cancel should stop messages being delivered channel.basic_cancel(consumer_tag="my-consumer") channel.basic_publish(routing_key="test-queue-4", content=Content("Two")) try: - msg = myqueue.get(timeout=1) + msg = myqueue.get(timeout=self.recv_timeout()) self.fail("Got message after cancellation: " + msg) except Empty: None @@ -204,11 +204,11 @@ channel.basic_publish(routing_key="test-ack-queue", content=Content("Four")) channel.basic_publish(routing_key="test-ack-queue", content=Content("Five")) - msg1 = queue.get(timeout=1) - msg2 = queue.get(timeout=1) - msg3 = queue.get(timeout=1) - msg4 = queue.get(timeout=1) - msg5 = queue.get(timeout=1) + msg1 = queue.get(timeout=self.recv_timeout()) + msg2 = queue.get(timeout=self.recv_timeout()) + msg3 = queue.get(timeout=self.recv_timeout()) + msg4 = queue.get(timeout=self.recv_timeout()) + msg5 = queue.get(timeout=self.recv_timeout()) self.assertEqual("One", msg1.content.body) self.assertEqual("Two", msg2.content.body) @@ -221,14 +221,14 @@ channel.basic_recover(requeue=False) - msg3b = queue.get(timeout=1) - msg5b = queue.get(timeout=1) + msg3b = queue.get(timeout=self.recv_timeout()) + msg5b = queue.get(timeout=self.recv_timeout()) self.assertEqual("Three", msg3b.content.body) self.assertEqual("Five", msg5b.content.body) try: - extra = queue.get(timeout=1) + extra = queue.get(timeout=self.recv_timeout()) self.fail("Got unexpected message: " + extra.content.body) except Empty: None @@ -248,11 +248,11 @@ channel.basic_publish(routing_key="test-requeue", content=Content("Four")) channel.basic_publish(routing_key="test-requeue", content=Content("Five")) - msg1 = queue.get(timeout=1) - msg2 = queue.get(timeout=1) - msg3 = queue.get(timeout=1) - msg4 = queue.get(timeout=1) - msg5 = queue.get(timeout=1) + msg1 = queue.get(timeout=self.recv_timeout()) + msg2 = queue.get(timeout=self.recv_timeout()) + msg3 = queue.get(timeout=self.recv_timeout()) + msg4 = queue.get(timeout=self.recv_timeout()) + msg5 = queue.get(timeout=self.recv_timeout()) self.assertEqual("One", msg1.content.body) self.assertEqual("Two", msg2.content.body) @@ -270,8 +270,8 @@ subscription2 = channel.basic_consume(queue="test-requeue") queue2 = self.client.queue(subscription2.consumer_tag) - msg3b = queue2.get(timeout=1) - msg5b = queue2.get(timeout=1) + msg3b = queue2.get(timeout=self.recv_timeout()) + msg5b = queue2.get(timeout=self.recv_timeout()) self.assertEqual("Three", msg3b.content.body) self.assertEqual("Five", msg5b.content.body) @@ -280,11 +280,11 @@ self.assertEqual(True, msg5b.redelivered) try: - extra = queue2.get(timeout=1) + extra = queue2.get(timeout=self.recv_timeout()) self.fail("Got unexpected message in second queue: " + extra.content.body) except Empty: None try: - extra = queue.get(timeout=1) + extra = queue.get(timeout=self.recv_timeout()) self.fail("Got unexpected message in original queue: " + extra.content.body) except Empty: None @@ -308,10 +308,10 @@ #only 5 messages should have been delivered: for i in range(1, 6): - msg = queue.get(timeout=1) + msg = queue.get(timeout=self.recv_timeout()) self.assertEqual("Message %d" % i, msg.content.body) try: - extra = queue.get(timeout=1) + extra = queue.get(timeout=self.recv_timeout()) self.fail("Got unexpected 6th message in original queue: " + extra.content.body) except Empty: None @@ -319,13 +319,13 @@ channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) for i in range(6, 11): - msg = queue.get(timeout=1) + msg = queue.get(timeout=self.recv_timeout()) self.assertEqual("Message %d" % i, msg.content.body) channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) try: - extra = queue.get(timeout=1) + extra = queue.get(timeout=self.recv_timeout()) self.fail("Got unexpected 11th message in original queue: " + extra.content.body) except Empty: None @@ -350,11 +350,11 @@ #only 5 messages should have been delivered (i.e. 45 bytes worth): for i in range(1, 6): - msg = queue.get(timeout=1) + msg = queue.get(timeout=self.recv_timeout()) self.assertEqual("Message %d" % i, msg.content.body) try: - extra = queue.get(timeout=1) + extra = queue.get(timeout=self.recv_timeout()) self.fail("Got unexpected 6th message in original queue: " + extra.content.body) except Empty: None @@ -362,13 +362,13 @@ channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) for i in range(6, 11): - msg = queue.get(timeout=1) + msg = queue.get(timeout=self.recv_timeout()) self.assertEqual("Message %d" % i, msg.content.body) channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) try: - extra = queue.get(timeout=1) + extra = queue.get(timeout=self.recv_timeout()) self.fail("Got unexpected 11th message in original queue: " + extra.content.body) except Empty: None @@ -376,7 +376,7 @@ large = "abcdefghijklmnopqrstuvwxyz" large = large + "-" + large; channel.basic_publish(routing_key="test-prefetch-size", content=Content(large)) - msg = queue.get(timeout=1) + msg = queue.get(timeout=self.recv_timeout()) self.assertEqual(large, msg.content.body) def test_get(self): diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/qpid-python-1.35.0/qpid_tests/broker_0_8/broker.py new/qpid-python-1.37.0/qpid_tests/broker_0_8/broker.py --- old/qpid-python-1.35.0/qpid_tests/broker_0_8/broker.py 2016-08-17 15:11:51.000000000 +0200 +++ new/qpid-python-1.37.0/qpid_tests/broker_0_8/broker.py 2017-11-22 19:26:38.000000000 +0100 @@ -37,7 +37,7 @@ ctag = ch.basic_consume(queue = "myqueue", no_ack = True).consumer_tag body = "test no-ack" ch.basic_publish(routing_key = "myqueue", content = Content(body)) - msg = self.client.queue(ctag).get(timeout = 5) + msg = self.client.queue(ctag).get(timeout=self.recv_timeout()) self.assert_(msg.content.body == body) # Acknowledging consumer @@ -45,7 +45,7 @@ ctag = ch.basic_consume(queue = "otherqueue", no_ack = False).consumer_tag body = "test ack" ch.basic_publish(routing_key = "otherqueue", content = Content(body)) - msg = self.client.queue(ctag).get(timeout = 5) + msg = self.client.queue(ctag).get(timeout=self.recv_timeout()) ch.basic_ack(delivery_tag = msg.delivery_tag) self.assert_(msg.content.body == body) @@ -62,7 +62,7 @@ body = "Immediate Delivery" channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content(body), immediate=True) - msg = queue.get(timeout=5) + msg = queue.get(timeout=self.recv_timeout()) self.assert_(msg.content.body == body) # TODO: Ensure we fail if immediate=True and there's no consumer. @@ -81,7 +81,7 @@ channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content(body)) reply = channel.basic_consume(queue="test-queue", no_ack=True) queue = self.client.queue(reply.consumer_tag) - msg = queue.get(timeout=5) + msg = queue.get(timeout=self.recv_timeout()) self.assert_(msg.content.body == body) def test_invalid_channel(self): @@ -111,10 +111,10 @@ channel.channel_flow(active=False) channel.basic_publish(routing_key="flow_test_queue", content=Content("abcdefghijklmnopqrstuvwxyz")) try: - incoming.get(timeout=1) + incoming.get(timeout=self.recv_timeout_negative()) self.fail("Received message when flow turned off.") except Empty: None channel.channel_flow(active=True) - msg = incoming.get(timeout=1) + msg = incoming.get(timeout=self.recv_timeout()) self.assertEqual("abcdefghijklmnopqrstuvwxyz", msg.content.body) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/qpid-python-1.35.0/qpid_tests/broker_0_8/example.py new/qpid-python-1.37.0/qpid_tests/broker_0_8/example.py --- old/qpid-python-1.35.0/qpid_tests/broker_0_8/example.py 2016-08-17 15:11:51.000000000 +0200 +++ new/qpid-python-1.37.0/qpid_tests/broker_0_8/example.py 2017-11-22 19:26:38.000000000 +0100 @@ -84,7 +84,7 @@ # Now we'll wait for the message to arrive. We can use the timeout # argument in case the server hangs. By default queue.get() will wait # until a message arrives or the connection to the server dies. - msg = queue.get(timeout=10) + msg = queue.get(timeout=self.recv_timeout()) # And check that we got the right response with assertEqual self.assertEqual(body, msg.content.body) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/qpid-python-1.35.0/qpid_tests/broker_0_8/queue.py new/qpid-python-1.37.0/qpid_tests/broker_0_8/queue.py --- old/qpid-python-1.35.0/qpid_tests/broker_0_8/queue.py 2016-08-17 15:11:51.000000000 +0200 +++ new/qpid-python-1.37.0/qpid_tests/broker_0_8/queue.py 2017-11-22 19:26:38.000000000 +0100 @@ -51,7 +51,7 @@ channel.basic_publish(exchange="test-exchange", routing_key="key", content=Content("four")) reply = channel.basic_consume(queue="test-queue", no_ack=True) queue = self.client.queue(reply.consumer_tag) - msg = queue.get(timeout=1) + msg = queue.get(timeout=self.recv_timeout()) self.assertEqual("four", msg.content.body) #check error conditions (use new channels): @@ -207,7 +207,7 @@ #empty queue: reply = channel.basic_consume(queue="delete-me-2", no_ack=True) queue = self.client.queue(reply.consumer_tag) - msg = queue.get(timeout=1) + msg = queue.get(timeout=self.recv_timeout()) self.assertEqual("message", msg.content.body) channel.basic_cancel(consumer_tag=reply.consumer_tag) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/qpid-python-1.35.0/qpid_tests/broker_0_8/testlib.py new/qpid-python-1.37.0/qpid_tests/broker_0_8/testlib.py --- old/qpid-python-1.35.0/qpid_tests/broker_0_8/testlib.py 2016-08-17 15:11:51.000000000 +0200 +++ new/qpid-python-1.37.0/qpid_tests/broker_0_8/testlib.py 2017-11-22 19:26:38.000000000 +0100 @@ -42,7 +42,7 @@ q = self.consume("empty") self.assertEmpty(q) try: - q.get(timeout=1) + q.get(timeout=self.recv_timeout()) self.fail("Queue is not empty.") except Empty: None # Ignore diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/qpid-python-1.35.0/qpid_tests/broker_0_8/tx.py new/qpid-python-1.37.0/qpid_tests/broker_0_8/tx.py --- old/qpid-python-1.35.0/qpid_tests/broker_0_8/tx.py 2016-08-17 15:11:51.000000000 +0200 +++ new/qpid-python-1.37.0/qpid_tests/broker_0_8/tx.py 2017-11-22 19:26:38.000000000 +0100 @@ -36,18 +36,18 @@ #check results for i in range(1, 5): - msg = queue_c.get(timeout=1) + msg = queue_c.get(timeout=self.recv_timeout()) self.assertEqual("TxMessage %d" % i, msg.content.body) - msg = queue_b.get(timeout=1) + msg = queue_b.get(timeout=self.recv_timeout()) self.assertEqual("TxMessage 6", msg.content.body) - msg = queue_a.get(timeout=1) + msg = queue_a.get(timeout=self.recv_timeout()) self.assertEqual("TxMessage 7", msg.content.body) for q in [queue_a, queue_b, queue_c]: try: - extra = q.get(timeout=1) + extra = q.get(timeout=self.recv_timeout_negative()) self.fail("Got unexpected message: " + extra.content.body) except Empty: None @@ -64,7 +64,7 @@ for q in [queue_a, queue_b, queue_c]: try: - extra = q.get(timeout=1) + extra = q.get(timeout=self.recv_timeout_negative()) self.fail("Got unexpected message: " + extra.content.body) except Empty: None @@ -72,18 +72,18 @@ #check results for i in range(1, 5): - msg = queue_a.get(timeout=1) + msg = queue_a.get(timeout=self.recv_timeout()) self.assertEqual("Message %d" % i, msg.content.body) - msg = queue_b.get(timeout=1) + msg = queue_b.get(timeout=self.recv_timeout()) self.assertEqual("Message 6", msg.content.body) - msg = queue_c.get(timeout=1) + msg = queue_c.get(timeout=self.recv_timeout()) self.assertEqual("Message 7", msg.content.body) for q in [queue_a, queue_b, queue_c]: try: - extra = q.get(timeout=1) + extra = q.get(timeout=self.recv_timeout_negative()) self.fail("Got unexpected message: " + extra.content.body) except Empty: None @@ -100,7 +100,7 @@ for q in [queue_a, queue_b, queue_c]: try: - extra = q.get(timeout=1) + extra = q.get(timeout=self.recv_timeout_negative()) self.fail("Got unexpected message: " + extra.content.body) except Empty: None @@ -108,18 +108,18 @@ #check results for i in range(1, 5): - msg = queue_a.get(timeout=1) + msg = queue_a.get(timeout=self.recv_timeout()) self.assertEqual("Message %d" % i, msg.content.body) - msg = queue_b.get(timeout=1) + msg = queue_b.get(timeout=self.recv_timeout()) self.assertEqual("Message 6", msg.content.body) - msg = queue_c.get(timeout=1) + msg = queue_c.get(timeout=self.recv_timeout()) self.assertEqual("Message 7", msg.content.body) for q in [queue_a, queue_b, queue_c]: try: - extra = q.get(timeout=1) + extra = q.get(timeout=self.recv_timeout_negative()) self.fail("Got unexpected message: " + extra.content.body) except Empty: None @@ -155,19 +155,19 @@ sub_a = channel.basic_consume(queue=name_a, no_ack=False) queue_a = self.client.queue(sub_a.consumer_tag) for i in range(1, 5): - msg = queue_a.get(timeout=1) + msg = queue_a.get(timeout=self.recv_timeout()) self.assertEqual("Message %d" % i, msg.content.body) channel.basic_ack(delivery_tag=msg.delivery_tag, multiple=True) sub_b = channel.basic_consume(queue=name_b, no_ack=False) queue_b = self.client.queue(sub_b.consumer_tag) - msg = queue_b.get(timeout=1) + msg = queue_b.get(timeout=self.recv_timeout()) self.assertEqual("Message 6", msg.content.body) channel.basic_ack(delivery_tag=msg.delivery_tag) sub_c = channel.basic_consume(queue=name_c, no_ack=False) queue_c = self.client.queue(sub_c.consumer_tag) - msg = queue_c.get(timeout=1) + msg = queue_c.get(timeout=self.recv_timeout()) self.assertEqual("Message 7", msg.content.body) channel.basic_ack(delivery_tag=msg.delivery_tag) @@ -195,7 +195,7 @@ sub = channel.basic_consume(queue="commit-overlapping", no_ack=False) queue = self.client.queue(sub.consumer_tag) for i in range(1, 10): - msg = queue.get(timeout=1) + msg = queue.get(timeout=self.recv_timeout()) self.assertEqual("Message %d" % i, msg.content.body) if i in [3, 6, 10]: channel.basic_ack(delivery_tag=msg.delivery_tag) @@ -204,6 +204,6 @@ #check all have been acked: try: - extra = queue.get(timeout=1) + extra = queue.get(timeout=self.recv_timeout_negative()) self.fail("Got unexpected message: " + extra.content.body) except Empty: None diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/qpid-python-1.35.0/qpid_tests/broker_0_9/echo.py new/qpid-python-1.37.0/qpid_tests/broker_0_9/echo.py --- old/qpid-python-1.35.0/qpid_tests/broker_0_9/echo.py 2016-08-17 15:11:51.000000000 +0200 +++ new/qpid-python-1.37.0/qpid_tests/broker_0_9/echo.py 2017-11-22 19:26:38.000000000 +0100 @@ -19,97 +19,96 @@ from qpid.testlib import TestBase from qpid.content import Content -import qpid.client - +from qpid.harness import Skipped +import qpid.client class EchoTests(TestBase): - """Verify that messages can be sent and received retaining fidelity""" - - def test_small_message(self): - - channel = self.channel - - self.queue_declare(queue="q") - - channel.tx_select() - consumer = self.consume("q", no_ack=False) - - body = self.uniqueString() - channel.basic_publish( - content=Content(body), - routing_key="q") - channel.tx_commit() - - msg = consumer.get(timeout=1) - channel.basic_ack(delivery_tag=msg.delivery_tag) - channel.tx_commit() - self.assertEqual(body, msg.content.body) - - def test_large_message(self): - - channel = self.channel + """Verify that messages can be sent and received retaining fidelity""" - self.queue_declare(queue="q") + def setUp(self): + super(EchoTests, self).setUp() + self.frame_max_size = self.client.tune_params['frame_max'] + self.assertTrue(self.frame_max_size >= self.client.conn.FRAME_MIN_SIZE) + + def test_empty_message(self): + body = '' + self.echo_message(body) + + def test_small_message(self): + body = self.uniqueString() + self.echo_message(body) + + def test_largest_single_frame_message(self): + max_size_within_single_frame = self.frame_max_size - self.client.conn.AMQP_HEADER_SIZE + body = self.randomLongString(max_size_within_single_frame) + self.echo_message(body) + + def test_multiple_frame_message(self): + size = self.frame_max_size * 2 - (self.client.conn.FRAME_MIN_SIZE / 2) + body = self.randomLongString(size) + self.echo_message(body) + + def echo_message(self, body): + channel = self.channel + self.queue_declare(queue="q") + channel.tx_select() + consumer = self.consume("q", no_ack=False) + channel.basic_publish( + content=Content(body), + routing_key="q") + channel.tx_commit() + msg = consumer.get(timeout=self.recv_timeout()) + channel.basic_ack(delivery_tag=msg.delivery_tag) + channel.tx_commit() + self.assertEqual(len(body), len(msg.content.body)) + self.assertEqual(body, msg.content.body) + + def test_large_message_received_in_many_content_frames(self): + if self.client.conn.FRAME_MIN_SIZE == self.frame_max_size: + raise Skipped("Test requires that frame_max_size (%d) exceeds frame_min_size (%d)" % (self.frame_max_size, self.frame_max_size)) + + channel = self.channel + + queue_name = "q" + self.queue_declare(queue=queue_name) + + channel.tx_select() + + body = self.randomLongString() + channel.basic_publish( + content=Content(body), + routing_key=queue_name) + channel.tx_commit() + + consuming_client = None + try: + # Create a second connection with minimum framesize. The Broker will then be forced to chunk + # the content in order to send it to us. + consuming_client = qpid.client.Client(self.config.broker.host, self.config.broker.port or self.DEFAULT_PORT) + tune_params = { "frame_max" : self.client.conn.FRAME_MIN_SIZE } + consuming_client.start(username = self.config.broker.user or self.DEFAULT_USERNAME, + password = self.config.broker.password or self.DEFAULT_PASSWORD, + tune_params = tune_params) + + consuming_channel = consuming_client.channel(1) + consuming_channel.channel_open() + consuming_channel.tx_select() + + consumer_reply = consuming_channel.basic_consume(queue=queue_name, no_ack=False) + consumer = consuming_client.queue(consumer_reply.consumer_tag) + msg = consumer.get(timeout=self.recv_timeout()) + consuming_channel.basic_ack(delivery_tag=msg.delivery_tag) + consuming_channel.tx_commit() - channel.tx_select() - consumer = self.consume("q", no_ack=False) - - # This is default maximum frame size supported by the Java Broker. Python - # currently does not support framing of oversized messages in multiple frames. - body = self.randomLongString() - channel.basic_publish( - content=Content(body), - routing_key="q") - channel.tx_commit() - - msg = consumer.get(timeout=1) - channel.basic_ack(delivery_tag=msg.delivery_tag) - channel.tx_commit() self.assertEqual(len(body), len(msg.content.body)) self.assertEqual(body, msg.content.body) - - - def test_large_message_received_in_many_content_frames(self): - channel = self.channel - - queue_name = "q" - self.queue_declare(queue=queue_name) - - channel.tx_select() - - body = self.randomLongString() - channel.basic_publish( - content=Content(body), - routing_key=queue_name) - channel.tx_commit() - - consuming_client = None - try: - # Create a second connection with minimum framesize. The Broker will then be forced to chunk - # the content in order to send it to us. - consuming_client = qpid.client.Client(self.config.broker.host, self.config.broker.port) - tune_params = { "channel_max" : 256, "frame_max" : 4096 } - consuming_client.start(username = self.config.broker.user, password = self.config.broker.password, tune_params = tune_params) - - consuming_channel = consuming_client.channel(1) - consuming_channel.channel_open() - consuming_channel.tx_select() - - consumer_reply = consuming_channel.basic_consume(queue=queue_name, no_ack=False) - consumer = consuming_client.queue(consumer_reply.consumer_tag) - msg = consumer.get(timeout=1) - consuming_channel.basic_ack(delivery_tag=msg.delivery_tag) - consuming_channel.tx_commit() - - self.assertEqual(len(body), len(msg.content.body)) - self.assertEqual(body, msg.content.body) - finally: - if consuming_client: - consuming_client.close() + finally: + if consuming_client: + consuming_client.close() def test_commit_ok_possibly_interleaved_with_message_delivery(self): - """This test exposes an defect on the Java Broker (QPID-6094). The Java Client + """This test exposes an defect on the Java Broker (QPID-6094). The Java Broker (0.32 and below) can contravene the AMQP spec by sending other frames between the message header/frames. As this is a long standing defect in the Java Broker, QPID-6082 changed the Python client to allow it to tolerate such illegal interleaving. @@ -138,7 +137,7 @@ consumer = self.consume("q", no_ack=False) # Get and ack/commit the first message - msg = consumer.get(timeout=1) + msg = consumer.get(timeout=self.recv_timeout()) channel.basic_ack(delivery_tag=msg.delivery_tag) channel.tx_commit() # In the problematic case, the Broker interleaves our commit-ok response amongst the content @@ -150,7 +149,7 @@ self.assertEqual(expectedBody, msg.content.body) for i in range(1, len(bodies)): - msg = consumer.get(timeout=5) + msg = consumer.get(timeout=self.recv_timeout()) expectedBody = bodies[i] self.assertEqual(len(expectedBody), len(msg.content.body)) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/qpid-python-1.35.0/qpid_tests/broker_0_9/queue.py new/qpid-python-1.37.0/qpid_tests/broker_0_9/queue.py --- old/qpid-python-1.35.0/qpid_tests/broker_0_9/queue.py 2016-08-17 15:11:51.000000000 +0200 +++ new/qpid-python-1.37.0/qpid_tests/broker_0_9/queue.py 2017-11-22 19:26:38.000000000 +0100 @@ -66,16 +66,16 @@ content=Content("two", properties={"headers": headers})) #check one queue has both messages and the other has only one - self.assertEquals("one", queue1.get(timeout=1).content.body) + self.assertEquals("one", queue1.get(timeout=self.recv_timeout()).content.body) try: - msg = queue1.get(timeout=1) + msg = queue1.get(timeout=self.recv_timeout_negative()) self.fail("Got extra message: %s" % msg.body) except Empty: pass - self.assertEquals("one", queue2.get(timeout=1).content.body) - self.assertEquals("two", queue2.get(timeout=1).content.body) + self.assertEquals("one", queue2.get(timeout=self.recv_timeout()).content.body) + self.assertEquals("two", queue2.get(timeout=self.recv_timeout()).content.body) try: - msg = queue2.get(timeout=1) + msg = queue2.get(timeout=self.recv_timeout_negative()) self.fail("Got extra message: " + msg) except Empty: pass @@ -134,7 +134,7 @@ queue = self.client.queue(consumer_reply.consumer_tag) while True: try: - msg = queue.get(timeout=1) + msg = queue.get(timeout=self.recv_timeout()) except Empty: break channel.basic_cancel(consumer_tag=consumer_reply.consumer_tag) diff -urN '--exclude=CVS' '--exclude=.cvsignore' '--exclude=.svn' '--exclude=.svnignore' old/qpid-python-1.35.0/setup.py new/qpid-python-1.37.0/setup.py --- old/qpid-python-1.35.0/setup.py 2016-08-17 15:11:51.000000000 +0200 +++ new/qpid-python-1.37.0/setup.py 2017-11-22 20:11:24.000000000 +0100 @@ -297,13 +297,10 @@ extra.append(tgt) return outfiles + extra -scripts = ["qpid-python-test"] +scripts = ["qpid-python-test", "qpid-python-test.bat"] -if platform.system() == "Windows": - scripts.append("qpid-python-test.bat") - setup(name="qpid-python", - version="1.35.0", + version="1.37.0", author="Apache Qpid", author_email="[email protected]", packages=["mllib", "qpid", "qpid.messaging", "qpid.tests",
