Hi Luca,

I did add a new command to the ZMQ steerable proxy and it appears to
be quite self-contained. However that's my first patch to ZMQ so
please have a look and let me know if I should change something.
For now no changes to the documentation are attached: first I would
like to know if the way I coded the patch is fine or not, if the
naming is ok (is the "STATISTICS" command name ok?), etc.

To keep track of this I opened a github issue:
https://github.com/zeromq/libzmq/issues/2736
Anyway I'm attaching the patch file also to this mail...


Thanks!
Francesco



2017-09-01 20:35 GMT+02:00 Luca Boccassi <luca.bocca...@gmail.com>:
> On Fri, 2017-09-01 at 20:18 +0200, Francesco wrote:
>> 2017-09-01 19:58 GMT+02:00 Luca Boccassi <luca.bocca...@gmail.com>:
>> > The third parameter to zmq_proxy: http://api.zeromq.org/4-2:zmq-pro
>> > xy
>> >
>> > If you pass a socket, all messages will be duplicated and sent to
>> > it.
>> > Then you can do all the measurements you need.
>> > Note that they are shallow refcounted copies, so only the small
>> > metadata is actually copied, not the payloads, so it's reasonably
>> > fast.
>>
>> Ok thanks, I can try that!
>> Honestly however I think it would be nice to have an
>> easier/more-optimized way to retrieve such kind of informations...
>> maybe a message that is sent over a steerable proxy control socket
>> (of
>> type REQ/REP maybe) or a socket option to retrieve via
>> zmq_getsockopt()
>>
>> Thanks!
>>
>> Francesco
>
> A new steerable message sounds like a good idea, feel free to send a
> PR!
>
> Note that it probably should have an ON/OFF capability, so as to avoid
> performance penalties for users that are not interested in the feature.
>
> --
> Kind regards,
> Luca Boccassi
>
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev@lists.zeromq.org
> https://lists.zeromq.org/mailman/listinfo/zeromq-dev
diff --git a/src/proxy.cpp b/src/proxy.cpp
index aca80e1..600e72e 100644
--- a/src/proxy.cpp
+++ b/src/proxy.cpp
@@ -82,6 +82,21 @@
 
 #endif //  ZMQ_HAVE_POLLER
 
+
+// Control socket messages
+
+typedef struct
+{
+    uint64_t pkts_in;
+    uint64_t bytes_in;
+    uint64_t pkts_out;
+    uint64_t bytes_out;
+} zmq_socket_stats_t;
+
+
+
+// Utility functions
+
 int capture (
         class zmq::socket_base_t *capture_,
         zmq::msg_t& msg_,
@@ -104,18 +119,21 @@ int capture (
 }
 
 int forward (
-        class zmq::socket_base_t *from_,
-        class zmq::socket_base_t *to_,
+        class zmq::socket_base_t *from_, zmq_socket_stats_t* from_stats,
+        class zmq::socket_base_t *to_, zmq_socket_stats_t* to_stats,
         class zmq::socket_base_t *capture_,
         zmq::msg_t& msg_)
 {
     int more;
     size_t moresz;
+    size_t complete_msg_size = 0;
     while (true) {
         int rc = from_->recv (&msg_, 0);
         if (unlikely (rc < 0))
             return -1;
 
+        complete_msg_size += msg_.size();
+
         moresz = sizeof more;
         rc = from_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
         if (unlikely (rc < 0))
@@ -129,12 +147,45 @@ int forward (
         rc = to_->send (&msg_, more ? ZMQ_SNDMORE : 0);
         if (unlikely (rc < 0))
             return -1;
+
         if (more == 0)
             break;
     }
+
+    // A multipart message counts as 1 packet:
+    from_stats->pkts_in++;
+    from_stats->bytes_in+=complete_msg_size;
+    to_stats->pkts_out++;
+    to_stats->bytes_out+=complete_msg_size;
+
     return 0;
 }
 
+int reply_stats(
+        class zmq::socket_base_t *control_,
+        zmq_socket_stats_t* frontend_stats,
+        zmq_socket_stats_t* backend_stats)
+{
+    zmq::msg_t stats_msg;
+    int rc = stats_msg.init_size( sizeof(zmq_socket_stats_t)*2 );
+    if (unlikely (rc < 0)) {
+        return close_and_return (&stats_msg, -1);
+    }
+
+    uint8_t* pdata = (uint8_t*)stats_msg.data();
+    memcpy( pdata + 0,                          (const void*) frontend_stats, sizeof(zmq_socket_stats_t) );
+    memcpy( pdata + sizeof(zmq_socket_stats_t), (const void*) backend_stats,  sizeof(zmq_socket_stats_t) );
+
+    rc = control_->send (&stats_msg, 0);
+    if (unlikely (rc < 0)) {
+        return close_and_return (&stats_msg, -1);
+    }
+
+    rc = stats_msg.close();
+    return rc;
+}
+
+
 #ifdef ZMQ_HAVE_POLLER
 
 int zmq::proxy (
@@ -168,6 +219,10 @@ int zmq::proxy (
     bool backend_out = false;
     bool control_in = false;
     zmq::socket_poller_t::event_t events [3];
+    zmq_socket_stats_t frontend_stats;
+    zmq_socket_stats_t backend_stats;
+    memset(&frontend_stats, 0, sizeof(frontend_stats));
+    memset(&backend_stats, 0, sizeof(backend_stats));
 
     //  Don't allocate these pollers from stack because they will take more than 900 kB of stack!
     //  On Windows this blows up default stack of 1 MB and aborts the program.
@@ -323,9 +378,16 @@ int zmq::proxy (
                     if (msg.size () == 9 && memcmp (msg.data (), "TERMINATE", 9) == 0)
                         state = terminated;
                     else {
-                        //  This is an API error, we assert
-                        puts ("E: invalid command sent to proxy");
-                        zmq_assert (false);
+                        if (msg.size () == 10 && memcmp (msg.data (), "STATISTICS", 10) == 0)
+                        {
+                            rc = reply_stats(control_, &frontend_stats, &backend_stats);
+                            CHECK_RC_EXIT_ON_FAILURE ();
+                        }
+                        else {
+                            //  This is an API error, we assert
+                            puts ("E: invalid command sent to proxy");
+                            zmq_assert (false);
+                        }
                     }
                 }
             control_in = false;
@@ -336,7 +398,7 @@ int zmq::proxy (
             //  Process a request, 'ZMQ_POLLIN' on 'frontend_' and 'ZMQ_POLLOUT' on 'backend_'.
             //  In case of frontend_==backend_ there's no 'ZMQ_POLLOUT' event.
             if (frontend_in && (backend_out || frontend_equal_to_backend)) {
-                rc = forward (frontend_, backend_, capture_, msg);
+                rc = forward (frontend_, &frontend_stats, backend_, &backend_stats, capture_, msg);
                 CHECK_RC_EXIT_ON_FAILURE ();
                 request_processed = true;
                 frontend_in = backend_out = false;
@@ -347,7 +409,7 @@ int zmq::proxy (
             //  covers all of the cases. 'backend_in' is always false if frontend_==backend_ due to
             //  design in 'for' event processing loop.
             if (backend_in && frontend_out) {
-                rc = forward (backend_, frontend_, capture_, msg);
+                rc = forward (backend_, &backend_stats, frontend_, &frontend_stats, capture_, msg);
                 CHECK_RC_EXIT_ON_FAILURE ();
                 reply_processed = true;
                 backend_in = frontend_out = false;
@@ -443,6 +505,11 @@ int zmq::proxy (
         { backend_, 0, ZMQ_POLLOUT, 0 }
     };
 
+    zmq_socket_stats_t frontend_stats;
+    memset(&frontend_stats, 0, sizeof(frontend_stats));
+    zmq_socket_stats_t backend_stats;
+    memset(&backend_stats, 0, sizeof(backend_stats));
+
     //  Proxy can be in these three states
     enum {
         active,
@@ -491,16 +558,24 @@ int zmq::proxy (
                     if (msg.size () == 9 && memcmp (msg.data (), "TERMINATE", 9) == 0)
                         state = terminated;
                     else {
-                        //  This is an API error, so we assert
-                        puts ("E: invalid command sent to proxy");
-                        zmq_assert (false);
+                        if (msg.size () == 10 && memcmp (msg.data (), "STATISTICS", 10) == 0)
+                        {
+                            rc = reply_stats(control_, &frontend_stats, &backend_stats);
+                            if (unlikely (rc < 0))
+                                return -1;
+                        }
+                        else {
+                            //  This is an API error, we assert
+                            puts ("E: invalid command sent to proxy");
+                            zmq_assert (false);
+                        }
                     }
         }
         //  Process a request
         if (state == active
         && items [0].revents & ZMQ_POLLIN
         && (frontend_ == backend_ || itemsout [1].revents & ZMQ_POLLOUT)) {
-            rc = forward (frontend_, backend_, capture_, msg);
+            rc = forward (frontend_, &frontend_stats, backend_, &backend_stats, capture_, msg);
             if (unlikely (rc < 0))
                 return close_and_return (&msg, -1);
         }
@@ -509,7 +584,7 @@ int zmq::proxy (
         &&  frontend_ != backend_
         &&  items [1].revents & ZMQ_POLLIN
         &&  itemsout [0].revents & ZMQ_POLLOUT) {
-            rc = forward (backend_, frontend_, capture_, msg);
+            rc = forward (backend_, &backend_stats, frontend_, &frontend_stats, capture_, msg);
             if (unlikely (rc < 0))
                 return close_and_return (&msg, -1);
         }
diff --git a/tests/test_proxy.cpp b/tests/test_proxy.cpp
index f00f502..8780c3f 100644
--- a/tests/test_proxy.cpp
+++ b/tests/test_proxy.cpp
@@ -48,13 +48,31 @@
 #define ID_SIZE_MAX 32
 #define QT_WORKERS    5
 #define QT_CLIENTS    3
-#define is_verbose 0
+#define is_verbose 1
 
 struct thread_data {
     void *ctx;
     int id;
 };
 
+typedef struct
+{
+    uint64_t pkts_in;
+    uint64_t bytes_in;
+    uint64_t pkts_out;
+    uint64_t bytes_out;
+} zmq_socket_stats_t;
+
+typedef struct
+{
+    zmq_socket_stats_t frontend;
+    zmq_socket_stats_t backend;
+} zmq_proxy_stats_t;
+
+void *g_clients_pkts_out = NULL;
+void *g_workers_pkts_out = NULL;
+
+
 static void
 client_task (void *db)
 {
@@ -100,6 +118,7 @@ client_task (void *db)
     zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 }, { control, 0, ZMQ_POLLIN, 0 } };
     int request_nbr = 0;
     bool run = true;
+    bool keep_sending = true;
     while (run) {
         // Tick once per 200 ms, pulling in arriving messages
         int centitick;
@@ -119,16 +138,32 @@ client_task (void *db)
             }
             if (items [1].revents & ZMQ_POLLIN) {
                 rc = zmq_recv (control, content, CONTENT_SIZE_MAX, 0);
-                if (is_verbose) printf("client receive - identity = %s    command = %s\n", identity, content);
-                if (memcmp (content, "TERMINATE", 9) == 0) {
-                    run = false;
-                    break;
+
+                if (rc > 0)
+                {
+                    content[rc] = 0;        // NULL-terminate the command string
+                    if (is_verbose) printf("client receive - identity = %s    command = %s\n", identity, content);
+                    if (memcmp (content, "TERMINATE", 9) == 0) {
+                        run = false;
+                        break;
+                    }
+                    if (memcmp (content, "STOP", 4) == 0) {
+                        keep_sending = false;
+                        break;
+                    }
                 }
             }
         }
-        sprintf(content, "request #%03d", ++request_nbr); // CONTENT_SIZE
-        rc = zmq_send (client, content, CONTENT_SIZE, 0);
-        assert (rc == CONTENT_SIZE);
+
+        if (keep_sending)
+        {
+            sprintf(content, "request #%03d", ++request_nbr); // CONTENT_SIZE
+            if (is_verbose) printf("client send - identity = %s    request #%03d\n", identity, request_nbr);
+            zmq_atomic_counter_inc(g_clients_pkts_out);
+
+            rc = zmq_send (client, content, CONTENT_SIZE, 0);
+            assert (rc == CONTENT_SIZE);
+        }
     }
 
     rc = zmq_close (client);
@@ -173,13 +208,13 @@ server_task (void *ctx)
     assert (rc == 0);
 
     // Control socket receives terminate command from main over inproc
-    void *control = zmq_socket (ctx, ZMQ_SUB);
+    void *control = zmq_socket (ctx, ZMQ_REP);
     assert (control);
-    rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0);
-    assert (rc == 0);
+    /*rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0);
+    assert (rc == 0);*/
     rc = zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger));
     assert (rc == 0);
-    rc = zmq_connect (control, "inproc://control");
+    rc = zmq_connect (control, "inproc://control_proxy");
     assert (rc == 0);
 
     // Launch pool of worker threads, precise number is not critical
@@ -255,13 +290,17 @@ server_worker (void *ctx)
     char identity [ID_SIZE_MAX];      // the size received is the size sent
 
     bool run = true;
+    bool keep_sending = true;
     while (run) {
         rc = zmq_recv (control, content, CONTENT_SIZE_MAX, ZMQ_DONTWAIT); // usually, rc == -1 (no message)
         if (rc > 0) {
+            content[rc] = 0;        // NULL-terminate the command string
             if (is_verbose)
                 printf("server_worker receives command = %s\n", content);
             if (memcmp (content, "TERMINATE", 9) == 0)
                 run = false;
+            if (memcmp (content, "STOP", 4) == 0)
+                keep_sending = false;
         }
         // The DEALER socket gives us the reply envelope and message
         // if we don't poll, we have to use ZMQ_DONTWAIT, if we poll, we can block-receive with 0
@@ -273,15 +312,22 @@ server_worker (void *ctx)
                 printf ("server receive - identity = %s    content = %s\n", identity, content);
 
             // Send 0..4 replies back
-            int reply, replies = rand() % 5;
-            for (reply = 0; reply < replies; reply++) {
-                // Sleep for some fraction of a second
-                msleep (rand () % 10 + 1);
-                //  Send message from server to client
-                rc = zmq_send (worker, identity, ID_SIZE, ZMQ_SNDMORE);
-                assert (rc == ID_SIZE);
-                rc = zmq_send (worker, content, CONTENT_SIZE, 0);
-                assert (rc == CONTENT_SIZE);
+            if (keep_sending)
+            {
+                int reply, replies = rand() % 5;
+                for (reply = 0; reply < replies; reply++) {
+                    // Sleep for some fraction of a second
+                    msleep (rand () % 10 + 1);
+
+                    //  Send message from server to client
+                    if (is_verbose) printf("server send - identity = %s    reply\n", identity);
+                    zmq_atomic_counter_inc(g_workers_pkts_out);
+
+                    rc = zmq_send (worker, identity, ID_SIZE, ZMQ_SNDMORE);
+                    assert (rc == ID_SIZE);
+                    rc = zmq_send (worker, content, CONTENT_SIZE, 0);
+                    assert (rc == CONTENT_SIZE);
+                }
             }
         }
     }
@@ -300,6 +346,11 @@ int main (void)
 
     void *ctx = zmq_ctx_new ();
     assert (ctx);
+
+    g_clients_pkts_out = zmq_atomic_counter_new ();
+    g_workers_pkts_out = zmq_atomic_counter_new ();
+
+
     // Control socket receives terminate command from main over inproc
     void *control = zmq_socket (ctx, ZMQ_PUB);
     assert (control);
@@ -309,6 +360,14 @@ int main (void)
     rc = zmq_bind (control, "inproc://control");
     assert (rc == 0);
 
+    // Control socket receives terminate command from main over inproc
+    void *control_proxy = zmq_socket (ctx, ZMQ_REQ);
+    assert (control_proxy);
+    rc = zmq_setsockopt (control_proxy, ZMQ_LINGER, &linger, sizeof (linger));
+    assert (rc == 0);
+    rc = zmq_bind (control_proxy, "inproc://control_proxy");
+    assert (rc == 0);
+
     void *threads [QT_CLIENTS + 1];
     struct thread_data databags [QT_CLIENTS + 1];
     for (int i = 0; i < QT_CLIENTS; i++) {
@@ -319,11 +378,64 @@ int main (void)
     threads[QT_CLIENTS] = zmq_threadstart  (&server_task, ctx);
     msleep (500); // Run for 500 ms then quit
 
+
+    if (is_verbose)
+        printf ("stopping all clients and server workers\n");
+    rc = zmq_send (control, "STOP", 4, 0);
+    assert (rc == 4);
+
+    msleep(500); // Wait for all clients and workers to STOP
+
+
+    if (is_verbose)
+        printf ("retrieving stats from the proxy\n");
+
+    rc = zmq_send (control_proxy, "STATISTICS", 10, 0);
+    assert (rc == 10);
+
+    zmq_msg_t stats_msg;
+    rc = zmq_msg_init (&stats_msg);
+    assert (rc == 0);
+
+    rc = zmq_recvmsg (control_proxy, &stats_msg, 0);
+    assert (rc == sizeof(zmq_proxy_stats_t));
+
+    zmq_proxy_stats_t* stats = (zmq_proxy_stats_t*)zmq_msg_data(&stats_msg);
+    if (is_verbose)
+    {
+        printf ("frontend: pkts_in=%lu bytes_in=%lu  pkts_out=%lu bytes_out=%lu\n",
+                stats->frontend.pkts_in, stats->frontend.bytes_in,
+                stats->frontend.pkts_out, stats->frontend.bytes_out);
+        printf ("backend: pkts_in=%lu bytes_in=%lu  pkts_out=%lu bytes_out=%lu\n",
+                stats->backend.pkts_in, stats->backend.bytes_in,
+                stats->backend.pkts_out, stats->backend.bytes_out);
+
+        printf ("clients sent out %d requests\n", zmq_atomic_counter_value(g_clients_pkts_out));
+        printf ("workers sent out %d replies\n", zmq_atomic_counter_value(g_workers_pkts_out));
+    }
+    assert( stats->frontend.pkts_in == (unsigned)zmq_atomic_counter_value(g_clients_pkts_out) );
+    assert( stats->frontend.pkts_out == (unsigned)zmq_atomic_counter_value(g_workers_pkts_out) );
+    assert( stats->backend.pkts_in == (unsigned)zmq_atomic_counter_value(g_workers_pkts_out) );
+    assert( stats->backend.pkts_out == (unsigned)zmq_atomic_counter_value(g_clients_pkts_out) );
+
+    rc = zmq_msg_close (&stats_msg);
+    assert (rc == 0);
+
+    if (is_verbose)
+        printf ("shutting down all clients and server workers\n");
     rc = zmq_send (control, "TERMINATE", 9, 0);
     assert (rc == 9);
 
+    if (is_verbose)
+        printf ("shutting down the proxy now\n");
+    rc = zmq_send (control_proxy, "TERMINATE", 9, 0);
+    assert (rc == 9);
+
+
     rc = zmq_close (control);
     assert (rc == 0);
+    rc = zmq_close (control_proxy);
+    assert (rc == 0);
 
     for (int i = 0; i < QT_CLIENTS + 1; i++)
         zmq_threadclose (threads[i]);
_______________________________________________
zeromq-dev mailing list
zeromq-dev@lists.zeromq.org
https://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to