On Fri, Dec 3, 2010 at 2:02 AM, Martin Sustrik <[email protected]> wrote: > Hi Dhammika, > >> Patch attached. > > I've checked the patch. Shouldn't the 'dispatch' be called when the engine > is actually being moved to another thread (inside finalise_initialisation > function) rather than in 'read' function? >
New patch attached. I separated the logic into two functions, which should be clearer now: finalise_initialisation () only unplugs the engine, and the new dispatch_engine () hands it over to the session. The engine is now dispatched from zmq_init_t::flush (). >From c549459dcc4cd796f8d65ff5c5e1f40715c4fb13 Mon Sep 17 00:00:00 2001 From: dhammika <[email protected]> Date: Sun, 5 Dec 2010 22:57:00 -0800 Subject: [PATCH] fix race condition in session init Signed-off-by: dhammika <[email protected]> --- src/zmq_engine.cpp | 12 +++++++++++- src/zmq_engine.hpp | 2 ++ src/zmq_init.cpp | 40 +++++++++++++++++++++++----------------- src/zmq_init.hpp | 3 +++ 4 files changed, 39 insertions(+), 18 deletions(-) diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index 0c1070d..746d60f 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -40,6 +40,7 @@ zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const options_t &options_) : outsize (0), encoder (out_batch_size), inout (NULL), + ephemeral_inout (NULL), options (options_), plugged (false) { @@ -57,8 +58,9 @@ void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, i_inout *inout_) { zmq_assert (!plugged); plugged = true; + ephemeral_inout = NULL; - // Conncet to session/init object. + // Connect to session/init object. zmq_assert (!inout); zmq_assert (inout_); encoder.set_inout (inout_); @@ -89,6 +91,7 @@ void zmq::zmq_engine_t::unplug () // Disconnect from init/session object. encoder.set_inout (NULL); decoder.set_inout (NULL); + ephemeral_inout = inout; inout = NULL; } @@ -152,6 +155,13 @@ void zmq::zmq_engine_t::out_event () outpos = NULL; encoder.get_data (&outpos, &outsize); + + // If IO handler has detached engine, flush any pending IO. + if (!plugged) { + zmq_assert (ephemeral_inout); + ephemeral_inout->flush (); + return; + } // If there is no data to send, stop polling for output. if (outsize == 0) { diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp index c5f95dc..4847324 100644 --- a/src/zmq_engine.hpp +++ b/src/zmq_engine.hpp @@ -69,6 +69,8 @@ namespace zmq encoder_t encoder; i_inout *inout; + // Detached transient inout handler. 
+ i_inout *ephemeral_inout; options_t options; diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp index a796faa..8b8f110 100644 --- a/src/zmq_init.cpp +++ b/src/zmq_init.cpp @@ -34,6 +34,7 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_, socket_base_t *socket_, session_t *session_, fd_t fd_, const options_t &options_) : own_t (io_thread_, options_), + ephemeral_engine (NULL), sent (false), received (false), socket (socket_), @@ -64,8 +65,7 @@ bool zmq::zmq_init_t::read (::zmq_msg_t *msg_) options.identity.size ()); sent = true; - // If initialisation is done, pass the engine to the session and - // destroy the init object. + // Initialization is done, prepare to dispatch engine. finalise_initialisation (); return true; @@ -101,9 +101,9 @@ void zmq::zmq_init_t::flush () if (!received) return; - // If initialisation is done, pass the engine to the session and - // destroy the init object. - finalise_initialisation (); + // Initialization is done, dispatch engine. + if (ephemeral_engine) + dispatch_engine (); } void zmq::zmq_init_t::detach () @@ -136,6 +136,20 @@ void zmq::zmq_init_t::process_unplug () void zmq::zmq_init_t::finalise_initialisation () { if (sent && received) { + // Unplug and prepare to dispatch engine. + ephemeral_engine = engine; + engine = NULL; + ephemeral_engine->unplug (); + return; + } +} + +void zmq::zmq_init_t::dispatch_engine () +{ + if (sent && received) { + // Engine must be detached. + zmq_assert (!engine); + zmq_assert (ephemeral_engine); // If we know what session we belong to, it's easy, just send the // engine to that session and destroy the init object. Note that we @@ -143,9 +157,7 @@ void zmq::zmq_init_t::finalise_initialisation () // lifetime of this object in contained in the lifetime of the session // so the pointer cannot become invalid without notice. 
if (session) { - engine->unplug (); - send_attach (session, engine, peer_identity, true); - engine = NULL; + send_attach (session, ephemeral_engine, peer_identity, true); terminate (); return; } @@ -165,9 +177,7 @@ void zmq::zmq_init_t::finalise_initialisation () zmq_assert (session); session->inc_seqnum (); launch_sibling (session); - engine->unplug (); - send_attach (session, engine, peer_identity, false); - engine = NULL; + send_attach (session, ephemeral_engine, peer_identity, false); terminate (); return; } @@ -178,9 +188,7 @@ void zmq::zmq_init_t::finalise_initialisation () // than by send_attach. session = socket->find_session (peer_identity); if (session) { - engine->unplug (); - send_attach (session, engine, peer_identity, false); - engine = NULL; + send_attach (session, ephemeral_engine, peer_identity, false); terminate (); return; } @@ -194,9 +202,7 @@ void zmq::zmq_init_t::finalise_initialisation () zmq_assert (session); session->inc_seqnum (); launch_sibling (session); - engine->unplug (); - send_attach (session, engine, peer_identity, false); - engine = NULL; + send_attach (session, ephemeral_engine, peer_identity, false); terminate (); return; } diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp index 511f141..c5aa459 100644 --- a/src/zmq_init.hpp +++ b/src/zmq_init.hpp @@ -44,6 +44,7 @@ namespace zmq private: void finalise_initialisation (); + void dispatch_engine (); // i_inout interface implementation. bool read (::zmq_msg_t *msg_); @@ -57,6 +58,8 @@ namespace zmq // Associated wire-protocol engine. i_engine *engine; + // Detached transient engine. + i_engine *ephemeral_engine; // True if our own identity was already sent to the peer. bool sent; -- 1.7.0.4 Dhammika _______________________________________________ zeromq-dev mailing list [email protected] http://lists.zeromq.org/mailman/listinfo/zeromq-dev
