I wonder whether, if you are piling up lots of events for different senders, it's potentially (again, I haven't used the Injector bits, and Python isn't particularly my thing) just that many such events are all queued up and all processed before the resulting IO can be done, effectively batching things up. That might be observed as 'higher latency' for the earlier sends, tapering to 'lower latency' for the later ones in a sequence processed together. When you switch to using a BlockingConnection, you presumably change it to processing a single case at a time in full, and see 'lower latency' by virtue of doing and measuring rather different things.
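A toy model of that batching effect, as a sketch only: it is not Proton's actual scheduling, and the function names and the timings (100 events, 20 ms per handler, 5 ms of IO) are arbitrary assumptions chosen for illustration. Latency is measured from the moment a message is handed to send() until the IO pass that writes it has finished.

```python
def batched_latencies(n_events, handler_ms, io_ms):
    """All handlers run first; a single IO pass then writes every message."""
    flush_done = n_events * handler_ms + io_ms
    # Message i is handed to send() when its own handler finishes.
    return [flush_done - (i + 1) * handler_ms for i in range(n_events)]


def one_at_a_time_latencies(n_events, io_ms):
    """Each send is written out in full before the next one starts."""
    return [io_ms for _ in range(n_events)]


# Arbitrary illustrative numbers, not measurements.
batched = batched_latencies(n_events=100, handler_ms=20, io_ms=5)
blocking = one_at_a_time_latencies(n_events=100, io_ms=5)

print(batched[0], batched[-1])    # 1985 5
print(blocking[0], blocking[-1])  # 5 5
```

In this model the first message of a batch waits for every later handler, while the last one waits only for the IO itself, which is the tapering pattern described above.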
Or it could be something completely different. Or there could be some
problem/issue, potentially even on the server too; it's hard to speculate.
If you think there is one, try to narrow it down and supply a reproducer
someone could look at.

As an aside: rather than having hundreds of senders, if the server you are
using supports it and your overall use case suits it, you can also open a
single sender to the 'anonymous relay' by omitting an address when creating
it (I believe by using None in Python) and then have each message
specifically carry its destination address (I believe by passing
address=<value> when creating the Message object in Python).

Robbie

On Fri, 17 Jul 2020 at 19:21, Francesco Raviglione
<francescorav.es...@gmail.com> wrote:
>
> Dear Robbie, dear Gordon,
> Thank you very much for your replies and for your very useful and helpful
> suggestions.
> Looking more closely at the "db_send.py" and "tx_recv_interactive.py"
> examples, I was finally able to code a working example by using the same
> loop as before, but adding an EventInjector with:
> data_event=EventInjector()
> amqp_container=Container(AMQPHandler(arg1,arg2))
> amqp_container.selectable(data_event)
>
> Then, by defining a new event handler inside the event loop (the
> AMQPHandler class):
> def on_data_event(self, event):
>     # Manage event...
>
> And then by "provoking" the event from another loop, which waits for data
> from a pipe and calls "data_event.trigger()" when new data is available to
> be sent via AMQP:
> while True:
>     # Read from pipe
>     data_to_be_sent_via_AMQP=pipe_end.recv() # blocking recv()
>     # ...
>     data_event.trigger(ApplicationEvent("data_event",subject=data_to_be_sent_via_AMQP))
>
> Everything seems to work fine now.
>
> I noticed, however, that especially when managing more senders (even quite
> a lot of them, more than 100) inside the same event loop (inside the same
> AMQPHandler class, using the same connection), the per-message delay
> between when data is produced and when it is successfully consumed by an
> external consumer is sometimes significantly higher than what I can
> achieve with Python's BlockingConnection.
> In particular, sometimes it is even higher than 2 or 3 seconds.
> Is this due to the event loop being "too busy" managing a lot of senders,
> and thus delaying the actual transmission of data?
>
> As I needed to write a client application trying to ensure the lowest
> producer-to-consumer latency possible, I ended up trying another (much
> less regular) solution, with BlockingConnection(), that I just reported as
> a reply to the other thread (
> http://qpid.2158936.n2.nabble.com/Getting-local-idle-timeout-expired-on-blocking-connection-possible-bug-td7692985.html
> ).
>
> Thank you very much again,
> Francesco
>
> On Fri, 17 Jul 2020 at 12:14, Robbie Gemmell <robbie.gemm...@gmail.com>
> wrote:
>
> > On Thu, 16 Jul 2020 at 14:36, Francesco Raviglione
> > <francescorav.es...@gmail.com> wrote:
> > >
> > > Dear Adrian,
> > > Thank you very much for your reply.
> > >
> > > I tried following your suggestion, by attempting to find a way to call
> > > an external function reading the data from an external source, and
> > > returning it when available.
> > >
> > > However, unfortunately, it seems to be slightly easier to control the
> > > behaviour of the event loop in C than in Python.
> > > In particular, in Python, if I call any blocking function (even a
> > > time.sleep(0.1), sleeping for just 100 ms) inside "on_sendable", the
> > > messages are only buffered and never sent (do you know why?
> > > Is the "on_sendable" handler supposed to be executed as fast as
> > > possible, without blocking, so that the messages are sent immediately?).
> >
> > There is only one thread; it is used for running the callbacks and
> > performing the IO. Any new IO is processed after the handlers return,
> > so if you block the handlers, you block the IO thread, meaning it
> > can't do any work and thus can't send anything.
> >
> > > If, instead, I just do not block and send the data only if available
> > > (for instance by implementing a mechanism equivalent to a non-blocking
> > > read from a pipe, with poll(), letting "on_sendable" finish without
> > > sending anything if there is no data available), only one
> > > "on_sendable" event is generated and the event loop does not seem to
> > > return any other event which I could use to send the data, which may
> > > now be available (for example, the equivalent of C's PN_DELIVERY does
> > > not seem to exist).
> > >
> >
> > The on_sendable is generated when credit to send initially arrives
> > from the server, or does again later. I believe it's also
> > 'regenerated' locally when you send, thus using some of the credit (so
> > long as some credit remains). As neither occurred, no more on_sendable
> > calls.
> >
> > I believe the EventInjector was the intended route for things like
> > this, performing/provoking work from outwith the container thread:
> >
> > http://qpid.apache.org/releases/qpid-proton-0.31.0/proton/python/docs/proton.reactor.html#proton.reactor.EventInjector
> >
> > It's used in these examples:
> >
> > http://qpid.apache.org/releases/qpid-proton-0.31.0/proton/python/examples/db_send.py.html
> >
> > http://qpid.apache.org/releases/qpid-proton-0.31.0/proton/python/examples/db_recv.py.html
> >
> > http://qpid.apache.org/releases/qpid-proton-0.31.0/proton/python/examples/tx_recv_interactive.py.html
> >
> > You can also provoke your own callbacks by scheduling, so an
> > alternative might be scheduling (while using the container thread, not
> > your own) a task of your own to periodically run. Example:
> >
> > http://qpid.apache.org/releases/qpid-proton-0.31.0/proton/python/examples/recurring_timer.py.html
> >
> >
> > > Do you know if there are other ways to solve this issue?
> > > When you were "yielding" the control of the main loop to an external
> > > function, was it blocking in some way, waiting for data to be available?
> > >
> > > Thank you very much,
> > > Francesco
> > >
> > >
> > > On Wed, 15 Jul 2020 at 18:40, Adrian Florea <florea....@gmail.com>
> > > wrote:
> > >
> > > > Hi Francesco,
> > > >
> > > > I achieved similar behavior but using Qpid Proton-C (AMQP send-only
> > > > program).
> > > >
> > > > After initializing proton and getting the first FLOW event, I simply
> > > > "yield" main event loop control to an external/looping function that
> > > > can read from a different source and then send the same message to an
> > > > AMQP destination.
> > > >
> > > > It may seem hard in the beginning, but once you get a hold on how to
> > > > integrate proton's event loop into your program loop, it has good
> > > > chances to work.
> > > >
> > > > Adrian
> > > >
> > > >
> > > > On Wed, Jul 15, 2020, 11:22 AM Francesco Raviglione <
> > > > francescorav.es...@gmail.com> wrote:
> > > >
> > > > > Dear all,
> > > > > I'm trying to use the Python version of Qpid Proton to send data to
> > > > > an AMQP 1.0 broker.
> > > > > I've seen that several examples send data which was
> > > > > available/defined before the AMQP event loop is started.
> > > > > However, I need to send data only when it becomes available from an
> > > > > external process (for example by waiting for it on a Pipe, with
> > > > > self.amqp_pipe_end.recv()), and then send it immediately to the
> > > > > broker.
> > > > > If I try to wait on a pipe inside "on_sendable", no message is
> > > > > actually transferred to the broker, as the event loop is "blocked"
> > > > > waiting for new data and cannot properly manage the AMQP 1.0
> > > > > connection (is this correct?).
> > > > > How could I achieve the desired result? How can I make the loop
> > > > > wait for external data and then immediately send it?
> > > > >
> > > > > Thank you very much in advance,
> > > > > Francesco Raviglione
> > > > >
> > > >
> >
> > ---------------------------------------------------------------------
> > To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org
> > For additional commands, e-mail: users-h...@qpid.apache.org
> >
> >
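
A conceptual sketch of the hand-off discussed in this thread, using only the Python standard library: a thread that may block on its data source passes items to a single-threaded loop, which wakes only when something has been queued and never blocks inside a handler. This is not Proton's EventInjector or its implementation; `Injector`, `run_loop` and `producer` are hypothetical names, and the list of strings stands in for data read with a blocking pipe_end.recv().

```python
import queue
import selectors
import socket
import threading


class Injector:
    """Thread-safe hand-off of events into a single-threaded loop."""

    def __init__(self):
        self._events = queue.Queue()
        # The loop watches the read end; the write end is used to wake it.
        self._rsock, self._wsock = socket.socketpair()
        self._rsock.setblocking(False)

    def fileno(self):
        # Lets a selector register this object directly.
        return self._rsock.fileno()

    def trigger(self, event):
        """Call from any thread: queue the event, then wake the loop."""
        self._events.put(event)
        self._wsock.send(b"\0")

    def drain(self):
        """Call from the loop thread: return every event queued so far."""
        try:
            self._rsock.recv(4096)  # discard pending wake-up bytes
        except BlockingIOError:
            pass
        events = []
        while True:
            try:
                events.append(self._events.get_nowait())
            except queue.Empty:
                return events

    def close(self):
        self._rsock.close()
        self._wsock.close()


def run_loop(injector, handle, stop_token=None):
    """Single-threaded loop: the only place it blocks is select()."""
    sel = selectors.DefaultSelector()
    sel.register(injector, selectors.EVENT_READ)
    running = True
    while running:
        sel.select()
        for event in injector.drain():
            if event is stop_token:
                running = False
                break
            handle(event)  # should return quickly, like a callback
    sel.close()


def producer(injector, items):
    """Stand-in for the loop that blocks while reading from a pipe."""
    for item in items:
        injector.trigger(item)
    injector.trigger(None)  # tell the loop to stop


received = []
inj = Injector()
worker = threading.Thread(target=producer, args=(inj, ["a", "b", "c"]))
worker.start()
run_loop(inj, received.append)
worker.join()
inj.close()
print(received)  # ['a', 'b', 'c']
```

The blocking read stays in the producer thread, so the loop thread is always free to do its own IO between handler calls.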