Hello,

We managed to reproduce the bug in a simplified use case.
Please find attached "Broker.hpp" along with the test "Main.cpp".
The usage was inspired by the Proton C++ multi-threaded examples and the doc:
https://github.com/apache/qpid-proton/blob/master/cpp/docs/mt.md#thread-safety-rules
The container, connection, and session each have a different handler.
The connection and session handlers are destroyed when they go out of scope.
The handlers hold copies of the Proton objects as members.

If we run the test without line 142 of Main.cpp (the marked line that releases
the proton session), we get a race condition that leads to a segfault.

From the core dumps, it is always segfaulting in the reference counting.
This is why we proposed the C11 atomic types.

Best regards.
Jeremy and Rabih


On Tue, Apr 23, 2019 at 9:28 AM Cliff Jansen <[email protected]> wrote:

> I am of the same opinion as Gordon that fixing the counting (to be atomic)
> will only address the current symptom without preventing other races that
> will present themselves later or on other hardware architectures.
>
> Having two threads participating in Proton object reference counting is
> problematic as you can't guarantee which ones will end up running the
> finalizers (or resurrectors if the reference count increases during
> finalization).
>
> The simplest way to ensure thread safety is to abstain from all direct
> (except for the handful of documented thread-safe interfaces) and indirect
> calls into the Proton engine from any non-callback thread.  In C++ the
> latter is more difficult, as there can be hidden reference counting if you
> have Proton objects in your application objects.  There you have to be certain
> that you never have such objects being copied or destroyed in a
> non-callback thread.  Alternatively, you can use dumb pointers to the
> Proton objects and use some application mechanism (mutex, condition
> variable...) to allow the non-callback thread to know when the pointer to
> the Proton object is valid.
>
> Cliff
>
> On Mon, Apr 22, 2019 at 1:49 AM Gordon Sim <[email protected]> wrote:
>
> > On 19/04/2019 11:02 am, Rabih M wrote:
> > > We are only using proton objects in the handler, however, at destruction
> > > time, there is a race condition on the reference counter between the proton
> > > objects from the main thread that are being destroyed, and the proton
> > > objects held by the proton thread.
> >
> > Can you elaborate a little on this? Access to (or destruction of)
> > objects from distinct connections should not need to be coordinated,
> > though objects for the same connection clearly have to be.
> >
> > Is the issue you are seeing related to some global state (e.g. sasl
> > destruction)?
> >
> > What calls are being made that are in a race? Is this plain C or C++
> > with implicit calls being made by destructors?
> >
> > > The reference counter not being atomic is very error prone for users.
> > > How about transforming the int to an atomic int (already provided in the
> > > C11 standard: https://en.cppreference.com/w/c/language/atomic)?
> >
> > My initial thought is that just making the counter atomic would not be
> > sufficient for general races, so it would be important to understand
> > what specific uses cases this was designed to protect.
> >
> > ---------------------------------------------------------------------
> > To unsubscribe, e-mail: [email protected]
> > For additional commands, e-mail: [email protected]
> >
> >
>
#include <proton/connection.hpp>
#include <proton/container.hpp>
#include <proton/session.hpp>
#include <proton/work_queue.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/connection_options.hpp>
#include <proton/session_options.hpp>

#include <Broker.hpp>

#include <iostream>
#include <thread>
#include <future>
#include <functional>
#include <string>


struct Container : public proton::messaging_handler {
   Container(std::function<void()> onContainerStarted, std::function<void()> onError) :
      m_onContainerStarted(onContainerStarted),
      m_onError(onError) {
   }

   void on_container_start(proton::container&) override {
      m_onContainerStarted();
   }

   void on_error(const proton::error_condition&) override {
      m_onError();
   }

   std::function<void()> m_onContainerStarted;
   std::function<void()> m_onError;
};

struct Connection : public proton::messaging_handler {
   Connection(std::function<void(proton::connection&)> onConnectionOpened, std::function<void()> onConnectionClosed, std::function<void()> onError) :
      m_onConnectionOpened(onConnectionOpened),
      m_onConnectionClosed(onConnectionClosed),
      m_onError(onError){
   }

   void on_connection_open(proton::connection& connection) override {
      m_connection = connection;
      m_onConnectionOpened(m_connection);
   }

   void on_connection_close(proton::connection&) override {
      m_onConnectionClosed();
   }

   void on_connection_error(proton::connection&) override {
      m_onError();
   }

   std::function<void(proton::connection&)> m_onConnectionOpened;
   std::function<void()> m_onConnectionClosed;
   std::function<void()> m_onError;
   proton::connection m_connection;
};

struct Session : public proton::messaging_handler {
   Session(std::function<void(proton::session&)> onSessionOpened, std::function<void()> onSessionClosed, std::function<void()> onError) :
      m_onSessionOpened(onSessionOpened),
      m_onSessionClosed(onSessionClosed),
      m_onError(onError) {
   }

   void on_session_open(proton::session& session) override{
      m_session = session;
      m_onSessionOpened(m_session);
   }

   void on_session_close(proton::session&) override {
      m_onSessionClosed();
   }

   std::function<void(proton::session&)> m_onSessionOpened;
   std::function<void()> m_onSessionClosed;
   std::function<void()> m_onError;
   proton::session m_session;
};

int main() {
    try {
        std::string url("//127.0.0.1:5672");
        Broker brk(url, "examples");

        // Starting container
        std::promise<void> containerStarted;
        Container containerHandler(
         [&]{
            std::cout << "Container started" << std::endl;
            containerStarted.set_value();
         },
         [] {
            std::cout << "Container failed" << std::endl;
         }
        );
        proton::container container(containerHandler);

        std::thread clientThread([&]() {
         try {
            container.run();
         }
         catch (const std::exception& e) {
            std::cout << "Client threw exception: " << e.what() << std::endl;
         }
        });
        containerStarted.get_future().wait();

        {
            // Opening connection and session in the same scope
            // handlers will be destroyed at the end of the scope
            std::promise<proton::connection&> connectionOpened;
            std::promise<void> connectionClosed;
            Connection connectionHandler(
             [&connectionOpened](proton::connection& c){
                std::cout << "Connection opened" << std::endl;
                connectionOpened.set_value(c);
             },
             [&]{
                std::cout << "Connection closed" << std::endl;
                connectionClosed.set_value();
             },
             [] {
                std::cout << "Connection failed" << std::endl;
             }
            );
            container.connect(url, proton::connection_options().handler(connectionHandler));
            proton::connection& connection = connectionOpened.get_future().get();

            std::promise<proton::session&> sessionOpened;
            std::promise<void> sessionClosed;
            Session sessionHandler(
            [&sessionOpened](proton::session& s){
               sessionOpened.set_value(s);
            },
            [&]{
               // !!!!!! without the following line we encounter random segfaults !!!!!!
               sessionHandler.m_session = proton::session();
               sessionClosed.set_value();
            },
            [] {
               std::cout << "session failed" << std::endl;
            }
            );
            connection.work_queue().add([&]() {
               connection.open_session(proton::session_options().handler(sessionHandler));
            });
            proton::session& session = sessionOpened.get_future().get();
            
            connection.work_queue().add([&]() {
               session.close();
            });

            // Waiting for session to close
            sessionClosed.get_future().get();

            connection.work_queue().add([&]() {
               connection.close();
            });
            // Waiting for connection to close
            connectionClosed.get_future().wait();
        }

        if (clientThread.joinable()) {
         clientThread.join();
        }
    }
    catch (std::exception& e) {
      std::cout << e.what() << std::endl;
    }
    return 0;
}
#ifndef TEST_BROKER_HPP
#define TEST_BROKER_HPP

#include <proton/connection.hpp>
#include <proton/container.hpp>
#include <proton/delivery.hpp>
#include <proton/listener.hpp>
#include <proton/listen_handler.hpp>
#include <proton/message.hpp>
#include <proton/messaging_handler.hpp>
#include <proton/receiver_options.hpp>
#include <proton/sender.hpp>
#include <proton/tracker.hpp>
#include <proton/transport.hpp>

#include <proton/imperative/ThreadRAII.hpp>

#include <algorithm>
#include <deque>
#include <future>
#include <iostream>
#include <string>
#include <thread>
#include <utility>
#include <vector>

class listenerHandler : public proton::listen_handler
{
public:
   std::future<void> getStartedFuture()
   {
      return m_containerStarted.get_future();
   }

private:
   void on_open(proton::listener&) override
   {
      m_containerStarted.set_value();
   }

   void on_error(proton::listener&, const std::string& s) override
   {
      m_containerStarted.set_exception(std::make_exception_ptr(std::runtime_error(s)));
   }

   std::promise<void> m_containerStarted;
};

class Broker : public proton::messaging_handler
{
public:
   Broker(const std::string& url, const std::string& destination)
      :m_url(url + "/" + destination)
   {
      m_brokerThread = std::thread([&]() {
         try
         {
            proton::container(*this).run();
         }
         catch (const std::exception& e) {
            std::cerr << "Broker threw exception: " << e.what() << std::endl;
         }});

      // Wait for the container to start
      m_listenerHandler.getStartedFuture().get();
   }

   ~Broker()
   {
      if (!m_isClosed) {
         m_listener.stop();
      }
   }

   void injectMessages(std::vector<proton::message> messages)
   {
      m_messages.insert(m_messages.end(), messages.begin(), messages.end());
   }

   int m_acceptedMsgs = 0;
   int m_rejectedMsgs = 0;
   int m_releasedMsgs = 0;

private:
   void on_container_start(proton::container &c) override
   {
      std::cout << "broker on_container_start" << std::endl;
      c.receiver_options(proton::receiver_options());
      m_listener = c.listen(m_url, m_listenerHandler);
   }

   void on_connection_open(proton::connection &c) override
   {
      std::cout << "broker on_connection_open" << std::endl;
      m_connection = c;
      c.open();
   }

   void on_connection_close(proton::connection&) override
   {
      std::cout << "broker on_connection_close" << std::endl;
      m_listener.stop();
   }

   void on_transport_error(proton::transport &t) override
   {
      std::cout << "broker on_transport_error" << std::endl;
      std::cerr << "Broker::on_transport_error: " << t.error().what() << std::endl;
      m_listener.stop();
   }

   void on_error(const proton::error_condition &c) override
   {
      std::cout << "broker on_error" << std::endl;
      std::cerr << "Broker::on_error: " << c.what() << std::endl;

      m_isClosed = true;
      m_listener.stop();
   }

   void on_message(proton::delivery& delivery, proton::message& message) override
   {
      std::cout << "broker on_message" << std::endl;
      m_currentDelivery = { message, delivery };
   }

   void sendMessages(proton::sender& sender)
   {
      size_t numberOfMessagesToSend = std::min(static_cast<size_t>(sender.credit()), m_messages.size());

      for (auto count = numberOfMessagesToSend; count > 0; --count)
      {
         auto message = m_messages.front();
         m_messages.pop_front();
         sender.send(message);
      }
   }

   void on_sendable(proton::sender& sender) override
   {
      std::cout << "broker on_sendable" << std::endl;
      sendMessages(sender);
   }

   void on_tracker_accept(proton::tracker& ) override
   {
      ++m_acceptedMsgs;
   }

   void on_tracker_reject(proton::tracker&) override
   {
      ++m_rejectedMsgs;
   }

   void on_tracker_release(proton::tracker&) override
   {
      ++m_releasedMsgs;
   }

private:
   std::string m_url;
   bool m_isClosed = false;
   proton::ThreadRAII m_brokerThread;
   std::deque<proton::message> m_messages;
   std::pair<proton::message, proton::delivery> m_currentDelivery;
   listenerHandler m_listenerHandler;

   proton::listener m_listener;
   proton::connection m_connection;
};

#endif
#0  0x0000000000000001 in ?? ()
#1  0x00007f1887849200 in pn_class_decref (clazz=0x7f1878009220, object=0x7f1878009d10)
    at /proton-acceptance/proton-workspace/qpid-proton-0.27.0/c/src/core/object/object.c:91
#2  0x00007f188784adf5 in pn_record_finalize (object=0x7f1878009dc0)
    at /proton-acceptance/proton-workspace/qpid-proton-0.27.0/c/src/core/object/record.c:51
#3  0x00007f1887849283 in pn_class_free (clazz=0x7f1887a707c0 <clazz>, object=0x7f1878009dc0)
    at /proton-acceptance/proton-workspace/qpid-proton-0.27.0/c/src/core/object/object.c:124
#4  0x00007f188784945f in pn_free (object=<optimized out>)
    at /proton-acceptance/proton-workspace/qpid-proton-0.27.0/c/src/core/object/object.c:263
#5  0x00007f188785304c in pn_session_finalize (object=0x7f187800d5c0)
    at /proton-acceptance/proton-workspace/qpid-proton-0.27.0/c/src/core/engine.c:948
#6  0x00007f1887849218 in pn_class_decref (clazz=0x7f1887a70ac0 <clazz>, object=0x7f187800d5c0)
    at /proton-acceptance/proton-workspace/qpid-proton-0.27.0/c/src/core/object/object.c:95
#7  0x00000000004ae3dc in Session::~Session() ()