Author: rhs
Date: Thu May 8 13:52:28 2008
New Revision: 654618
URL: http://svn.apache.org/viewvc?rev=654618&view=rev
Log:
QPID-979: added access to enums through the session so that symbolic constants
can be used rather than hard-coded ones; also added default loading of the spec
Modified:
incubator/qpid/trunk/qpid/python/qpid/connection.py
incubator/qpid/trunk/qpid/python/qpid/invoker.py
incubator/qpid/trunk/qpid/python/qpid/session.py
incubator/qpid/trunk/qpid/python/qpid/spec.py
incubator/qpid/trunk/qpid/python/qpid/spec010.py
incubator/qpid/trunk/qpid/python/tests_0-10/alternate_exchange.py
incubator/qpid/trunk/qpid/python/tests_0-10/broker.py
incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py
incubator/qpid/trunk/qpid/python/tests_0-10/example.py
incubator/qpid/trunk/qpid/python/tests_0-10/exchange.py
incubator/qpid/trunk/qpid/python/tests_0-10/message.py
incubator/qpid/trunk/qpid/python/tests_0-10/persistence.py
incubator/qpid/trunk/qpid/python/tests_0-10/queue.py
incubator/qpid/trunk/qpid/python/tests_0-10/tx.py
Modified: incubator/qpid/trunk/qpid/python/qpid/connection.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/connection.py?rev=654618&r1=654617&r2=654618&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/connection.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/connection.py Thu May 8 13:52:28 2008
@@ -25,7 +25,8 @@
from codec010 import StringCodec
from session import Session
from invoker import Invoker
-from spec010 import Control, Command
+from spec010 import Control, Command, load
+from spec import default
from exceptions import *
from logging import getLogger
import delegates
@@ -44,8 +45,10 @@
class Connection(Assembler):
- def __init__(self, sock, spec, delegate=client):
+ def __init__(self, sock, spec=None, delegate=client):
Assembler.__init__(self, sock)
+ if spec == None:
+ spec = load(default())
self.spec = spec
self.track = self.spec["track"]
@@ -162,9 +165,9 @@
def resolve_method(self, name):
inst = self.connection.spec.instructions.get(name)
if inst is not None and isinstance(inst, Control):
- return inst
+ return self.METHOD, inst
else:
- return None
+ return self.ERROR, None
def invoke(self, type, args, kwargs):
ctl = type.new(args, kwargs)
Modified: incubator/qpid/trunk/qpid/python/qpid/invoker.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/invoker.py?rev=654618&r1=654617&r2=654618&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/invoker.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/invoker.py Thu May 8 13:52:28 2008
@@ -17,16 +17,26 @@
# under the License.
#
+# TODO: need a better naming for this class now that it does the value
+# stuff
class Invoker:
- def resolve_method(self, name):
- pass
-
- def __getattr__(self, name):
- resolved = self.resolve_method(name)
- if resolved == None:
- raise AttributeError("%s instance has no attribute '%s'" %
- (self.__class__.__name__, name))
+ def METHOD(self, name, resolved):
method = lambda *args, **kwargs: self.invoke(resolved, args, kwargs)
self.__dict__[name] = method
return method
+
+ def VALUE(self, name, resolved):
+ self.__dict__[name] = resolved
+ return resolved
+
+ def ERROR(self, name, resolved):
+ raise AttributeError("%s instance has no attribute '%s'" %
+ (self.__class__.__name__, name))
+
+ def resolve_method(self, name):
+ return ERROR, None
+
+ def __getattr__(self, name):
+ disp, resolved = self.resolve_method(name)
+ return disp(name, resolved)
Modified: incubator/qpid/trunk/qpid/python/qpid/session.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/session.py?rev=654618&r1=654617&r2=654618&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/session.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/session.py Thu May 8 13:52:28 2008
@@ -112,15 +112,17 @@
def resolve_method(self, name):
cmd = self.spec.instructions.get(name)
if cmd is not None and cmd.track == self.spec["track.command"].value:
- return cmd
+ return self.METHOD, cmd
else:
# XXX
for st in self.spec.structs.values():
if st.name == name:
- return st
- if self.spec.structs_by_name.has_key(name):
- return self.spec.structs_by_name[name]
- return None
+ return self.METHOD, st
+ if self.spec.structs_by_name.has_key(name):
+ return self.METHOD, self.spec.structs_by_name[name]
+ if self.spec.enums.has_key(name):
+ return self.VALUE, self.spec.enums[name]
+ return self.ERROR, None
def invoke(self, type, args, kwargs):
# XXX
Modified: incubator/qpid/trunk/qpid/python/qpid/spec.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/spec.py?rev=654618&r1=654617&r2=654618&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/spec.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/spec.py Thu May 8 13:52:28 2008
@@ -31,6 +31,19 @@
import os, mllib, spec08, spec010
+def default():
+ try:
+ specfile = os.environ["AMQP_SPEC"]
+ return specfile
+ except KeyError:
+ try:
+ from AMQP_SPEC import location as specfile
+ return specfile
+ except ImportError:
+ raise Exception("unable to locate the amqp specification, please set "
+ "the AMQP_SPEC environment variable or supply a "
+ "configured AMQP_SPEC.py")
+
def load(specfile, *errata):
for name in (specfile,) + errata:
if not os.path.exists(name):
Modified: incubator/qpid/trunk/qpid/python/qpid/spec010.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/qpid/spec010.py?rev=654618&r1=654617&r2=654618&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/qpid/spec010.py (original)
+++ incubator/qpid/trunk/qpid/python/qpid/spec010.py Thu May 8 13:52:28 2008
@@ -166,6 +166,15 @@
def decode(self, codec):
return self.type.decode(codec)
+class Enum:
+
+ def __init__(self, name):
+ self.name = name
+
+ def __repr__(self):
+ return "%s(%s)" % (self.name, ", ".join([k for k in self.__dict__.keys()
+ if k != "name"]))
+
class Choice(Named, Node):
def __init__(self, name, value, children):
@@ -177,6 +186,12 @@
Named.register(self, node)
node.choices[self.value] = self
Node.register(self)
+ try:
+ enum = node.spec.enums[node.name]
+ except KeyError:
+ enum = Enum(node.name)
+ node.spec.enums[node.name] = enum
+ setattr(enum, self.name, self.value)
class Composite(Type, Coded):
@@ -450,6 +465,7 @@
self.commands = {}
self.structs = {}
self.structs_by_name = {}
+ self.enums = {}
def encoding(self, klass):
if Spec.ENCODINGS.has_key(klass):
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/alternate_exchange.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/alternate_exchange.py?rev=654618&r1=654617&r2=654618&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/alternate_exchange.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/alternate_exchange.py Thu May 8 13:52:28 2008
@@ -41,16 +41,16 @@
session.queue_declare(queue="returns", exclusive=True,
auto_delete=True)
session.exchange_bind(queue="returns", exchange="secondary")
session.message_subscribe(destination="a", queue="returns")
- session.message_flow(destination="a", unit=0, value=0xFFFFFFFF)
- session.message_flow(destination="a", unit=1, value=0xFFFFFFFF)
+ session.message_flow(destination="a",
unit=session.credit_unit.message, value=0xFFFFFFFF)
+ session.message_flow(destination="a", unit=session.credit_unit.byte,
value=0xFFFFFFFF)
returned = session.incoming("a")
#declare, bind (to the primary exchange) and consume from a queue for
'processed' messages
session.queue_declare(queue="processed", exclusive=True,
auto_delete=True)
session.exchange_bind(queue="processed", exchange="primary",
binding_key="my-key")
session.message_subscribe(destination="b", queue="processed")
- session.message_flow(destination="b", unit=0, value=0xFFFFFFFF)
- session.message_flow(destination="b", unit=1, value=0xFFFFFFFF)
+ session.message_flow(destination="b",
unit=session.credit_unit.message, value=0xFFFFFFFF)
+ session.message_flow(destination="b", unit=session.credit_unit.byte,
value=0xFFFFFFFF)
processed = session.incoming("b")
#publish to the primary exchange
@@ -81,8 +81,8 @@
session.queue_declare(queue="deleted", exclusive=True,
auto_delete=True)
session.exchange_bind(exchange="dlq", queue="deleted")
session.message_subscribe(destination="dlq", queue="deleted")
- session.message_flow(destination="dlq", unit=0, value=0xFFFFFFFF)
- session.message_flow(destination="dlq", unit=1, value=0xFFFFFFFF)
+ session.message_flow(destination="dlq",
unit=session.credit_unit.message, value=0xFFFFFFFF)
+ session.message_flow(destination="dlq", unit=session.credit_unit.byte,
value=0xFFFFFFFF)
dlq = session.incoming("dlq")
#create a queue using the dlq as its alternate exchange:
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/broker.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/broker.py?rev=654618&r1=654617&r2=654618&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/broker.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/broker.py Thu May 8 13:52:28 2008
@@ -36,8 +36,8 @@
# No ack consumer
ctag = "tag1"
session.message_subscribe(queue = "myqueue", destination = ctag)
- session.message_flow(destination=ctag, unit=0, value=0xFFFFFFFF)
- session.message_flow(destination=ctag, unit=1, value=0xFFFFFFFF)
+ session.message_flow(destination=ctag,
unit=session.credit_unit.message, value=0xFFFFFFFF)
+ session.message_flow(destination=ctag, unit=session.credit_unit.byte,
value=0xFFFFFFFF)
body = "test no-ack"
session.message_transfer(message=Message(session.delivery_properties(routing_key="myqueue"),
body))
msg = session.incoming(ctag).get(timeout = 5)
@@ -47,8 +47,8 @@
session.queue_declare(queue = "otherqueue", exclusive=True,
auto_delete=True)
ctag = "tag2"
session.message_subscribe(queue = "otherqueue", destination = ctag,
accept_mode = 1)
- session.message_flow(destination=ctag, unit=0, value=0xFFFFFFFF)
- session.message_flow(destination=ctag, unit=1, value=0xFFFFFFFF)
+ session.message_flow(destination=ctag,
unit=session.credit_unit.message, value=0xFFFFFFFF)
+ session.message_flow(destination=ctag, unit=session.credit_unit.byte,
value=0xFFFFFFFF)
body = "test ack"
session.message_transfer(message=Message(session.delivery_properties(routing_key="otherqueue"),
body))
msg = session.incoming(ctag).get(timeout = 5)
@@ -64,8 +64,8 @@
session.exchange_bind(queue="test-queue", exchange="amq.fanout")
consumer_tag = "tag1"
session.message_subscribe(queue="test-queue", destination=consumer_tag)
- session.message_flow(unit = 0, value = 0xFFFFFFFF, destination =
consumer_tag)
- session.message_flow(unit = 1, value = 0xFFFFFFFF, destination =
consumer_tag)
+ session.message_flow(unit = session.credit_unit.message, value =
0xFFFFFFFF, destination = consumer_tag)
+ session.message_flow(unit = session.credit_unit.byte, value =
0xFFFFFFFF, destination = consumer_tag)
queue = session.incoming(consumer_tag)
body = "Immediate Delivery"
@@ -86,8 +86,8 @@
consumer_tag = "tag1"
session.message_subscribe(queue="test-queue", destination=consumer_tag)
- session.message_flow(unit = 0, value = 0xFFFFFFFF, destination =
consumer_tag)
- session.message_flow(unit = 1, value = 0xFFFFFFFF, destination =
consumer_tag)
+ session.message_flow(unit = session.credit_unit.message, value =
0xFFFFFFFF, destination = consumer_tag)
+ session.message_flow(unit = session.credit_unit.byte, value =
0xFFFFFFFF, destination = consumer_tag)
queue = session.incoming(consumer_tag)
msg = queue.get(timeout=5)
self.assert_(msg.body == body)
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py?rev=654618&r1=654617&r2=654618&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/dtx.py Thu May 8 13:52:28 2008
@@ -493,8 +493,8 @@
session2.dtx_select()
session2.dtx_start(xid=tx)
session2.message_subscribe(queue="dummy", destination="dummy")
- session2.message_flow(destination="dummy", unit=0, value=1)
- session2.message_flow(destination="dummy", unit=1, value=0xFFFFFFFF)
+ session2.message_flow(destination="dummy",
unit=session2.credit_unit.message, value=1)
+ session2.message_flow(destination="dummy",
unit=session2.credit_unit.byte, value=0xFFFFFFFF)
msg = session2.incoming("dummy").get(timeout=1)
session2.message_accept(RangedSet(msg.id))
session2.message_cancel(destination="dummy")
@@ -629,8 +629,8 @@
def swap(self, session, src, dest):
#consume from src:
session.message_subscribe(destination="temp-swap", queue=src)
- session.message_flow(destination="temp-swap", unit=0, value=1)
- session.message_flow(destination="temp-swap", unit=1, value=0xFFFFFFFF)
+ session.message_flow(destination="temp-swap",
unit=session.credit_unit.message, value=1)
+ session.message_flow(destination="temp-swap",
unit=session.credit_unit.byte, value=0xFFFFFFFF)
msg = session.incoming("temp-swap").get(timeout=1)
session.message_cancel(destination="temp-swap")
session.message_accept(RangedSet(msg.id))
@@ -646,8 +646,8 @@
def assertMessageId(self, expected, queue):
self.session.message_subscribe(queue=queue, destination="results")
- self.session.message_flow(destination="results", unit=0, value=1)
- self.session.message_flow(destination="results", unit=1,
value=0xFFFFFFFF)
+ self.session.message_flow(destination="results",
unit=self.session.credit_unit.message, value=1)
+ self.session.message_flow(destination="results",
unit=self.session.credit_unit.byte, value=0xFFFFFFFF)
self.assertEqual(expected,
self.getMessageProperty(self.session.incoming("results").get(timeout=1),
'correlation_id'))
self.session.message_cancel(destination="results")
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/example.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/example.py?rev=654618&r1=654617&r2=654618&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/example.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/example.py Thu May 8 13:52:28 2008
@@ -69,8 +69,8 @@
# field that is filled if the reply includes content. In this case the
# interesting field is the consumer_tag.
session.message_subscribe(queue="test-queue",
destination="consumer_tag")
- session.message_flow(destination="consumer_tag", unit=0,
value=0xFFFFFFFF)
- session.message_flow(destination="consumer_tag", unit=1,
value=0xFFFFFFFF)
+ session.message_flow(destination="consumer_tag",
unit=session.credit_unit.message, value=0xFFFFFFFF)
+ session.message_flow(destination="consumer_tag",
unit=session.credit_unit.byte, value=0xFFFFFFFF)
# We can use the session.incoming(...) method to access the messages
# delivered for our consumer_tag.
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/exchange.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/exchange.py?rev=654618&r1=654617&r2=654618&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/exchange.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/exchange.py Thu May 8 13:52:28 2008
@@ -108,8 +108,8 @@
else: self.uniqueTag += 1
consumer_tag = "tag" + str(self.uniqueTag)
self.session.message_subscribe(queue=queueName,
destination=consumer_tag)
- self.session.message_flow(destination=consumer_tag, unit=0,
value=0xFFFFFFFF)
- self.session.message_flow(destination=consumer_tag, unit=1,
value=0xFFFFFFFF)
+ self.session.message_flow(destination=consumer_tag,
unit=self.session.credit_unit.message, value=0xFFFFFFFF)
+ self.session.message_flow(destination=consumer_tag,
unit=self.session.credit_unit.byte, value=0xFFFFFFFF)
return self.session.incoming(consumer_tag)
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/message.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/message.py?rev=654618&r1=654617&r2=654618&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/message.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/message.py Thu May 8 13:52:28 2008
@@ -229,8 +229,8 @@
session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue-4"),
"One"))
session.message_subscribe(destination="my-consumer",
queue="test-queue-4")
- session.message_flow(destination="my-consumer", unit=0,
value=0xFFFFFFFF)
- session.message_flow(destination="my-consumer", unit=1,
value=0xFFFFFFFF)
+ session.message_flow(destination="my-consumer",
unit=session.credit_unit.message, value=0xFFFFFFFF)
+ session.message_flow(destination="my-consumer",
unit=session.credit_unit.byte, value=0xFFFFFFFF)
#should flush here
@@ -258,8 +258,8 @@
session.queue_declare(queue="test-ack-queue", auto_delete=True)
session.message_subscribe(queue = "test-ack-queue", destination =
"consumer")
- session.message_flow(destination="consumer", unit=0, value=0xFFFFFFFF)
- session.message_flow(destination="consumer", unit=1, value=0xFFFFFFFF)
+ session.message_flow(destination="consumer",
unit=session.credit_unit.message, value=0xFFFFFFFF)
+ session.message_flow(destination="consumer",
unit=session.credit_unit.byte, value=0xFFFFFFFF)
queue = session.incoming("consumer")
delivery_properties =
session.delivery_properties(routing_key="test-ack-queue")
@@ -289,8 +289,8 @@
session.close(timeout=10)
session = self.session
- session.message_flow(destination="checker", unit=0, value=0xFFFFFFFF)
- session.message_flow(destination="checker", unit=1, value=0xFFFFFFFF)
+ session.message_flow(destination="checker",
unit=session.credit_unit.message, value=0xFFFFFFFF)
+ session.message_flow(destination="checker",
unit=session.credit_unit.byte, value=0xFFFFFFFF)
queue = session.incoming("checker")
msg3b = queue.get(timeout=1)
@@ -504,16 +504,16 @@
session.exchange_bind(queue = "r", exchange = "amq.fanout")
session.message_subscribe(queue = "q", destination = "consumer")
- session.message_flow(destination="consumer", unit=0, value=0xFFFFFFFF)
- session.message_flow(destination="consumer", unit=1, value=0xFFFFFFFF)
+ session.message_flow(destination="consumer",
unit=session.credit_unit.message, value=0xFFFFFFFF)
+ session.message_flow(destination="consumer",
unit=session.credit_unit.byte, value=0xFFFFFFFF)
session.message_transfer(message=Message(session.delivery_properties(routing_key="q"),
"blah, blah"))
msg = session.incoming("consumer").get(timeout = 1)
self.assertEquals(msg.body, "blah, blah")
session.message_reject(RangedSet(msg.id))
session.message_subscribe(queue = "r", destination = "checker")
- session.message_flow(destination="checker", unit=0, value=0xFFFFFFFF)
- session.message_flow(destination="checker", unit=1, value=0xFFFFFFFF)
+ session.message_flow(destination="checker",
unit=session.credit_unit.message, value=0xFFFFFFFF)
+ session.message_flow(destination="checker",
unit=session.credit_unit.byte, value=0xFFFFFFFF)
msg = session.incoming("checker").get(timeout = 1)
self.assertEquals(msg.body, "blah, blah")
@@ -532,9 +532,9 @@
session.message_transfer(message=Message(session.delivery_properties(routing_key="q"),
"Message %d" % i))
#set message credit to finite amount (less than enough for all
messages)
- session.message_flow(unit = 0, value = 5, destination = "c")
+ session.message_flow(unit = session.credit_unit.message, value = 5,
destination = "c")
#set infinite byte credit
- session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c")
+ session.message_flow(unit = session.credit_unit.byte, value =
0xFFFFFFFF, destination = "c")
#check that expected number were received
q = session.incoming("c")
for i in range(1, 6):
@@ -543,7 +543,7 @@
#increase credit again and check more are received
for i in range(6, 11):
- session.message_flow(unit = 0, value = 1, destination = "c")
+ session.message_flow(unit = session.credit_unit.message, value =
1, destination = "c")
self.assertDataEquals(session, q.get(timeout = 1), "Message %d" %
i)
self.assertEmpty(q)
@@ -565,9 +565,9 @@
msg_size = 21
#set byte credit to finite amount (less than enough for all messages)
- session.message_flow(unit = 1, value = msg_size*5, destination = "c")
+ session.message_flow(unit = session.credit_unit.byte, value =
msg_size*5, destination = "c")
#set infinite message credit
- session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c")
+ session.message_flow(unit = session.credit_unit.message, value =
0xFFFFFFFF, destination = "c")
#check that expected number were received
q = session.incoming("c")
for i in range(5):
@@ -576,7 +576,7 @@
#increase credit again and check more are received
for i in range(5):
- session.message_flow(unit = 1, value = msg_size, destination = "c")
+ session.message_flow(unit = session.credit_unit.byte, value =
msg_size, destination = "c")
self.assertDataEquals(session, q.get(timeout = 1), "abcdefgh")
self.assertEmpty(q)
@@ -596,9 +596,9 @@
session.message_transfer(message=Message(session.delivery_properties(routing_key="q"),
"Message %d" % i))
#set message credit to finite amount (less than enough for all
messages)
- session.message_flow(unit = 0, value = 5, destination = "c")
+ session.message_flow(unit = session.credit_unit.message, value = 5,
destination = "c")
#set infinite byte credit
- session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c")
+ session.message_flow(unit = session.credit_unit.byte, value =
0xFFFFFFFF, destination = "c")
#check that expected number were received
q = session.incoming("c")
for i in range(1, 6):
@@ -634,9 +634,9 @@
msg_size = 19
#set byte credit to finite amount (less than enough for all messages)
- session.message_flow(unit = 1, value = msg_size*5, destination = "c")
+ session.message_flow(unit = session.credit_unit.byte, value =
msg_size*5, destination = "c")
#set infinite message credit
- session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "c")
+ session.message_flow(unit = session.credit_unit.message, value =
0xFFFFFFFF, destination = "c")
#check that expected number were received
q = session.incoming("c")
msgs = []
@@ -665,11 +665,11 @@
session.message_transfer(message=Message(session.delivery_properties(routing_key="q"),
"Message %s" % i))
session.message_subscribe(queue = "q", destination = "a", acquire_mode
= 1)
- session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "a")
- session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+ session.message_flow(unit = session.credit_unit.message, value =
0xFFFFFFFF, destination = "a")
+ session.message_flow(unit = session.credit_unit.byte, value =
0xFFFFFFFF, destination = "a")
session.message_subscribe(queue = "q", destination = "b", acquire_mode
= 1)
- session.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "b")
- session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
+ session.message_flow(unit = session.credit_unit.message, value =
0xFFFFFFFF, destination = "b")
+ session.message_flow(unit = session.credit_unit.byte, value =
0xFFFFFFFF, destination = "b")
for i in range(6, 11):
session.message_transfer(message=Message(session.delivery_properties(routing_key="q"),
"Message %s" % i))
@@ -700,8 +700,8 @@
session.message_transfer(message=Message(session.delivery_properties(routing_key="q"),
"acquire me"))
session.message_subscribe(queue = "q", destination = "a", acquire_mode
= 1)
- session.message_flow(destination="a", unit=0, value=0xFFFFFFFF)
- session.message_flow(destination="a", unit=1, value=0xFFFFFFFF)
+ session.message_flow(destination="a",
unit=session.credit_unit.message, value=0xFFFFFFFF)
+ session.message_flow(destination="a", unit=session.credit_unit.byte,
value=0xFFFFFFFF)
msg = session.incoming("a").get(timeout = 1)
self.assertEquals("acquire me", msg.body)
#message should still be on the queue:
@@ -726,8 +726,8 @@
session.message_transfer(message=Message(session.delivery_properties(routing_key="q"),
"release me"))
session.message_subscribe(queue = "q", destination = "a")
- session.message_flow(destination="a", unit=0, value=0xFFFFFFFF)
- session.message_flow(destination="a", unit=1, value=0xFFFFFFFF)
+ session.message_flow(destination="a",
unit=session.credit_unit.message, value=0xFFFFFFFF)
+ session.message_flow(destination="a", unit=session.credit_unit.byte,
value=0xFFFFFFFF)
msg = session.incoming("a").get(timeout = 1)
self.assertEquals("release me", msg.body)
session.message_cancel(destination = "a")
@@ -746,8 +746,8 @@
session.message_transfer(message=Message(session.delivery_properties(routing_key="q"),
"released message %s" % (i)))
session.message_subscribe(queue = "q", destination = "a")
- session.message_flow(unit = 0, value = 10, destination = "a")
- session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+ session.message_flow(unit = session.credit_unit.message, value = 10,
destination = "a")
+ session.message_flow(unit = session.credit_unit.byte, value =
0xFFFFFFFF, destination = "a")
queue = session.incoming("a")
first = queue.get(timeout = 1)
for i in range(2, 10):
@@ -779,8 +779,8 @@
session.message_transfer(message=Message(delivery_properties,
"message %s" % (i)))
session.message_subscribe(queue = "q", destination = "a")
- session.message_flow(unit = 0, value = 10, destination = "a")
- session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+ session.message_flow(unit = session.credit_unit.message, value = 10,
destination = "a")
+ session.message_flow(unit = session.credit_unit.byte, value =
0xFFFFFFFF, destination = "a")
queue = session.incoming("a")
ids = []
for i in range (1, 11):
@@ -805,8 +805,8 @@
session.close(timeout=10)
session = self.session
- session.message_flow(destination="checker", unit=0, value=0xFFFFFFFF)
- session.message_flow(destination="checker", unit=1, value=0xFFFFFFFF)
+ session.message_flow(destination="checker",
unit=session.credit_unit.message, value=0xFFFFFFFF)
+ session.message_flow(destination="checker",
unit=session.credit_unit.byte, value=0xFFFFFFFF)
queue = session.incoming("checker")
self.assertEquals("message 4", queue.get(timeout = 1).body)
@@ -823,8 +823,8 @@
#consume some of them
session.message_subscribe(queue = "q", destination = "a")
session.message_set_flow_mode(flow_mode = 0, destination = "a")
- session.message_flow(unit = 0, value = 5, destination = "a")
- session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
+ session.message_flow(unit = session.credit_unit.message, value = 5,
destination = "a")
+ session.message_flow(unit = session.credit_unit.byte, value =
0xFFFFFFFF, destination = "a")
queue = session.incoming("a")
for i in range(1, 6):
@@ -839,11 +839,11 @@
#now create a not-acquired subscriber
session.message_subscribe(queue = "q", destination = "b",
acquire_mode=1)
- session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
+ session.message_flow(unit = session.credit_unit.byte, value =
0xFFFFFFFF, destination = "b")
#check it gets those not consumed
queue = session.incoming("b")
- session.message_flow(unit = 0, value = 1, destination = "b")
+ session.message_flow(unit = session.credit_unit.message, value = 1,
destination = "b")
for i in range(6, 11):
msg = queue.get(timeout = 1)
self.assertEquals("message-%d" % (i), msg.body)
@@ -851,7 +851,7 @@
#TODO: tidy up completion
session.receiver._completed.add(msg.id)
session.channel.session_completed(session.receiver._completed)
- session.message_flow(unit = 0, value = 1, destination = "b")
+ session.message_flow(unit = session.credit_unit.message, value = 1,
destination = "b")
self.assertEmpty(queue)
#check all 'browsed' messages are still on the queue
@@ -867,8 +867,8 @@
#create a not-acquired subscriber
session.message_subscribe(queue = "q", destination = "a",
acquire_mode=1)
- session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
- session.message_flow(unit = 0, value = 10, destination = "a")
+ session.message_flow(unit = session.credit_unit.byte, value =
0xFFFFFFFF, destination = "a")
+ session.message_flow(unit = session.credit_unit.message, value = 10,
destination = "a")
#browse through messages
queue = session.incoming("a")
@@ -889,8 +889,8 @@
#create a second not-acquired subscriber
session.message_subscribe(queue = "q", destination = "b",
acquire_mode=1)
- session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
- session.message_flow(unit = 0, value = 1, destination = "b")
+ session.message_flow(unit = session.credit_unit.byte, value =
0xFFFFFFFF, destination = "b")
+ session.message_flow(unit = session.credit_unit.message, value = 1,
destination = "b")
#check it gets those not consumed
queue = session.incoming("b")
for i in [2,4,6,8,10]:
@@ -899,7 +899,7 @@
session.message_release(RangedSet(msg.id))
session.receiver._completed.add(msg.id)
session.channel.session_completed(session.receiver._completed)
- session.message_flow(unit = 0, value = 1, destination = "b")
+ session.message_flow(unit = session.credit_unit.message, value = 1,
destination = "b")
self.assertEmpty(queue)
#check all 'browsed' messages are still on the queue
@@ -916,13 +916,13 @@
#create two 'browsers'
session.message_subscribe(queue = "q", destination = "a",
acquire_mode=1)
- session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
- session.message_flow(unit = 0, value = 10, destination = "a")
+ session.message_flow(unit = session.credit_unit.byte, value =
0xFFFFFFFF, destination = "a")
+ session.message_flow(unit = session.credit_unit.message, value = 10,
destination = "a")
queueA = session.incoming("a")
session.message_subscribe(queue = "q", destination = "b",
acquire_mode=1)
- session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "b")
- session.message_flow(unit = 0, value = 10, destination = "b")
+ session.message_flow(unit = session.credit_unit.byte, value =
0xFFFFFFFF, destination = "b")
+ session.message_flow(unit = session.credit_unit.message, value = 10,
destination = "b")
queueB = session.incoming("b")
#have each browser release the message
@@ -938,8 +938,8 @@
#create consumer
session.message_subscribe(queue = "q", destination = "c")
- session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "c")
- session.message_flow(unit = 0, value = 10, destination = "c")
+ session.message_flow(unit = session.credit_unit.byte, value =
0xFFFFFFFF, destination = "c")
+ session.message_flow(unit = session.credit_unit.message, value = 10,
destination = "c")
queueC = session.incoming("c")
#consume the message then ack it
msgC = queueC.get(timeout = 1)
@@ -950,12 +950,12 @@
def test_no_size(self):
self.queue_declare(queue = "q", exclusive=True, auto_delete=True)
- ch = self.session
- ch.message_transfer(content=SizelessContent(properties={'routing_key'
: "q"}, body="message-body"))
+ ssn = self.session
+ ssn.message_transfer(content=SizelessContent(properties={'routing_key'
: "q"}, body="message-body"))
- ch.message_subscribe(queue = "q", destination="d", confirm_mode = 0)
- ch.message_flow(unit = 0, value = 0xFFFFFFFF, destination = "d")
- ch.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "d")
+ ssn.message_subscribe(queue = "q", destination="d", confirm_mode = 0)
+ ssn.message_flow(unit = ssn.credit_unit.message, value = 0xFFFFFFFF,
destination = "d")
+ ssn.message_flow(unit = ssn.credit_unit.byte, value = 0xFFFFFFFF,
destination = "d")
queue = session.incoming("d")
msg = queue.get(timeout = 3)
@@ -969,8 +969,8 @@
consumer_tag = "tag1"
session.message_subscribe(queue="xyz", destination=consumer_tag)
- session.message_flow(unit = 0, value = 0xFFFFFFFF, destination =
consumer_tag)
- session.message_flow(unit = 1, value = 0xFFFFFFFF, destination =
consumer_tag)
+ session.message_flow(unit = session.credit_unit.message, value =
0xFFFFFFFF, destination = consumer_tag)
+ session.message_flow(unit = session.credit_unit.byte, value =
0xFFFFFFFF, destination = consumer_tag)
queue = session.incoming(consumer_tag)
msg = queue.get(timeout=1)
self.assertEquals("", msg.body)
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/persistence.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/persistence.py?rev=654618&r1=654617&r2=654618&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/persistence.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/persistence.py Thu May 8
13:52:28 2008
@@ -49,8 +49,8 @@
#create consumer
session.message_subscribe(queue = "q", destination = "a", accept_mode
= 1, acquire_mode=0)
- session.message_flow(unit = 1, value = 0xFFFFFFFF, destination = "a")
- session.message_flow(unit = 0, value = 10, destination = "a")
+ session.message_flow(unit = session.credit_unit.byte, value =
0xFFFFFFFF, destination = "a")
+ session.message_flow(unit = session.credit_unit.message, value = 10,
destination = "a")
queue = session.incoming("a")
#consume the message, cancel subscription (triggering auto-delete),
then ack it
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/queue.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/queue.py?rev=654618&r1=654617&r2=654618&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/queue.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/queue.py Thu May 8 13:52:28
2008
@@ -49,8 +49,8 @@
#send a further message and consume it, ensuring that the other
messages are really gone
session.message_transfer(message=Message(session.delivery_properties(routing_key="test-queue"),
"four"))
session.message_subscribe(queue="test-queue", destination="tag")
- session.message_flow(destination="tag", unit=0, value=0xFFFFFFFF)
- session.message_flow(destination="tag", unit=1, value=0xFFFFFFFF)
+ session.message_flow(destination="tag",
unit=session.credit_unit.message, value=0xFFFFFFFF)
+ session.message_flow(destination="tag", unit=session.credit_unit.byte,
value=0xFFFFFFFF)
queue = session.incoming("tag")
msg = queue.get(timeout=1)
self.assertEqual("four", msg.body)
@@ -166,11 +166,11 @@
session.queue_declare(queue="queue-2", exclusive=True,
auto_delete=True)
session.message_subscribe(queue="queue-1", destination="queue-1")
- session.message_flow(destination="queue-1", unit=0, value=0xFFFFFFFF)
- session.message_flow(destination="queue-1", unit=1, value=0xFFFFFFFF)
+ session.message_flow(destination="queue-1",
unit=session.credit_unit.message, value=0xFFFFFFFF)
+ session.message_flow(destination="queue-1",
unit=session.credit_unit.byte, value=0xFFFFFFFF)
session.message_subscribe(queue="queue-2", destination="queue-2")
- session.message_flow(destination="queue-2", unit=0, value=0xFFFFFFFF)
- session.message_flow(destination="queue-2", unit=1, value=0xFFFFFFFF)
+ session.message_flow(destination="queue-2",
unit=session.credit_unit.message, value=0xFFFFFFFF)
+ session.message_flow(destination="queue-2",
unit=session.credit_unit.byte, value=0xFFFFFFFF)
queue1 = session.incoming("queue-1")
queue2 = session.incoming("queue-2")
@@ -267,8 +267,8 @@
#empty queue:
session.message_subscribe(destination="consumer_tag",
queue="delete-me-2")
- session.message_flow(destination="consumer_tag", unit=0,
value=0xFFFFFFFF)
- session.message_flow(destination="consumer_tag", unit=1,
value=0xFFFFFFFF)
+ session.message_flow(destination="consumer_tag",
unit=session.credit_unit.message, value=0xFFFFFFFF)
+ session.message_flow(destination="consumer_tag",
unit=session.credit_unit.byte, value=0xFFFFFFFF)
queue = session.incoming("consumer_tag")
msg = queue.get(timeout=1)
self.assertEqual("message", msg.body)
Modified: incubator/qpid/trunk/qpid/python/tests_0-10/tx.py
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-10/tx.py?rev=654618&r1=654617&r2=654618&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-10/tx.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-10/tx.py Thu May 8 13:52:28 2008
@@ -251,13 +251,13 @@
session = session or self.session
consumer_tag = keys["destination"]
session.message_subscribe(**keys)
- session.message_flow(destination=consumer_tag, unit=0,
value=0xFFFFFFFF)
- session.message_flow(destination=consumer_tag, unit=1,
value=0xFFFFFFFF)
+ session.message_flow(destination=consumer_tag,
unit=session.credit_unit.message, value=0xFFFFFFFF)
+ session.message_flow(destination=consumer_tag,
unit=session.credit_unit.byte, value=0xFFFFFFFF)
def enable_flow(self, tag, session=None):
session = session or self.session
- session.message_flow(destination=tag, unit=0, value=0xFFFFFFFF)
- session.message_flow(destination=tag, unit=1, value=0xFFFFFFFF)
+ session.message_flow(destination=tag,
unit=session.credit_unit.message, value=0xFFFFFFFF)
+ session.message_flow(destination=tag, unit=session.credit_unit.byte,
value=0xFFFFFFFF)
def complete(self, session, msg):
session.receiver._completed.add(msg.id)#TODO: this may be done
automatically