Hi Luca, I have added a new command to the ZMQ steerable proxy, and it appears to be quite self-contained. However, this is my first patch to ZMQ, so please have a look and let me know if I should change anything. For now, no changes to the documentation are attached: first I would like to know whether the way I coded the patch is fine, whether the naming is OK (is "STATISTICS" a good name for the command?), etc.
To keep track of this, I opened a GitHub issue: https://github.com/zeromq/libzmq/issues/2736 Anyway, I'm also attaching the patch file to this mail... Thanks! Francesco 2017-09-01 20:35 GMT+02:00 Luca Boccassi <luca.bocca...@gmail.com>: > On Fri, 2017-09-01 at 20:18 +0200, Francesco wrote: >> 2017-09-01 19:58 GMT+02:00 Luca Boccassi <luca.bocca...@gmail.com>: >> > The third parameter to zmq_proxy: http://api.zeromq.org/4-2:zmq-proxy >> > >> > If you pass a socket, all messages will be duplicated and sent to >> > it. >> > Then you can do all the measurements you need. >> > Note that they are shallow refcounted copies, so only the small >> > metadata is actually copied, not the payloads, so it's reasonably >> > fast. >> >> Ok thanks, I can try that! >> Honestly, however, I think it would be nice to have an >> easier/more-optimized way to retrieve this kind of information... >> maybe a message that is sent over a steerable proxy control socket >> (of >> type REQ/REP maybe) or a socket option to retrieve via >> zmq_getsockopt() >> >> Thanks! >> >> Francesco > > A new steerable message sounds like a good idea, feel free to send a > PR! > > Note that it probably should have an ON/OFF capability, so as to avoid > performance penalties for users that are not interested in the feature. > > -- > Kind regards, > Luca Boccassi > > _______________________________________________ > zeromq-dev mailing list > zeromq-dev@lists.zeromq.org > https://lists.zeromq.org/mailman/listinfo/zeromq-dev
diff --git a/src/proxy.cpp b/src/proxy.cpp index aca80e1..600e72e 100644 --- a/src/proxy.cpp +++ b/src/proxy.cpp @@ -82,6 +82,21 @@ #endif // ZMQ_HAVE_POLLER + +// Control socket messages + +typedef struct +{ + uint64_t pkts_in; + uint64_t bytes_in; + uint64_t pkts_out; + uint64_t bytes_out; +} zmq_socket_stats_t; + + + +// Utility functions + int capture ( class zmq::socket_base_t *capture_, zmq::msg_t& msg_, @@ -104,18 +119,21 @@ int capture ( } int forward ( - class zmq::socket_base_t *from_, - class zmq::socket_base_t *to_, + class zmq::socket_base_t *from_, zmq_socket_stats_t* from_stats, + class zmq::socket_base_t *to_, zmq_socket_stats_t* to_stats, class zmq::socket_base_t *capture_, zmq::msg_t& msg_) { int more; size_t moresz; + size_t complete_msg_size = 0; while (true) { int rc = from_->recv (&msg_, 0); if (unlikely (rc < 0)) return -1; + complete_msg_size += msg_.size(); + moresz = sizeof more; rc = from_->getsockopt (ZMQ_RCVMORE, &more, &moresz); if (unlikely (rc < 0)) @@ -129,12 +147,45 @@ int forward ( rc = to_->send (&msg_, more ? 
ZMQ_SNDMORE : 0); if (unlikely (rc < 0)) return -1; + if (more == 0) break; } + + // A multipart message counts as 1 packet: + from_stats->pkts_in++; + from_stats->bytes_in+=complete_msg_size; + to_stats->pkts_out++; + to_stats->bytes_out+=complete_msg_size; + return 0; } +int reply_stats( + class zmq::socket_base_t *control_, + zmq_socket_stats_t* frontend_stats, + zmq_socket_stats_t* backend_stats) +{ + zmq::msg_t stats_msg; + int rc = stats_msg.init_size( sizeof(zmq_socket_stats_t)*2 ); + if (unlikely (rc < 0)) { + return close_and_return (&stats_msg, -1); + } + + uint8_t* pdata = (uint8_t*)stats_msg.data(); + memcpy( pdata + 0, (const void*) frontend_stats, sizeof(zmq_socket_stats_t) ); + memcpy( pdata + sizeof(zmq_socket_stats_t), (const void*) backend_stats, sizeof(zmq_socket_stats_t) ); + + rc = control_->send (&stats_msg, 0); + if (unlikely (rc < 0)) { + return close_and_return (&stats_msg, -1); + } + + rc = stats_msg.close(); + return rc; +} + + #ifdef ZMQ_HAVE_POLLER int zmq::proxy ( @@ -168,6 +219,10 @@ int zmq::proxy ( bool backend_out = false; bool control_in = false; zmq::socket_poller_t::event_t events [3]; + zmq_socket_stats_t frontend_stats; + zmq_socket_stats_t backend_stats; + memset(&frontend_stats, 0, sizeof(frontend_stats)); + memset(&backend_stats, 0, sizeof(backend_stats)); // Don't allocate these pollers from stack because they will take more than 900 kB of stack! // On Windows this blows up default stack of 1 MB and aborts the program. 
@@ -323,9 +378,16 @@ int zmq::proxy ( if (msg.size () == 9 && memcmp (msg.data (), "TERMINATE", 9) == 0) state = terminated; else { - // This is an API error, we assert - puts ("E: invalid command sent to proxy"); - zmq_assert (false); + if (msg.size () == 10 && memcmp (msg.data (), "STATISTICS", 10) == 0) + { + rc = reply_stats(control_, &frontend_stats, &backend_stats); + CHECK_RC_EXIT_ON_FAILURE (); + } + else { + // This is an API error, we assert + puts ("E: invalid command sent to proxy"); + zmq_assert (false); + } } } control_in = false; @@ -336,7 +398,7 @@ int zmq::proxy ( // Process a request, 'ZMQ_POLLIN' on 'frontend_' and 'ZMQ_POLLOUT' on 'backend_'. // In case of frontend_==backend_ there's no 'ZMQ_POLLOUT' event. if (frontend_in && (backend_out || frontend_equal_to_backend)) { - rc = forward (frontend_, backend_, capture_, msg); + rc = forward (frontend_, &frontend_stats, backend_, &backend_stats, capture_, msg); CHECK_RC_EXIT_ON_FAILURE (); request_processed = true; frontend_in = backend_out = false; @@ -347,7 +409,7 @@ int zmq::proxy ( // covers all of the cases. 'backend_in' is always false if frontend_==backend_ due to // design in 'for' event processing loop. 
if (backend_in && frontend_out) { - rc = forward (backend_, frontend_, capture_, msg); + rc = forward (backend_, &backend_stats, frontend_, &frontend_stats, capture_, msg); CHECK_RC_EXIT_ON_FAILURE (); reply_processed = true; backend_in = frontend_out = false; @@ -443,6 +505,11 @@ int zmq::proxy ( { backend_, 0, ZMQ_POLLOUT, 0 } }; + zmq_socket_stats_t frontend_stats; + memset(&frontend_stats, 0, sizeof(frontend_stats)); + zmq_socket_stats_t backend_stats; + memset(&backend_stats, 0, sizeof(backend_stats)); + // Proxy can be in these three states enum { active, @@ -491,16 +558,24 @@ int zmq::proxy ( if (msg.size () == 9 && memcmp (msg.data (), "TERMINATE", 9) == 0) state = terminated; else { - // This is an API error, so we assert - puts ("E: invalid command sent to proxy"); - zmq_assert (false); + if (msg.size () == 10 && memcmp (msg.data (), "STATISTICS", 10) == 0) + { + rc = reply_stats(control_, &frontend_stats, &backend_stats); + if (unlikely (rc < 0)) + return -1; + } + else { + // This is an API error, we assert + puts ("E: invalid command sent to proxy"); + zmq_assert (false); + } } } // Process a request if (state == active && items [0].revents & ZMQ_POLLIN && (frontend_ == backend_ || itemsout [1].revents & ZMQ_POLLOUT)) { - rc = forward (frontend_, backend_, capture_, msg); + rc = forward (frontend_, &frontend_stats, backend_, &backend_stats, capture_, msg); if (unlikely (rc < 0)) return close_and_return (&msg, -1); } @@ -509,7 +584,7 @@ int zmq::proxy ( && frontend_ != backend_ && items [1].revents & ZMQ_POLLIN && itemsout [0].revents & ZMQ_POLLOUT) { - rc = forward (backend_, frontend_, capture_, msg); + rc = forward (backend_, &backend_stats, frontend_, &frontend_stats, capture_, msg); if (unlikely (rc < 0)) return close_and_return (&msg, -1); } diff --git a/tests/test_proxy.cpp b/tests/test_proxy.cpp index f00f502..8780c3f 100644 --- a/tests/test_proxy.cpp +++ b/tests/test_proxy.cpp @@ -48,13 +48,31 @@ #define ID_SIZE_MAX 32 #define QT_WORKERS 5 
#define QT_CLIENTS 3 -#define is_verbose 0 +#define is_verbose 1 struct thread_data { void *ctx; int id; }; +typedef struct +{ + uint64_t pkts_in; + uint64_t bytes_in; + uint64_t pkts_out; + uint64_t bytes_out; +} zmq_socket_stats_t; + +typedef struct +{ + zmq_socket_stats_t frontend; + zmq_socket_stats_t backend; +} zmq_proxy_stats_t; + +void *g_clients_pkts_out = NULL; +void *g_workers_pkts_out = NULL; + + static void client_task (void *db) { @@ -100,6 +118,7 @@ client_task (void *db) zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 }, { control, 0, ZMQ_POLLIN, 0 } }; int request_nbr = 0; bool run = true; + bool keep_sending = true; while (run) { // Tick once per 200 ms, pulling in arriving messages int centitick; @@ -119,16 +138,32 @@ client_task (void *db) } if (items [1].revents & ZMQ_POLLIN) { rc = zmq_recv (control, content, CONTENT_SIZE_MAX, 0); - if (is_verbose) printf("client receive - identity = %s command = %s\n", identity, content); - if (memcmp (content, "TERMINATE", 9) == 0) { - run = false; - break; + + if (rc > 0) + { + content[rc] = 0; // NULL-terminate the command string + if (is_verbose) printf("client receive - identity = %s command = %s\n", identity, content); + if (memcmp (content, "TERMINATE", 9) == 0) { + run = false; + break; + } + if (memcmp (content, "STOP", 4) == 0) { + keep_sending = false; + break; + } } } } - sprintf(content, "request #%03d", ++request_nbr); // CONTENT_SIZE - rc = zmq_send (client, content, CONTENT_SIZE, 0); - assert (rc == CONTENT_SIZE); + + if (keep_sending) + { + sprintf(content, "request #%03d", ++request_nbr); // CONTENT_SIZE + if (is_verbose) printf("client send - identity = %s request #%03d\n", identity, request_nbr); + zmq_atomic_counter_inc(g_clients_pkts_out); + + rc = zmq_send (client, content, CONTENT_SIZE, 0); + assert (rc == CONTENT_SIZE); + } } rc = zmq_close (client); @@ -173,13 +208,13 @@ server_task (void *ctx) assert (rc == 0); // Control socket receives terminate command from main over 
inproc - void *control = zmq_socket (ctx, ZMQ_SUB); + void *control = zmq_socket (ctx, ZMQ_REP); assert (control); - rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0); - assert (rc == 0); + /*rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0); + assert (rc == 0);*/ rc = zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger)); assert (rc == 0); - rc = zmq_connect (control, "inproc://control"); + rc = zmq_connect (control, "inproc://control_proxy"); assert (rc == 0); // Launch pool of worker threads, precise number is not critical @@ -255,13 +290,17 @@ server_worker (void *ctx) char identity [ID_SIZE_MAX]; // the size received is the size sent bool run = true; + bool keep_sending = true; while (run) { rc = zmq_recv (control, content, CONTENT_SIZE_MAX, ZMQ_DONTWAIT); // usually, rc == -1 (no message) if (rc > 0) { + content[rc] = 0; // NULL-terminate the command string if (is_verbose) printf("server_worker receives command = %s\n", content); if (memcmp (content, "TERMINATE", 9) == 0) run = false; + if (memcmp (content, "STOP", 4) == 0) + keep_sending = false; } // The DEALER socket gives us the reply envelope and message // if we don't poll, we have to use ZMQ_DONTWAIT, if we poll, we can block-receive with 0 @@ -273,15 +312,22 @@ server_worker (void *ctx) printf ("server receive - identity = %s content = %s\n", identity, content); // Send 0..4 replies back - int reply, replies = rand() % 5; - for (reply = 0; reply < replies; reply++) { - // Sleep for some fraction of a second - msleep (rand () % 10 + 1); - // Send message from server to client - rc = zmq_send (worker, identity, ID_SIZE, ZMQ_SNDMORE); - assert (rc == ID_SIZE); - rc = zmq_send (worker, content, CONTENT_SIZE, 0); - assert (rc == CONTENT_SIZE); + if (keep_sending) + { + int reply, replies = rand() % 5; + for (reply = 0; reply < replies; reply++) { + // Sleep for some fraction of a second + msleep (rand () % 10 + 1); + + // Send message from server to client + if (is_verbose) printf("server send 
- identity = %s reply\n", identity); + zmq_atomic_counter_inc(g_workers_pkts_out); + + rc = zmq_send (worker, identity, ID_SIZE, ZMQ_SNDMORE); + assert (rc == ID_SIZE); + rc = zmq_send (worker, content, CONTENT_SIZE, 0); + assert (rc == CONTENT_SIZE); + } } } } @@ -300,6 +346,11 @@ int main (void) void *ctx = zmq_ctx_new (); assert (ctx); + + g_clients_pkts_out = zmq_atomic_counter_new (); + g_workers_pkts_out = zmq_atomic_counter_new (); + + // Control socket receives terminate command from main over inproc void *control = zmq_socket (ctx, ZMQ_PUB); assert (control); @@ -309,6 +360,14 @@ int main (void) rc = zmq_bind (control, "inproc://control"); assert (rc == 0); + // Control socket receives terminate command from main over inproc + void *control_proxy = zmq_socket (ctx, ZMQ_REQ); + assert (control_proxy); + rc = zmq_setsockopt (control_proxy, ZMQ_LINGER, &linger, sizeof (linger)); + assert (rc == 0); + rc = zmq_bind (control_proxy, "inproc://control_proxy"); + assert (rc == 0); + void *threads [QT_CLIENTS + 1]; struct thread_data databags [QT_CLIENTS + 1]; for (int i = 0; i < QT_CLIENTS; i++) { @@ -319,11 +378,64 @@ int main (void) threads[QT_CLIENTS] = zmq_threadstart (&server_task, ctx); msleep (500); // Run for 500 ms then quit + + if (is_verbose) + printf ("stopping all clients and server workers\n"); + rc = zmq_send (control, "STOP", 4, 0); + assert (rc == 4); + + msleep(500); // Wait for all clients and workers to STOP + + + if (is_verbose) + printf ("retrieving stats from the proxy\n"); + + rc = zmq_send (control_proxy, "STATISTICS", 10, 0); + assert (rc == 10); + + zmq_msg_t stats_msg; + rc = zmq_msg_init (&stats_msg); + assert (rc == 0); + + rc = zmq_recvmsg (control_proxy, &stats_msg, 0); + assert (rc == sizeof(zmq_proxy_stats_t)); + + zmq_proxy_stats_t* stats = (zmq_proxy_stats_t*)zmq_msg_data(&stats_msg); + if (is_verbose) + { + printf ("frontend: pkts_in=%lu bytes_in=%lu pkts_out=%lu bytes_out=%lu\n", + stats->frontend.pkts_in, 
stats->frontend.bytes_in, + stats->frontend.pkts_out, stats->frontend.bytes_out); + printf ("backend: pkts_in=%lu bytes_in=%lu pkts_out=%lu bytes_out=%lu\n", + stats->backend.pkts_in, stats->backend.bytes_in, + stats->backend.pkts_out, stats->backend.bytes_out); + + printf ("clients sent out %d requests\n", zmq_atomic_counter_value(g_clients_pkts_out)); + printf ("workers sent out %d replies\n", zmq_atomic_counter_value(g_workers_pkts_out)); + } + assert( stats->frontend.pkts_in == (unsigned)zmq_atomic_counter_value(g_clients_pkts_out) ); + assert( stats->frontend.pkts_out == (unsigned)zmq_atomic_counter_value(g_workers_pkts_out) ); + assert( stats->backend.pkts_in == (unsigned)zmq_atomic_counter_value(g_workers_pkts_out) ); + assert( stats->backend.pkts_out == (unsigned)zmq_atomic_counter_value(g_clients_pkts_out) ); + + rc = zmq_msg_close (&stats_msg); + assert (rc == 0); + + if (is_verbose) + printf ("shutting down all clients and server workers\n"); rc = zmq_send (control, "TERMINATE", 9, 0); assert (rc == 9); + if (is_verbose) + printf ("shutting down the proxy now\n"); + rc = zmq_send (control_proxy, "TERMINATE", 9, 0); + assert (rc == 9); + + rc = zmq_close (control); assert (rc == 0); + rc = zmq_close (control_proxy); + assert (rc == 0); for (int i = 0; i < QT_CLIENTS + 1; i++) zmq_threadclose (threads[i]);
_______________________________________________ zeromq-dev mailing list zeromq-dev@lists.zeromq.org https://lists.zeromq.org/mailman/listinfo/zeromq-dev