Dhammika,

1. When finalise function finds out that the initiation is over, it
sends command to itself, saying "unplug the engine and send it to
another thread".

2. The rest of the out_event executes.

3. The scheduler invokes processing of the event at this point. Thus,
out_event is not on the stack anymore.

I'm attaching a patch to exemplify what I had in mind. It's not perfect, it still fails with test_stress_shutdown, but it should give you an idea...

Martin
diff --git a/src/command.hpp b/src/command.hpp
index 31a0e54..03e43a2 100644
--- a/src/command.hpp
+++ b/src/command.hpp
@@ -37,6 +37,7 @@ namespace zmq
             stop,
             plug,
             own,
+            finalise,
             attach,
             bind,
             activate_reader,
@@ -64,6 +65,10 @@ namespace zmq
                 class own_t *object;
             } own;
 
+            //  Sent by object to itself to deallocate itself.
+            struct {
+            } finalise;
+
             //  Attach the engine to the session. If engine is NULL, it informs
             //  session that the connection have failed.
             struct {
diff --git a/src/object.cpp b/src/object.cpp
index dd8fc24..2774d07 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -79,6 +79,10 @@ void zmq::object_t::process_command (command_t &cmd_)
         process_seqnum ();
         break;
 
+    case command_t::finalise:
+        process_finalise ();
+        break;
+
     case command_t::attach:
         process_attach (cmd_.args.attach.engine,
             cmd_.args.attach.peer_identity ?
@@ -193,6 +197,17 @@ void zmq::object_t::send_own (own_t *destination_, own_t *object_)
     send_command (cmd);
 }
 
+void zmq::object_t::send_finalise (object_t *destination_)
+{
+    command_t cmd;
+#if defined ZMQ_MAKE_VALGRIND_HAPPY
+    memset (&cmd, 0, sizeof (cmd));
+#endif
+    cmd.destination = destination_;
+    cmd.type = command_t::finalise;
+    send_command (cmd);
+}
+
 void zmq::object_t::send_attach (session_t *destination_, i_engine *engine_,
     const blob_t &peer_identity_, bool inc_seqnum_)
 {
@@ -351,6 +366,11 @@ void zmq::object_t::process_own (own_t *object_)
     zmq_assert (false);
 }
 
+void zmq::object_t::process_finalise ()
+{
+    zmq_assert (false);
+}
+
 void zmq::object_t::process_attach (i_engine *engine_,
     const blob_t &peer_identity_)
 {
diff --git a/src/object.hpp b/src/object.hpp
index f8cfdda..b6b6276 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -67,6 +67,7 @@ namespace zmq
             bool inc_seqnum_ = true);
         void send_own (class own_t *destination_,
             class own_t *object_);
+        void send_finalise (class object_t *destination_);
         void send_attach (class session_t *destination_,
              struct i_engine *engine_, const blob_t &peer_identity_,
              bool inc_seqnum_ = true);
@@ -88,6 +89,7 @@ namespace zmq
         virtual void process_stop ();
         virtual void process_plug ();
         virtual void process_own (class own_t *object_);
+        virtual void process_finalise ();
         virtual void process_attach (struct i_engine *engine_,
             const blob_t &peer_identity_);
         virtual void process_bind (class reader_t *in_pipe_,
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp
index a796faa..c9ccba5 100644
--- a/src/zmq_init.cpp
+++ b/src/zmq_init.cpp
@@ -36,6 +36,7 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
     own_t (io_thread_, options_),
     sent (false),
     received (false),
+    finalised (false),
     socket (socket_),
     session (session_),
     io_thread (io_thread_)
@@ -66,7 +67,11 @@ bool zmq::zmq_init_t::read (::zmq_msg_t *msg_)
 
     //  If initialisation is done, pass the engine to the session and
     //  destroy the init object.
-    finalise_initialisation ();
+    if (sent && received && !finalised) {
+        zmq_assert (engine);
+        finalised = true;
+        send_finalise (this);
+    }
 
     return true;
 }
@@ -103,7 +108,11 @@ void zmq::zmq_init_t::flush ()
 
     //  If initialisation is done, pass the engine to the session and
     //  destroy the init object.
-    finalise_initialisation ();
+    if (sent && received && !finalised) {
+        zmq_assert (engine);
+        finalised = true;
+        send_finalise (this);
+    }
 }
 
 void zmq::zmq_init_t::detach ()
@@ -133,71 +142,74 @@ void zmq::zmq_init_t::process_unplug ()
         engine->unplug ();
 }
 
-void zmq::zmq_init_t::finalise_initialisation ()
+void zmq::zmq_init_t::process_finalise ()
 {
-    if (sent && received) {
-
-        //  If we know what session we belong to, it's easy, just send the
-        //  engine to that session and destroy the init object. Note that we
-        //  know about the session only if this object is owned by it. Thus,
-        //  lifetime of this object in contained in the lifetime of the session
-        //  so the pointer cannot become invalid without notice.
-        if (session) {
-            engine->unplug ();
-            send_attach (session, engine, peer_identity, true);
-            engine = NULL;
-            terminate ();
-            return;
-        }
-
-        //  All the cases below are listener-based. Therefore we need the socket
-        //  reference so that new sessions can bind to that socket.
-        zmq_assert (socket);
-
-        //  We have no associated session. If the peer has no identity we'll
-        //  create a transient session for the connection. Note that
-        //  seqnum is incremented to account for attach command before the
-        //  session is launched. That way we are sure it won't terminate before
-        //  being attached.
-        if (peer_identity [0] == 0) {
-            session = new (std::nothrow) transient_session_t (io_thread,
-                socket, options);
-            zmq_assert (session);
-            session->inc_seqnum ();
-            launch_sibling (session);
-            engine->unplug ();
-            send_attach (session, engine, peer_identity, false);
-            engine = NULL;
-            terminate ();
-            return;
-        }
-        
-        //  Try to find the session corresponding to the peer's identity.
-        //  If found, send the engine to that session and destroy this object.
-        //  Note that session's seqnum is incremented by find_session rather
-        //  than by send_attach.
-        session = socket->find_session (peer_identity);
-        if (session) {
-            engine->unplug ();
-            send_attach (session, engine, peer_identity, false);
-            engine = NULL;
-            terminate ();
-            return;
-        }
-
-        //  There's no such named session. We have to create one. Note that
-        //  seqnum is incremented to account for attach command before the
-        //  session is launched. That way we are sure it won't terminate before
-        //  being attached.
-        session = new (std::nothrow) named_session_t (io_thread, socket,
-            options, peer_identity);
+    //  If the engine have disconnected in the meantime, we just have to
+    //  deallocate the init object.
+    if (!engine) {
+        terminate ();
+        return;
+    }
+
+    //  Disconnect the engine from the poller.
+    engine->unplug ();
+
+    //  If we know what session we belong to, it's easy, just send the
+    //  engine to that session and destroy the init object. Note that we
+    //  know about the session only if this object is owned by it. Thus,
+    //  lifetime of this object in contained in the lifetime of the session
+    //  so the pointer cannot become invalid without notice.
+    if (session) {
+        send_attach (session, engine, peer_identity, true);
+        engine = NULL;
+        terminate ();
+        return;
+    }
+
+    //  All the cases below are listener-based. Therefore we need the socket
+    //  reference so that new sessions can bind to that socket.
+    zmq_assert (socket);
+
+    //  We have no associated session. If the peer has no identity we'll
+    //  create a transient session for the connection. Note that
+    //  seqnum is incremented to account for attach command before the
+    //  session is launched. That way we are sure it won't terminate before
+    //  being attached.
+    if (peer_identity [0] == 0) {
+        session = new (std::nothrow) transient_session_t (io_thread,
+            socket, options);
         zmq_assert (session);
         session->inc_seqnum ();
         launch_sibling (session);
-        engine->unplug ();
         send_attach (session, engine, peer_identity, false);
         engine = NULL;
         terminate ();
         return;
     }
+    
+    //  Try to find the session corresponding to the peer's identity.
+    //  If found, send the engine to that session and destroy this object.
+    //  Note that session's seqnum is incremented by find_session rather
+    //  than by send_attach.
+    session = socket->find_session (peer_identity);
+    if (session) {
+        send_attach (session, engine, peer_identity, false);
+        engine = NULL;
+        terminate ();
+        return;
+    }
+
+    //  There's no such named session. We have to create one. Note that
+    //  seqnum is incremented to account for attach command before the
+    //  session is launched. That way we are sure it won't terminate before
+    //  being attached.
+    session = new (std::nothrow) named_session_t (io_thread, socket,
+        options, peer_identity);
+    zmq_assert (session);
+    session->inc_seqnum ();
+    launch_sibling (session);
+    send_attach (session, engine, peer_identity, false);
+    engine = NULL;
+    terminate ();
+    return;
 }
diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp
index 511f141..5dd58ec 100644
--- a/src/zmq_init.hpp
+++ b/src/zmq_init.hpp
@@ -43,7 +43,8 @@ namespace zmq
 
     private:
 
-        void finalise_initialisation ();
+        //  Command handlers.
+        void process_finalise ();
 
         //  i_inout interface implementation.
         bool read (::zmq_msg_t *msg_);
@@ -64,6 +65,9 @@ namespace zmq
         //  True if peer's identity was already received.
         bool received;
 
+        //  True if 'finalise' command was already sent.
+        bool finalised;
+
         //  Socket the object belongs to.
         class socket_base_t *socket;
 
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to