On Tue, 17 Nov 2020 at 19:59, Francesco Raviglione
<francescorav.es...@gmail.com> wrote:
>
> Dear all,
> I'm experiencing some issues in writing an AMQP client with Qpid Proton C++.
> My client should only send messages to a particular queue on an ActiveMQ
> broker and it is not supposed to receive any message over that connection.
> The client should not send the messages as soon as it is started (with
> "proton::container(AMQP_client).run();"), but it should wait for the data
> to be provided by an external thread, which may become available even after
> some minutes (I cannot tell in advance when the data will be available, and
> there may be even a long time between two consecutive chunks of available
> data).
>
> If I try to supply the AMQP client loop with new data through a pipe (on
> which data is written by the external thread), I can write an "AMQP_client"
> class like the following:
>
> void AMQP_client::on_container_start(proton::container& c) {
>      c.connect(broker_address);
> }
>
> void AMQP_client::on_connection_open(proton::connection& c) {
>      c.open_sender(queue_name);
> }
>
> void AMQP_client::on_sendable(proton::sender &s) {
>      uint8_t buffer[1024];
>      int bufsize;
>      proton::message amqp_msg;
>
>      // Wait for new data to be sent (wait for data to be written on
>      // the pipe)
>      if((bufsize=read(pipe_read_end,&buffer,1024))==-1) {
>          perror("read() error");
>          return;
>      }
>
>      amqp_msg.body(proton::binary(buffer,buffer+bufsize));
>
>      s.send(amqp_msg);
> }
>
> In this case, however, "on_sendable" blocks on the read() operation and, if
> the data becomes available a few minutes later, the broker closes the
> connection, as the client loop is completely blocked and cannot even send
> the heartbeat messages.
>

Yes, the container thread is also responsible for performing the IO.
By blocking it, you stop it doing anything at all for the connection
(and any others in the container): neither processing of [not-]arriving
data nor sending anything more, such as heartbeats when there is no
actual messaging work. So when the thread is eventually unblocked, it's
likely to find one of two things. Either it needs to disconnect the
peer, because the peer did not send heartbeats (if requested to) or
live traffic in time to satisfy the client's timeout. Or the client has
itself already been disconnected by the peer, because the client did
not send heartbeats (if requested to) or live traffic in time to
satisfy the peer's timeout. The idle timeouts operate independently in
each direction.


> If, instead, I do not block on the read() operation (for instance I read()
> with a timeout, by using poll()), "on_sendable" is triggered only once and
> I cannot find any other event to trigger the transmission of a message when
> data becomes available.
>
> I know that, in Python, I could solve this issue for instance by relying on
> "EventInjector", but I'm unable to find a similar solution with the C++
> version of the library (I would prefer to stick with C++, in this case, and
> not to fall back to Qpid Proton C).
>
> Do you know how I can solve this problem? Is there a way to "inject"
> external aperiodic events/data to be sent via AMQP?
>

Hopefully those with more (or any) clue about the C++ bits can provide
a better answer, but...

I believe that is what
http://qpid.apache.org/releases/qpid-proton-0.32.0/proton/cpp/api/classproton_1_1work__queue.html
is aimed at. An example with multiple threads using it is at
http://qpid.apache.org/releases/qpid-proton-0.32.0/proton/cpp/examples/multithreaded_client.cpp.html,
and 
http://qpid.apache.org/releases/qpid-proton-0.32.0/proton/cpp/examples/scheduled_send.cpp.html
also makes use of it, though only from the single container thread
with some scheduling.
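
To make the idea concrete, here is a rough sketch of the pattern a
work queue gives you, written with only the standard C++ library so it
can be run without a broker. It is a toy model, not Proton's API:
ToyLoop, add(), stop(), run() and run_demo() are all made-up names. In
the multithreaded_client example linked above, the corresponding pieces
are the proton::work_queue obtained from the sender (s.work_queue()) and
its add() call, with the lambda doing the sender.send(). The point is
that the external thread never touches the sender directly and the loop
thread never blocks waiting for data.

```cpp
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Toy stand-in for the container thread plus its work queue.
// NOT Proton's API: every name in this class is hypothetical.
class ToyLoop {
public:
    // Thread-safe: may be called from any thread.
    void add(std::function<void()> work) {
        {
            std::lock_guard<std::mutex> l(lock_);
            pending_.push_back(std::move(work));
        }
        wake_.notify_one();
    }

    // Ask the loop to exit after the work queued so far has run.
    void stop() { add([this] { running_ = false; }); }

    // Runs on the "container" thread. It never waits longer than one
    // tick, so periodic duties (heartbeats, IO) would keep happening.
    void run(std::chrono::milliseconds tick) {
        while (running_) {
            std::deque<std::function<void()>> batch;
            {
                std::unique_lock<std::mutex> l(lock_);
                wake_.wait_for(l, tick, [this] { return !pending_.empty(); });
                batch.swap(pending_);
            }
            ++ticks_;  // stand-in for heartbeat / IO processing
            for (auto& work : batch) work();
        }
    }

    unsigned long ticks() const { return ticks_; }

private:
    std::mutex lock_;
    std::condition_variable wake_;
    std::deque<std::function<void()>> pending_;
    bool running_ = true;      // only touched on the loop thread
    unsigned long ticks_ = 0;  // only touched on the loop thread
};

// An external producer hands data to the loop thread at irregular
// times; the "send" itself (here: push_back) runs on the loop thread.
std::vector<std::string> run_demo() {
    ToyLoop loop;
    std::vector<std::string> sent;  // only touched on the loop thread

    std::thread loop_thread([&] { loop.run(std::chrono::milliseconds(5)); });

    std::thread producer([&] {
        for (int i = 0; i < 3; ++i) {
            // Data shows up whenever it shows up.
            std::this_thread::sleep_for(std::chrono::milliseconds(20));
            std::string data = "chunk-" + std::to_string(i);
            // Copy the data into the closure; it runs later, elsewhere.
            loop.add([&sent, data] { sent.push_back(data); });
        }
        loop.stop();
    });

    producer.join();
    loop_thread.join();
    return sent;
}
```

Calling run_demo() from a main() returns the three chunks in the order
the producer queued them. In the real client the pipe would no longer be
needed: the external thread would call add() on the sender's work queue
whenever it has data, and on_sendable would not have to block at all.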




> Thank you very much in advance,
> Francesco Raviglione
