>From 89daa57faab728204447c3bb708e753a2aeffa55 Mon Sep 17 00:00:00 2001
From: Martin Sustrik <[email protected]>
Date: Mon, 18 Jul 2011 09:28:59 +0200
Subject: [PATCH 1/2] Missing files for GENERIC socket implementation added

Signed-off-by: Martin Sustrik <[email protected]>
---
 src/generic.cpp |  265 +++++++++++++++++++++++++++++++++++++++++++++++++++++++
 src/generic.hpp |  108 ++++++++++++++++++++++
 2 files changed, 373 insertions(+), 0 deletions(-)
 create mode 100755 src/generic.cpp
 create mode 100755 src/generic.hpp

diff --git a/src/generic.cpp b/src/generic.cpp
new file mode 100755
index 0000000..9078ce3
--- /dev/null
+++ b/src/generic.cpp
@@ -0,0 +1,265 @@
+/*
+    Copyright (c) 2007-2011 iMatix Corporation
+    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the GNU Lesser General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU Lesser General Public License for more details.
+
+    You should have received a copy of the GNU Lesser General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "generic.hpp"
+#include "pipe.hpp"
+#include "wire.hpp"
+#include "random.hpp"
+#include "likely.hpp"
+#include "wire.hpp"
+#include "err.hpp"
+
+zmq::generic_t::generic_t (class ctx_t *parent_, uint32_t tid_) :
+    socket_base_t (parent_, tid_),
+    prefetched (false),
+    more_in (false),
+    current_out (NULL),
+    more_out (false),
+    next_peer_id (generate_random ())
+{
+    options.type = ZMQ_GENERIC;
+
+    prefetched_msg.init ();
+}
+
+zmq::generic_t::~generic_t ()
+{
+    zmq_assert (outpipes.empty ());
+    prefetched_msg.close ();
+}
+
+void zmq::generic_t::xattach_pipe (pipe_t *pipe_)
+{
+    zmq_assert (pipe_);
+
+    //  Generate a new peer ID. Take care to avoid duplicates.
+    outpipes_t::iterator it = outpipes.lower_bound (next_peer_id);
+    if (!outpipes.empty ()) {
+        while (true) {
+            if (it == outpipes.end ())
+                it = outpipes.begin ();
+            if (it->first != next_peer_id)
+                break;
+            ++next_peer_id;
+            ++it;
+        }
+    }
+
+    //  Add the pipe to the map out outbound pipes.
+    outpipe_t outpipe = {pipe_, true};
+    bool ok = outpipes.insert (outpipes_t::value_type (
+        next_peer_id, outpipe)).second;
+    zmq_assert (ok);
+
+    //  Add the pipe to the list of inbound pipes.
+    pipe_->set_pipe_id (next_peer_id);
+    fq.attach (pipe_);
+
+    //  Queue the connection command.
+    pending_command_t cmd = {1, next_peer_id};
+    pending_commands.push_back (cmd);
+
+    //  Advance next peer ID so that if new connection is dropped shortly after
+    //  its creation we don't accidentally get two subsequent peers with
+    //  the same ID.
+    ++next_peer_id;
+}
+
+void zmq::generic_t::xterminated (pipe_t *pipe_)
+{
+    fq.terminated (pipe_);
+
+    for (outpipes_t::iterator it = outpipes.begin ();
+          it != outpipes.end (); ++it) {
+        if (it->second.pipe == pipe_) {
+
+            //  Queue the disconnection command.
+            pending_command_t cmd = {2, it->first};
+            pending_commands.push_back (cmd);
+
+            //  Remove the pipe.
+            outpipes.erase (it);
+            if (pipe_ == current_out)
+                current_out = NULL;
+            return;
+        }
+    }
+    zmq_assert (false);
+}
+
+void zmq::generic_t::xread_activated (pipe_t *pipe_)
+{
+    fq.activated (pipe_);
+}
+
+void zmq::generic_t::xwrite_activated (pipe_t *pipe_)
+{
+    for (outpipes_t::iterator it = outpipes.begin ();
+          it != outpipes.end (); ++it) {
+        if (it->second.pipe == pipe_) {
+            zmq_assert (!it->second.active);
+            it->second.active = true;
+            return;
+        }
+    }
+    zmq_assert (false);
+}
+
+int zmq::generic_t::xsend (msg_t *msg_, int flags_)
+{
+    //  If this is the first part of the message it's the ID of the
+    //  peer to send the message to.
+    if (!more_out) {
+        zmq_assert (!current_out);
+
+        //  If we have malformed message (prefix with no subsequent message)
+        //  then just silently ignore it.
+        //  TODO: The connections should be killed instead.
+        if (msg_->flags () & msg_t::label) {
+
+            more_out = true;
+
+            //  Find the pipe associated with the peer ID stored in the prefix.
+            //  If there's no such pipe just silently ignore the message.
+            zmq_assert (msg_->size () == 4);
+            uint32_t peer_id = get_uint32 ((unsigned char*) msg_->data ());
+            outpipes_t::iterator it = outpipes.find (peer_id);
+
+            if (it != outpipes.end ()) {
+                current_out = it->second.pipe;
+                msg_t empty;
+                int rc = empty.init ();
+                errno_assert (rc == 0);
+                if (!current_out->check_write (&empty)) {
+                    it->second.active = false;
+                    more_out = false;
+                    current_out = NULL;
+                }
+                rc = empty.close ();
+                errno_assert (rc == 0);
+            }
+        }
+
+        int rc = msg_->close ();
+        errno_assert (rc == 0);
+        rc = msg_->init ();
+        errno_assert (rc == 0);
+        return 0;
+    }
+
+    //  Check whether this is the last part of the message.
+    more_out = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+
+    //  Push the message into the pipe. If there's no out pipe, just drop it.
+    if (current_out) {
+        bool ok = current_out->write (msg_);
+        zmq_assert (ok);
+        if (!more_out) {
+            current_out->flush ();
+            current_out = NULL;
+        }
+    }
+    else {
+        int rc = msg_->close ();
+        errno_assert (rc == 0);
+    }
+
+    //  Detach the message from the data buffer.
+    int rc = msg_->init ();
+    errno_assert (rc == 0);
+
+    return 0;
+}
+
+int zmq::generic_t::xrecv (msg_t *msg_, int flags_)
+{
+    //  If there's a queued command, pass it to the caller.
+    if (unlikely (!more_in && !pending_commands.empty ())) {
+        msg_->init_size (5);
+        unsigned char *data = (unsigned char*) msg_->data ();
+        put_uint8 (data, pending_commands.front ().cmd);
+        put_uint32 (data + 1, pending_commands.front ().peer);
+        msg_->set_flags (msg_t::command);
+        pending_commands.pop_front ();
+        return 0;
+    }
+
+    //  If there is a prefetched message, return it.
+    if (prefetched) {
+        int rc = msg_->move (prefetched_msg);
+        errno_assert (rc == 0);
+        more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+        prefetched = false;
+        return 0;
+    }
+
+    //  Get next message part.
+    pipe_t *pipe;
+    int rc = fq.recvpipe (msg_, flags_, &pipe);
+    if (rc != 0)
+        return -1;
+
+    //  If we are in the middle of reading a message, just return the next part.
+    if (more_in) {
+        more_in = msg_->flags () & (msg_t::more | msg_t::label) ? true : false;
+        return 0;
+    }
+ 
+    //  We are at the beginning of a new message. Move the message part we
+    //  have to the prefetched and return the ID of the peer instead.
+    rc = prefetched_msg.move (*msg_);
+    errno_assert (rc == 0);
+    prefetched = true;
+    rc = msg_->close ();
+    errno_assert (rc == 0);
+    rc = msg_->init_size (4);
+    errno_assert (rc == 0);
+    put_uint32 ((unsigned char*) msg_->data (), pipe->get_pipe_id ());
+    msg_->set_flags (msg_t::label);
+    return 0;
+}
+
+int zmq::generic_t::rollback (void)
+{
+    if (current_out) {
+        current_out->rollback ();
+        current_out = NULL;
+        more_out = false;
+    }
+    return 0;
+}
+
+bool zmq::generic_t::xhas_in ()
+{
+    if (prefetched)
+        return true;
+    return fq.has_in ();
+}
+
+bool zmq::generic_t::xhas_out ()
+{
+    //  In theory, GENERIC socket is always ready for writing. Whether actual
+    //  attempt to write succeeds depends on whitch pipe the message is going
+    //  to be routed to.
+    return true;
+}
+
+
+
diff --git a/src/generic.hpp b/src/generic.hpp
new file mode 100755
index 0000000..7768333
--- /dev/null
+++ b/src/generic.hpp
@@ -0,0 +1,108 @@
+/*
+    Copyright (c) 2007-2011 iMatix Corporation
+    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file
+
+    This file is part of 0MQ.
+
+    0MQ is free software; you can redistribute it and/or modify it under
+    the terms of the GNU Lesser General Public License as published by
+    the Free Software Foundation; either version 3 of the License, or
+    (at your option) any later version.
+
+    0MQ is distributed in the hope that it will be useful,
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+    GNU Lesser General Public License for more details.
+
+    You should have received a copy of the GNU Lesser General Public License
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#ifndef __ZMQ_GENERIC_HPP_INCLUDED__
+#define __ZMQ_GENERIC_HPP_INCLUDED__
+
+#include <map>
+#include <deque>
+
+#include "socket_base.hpp"
+#include "stdint.hpp"
+#include "msg.hpp"
+#include "fq.hpp"
+
+namespace zmq
+{
+
+    //  TODO: This class uses O(n) scheduling. Rewrite it to use O(1) algorithm.
+    class generic_t :
+        public socket_base_t
+    {
+    public:
+
+        generic_t (class ctx_t *parent_, uint32_t tid_);
+        ~generic_t ();
+
+        //  Overloads of functions from socket_base_t.
+        void xattach_pipe (class pipe_t *pipe_);
+        int xsend (class msg_t *msg_, int flags_);
+        int xrecv (class msg_t *msg_, int flags_);
+        bool xhas_in ();
+        bool xhas_out ();
+        void xread_activated (class pipe_t *pipe_);
+        void xwrite_activated (class pipe_t *pipe_);
+        void xterminated (class pipe_t *pipe_);
+
+    protected:
+
+        //  Rollback any message parts that were sent but not yet flushed.
+        int rollback ();
+
+    private:
+
+        //  Fair queueing object for inbound pipes.
+        fq_t fq;
+
+        //  Have we prefetched a message.
+        bool prefetched;
+
+        //  Holds the prefetched message.
+        msg_t prefetched_msg;
+
+        //  If true, more incoming message parts are expected.
+        bool more_in;
+
+        struct outpipe_t
+        {
+            class pipe_t *pipe;
+            bool active;
+        };
+
+        //  Outbound pipes indexed by the peer IDs.
+        typedef std::map <uint32_t, outpipe_t> outpipes_t;
+        outpipes_t outpipes;
+
+        //  The pipe we are currently writing to.
+        class pipe_t *current_out;
+
+        //  If true, more outgoing message parts are expected.
+        bool more_out;
+
+        //  Peer ID are generated. It's a simple increment and wrap-over
+        //  algorithm. This value is the next ID to use (if not used already).
+        uint32_t next_peer_id;
+
+        //  Commands to be delivered to the user.
+        struct pending_command_t
+        {
+            uint8_t cmd;
+            uint32_t peer;
+        };
+        typedef std::deque <pending_command_t> pending_commands_t;
+        pending_commands_t pending_commands;
+
+        generic_t (const generic_t&);
+        const generic_t &operator = (const generic_t&);
+    };
+
+}
+
+#endif
-- 
1.7.0.4

>From 1a408805521f839e3e36183d9f2dec1d08e24caf Mon Sep 17 00:00:00 2001
From: Martin Sustrik <[email protected]>
Date: Mon, 18 Jul 2011 09:30:37 +0200
Subject: [PATCH 2/2] ZMQ_IDENTITY option removed from the documentation

Signed-off-by: Martin Sustrik <[email protected]>
---
 doc/zmq_getsockopt.txt |   23 -----------------------
 doc/zmq_setsockopt.txt |   23 -----------------------
 2 files changed, 0 insertions(+), 46 deletions(-)

diff --git a/doc/zmq_getsockopt.txt b/doc/zmq_getsockopt.txt
index 51b6c9e..128b733 100644
--- a/doc/zmq_getsockopt.txt
+++ b/doc/zmq_getsockopt.txt
@@ -136,29 +136,6 @@ Default value:: 0
 Applicable socket types:: N/A
 
 
-ZMQ_IDENTITY: Retrieve socket identity
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-The 'ZMQ_IDENTITY' option shall retrieve the identity of the specified
-'socket'. Socket identity determines if existing 0MQ infrastructure (_message
-queues_, _forwarding devices_) shall be identified with a specific application
-and persist across multiple runs of the application.
-
-If the socket has no identity, each run of an application is completely
-separate from other runs. However, with identity set the socket shall re-use
-any existing 0MQ infrastructure configured by the previous run(s). Thus the
-application may receive messages that were sent in the meantime, _message
-queue_ limits shall be shared with previous run(s) and so on.
-
-Identity can be at least one byte and at most 255 bytes long. Identities
-starting with binary zero are reserved for use by 0MQ infrastructure.
-
-[horizontal]
-Option value type:: binary data
-Option value unit:: N/A
-Default value:: NULL
-Applicable socket types:: all
-
-
 ZMQ_RATE: Retrieve multicast data rate
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 The 'ZMQ_RATE' option shall retrieve the maximum send or receive data rate for
diff --git a/doc/zmq_setsockopt.txt b/doc/zmq_setsockopt.txt
index 8bc9e8f..f62a7e9 100644
--- a/doc/zmq_setsockopt.txt
+++ b/doc/zmq_setsockopt.txt
@@ -92,29 +92,6 @@ Default value:: 0
 Applicable socket types:: N/A
 
 
-ZMQ_IDENTITY: Set socket identity
-~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
-The 'ZMQ_IDENTITY' option shall set the identity of the specified 'socket'.
-Socket identity determines if existing 0MQ infrastructure (_message queues_,
-_forwarding devices_) shall be identified with a specific application and
-persist across multiple runs of the application.
-
-If the socket has no identity, each run of an application is completely
-separate from other runs. However, with identity set the socket shall re-use
-any existing 0MQ infrastructure configured by the previous run(s). Thus the
-application may receive messages that were sent in the meantime, _message
-queue_ limits shall be shared with previous run(s) and so on.
-
-Identity should be at least one byte and at most 255 bytes long. Identities
-starting with binary zero are reserved for use by 0MQ infrastructure.
-
-[horizontal]
-Option value type:: binary data
-Option value unit:: N/A
-Default value:: NULL
-Applicable socket types:: all
-
-
 ZMQ_SUBSCRIBE: Establish message filter
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 The 'ZMQ_SUBSCRIBE' option shall establish a new message filter on a 'ZMQ_SUB'
-- 
1.7.0.4

_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to