Dear Robbie,

You are right. Today I had time to take a closer look at the examples, and I noticed that the "multithreaded_client" example ( http://qpid.apache.org/releases/qpid-proton-0.32.0/proton/cpp/examples/multithreaded_client.cpp.html ) could be exactly what I'm looking for. Besides having a separate thread for processing received messages, it shows how work (i.e. sending messages) is fed to the Qpid Proton thread from other external threads. If I understood this correctly, that is the "sender" thread calling "cl.send(msg)", which causes new work to be added to the Qpid Proton thread with "work_queue()->add([=]() { sender_.send(msg); });".
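As a minimal sketch of that hand-off, the pattern can be reproduced with the standard library alone. Nothing below is Proton code: "WorkQueue", "Client" and "demo" are hypothetical stand-ins (for proton::work_queue, the example's client class and its sender thread), and a std::string takes the place of proton::message. It only illustrates the idea that external threads add closures while a single container thread runs them.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for proton::work_queue: closures may be added from
// any thread, and are executed one at a time by the thread that calls run().
class WorkQueue {
public:
    // Thread-safe: may be called from any thread.
    void add(std::function<void()> fn) {
        {
            std::lock_guard<std::mutex> l(lock_);
            jobs_.push_back(std::move(fn));
        }
        ready_.notify_one();
    }

    // Ask run() to return once every queued job has been executed.
    void stop() {
        {
            std::lock_guard<std::mutex> l(lock_);
            stopped_ = true;
        }
        ready_.notify_one();
    }

    // Runs on the single "container" thread.
    void run() {
        for (;;) {
            std::function<void()> job;
            {
                std::unique_lock<std::mutex> l(lock_);
                ready_.wait(l, [this] { return stopped_ || !jobs_.empty(); });
                if (jobs_.empty()) return;  // stopped and fully drained
                job = std::move(jobs_.front());
                jobs_.pop_front();
            }
            job();  // executed without holding the lock
        }
    }

private:
    std::mutex lock_;
    std::condition_variable ready_;
    std::deque<std::function<void()>> jobs_;
    bool stopped_ = false;
};

// Hypothetical stand-in for the AMQP client. sent_ plays the role of the
// sender and is only ever touched on the container thread.
class Client {
public:
    // Called from external threads. The message is copied into the closure
    // because it will be used later, on another thread.
    void send(const std::string& msg) {
        queue_.add([this, msg] {
            assert(std::this_thread::get_id() == container_id_);
            sent_.push_back(msg);
        });
    }

    void close() { queue_.stop(); }

    // Blocks the calling thread, which becomes the container thread.
    void run() {
        container_id_ = std::this_thread::get_id();
        queue_.run();
    }

    // Only safe to call after the container thread has been joined.
    const std::vector<std::string>& sent() const { return sent_; }

private:
    WorkQueue queue_;
    std::thread::id container_id_;
    std::vector<std::string> sent_;
};

// One container thread and one external producer thread.
std::vector<std::string> demo() {
    Client cl;
    std::thread container([&] { cl.run(); });
    std::thread producer([&] {
        cl.send("first");
        cl.send("second");
        cl.close();
    });
    producer.join();
    container.join();
    return cl.sent();
}
```

Calling demo() from main() returns the messages in the order the producer queued them. With the real library, the queue would presumably come from the sender once it is open, as in the linked example, rather than being owned by the client.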

Thank you again for your assistance,
Francesco Raviglione

On Thu, 26 Nov 2020 at 16:00, Robbie Gemmell <robbie.gemm...@gmail.com> wrote:

> I don't think you have looked at the multithreaded examples closely enough?
> That's exactly what it says it's doing and looks to me to do (well, it
> also has a 3rd non-container thread for processing received messages).
>
> On Thu, 26 Nov 2020 at 14:30, Francesco Raviglione
> <francescorav.es...@gmail.com> wrote:
> >
> > Dear Robbie,
> > First of all, sorry for the very late reply.
> >
> > Thank you very much for your reply and for the references to the C++ Work
> > Queues.
> > I have been quite busy in the past days with other projects, but I will
> > definitely look in more detail into Work Queues and their usage with Qpid
> > Proton C++.
> >
> > Looking at the examples, however, they do not seem to tackle the case in
> > which work is added to the work queue from an external, non-Qpid Proton,
> > thread.
> > I assume I will need to find a way to create the work queue from the sender
> > with "&s.work_queue()", when the sender is opened (in "on_sender_open()",
> > like in the examples), and then make the proton::work_queue object
> > available outside the Qpid Proton class to be able to "inject" work from
> > other external threads (would, maybe, making the "proton::work_queue"
> > public work and ensure, at the same time, thread safety?).
> >
> > Thank you very much,
> > Francesco Raviglione
> >
> > On Wed, 18 Nov 2020 at 11:25, Robbie Gemmell <robbie.gemm...@gmail.com> wrote:
> >
> > > On Tue, 17 Nov 2020 at 19:59, Francesco Raviglione
> > > <francescorav.es...@gmail.com> wrote:
> > > >
> > > > Dear all,
> > > > I'm experiencing some issues in writing an AMQP client with Qpid Proton
> > > > C++.
> > > > My client should only send messages to a particular queue on an ActiveMQ
> > > > broker and it is not supposed to receive any message over that
> > > > connection.
> > > >
> > > > The client should not send the messages as soon as it is started (with
> > > > "proton::container(AMQP_client).run();"), but it should wait for the data
> > > > to be provided by an external thread, which may become available even
> > > > after some minutes (I cannot tell in advance when the data will be
> > > > available, and there may be even a long time between two consecutive
> > > > chunks of available data).
> > > >
> > > > If I try to supply the AMQP client loop with new data through a pipe (on
> > > > which data is written by the external thread), I can write an
> > > > "AMQP_client" class like the following:
> > > >
> > > > void AMQP_client::on_container_start(proton::container& c) {
> > > >     c.connect(broker_address);
> > > > }
> > > >
> > > > void AMQP_client::on_connection_open(proton::connection& c) {
> > > >     c.open_sender(queue_name);
> > > > }
> > > >
> > > > void AMQP_client::on_sendable(proton::sender &s) {
> > > >     uint8_t buffer[1024];
> > > >     int bufsize;
> > > >     proton::message amqp_msg;
> > > >
> > > >     // Wait for new data to be sent (wait for data to be written on the pipe)
> > > >     if((bufsize=read(pipe_read_end,&buffer,1024))==-1) {
> > > >         perror("read() error");
> > > >         return;
> > > >     }
> > > >
> > > >     amqp_msg.body(proton::binary(buffer,buffer+bufsize));
> > > >
> > > >     s.send(amqp_msg);
> > > > }
> > > >
> > > > In this case, however, "on_sendable" blocks on the read() operation and,
> > > > if the data becomes available a few minutes later, the broker closes the
> > > > connection as the client loop is completely blocked and cannot even send
> > > > the heartbeat messages.
> > > >
> > >
> > > Yes, as the container thread is also responsible for performing the
> > > IO. By blocking it, you simply stop it doing anything at all for the
> > > connection (and any others in the container), both processing of
> > > [not-]arriving data and sending of any more, such as for heartbeats if
> > > not actual messaging work. So when the thread is eventually unblocked,
> > > it's likely going to find either it needs to disconnect the peer for
> > > not sending the client heartbeats (if requested to) or live traffic in
> > > time to satisfy the client's timeout, or the client has itself already
> > > been disconnected by the peer for not sending the peer heartbeats (if
> > > requested to) or live traffic in time to satisfy the peer's timeout
> > > (the idle timeouts operate independently in each direction).
> > >
> > > >
> > > > If, instead, I do not block on the read() operation (for instance I
> > > > read() with a timeout, by using poll()), "on_sendable" is triggered only
> > > > once and I cannot find any other event to trigger the transmission of a
> > > > message when data becomes available.
> > > >
> > > > I know that, in Python, I could solve this issue for instance by relying
> > > > on "EventInjector", but I'm unable to find a similar solution with the
> > > > C++ version of the library (I would prefer to stick with C++, in this
> > > > case, and not to fall back to Qpid Proton C).
> > > >
> > > > Do you know how I can solve this problem? Is there a way to "inject"
> > > > external aperiodic events/data to be sent via AMQP?
> > > >
> > >
> > > Hopefully those with more/any clue about the C++ bits can
> > > provide a better answer, but...
> > >
> > > I believe that is what
> > > http://qpid.apache.org/releases/qpid-proton-0.32.0/proton/cpp/api/classproton_1_1work__queue.html
> > > is aimed at. An example with multiple threads using it is at
> > > http://qpid.apache.org/releases/qpid-proton-0.32.0/proton/cpp/examples/multithreaded_client.cpp.html
> > > , and
> > > http://qpid.apache.org/releases/qpid-proton-0.32.0/proton/cpp/examples/scheduled_send.cpp.html
> > > also makes use of it, though only from the single container thread
> > > with some scheduling.
> > >
> > > >
> > > > Thank you very much in advance,
> > > > Francesco Raviglione
> > >
> > > ---------------------------------------------------------------------
> > > To unsubscribe, e-mail: users-unsubscr...@qpid.apache.org
> > > For additional commands, e-mail: users-h...@qpid.apache.org