Sorry, here's the patch...

On 9/9/2010, "Martin Sustrik" <[email protected]> wrote:

>
>Hi all,
>
>To make the contribution process completely deterministic, I am going to
>post my patches on the mailing list in the same way as everybody else
>does. Although I'll merge them myself, it still makes the license they
>are contributed under clear and opens space for peer review.
>
>Here's a small patch that returns an error (EMTHREAD) from zmq_bind and
>zmq_connect is these require I/O operations and no I/O thread is
>available, i.e. zmq_init was called with parameter of zero.
>
>The patch is licensed under MIT/X11 license.
>
>Martin
>_______________________________________________
>zeromq-dev mailing list
>[email protected]
>http://lists.zeromq.org/mailman/listinfo/zeromq-dev
From a68e6739f4248e25a0f9a64c89729f55dfacb842 Mon Sep 17 00:00:00 2001
From: Martin Sustrik <[email protected]>
Date: Thu, 9 Sep 2010 08:25:00 +0200
Subject: [PATCH] when no I/O threads are available error is raised instead of assertion

---
 doc/zmq_bind.txt        |    2 ++
 doc/zmq_connect.txt     |    2 ++
 include/zmq.h           |    2 +-
 src/connect_session.cpp |   15 ++++++++++-----
 src/ctx.cpp             |    7 +++++--
 src/ctx.hpp             |    5 +++--
 src/object.cpp          |    4 ++--
 src/object.hpp          |    2 +-
 src/socket_base.cpp     |   21 ++++++++++++++++++---
 src/zmq.cpp             |    2 ++
 src/zmq_connecter.cpp   |    9 +++++++--
 src/zmq_listener.cpp    |    9 +++++++--
 12 files changed, 60 insertions(+), 20 deletions(-)

diff --git a/doc/zmq_bind.txt b/doc/zmq_bind.txt
index 7aa5a0b..23c3134 100644
--- a/doc/zmq_bind.txt
+++ b/doc/zmq_bind.txt
@@ -58,6 +58,8 @@ The requested 'address' specifies a nonexistent interface.
 The 0MQ 'context' associated with the specified 'socket' was terminated.
 *EFAULT*::
 The provided 'socket' was not valid (NULL).
+*EMTHREAD*::
+No I/O thread is available to accomplish the task.
 
 
 EXAMPLE
diff --git a/doc/zmq_connect.txt b/doc/zmq_connect.txt
index ffcf3b4..a95f716 100644
--- a/doc/zmq_connect.txt
+++ b/doc/zmq_connect.txt
@@ -56,6 +56,8 @@ The requested 'transport' protocol is not compatible with the socket type.
 The 0MQ 'context' associated with the specified 'socket' was terminated.
 *EFAULT*::
 The provided 'socket' was not valid (NULL).
+*EMTHREAD*::
+No I/O thread is available to accomplish the task.
 
 
 EXAMPLE
diff --git a/include/zmq.h b/include/zmq.h
index 8c01477..c5f79d4 100644
--- a/include/zmq.h
+++ b/include/zmq.h
@@ -85,7 +85,7 @@ ZMQ_EXPORT void zmq_version (int *major, int *minor, int *patch);
 #define EFSM (ZMQ_HAUSNUMERO + 51)
 #define ENOCOMPATPROTO (ZMQ_HAUSNUMERO + 52)
 #define ETERM (ZMQ_HAUSNUMERO + 53)
-#define EMTHREAD (ZMQ_HAUSNUMERO + 54)  /*  Old error code, remove in 3.x     */
+#define EMTHREAD (ZMQ_HAUSNUMERO + 54)
 
 /*  This function retrieves the errno as it is known to 0MQ library. The goal */
 /*  of this function is to make the code 100% portable, including where 0MQ   */
diff --git a/src/connect_session.cpp b/src/connect_session.cpp
index afa80b8..10d19c3 100644
--- a/src/connect_session.cpp
+++ b/src/connect_session.cpp
@@ -43,13 +43,18 @@ void zmq::connect_session_t::process_plug ()
 
 void zmq::connect_session_t::start_connecting ()
 {
+    //  Choose I/O thread to run connecter in. Given that we are already
+    //  running in an I/O thread, there must be at least one available.
+    io_thread_t *io_thread = choose_io_thread (options.affinity);
+    zmq_assert (io_thread);
+
     //  Create the connecter object.
 
     //  Both TCP and IPC transports are using the same infrastructure.
-    if (protocol == "tcp" || protocol == "ipc") {        
+    if (protocol == "tcp" || protocol == "ipc") {
+
         zmq_connecter_t *connecter = new (std::nothrow) zmq_connecter_t (
-            choose_io_thread (options.affinity), this, options,
-            protocol.c_str (), address.c_str ());
+            io_thread, this, options, protocol.c_str (), address.c_str ());
         zmq_assert (connecter);
         launch_child (connecter);
         return;
@@ -70,7 +75,7 @@ void zmq::connect_session_t::start_connecting ()
 
             //  PGM sender.
             pgm_sender_t *pgm_sender =  new (std::nothrow) pgm_sender_t (
-                choose_io_thread (options.affinity), options);
+                io_thread, options);
             zmq_assert (pgm_sender);
 
             int rc = pgm_sender->init (udp_encapsulation, address.c_str ());
@@ -82,7 +87,7 @@ void zmq::connect_session_t::start_connecting ()
 
             //  PGM receiver.
             pgm_receiver_t *pgm_receiver =  new (std::nothrow) pgm_receiver_t (
-                choose_io_thread (options.affinity), options);
+                io_thread, options);
             zmq_assert (pgm_receiver);
 
             int rc = pgm_receiver->init (udp_encapsulation, address.c_str ());
diff --git a/src/ctx.cpp b/src/ctx.cpp
index 2660e1f..267f7d0 100644
--- a/src/ctx.cpp
+++ b/src/ctx.cpp
@@ -64,7 +64,8 @@ zmq::ctx_t::ctx_t (uint32_t io_threads_) :
     }
 
     //  In the unused part of the slot array, create a list of empty slots.
-    for (uint32_t i = slot_count - 1; i >= io_threads_; i--) {
+    for (int32_t i = (int32_t) slot_count - 1;
+          i >= (int32_t) io_threads_; i--) {
         empty_slots.push_back (i);
         slots [i] = NULL;
     }
@@ -221,8 +222,10 @@ void zmq::ctx_t::send_command (uint32_t slot_, const command_t &command_)
 
 zmq::io_thread_t *zmq::ctx_t::choose_io_thread (uint64_t affinity_)
 {
+    if (io_threads.empty ())
+        return NULL;
+
     //  Find the I/O thread with minimum load.
-    zmq_assert (io_threads.size () > 0);
     int min_load = -1;
     io_threads_t::size_type result = 0;
     for (io_threads_t::size_type i = 0; i != io_threads.size (); i++) {
diff --git a/src/ctx.hpp b/src/ctx.hpp
index 360ca0e..98b4f81 100644
--- a/src/ctx.hpp
+++ b/src/ctx.hpp
@@ -65,8 +65,9 @@ namespace zmq
         void send_command (uint32_t slot_, const command_t &command_);
 
         //  Returns the I/O thread that is the least busy at the moment.
-        //  Taskset specifies which I/O threads are eligible (0 = all).
-        class io_thread_t *choose_io_thread (uint64_t taskset_);
+        //  Affinity specifies which I/O threads are eligible (0 = all).
+        //  Returns NULL is no I/O thread is available.
+        class io_thread_t *choose_io_thread (uint64_t affinity_);
 
         //  Management of inproc endpoints.
         int register_endpoint (const char *addr_, class socket_base_t *socket_);
diff --git a/src/object.cpp b/src/object.cpp
index 7b5532b..90c015a 100644
--- a/src/object.cpp
+++ b/src/object.cpp
@@ -142,9 +142,9 @@ void zmq::object_t::log (zmq_msg_t *msg_)
     ctx->log (msg_);
 }
 
-zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t taskset_)
+zmq::io_thread_t *zmq::object_t::choose_io_thread (uint64_t affinity_)
 {
-    return ctx->choose_io_thread (taskset_);
+    return ctx->choose_io_thread (affinity_);
 }
 
 void zmq::object_t::zombify_socket (socket_base_t *socket_)
diff --git a/src/object.hpp b/src/object.hpp
index 6b52f4b..bc1b325 100644
--- a/src/object.hpp
+++ b/src/object.hpp
@@ -54,7 +54,7 @@ namespace zmq
         void log (zmq_msg_t *msg_);
 
         //  Chooses least loaded I/O thread.
-        class io_thread_t *choose_io_thread (uint64_t taskset_);
+        class io_thread_t *choose_io_thread (uint64_t affinity_);
 
         //  Zombify particular socket. In other words, pass the ownership to
         //  the context.
diff --git a/src/socket_base.cpp b/src/socket_base.cpp
index cdad09d..288a627 100644
--- a/src/socket_base.cpp
+++ b/src/socket_base.cpp
@@ -289,8 +289,17 @@ int zmq::socket_base_t::bind (const char *addr_)
         return register_endpoint (addr_, this);
 
     if (protocol == "tcp" || protocol == "ipc") {
+
+        //  Choose I/O thread to run the listerner in.
+        io_thread_t *io_thread = choose_io_thread (options.affinity);
+        if (!io_thread) {
+            errno = EMTHREAD;
+            return -1;
+        }
+
+        //  Create and run the listener.
         zmq_listener_t *listener = new (std::nothrow) zmq_listener_t (
-            choose_io_thread (options.affinity), this, options);
+            io_thread, this, options);
         zmq_assert (listener);
         int rc = listener->set_address (protocol.c_str(), address.c_str ());
         if (rc != 0) {
@@ -376,10 +385,16 @@ int zmq::socket_base_t::connect (const char *addr_)
         return 0;
     }
 
+    //  Choose the I/O thread to run the session in.
+    io_thread_t *io_thread = choose_io_thread (options.affinity);
+    if (!io_thread) {
+        errno = EMTHREAD;
+        return -1;
+    }
+
     //  Create session.
     connect_session_t *session = new (std::nothrow) connect_session_t (
-        choose_io_thread (options.affinity), this, options,
-        protocol.c_str (), address.c_str ());
+        io_thread, this, options, protocol.c_str (), address.c_str ());
     zmq_assert (session);
 
     //  If 'immediate connect' feature is required, we'll create the pipes
diff --git a/src/zmq.cpp b/src/zmq.cpp
index 9a4bdec..e93f8b7 100644
--- a/src/zmq.cpp
+++ b/src/zmq.cpp
@@ -88,6 +88,8 @@ const char *zmq_strerror (int errnum_)
         return "The protocol is not compatible with the socket type";
     case ETERM:
         return "Context was terminated";
+    case EMTHREAD:
+        return "No thread available";
     default:
 #if defined _MSC_VER
 #pragma warning (push)
diff --git a/src/zmq_connecter.cpp b/src/zmq_connecter.cpp
index 8f8fae2..cfca875 100644
--- a/src/zmq_connecter.cpp
+++ b/src/zmq_connecter.cpp
@@ -77,9 +77,14 @@ void zmq::zmq_connecter_t::out_event ()
         return;
     }
 
+    //  Choose I/O thread to run connecter in. Given that we are already
+    //  running in an I/O thread, there must be at least one available.
+    io_thread_t *io_thread = choose_io_thread (options.affinity);
+    zmq_assert (io_thread);
+
     //  Create an init object. 
-    zmq_init_t *init = new (std::nothrow) zmq_init_t (
-        choose_io_thread (options.affinity), NULL, session, fd, options);
+    zmq_init_t *init = new (std::nothrow) zmq_init_t (io_thread, NULL,
+        session, fd, options);
     zmq_assert (init);
     launch_sibling (init);
 
diff --git a/src/zmq_listener.cpp b/src/zmq_listener.cpp
index 4569ac1..78e44e6 100644
--- a/src/zmq_listener.cpp
+++ b/src/zmq_listener.cpp
@@ -64,9 +64,14 @@ void zmq::zmq_listener_t::in_event ()
     if (fd == retired_fd)
         return;
 
+    //  Choose I/O thread to run connecter in. Given that we are already
+    //  running in an I/O thread, there must be at least one available.
+    io_thread_t *io_thread = choose_io_thread (options.affinity);
+    zmq_assert (io_thread);
+
     //  Create and launch an init object. 
-    zmq_init_t *init = new (std::nothrow) zmq_init_t (
-        choose_io_thread (options.affinity), socket, NULL, fd, options);
+    zmq_init_t *init = new (std::nothrow) zmq_init_t (io_thread, socket,
+        NULL, fd, options);
     zmq_assert (init);
     launch_sibling (init);
 }
-- 
1.7.0.4

_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to