This patch is available also in bidi-pipes branch.
>From a24a7c15a824bb48da38809bff9416673dc5a176 Mon Sep 17 00:00:00 2001
From: Martin Sustrik <[email protected]>
Date: Tue, 31 May 2011 14:36:51 +0200
Subject: [PATCH] Session termination induced by socket fixed

Signed-off-by: Martin Sustrik <[email protected]>
---
 src/pipe.cpp        |   18 ++++++++++++------
 src/pipe.hpp        |    5 +++--
 src/session.cpp     |   21 ++++++---------------
 src/socket_base.cpp |    4 ++--
 4 files changed, 23 insertions(+), 25 deletions(-)

diff --git a/src/pipe.cpp b/src/pipe.cpp
index fd7223c..26f7d85 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -301,8 +301,11 @@ void zmq::pipe_t::process_pipe_term_ack ()
     delete this;
 }
 
-void zmq::pipe_t::terminate ()
+void zmq::pipe_t::terminate (bool delay_)
 {
+    //  Overload the value specified at pipe creation.
+    delay = delay_;
+
     //  If terminate was already called, we can ignore the duplicit invocation.
     if (state == terminated || state == double_terminated)
         return;
@@ -321,9 +324,13 @@ void zmq::pipe_t::terminate ()
 
     //  There are still pending messages available, but the user calls
     //  'terminate'. We can act as if all the pending messages were read.
-    else if (state == pending) {
-        send_pipe_term_ack (peer);
-        state = terminating;
+    else if (state == pending && !delay) {
+            send_pipe_term_ack (peer);
+            state = terminating;
+    }
+
+    //  If there are pending messages still availabe, do nothing.
+    else if (state == pending && delay) {
     }
 
     //  We've already got delimiter, but not term command yet. We can ignore
@@ -338,8 +345,7 @@ void zmq::pipe_t::terminate ()
     else
         zmq_assert (false);
 
-    //  Stop inbound and outbound flow of messages.
-    in_active = false;
+    //  Stop outbound flow of messages.
     out_active = false;
 
     //  Rollback any unfinished outbound messages.
diff --git a/src/pipe.hpp b/src/pipe.hpp
index bf34a83..d3bf866 100644
--- a/src/pipe.hpp
+++ b/src/pipe.hpp
@@ -94,8 +94,9 @@ namespace zmq
 
         //  Ask pipe to terminate. The termination will happen asynchronously
         //  and user will be notified about actual deallocation by 'terminated'
-        //  event.
-        void terminate ();
+        //  event. If delay is true, the pending messages will be processed
+        //  before actual shutdown.
+        void terminate (bool delay_);
 
     private:
 
diff --git a/src/session.cpp b/src/session.cpp
index c9f4fdb..8f29248 100644
--- a/src/session.cpp
+++ b/src/session.cpp
@@ -228,13 +228,6 @@ void zmq::session_t::process_term (int linger_)
         return;
     }
 
-    //  If linger is set to zero, we can ask pipe to terminate without
-    //  waiting for pending messages to be read.
-    if (linger_ == 0) {
-        proceed_with_term ();
-        return;
-    }
-
     pending = true;
 
     //  If there's finite linger value, delay the termination.
@@ -246,6 +239,11 @@ void zmq::session_t::process_term (int linger_)
         has_linger_timer = true;
     }
 
+    //  Start pipe termination process. Delay the termination till all messages
+    //  are processed in case the linger time is non-zero.
+    pipe->terminate (linger_ != 0);
+
+    //  TODO: Should this go into pipe_t::terminate ?
     //  In case there's no engine and there's only delimiter in the
     //  pipe it wouldn't be ever read. Thus we check for it explicitly.
     pipe->check_read ();
@@ -256,13 +254,6 @@ void zmq::session_t::proceed_with_term ()
     //  The pending phase have just ended.
     pending = false;
 
-    //  If there's pipe attached to the session, we have to wait till it
-    //  terminates.
-    if (pipe) {
-        register_term_acks (1);
-        pipe->terminate ();
-    }
-
     //  Continue with standard termination.
     own_t::process_term (0);
 }
@@ -276,7 +267,7 @@ void zmq::session_t::timer_event (int id_)
 
     //  Ask pipe to terminate even though there may be pending messages in it.
     zmq_assert (pipe);
-    proceed_with_term ();
+    pipe->terminate (false);
 }
 
 bool zmq::session_t::has_engine ()
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index 59e2653..2b1d8af 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -233,7 +233,7 @@ void zmq::socket_base_t::attach_pipe (pipe_t *pipe_,
     //  straight away.
     if (is_terminating ()) {
         register_term_acks (1);
-        pipe_->terminate ();
+        pipe_->terminate (false);
     }
 }
 
@@ -740,7 +740,7 @@ void zmq::socket_base_t::process_term (int linger_)
 
     //  Ask all attached pipes to terminate.
     for (pipes_t::size_type i = 0; i != pipes.size (); ++i)
-        pipes [i]->terminate ();
+        pipes [i]->terminate (false);
     register_term_acks (pipes.size ());
 
     //  Continue the termination process immediately.
-- 
1.7.0.4

_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to