On Tue, Oct 11, 2016 at 02:02:46AM +0200, Rafael Zalamena wrote:
> This diff brings the relayd(8) proc.c up-to-date and removes the file limit
> alteration in relayd.c. The file limit alteration is not needed anymore
> since now the number of descriptors pre-allocated is very small (only one
> descriptor per child + 2 to distribute fds between child).
> 
> It would be nice to have some feedback in this diff since this daemon is
> the one that most uses the proc.c multiple instances of child process.

Here is an updated diff with the proc_flush_imsg() fix from reyk@.

Summary:
 * Fixes the msgbuf_write() usage idiom;
 * Add context to fatal() messages;
 * Use proc_flush_imsg() instead of manually using imsg_flush();
 * Use less fds on startup;

ok?

Index: proc.c
===================================================================
RCS file: /home/obsdcvs/src/usr.sbin/relayd/proc.c,v
retrieving revision 1.36
diff -u -p -r1.36 proc.c
--- proc.c      5 Oct 2016 17:31:28 -0000       1.36
+++ proc.c      14 Oct 2016 15:14:21 -0000
@@ -37,8 +37,6 @@
 
 void    proc_exec(struct privsep *, struct privsep_proc *, unsigned int,
            int, char **);
-void    proc_connectpeer(struct privsep *, enum privsep_procid, int,
-           struct privsep_pipes *);
 void    proc_setup(struct privsep *, struct privsep_proc *, unsigned int);
 void    proc_open(struct privsep *, int, int);
 void    proc_accept(struct privsep *, int, enum privsep_procid,
@@ -157,72 +155,38 @@ proc_exec(struct privsep *ps, struct pri
 }
 
 void
-proc_connectpeer(struct privsep *ps, enum privsep_procid id, int inst,
-    struct privsep_pipes *pp)
-{
-       unsigned int             i, j;
-       struct privsep_fd        pf;
-
-       for (i = 0; i < PROC_MAX; i++) {
-               /* Parent is already connected with everyone. */
-               if (i == PROC_PARENT)
-                       continue;
-
-               for (j = 0; j < ps->ps_instances[i]; j++) {
-                       /* Don't send socket to child itself. */
-                       if (i == (unsigned int)id &&
-                           j == (unsigned int)inst)
-                               continue;
-                       if (pp->pp_pipes[i][j] == -1)
-                               continue;
-
-                       pf.pf_procid = i;
-                       pf.pf_instance = j;
-                       proc_compose_imsg(ps, id, inst, IMSG_CTL_PROCFD,
-                           -1, pp->pp_pipes[i][j], &pf, sizeof(pf));
-                       pp->pp_pipes[i][j] = -1;
-               }
-       }
-}
-
-/* Inter-connect all process except with ourself. */
-void
 proc_connect(struct privsep *ps)
 {
-       unsigned int             src, i, j;
-       struct privsep_pipes    *pp;
        struct imsgev           *iev;
+       unsigned int             src, dst, inst;
 
-       /* Listen on appropriate pipes. */
-       src = privsep_process;
-       pp = &ps->ps_pipes[src][ps->ps_instance];
-
-       for (i = 0; i < PROC_MAX; i++) {
-               /* Don't listen to ourself. */
-               if (i == src)
-                       continue;
+       /* Don't distribute any sockets if we are not really going to run. */
+       if (ps->ps_noaction)
+               return;
 
-               for (j = 0; j < ps->ps_instances[i]; j++) {
-                       if (pp->pp_pipes[i][j] == -1)
-                               continue;
+       for (dst = 0; dst < PROC_MAX; dst++) {
+               /* We don't communicate with ourselves. */
+               if (dst == PROC_PARENT)
+                       continue;
 
-                       iev = &ps->ps_ievs[i][j];
-                       imsg_init(&iev->ibuf, pp->pp_pipes[i][j]);
+               for (inst = 0; inst < ps->ps_instances[dst]; inst++) {
+                       iev = &ps->ps_ievs[dst][inst];
+                       imsg_init(&iev->ibuf, ps->ps_pp->pp_pipes[dst][inst]);
                        event_set(&iev->ev, iev->ibuf.fd, iev->events,
                            iev->handler, iev->data);
                        event_add(&iev->ev, NULL);
                }
        }
 
-       /* Exchange pipes between process. */
-       for (i = 0; i < PROC_MAX; i++) {
-               /* Parent is already connected with everyone. */
-               if (i == PROC_PARENT)
-                       continue;
+       /* Distribute the socketpair()s for everyone. */
+       for (src = 0; src < PROC_MAX; src++)
+               for (dst = src; dst < PROC_MAX; dst++) {
+                       /* Parent already distributed its fds. */
+                       if (src == PROC_PARENT || dst == PROC_PARENT)
+                               continue;
 
-               for (j = 0; j < ps->ps_instances[i]; j++)
-                       proc_connectpeer(ps, i, j, &ps->ps_pipes[i][j]);
-       }
+                       proc_open(ps, src, dst);
+               }
 }
 
 void
@@ -230,17 +194,41 @@ proc_init(struct privsep *ps, struct pri
     int argc, char **argv, enum privsep_procid proc_id)
 {
        struct privsep_proc     *p = NULL;
+       struct privsep_pipes    *pa, *pb;
        unsigned int             proc;
-       unsigned int             src, dst;
+       unsigned int             dst;
+       int                      fds[2];
+
+       /* Don't initiate anything if we are not really going to run. */
+       if (ps->ps_noaction)
+               return;
 
        if (proc_id == PROC_PARENT) {
                privsep_process = PROC_PARENT;
                proc_setup(ps, procs, nproc);
 
-               /* Open socketpair()s for everyone. */
-               for (src = 0; src < PROC_MAX; src++)
-                       for (dst = 0; dst < PROC_MAX; dst++)
-                               proc_open(ps, src, dst);
+               /*
+                * Create the children sockets so we can use them 
+                * to distribute the rest of the socketpair()s using
+                * proc_connect() later.
+                */
+               for (dst = 0; dst < PROC_MAX; dst++) {
+                       /* Don't create socket for ourselves. */
+                       if (dst == PROC_PARENT)
+                               continue;
+
+                       for (proc = 0; proc < ps->ps_instances[dst]; proc++) {
+                               pa = &ps->ps_pipes[PROC_PARENT][0];
+                               pb = &ps->ps_pipes[dst][proc];
+                               if (socketpair(AF_UNIX,
+                                   SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC,
+                                   PF_UNSPEC, fds) == -1)
+                                       fatal("%s: socketpair", __func__);
+
+                               pa->pp_pipes[dst][proc] = fds[0];
+                               pb->pp_pipes[PROC_PARENT][0] = fds[1];
+                       }
+               }
 
                /* Engage! */
                proc_exec(ps, procs, nproc, argc, argv);
@@ -317,7 +305,7 @@ proc_setup(struct privsep *ps, struct pr
                ps->ps_title[id] = procs[src].p_title;
                if ((ps->ps_ievs[id] = calloc(ps->ps_instances[id],
                    sizeof(struct imsgev))) == NULL)
-                       fatal(__func__);
+                       fatal("%s: calloc", __func__);
 
                /* With this set up, we are ready to call imsg_init(). */
                for (i = 0; i < ps->ps_instances[id]; i++) {
@@ -409,9 +397,11 @@ void
 proc_open(struct privsep *ps, int src, int dst)
 {
        struct privsep_pipes    *pa, *pb;
+       struct privsep_fd        pf;
        int                      fds[2];
        unsigned int             i, j;
 
+       /* Exchange pipes between process. */
        for (i = 0; i < ps->ps_instances[src]; i++) {
                for (j = 0; j < ps->ps_instances[dst]; j++) {
                        /* Don't create sockets for ourself. */
@@ -420,16 +410,33 @@ proc_open(struct privsep *ps, int src, i
 
                        pa = &ps->ps_pipes[src][i];
                        pb = &ps->ps_pipes[dst][j];
-                       if (pb->pp_pipes[dst][j] != -1)
-                               continue;
-
                        if (socketpair(AF_UNIX,
                            SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC,
                            PF_UNSPEC, fds) == -1)
-                               fatal(__func__);
+                               fatal("%s: socketpair", __func__);
 
                        pa->pp_pipes[dst][j] = fds[0];
                        pb->pp_pipes[src][i] = fds[1];
+
+                       pf.pf_procid = src;
+                       pf.pf_instance = i;
+                       if (proc_compose_imsg(ps, dst, j, IMSG_CTL_PROCFD,
+                           -1, pb->pp_pipes[src][i], &pf, sizeof(pf)) == -1)
+                               fatal("%s: proc_compose_imsg", __func__);
+
+                       pf.pf_procid = dst;
+                       pf.pf_instance = j;
+                       if (proc_compose_imsg(ps, src, i, IMSG_CTL_PROCFD,
+                           -1, pa->pp_pipes[dst][j], &pf, sizeof(pf)) == -1)
+                               fatal("%s: proc_compose_imsg", __func__);
+
+                       /*
+                        * We have to flush to send the descriptors and close
+                        * them to avoid the fd ramp on startup.
+                        */
+                       if (proc_flush_imsg(ps, src, i) == -1 ||
+                           proc_flush_imsg(ps, dst, j) == -1)
+                               fatal("%s: imsg_flush", __func__);
                }
        }
 }
@@ -512,9 +519,6 @@ proc_run(struct privsep *ps, struct priv
        const char              *root;
        struct control_sock     *rcs;
 
-       if (ps->ps_noaction)
-               exit(0);
-
        log_procinit(p->p_title);
 
        /* Set the process group of the current process */
@@ -522,10 +526,10 @@ proc_run(struct privsep *ps, struct priv
 
        if (p->p_id == PROC_CONTROL && ps->ps_instance == 0) {
                if (control_init(ps, &ps->ps_csock) == -1)
-                       fatalx(__func__);
+                       fatalx("%s: control_init", __func__);
                TAILQ_FOREACH(rcs, &ps->ps_rcsocks, cs_entry)
                        if (control_init(ps, rcs) == -1)
-                               fatalx(__func__);
+                               fatalx("%s: control_init", __func__);
        }
 
        /* Use non-standard user */
@@ -575,10 +579,10 @@ proc_run(struct privsep *ps, struct priv
        if (p->p_id == PROC_CONTROL && ps->ps_instance == 0) {
                TAILQ_INIT(&ctl_conns);
                if (control_listen(&ps->ps_csock) == -1)
-                       fatalx(__func__);
+                       fatalx("%s: control_listen", __func__);
                TAILQ_FOREACH(rcs, &ps->ps_rcsocks, cs_entry)
                        if (control_listen(rcs) == -1)
-                               fatalx(__func__);
+                               fatalx("%s: control_listen", __func__);
        }
 
        DPRINTF("%s: %s %d/%d, pid %d", __func__, p->p_title,
@@ -610,7 +614,7 @@ proc_dispatch(int fd, short event, void 
 
        if (event & EV_READ) {
                if ((n = imsg_read(ibuf)) == -1 && errno != EAGAIN)
-                       fatal(__func__);
+                       fatal("%s: imsg_read", __func__);
                if (n == 0) {
                        /* this pipe is dead, so remove the event handler */
                        event_del(&iev->ev);
@@ -620,13 +624,19 @@ proc_dispatch(int fd, short event, void 
        }
 
        if (event & EV_WRITE) {
-               if (msgbuf_write(&ibuf->w) <= 0 && errno != EAGAIN)
-                       fatal(__func__);
+               if ((n = msgbuf_write(&ibuf->w)) == -1 && errno != EAGAIN)
+                       fatal("%s: msgbuf_write", __func__);
+               if (n == 0) {
+                       /* this pipe is dead, so remove the event handler */
+                       event_del(&iev->ev);
+                       event_loopexit(NULL);
+                       return;
+               }
        }
 
        for (;;) {
                if ((n = imsg_get(ibuf, &imsg)) == -1)
-                       fatal(__func__);
+                       fatal("%s: imsg_get", __func__);
                if (n == 0)
                        break;
 
@@ -661,12 +671,11 @@ proc_dispatch(int fd, short event, void 
                            pf.pf_instance);
                        break;
                default:
-                       log_warnx("%s: %s %d got invalid imsg %d peerid %d "
+                       fatalx("%s: %s %d got invalid imsg %d peerid %d "
                            "from %s %d",
                            __func__, title, ps->ps_instance + 1,
                            imsg.hdr.type, imsg.hdr.peerid,
                            p->p_title, imsg.hdr.pid);
-                       fatalx(__func__);
                }
                imsg_free(&imsg);
        }
@@ -808,4 +817,26 @@ proc_iev(struct privsep *ps, enum privse
 
        proc_range(ps, id, &n, &m);
        return (&ps->ps_ievs[id][n]);
+}
+
+/* This function should only be called with care as it breaks async I/O */
+int
+proc_flush_imsg(struct privsep *ps, enum privsep_procid id, int n)
+{
+       struct imsgbuf  *ibuf;
+       int              m, ret = 0;
+
+       proc_range(ps, id, &n, &m);
+       for (; n < m; n++) {
+               if ((ibuf = proc_ibuf(ps, id, n)) == NULL)
+                       return (-1);
+               do {
+                       ret = imsg_flush(ibuf);
+               } while (ret == -1 && errno == EAGAIN);
+               if (ret == -1)
+                       break;
+               imsg_event_add(&ps->ps_ievs[id][n]);
+       }
+
+       return (ret);
 }
Index: relayd.c
===================================================================
RCS file: /home/obsdcvs/src/usr.sbin/relayd/relayd.c,v
retrieving revision 1.162
diff -u -p -r1.162 relayd.c
--- relayd.c    28 Sep 2016 12:16:44 -0000      1.162
+++ relayd.c    14 Oct 2016 15:26:36 -0000
@@ -214,11 +214,6 @@ main(int argc, char *argv[])
        if (title != NULL)
                ps->ps_title[proc_id] = title;
 
-       if (proc_id == PROC_PARENT) {
-               /* XXX the parent opens too many fds in proc_open() */
-               socket_rlimit(-1);
-       }
-
        /* only the parent returns */
        proc_init(ps, procs, nitems(procs), argc0, argv, proc_id);
 
Index: relayd.h
===================================================================
RCS file: /home/obsdcvs/src/usr.sbin/relayd/relayd.h,v
retrieving revision 1.235
diff -u -p -r1.235 relayd.h
--- relayd.h    5 Oct 2016 16:58:19 -0000       1.235
+++ relayd.h    14 Oct 2016 15:13:15 -0000
@@ -1401,6 +1401,7 @@ int        imsg_compose_event(struct imsgev *,
            pid_t, int, void *, uint16_t);
 int     imsg_composev_event(struct imsgev *, uint16_t, uint32_t,
            pid_t, int, const struct iovec *, int);
+int     proc_flush_imsg(struct privsep *, enum privsep_procid, int);
 
 /* config.c */
 int     config_init(struct relayd *);

Reply via email to