Hello everyone!
I'm currently stuck with a problem while using flow control on the Java broker
and a python client.
Info about the setup:
Ubuntu Server 14.04 LTS
Java Broker and Python client library are the most current stable version
0..28, the broker is running with a default configuration using a derbystore
persistence layer. The queue in question is both durable and permanent. The
sent messages are _not_ marked as durable.
We are running into severe problems because the QPID server crashes under heavy
load when some of the consumers fail to drain their respective queues. This
fills up the available heap space and eventually leads to an OutOfMemory
Exception which crashes the Java Broker. I suppose that messages which are not
explicitly marked as durable will never be written to the permanent storage,
even though the heap space is full and there is no explicit consumer
throttling, so this would be the expected result. Is this assumption correct?
To avoid this condition I've set up a queue with a Flow Control Threshold of
100MB and a Flow Resume Threshold of 80MB on the Broker (Low values for
testing). While filling up the queue with some random messages, as soon as it
hits the size limit, the python client crashes with an exception that doesn't
seem to be the intended reaction in this case:
Traceback (most recent call last):
File "qpid_test_flow.py", line 104, in <module>
sys.exit(main())
File "qpid_test_flow.py", line 86, in main
produce_messages()
File "qpid_test_flow.py", line 59, in produce_messages
qpid_sender.send(msg)
File "<string>", line 6, in send
File "/usr/local/lib/python2.7/dist-packages/qpid/messaging/endpoints.py",
line 889, in send
self.sync(timeout=timeout)
File "<string>", line 6, in sync
File "/usr/local/lib/python2.7/dist-packages/qpid/messaging/endpoints.py",
line 901, in sync
if not self._ewait(lambda: self.acked >= mno, timeout=timeout):
File "/usr/local/lib/python2.7/dist-packages/qpid/messaging/endpoints.py",
line 814, in _ewait
result = self.session._ewait(lambda: self.error or predicate(), timeout)
File "/usr/local/lib/python2.7/dist-packages/qpid/messaging/endpoints.py",
line 580, in _ewait
result = self.connection._ewait(lambda: self.error or predicate(), timeout)
File "/usr/local/lib/python2.7/dist-packages/qpid/messaging/endpoints.py",
line 219, in _ewait
self.check_error()
File "/usr/local/lib/python2.7/dist-packages/qpid/messaging/endpoints.py",
line 212, in check_error
raise e
qpid.messaging.exceptions.InternalError: Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/qpid/messaging/driver.py", line
669, in write
op.dispatch(self)
File "/usr/local/lib/python2.7/dist-packages/qpid/ops.py", line 84, in
dispatch
getattr(target, handler)(self, *args)
AttributeError: Engine instance has no attribute 'do_message_set_flow_mode'
The official documentation is rather quiet about client-side handling of
server-side enforced Flow Control Limits. In the source code, there is a
TargetCapacityExceeded Exception, which is a subclass of SenderError, so this
would be the expected Exception when the capacity limit is reached.
As a temporary workaround, If I catch the InternalError, set the 'error'
Attribute of the qpid connection to None and wait for a short time, I can loop
over the sender until the queue reaches the Flow Resume Threshold. But I'm
afraid that this is not the intended way of handling this, as the Traceback
suggests that the Python client is unable to handle some unknown server request.
Can somebody help me out here?
Kind regards,
Alex.
____________
Virus checked by G Data MailSecurity
Version: AVA 24.2950 dated 01.07.2014
Virus news: www.antiviruslab.com
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]