On Tue, Aug 18, 2020 at 03:06:58PM +0000, Visa Hankala wrote:
> This diff introduces a mutex that serializes access to the kernel
> message buffers. At the moment, the buffers do not have clear
> concurrency control.
> 
> The mutex controls access to all the modifiable fields of struct msgbuf.
> It also protects logsoftc.sc_state for managing thread wakeups.
> 
> A tricky thing with the diff is that the code is indirectly used in many
> different contexts and in varied machine-specific functions. Luckily,
> the code already utilizes splhigh() in critical parts, so the transition
> to using a mutex is possibly no longer such a big step.
> 
> To avoid creating problems with lock order, the code does not take
> new locks when the log mutex is held. This is visible in logwakeup()
> and logread(). It looks like selrecord() does not acquire new locks,
> so no extra steps are needed in logpoll().
> 
> In logread(), the code assumes that there is only one reader. This keeps
> the data extraction logic simple.
> 
> An additional benefit (?) of the diff is that now uiomove(9) is called
> without splhigh().
> 
> I have tested the diff on amd64 and octeon. However, additional tests
> on these and other platforms would be very helpful.

Here is an updated diff that has sleep_setup()'s priority parameter
fixed. The mistake was noticed by mpi@.

Index: kern/subr_log.c
===================================================================
RCS file: src/sys/kern/subr_log.c,v
retrieving revision 1.68
diff -u -p -r1.68 subr_log.c
--- kern/subr_log.c     18 Aug 2020 13:41:49 -0000      1.68
+++ kern/subr_log.c     20 Aug 2020 13:52:05 -0000
@@ -52,6 +52,7 @@
 #include <sys/socket.h>
 #include <sys/socketvar.h>
 #include <sys/fcntl.h>
+#include <sys/mutex.h>
 #include <sys/timeout.h>
 
 #ifdef KTRACE
@@ -69,8 +70,12 @@
 #define LOG_ASYNC      0x04
 #define LOG_RDWAIT     0x08
 
+/*
+ * Locking:
+ *     L       log_mtx
+ */
 struct logsoftc {
-       int     sc_state;               /* see above for possibilities */
+       int     sc_state;               /* [L] see above for possibilities */
        struct  selinfo sc_selp;        /* process waiting on select call */
        struct  sigio_ref sc_sigio;     /* async I/O registration */
        int     sc_need_wakeup;         /* if set, wake up waiters */
@@ -83,6 +88,14 @@ struct       msgbuf *msgbufp;                /* the mapped b
 struct msgbuf *consbufp;               /* console message buffer. */
 struct file *syslogf;
 
+/*
+ * Lock that serializes access to log message buffers.
+ * This should be kept as a leaf lock in order not to constrain where
+ * printf(9) can be used.
+ */
+struct mutex log_mtx =
+    MUTEX_INITIALIZER_FLAGS(IPL_HIGH, "logmtx", MTX_NOWITNESS);
+
 void filt_logrdetach(struct knote *kn);
 int filt_logread(struct knote *kn, long hint);
 
@@ -144,13 +157,11 @@ initconsbuf(void)
 void
 msgbuf_putchar(struct msgbuf *mbp, const char c)
 {
-       int s;
-
        if (mbp->msg_magic != MSG_MAGIC)
                /* Nothing we can do */
                return;
 
-       s = splhigh();
+       mtx_enter(&log_mtx);
        mbp->msg_bufc[mbp->msg_bufx++] = c;
        if (mbp->msg_bufx < 0 || mbp->msg_bufx >= mbp->msg_bufs)
                mbp->msg_bufx = 0;
@@ -160,20 +171,19 @@ msgbuf_putchar(struct msgbuf *mbp, const
                        mbp->msg_bufr = 0;
                mbp->msg_bufd++;
        }
-       splx(s);
+       mtx_leave(&log_mtx);
 }
 
 size_t
 msgbuf_getlen(struct msgbuf *mbp)
 {
        long len;
-       int s;
 
-       s = splhigh();
+       mtx_enter(&log_mtx);
        len = mbp->msg_bufx - mbp->msg_bufr;
        if (len < 0)
                len += mbp->msg_bufs;
-       splx(s);
+       mtx_leave(&log_mtx);
        return (len);
 }
 
@@ -208,34 +218,47 @@ logclose(dev_t dev, int flag, int mode, 
 int
 logread(dev_t dev, struct uio *uio, int flag)
 {
+       struct sleep_state sls;
        struct msgbuf *mbp = msgbufp;
-       size_t l;
-       int s, error = 0;
+       size_t l, rpos;
+       int error = 0;
 
-       s = splhigh();
+       mtx_enter(&log_mtx);
        while (mbp->msg_bufr == mbp->msg_bufx) {
                if (flag & IO_NDELAY) {
                        error = EWOULDBLOCK;
                        goto out;
                }
                logsoftc.sc_state |= LOG_RDWAIT;
-               error = tsleep_nsec(mbp, LOG_RDPRI | PCATCH, "klog", INFSLP);
+               mtx_leave(&log_mtx);
+               /*
+                * Set up and enter sleep manually instead of using msleep()
+                * to keep log_mtx as a leaf lock.
+                */
+               sleep_setup(&sls, mbp, LOG_RDPRI | PCATCH, "klog");
+               sleep_setup_signal(&sls);
+               sleep_finish(&sls, logsoftc.sc_state & LOG_RDWAIT);
+               error = sleep_finish_signal(&sls);
+               mtx_enter(&log_mtx);
                if (error)
                        goto out;
        }
-       logsoftc.sc_state &= ~LOG_RDWAIT;
 
        if (mbp->msg_bufd > 0) {
                char buf[64];
+               long ndropped;
 
+               ndropped = mbp->msg_bufd;
+               mtx_leave(&log_mtx);
                l = snprintf(buf, sizeof(buf),
                    "<%d>klog: dropped %ld byte%s, message buffer full\n",
-                   LOG_KERN|LOG_WARNING, mbp->msg_bufd,
-                    mbp->msg_bufd == 1 ? "" : "s");
+                   LOG_KERN|LOG_WARNING, ndropped,
+                   ndropped == 1 ? "" : "s");
                error = uiomove(buf, ulmin(l, sizeof(buf) - 1), uio);
+               mtx_enter(&log_mtx);
                if (error)
                        goto out;
-               mbp->msg_bufd = 0;
+               mbp->msg_bufd -= ndropped;
        }
 
        while (uio->uio_resid > 0) {
@@ -246,7 +269,11 @@ logread(dev_t dev, struct uio *uio, int 
                l = ulmin(l, uio->uio_resid);
                if (l == 0)
                        break;
-               error = uiomove(&mbp->msg_bufc[mbp->msg_bufr], l, uio);
+               rpos = mbp->msg_bufr;
+               mtx_leave(&log_mtx);
+               /* Ignore that concurrent readers may consume the same data. */
+               error = uiomove(&mbp->msg_bufc[rpos], l, uio);
+               mtx_enter(&log_mtx);
                if (error)
                        break;
                mbp->msg_bufr += l;
@@ -254,23 +281,23 @@ logread(dev_t dev, struct uio *uio, int 
                        mbp->msg_bufr = 0;
        }
  out:
-       splx(s);
+       mtx_leave(&log_mtx);
        return (error);
 }
 
 int
 logpoll(dev_t dev, int events, struct proc *p)
 {
-       int s, revents = 0;
+       int revents = 0;
 
-       s = splhigh();
+       mtx_enter(&log_mtx);
        if (events & (POLLIN | POLLRDNORM)) {
                if (msgbufp->msg_bufr != msgbufp->msg_bufx)
                        revents |= events & (POLLIN | POLLRDNORM);
                else
                        selrecord(p, &logsoftc.sc_selp);
        }
-       splx(s);
+       mtx_leave(&log_mtx);
        return (revents);
 }
 
@@ -339,6 +366,8 @@ logwakeup(void)
 void
 logtick(void *arg)
 {
+       int state;
+
        if (!log_open)
                return;
 
@@ -348,17 +377,20 @@ logtick(void *arg)
 
        /*
         * sc_need_wakeup has to be cleared before handling the wakeup.
-        * This ensures that no wakeup is lost.
+        * Visiting log_mtx ensures the proper order.
         */
-       membar_enter();
+
+       mtx_enter(&log_mtx);
+       state = logsoftc.sc_state;
+       if (logsoftc.sc_state & LOG_RDWAIT)
+               logsoftc.sc_state &= ~LOG_RDWAIT;
+       mtx_leave(&log_mtx);
 
        selwakeup(&logsoftc.sc_selp);
-       if (logsoftc.sc_state & LOG_ASYNC)
+       if (state & LOG_ASYNC)
                pgsigio(&logsoftc.sc_sigio, SIGIO, 0);
-       if (logsoftc.sc_state & LOG_RDWAIT) {
+       if (state & LOG_RDWAIT)
                wakeup(msgbufp);
-               logsoftc.sc_state &= ~LOG_RDWAIT;
-       }
 out:
        timeout_add_msec(&logsoftc.sc_tick, LOG_TICK);
 }
@@ -380,10 +412,12 @@ logioctl(dev_t dev, u_long com, caddr_t 
                break;
 
        case FIOASYNC:
+               mtx_enter(&log_mtx);
                if (*(int *)data)
                        logsoftc.sc_state |= LOG_ASYNC;
                else
                        logsoftc.sc_state &= ~LOG_ASYNC;
+               mtx_leave(&log_mtx);
                break;
 
        case FIOSETOWN:
Index: sys/msgbuf.h
===================================================================
RCS file: src/sys/sys/msgbuf.h,v
retrieving revision 1.12
diff -u -p -r1.12 msgbuf.h
--- sys/msgbuf.h        18 Aug 2020 13:38:24 -0000      1.12
+++ sys/msgbuf.h        20 Aug 2020 13:52:05 -0000
@@ -32,14 +32,20 @@
  *     @(#)msgbuf.h    8.1 (Berkeley) 6/2/93
  */
 
+/*
+ * Locking:
+ *     I       immutable after creation
+ *     L       log_mtx
+ *     Lw      log_mtx for writing
+ */
 struct msgbuf {
 #define        MSG_MAGIC       0x063061
-       long    msg_magic;
-       long    msg_bufx;               /* write pointer */
-       long    msg_bufr;               /* read pointer */
-       long    msg_bufs;               /* real msg_bufc size (bytes) */
-       long    msg_bufd;               /* number of dropped bytes */
-       char    msg_bufc[1];            /* buffer */
+       long    msg_magic;              /* [I] buffer magic value */
+       long    msg_bufx;               /* [L] write pointer */
+       long    msg_bufr;               /* [L] read pointer */
+       long    msg_bufs;               /* [I] real msg_bufc size (bytes) */
+       long    msg_bufd;               /* [L] number of dropped bytes */
+       char    msg_bufc[1];            /* [Lw] buffer */
 };
 #ifdef _KERNEL
 #define CONSBUFSIZE    (16 * 1024)     /* console message buffer size */

Reply via email to