>From 00dc0245e6aacbff247c84ac8480d3ddcabacd5a Mon Sep 17 00:00:00 2001
From: Martin Sustrik <[email protected]>
Date: Sun, 19 Jun 2011 11:17:20 +0200
Subject: [PATCH] Race condition in pipe_t fixed.

pipe_t now correctly drops pointer to the underlying pipe when
sending pipe_term_ack command.

Signed-off-by: Martin Sustrik <[email protected]>
---
 src/pipe.cpp |   38 +++++++++++++++++++++++++-------------
 1 files changed, 25 insertions(+), 13 deletions(-)

diff --git a/src/pipe.cpp b/src/pipe.cpp
index 26f7d85..20c7f69 100644
--- a/src/pipe.cpp
+++ b/src/pipe.cpp
@@ -165,10 +165,12 @@ void zmq::pipe_t::rollback ()
 {
     //  Remove incomplete message from the outbound pipe.
     msg_t msg;
-    while (outpipe->unwrite (&msg)) {
-        zmq_assert (msg.flags () & msg_t::more);
-        int rc = msg.close ();
-        errno_assert (rc == 0);
+    if (outpipe) {
+		while (outpipe->unwrite (&msg)) {
+		    zmq_assert (msg.flags () & msg_t::more);
+		    int rc = msg.close ();
+		    errno_assert (rc == 0);
+		}
     }
 }
 
@@ -238,6 +240,7 @@ void zmq::pipe_t::process_pipe_term ()
     if (state == active) {
         if (!delay) {
             state = terminating;
+            outpipe = NULL;
             send_pipe_term_ack (peer);
             return;
         }
@@ -251,6 +254,7 @@ void zmq::pipe_t::process_pipe_term ()
     //  term command as well, so we can move straight to terminating state.
     if (state == delimited) {
         state = terminating;
+        outpipe = NULL;
         send_pipe_term_ack (peer);
         return;
     }
@@ -260,6 +264,7 @@ void zmq::pipe_t::process_pipe_term ()
     //  own ack.
     if (state == terminated) {
         state = double_terminated;
+        outpipe = NULL;
         send_pipe_term_ack (peer);
         return;
     }
@@ -280,8 +285,10 @@ void zmq::pipe_t::process_pipe_term_ack ()
     //  are invalid.
     if (state == terminating) ;
     else if (state == double_terminated);
-    else if (state == terminated)
+    else if (state == terminated) {
+        outpipe = NULL;
         send_pipe_term_ack (peer);
+    }
     else
         zmq_assert (false);
 
@@ -325,6 +332,7 @@ void zmq::pipe_t::terminate (bool delay_)
     //  There are still pending messages available, but the user calls
     //  'terminate'. We can act as if all the pending messages were read.
     else if (state == pending && !delay) {
+            outpipe = NULL;
             send_pipe_term_ack (peer);
             state = terminating;
     }
@@ -348,15 +356,18 @@ void zmq::pipe_t::terminate (bool delay_)
     //  Stop outbound flow of messages.
     out_active = false;
 
-    //  Rollback any unfinished outbound messages.
-    rollback ();
+    if (outpipe) {
 
-    //  Push delimiter into the outbound pipe. Note that watermarks are not
-    //  checked thus the delimiter can be written even though the pipe is full.
-    msg_t msg;
-    msg.init_delimiter ();
-    outpipe->write (msg, false);
-    flush ();
+		//  Rollback any unfinished outbound messages.
+		rollback ();
+
+		//  Push delimiter into the outbound pipe. Note that watermarks are not
+		//  checked thus the delimiter can be written even though the pipe is full.
+		msg_t msg;
+		msg.init_delimiter ();
+		outpipe->write (msg, false);
+		flush ();
+    }
 }
 
 bool zmq::pipe_t::is_delimiter (msg_t &msg_)
@@ -400,6 +411,7 @@ void zmq::pipe_t::delimit ()
     }
 
     if (state == pending) {
+        outpipe = NULL;
         send_pipe_term_ack (peer);
         state = terminating;
         return;
-- 
1.7.0.4

_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to