This patch is also available in bidi-pipes branch.
>From e080e3e8b620b0e7ed02c28712a0c92b08de3451 Mon Sep 17 00:00:00 2001
From: Martin Sustrik <[email protected]>
Date: Sun, 12 Jun 2011 10:19:21 +0200
Subject: [PATCH] Publisher-side filtering for multi-part messages fixed

Signed-off-by: Martin Sustrik <[email protected]>
---
 src/dist.cpp |    8 +++++---
 src/dist.hpp |    8 +++++---
 src/xpub.cpp |   25 ++++++++++++++++++++-----
 src/xpub.hpp |    3 +++
 4 files changed, 33 insertions(+), 11 deletions(-)

diff --git a/src/dist.cpp b/src/dist.cpp
index 707b9c1..15bd168 100644
--- a/src/dist.cpp
+++ b/src/dist.cpp
@@ -70,6 +70,11 @@ void zmq::dist_t::match (pipe_t *pipe_)
     matching++;    
 }
 
+void zmq::dist_t::unmatch ()
+{
+    matching = 0;
+}
+
 void zmq::dist_t::terminated (pipe_t *pipe_)
 {
     //  Remove the pipe from the list; adjust number of matching, active and/or
@@ -115,9 +120,6 @@ int zmq::dist_t::send_to_matching (msg_t *msg_, int flags_)
     if (!msg_more)
         active = eligible;
 
-    //  Mark all the pipes as non-matching.
-    matching = 0;
-
     more = msg_more;
 
     return 0;
diff --git a/src/dist.hpp b/src/dist.hpp
index 005bb60..c8d121c 100644
--- a/src/dist.hpp
+++ b/src/dist.hpp
@@ -48,14 +48,16 @@ namespace zmq
         //  will send message also to this pipe.
         void match (class pipe_t *pipe_);
 
+        //  Mark all pipes as non-matching.
+        void unmatch ();
+
         //  Removes the pipe from the distributor object.
         void terminated (class pipe_t *pipe_);
 
-        //  Send the message to all the outbound pipes. After the call all the
-        //  pipes are marked as non-matching.
+        //  Send the message to the matching outbound pipes.
         int send_to_matching (class msg_t *msg_, int flags_);
 
-        //  Send the message to the matching outbound pipes.
+        //  Send the message to all the outbound pipes.
         int send_to_all (class msg_t *msg_, int flags_);
 
         bool has_out ();
diff --git a/src/xpub.cpp b/src/xpub.cpp
index 9078de3..a102b68 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -26,7 +26,8 @@
 #include "msg.hpp"
 
 zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_) :
-    socket_base_t (parent_, tid_)
+    socket_base_t (parent_, tid_),
+    more (false)
 {
     options.type = ZMQ_XPUB;
 }
@@ -99,13 +100,27 @@ void zmq::xpub_t::mark_as_matching (pipe_t *pipe_, void *arg_)
 
 int zmq::xpub_t::xsend (msg_t *msg_, int flags_)
 {
-    //  Find the matching pipes.
-    subscriptions.match ((unsigned char*) msg_->data (), msg_->size (),
-        mark_as_matching, this);
+    bool msg_more = msg_->flags () & msg_t::more;
+
+    //  For the first part of multi-part message, find the matching pipes.
+    if (!more)
+        subscriptions.match ((unsigned char*) msg_->data (), msg_->size (),
+            mark_as_matching, this);
 
     //  Send the message to all the pipes that were marked as matching
     //  in the previous step.
-    return dist.send_to_matching (msg_, flags_);
+    int rc = dist.send_to_matching (msg_, flags_);
+    if (rc != 0)
+        return rc;
+
+    //  If we are at the end of multi-part message we can mark all the pipes
+    //  as non-matching.
+    if (!msg_more)
+        dist.unmatch ();
+
+    more = msg_more;
+
+    return 0;
 }
 
 bool zmq::xpub_t::xhas_out ()
diff --git a/src/xpub.hpp b/src/xpub.hpp
index 740d1e2..a2e7335 100644
--- a/src/xpub.hpp
+++ b/src/xpub.hpp
@@ -66,6 +66,9 @@ namespace zmq
         //  Distributor of messages holding the list of outbound pipes.
         dist_t dist;
 
+        //  True if we are in the middle of sending a multi-part message.
+        bool more;
+
         //  List of pending (un)subscriptions, ie. those that were already
         //  applied to the trie, but not yet received by the user.
         typedef std::deque <blob_t> pending_t;
-- 
1.7.0.4

_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to