I have been seeing some strange behavior with the 0.20 C++ broker (I see the
same behavior in the 0.14 broker as well) when sending / receiving messages
with the Python API. It seems that the response time increases for each
consecutive message.
The client sends a message with a reply-to field and waits for a response.
The service receives all messages and immediately replies to the reply-to
address.
I have attached my test code to the bottom of this email. Please let me know
if I am not cleaning something up properly or misusing a class.
Here is the result of running the client code 10 times in a row without cycling
the service.
Sent 100 messages in 5.32662820816 seconds at 18.7736023789 messages / second
Sent 100 messages in 6.41928505898 seconds at 15.578058784 messages / second
Sent 100 messages in 6.92662286758 seconds at 14.4370499032 messages / second
Sent 100 messages in 7.94101595879 seconds at 12.5928471267 messages / second
Sent 100 messages in 8.71410989761 seconds at 11.475641365 messages / second
Sent 100 messages in 9.71762895584 seconds at 10.290576071 messages / second
Sent 100 messages in 10.6746881008 seconds at 9.36795520914 messages / second
Sent 100 messages in 11.2466139793 seconds at 8.89156506871 messages / second
Sent 100 messages in 12.0393908024 seconds at 8.30606810938 messages / second
Sent 100 messages in 12.906840086 seconds at 7.74782978125 messages / second
Cycled the service.
And another result of running the same client code 10 more times.
Sent 100 messages in 5.36381697655 seconds at 18.643440005 messages / second
Sent 100 messages in 6.09826803207 seconds at 16.3980985214 messages / second
Sent 100 messages in 7.14273500443 seconds at 14.0002393954 messages / second
Sent 100 messages in 8.16057085991 seconds at 12.2540446884 messages / second
Sent 100 messages in 8.86364603043 seconds at 11.2820389777 messages / second
Sent 100 messages in 9.45885896683 seconds at 10.5720996952 messages / second
Sent 100 messages in 10.3048520088 seconds at 9.70416653382 messages / second
Sent 100 messages in 11.9087610245 seconds at 8.39717916872 messages / second
Sent 100 messages in 12.8042399883 seconds at 7.80991297345 messages / second
Sent 100 messages in 12.7495720387 seconds at 7.84340052332 messages / second
Thanks,
Wes
=== pythonQpidService.py ===
bash-4.1$ cat pythonQpidService.py
#!/usr/bin/env python
from qpid.messaging import Connection
from qpid.messaging import Message
from threading import Thread
class PythonQpidService(Thread):
BROKER = "localhost:5672"
def __init__(self, broker=BROKER):
Thread.__init__(self)
self._running = True
self._connection = None
self._session = None
self._broker = broker
self._receiver = None
self.initQpidReceiver()
self._totalMessagesReceived = 0
def stop(self):
self._running = False
try:
self._receiver.close()
self._session.close()
self._connection.close()
except Exception, ex:
pass
def initQpidReceiver(self):
try:
self._connection = Connection(self._broker)
self._connection.open()
self._session = self._connection.session()
self._receiver = self._session.receiver("test.address.receive")
self._receiver.capacity = 100
self._receiver.name = "TestReceiver"
except Exception, ex:
print "Caught exception in initQpidReceiver: %s" % (ex)
def getNextMessage(self):
message = self._receiver.fetch()
self._session.acknowledge()
return (self._receiver, message)
def sendMessage(self, address, message):
try:
sender = self._session.sender(address)
sender.send(message)
self._session.acknowledge()
except Exception, ex:
print "Caught exception in sendMessage: %s" % (ex)
def run(self):
while self._running:
(receiver, message) = self.getNextMessage()
print "[%s] Received message: %s" % (self._totalMessagesReceived,
message)
self._totalMessagesReceived += 1
if message.reply_to:
responseText = {"text": "[%s] Reply-to response" %
(self._totalMessagesReceived)}
self.sendMessage(message.reply_to, responseText)
if __name__ == '__main__':
pqs = PythonQpidService()
pqs.start()
retVal = raw_input("Press enter to quit...")
pqs.stop()
pqs.join()
=== pythonQpidClient.py ===
bash-4.1$ cat pythonQpidClient.py
#!/usr/bin/env python
from qpid.messaging import Connection
from qpid.messaging import Message
from multiprocessing import Pool
connection = Connection("localhost:5672")
connection.open()
def sendSynchronousCommand(address, message):
session = connection.session()
receiver = session.receiver('#test.response.q; {create:receiver,
delete:receiver, node:{durable:False}}')
message.reply_to = receiver.source
sender = session.sender(address)
sender.send(message)
responseMsg = receiver.fetch()
session.acknowledge()
receiver.close()
return responseMsg
testMessage = Message({"testKey":"TestValue"})
def sendCommandFunction((blastNum,)):
response = sendSynchronousCommand("test.address.receive", testMessage)
print "[%s] response = %s" % (blastNum, response)
if __name__ == "__main__":
NUM_MESSAGES = 100
import time
startTime = time.time()
for blastNum in xrange(0,NUM_MESSAGES):
sendCommandFunction((blastNum,))
endTime = time.time()
timeDiff = endTime - startTime
print "Sent %s messages in %s seconds at %s messages / second" %
(NUM_MESSAGES,timeDiff,NUM_MESSAGES / timeDiff)
print "Exiting gracefully..."