The patch is also available in the bidi-pipe branch.
From 718885fdcd7af797f940078ca8c22aebab93c8bb Mon Sep 17 00:00:00 2001
From: Martin Sustrik <[email protected]>
Date: Thu, 26 May 2011 11:30:25 +0200
Subject: [PATCH] Pending messages are delivered even if connection doesn't exist yet

Bug in previous refactoring fixed.

Signed-off-by: Martin Sustrik <[email protected]>
---
 src/pipe.cpp       |    3 +-
 src/session.cpp    |  110 ++++++++++++++++++++++++----------------------------
 src/session.hpp    |   10 +---
 src/tcp_socket.cpp |    2 +-
 4 files changed, 57 insertions(+), 68 deletions(-)

diff --git a/src/pipe.cpp b/src/pipe.cpp
index 73e5aae..48fc3e5 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -216,11 +216,12 @@ void zmq::pipe_t::process_pipe_term ()
         if (!delay) {
             state = terminating;
             send_pipe_term_ack (peer);
+            return;
         }
         else {
             state = pending;
+            return;
         }
-        return;
     }
 
     //  Delimiter happened to arrive before the term command. Now we have the
diff --git a/src/session.cpp b/src/session.cpp
index bff452e..5601402 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -31,7 +31,7 @@ zmq::session_t::session_t (class io_thread_t *io_thread_,
     io_object_t (io_thread_),
     pipe (NULL),
     incomplete_in (false),
-    terminating (false),
+    pending (false),
     engine (NULL),
     socket (socket_),
     io_thread (io_thread_),
@@ -121,8 +121,11 @@ void zmq::session_t::terminated (pipe_t *pipe_)
     zmq_assert (pipe == pipe_);
     pipe = NULL;
 
-    if (terminating)
-        unregister_term_ack ();
+    //  If we are waiting for pending messages to be sent, at this point
+    //  we are sure that there will be no more messages and we can proceed
+    //  with termination safely.
+    if (pending)
+        proceed_with_term ();
 }
 
 void zmq::session_t::read_activated (pipe_t *pipe_)
@@ -150,15 +153,6 @@ void zmq::session_t::process_plug ()
 void zmq::session_t::process_attach (i_engine *engine_,
     const blob_t &peer_identity_)
 {
-    //  If we are already terminating, we destroy the engine straight away.
-    //  Note that we don't have to unplug it before deleting as it's not
-    //  yet plugged to the session.
-    if (terminating) {
-        if (engine_)
-            delete engine_;
-        return;
-    }
-
     //  If some other object (e.g. init) notifies us that the connection failed
     //  without creating an engine we need to start the reconnection process.
     if (!engine_) {
@@ -217,37 +211,52 @@ void zmq::session_t::detach ()
 
 void zmq::session_t::process_term (int linger_)
 {
-    //  If termination is already underway, do nothing.
-    if (!terminating) {
-
-        terminating = true;
-
-		//  If the termination of the pipe happens before the term command is
-		//  delivered there's nothing much to do. We can proceed with the
-		//  stadard termination immediately.
-		if (pipe) {
-
-			//  We're going to wait till the pipe terminates.
-			register_term_acks (1);
-
-			//  If linger is set to zero, we can ask pipe to terminate without
-			//  waiting for pending messages to be read.
-			if (linger_ == 0)
-				pipe->terminate ();
-
-			//  If there's finite linger value, set up a timer.
-			if (linger_ > 0) {
-			   zmq_assert (!has_linger_timer);
-			   add_timer (linger_, linger_timer_id);
-			   has_linger_timer = true;
-			}
-
-			//  In case there's no engine and there's only delimiter in the pipe it
-			//  wouldn't be ever read. Thus we check for it explicitly.
-			pipe->check_read ();
-		}
+    zmq_assert (!pending);
+
+    //  If the termination of the pipe happens before the term command is
+    //  delivered there's nothing much to do. We can proceed with the
+    //  standard termination immediately.
+    if (!pipe) {
+        proceed_with_term ();
+        return;
+    }
+
+    //  If linger is set to zero, we can ask pipe to terminate without
+    //  waiting for pending messages to be read.
+    if (linger_ == 0) {
+        proceed_with_term ();
+        return;
+    }
+
+    pending = true;
+
+    //  If there's finite linger value, delay the termination.
+    //  If linger is infinite (negative) we don't even have to set
+    //  the timer.
+    if (linger_ > 0) {
+        zmq_assert (!has_linger_timer);
+        add_timer (linger_, linger_timer_id);
+        has_linger_timer = true;
     }
 
+    //  In case there's no engine and there's only a delimiter in the
+    //  pipe, it would never be read. Thus we check for it explicitly.
+    pipe->check_read ();
+}
+
+void zmq::session_t::proceed_with_term ()
+{
+    //  The pending phase has just ended.
+    pending = false;
+
+    //  If there's a pipe attached to the session, we have to wait till it
+    //  terminates.
+    if (pipe) {
+        register_term_acks (1);
+        pipe->terminate ();
+    }
+
+    //  Continue with standard termination.
     own_t::process_term (0);
 }
 
@@ -260,7 +269,7 @@ void zmq::session_t::timer_event (int id_)
 
     //  Ask pipe to terminate even though there may be pending messages in it.
     zmq_assert (pipe);
-    pipe->terminate ();
+    proceed_with_term ();
 }
 
 bool zmq::session_t::has_engine ()
@@ -278,21 +287,4 @@ void zmq::session_t::unregister_session (const blob_t &name_)
     socket->unregister_session (name_);
 }
 
-void zmq::session_t::terminate ()
-{
-    //  If termination process is already underway, do nothing.
-    if (!terminating) {
-		terminating = true;
-		
-		//  If the pipe was already terminated, there's nothing much to do.
-		//  If it wasn't, we'll ask it to terminate.
-		if (pipe) {
-
-			register_term_acks (1);
-			pipe->terminate ();
-		}
-    }
-
-	own_t::terminate ();
-}
 
diff --git a/src/session.hpp b/src/session.hpp
index f1564d8..8bca735 100644
--- a/src/session.hpp
+++ b/src/session.hpp
@@ -59,10 +59,6 @@ namespace zmq
 
     protected:
 
-        //  This function allows to shut down the session even though
-        //  there are messages pending.
-        void terminate ();
-
         //  Two events for the derived session type. Attached is triggered
         //  when session is attached to a peer. The function can reject the new
         //  peer by returning false. Detached is triggered at the beginning of
@@ -105,9 +101,9 @@ namespace zmq
         //  is still in the in pipe.
         bool incomplete_in;
 
-        //  If true the termination process is already underway, ie. term ack
-        //  for the pipe was already registered etc.
-        bool terminating;
+        //  True if termination has been suspended to push the pending
+        //  messages to the network.
+        bool pending;
 
         //  The protocol I/O engine connected to the session.
         struct i_engine *engine;
diff --git a/src/tcp_socket.cpp b/src/tcp_socket.cpp
index 2257e4f..3c9b1fd 100644
--- a/src/tcp_socket.cpp
+++ b/src/tcp_socket.cpp
@@ -213,7 +213,7 @@ int zmq::tcp_socket_t::read (void *data_, size_t size_)
 
     errno_assert (nbytes != -1);
 
-    //  Orderly shutdown by the other peer.
+    //  Orderly shutdown by the peer.
     if (nbytes == 0)
         return -1;
 
-- 
1.7.0.4

_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to