Dear Robbie,
I'm very sorry for the late reply, but I had a few very busy days at work.

Thank you very much for your reply.

I will try to narrow down the issue and perform more tests, looking,
especially, if "high latency" peaks are always followed by "low latency"
measurements, as if transfers were batched.
In general, from what I observed so far, the "high latency" was typically
assuming the form of "peaks" during the execution of each test with a
producer and a consumer, more or less suddenly followed by "normal", lower,
latency values, but more investigation could be surely useful to try to
better understand the problem.

Thank you very much also for the information about the possibility
of opening a single "anonymous" sender. I wasn't aware about this feature,
which I will surely test soon.
If the broker I'm using is supporting it (but I really think so) and
everything is working well in my use case, I will update and optimize my
code by relying on a single sender.

Thank you very much,
Francesco


Il giorno lun 20 lug 2020 alle ore 12:23 Robbie Gemmell <
robbie.gemm...@gmail.com> ha scritto:

> I'd wonder whether if you are piling up lots of events for different
> senders then it's potentially (again, I haven't used the Injector
> bits, and Python isn't particularly my thing) just that many such
> events are all piled up, and all processed before the resulting IO can
> then be, effectively batching things up. That might be observed as
> 'higher latency' for the earlier sends, tapering to 'lower latency'
> for the later ones in a sequence processed together. When you switch
> to using a BlockingConnection, you presumably change it to processing
> a single case at a time in full and see 'lower latency', by virtue of
> doing and measuring rather different things.
>
> Or it could be something completely different. Or there could be some
> problem/issue, potentially even on the server too, hard to speculate.
> If you think there is one, try to narrow it down and supply a
> reproducer someone could look at.
>
> As an aside: rather than having hundreds of senders, if the server you
> are using supports it and your overall use case suits it, you can also
> open a single sender to the 'anonymous relay' by omitting an address
> when creating it (I believe by using None in Python) and then have
> each message specifically carry its destination address (I believe
> passing address=<value> when creating the Message object in Python)
>
> Robbie
>
> On Fri, 17 Jul 2020 at 19:21, Francesco Raviglione
> <francescorav.es...@gmail.com> wrote:
> >
> > Dear Robbie, dear Gordon,
> > Thank you very much for your replies and for your very useful and helpful
> > suggestions.
> > Looking better at the "db_send.py" and "tx_recv_interactive.py"
> examples, I
> > was finally able to code a working example by using the same loop as
> > before, but adding an EventInjector with:
> > data_event=EventInjector()
> > amqp_container=Container(AMQPHandler(arg1,arg2))
> > amqp_container.selectable(ue_data_event)
> >
> > Then, by defining a new event inside the event loop (the AMQPHandler
> class):
> > def on_data_event:
> > <tab> # Manage event...
> >
> > And then by "provoking" the event from another loop, which waits from
> data
> > from a pipe and calls "data_event.trigger()" when new data is available
> to
> > be sent via AMQP:
> > while True:
> > <tab># Read from pipe
> > <tab>data_to_be_sent_via_AMQP=pipe_end.recv() # blocking recv()
> > <tab># ...
> >
> <tab>data_event.trigger(ApplicationEvent("data_event",subject=data_to_be_sent_via_AMQP))
> >
> > Everything seems to work fine now.
> >
> > I noticed that, however, especially when managing more senders (even
> quite
> > a lot of them, reaching > 100) inside the same event loop (inside the
> same
> > AMQPHandler class, using the same connection), the per-message delay
> > between when data is produced and when it can be consumed/it is
> > successfully consumed by an external consumer, is significantly higher,
> > sometimes, that what I can achieve with a Python's BlockingConnection.
> > In particular, sometimes it is even higher than 2 or 3 seconds.
> > Is this due to the event loop being "too busy" managing a lot of senders,
> > and thus delaying the actual transmission of data?
> >
> > As I needed to write a client application trying to ensure the lowest
> > producer-to-consumer latency possible, I ended up in trying another (much
> > less regular) solution, with BlockingConnection(), that I just reported
> as
> > a reply to the other thread (
> >
> http://qpid.2158936.n2.nabble.com/Getting-local-idle-timeout-expired-on-blocking-connection-possible-bug-td7692985.html
> > ).
> >
> > Thank you very much again,
> > Francesco
> >
> > Il giorno ven 17 lug 2020 alle ore 12:14 Robbie Gemmell <
> > robbie.gemm...@gmail.com> ha scritto:
> >
> > > On Thu, 16 Jul 2020 at 14:36, Francesco Raviglione
> > > <francescorav.es...@gmail.com> wrote:
> > > >
> > > > Dear Adrian,
> > > > Thank you very much for your reply.
> > > >
> > > > I tried following your suggestion, by attempting to find a way to
> call an
> > > > external function reading the data from an external source, and
> returning
> > > > it when available.
> > > >
> > > > However, unfortunately, it seems to be slightly easier to control the
> > > > behaviour of the event loop in C than in Pyhton.
> > > > In particular, in Python, if I call any blocking function (even a
> > > > time.sleep(0.1), sleeping for just 100 ms) inside "on_sendable", the
> > > > messages are only buffered and never sent (do you know why? Is the
> > > > "on_sendable" handler supposed to be executed as fast as possible
> without
> > > > blocking to have the messages being sent immediately?).
> > >
> > > There is only one thread, it is used for running the callbacks and
> > > performing the IO. Anynew IO is processed after the handlers return,
> > > so if you block the handlers, you block the IO thread, meaning it
> > > can't do any work and thus cant send anything.
> > >
> > > > If, instead, I just do not block and send the data only if available
> (for
> > > > instance by implementing a mechanism equivalent to a non-blocking
> read
> > > from
> > > > a pipe, with poll(), letting "on_sendable" finish without sending
> > > anything,
> > > > if there is no data available), only one "on_sendable" event is
> generated
> > > > and the event loop does not seem to return any other event which I
> could
> > > > use to send the data, which may be now available (for example, the
> > > > equivalent of C's PN_DELIVERY does not seem to exist).
> > > >
> > >
> > > The on_sendable is generated when credit to send initially arrives
> > > from the server, or does again later. I believe it's also
> > > 'regenerated' locally when you send, thus using some of the credit (so
> > > long as some credit remains). As neither occurred, no more on_sendable
> > > calls.
> > >
> > > I believe the EventInjector was the intended route for things like
> > > this, performing/provoking work from outwith the container thread:
> > >
> > >
> http://qpid.apache.org/releases/qpid-proton-0.31.0/proton/python/docs/proton.reactor.html#proton.reactor.EventInjector
> > >
> > > Its used in these examples:
> > >
> > >
> http://qpid.apache.org/releases/qpid-proton-0.31.0/proton/python/examples/db_send.py.html
> > >
> > >
> http://qpid.apache.org/releases/qpid-proton-0.31.0/proton/python/examples/db_recv.py.html
> > >
> > >
> http://qpid.apache.org/releases/qpid-proton-0.31.0/proton/python/examples/tx_recv_interactive.py.html
> > >
> > > You can also provoke your own callbacks by scheduling, so an
> > > alternative might be scheduling (while using the container thread, not
> > > your own) a task of your own to periodically run. Example:
> > >
> > >
> http://qpid.apache.org/releases/qpid-proton-0.31.0/proton/python/examples/recurring_timer.py.html
> > >
> > >
> > > > Do you know if there are other ways to solve this issue?
> > > > When you were "yielding" the control of the main loop to an external
> > > > function, was it blocking in some way, waiting for data to be
> available?
> > > >
> > > > Thank you very much,
> > > > Francesco
> > > >
> > > >
> > > > Il giorno mer 15 lug 2020 alle ore 18:40 Adrian Florea <
> > > florea....@gmail.com>
> > > > ha scritto:
> > > >
> > > > > Hi Francesco,
> > > > >
> > > > > I achieved similar behavior but using Qpid Proton-C (AMQP send-only
> > > > > program).
> > > > >
> > > > > After initialiazing proton and getting the first FLOW event, I
> simply
> > > > > "yield" main event loop control to an external/looping function
> that
> > > can
> > > > > read from a different source and then send the same message to an
> AMQP
> > > > > destination.
> > > > >
> > > > > It may seem hard in the beggining but once you get a hold on how to
> > > > > integrate proton events loop into your program loop, it has good
> > > chances to
> > > > > work.
> > > > >
> > > > > Adrian
> > > > >
> > > > >
> > > > > On Wed, Jul 15, 2020, 11:22 AM Francesco Raviglione <
> > > > > francescorav.es...@gmail.com> wrote:
> > > > >
> > > > > > Dear all,
> > > > > > I'm trying to use the Python version of Qpid Proton to send data
> to
> > > an
> > > > > AMQP
> > > > > > 1.0 broker.
> > > > > > I've seen that several examples are sending data which was
> > > > > > available/defined before the AMQP event loop is started.
> > > > > > However, I need to send data only when it becomes available from
> an
> > > > > > external process (for example by waiting for it on a Pipe, with
> > > > > > self.amqp_pipe_end.recv()), and then send it immediately to the
> > > broker.
> > > > > > If I try to wait on a pipe inside "on_sendable", no message is
> > > actually
> > > > > > transferred to the broker, as the event loop is "blocked" waiting
> > > for new
> > > > > > data and cannot manage properly the AMQP 1.0 connection (is this
> > > > > correct?).
> > > > > > How could I achieve the desired result? How can I make the loop
> wait
> > > for
> > > > > > external data and then immediately send it?
> > > > > >
> > > > > > Thank you very much in advance,
> > > > > > Francesco Raviglione
> > > > > >
> > >
> > > ---------------------------------------------------------------------
> > > To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org
> > > For additional commands, e-mail: users-h...@qpid.apache.org
> > >
> > >
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org
> For additional commands, e-mail: users-h...@qpid.apache.org
>
>

Reply via email to