From 91fdedf25c4d76b0ec0aeb5d1d9f1c9a1a769447 Mon Sep 17 00:00:00 2001
From: Martin Sustrik <[email protected]>
Date: Sat, 17 Dec 2011 10:14:32 +0100
Subject: [PATCH] Fix polling on XREP socket

When polling on an XREP socket, the incoming message part was prefetched,
but not the identity of the sender. This patch fixes the problem.

Signed-off-by: Martin Sustrik <[email protected]>
---
 src/xrep.cpp |   45 ++++++++++++++++++++++++++++++++++++---------
 src/xrep.hpp |    8 ++++++--
 2 files changed, 42 insertions(+), 11 deletions(-)

diff --git a/src/xrep.cpp b/src/xrep.cpp
index f304b23..520fa24 100644
--- a/src/xrep.cpp
+++ b/src/xrep.cpp
@@ -29,7 +29,7 @@
 
 zmq::xrep_t::xrep_t (class ctx_t *parent_, uint32_t tid_) :
     socket_base_t (parent_, tid_),
-    prefetched (false),
+    prefetched (0),
     more_in (false),
     current_out (NULL),
     more_out (false),
@@ -180,12 +180,23 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_)
 
 int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
 {
+    //  If there is a prefetched identity, return it.
+    if (prefetched == 2)
+    {
+        int rc = msg_->init_size (prefetched_id.size ());
+        errno_assert (rc == 0);
+        memcpy (msg_->data (), prefetched_id.data (), prefetched_id.size ());
+        msg_->set_flags (msg_t::more);
+        prefetched = 1;
+        return 0;
+    }
+
     //  If there is a prefetched message, return it.
-    if (prefetched) {
+    if (prefetched == 1) {
         int rc = msg_->move (prefetched_msg);
         errno_assert (rc == 0);
         more_in = msg_->flags () & msg_t::more ? true : false;
-        prefetched = false;
+        prefetched = 0;
         return 0;
     }
 
@@ -235,7 +246,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_)
     //  have to the prefetched and return the ID of the peer instead.
     int rc = prefetched_msg.move (*msg_);
     errno_assert (rc == 0);
-    prefetched = true;
+    prefetched = 1;
     rc = msg_->close ();
     errno_assert (rc == 0);
 
@@ -259,16 +270,32 @@ int zmq::xrep_t::rollback (void)
 
 bool zmq::xrep_t::xhas_in ()
 {
+    //  If we are in the middle of reading the messages, there are
+    //  definitely more parts available.
+    if (more_in)
+        return true;
+
     //  We may already have a message pre-fetched.
-    if (prefetched)
+    if (prefetched > 0)
         return true;
 
-    //  Try to read the next message to the pre-fetch buffer.
-    int rc = xrep_t::xrecv (&prefetched_msg, ZMQ_DONTWAIT);
-    if (rc != 0 && errno == EAGAIN)
+    //  Try to read the next message to the pre-fetch buffer. If anything,
+    //  it will be the identity of the peer sending the message.
+    msg_t id;
+    id.init ();
+    int rc = xrep_t::xrecv (&id, ZMQ_DONTWAIT);
+    if (rc != 0 && errno == EAGAIN) {
+        id.close ();
         return false;
+    }
     zmq_assert (rc == 0);
-    prefetched = true;
+
+    //  We now have the first part of the message prefetched. We will store
+    //  the prefetched identity as well.
+    prefetched_id.assign ((unsigned char*) id.data (), id.size ());
+    id.close ();
+    prefetched = 2;
+
     return true;
 }
 
diff --git a/src/xrep.hpp b/src/xrep.hpp
index df82d00..65bd564 100644
--- a/src/xrep.hpp
+++ b/src/xrep.hpp
@@ -67,8 +67,12 @@ namespace zmq
         //  Fair queueing object for inbound pipes.
         fq_t fq;
 
-        //  Have we prefetched a message.
-        bool prefetched;
+        //  This value is either 0 (nothing is prefetched), 1 (only message body
+        //  is prefetched) or 2 (both identity and message body are prefetched).
+        int prefetched;
+
+        //  Holds the prefetched identity.
+        blob_t prefetched_id;
 
         //  Holds the prefetched message.
         msg_t prefetched_msg;
-- 
1.7.4.1

_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to