Hi all,

This patch provides first try on publisher side filtering. It is also available in bidi-pipes branch.

I don't have much free time to work on it, so it would be helpful if those interested in the feature grabbed the bidi-pipes branch, tested it and reported or even patched any problems.

Unfinished whitepaper about the design of the feature can be found here:

http://www.250bpm.com/pubsub

Martin

>From bd86def1c799a35d5cef0c0a9a1347a18fea227e Mon Sep 17 00:00:00 2001
From: Martin Sustrik <[email protected]>
Date: Sat, 11 Jun 2011 20:29:56 +0200
Subject: [PATCH] Actual message filtering happens in XPUB socket

Signed-off-by: Martin Sustrik <[email protected]>
---
 src/dist.cpp  |   47 ++++++++++++++++++++++++++++++++++++++---------
 src/dist.hpp  |   20 +++++++++++++++++++-
 src/mtrie.cpp |   21 ++++++++++++++++-----
 src/mtrie.hpp |   11 +++++++----
 src/xpub.cpp  |   16 ++++++++++++++--
 src/xpub.hpp  |    3 +++
 src/xsub.cpp  |    4 ++--
 7 files changed, 99 insertions(+), 23 deletions(-)

diff --git a/src/dist.cpp b/src/dist.cpp
index f7f0488..707b9c1 100644
--- a/src/dist.cpp
+++ b/src/dist.cpp
@@ -25,6 +25,7 @@
 #include "likely.hpp"
 
 zmq::dist_t::dist_t () :
+    matching (0),
     active (0),
     eligible (0),
     more (false)
@@ -54,10 +55,27 @@ void zmq::dist_t::attach (pipe_t *pipe_)
     }
 }
 
+void zmq::dist_t::match (pipe_t *pipe_)
+{
+    //  If pipe is already matching do nothing.
+    if (pipes.index (pipe_) < matching)
+        return;
+
+    //  If the pipe isn't eligible, ignore it.
+    if (pipes.index (pipe_) >= eligible)
+        return;
+
+    //  Mark the pipe as matching.
+    pipes.swap (pipes.index (pipe_), matching);
+    matching++;    
+}
+
 void zmq::dist_t::terminated (pipe_t *pipe_)
 {
-    //  Remove the pipe from the list; adjust number of active and/or
+    //  Remove the pipe from the list; adjust number of matching, active and/or
     //  eligible pipes accordingly.
+    if (pipes.index (pipe_) < matching)
+        matching--;
     if (pipes.index (pipe_) < active)
         active--;
     if (pipes.index (pipe_) < eligible)
@@ -79,18 +97,27 @@ void zmq::dist_t::activated (pipe_t *pipe_)
     }
 }
 
-int zmq::dist_t::send (msg_t *msg_, int flags_)
+int zmq::dist_t::send_to_all (msg_t *msg_, int flags_)
+{
+    matching = active;
+    return send_to_matching (msg_, flags_);
+}
+
+int zmq::dist_t::send_to_matching (msg_t *msg_, int flags_)
 {
     //  Is this end of a multipart message?
     bool msg_more = msg_->flags () & msg_t::more;
 
-    //  Push the message to active pipes.
+    //  Push the message to matching pipes.
     distribute (msg_, flags_);
 
     //  If mutlipart message is fully sent, activate all the eligible pipes.
     if (!msg_more)
         active = eligible;
 
+    //  Mark all the pipes as non-matching.
+    matching = 0;
+
     more = msg_more;
 
     return 0;
@@ -98,8 +125,8 @@ int zmq::dist_t::send (msg_t *msg_, int flags_)
 
 void zmq::dist_t::distribute (msg_t *msg_, int flags_)
 {
-    //  If there are no active pipes available, simply drop the message.
-    if (active == 0) {
+    //  If there are no matching pipes available, simply drop the message.
+    if (matching == 0) {
         int rc = msg_->close ();
         errno_assert (rc == 0);
         rc = msg_->init ();
@@ -107,12 +134,12 @@ void zmq::dist_t::distribute (msg_t *msg_, int flags_)
         return;
     }
 
-    //  Add active-1 references to the message. We already hold one reference,
+    //  Add matching-1 references to the message. We already hold one reference,
     //  that's why -1.
-    msg_->add_refs (active - 1);
+    msg_->add_refs (matching - 1);
 
-    //  Push copy of the message to each active pipe.
-    for (pipes_t::size_type i = 0; i < active;) {
+    //  Push copy of the message to each matching pipe.
+    for (pipes_t::size_type i = 0; i < matching;) {
         if (!write (pipes [i], msg_))
             msg_->rm_refs (1);
         else
@@ -133,6 +160,8 @@ bool zmq::dist_t::has_out ()
 bool zmq::dist_t::write (pipe_t *pipe_, msg_t *msg_)
 {
     if (!pipe_->write (msg_)) {
+        pipes.swap (pipes.index (pipe_), matching - 1);
+        matching--;
         pipes.swap (pipes.index (pipe_), active - 1);
         active--;
         pipes.swap (active, eligible - 1);
diff --git a/src/dist.hpp b/src/dist.hpp
index 10613c1..005bb60 100644
--- a/src/dist.hpp
+++ b/src/dist.hpp
@@ -38,11 +38,26 @@ namespace zmq
         dist_t ();
         ~dist_t ();
 
+        //  Adds the pipe to the distributor object.
         void attach (class pipe_t *pipe_);
+
+        //  Activates pipe that have previously reached high watermark.
         void activated (class pipe_t *pipe_);
+
+        //  Mark the pipe as matching. Subsequent call to send_to_matching
+        //  will send message also to this pipe.
+        void match (class pipe_t *pipe_);
+
+        //  Removes the pipe from the distributor object.
         void terminated (class pipe_t *pipe_);
 
-        int send (class msg_t *msg_, int flags_);
+        //  Send the message to all the outbound pipes. After the call all the
+        //  pipes are marked as non-matching.
+        int send_to_matching (class msg_t *msg_, int flags_);
+
+        //  Send the message to the matching outbound pipes.
+        int send_to_all (class msg_t *msg_, int flags_);
+
         bool has_out ();
 
     private:
@@ -58,6 +73,9 @@ namespace zmq
         typedef array_t <class pipe_t, 2> pipes_t;
         pipes_t pipes;
 
+        //  Number of all the pipes to send the next message to.
+        pipes_t::size_type matching;
+
         //  Number of active pipes. All the active pipes are located at the
         //  beginning of the pipes array. These are the pipes the messages
         //  can be sent to at the moment.
diff --git a/src/mtrie.cpp b/src/mtrie.cpp
index 91f6852..fafac2d 100644
--- a/src/mtrie.cpp
+++ b/src/mtrie.cpp
@@ -206,10 +206,21 @@ bool zmq::mtrie_t::rm_helper (unsigned char *prefix_, size_t size_,
     return next_node->rm_helper (prefix_ + 1, size_ - 1, pipe_);
 }
 
-void zmq::mtrie_t::match (unsigned char *data_, size_t size_, pipes_t &pipes_)
+void zmq::mtrie_t::match (unsigned char *data_, size_t size_,
+    void (*func_) (pipe_t *pipe_, void *arg_), void *arg_)
 {
-    //  Merge the subscriptions from this node to the resultset.
-    pipes_.insert (pipes.begin (), pipes.end ());
+    match_helper (data_, size_, func_, arg_);
+}
+
+void zmq::mtrie_t::match_helper (unsigned char *data_, size_t size_,
+    void (*func_) (pipe_t *pipe_, void *arg_), void *arg_)
+{
+    //  TODO: This function is on critical path. Rewrite it as iteration
+    //  rather than recursion.
+
+    //  Signal the pipes attached to this node.
+    for (pipes_t::iterator it = pipes.begin (); it != pipes.end (); ++it)
+        func_ (*it, arg_);
 
     //  If there are no subnodes in the trie, return.
     if (count == 0)
@@ -217,14 +228,14 @@ void zmq::mtrie_t::match (unsigned char *data_, size_t size_, pipes_t &pipes_)
 
     //  If there's one subnode (optimisation).
     if (count == 1) {
-        next.node->match (data_ + 1, size_ - 1, pipes_);
+        next.node->match (data_ + 1, size_ - 1, func_, arg_);
         return;
     }
 
     //  If there are multiple subnodes.
     for (unsigned char c = 0; c != count; c++) {
         if (next.table [c])
-            next.table [c]->match (data_ + 1, size_ - 1, pipes_);
+            next.table [c]->match (data_ + 1, size_ - 1, func_, arg_);
     }   
 }
 
diff --git a/src/mtrie.hpp b/src/mtrie.hpp
index cd47029..68a3f2c 100644
--- a/src/mtrie.hpp
+++ b/src/mtrie.hpp
@@ -35,8 +35,6 @@ namespace zmq
     {
     public:
 
-        typedef std::set <class pipe_t*> pipes_t;
-
         mtrie_t ();
         ~mtrie_t ();
 
@@ -55,8 +53,9 @@ namespace zmq
         //  actually removed rather than de-duplicated.
         bool rm (unsigned char *prefix_, size_t size_, class pipe_t *pipe_);
 
-        //  Get all matching pipes.
-        void match (unsigned char *data_, size_t size_, pipes_t &pipes_);
+        //  Signal all the matching pipes.
+        void match (unsigned char *data_, size_t size_,
+            void (*func_) (class pipe_t *pipe_, void *arg_), void *arg_);
 
     private:
 
@@ -68,8 +67,12 @@ namespace zmq
             void *arg_);
         bool rm_helper (unsigned char *prefix_, size_t size_,
             class pipe_t *pipe_);
+        void match_helper (unsigned char *data_, size_t size_,
+            void (*func_) (class pipe_t *pipe_, void *arg_), void *arg_);
 
+        typedef std::set <class pipe_t*> pipes_t;
         pipes_t pipes;
+
         unsigned char min;
         unsigned short count;
         union {
diff --git a/src/xpub.cpp b/src/xpub.cpp
index 4b41696..9078de3 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -91,9 +91,21 @@ void zmq::xpub_t::xterminated (pipe_t *pipe_)
     dist.terminated (pipe_);
 }
 
+void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
+{
+    xpub_t *self = (xpub_t*) arg_;
+    self->dist.match (pipe_);
+}
+
 int zmq::xpub_t::xsend (msg_t *msg_, int flags_)
-{    
-    return dist.send (msg_, flags_);
+{
+    //  Find the matching pipes.
+    subscriptions.match ((unsigned char*) msg_->data (), msg_->size (),
+        mark_as_matching, this);
+
+    //  Send the message to all the pipes that were marked as matching
+    //  in the previous step.
+    return dist.send_to_matching (msg_, flags_);
 }
 
 bool zmq::xpub_t::xhas_out ()
diff --git a/src/xpub.hpp b/src/xpub.hpp
index c5d64b5..740d1e2 100644
--- a/src/xpub.hpp
+++ b/src/xpub.hpp
@@ -57,6 +57,9 @@ namespace zmq
         static void send_unsubscription (unsigned char *data_, size_t size_,
             void *arg_);
 
+        //  Function to be applied to each matching pipes.
+        static void mark_as_matching (class pipe_t *pipe_, void *arg_);
+
         //  List of all subscriptions mapped to corresponding pipes.
         mtrie_t subscriptions;
 
diff --git a/src/xsub.cpp b/src/xsub.cpp
index 729f6a4..a847d7f 100644
--- a/src/xsub.cpp
+++ b/src/xsub.cpp
@@ -87,13 +87,13 @@ int zmq::xsub_t::xsend (msg_t *msg_, int flags_)
     // Process the subscription.
     if (*data == 1) {
         if (subscriptions.add (data + 1, size - 1))
-            return dist.send (msg_, flags_);
+            return dist.send_to_all (msg_, flags_);
         else
             return 0;
     }
     else if (*data == 0) {
         if (subscriptions.rm (data + 1, size - 1))
-            return dist.send (msg_, flags_);
+            return dist.send_to_all (msg_, flags_);
         else
             return 0;
     }
-- 
1.7.0.4

_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to