Hello,
I am using proton cpp 0.27.1.
While working on the C++ imperative API, I encountered a problem while
using container.stop.
Please find the test code attached (a simplified version of the current
implementation).
I have also attached the core dump I am getting.
The problem:
In the example I attached, the container.stop in the ImperativeContainer
destructor might have a race condition with the closing of the connection.
Is everything related to this connection freed by the time on_transport_close
is called? I ask because in on_transport_close I signal the main thread
to continue and execute container.stop, which might create a race condition
if any further actions are taken after on_transport_close is called.
A probable solution:
I think we need a reliable way to know when a handler can be released:
it might be an unregister-handler function, or a callback invoked after
on_transport_close that tells us that we can release the memory of the
handler.
Best regards,
Rabih
struct ContainerHandler : public proton::messaging_handler {
void on_container_start(proton::container&) override {
std::cout << "client on_container_start" << std::endl;
m_onOpenPromise.set_value();
}
void on_container_stop(proton::container&) override {
std::cout << "client on_container_stop" << std::endl;
m_onStopPromise.set_value();
}
std::promise<void> m_onOpenPromise;
std::promise<void> m_onStopPromise;
};
/// Owns a proton::container together with the thread that runs it.
///
/// The constructor returns only after the handler has seen
/// on_container_start. The destructor stops the container, waits for
/// on_container_stop and finally joins the worker thread.
struct ImperativeContainer {
    ImperativeContainer()
        : m_container(m_containerHand)
    {
        // Obtain the future before the worker thread exists, so the start
        // signal cannot be raised before anyone is able to observe it.
        std::future<void> started = m_containerHand.m_onOpenPromise.get_future();

        m_thread = std::thread([this]() {
            try {
                // Per the Proton API docs, auto_stop(false) keeps the container
                // running until stop() is called explicitly.
                m_container.auto_stop(false);
                m_container.run();
            }
            catch (const std::exception& e) {
                // Exceptions must not escape the thread entry point.
                std::cout << "std::exception caught on pn container, message:"
                          << e.what() << std::endl;
            }
        });

        // Block the constructing thread until the container reports start-up.
        started.get();
    }

    ~ImperativeContainer()
    {
        std::future<void> stopped = m_containerHand.m_onStopPromise.get_future();

        m_container.stop();
        // Wait for on_container_stop before joining the worker thread.
        stopped.get();

        if (m_thread.joinable()) {
            m_thread.join();
        }
    }

    // Declaration order matters: the handler is constructed before the
    // container that is initialised with a reference to it.
    ContainerHandler m_containerHand;
    proton::container m_container;
    std::thread m_thread;
};
/// Per-connection handler that records the opened connection and its work
/// queue, logs every connection/transport event, and exposes the open and
/// close events as one-shot signals another thread can wait on.
struct ConnectionHandler : public proton::messaging_handler {
    /// Stores the connection and its work queue, then signals "opened".
    void on_connection_open(proton::connection& conn) override
    {
        std::cout << "client on_connection_open" << std::endl;
        m_connection = conn;
        m_work = &m_connection.work_queue();
        m_onOpenPromise.set_value();
    }

    /// Drops the stored connection and signals "closed".
    void on_transport_close(proton::transport&) override
    {
        std::cout << "client on_transport_close" << std::endl;
        // To avoid a race condition in free: release our reference to the
        // connection before waking whoever waits on the close signal.
        m_connection = proton::connection();
        m_onClosePromise.set_value();
        std::cout << "client on_transport_close finished" << std::endl;
    }

    /// Logs the event only.
    void on_connection_close(proton::connection&) override
    {
        std::cout << "client on_connection_close" << std::endl;
    }

    /// Logs the connection's error description.
    void on_connection_error(proton::connection& conn) override
    {
        std::cout << "client on_connection_error: " << conn.error().what() <<
            std::endl;
    }

    /// Logs the event only.
    void on_transport_open(proton::transport&) override
    {
        std::cout << "client on_transport_open" << std::endl;
    }

    /// Logs the transport's error description.
    void on_transport_error(proton::transport& t) override
    {
        std::cout << "client on_transport_error: " << t.error().what() <<
            std::endl;
    }

    /// Connection captured in on_connection_open; reset in on_transport_close.
    proton::connection m_connection;
    /// Work queue of m_connection; nullptr until on_connection_open has run.
    proton::work_queue* m_work = nullptr;
    /// Fulfilled from on_connection_open().
    std::promise<void> m_onOpenPromise;
    /// Fulfilled from on_transport_close().
    std::promise<void> m_onClosePromise;
};
class ImperativeConnection {
public:
ImperativeConnection(proton::container& myContainer, const std::strind& url)
{
auto openConnectionFuture = m_connectionHand.m_onOpenPromise.get_future();
myContainer.connect(url, m_connectionHand);
openConnectionFuture.get();
}
~ImperativeConnection() {
auto closeConnectionFuture =
m_connectionHand.m_onClosePromise.get_future();
m_connectionHand.m_work->add([=]()
{m_connectionHand.m_connection.close(); });
closeConnectionFuture.wait();
}
private:
ConnectionHandler m_connectionHand;
};
/// Test driver: starts a container, opens one connection, then lets both
/// objects go out of scope so their destructors run.
int main(int argc, char **argv) {
    // Use the first command-line argument as the address when one is given,
    // otherwise fall back to the built-in default.
    std::string url("//127.0.0.1:5672");
    if (argc > 1) {
        url = argv[1];
    }

    // Declared in this order so that conn is destroyed before cont.
    ImperativeContainer cont;
    ImperativeConnection conn(cont.m_container, url);
}
#0 pn_error_free (error=0xffffffff00000000)
at /data/home/proton/qpid-proton-0.27.1/c/src/core/error.c:50
#1 0x00007f5439a301b7 in pn_data_finalize (object=0x1a22420)
at /data/home/proton/qpid-proton-0.27.1/c/src/core/codec.c:90
#2 0x00007f5439a2d218 in pn_class_decref (clazz=0x7f5439c54880 <clazz>,
object=0x1a22420)
at /data/home/proton/qpid-proton-0.27.1/c/src/core/object/object.c:95
#3 0x00007f5439a36f1d in pn_condition_tini (condition=0x1a24990)
at /data/home/proton/qpid-proton-0.27.1/c/src/core/engine.c:219
#4 0x00007f5439a396a6 in pn_condition_free (c=0x1a24990)
at /data/home/proton/qpid-proton-0.27.1/c/src/core/engine.c:227
#5 0x00007f5439c59728 in pconnection_final_free (pc=0x1a2e4a0)
at /data/home/proton/qpid-proton-0.27.1/c/src/proactor/epoll.c:888
#6 0x00007f5439c5a384 in pconnection_cleanup (pc=<optimized out>)
at /data/home/proton/qpid-proton-0.27.1/c/src/proactor/epoll.c:905
#7 0x00007f5439c5b21a in pconnection_process (pc=0x1a2e4a0,
events=events@entry=0, timeout=timeout@entry=false, topup=topup@entry=false,
is_io_2=is_io_2@entry=false)
at /data/home/proton/qpid-proton-0.27.1/c/src/proactor/epoll.c:1272
#8 0x00007f5439c5bcf2 in process_inbound_wake (ee=0x1a24510, p=0x1a24410)
at /data/home/proton/qpid-proton-0.27.1/c/src/proactor/epoll.c:2092
#9 proactor_do_epoll (p=0x1a24410, can_block=true)
at /data/home/proton/qpid-proton-0.27.1/c/src/proactor/epoll.c:2129
#10 0x00007f543a9c3c1c in proton::container::impl::thread
(this=this@entry=0x1a238a0)
at
/data/home/proton/qpid-proton-0.27.1/cpp/src/proactor_container_impl.cpp:736
#11 0x00007f543a9c4250 in proton::container::impl::run (this=0x1a238a0,
threads=<optimized out>)
at
/data/home/proton/qpid-proton-0.27.1/cpp/src/proactor_container_impl.cpp:788
#12 0x00007f543ac32906 in
proton::Container::Container()::{lambda()#1}::operator()() const () at
/src/new/rmourad/qpid-imperative-proton/src/Container.cpp:17
#13 0x00007f543ac341e2 in
std::_Bind_simple<proton::Container::Container()::<lambda()>()>::_M_invoke<>(std::_Index_tuple<>)
(this=0x1a21178)
at /opt/rh/devtoolset-3/root/usr/include/c++/4.9.2/functional:1700
#14 0x00007f543ac34127 in
std::_Bind_simple<proton::Container::Container()::<lambda()>()>::operator()(void)
(this=0x1a21178) at
/opt/rh/devtoolset-3/root/usr/include/c++/4.9.2/functional:1688
#15 0x00007f543ac340a4 in
std::thread::_Impl<std::_Bind_simple<proton::Container::Container()::<lambda()>()>
>::_M_run(void) (this=0x1a21160)
at /opt/rh/devtoolset-3/root/usr/include/c++/4.9.2/thread:115
#16 0x00007f543a744470 in ?? () from /usr/lib64/libstdc++.so.6
#17 0x00007f543ae66aa1 in start_thread () from /lib64/libpthread.so.0
#18 0x00007f5439f48c4d in clone () from /lib64/libc.so.6
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]