You could retest this with a simpler example. The key-value store does a fair amount of work per message, and it's possible your setup provokes scheduler thrashing, i.e. the OS spends more time swapping threads in and out than it leaves for the threads to do useful work.
On Fri, Aug 28, 2015 at 5:31 PM, Blair Anson <[email protected]> wrote: > Hi, I have been load testing a pubsub server (essentially CloneServer3 from > the examples) and have come across some curious behavior that is stopping me > from adding additional clients once I get to around 400. I was wondering if > anyone has an idea as to what is happening when I reach 400 clients, and how > I can resolve the issue? > > Each client performs: > 1. a snapshot request > 2. receives snap shot > 3. loops over a subscriber & publisher calls until the process is killed > > (code for the client and server are at the bottom of the email) > > zeromq version is 4.0.4 (I have some project dependencies so I'm unable to > update to the latest version at the moment) > > I'm running the PubSub server on an m3-medium in Amazon. > > Increasing the number of clients in steps of 100 up to 300 results in the > following metrics > > CPU Avg% 14 > CPU Peak% 23 > lsof | grep -c server 2784 > > So overall the box isn't working very hard, however if I bump up the clients > to 400 then I get > > CPU Avg% 98 > CPU Peak% 100 > lsof | grep -c server 3111 > > the box suddenly starts to thrash to CPU. Delving into top we can see the > following... > > %Cpu(s): 15.6 us, 43.1 sy, 0.0 ni, 0.0 id, 0.0 wa, 0.0 hi, 0.5 si, 40.7 > st > > it appears that most of the load is attributed to "sy" and "st" which are.. 
> > us: user cpu time (or) % CPU time spent in user space > sy: system cpu time (or) % CPU time spent in kernel space > ni: user nice cpu time (or) % CPU time spent on low priority processes > id: idle cpu time (or) % CPU time spent idle > wa: io wait cpu time (or) % CPU time spent in wait (on disk) > hi: hardware irq (or) % CPU time spent servicing/handling hardware > interrupts > si: software irq (or) % CPU time spent servicing/handling software > interrupts > st: steal time - - % CPU time in involuntary wait by virtual cpu while > hypervisor is servicing another processor (or) % CPU time stolen from a > virtual machine > > I don't understand how increasing the load of a zmq user process results in > the kernel & hypervisor consuming all the available CPU. I would have > expected the User CPU Time to increase as that is the type of cpu usage the > zmq pubsub server consumes. > Does anyone have any ideas as to why I'm hitting a limit? > > > ----------- Code of the Client > ----------------------------------------------------------------------------------------------------- > > #include "kvsimple.c" > > // This client is identical to clonecli3 except for where we > // handle subtrees. 
> #define SUBTREE > "/a14d5afa-edec-4545-82a4-d6db4a5441e0:eda2f56f-4afe-4955-95e0-dc13a32c7663/" > > #define LOCATION > "{\"_latlon\":[{\"longitude\":-0.074397,\"latitude\":51.5452491,\"iM\":1},{\"longitude\":-0.074297,\"latitude\":51.5452298,\"iM\":1},{\"longitude\":-0.0742712,\"latitude\":51.5452147,\"iM\":1}],\"_findmUserId\":\"eda2f56f-4afe-4955-95e0-dc13a32c7663\",\"_bearing\":0.0,\"maxLocationPoints\":10}" > > #define SUBTREERESULTS "/results/" > > int main (void) > { > // Prepare our context and subscriber > zctx_t *ctx = zctx_new (); > void *snapshot = zsocket_new (ctx, ZMQ_DEALER); > zsocket_connect (snapshot, > "tcp://ec2-xx-xx-xx-xx.compute.amazonaws.com:5556"); > void *subscriber = zsocket_new (ctx, ZMQ_SUB); > > zsocket_set_subscribe (subscriber, SUBTREE); > > zsocket_connect (subscriber, > "tcp://ec2-xx-xx-xx-xx.compute.amazonaws.com:5557"); > > > void *publisher = zsocket_new (ctx, ZMQ_PUSH); > zsocket_connect (publisher, > "tcp://ec2-xx-xx-xx-xx.compute.amazonaws.com:5558"); > > zhash_t *kvmap = zhash_new (); > srandom ((unsigned) time (NULL)); > > > // .until > // We first request a state snapshot: > int64_t sequence = 0; > zstr_sendm (snapshot, "ICANHAZ?"); > zstr_send (snapshot, SUBTREE); > // .skip > while (true) { > kvmsg_t *kvmsg = kvmsg_recv (snapshot); > if (!kvmsg) > break; // Interrupted > if (streq (kvmsg_key (kvmsg), "KTHXBAI")) { > sequence = kvmsg_sequence (kvmsg); > printf ("I: received snapshot=%d\n", (int) sequence); > kvmsg_dump(kvmsg); > kvmsg_destroy (&kvmsg); > break; // Done > } > kvmsg_store (&kvmsg, kvmap); > } > int64_t alarm = zclock_time () + 1000; > int64_t startTime = zclock_time (); > > while (!zctx_interrupted) { > zmq_pollitem_t items [] = { { subscriber, 0, ZMQ_POLLIN, 0 } }; > int tickless = (int) ((alarm - zclock_time ())); > if (tickless < 0) > tickless = 0; > int rc = zmq_poll (items, 1, tickless * ZMQ_POLL_MSEC); > if (rc == -1) > break; // Context has been shut down > > if (items [0].revents & ZMQ_POLLIN) { > 
kvmsg_t *kvmsg = kvmsg_recv (subscriber); > if (!kvmsg) > break; // Interrupted > > // Discard out-of-sequence kvmsgs, incl. heartbeats > if (kvmsg_sequence (kvmsg) > sequence) { > kvmsg_dump(kvmsg); > sequence = kvmsg_sequence (kvmsg); > kvmsg_store (&kvmsg, kvmap); > printf ("I: received update=%d\n", (int) sequence); > int64_t elapsedtime = zclock_time ()-startTime; > printf (" Time elapsed %" PRId64"\n", elapsedtime); > > } > else > kvmsg_destroy (&kvmsg); > } > > > > // .until > // If we timed out, generate a random kvmsg > if (zclock_time () >= alarm) { > kvmsg_t *kvmsg = kvmsg_new (0); > kvmsg_fmt_key (kvmsg, "%s%d", SUBTREE, randof (10000)); > //kvmsg_fmt_body (kvmsg, "%d", randof (1000000)); > //kvmsg_fmt_body (kvmsg, "%s", LOCATION); > kvmsg_set_body (kvmsg, (byte *) LOCATION, strlen(LOCATION)); > kvmsg_send (kvmsg, publisher); > kvmsg_destroy (&kvmsg); > alarm = zclock_time () + 1000; > } > > > > // .skip > } > printf (" Interrupted\n%d messages in\n", (int) sequence); > zhash_destroy (&kvmap); > zctx_destroy (&ctx); > > if((int)sequence>0){ > exit(0); > } else { > exit(1); > } > } > > > > > ---------- Code of the Server > ----------------------------------------------------------------------------------------------------- > > > #include "kvsimple.c" > > // Routing information for a key-value snapshot > typedef struct { > void *socket; // ROUTER socket to send to > zframe_t *identity; // Identity of peer who requested state > char *subtree; // Client subtree specification > } kvroute_t; > > // Send one state snapshot key-value pair to a socket > // Hash item data is our kvmsg object, ready to send > static int > s_send_single (const char *key, void *data, void *args) > { > kvroute_t *kvroute = (kvroute_t *) args; > kvmsg_t *kvmsg = (kvmsg_t *) data; > if (strlen (kvroute->subtree) <= strlen (kvmsg_key (kvmsg)) > && memcmp (kvroute->subtree, > kvmsg_key (kvmsg), strlen (kvroute->subtree)) == 0) { > // Send identity of recipient first > zframe_send 
(&kvroute->identity, > kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE); > kvmsg_send (kvmsg, kvroute->socket); > } > return 0; > } > > // The main task is identical to clonesrv3 except for where it > // handles subtrees. > // .skip > > int main (void) > { > // Prepare our context and sockets > zctx_t *ctx = zctx_new (); > void *snapshot = zsocket_new (ctx, ZMQ_ROUTER); > zsocket_bind (snapshot, "tcp://*:5556"); > void *publisher = zsocket_new (ctx, ZMQ_PUB); > zsocket_bind (publisher, "tcp://*:5557"); > void *collector = zsocket_new (ctx, ZMQ_PULL); > zsocket_bind (collector, "tcp://*:5558"); > > int64_t sequence = 0; > zhash_t *kvmap = zhash_new (); > > zmq_pollitem_t items [] = { > { collector, 0, ZMQ_POLLIN, 0 }, > { snapshot, 0, ZMQ_POLLIN, 0 } > }; > while (!zctx_interrupted) { > int rc = zmq_poll (items, 2, 10 * ZMQ_POLL_MSEC); > > // Apply state update sent from client > if (items [0].revents & ZMQ_POLLIN) { > kvmsg_t *kvmsg = kvmsg_recv (collector); > kvmsg_dump(kvmsg); // stderr msg body > if (!kvmsg) > break; // Interrupted > kvmsg_set_sequence (kvmsg, ++sequence); > kvmsg_send (kvmsg, publisher); > //kvmsg_store (&kvmsg, kvmap); // don't store to save time > kvmsg_destroy(&kvmsg); // remove message from memory > printf ("I: publishing update %5d\n", (int) sequence); > //kvmsg_dump(kvmsg); // stderr msg body > } > // Execute state snapshot request > if (items [1].revents & ZMQ_POLLIN) { > zframe_t *identity = zframe_recv (snapshot); > if (!identity) > break; // Interrupted > > // .until > // Request is in second frame of message > char *request = zstr_recv (snapshot); > char *subtree = NULL; > if (streq (request, "ICANHAZ?")) { > free (request); > subtree = zstr_recv (snapshot); > } > // .skip > else { > printf ("E: bad request, aborting\n"); > break; > } > // .until > // Send state snapshot to client > kvroute_t routing = { snapshot, identity, subtree }; > // .skip > > // For each entry in kvmap, send kvmsg to client > zhash_foreach (kvmap, s_send_single, 
&routing); > > // .until > // Now send END message with sequence number > printf ("I: sending shapshot=%d\n", (int) sequence); > zframe_send (&identity, snapshot, ZFRAME_MORE); > kvmsg_t *kvmsg = kvmsg_new (sequence); > kvmsg_set_key (kvmsg, "KTHXBAI"); > kvmsg_set_body (kvmsg, (byte *) subtree, 0); > kvmsg_send (kvmsg, snapshot); > kvmsg_destroy (&kvmsg); > free (subtree); > } > } > // .skip > printf (" Interrupted\n%d messages handled\n", (int) sequence); > zhash_destroy (&kvmap); > zctx_destroy (&ctx); > > return 0; > } > > > > _______________________________________________ > zeromq-dev mailing list > [email protected] > http://lists.zeromq.org/mailman/listinfo/zeromq-dev > _______________________________________________ zeromq-dev mailing list [email protected] http://lists.zeromq.org/mailman/listinfo/zeromq-dev
