On Wed, Aug 26, 2009 at 01:52:18AM -0300, Christiano Farina Haesbaert wrote:
> On Wed, Aug 05, 2009 at 07:17:04PM +0200, Henning Brauer wrote:
> > * Claudio Jeker <[email protected]> [2009-08-05 19:04]:
> > > This needs a bit more work and maybe it would make sense to switch away
> > > from poll to kqueue or libevent. poll() gets inefficient when handling
> > > large amount of fds. But that's maybe for later.
> > 
> > that is DEFINITELY for later, if at all.
> > 
> > i really want this, if you can address claudio's points i hope to find
> > time to get this in next week
> 
> Hello there,
> 
> So hopefully I've got it all right now, the -m option was removed, no
> need for such a thing, the rdomain diff is applied also. I don't have
> the SO_RDOMAIN definition anywhere, maybe my head is old ?
> 
> We found a bug in kfind_tcb that occurs frequently on SMP machines
> when issuing a lot of connections: it seems that the tcb queue can be
> altered/destroyed between the two kget calls, which would cause
> kfind_tcb to fail with "pcb prev pointer insane". We now retry the
> operation up to 10 times, and it has never happened again. Thanks to
> Damien for helping with that.
> 

I'm running with this version that fixes the way closed sockets are
handled. Instead of ignoring the last fd it will run handle_connection()
on that fd if the fd is valid and has data. I also fixed an off-by-one in
the same code.

With this the code seems to work nicely (but it is a bit verbose when used
with 1000 sessions).
-- 
:wq Claudio

Index: tcpbench.c
===================================================================
RCS file: /cvs/src/usr.bin/tcpbench/tcpbench.c,v
retrieving revision 1.10
diff -u -p -r1.10 tcpbench.c
--- tcpbench.c  13 Aug 2009 14:26:38 -0000      1.10
+++ tcpbench.c  27 Aug 2009 16:07:29 -0000
@@ -18,6 +18,7 @@
 #include <sys/time.h>
 #include <sys/socket.h>
 #include <sys/socketvar.h>
+#include <sys/resource.h>
 
 #include <net/route.h>
 
@@ -47,24 +48,45 @@
 #include <kvm.h>
 #include <nlist.h>
 
-#define DEFAULT_PORT           "12345"
-#define DEFAULT_STATS_INTERVAL 1000            /* ms */
-#define DEFAULT_BUF            256 * 1024
+#define DEFAULT_PORT "12345"
+#define DEFAULT_STATS_INTERVAL 1000 /* ms */
+#define DEFAULT_BUF 256 * 1024
+#define MAX_FD 1024
 
 sig_atomic_t done = 0;
-sig_atomic_t print_stats = 0;
+sig_atomic_t proc_slice = 0;
 
-u_int rdomain;
+static u_int  rdomain;
+static char **kflag;
+static size_t Bflag;
+static int    Sflag;
+static int    rflag;
+static int    sflag;
+static int    vflag;
 
+/* stats for a single connection */
 struct statctx {
-       struct timeval t_start, t_last, t_cur;
+       struct timeval t_start, t_last;
        unsigned long long bytes;
-       pid_t pid;
        u_long tcbaddr;
-       kvm_t *kh;
        char **kvars;
+       kvm_t *kh;
 };
 
+/*
+ * We account the mainstats here, that is the stats
+ * for all connections, all variables starting with slice
+ * are used to account information for the timeslice
+ * between each output. Peak variables record the highest
+ * between all slices so far.
+ */
+static struct {
+       unsigned long long slice_bytes; /* bytes for last slice */
+       struct timeval t_start;         /* when we started counting */
+       long double peak_mbps;          /* peak mbps so far */
+       int nconns;                     /* connected clients */
+} mainstats;
+
 /* When adding variables, also add to stats_display() */
 static const char *allowed_kvars[] = {
        "inpcb.inp_flags",
@@ -105,7 +127,7 @@ exitsighand(int signo)
 static void
 alarmhandler(int signo)
 {
-       print_stats = 1;
+       proc_slice = 1;
        signal(signo, alarmhandler);
 }
 
@@ -139,6 +161,39 @@ saddr_ntop(const struct sockaddr *addr, 
 }
 
 static void
+set_timer(int toggle)
+{
+       struct itimerval itv;
+
+       if (rflag <= 0)
+               return;
+
+       if (toggle) {
+               itv.it_interval.tv_sec = rflag / 1000;
+               itv.it_interval.tv_usec = (rflag % 1000) * 1000;
+               itv.it_value = itv.it_interval;
+       }
+       else
+               bzero(&itv, sizeof(itv));
+               
+       setitimer(ITIMER_REAL, &itv, NULL);
+}
+
+static void
+print_header(void)
+{
+       char **kv;
+       
+       printf("%12s %14s %12s %8s ", "elapsed_ms", "bytes", "mbps",
+           "bwidth");
+       
+       for (kv = kflag;  kflag != NULL && *kv != NULL; kv++) 
+               printf("%s%s", kv != kflag ? "," : "", *kv);
+       
+       printf("\n");
+}
+
+static void
 kget(kvm_t *kh, u_long addr, void *buf, int size)
 {
        if (kvm_read(kh, addr, buf, size) != size)
@@ -146,7 +201,7 @@ kget(kvm_t *kh, u_long addr, void *buf, 
 }
 
 static u_long
-kfind_tcb(kvm_t *kh, u_long ktcbtab, int sock, int vflag)
+kfind_tcb(kvm_t *kh, u_long ktcbtab, int sock)
 {
        struct inpcbtable tcbtab;
        struct inpcb *head, *next, *prev;
@@ -158,7 +213,9 @@ kfind_tcb(kvm_t *kh, u_long ktcbtab, int
        struct sockaddr_in *in4;
        struct sockaddr_in6 *in6;
        char tmp1[64], tmp2[64];
+       int nretry;
 
+       nretry = 10;
        melen = themlen = sizeof(struct sockaddr_storage);
        if (getsockname(sock, (struct sockaddr *)&me, &melen) == -1)
                err(1, "getsockname");
@@ -177,7 +234,7 @@ kfind_tcb(kvm_t *kh, u_long ktcbtab, int
        }
        if (vflag >= 2)
                fprintf(stderr, "Using PCB table at %lu\n", ktcbtab);
-
+retry:
        kget(kh, ktcbtab, &tcbtab, sizeof(tcbtab));
        prev = head = (struct inpcb *)&CIRCLEQ_FIRST(
            &((struct inpcbtable *)ktcbtab)->inpt_queue);
@@ -189,8 +246,15 @@ kfind_tcb(kvm_t *kh, u_long ktcbtab, int
                if (vflag >= 2)
                        fprintf(stderr, "Checking PCB %p\n", next);
                kget(kh, (u_long)next, &inpcb, sizeof(inpcb));
-               if (CIRCLEQ_PREV(&inpcb, inp_queue) != prev)
-                       errx(1, "pcb prev pointer insane");
+               if (CIRCLEQ_PREV(&inpcb, inp_queue) != prev) {
+                       if (nretry--) {
+                               warnx("pcb prev pointer insane");
+                               goto retry;
+                       }
+                       else
+                               errx(1, "pcb prev pointer insane,"
+                                    " all attempts exausted");
+               }
                prev = next;
                next = CIRCLEQ_NEXT(&inpcb, inp_queue);
 
@@ -306,194 +370,181 @@ check_prepare_kvars(char *list)
 }
 
 static void
-stats_prepare(struct statctx *sc, int fd, kvm_t *kh, u_long ktcbtab,
-    int rflag, int vflag, char **kflag)
+stats_prepare(struct statctx *sc, int fd, kvm_t *kh, u_long ktcbtab)
 {
-       struct itimerval itv;
-       int i;
-
        if (rflag <= 0)
                return;
        sc->kh = kh;
        sc->kvars = kflag;
        if (kflag)
-               sc->tcbaddr = kfind_tcb(kh, ktcbtab, fd, vflag);
-       gettimeofday(&sc->t_start, NULL);
+               sc->tcbaddr = kfind_tcb(kh, ktcbtab, fd);
+       if (gettimeofday(&sc->t_start, NULL) == -1)
+               err(1, "gettimeofday");
        sc->t_last = sc->t_start;
-       signal(SIGALRM, alarmhandler);
-       itv.it_interval.tv_sec = rflag / 1000;
-       itv.it_interval.tv_usec = (rflag % 1000) * 1000;
-       itv.it_value = itv.it_interval;
-       setitimer(ITIMER_REAL, &itv, NULL);
        sc->bytes = 0;
-       sc->pid = getpid();
-
-       printf("%8s %12s %14s %12s ", "pid", "elapsed_ms", "bytes", "Mbps");
-       if (sc->kvars != NULL) {
-               for (i = 0; sc->kvars[i] != NULL; i++)
-                       printf("%s%s", i > 0 ? "," : "", sc->kvars[i]);
-       }
-       printf("\n");
-       fflush(stdout);
 }
 
 static void
 stats_update(struct statctx *sc, ssize_t n)
 {
        sc->bytes += n;
+       mainstats.slice_bytes += n;
 }
 
 static void
-stats_display(struct statctx *sc)
+stats_cleanslice(void)
 {
-       struct timeval t_diff;
-       unsigned long long total_elapsed, since_last;
-       size_t i;
-       struct inpcb inpcb;
-       struct tcpcb tcpcb;
-       struct socket sockb;
-
-       gettimeofday(&sc->t_cur, NULL);
-       timersub(&sc->t_cur, &sc->t_start, &t_diff);
-       total_elapsed = t_diff.tv_sec * 1000 + t_diff.tv_usec / 1000;
-       timersub(&sc->t_cur, &sc->t_last, &t_diff);
-       since_last = t_diff.tv_sec * 1000 + t_diff.tv_usec / 1000;
-       printf("%8ld %12llu %14llu %12.3Lf ", (long)sc->pid,
-           total_elapsed, sc->bytes,
-           (long double)(sc->bytes * 8) / (since_last * 1000.0));
-       sc->t_last = sc->t_cur;
-       sc->bytes = 0;
+       mainstats.slice_bytes = 0;
+}
 
+static void
+stats_display(unsigned long long total_elapsed, long double mbps,
+    float bwperc, struct statctx *sc, struct inpcb *inpcb,
+    struct tcpcb *tcpcb, struct socket *sockb)
+{
+       int j;
+       
+       printf("%12llu %14llu %12.3Lf %7.2f%% ", total_elapsed, sc->bytes,
+           mbps, bwperc);
+       
        if (sc->kvars != NULL) {
-               kupdate_stats(sc->kh, sc->tcbaddr, &inpcb, &tcpcb, &sockb);
-               for (i = 0; sc->kvars[i] != NULL; i++) {
-#define P(v, f) \
-       if (strcmp(sc->kvars[i], #v) == 0) { \
-               printf("%s"f, i > 0 ? "," : "", v); \
-               continue; \
-       }
-                       P(inpcb.inp_flags, "0x%08x")
-                       P(sockb.so_rcv.sb_cc, "%lu")
-                       P(sockb.so_rcv.sb_hiwat, "%lu")
-                       P(sockb.so_snd.sb_cc, "%lu")
-                       P(sockb.so_snd.sb_hiwat, "%lu")
-                       P(tcpcb.snd_una, "%u")
-                       P(tcpcb.snd_nxt, "%u")
-                       P(tcpcb.snd_wl1, "%u")
-                       P(tcpcb.snd_wl2, "%u")
-                       P(tcpcb.snd_wnd, "%lu")
-                       P(tcpcb.rcv_wnd, "%lu")
-                       P(tcpcb.rcv_nxt, "%u")
-                       P(tcpcb.rcv_adv, "%u")
-                       P(tcpcb.snd_max, "%u")
-                       P(tcpcb.snd_cwnd, "%lu")
-                       P(tcpcb.snd_ssthresh, "%lu")
-                       P(tcpcb.t_rcvtime, "%u")
-                       P(tcpcb.t_rtttime, "%u")
-                       P(tcpcb.t_rtseq, "%u")
-                       P(tcpcb.t_srtt, "%hu")
-                       P(tcpcb.t_rttvar, "%hu")
-                       P(tcpcb.t_rttmin, "%hu")
-                       P(tcpcb.max_sndwnd, "%lu")
-                       P(tcpcb.snd_scale, "%u")
-                       P(tcpcb.rcv_scale, "%u")
-                       P(tcpcb.last_ack_sent, "%u")
+               kupdate_stats(sc->kh, sc->tcbaddr, inpcb, tcpcb,
+                   sockb);
+
+               for (j = 0; sc->kvars[j] != NULL; j++) {
+#define S(a) #a
+#define P(b, v, f)                                                     \
+                       if (strcmp(sc->kvars[j], S(b.v)) == 0) {        \
+                               printf("%s"f, j > 0 ? "," : "", b->v);  \
+                               continue;                               \
+                       }
+                       P(inpcb, inp_flags, "0x%08x")
+                       P(sockb, so_rcv.sb_cc, "%lu")
+                       P(sockb, so_rcv.sb_hiwat, "%lu")
+                       P(sockb, so_snd.sb_cc, "%lu")
+                       P(sockb, so_snd.sb_hiwat, "%lu")
+                       P(tcpcb, snd_una, "%u")
+                       P(tcpcb, snd_nxt, "%u")
+                       P(tcpcb, snd_wl1, "%u")
+                       P(tcpcb, snd_wl2, "%u")
+                       P(tcpcb, snd_wnd, "%lu")
+                       P(tcpcb, rcv_wnd, "%lu")
+                       P(tcpcb, rcv_nxt, "%u")
+                       P(tcpcb, rcv_adv, "%u")
+                       P(tcpcb, snd_max, "%u")
+                       P(tcpcb, snd_cwnd, "%lu")
+                       P(tcpcb, snd_ssthresh, "%lu")
+                       P(tcpcb, t_rcvtime, "%u")
+                       P(tcpcb, t_rtttime, "%u")
+                       P(tcpcb, t_rtseq, "%u")
+                       P(tcpcb, t_srtt, "%hu")
+                       P(tcpcb, t_rttvar, "%hu")
+                       P(tcpcb, t_rttmin, "%hu")
+                       P(tcpcb, max_sndwnd, "%lu")
+                       P(tcpcb, snd_scale, "%u")
+                       P(tcpcb, rcv_scale, "%u")
+                       P(tcpcb, last_ack_sent, "%u")
+#undef S                           
 #undef P
                }
        }
        printf("\n");
-       fflush(stdout);
 }
 
 static void
-stats_finish(struct statctx *sc)
+mainstats_display(long double slice_mbps, long double avg_mbps)
 {
-       struct itimerval itv;
-
-       signal(SIGALRM, SIG_DFL);
-       bzero(&itv, sizeof(itv));
-       setitimer(ITIMER_REAL, &itv, NULL);
+       printf("Conn: %3d Mbps: %12.3Lf Peak Mbps: %12.3Lf Avg Mbps: %12.3Lf\n",
+           mainstats.nconns, slice_mbps, mainstats.peak_mbps, avg_mbps); 
 }
 
-static void __dead
-handle_connection(kvm_t *kvmh, u_long ktcbtab, int sock, int vflag,
-    int rflag, char **kflag, int Bflag)
+static void
+process_slice(struct statctx *sc, size_t nsc)
 {
-       char *buf;
-       struct pollfd pfd;
-       ssize_t n;
-       int r;
-       struct statctx sc;
-
-       if ((buf = malloc(Bflag)) == NULL)
-               err(1, "malloc");
-       if ((r = fcntl(sock, F_GETFL, 0)) == -1)
-               err(1, "fcntl(F_GETFL)");
-       r |= O_NONBLOCK;
-       if (fcntl(sock, F_SETFL, r) == -1)
-               err(1, "fcntl(F_SETFL, O_NONBLOCK)");
+       unsigned long long total_elapsed, since_last;
+       long double mbps, slice_mbps = 0;
+       float bwperc;
+       nfds_t i;
+       struct timeval t_cur, t_diff;
+       struct inpcb inpcb;
+       struct tcpcb tcpcb;
+       struct socket sockb;
+       
+       for (i = 0; i < nsc; i++, sc++) {
+               if (gettimeofday(&t_cur, NULL) == -1)
+                       err(1, "gettimeofday");
+               if (sc->kvars != NULL) /* process kernel stats */
+                       kupdate_stats(sc->kh, sc->tcbaddr, &inpcb, &tcpcb,
+                           &sockb);
+               timersub(&t_cur, &sc->t_start, &t_diff);
+               total_elapsed = t_diff.tv_sec * 1000 + t_diff.tv_usec / 1000;
+               timersub(&t_cur, &sc->t_last, &t_diff);
+               since_last = t_diff.tv_sec * 1000 + t_diff.tv_usec / 1000;
+               bwperc = (sc->bytes * 100.0) / mainstats.slice_bytes;
+               mbps = (sc->bytes * 8) / (since_last * 1000.0);
+               slice_mbps += mbps;
+               
+               stats_display(total_elapsed, mbps, bwperc, sc,
+                   &inpcb, &tcpcb, &sockb);
+               
+               sc->t_last = t_cur;
+               sc->bytes = 0;
 
-       signal(SIGINT, exitsighand);
-       signal(SIGTERM, exitsighand);
-       signal(SIGHUP, exitsighand);
-       signal(SIGPIPE, SIG_IGN);
+       }
 
-       bzero(&pfd, sizeof(pfd));
-       pfd.fd = sock;
-       pfd.events = POLLIN;
+       /* process stats for this slice */
+       if (slice_mbps > mainstats.peak_mbps)
+               mainstats.peak_mbps = slice_mbps;
+       mainstats_display(slice_mbps, slice_mbps / mainstats.nconns);
+}
 
-       stats_prepare(&sc, sock, kvmh, ktcbtab, rflag, vflag, kflag);
+static int
+handle_connection(struct statctx *sc, int fd, char *buf, size_t buflen)
+{
+       ssize_t n;
 
-       while (!done) {
-               if (print_stats) {
-                       stats_display(&sc);
-                       print_stats = 0;
-               }
-               if (poll(&pfd, 1, INFTIM) == -1) {
-                       if (errno == EINTR)
-                               continue;
-                       err(1, "poll");
-               }
-               if ((n = read(pfd.fd, buf, Bflag)) == -1) {
-                       if (errno == EINTR || errno == EAGAIN)
-                               continue;
-                       err(1, "read");
-               }
-               if (n == 0) {
-                       fprintf(stderr, "%8ld closed by remote end\n",
-                           (long)getpid());
-                       done = -1;
-                       break;
-               }
-               if (vflag >= 3)
-                       fprintf(stderr, "read: %zd bytes\n", n);
-               stats_update(&sc, n);
+again:
+       n = read(fd, buf, buflen);
+       if (n == -1) {
+               if (errno == EINTR)
+                       goto again;
+               else if (errno == EWOULDBLOCK) 
+                       return 0;
+               warn("fd %d read error", fd);
+               
+               return -1;
        }
-       stats_finish(&sc);
-
-       free(buf);
-       close(sock);
-       exit(1);
+       else if (n == 0) {
+               if (vflag)
+                       fprintf(stderr, "%8d closed by remote end\n", fd);
+               close(fd);
+               return -1;
+       }
+       if (vflag >= 3)
+               fprintf(stderr, "read: %zd bytes\n", n);
+       
+       stats_update(sc, n);
+       return 0;
 }
 
-static void __dead
-serverloop(kvm_t *kvmh, u_long ktcbtab, struct addrinfo *aitop,
-    int vflag, int rflag, char **kflag, int Sflag, int Bflag)
+static nfds_t
+serverbind(struct pollfd *pfd, nfds_t max_nfds, struct addrinfo *aitop)
 {
        char tmp[128];
-       int r, sock, client_id, on = 1;
+       int sock, on = 1;
        struct addrinfo *ai;
-       struct pollfd *pfd;
-       struct sockaddr_storage ss;
-       socklen_t sslen;
-       size_t nfds, i, j;
+       nfds_t lnfds;
 
-       pfd = NULL;
-       nfds = 0;
+       lnfds = 0;
        for (ai = aitop; ai != NULL; ai = ai->ai_next) {
+               if (lnfds == max_nfds) {
+                       fprintf(stderr,
+                           "maximum number of listening fds reached\n");
+                       break;
+               }
                saddr_ntop(ai->ai_addr, ai->ai_addrlen, tmp, sizeof(tmp));
                if (vflag)
-                       fprintf(stderr, "Try listen on %s\n", tmp);
+                       fprintf(stderr, "Try to listen on %s\n", tmp);
                if ((sock = socket(ai->ai_family, ai->ai_socktype,
                    ai->ai_protocol)) == -1) {
                        if (ai->ai_next == NULL)
@@ -532,104 +583,179 @@ serverloop(kvm_t *kvmh, u_long ktcbtab, 
                        close(sock);
                        continue;
                }
-               if (nfds > 128)
-                       break;
-               if ((pfd = realloc(pfd, ++nfds * sizeof(*pfd))) == NULL)
-                       errx(1, "realloc(pfd * %zu)", nfds);
-               pfd[nfds - 1].fd = sock;
-               pfd[nfds - 1].events = POLLIN;
+               if (vflag >= 3)
+                       fprintf(stderr, "listening on fd %d\n", sock);
+               lnfds++;
+               pfd[lnfds - 1].fd = sock;
+               pfd[lnfds - 1].events = POLLIN;
+
        }
        freeaddrinfo(aitop);
-       if (nfds == 0)
+       if (lnfds == 0)
                errx(1, "No working listen addresses found");
 
-       signal(SIGINT, exitsighand);
-       signal(SIGTERM, exitsighand);
-       signal(SIGHUP, exitsighand);
-       signal(SIGPIPE, SIG_IGN);
-       signal(SIGCHLD, SIG_IGN);
+       return lnfds;
+}      
 
+static void
+set_listening(struct pollfd *pfd, nfds_t lfds, int toggle) {
+       int i;
+
+       for (i = 0; i < (int)lfds; i++) {
+               if (toggle)
+                       pfd[i].events = POLLIN;
+               else
+                       pfd[i].events = 0;
+       }
+                       
+}
+static void __dead
+serverloop(kvm_t *kvmh, u_long ktcbtab, struct addrinfo *aitop)
+{
+       socklen_t sslen;
+       struct pollfd *pfd;
+       char tmp[128], *buf;
+       struct statctx *psc;
+       struct sockaddr_storage ss;
+       nfds_t i, nfds, lfds;
+       size_t nalloc;
+       int r, sock, client_id;
+
+       sslen = sizeof(ss);
+       nalloc = 128;
+       if ((pfd = calloc(sizeof(*pfd), nalloc)) == NULL)
+               err(1, "calloc");
+       if ((psc = calloc(sizeof(*psc), nalloc)) == NULL)
+               err(1, "calloc");
+       if ((buf = malloc(Bflag)) == NULL)
+               err(1, "malloc");
+       lfds = nfds = serverbind(pfd, nalloc - 1, aitop);
+       if (vflag >= 3)
+               fprintf(stderr, "listening on %d fds\n", lfds);
        if (setpgid(0, 0) == -1)
                err(1, "setpgid");
-
+       
+       print_header();
+       
        client_id = 0;
-       while (!done) {         
+       while (!done) {
+               if (proc_slice) { 
+                       process_slice(psc + lfds, nfds - lfds);
+                       stats_cleanslice();
+                       proc_slice = 0;
+               }
+               if (vflag >= 3) 
+                       fprintf(stderr, "mainstats.nconns = %u\n",
+                           mainstats.nconns);
                if ((r = poll(pfd, nfds, INFTIM)) == -1) {
                        if (errno == EINTR)
                                continue;
                        warn("poll");
                        break;
                }
+
                if (vflag >= 3)
                        fprintf(stderr, "poll: %d\n", r);
                for (i = 0 ; r > 0 && i < nfds; i++) {
                        if ((pfd[i].revents & POLLIN) == 0)
                                continue;
-                       if (vflag >= 3)
-                               fprintf(stderr, "fd %d active\n", pfd[i].fd);
+                       if (pfd[i].fd == -1)
+                               errx(1, "pfd insane");
                        r--;
-                       sslen = sizeof(ss);
-                       if ((sock = accept(pfd[i].fd, (struct sockaddr *)&ss,
-                           &sslen)) == -1) {
-                               if (errno == EINTR)
+                       if (vflag >= 3)
+                               fprintf(stderr, "fd %d active i = %d\n",
+                                   pfd[i].fd, i);
+                       /* new connection */
+                       if (i < lfds) {
+                               if ((sock = accept(pfd[i].fd,
+                                   (struct sockaddr *)&ss,
+                                   &sslen)) == -1) {
+                                       if (errno == EINTR)
+                                               continue;
+                                       else if (errno == EMFILE ||
+                                           errno == ENFILE)
+                                               set_listening(pfd, lfds, 0);
+                                       warn("accept");
                                        continue;
-                               warn("accept");
-                               break;
+                               }
+                               if ((r = fcntl(sock, F_GETFL, 0)) == -1)
+                                       err(1, "fcntl(F_GETFL)");
+                               r |= O_NONBLOCK;
+                               if (fcntl(sock, F_SETFL, r) == -1)
+                                       err(1, "fcntl(F_SETFL, O_NONBLOCK)");
+                               saddr_ntop((struct sockaddr *)&ss, sslen,
+                                   tmp, sizeof(tmp));
+                               if (vflag)
+                                       fprintf(stderr,
+                                           "Accepted connection %d from "
+                                           "%s, fd = %d\n", client_id++, tmp,
+                                            sock);
+                               /* alloc more space if we're full */
+                               if (nfds == nalloc) {
+                                       nalloc *= 2;
+                                       if ((pfd = realloc(pfd,
+                                           sizeof(*pfd) * nalloc)) == NULL)
+                                               err(1, "realloc");
+                                       if ((psc = realloc(psc,
+                                           sizeof(*psc) * nalloc)) == NULL)
+                                               err(1, "realloc");
+                               }
+                               pfd[nfds].fd = sock;
+                               pfd[nfds].events = POLLIN;
+                               stats_prepare(&psc[nfds], sock, kvmh, ktcbtab);
+                               nfds++;
+                               if (!mainstats.nconns++)
+                                       set_timer(1);
+                               continue;
                        }
-                       saddr_ntop((struct sockaddr *)&ss, sslen,
-                           tmp, sizeof(tmp));
-                       if (vflag)
-                               fprintf(stderr, "Accepted connection %d from "
-                                   "%s, fd = %d\n", client_id++, tmp, sock);
-                       switch (fork()) {
-                       case -1:
-                               warn("fork");
-                               done = -1;
-                               break;
-                       case 0:
-                               for (j = 0; j < nfds; j++)
-                                       if (j != i)
-                                               close(pfd[j].fd);
-                               handle_connection(kvmh, ktcbtab, sock,
-                                   vflag, rflag, kflag, Bflag);
-                               /* NOTREACHED */
-                               _exit(1);
-                       default:
-                               close(sock);
-                               break;
+                       /* event in fd */
+                       if (vflag >= 3)
+                               fprintf(stderr,
+                                   "fd %d active", pfd[i].fd);
+                       while (handle_connection(&psc[i], pfd[i].fd,
+                           buf, Bflag) == -1) {
+                               pfd[i] = pfd[nfds - 1];
+                               pfd[nfds - 1].fd = -1;
+                               psc[i] = psc[nfds - 1];
+                               mainstats.nconns--;
+                               nfds--;
+                               /* stop display if no clients */
+                               if (!mainstats.nconns) {
+                                       proc_slice = 1;
+                                       set_timer(0);
+                               }
+                               /* if we were full */
+                               set_listening(pfd, lfds, 1);
+
+                               /* is there an event pending on the last fd? */
+                               if (pfd[i].fd == -1 ||
+                                   (pfd[i].revents & POLLIN) == 0)
+                                       break;
                        }
-                       if (done == -1)
-                               break;
                }
        }
-       for (i = 0; i < nfds; i++)
-               close(pfd[i].fd);
-       if (done > 0)
-               warnx("Terminated by signal %d", done);
-       signal(SIGTERM, SIG_IGN);
-       killpg(0, SIGTERM);
        exit(1);
 }
 
 static void __dead
-clientloop(kvm_t *kvmh, u_long ktcbtab, const char *host, const char *port,
-    int vflag, int rflag, char **kflag, int Sflag, int Bflag, int nconn)
+clientloop(kvm_t *kvmh, u_long ktcbtab, const char *host, const char *port, 
int nconn)
 {
-       char tmp[128];
-       char *buf;
-       int r, sock, herr;
        struct addrinfo *aitop, *ai, hints;
+       struct statctx *psc;
        struct pollfd *pfd;
+       char tmp[128], *buf;
+       int i, r, herr, sock = -1;
+       u_int scnt = 0;
        ssize_t n;
-       struct statctx sc;
-       u_int i, scnt = 0;
 
        if ((buf = malloc(Bflag)) == NULL)
                err(1, "malloc");
 
-       if ((pfd = calloc(nconn, sizeof(struct pollfd))) == NULL)
+       if ((pfd = calloc(nconn, sizeof(*pfd))) == NULL)
                err(1, "clientloop pfd calloc");
-
+       if ((psc = calloc(nconn, sizeof(*psc))) == NULL)
+               err(1, "clientloop psc calloc");
+       
        for (i = 0; i < nconn; i++) {
                bzero(&hints, sizeof(hints));
                hints.ai_socktype = SOCK_STREAM;
@@ -688,6 +814,8 @@ clientloop(kvm_t *kvmh, u_long ktcbtab, 
 
                pfd[i].fd = sock;
                pfd[i].events = POLLOUT;
+               stats_prepare(psc + i, sock, kvmh, ktcbtab);
+               mainstats.nconns++;
                scnt++;
        }
 
@@ -695,17 +823,14 @@ clientloop(kvm_t *kvmh, u_long ktcbtab, 
                fprintf(stderr, "%u connections established\n", scnt);
        arc4random_buf(buf, Bflag);
 
-       signal(SIGINT, exitsighand);
-       signal(SIGTERM, exitsighand);
-       signal(SIGHUP, exitsighand);
-       signal(SIGPIPE, SIG_IGN);
-
-       stats_prepare(&sc, sock, kvmh, ktcbtab, rflag, vflag, kflag);
+       print_header();
+       set_timer(1);
 
        while (!done) {
-               if (print_stats) {
-                       stats_display(&sc);
-                       print_stats = 0;
+               if (proc_slice) {
+                       process_slice(psc, scnt);
+                       stats_cleanslice();
+                       proc_slice = 0;
                }
                if (poll(pfd, nconn, INFTIM) == -1) {
                        if (errno == EINTR)
@@ -727,12 +852,11 @@ clientloop(kvm_t *kvmh, u_long ktcbtab, 
                                if (vflag >= 3)
                                        fprintf(stderr, "write: %zd bytes\n",
                                            n);
-                               stats_update(&sc, n);
+                               stats_update(psc + i, n);
                        }
                }
        }
-       stats_finish(&sc);
-
+       
        if (done > 0)
                warnx("Terminated by signal %d", done);
 
@@ -758,17 +882,20 @@ main(int argc, char **argv)
        extern char *optarg;
 
        char kerr[_POSIX2_LINE_MAX], *tmp;
-       const char *errstr;
-       int ch, herr;
        struct addrinfo *aitop, hints;
+       const char *errstr;
        kvm_t *kvmh = NULL;
+       struct rlimit rl;
+       int ch, herr;
 
        const char *host = NULL, *port = DEFAULT_PORT;
-       char **kflag = NULL;
-       int sflag = 0, vflag = 0, rflag = DEFAULT_STATS_INTERVAL, Sflag = 0;
-       int Bflag = DEFAULT_BUF;
        int nconn = 1;
 
+       Bflag = DEFAULT_BUF;
+       Sflag = sflag = vflag = rdomain = 0;
+       kflag = NULL;
+       rflag = DEFAULT_STATS_INTERVAL;
+
        struct nlist nl[] = { { "_tcbtable" }, { "" } };
 
        while ((ch = getopt(argc, argv, "B:hlk:n:p:r:sS:vV:")) != -1) {
@@ -836,9 +963,6 @@ main(int argc, char **argv)
        if (argc != (sflag ? 0 : 1))
                usage();
 
-       if (kflag != NULL && nconn > 1)
-               errx(1, "-k currently only works with a single tcp connection");
-
        if (!sflag)
                host = argv[0];
 
@@ -864,12 +988,25 @@ main(int argc, char **argv)
        } else
                drop_gid();
 
+       signal(SIGINT, exitsighand);
+       signal(SIGTERM, exitsighand);
+       signal(SIGHUP, exitsighand);
+       signal(SIGPIPE, SIG_IGN);
+       signal(SIGALRM, alarmhandler);
+
+       if (getrlimit(RLIMIT_NOFILE, &rl) == -1)
+               err(1, "getrlimit");
+       if (rl.rlim_cur < MAX_FD)
+               rl.rlim_cur = MAX_FD;
+       if (setrlimit(RLIMIT_NOFILE, &rl))
+               err(1, "setrlimit");
+       if (getrlimit(RLIMIT_NOFILE, &rl) == -1)
+               err(1, "getrlimit");
+       
        if (sflag)
-               serverloop(kvmh, nl[0].n_value, aitop, vflag, rflag, kflag,
-                   Sflag, Bflag);
+               serverloop(kvmh, nl[0].n_value, aitop);
        else
-               clientloop(kvmh, nl[0].n_value, host, port, vflag, rflag, kflag,
-                   Sflag, Bflag, nconn);
+               clientloop(kvmh, nl[0].n_value, host, port, nconn);
 
        return 0;
 }

Reply via email to