Hi all,
To follow the process more closely, from now on I am going to post my
patches to the mailing list for review, the same way as everyone else does.
Feel free to comment!
This patch to master is initial work on multiple-property connection
initiation, which is a prerequisite for more sophisticated functionality
such as "subports".
Martin
>From f68008292911a4669951d05b656f78a6e2b75066 Mon Sep 17 00:00:00 2001
From: Martin Sustrik <[email protected]>
Date: Sun, 3 Apr 2011 09:39:28 +0200
Subject: [PATCH] TCP and IPC connection initiation allow for multiple properties
So far, the only property passed on connection initiation was the
identity. The mechanism has now been made extensible. Additional
properties are needed to introduce functionality such as
checking the peer's socket type, "subports", etc.
Signed-off-by: Martin Sustrik <[email protected]>
---
src/zmq_init.cpp | 98 +++++++++++++++++++++++++++++++++++++++--------------
src/zmq_init.hpp | 18 ++++++++--
2 files changed, 87 insertions(+), 29 deletions(-)
diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp
index cf65d69..5ca2367 100644
--- a/src/zmq_init.cpp
+++ b/src/zmq_init.cpp
@@ -29,6 +29,7 @@
#include "session.hpp"
#include "uuid.hpp"
#include "blob.hpp"
+#include "wire.hpp"
#include "err.hpp"
zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
@@ -36,7 +37,6 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
const options_t &options_) :
own_t (io_thread_, options_),
ephemeral_engine (NULL),
- sent (false),
received (false),
socket (socket_),
session (session_),
@@ -45,26 +45,61 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
// Create the engine object for this connection.
engine = new (std::nothrow) zmq_engine_t (fd_, options);
alloc_assert (engine);
+
+ // Generate an unique identity.
+ unsigned char identity [uuid_t::uuid_blob_len + 1];
+ identity [0] = 0;
+ memcpy (identity + 1, uuid_t ().to_blob (), uuid_t::uuid_blob_len);
+ peer_identity.assign (identity, uuid_t::uuid_blob_len + 1);
+
+ // Create a list of props to send.
+
+ zmq_msg_t msg;
+ int rc = zmq_msg_init_size (&msg, 4);
+ errno_assert (rc == 0);
+ unsigned char *data = (unsigned char*) zmq_msg_data (&msg);
+ put_uint16 (data, prop_type);
+ put_uint16 (data + 2, options.type);
+ msg.flags |= ZMQ_MSG_MORE;
+ to_send.push_back (msg);
+
+ if (!options.identity.empty ()) {
+ rc = zmq_msg_init_size (&msg, 2 + options.identity.size ());
+ errno_assert (rc == 0);
+ data = (unsigned char*) zmq_msg_data (&msg);
+ put_uint16 (data, prop_identity);
+ memcpy (data + 2, options.identity.data (), options.identity.size ());
+ msg.flags |= ZMQ_MSG_MORE;
+ to_send.push_back (msg);
+ }
+
+ // Remove the MORE flag from the last prop.
+ to_send.back ().flags &= ~ZMQ_MSG_MORE;
}
zmq::zmq_init_t::~zmq_init_t ()
{
if (engine)
engine->terminate ();
+
+ // If there are unsent props still queued deallocate them.
+ for (to_send_t::iterator it = to_send.begin (); it != to_send.end ();
+ ++it) {
+ int rc = zmq_msg_close (&(*it));
+ errno_assert (rc == 0);
+ }
+ to_send.clear ();
}
bool zmq::zmq_init_t::read (::zmq_msg_t *msg_)
{
// If the identity was already sent, do nothing.
- if (sent)
+ if (to_send.empty ())
return false;
- // Send the identity.
- int rc = zmq_msg_init_size (msg_, options.identity.size ());
- zmq_assert (rc == 0);
- memcpy (zmq_msg_data (msg_), options.identity.c_str (),
- options.identity.size ());
- sent = true;
+ // Pass next property to the engine.
+ *msg_ = to_send.front ();
+ to_send.erase (to_send.begin ());
// Try finalize initialization.
finalise_initialisation ();
@@ -79,24 +114,35 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)
if (received)
return false;
- // Retreieve the remote identity. If it's empty, generate a unique name.
- if (!zmq_msg_size (msg_)) {
- unsigned char identity [uuid_t::uuid_blob_len + 1];
- identity [0] = 0;
- memcpy (identity + 1, uuid_t ().to_blob (), uuid_t::uuid_blob_len);
- peer_identity.assign (identity, uuid_t::uuid_blob_len + 1);
- }
- else {
- peer_identity.assign ((const unsigned char*) zmq_msg_data (msg_),
- zmq_msg_size (msg_));
- }
- int rc = zmq_msg_close (msg_);
- zmq_assert (rc == 0);
+ size_t size = zmq_msg_size (msg_);
+ unsigned char *data = (unsigned char*) zmq_msg_data (msg_);
- received = true;
+ // There should be at least property type in the message.
+ zmq_assert (size >= 2);
+ uint16_t prop = get_uint16 (data);
- // Try finalize initialization.
- finalise_initialisation ();
+ switch (prop) {
+ case prop_type:
+ {
+ zmq_assert (size == 4);
+ // TODO: Check whether the type is OK.
+ // uint16_t type = get_uint16 (data + 2);
+ // ...
+ break;
+ };
+ case prop_identity:
+ {
+ peer_identity.assign (data + 2, size - 2);
+ break;
+ }
+ default:
+ zmq_assert (false);
+ }
+
+ if (!(msg_->flags & ZMQ_MSG_MORE)) {
+ received = true;
+ finalise_initialisation ();
+ }
return true;
}
@@ -142,7 +188,7 @@ void zmq::zmq_init_t::process_unplug ()
void zmq::zmq_init_t::finalise_initialisation ()
{
// Unplug and prepare to dispatch engine.
- if (sent && received) {
+ if (to_send.empty () && received) {
ephemeral_engine = engine;
engine = NULL;
ephemeral_engine->unplug ();
@@ -152,7 +198,7 @@ void zmq::zmq_init_t::finalise_initialisation ()
void zmq::zmq_init_t::dispatch_engine ()
{
- if (sent && received) {
+ if (to_send.empty () && received) {
// Engine must be detached.
zmq_assert (!engine);
diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp
index d90915a..92ab05b 100644
--- a/src/zmq_init.hpp
+++ b/src/zmq_init.hpp
@@ -21,12 +21,15 @@
#ifndef __ZMQ_ZMQ_INIT_HPP_INCLUDED__
#define __ZMQ_ZMQ_INIT_HPP_INCLUDED__
+#include <vector>
+
+#include "../include/zmq.h"
+
#include "i_inout.hpp"
#include "i_engine.hpp"
#include "own.hpp"
#include "fd.hpp"
#include "stdint.hpp"
-#include "stdint.hpp"
#include "blob.hpp"
namespace zmq
@@ -44,6 +47,13 @@ namespace zmq
private:
+ // Peer property IDs.
+ enum prop_t
+ {
+ prop_type = 1,
+ prop_identity = 2
+ };
+
void finalise_initialisation ();
void dispatch_engine ();
@@ -63,8 +73,10 @@ namespace zmq
// Detached transient engine.
i_engine *ephemeral_engine;
- // True if our own identity was already sent to the peer.
- bool sent;
+ // List of messages to send to the peer during the connection
+ // initiation phase.
+ typedef std::vector < ::zmq_msg_t> to_send_t;
+ to_send_t to_send;
// True if peer's identity was already received.
bool received;
--
1.7.0.4
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev