Dear Robbie, dear Gordon,
Thank you very much for your replies and for your very useful and helpful
suggestions.
Looking better at the "db_send.py" and "tx_recv_interactive.py" examples, I
was finally able to code a working example by using the same loop as
before, but adding an EventInjector with:
data_event=EventInjector()
amqp_container=Container(AMQPHandler(arg1,arg2))
amqp_container.selectable(ue_data_event)

Then, by defining a new event inside the event loop (the AMQPHandler class):
def on_data_event:
<tab> # Manage event...

And then by "provoking" the event from another loop, which waits from data
from a pipe and calls "data_event.trigger()" when new data is available to
be sent via AMQP:
while True:
<tab># Read from pipe
<tab>data_to_be_sent_via_AMQP=pipe_end.recv() # blocking recv()
<tab># ...
<tab>data_event.trigger(ApplicationEvent("data_event",subject=data_to_be_sent_via_AMQP))

Everything seems to work fine now.

I noticed that, however, especially when managing more senders (even quite
a lot of them, reaching > 100) inside the same event loop (inside the same
AMQPHandler class, using the same connection), the per-message delay
between when data is produced and when it can be consumed/it is
successfully consumed by an external consumer, is significantly higher,
sometimes, that what I can achieve with a Python's BlockingConnection.
In particular, sometimes it is even higher than 2 or 3 seconds.
Is this due to the event loop being "too busy" managing a lot of senders,
and thus delaying the actual transmission of data?

As I needed to write a client application trying to ensure the lowest
producer-to-consumer latency possible, I ended up in trying another (much
less regular) solution, with BlockingConnection(), that I just reported as
a reply to the other thread (
http://qpid.2158936.n2.nabble.com/Getting-local-idle-timeout-expired-on-blocking-connection-possible-bug-td7692985.html
).

Thank you very much again,
Francesco

Il giorno ven 17 lug 2020 alle ore 12:14 Robbie Gemmell <
robbie.gemm...@gmail.com> ha scritto:

> On Thu, 16 Jul 2020 at 14:36, Francesco Raviglione
> <francescorav.es...@gmail.com> wrote:
> >
> > Dear Adrian,
> > Thank you very much for your reply.
> >
> > I tried following your suggestion, by attempting to find a way to call an
> > external function reading the data from an external source, and returning
> > it when available.
> >
> > However, unfortunately, it seems to be slightly easier to control the
> > behaviour of the event loop in C than in Pyhton.
> > In particular, in Python, if I call any blocking function (even a
> > time.sleep(0.1), sleeping for just 100 ms) inside "on_sendable", the
> > messages are only buffered and never sent (do you know why? Is the
> > "on_sendable" handler supposed to be executed as fast as possible without
> > blocking to have the messages being sent immediately?).
>
> There is only one thread, it is used for running the callbacks and
> performing the IO. Anynew IO is processed after the handlers return,
> so if you block the handlers, you block the IO thread, meaning it
> can't do any work and thus cant send anything.
>
> > If, instead, I just do not block and send the data only if available (for
> > instance by implementing a mechanism equivalent to a non-blocking read
> from
> > a pipe, with poll(), letting "on_sendable" finish without sending
> anything,
> > if there is no data available), only one "on_sendable" event is generated
> > and the event loop does not seem to return any other event which I could
> > use to send the data, which may be now available (for example, the
> > equivalent of C's PN_DELIVERY does not seem to exist).
> >
>
> The on_sendable is generated when credit to send initially arrives
> from the server, or does again later. I believe it's also
> 'regenerated' locally when you send, thus using some of the credit (so
> long as some credit remains). As neither occurred, no more on_sendable
> calls.
>
> I believe the EventInjector was the intended route for things like
> this, performing/provoking work from outwith the container thread:
>
> http://qpid.apache.org/releases/qpid-proton-0.31.0/proton/python/docs/proton.reactor.html#proton.reactor.EventInjector
>
> Its used in these examples:
>
> http://qpid.apache.org/releases/qpid-proton-0.31.0/proton/python/examples/db_send.py.html
>
> http://qpid.apache.org/releases/qpid-proton-0.31.0/proton/python/examples/db_recv.py.html
>
> http://qpid.apache.org/releases/qpid-proton-0.31.0/proton/python/examples/tx_recv_interactive.py.html
>
> You can also provoke your own callbacks by scheduling, so an
> alternative might be scheduling (while using the container thread, not
> your own) a task of your own to periodically run. Example:
>
> http://qpid.apache.org/releases/qpid-proton-0.31.0/proton/python/examples/recurring_timer.py.html
>
>
> > Do you know if there are other ways to solve this issue?
> > When you were "yielding" the control of the main loop to an external
> > function, was it blocking in some way, waiting for data to be available?
> >
> > Thank you very much,
> > Francesco
> >
> >
> > Il giorno mer 15 lug 2020 alle ore 18:40 Adrian Florea <
> florea....@gmail.com>
> > ha scritto:
> >
> > > Hi Francesco,
> > >
> > > I achieved similar behavior but using Qpid Proton-C (AMQP send-only
> > > program).
> > >
> > > After initialiazing proton and getting the first FLOW event, I simply
> > > "yield" main event loop control to an external/looping function that
> can
> > > read from a different source and then send the same message to an AMQP
> > > destination.
> > >
> > > It may seem hard in the beggining but once you get a hold on how to
> > > integrate proton events loop into your program loop, it has good
> chances to
> > > work.
> > >
> > > Adrian
> > >
> > >
> > > On Wed, Jul 15, 2020, 11:22 AM Francesco Raviglione <
> > > francescorav.es...@gmail.com> wrote:
> > >
> > > > Dear all,
> > > > I'm trying to use the Python version of Qpid Proton to send data to
> an
> > > AMQP
> > > > 1.0 broker.
> > > > I've seen that several examples are sending data which was
> > > > available/defined before the AMQP event loop is started.
> > > > However, I need to send data only when it becomes available from an
> > > > external process (for example by waiting for it on a Pipe, with
> > > > self.amqp_pipe_end.recv()), and then send it immediately to the
> broker.
> > > > If I try to wait on a pipe inside "on_sendable", no message is
> actually
> > > > transferred to the broker, as the event loop is "blocked" waiting
> for new
> > > > data and cannot manage properly the AMQP 1.0 connection (is this
> > > correct?).
> > > > How could I achieve the desired result? How can I make the loop wait
> for
> > > > external data and then immediately send it?
> > > >
> > > > Thank you very much in advance,
> > > > Francesco Raviglione
> > > >
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org
> For additional commands, e-mail: users-h...@qpid.apache.org
>
>

Reply via email to