Dear Robbie, dear Gordon,

Thank you very much for your replies and for your very helpful
suggestions.

After a closer look at the "db_send.py" and "tx_recv_interactive.py"
examples, I was finally able to code a working example by using the same
loop as before, but adding an EventInjector with:

data_event=EventInjector()
amqp_container=Container(AMQPHandler(arg1,arg2))
amqp_container.selectable(data_event)

Then, by defining a new event inside the event loop (the AMQPHandler
class):

def on_data_event(self, event):
    # Manage event...

And then by "provoking" the event from another loop, which waits for data
from a pipe and calls "data_event.trigger()" when new data is available to
be sent via AMQP:

while True:
    # Read from pipe
    data_to_be_sent_via_AMQP=pipe_end.recv() # blocking recv()
    # ...
    data_event.trigger(ApplicationEvent("data_event",subject=data_to_be_sent_via_AMQP))

Everything seems to work fine now.

However, I noticed that, especially when managing many senders (quite a
lot of them, more than 100) inside the same event loop (inside the same
AMQPHandler class, using the same connection), the per-message delay
between when data is produced and when it is successfully consumed by an
external consumer is sometimes significantly higher than what I can
achieve with Python's BlockingConnection. In particular, it is sometimes
even higher than 2 or 3 seconds.

Is this due to the event loop being "too busy" managing a lot of senders,
and thus delaying the actual transmission of data?

As I needed to write a client application that ensures the lowest possible
producer-to-consumer latency, I ended up trying another (much less
regular) solution, with BlockingConnection(), which I have just reported
as a reply to the other thread (
http://qpid.2158936.n2.nabble.com/Getting-local-idle-timeout-expired-on-blocking-connection-possible-bug-td7692985.html
).

Thank you very much again,
Francesco

On Fri, 17 Jul 2020 at 12:14, Robbie Gemmell <robbie.gemm...@gmail.com>
wrote:

> On Thu, 16 Jul 2020 at 14:36, Francesco Raviglione
> <francescorav.es...@gmail.com> wrote:
> >
> > Dear Adrian,
> > Thank you very much for your reply.
> >
> > I tried following your suggestion, by attempting to find a way to call an
> > external function reading the data from an external source, and returning
> > it when available.
> >
> > However, unfortunately, it seems to be slightly easier to control the
> > behaviour of the event loop in C than in Python.
> > In particular, in Python, if I call any blocking function (even a
> > time.sleep(0.1), sleeping for just 100 ms) inside "on_sendable", the
> > messages are only buffered and never sent (do you know why? Is the
> > "on_sendable" handler supposed to be executed as fast as possible without
> > blocking to have the messages being sent immediately?).
>
> There is only one thread; it is used for running the callbacks and
> performing the IO. Any new IO is processed after the handlers return,
> so if you block the handlers, you block the IO thread, meaning it
> can't do any work and thus can't send anything.
>
> > If, instead, I just do not block and send the data only if available (for
> > instance by implementing a mechanism equivalent to a non-blocking read from
> > a pipe, with poll(), letting "on_sendable" finish without sending anything,
> > if there is no data available), only one "on_sendable" event is generated
> > and the event loop does not seem to return any other event which I could
> > use to send the data, which may be now available (for example, the
> > equivalent of C's PN_DELIVERY does not seem to exist).
> >
>
> The on_sendable is generated when credit to send initially arrives
> from the server, or does again later. I believe it's also
> 'regenerated' locally when you send, thus using some of the credit (so
> long as some credit remains). As neither occurred, no more on_sendable
> calls.
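
The credit explanation above suggests one way to structure a sender whose
data arrives late: keep a small buffer, and try to drain it both when new
data is injected and when on_sendable fires. The following is only a
sketch of that idea in plain Python. FakeSender and BufferedProducer are
hypothetical names, FakeSender merely imitates a link that has a credit
counter and a send() method, and the handler methods are simplified (real
proton handlers receive an event argument).

```python
from collections import deque


class FakeSender:
    """Hypothetical stand-in for a sending link: a credit counter plus a log of sends."""

    def __init__(self, credit=0):
        self.credit = credit
        self.sent = []

    def send(self, body):
        # Each send uses up one unit of credit.
        if self.credit <= 0:
            raise RuntimeError("no credit available")
        self.credit -= 1
        self.sent.append(body)


class BufferedProducer:
    """Buffers injected data and sends it only while credit remains."""

    def __init__(self, sender):
        self.sender = sender
        self.pending = deque()

    def on_data_event(self, body):
        # New data from outside the loop: queue it, then send what credit allows.
        self.pending.append(body)
        self.drain()

    def on_sendable(self):
        # Credit arrived (or remains): send whatever is still waiting.
        self.drain()

    def drain(self):
        # Never blocks: stops as soon as the buffer is empty or credit runs out.
        while self.pending and self.sender.credit > 0:
            self.sender.send(self.pending.popleft())
```

With this shape, data that arrives while no credit is available is not
lost; it waits in the buffer until the next on_sendable call.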
>
> I believe the EventInjector was the intended route for things like
> this, performing/provoking work from outwith the container thread:
>
> http://qpid.apache.org/releases/qpid-proton-0.31.0/proton/python/docs/proton.reactor.html#proton.reactor.EventInjector
>
> It's used in these examples:
>
> http://qpid.apache.org/releases/qpid-proton-0.31.0/proton/python/examples/db_send.py.html
>
> http://qpid.apache.org/releases/qpid-proton-0.31.0/proton/python/examples/db_recv.py.html
>
> http://qpid.apache.org/releases/qpid-proton-0.31.0/proton/python/examples/tx_recv_interactive.py.html
>
> You can also provoke your own callbacks by scheduling, so an
> alternative might be scheduling (while using the container thread, not
> your own) a task of your own to periodically run. Example:
>
> http://qpid.apache.org/releases/qpid-proton-0.31.0/proton/python/examples/recurring_timer.py.html
>
> >
> > Do you know if there are other ways to solve this issue?
> > When you were "yielding" the control of the main loop to an external
> > function, was it blocking in some way, waiting for data to be available?
> >
> > Thank you very much,
> > Francesco
> >
> >
> > On Wed, 15 Jul 2020 at 18:40, Adrian Florea <florea....@gmail.com>
> > wrote:
> >
> > > Hi Francesco,
> > >
> > > I achieved similar behavior but using Qpid Proton-C (AMQP send-only
> > > program).
> > >
> > > After initializing proton and getting the first FLOW event, I simply
> > > "yield" main event loop control to an external/looping function that can
> > > read from a different source and then send the same message to an AMQP
> > > destination.
> > >
> > > It may seem hard in the beginning but once you get a hold on how to
> > > integrate proton events loop into your program loop, it has good chances to
> > > work.
> > >
> > > Adrian
> > >
> > >
> > > On Wed, Jul 15, 2020, 11:22 AM Francesco Raviglione <
> > > francescorav.es...@gmail.com> wrote:
> > >
> > > > Dear all,
> > > > I'm trying to use the Python version of Qpid Proton to send data to an
> > > > AMQP 1.0 broker.
> > > > I've seen that several examples are sending data which was
> > > > available/defined before the AMQP event loop is started.
> > > > However, I need to send data only when it becomes available from an
> > > > external process (for example by waiting for it on a Pipe, with
> > > > self.amqp_pipe_end.recv()), and then send it immediately to the broker.
> > > > If I try to wait on a pipe inside "on_sendable", no message is actually
> > > > transferred to the broker, as the event loop is "blocked" waiting for new
> > > > data and cannot manage properly the AMQP 1.0 connection (is this
> > > > correct?).
> > > > How could I achieve the desired result? How can I make the loop wait for
> > > > external data and then immediately send it?
> > > >
> > > > Thank you very much in advance,
> > > > Francesco Raviglione
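
The pattern the thread converges on, keeping the blocking recv() out of
the event loop and handing the data over from a helper thread, can be
pictured with a stdlib-only sketch. This is not proton code: here a
queue.Queue plays the role that the EventInjector plays in proton, and
run_loop is a toy loop standing in for the container. The names
pipe_reader, run_loop and demo, and the use of None as a stop marker, are
assumptions made for illustration only.

```python
import queue
import threading
from multiprocessing import Pipe


def pipe_reader(pipe_end, events):
    """Helper thread: block on the pipe and hand each item to the loop thread."""
    while True:
        item = pipe_end.recv()   # blocking, but outside the loop thread
        events.put(item)         # thread-safe handoff (the "injection")
        if item is None:         # None is this sketch's stop marker
            return


def run_loop(events, send):
    """Toy loop: handles one injected item at a time and never touches the pipe."""
    while True:
        item = events.get()
        if item is None:
            return
        send(item)


def demo():
    producer_end, reader_end = Pipe()
    events = queue.Queue()
    sent = []
    reader = threading.Thread(target=pipe_reader, args=(reader_end, events))
    reader.start()
    for value in ("a", "b", "c", None):
        producer_end.send(value)
    run_loop(events, sent.append)   # "send" just records the item here
    reader.join()
    return sent
```

In this sketch the loop thread only ever waits on its own event queue, so
a slow or idle pipe delays nothing else that the loop has to do.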