The patch is also available in the bidi-pipes branch.
From ee7313b4d896e9f7ff6a035395b20f617e4ff796 Mon Sep 17 00:00:00 2001
From: Martin Sustrik <[email protected]>
Date: Tue, 31 May 2011 16:21:17 +0200
Subject: [PATCH] Subscriptions are processed immediately in XPUB socket

Signed-off-by: Martin Sustrik <[email protected]>
---
 src/xpub.cpp |  127 +++++++++++++++++++---------------------------------------
 src/xpub.hpp |    8 ----
 2 files changed, 42 insertions(+), 93 deletions(-)

diff --git a/src/xpub.cpp b/src/xpub.cpp
index 9158cf4..4b41696 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -39,12 +39,41 @@ void zmq::xpub_t::xattach_pipe (pipe_t *pipe_, const blob_t &peer_identity_)
 {
     zmq_assert (pipe_);
     dist.attach (pipe_);
-    fq.attach (pipe_);
+
+    //  The pipe is active when attached. Let's read the subscriptions from
+    //  it, if any.
+    xread_activated (pipe_);
 }
 
 void zmq::xpub_t::xread_activated (pipe_t *pipe_)
 {
-    fq.activated (pipe_);
+    //  There are some subscriptions waiting. Let's process them.
+    msg_t sub;
+    sub.init ();
+    while (true) {
+
+        //  Grab next subscription.
+        if (!pipe_->read (&sub)) {
+            sub.close ();
+            return;
+        }
+
+        //  Apply the subscription to the trie.
+        unsigned char *data = (unsigned char*) sub.data ();
+        size_t size = sub.size ();
+        zmq_assert (size > 0 && (*data == 0 || *data == 1));
+        bool unique;
+		if (*data == 0)
+		    unique = subscriptions.rm (data + 1, size - 1, pipe_);
+		else
+		    unique = subscriptions.add (data + 1, size - 1, pipe_);
+
+        //  If the subscription is not a duplicate, store it so that it can
+        //  be passed to the user on the next recv call.
+        if (unique && options.type != ZMQ_PUB)
+            pending.push_back (blob_t ((unsigned char*) sub.data (),
+                sub.size ()));
+    }
 }
 
 void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
@@ -60,31 +89,10 @@ void zmq::xpub_t::xterminated (pipe_t *pipe_)
     subscriptions.rm (pipe_, send_unsubscription, this);
 
     dist.terminated (pipe_);
-    fq.terminated (pipe_);
 }
 
 int zmq::xpub_t::xsend (msg_t *msg_, int flags_)
-{
-    //  First, process any (un)subscriptions from downstream.
-    msg_t sub;
-    sub.init ();
-    while (true) {
-
-        //  Grab next subscription.
-        pipe_t *pipe;
-        int rc = fq.recvpipe (&sub, 0, &pipe);
-        if (rc != 0 && errno == EAGAIN)
-            break;
-        errno_assert (rc == 0);
-
-        //  Apply the subscription to the trie. If it's not a duplicate,
-        //  store it so that it can be passed to used on next recv call.
-        if (apply_subscription (&sub, pipe) && options.type != ZMQ_PUB)
-            pending.push_back (blob_t ((unsigned char*) sub.data (),
-                sub.size ()));
-    }
-    sub.close ();
-    
+{    
     return dist.send (msg_, flags_);
 }
 
@@ -96,75 +104,24 @@ bool zmq::xpub_t::xhas_out ()
 int zmq::xpub_t::xrecv (msg_t *msg_, int flags_)
 {
     //  If there is at least one 
-    if (!pending.empty ()) {
-        int rc = msg_->close ();
-        errno_assert (rc == 0);
-        rc = msg_->init_size (pending.front ().size ());
-        errno_assert (rc == 0);
-        memcpy (msg_->data (), pending.front ().data (),
-            pending.front ().size ());
-        pending.pop_front ();
-        return 0;
-    }
-
-    //  Grab and apply next subscription.
-    pipe_t *pipe;
-    int rc = fq.recvpipe (msg_, 0, &pipe);
-    if (rc != 0)
-        return -1;
-    if (!apply_subscription (msg_, pipe)) {
-//  TODO: This should be a loop rather!
-        msg_->close ();
-        msg_->init ();
+    if (pending.empty ()) {
         errno = EAGAIN;
         return -1;
     }
+
+    int rc = msg_->close ();
+    errno_assert (rc == 0);
+    rc = msg_->init_size (pending.front ().size ());
+    errno_assert (rc == 0);
+    memcpy (msg_->data (), pending.front ().data (),
+        pending.front ().size ());
+    pending.pop_front ();
     return 0;
 }
 
 bool zmq::xpub_t::xhas_in ()
 {
-    if (!pending.empty ())
-        return true;
-
-    //  Even if there are subscriptions in the fair-queuer they may be
-    //  duplicates. Thus, we have to check by hand wheter there is any
-    //  subscription available to pass upstream.
-    //  First, process any (un)subscriptions from downstream.
-    msg_t sub;
-    sub.init ();
-    while (true) {
-
-        //  Grab next subscription.
-        pipe_t *pipe;
-        int rc = fq.recvpipe (&sub, 0, &pipe);
-        if (rc != 0 && errno == EAGAIN) {
-            sub.close ();
-            return false;
-        }
-        errno_assert (rc == 0);
-
-        //  Apply the subscription to the trie. If it's not a duplicate store
-        //  it so that it can be passed to used on next recv call.
-        if (apply_subscription (&sub, pipe) && options.type != ZMQ_PUB) {
-            pending.push_back (blob_t ((unsigned char*) sub.data (),
-                sub.size ()));
-            sub.close ();
-            return true;
-        }
-    }
-}
-
-bool zmq::xpub_t::apply_subscription (msg_t *sub_, pipe_t *pipe_)
-{
-    unsigned char *data = (unsigned char*) sub_->data ();
-    size_t size = sub_->size ();
-    zmq_assert (size > 0 && (*data == 0 || *data == 1));
-
-    if (*data == 0)
-        return subscriptions.rm (data + 1, size - 1, pipe_);
-    else
-        return subscriptions.add (data + 1, size - 1, pipe_);
+    return !pending.empty ();
 }
 
 void zmq::xpub_t::send_unsubscription (unsigned char *data_, size_t size_,
diff --git a/src/xpub.hpp b/src/xpub.hpp
index b824548..c5d64b5 100644
--- a/src/xpub.hpp
+++ b/src/xpub.hpp
@@ -28,7 +28,6 @@
 #include "array.hpp"
 #include "blob.hpp"
 #include "dist.hpp"
-#include "fq.hpp"
 
 namespace zmq
 {
@@ -53,10 +52,6 @@ namespace zmq
 
     private:
 
-        //  Applies the subscription to the trie. Return false if it is a
-        //  duplicate.
-        bool apply_subscription (class msg_t *sub_, class pipe_t *pipe_);
-
         //  Function to be applied to the trie to send all the subsciptions
         //  upstream.
         static void send_unsubscription (unsigned char *data_, size_t size_,
@@ -68,9 +63,6 @@ namespace zmq
         //  Distributor of messages holding the list of outbound pipes.
         dist_t dist;
 
-        //  Object to fair-queue the subscription requests.
-        fq_t fq;
-
         //  List of pending (un)subscriptions, ie. those that were already
         //  applied to the trie, but not yet received by the user.
         typedef std::deque <blob_t> pending_t;
-- 
1.7.0.4

_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to