From 05c33907c6942ca310c1c0ddc78b376689610ddf Mon Sep 17 00:00:00 2001
From: Yucheng Low <yuchenglow@gmail.com>
Date: Mon, 25 Feb 2013 17:55:11 -0800
Subject: [PATCH]  - Added an option ZMQ_PUB_NODROP and ZMQ_XPUB_NODROP which
 provides a reliable PUB socket; blocking on sends instead
 of silently dropping messages when the hwm is reached.

---
 include/zmq.h |  3 ++-
 src/dist.cpp  | 13 +++++++++++++
 src/dist.hpp  |  3 +++
 src/pub.cpp   |  9 +++++++++
 src/pub.hpp   |  1 +
 src/xpub.cpp  | 38 ++++++++++++++++++++++++++++++--------
 src/xpub.hpp  |  3 +++
 7 files changed, 61 insertions(+), 9 deletions(-)

diff --git a/include/zmq.h b/include/zmq.h
index f533d25..a99b4b6 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -250,7 +250,8 @@ ZMQ_EXPORT int zmq_msg_set (zmq_msg_t *msg, int option, int optval);
 #define ZMQ_TCP_ACCEPT_FILTER 38
 #define ZMQ_DELAY_ATTACH_ON_CONNECT 39
 #define ZMQ_XPUB_VERBOSE 40
-
+#define ZMQ_XPUB_NODROP 41
+#define ZMQ_PUB_NODROP 41
 
 /*  Message options                                                           */
 #define ZMQ_MORE 1
diff --git a/src/dist.cpp b/src/dist.cpp
index a8cb2ec..f13b18b 100644
--- a/src/dist.cpp
+++ b/src/dist.cpp
@@ -91,6 +91,7 @@ void zmq::dist_t::terminated (pipe_t *pipe_)
 
 void zmq::dist_t::activated (pipe_t *pipe_)
 {
+    if (pipes.index (pipe_) < eligible) return;
     //  Move the pipe from passive to eligible state.
     pipes.swap (pipes.index (pipe_), eligible);
     eligible++;
@@ -117,6 +118,7 @@ int zmq::dist_t::send_to_matching (msg_t *msg_, int flags_)
     //  Push the message to matching pipes.
     distribute (msg_, flags_);
 
+
     //  If mutlipart message is fully sent, activate all the eligible pipes.
     if (!msg_more)
         active = eligible;
@@ -126,6 +128,17 @@ int zmq::dist_t::send_to_matching (msg_t *msg_, int flags_)
     return 0;
 }
 
+bool zmq::dist_t::check_write_on_matching (msg_t *msg_, int flags_)
+{
+    // returns true if every pipe I am writing to is writeable
+    bool writeable = true;
+    for (pipes_t::size_type i = 0; i < matching; ++i) {
+        writeable &= pipes [i]->check_write ();
+    }
+
+    return writeable;
+}
+
 void zmq::dist_t::distribute (msg_t *msg_, int flags_)
 {
     // flags_ is unused
diff --git a/src/dist.hpp b/src/dist.hpp
index be86ab2..c1008cc 100644
--- a/src/dist.hpp
+++ b/src/dist.hpp
@@ -57,6 +57,9 @@ namespace zmq
         //  Removes the pipe from the distributor object.
         void terminated (zmq::pipe_t *pipe_);
 
+        //  Checks if the write will succeed
+        bool check_write_on_matching (zmq::msg_t *msg_, int flags_);
+
         //  Send the message to the matching outbound pipes.
         int send_to_matching (zmq::msg_t *msg_, int flags_);
 
diff --git a/src/pub.cpp b/src/pub.cpp
index 8d664e7..46bdf74 100644
--- a/src/pub.cpp
+++ b/src/pub.cpp
@@ -44,6 +44,15 @@ bool zmq::pub_t::xhas_in ()
     return false;
 }
 
+
+int zmq::pub_t::xsetsockopt (int option_, const void *optval_, size_t optvallen_) {
+    if (option_ == ZMQ_PUB_NODROP) {
+        return xpub_t::xsetsockopt(option_, optval_, optvallen_);
+    }
+    errno = EINVAL;
+    return -1;
+}
+
 zmq::pub_session_t::pub_session_t (io_thread_t *io_thread_, bool connect_,
       socket_base_t *socket_, const options_t &options_,
       const address_t *addr_) :
diff --git a/src/pub.hpp b/src/pub.hpp
index bdc6a8b..29054e7 100644
--- a/src/pub.hpp
+++ b/src/pub.hpp
@@ -42,6 +42,7 @@ namespace zmq
         //  Implementations of virtual functions from socket_base_t.
         int xrecv (zmq::msg_t *msg_, int flags_);
         bool xhas_in ();
+        int xsetsockopt (int option_, const void *optval_, size_t optvallen_);
 
     private:
 
diff --git a/src/xpub.cpp b/src/xpub.cpp
index 2c181b8..fbee217 100644
--- a/src/xpub.cpp
+++ b/src/xpub.cpp
@@ -29,7 +29,8 @@
 zmq::xpub_t::xpub_t (class ctx_t *parent_, uint32_t tid_, int sid_) :
     socket_base_t (parent_, tid_, sid_),
     verbose(false),
-    more (false)
+    more (false),
+    nodrop(false)
 {
     options.type = ZMQ_XPUB;
 }
@@ -87,16 +88,29 @@ void zmq::xpub_t::xwrite_activated (pipe_t *pipe_)
 int zmq::xpub_t::xsetsockopt (int option_, const void *optval_,
     size_t optvallen_)
 {
-    if (option_ != ZMQ_XPUB_VERBOSE) {
-        errno = EINVAL;
-        return -1;
+    if (option_ == ZMQ_XPUB_VERBOSE) {
+        if (optvallen_ != sizeof (int) || 
+            *static_cast <const int*> (optval_) < 0) {
+            errno = EINVAL;
+            return -1;
+        }
+        verbose = *static_cast <const int*> (optval_);
+        return 0;
+    }
+    else if (option_ == ZMQ_XPUB_NODROP) {
+        if (optvallen_ != sizeof (int) || 
+            *static_cast <const int*> (optval_) < 0) {
+            errno = EINVAL;
+            return -1;
+        }
+        nodrop = *static_cast <const int*> (optval_);
+        if (nodrop) printf("nodrop\n");
+        return 0;
     }
-    if (optvallen_ != sizeof (int) || *static_cast <const int*> (optval_) < 0) {
+    else {
         errno = EINVAL;
         return -1;
     }
-    verbose = *static_cast <const int*> (optval_);
-    return 0;
 }
 
 void zmq::xpub_t::xterminated (pipe_t *pipe_)
@@ -120,10 +134,18 @@ int zmq::xpub_t::xsend (msg_t *msg_, int flags_)
     bool msg_more = msg_->flags () & msg_t::more ? true : false;
 
     //  For the first part of multi-part message, find the matching pipes.
-    if (!more)
+    if (!more) {
         subscriptions.match ((unsigned char*) msg_->data (), msg_->size (),
             mark_as_matching, this);
+    }
 
+    if (nodrop) {
+        if (!dist.check_write_on_matching (msg_, flags_))  {
+            dist.unmatch();
+            errno = EAGAIN;
+            return -1;
+        }
+    }
     //  Send the message to all the pipes that were marked as matching
     //  in the previous step.
     int rc = dist.send_to_matching (msg_, flags_);
diff --git a/src/xpub.hpp b/src/xpub.hpp
index 045253d..552e6b7 100644
--- a/src/xpub.hpp
+++ b/src/xpub.hpp
@@ -80,6 +80,9 @@ namespace zmq
         //  True if we are in the middle of sending a multi-part message.
         bool more;
 
+        // True if we are configured to never drop messages
+        bool nodrop;
+
         //  List of pending (un)subscriptions, ie. those that were already
         //  applied to the trie, but not yet received by the user.
         typedef std::basic_string <unsigned char> blob_t;
-- 
1.7.11.1

