Hi,

I'm trying to create an async client that will read messages from two broker queues.

This client needs to be resilient enough to survive broker failover tests (I usually stop the broker for between 30 seconds and 2 minutes at most).

Most of the time it works fine; however, sometimes a framing error [1] occurs from which the client is not able to recover [2].

All my attempts to fix it have failed, and I've been looking through the Proton examples without success.

Could someone help me or point me in the right direction?


Code attached (84 lines).


Best regards,
David Santiago


[1] on_transport_error:708] ERROR: amqp:connection:framing-error: connection aborted.
[2] The error indicated in [1] keeps appearing after the failover.
import logging
import threading

from proton import Transport
from proton.handlers import MessagingHandler
from proton.reactor import Container, EventInjector, ApplicationEvent, Backoff


class AsyncConsumer(MessagingHandler):
    """Receive messages from several queues on a background thread.

    The handler runs a Proton ``Container`` in a daemon thread, opens one
    connection to ``server`` and creates one receiver per entry in
    ``queues``. Each incoming message can optionally be decoded to a
    string, stored in ``saved_messages`` and/or passed to a callback.

    Args:
        server: Address handed to ``Container.connect``.
        queues: Iterable of source addresses; one receiver is created for
            each of them.
        function: Optional callback invoked as ``function(queue, message)``
            for every received message.
        **options: Behaviour switches read in ``on_message``:
            ``decode`` (truthy -> convert the message to a string via
            ``_extract_message_string``) and ``save`` (truthy -> append
            the message to ``saved_messages``).
    """

    # Used to report message bodies that cannot be converted to a string.
    _log = logging.getLogger(__name__)

    def __init__(self, server, queues, function=None, **options):
        super(AsyncConsumer, self).__init__()
        self.server = server
        self.injector = EventInjector()
        self.queues = queues
        self.saved_messages = []
        self.function = function
        self.options = options
        self.receivers = []
        # Populated by start_listening(); None means "not started yet".
        self.reactor = None
        self.thread = None

    def start_listening(self):
        """Create the container and run it in a daemon thread."""
        self.reactor = Container(self)
        self.thread = threading.Thread(target=self.reactor.run)
        self.thread.daemon = True
        self.thread.start()

    def stop_listening(self):
        """Request a shutdown, wait for the thread and return saved messages.

        Returns:
            The ``saved_messages`` list (entries are only added when the
            ``save`` option is truthy). If ``start_listening`` was never
            called there is no thread to stop and the list is returned
            immediately.
        """
        if self.thread is None:
            return self.saved_messages

        # The 'shutdown' application event is handled by on_shutdown() on
        # the container thread.
        self.injector.trigger(ApplicationEvent('shutdown'))
        self.injector.close()
        self.thread.join()
        return self.saved_messages

    def on_start(self, event):
        """Register the injector, connect and create one receiver per queue."""
        self.container = event.container
        self.container.selectable(self.injector)
        self.conn = self.container.connect(
            self.server,
            reconnect=Backoff(initial=30, max_delay=180, factor=1),
        )
        for queue in self.queues:
            self.receivers.append(self.container.create_receiver(self.conn, queue))

    def on_message(self, event):
        """Decode, store and/or forward a received message per the options."""
        queue = event.receiver.source.address
        message = event.message

        if self.options.get('decode'):
            message = self._extract_message_string(message)

        if self.options.get('save'):
            self.saved_messages.append({'queue': queue, 'body': message})

        if self.function:
            self.function(queue, message)

    def on_shutdown(self, event):
        """Close all receivers and the connection, then stop the container."""
        for recv in self.receivers:
            recv.close()

        self.receivers = []
        self.conn.close()
        self.container.stop()
        # NOTE(review): clearing these private container attributes looks
        # like a workaround to drop handler references -- confirm it is
        # still required with the Proton version in use.
        self.container._global_handler = None
        self.container._handler = None

    def on_transport_error(self, event):
        """Delegate to the base handler.

        Per the original author's note, the base implementation is relied
        upon to perform the reconnect.
        """
        super(AsyncConsumer, self).on_transport_error(event)

    def _extract_message_string(self, message) -> str:
        """Return the body of ``message`` as a single value.

        A one-element list yields its only element, a ``str`` is returned
        unchanged and ``bytes`` are decoded as UTF-8.

        Documentation:
        http://qpid.apache.org/releases/qpid-proton-0.30.0/proton/python/docs/_modules/proton/_message.html#Message

        Raises:
            ValueError: If the body is a list whose length is not exactly
                one, or is of any other unsupported type.
        """
        body = message.body

        if isinstance(body, list):
            # This is just playing safe
            if len(body) == 1:
                return body[0]
            raise ValueError('The message contains more than one messages. Please debug!')

        if isinstance(body, str):
            return body

        if isinstance(body, bytes):
            return body.decode('utf-8')

        self._log.error('%s', type(body))
        self._log.error('Unable to process the message. Please debug the message:')
        self._log.error('%s', message)
        raise ValueError('Unable to process the message. Please debug the message!')


---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org
For additional commands, e-mail: users-h...@qpid.apache.org

Reply via email to