>From 1c239708ab174c1de9f99e256d23158f74a24dbc Mon Sep 17 00:00:00 2001
From: Martin Sustrik <[email protected]>
Date: Sun, 13 Nov 2011 10:33:49 +0100
Subject: [PATCH] Couple of bugs in XREP handling of identities fixed. wq:
 Signed-off-by: Martin Sustrik <[email protected]>

---
 src/xrep.cpp |   37 ++++++++++++++++++++++---------------
 1 files changed, 22 insertions(+), 15 deletions(-)

diff --git a/src/xrep.cpp b/src/xrep.cpp
index ea19e56..336d4e5 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -189,14 +189,18 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
         return 0;
     }
 
-    //  Get next message part.
-    pipe_t *pipe;
-    int rc = fq.recvpipe (msg_, flags_, &pipe);
-    if (rc != 0)
-        return -1;
-
-    //  If identity is received, change the key assigned to the pipe.
-    if (unlikely (msg_->flags () & msg_t::identity)) {
+    pipe_t *pipe = NULL;
+    while (true) {
+
+        //  Get next message part.
+        int rc = fq.recvpipe (msg_, flags_, &pipe);
+        if (rc != 0)
+            return -1;
+
+        //  If identity is received, change the key assigned to the pipe.
+        if (likely (!(msg_->flags () & msg_t::identity)))
+            break;
+
         zmq_assert (!more_in);
 
         //  Empty identity means we can preserve the auto-generated identity.
@@ -219,11 +223,6 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
             }
             zmq_assert (it != outpipes.end ());
         }
-
-        //  After processing the identity, try to get the next message.
-        rc = fq.recvpipe (msg_, flags_, &pipe);
-        if (rc != 0)
-            return -1;
     }
 
     //  If we are in the middle of reading a message, just return the next part.
@@ -234,7 +233,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
  
     //  We are at the beginning of a new message. Move the message part we
     //  have to the prefetched and return the ID of the peer instead.
-    rc = prefetched_msg.move (*msg_);
+    int rc = prefetched_msg.move (*msg_);
     errno_assert (rc == 0);
     prefetched = true;
     rc = msg_->close ();
@@ -260,9 +259,17 @@ int zmq::xrep_t::rollback (void)
 
 bool zmq::xrep_t::xhas_in ()
 {
+    //  We may already have a message pre-fetched.
     if (prefetched)
         return true;
-    return fq.has_in ();
+
+    //  Try to read the next message to the pre-fetch buffer.
+    int rc = xrecv (&prefetched_msg, ZMQ_DONTWAIT);
+    if (rc != 0 && errno == EAGAIN)
+        return false;
+    zmq_assert (rc == 0);
+    prefetched = true;
+    return true;
 }
 
 bool zmq::xrep_t::xhas_out ()
-- 
1.7.5.4

_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to