Hi,
I'm trying to create an async client that will read messages from two
broker queues.
This client needs to be resilient enough to survive broker failover
tests (I usually stop the broker for between 30 seconds and 2 minutes at most).
Most of the time it works fine; however, sometimes there are framing
errors [1] from which the client is not able to recover [2].
All my attempts to fix it have failed, and I've been looking through the
Proton examples without success.
Could someone help me or point me in the right direction?
Code attached (84 lines).
Best regards,
David Santiago
[1] on_transport_error:708] ERROR: amqp:connection:framing-error:
connection aborted.
[2] The error indicated in [1] keeps appearing after the failover.
import threading
from proton.handlers import MessagingHandler
from proton.reactor import Container, EventInjector, ApplicationEvent, Backoff
from proton import Transport
class AsyncConsumer(MessagingHandler):
    """Consume messages from several queues on a background thread.

    The handler runs a proton ``Container`` in a daemon thread. Each
    received message can optionally be decoded to a string, stored in
    ``saved_messages`` and/or forwarded to a user callback.

    Args:
        server: Address passed to ``Container.connect``.
        queues: Iterable of source addresses; one receiver is created
            for each of them.
        function: Optional callable invoked as ``function(queue, message)``
            for every received message.
        **options: Behaviour flags read by ``on_message``:
            ``decode`` - if truthy, pass the message through
            ``_extract_message_string`` first;
            ``save`` - if truthy, append the message to ``saved_messages``.
    """

    def __init__(self, server, queues, function=None, **options):
        super().__init__()
        self.server = server
        # Used by stop_listening() to hand a 'shutdown' event to the
        # container from another thread.
        self.injector = EventInjector()
        self.queues = queues
        self.saved_messages = []
        self.function = function
        self.options = options
        self.receivers = []
        # Set by start_listening(); None means "not started yet".
        self.thread = None

    def start_listening(self):
        """Start the container in a daemon thread and return immediately."""
        self.reactor = Container(self)
        self.thread = threading.Thread(target=self.reactor.run)
        # Daemon thread: it must not keep the interpreter alive on exit.
        self.thread.daemon = True
        self.thread.start()

    def stop_listening(self):
        """Request shutdown, wait for the container thread, return messages.

        Returns:
            The list of saved messages (dicts with ``'queue'`` and ``'body'``
            keys). If ``start_listening`` was never called there is no thread
            to stop, so the (empty) list is returned straight away.
        """
        if self.thread is None:
            return self.saved_messages
        self.injector.trigger(ApplicationEvent('shutdown'))
        self.injector.close()
        self.thread.join()
        return self.saved_messages

    def on_start(self, event):
        """Register the injector, connect, and open one receiver per queue."""
        self.container = event.container
        self.container.selectable(self.injector)
        # NOTE(review): confirm that the installed proton version's Backoff
        # accepts exactly these keyword names; an unrecognised keyword may be
        # ignored silently. Pass reconnect=False to disable reconnection.
        self.conn = self.container.connect(
            self.server,
            reconnect=Backoff(initial=30, max_delay=180, factor=1),
        )
        for queue in self.queues:
            self.receivers.append(
                self.container.create_receiver(self.conn, queue))

    def on_message(self, event):
        """Decode, save and/or forward a received message per ``options``."""
        queue = event.receiver.source.address
        message = event.message
        if self.options.get('decode'):
            message = self._extract_message_string(message)
        if self.options.get('save'):
            self.saved_messages.append({'queue': queue, 'body': message})
        if self.function:
            self.function(queue, message)

    def on_shutdown(self, event):
        """Close receivers and connection, then stop the container.

        Counterpart of the 'shutdown' application event triggered by
        ``stop_listening``.
        """
        for recv in self.receivers:
            recv.close()
        self.receivers = []
        self.conn.close()
        self.container.stop()
        # Drops the container's references to its handlers (private
        # attributes of the container).
        self.container._global_handler = None
        self.container._handler = None

    def on_transport_error(self, event):
        """Delegate to the base handler, which performs the reconnect."""
        super().on_transport_error(event)

    def _extract_message_string(self, message) -> str:
        """Return the body of ``message`` as a single value.

        Handles three body types: a one-element list (its only element is
        returned as-is), ``str`` (returned unchanged) and ``bytes`` (decoded
        as UTF-8).

        Documentation:
        http://qpid.apache.org/releases/qpid-proton-0.30.0/proton/python/docs/_modules/proton/_message.html#Message

        Raises:
            ValueError: If the body is a list whose length is not 1, or is
                of any other unsupported type.
        """
        body = message.body
        if isinstance(body, list):
            # This is just playing safe
            if len(body) == 1:
                return body[0]
            raise ValueError('The message contains more than one messages. Please debug!')
        if isinstance(body, str):
            return body
        if isinstance(body, bytes):
            return body.decode('utf-8')

        # The original referenced an undefined name ``log`` here, which
        # raised NameError before the intended ValueError. Imported locally
        # so the block does not depend on a module-level logger.
        import logging
        logger = logging.getLogger(__name__)
        logger.error(type(body))
        logger.error('Unable to process the message. Please debug the message:')
        logger.error(message)
        raise ValueError('Unable to process the message. Please debug the message!')
---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org
For additional commands, e-mail: users-h...@qpid.apache.org