I am trying to write a python program that can receive a message, form a 
response, and send it back. I am getting errors on sending the response. Can 
someone help me see what I have wrong? The error appears to be -1 with no 
explanation. I'm hoping I just have some misunderstanding of how this is 
supposed to work.

Thank you,
-Steve

The code is:

import logging
 
from proton.handlers import MessagingHandler
from proton.reactor import Container
 
from emp import EMP_msg
from transport_echo_request import TransportEchoRequest
from transport_echo_response import TransportEchoResponse
 
logger = logging.getLogger("ricochet")
 
class Plugin(MessagingHandler):
    def __init__(self):
        super(Plugin, self).__init__()
 
    def setup(self, config):
        self.broker = config['broker-url']
        self.recv_queue = config['recv-queue']
        self.send_queue = config['send-queue']
        self.message_to_send = None
 
    def on_start(self, event):
        conn = event.container.connect(self.broker)
        event.container.create_receiver(conn, self.recv_queue)
        self.sender = event.container.create_sender(conn, self.send_queue)
        logger.info("sender %r", self.sender)
 
    def on_message(self, event):
        bytes = event.message.body   # Should be transport echo request - 
translate?
        transport = 
event.message.properties['TransportUsed'].tobytes().decode("utf-8")
        logger.debug("body: %s", bytes.hex())
 
        msg = EMP_msg()
        ret = msg.decode(bytes)
        logger.debug("EMP header decode: %s", ret)
        if msg.msg_type_id != 4000 or msg.msg_ver != 1:
            logger.error("Received message type %dv%d not supported.", 
msg.msg_type_id, msg.msg_ver)
            return
        request = TransportEchoRequest()
        try:
            request.decode(bytes)
            request.decode_body()
        except Exception as e:
            logger.error("Decoding transport echo request: %s", e)
            return
        self.on_transport_echo_request(request, transport)
        self.release(event.delivery)
 
    def on_sendable(self, event):
        logger.info("Sendable.")
        if self.message_to_send:
            event.sender.send(self.message_to_send.msg_bytes)
            self.message_to_send = None
        return super().on_sendable(event)
 
    # These are local calls, not framework callbacks as the above are.
    def on_transport_echo_request(self, request, transport):
        logger.info("transport_echo from %s np %d on %s", request.src_addr, 
request.QoS_net_pref, transport)
        response = TransportEchoResponse(transport, request)
        response.encode_body()
        response.encode()
        self.sender.send(response.msg_bytes)
        #self.message_to_send = response
        #self.sender.offered(1)

When I run this (and the other side that initiates a request) I get:

INFO:ricochet:starting
INFO:ricochet:sender <proton._endpoints.Sender 0x10ff88c20 ~ 0x7fe6b88530a0>
INFO:proton:Connecting to Url('amqp://localhost:8000')...
INFO:proton:Connected to localhost
INFO:ricochet:Sendable.
INFO:ricochet:transport_echo from round-trip np 0 on att
Traceback (most recent call last):
  File "/Users/user/rwn/roma/plugins/ricochet/./ricochet.py", line 87, in 
<module>
    main()
    ~~~~^^
  File "/Users/user/rwn/roma/plugins/ricochet/./ricochet.py", line 83, in main
    asyncio.run(ricochet.run())
    ~~~~~~~~~~~^^^^^^^^^^^^^^^^
  File 
"/usr/local/Cellar/python@3.13/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/runners.py",
 line 194, in run
    return runner.run(main)
           ~~~~~~~~~~^^^^^^
  File 
"/usr/local/Cellar/python@3.13/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/runners.py",
 line 118, in run
    return self._loop.run_until_complete(task)
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
  File 
"/usr/local/Cellar/python@3.13/3.13.0_1/Frameworks/Python.framework/Versions/3.13/lib/python3.13/asyncio/base_events.py",
 line 721, in run_until_complete
    return future.result()
           ~~~~~~~~~~~~~^^
  File "/Users/user/rwn/roma/plugins/ricochet/./ricochet.py", line 76, in run
    await asyncio.to_thread(self.run_in_thread())
                            ~~~~~~~~~~~~~~~~~~^^
  File "/Users/user/rwn/roma/plugins/ricochet/./ricochet.py", line 73, in 
run_in_thread
    Container(self).run()
    ~~~~~~~~~~~~~~~~~~~^^
  File "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_reactor.py", 
line 197, in run
    while self.process():
          ~~~~~~~~~~~~^^
  File "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_reactor.py", 
line 260, in process
    event.dispatch(handler)
    ~~~~~~~~~~~~~~^^^^^^^^^
  File "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_events.py", 
line 160, in dispatch
    self.dispatch(h, type)
    ~~~~~~~~~~~~~^^^^^^^^^
  File "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_events.py", 
line 160, in dispatch
    self.dispatch(h, type)
    ~~~~~~~~~~~~~^^^^^^^^^
  File "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_events.py", 
line 157, in dispatch
    _dispatch(handler, type.method, self)
    ~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_events.py", 
line 128, in _dispatch
    m(*args)
    ~^^^^^^^
  File "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_handlers.py", 
line 256, in on_delivery
    self.on_message(event)
    ~~~~~~~~~~~~~~~^^^^^^^
  File "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_handlers.py", 
line 281, in on_message
    _dispatch(self.delegate, 'on_message', event)
    ~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_events.py", 
line 128, in _dispatch
    m(*args)
    ~^^^^^^^
  File "/Users/user/rwn/roma/plugins/ricochet/./ricochet.py", line 52, in 
on_message
    self.on_transport_echo_request(request, transport)
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^
  File "/Users/user/rwn/roma/plugins/ricochet/./ricochet.py", line 68, in 
on_transport_echo_request
    self.sender.send(response.msg_bytes)
    ~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^
  File 
"/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_endpoints.py", line 
1180, in send
    return self.stream(obj)
           ~~~~~~~~~~~^^^^^
  File 
"/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_endpoints.py", line 
1163, in stream
    return self._check(pn_link_send(self._impl, data))
           ~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File 
"/Users/user/rwn/roma/lib/python3.13/site-packages/proton/_endpoints.py", line 
739, in _check
    raise exc("[%s]: %s" % (err, pn_error_text(pn_link_error(self._impl))))
proton._exceptions.LinkException: [-1]: None

The sender code is below. It does send the first message, but never gets a 
response - the above is supposed to send it but the send fails.

# Send one transport echo request and wait for a response
 
import logging
from proton import Message
from proton.handlers import MessagingHandler
from proton.reactor import Container
 
from transport_echo_request import TransportEchoRequest
from transport_echo_response import TransportEchoResponse
import emp
 
logger = logging.getLogger("round_trip")
 
class RoundTrip(MessagingHandler):
    def __init__(self, server, to_ricochet_address, from_ricochet_address):
        super(RoundTrip, self).__init__()
        self.server = server
        self.send_address = to_ricochet_address
        self.recv_address = from_ricochet_address
        logger.info("Setting up - broker %s", server)
 
    def on_start(self, event):
        logger.debug("on_start")
        conn = event.container.connect(self.server)
        event.container.create_receiver(conn, self.recv_address)
        event.container.create_sender(conn, self.send_address)
 
    def on_sendable(self, event):
        logger.debug("on_sendable")
        req = TransportEchoRequest()
        req.data_integrity_flag = emp.EMP_INTEG_NONE
        req.src_addr = "round-trip"
        req.dest_addr = "ricochet"
        req.guid = bytes(16)
        req.encode_body()
        req_bytes = req.encode()
        m = Message(memoryview(req_bytes))
        # The 'TransportUsed' property type was discovered by experimentation - 
use itcm-ping to
        # send a message to the ricochet plugin and print the class of the 
value.
        m.properties = { 'TransportUsed': memoryview(b'att') }
        event.sender.send(m)
       # event.sender.close()
 
    def on_message(self, event):
        logger.debug("on_message")
        msg = emp.EMP_msg()
        msg.decode(event.message.body)
        logger.debug("msg type %d, vers %d", msg.msg_type_id, msg.msg_ver)
        #assert 4001 == msg.msg_type_id
        #assert 1 == msg.msg_ver
        resp = TransportEchoResponse()
        resp.decode(event.message.body)
        resp.decode_body()
        print("Response from ", resp.src_addr, " to ", resp.dest_addr, " used 
", resp.sending_transport)
        event.receiver.close()
        event.connection.close()
 
logging.basicConfig(level=logging.DEBUG)
Container(RoundTrip("localhost:8000", "in", "out")).run()

---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org
For additional commands, e-mail: users-h...@qpid.apache.org

Reply via email to