On 26/07/21(Mon) 08:55, Martin Pieuchot wrote:
> On 21/07/21(Wed) 10:18, Martin Pieuchot wrote:
> > On 11/07/21(Sun) 14:45, Visa Hankala wrote:
> > > On Sat, Jul 10, 2021 at 05:26:57PM +0200, Martin Pieuchot wrote:
> > > > One of the reasons for the drop of performances in the kqueue-based
> > > > poll/select is the fact that kqueue filters are called up to 3 times
> > > > per syscall and that they all spin on the NET_LOCK() for TCP/UDP
> > > > packets.
> > > > 
> > > > Diff below is a RFC for improving the situation.
> > > > 
> > > > socket kqueue filters mainly check for the amount of available items to
> > > > read/write.  This involves comparing various socket buffer fields 
> > > > (sb_cc,
> > > > sb_lowat, etc).  The diff below introduces a new mutex to serialize
> > > > updates of those fields with reads in the kqueue filters.
> > > > 
> > > > Since these fields are always modified with the socket lock held, either
> > > > the mutex or the solock are enough to have a coherent view of them.
> > > > Note that either of these locks is necessary only if multiple fields
> > > > have to be read (like in sbspace()).
> > > > 
> > > > Other per-socket fields accessed in the kqueue filters are never
> > > > combined (with &&) to determine a condition.  So assuming it is fine to
> > > > read register-sized fields w/o the socket lock we can safely remove it
> > > > there.
> > > > 
> > > > Could such mutex also be used to serialize klist updates?
> > > 
> > > I think the lock should be such that it can serialize socket klists.
> > > 
> > > As the main motivator for this change is kqueue, the viability of using
> > > the mutex for the klist locking should be checked now. The mutex has to
> > > be held whenever calling KNOTE() on sb_sel.si_note, or selwakeup() on
> > > sb_sel. Then the socket f_event callbacks will not need to lock the
> > > mutex themselves.
> > > 
> > > I had a diff that serialized socket klists using solock(). It did not
> > > work well because it increased lock contention, especially when using
> > > kqueue as backend for poll(2) and select(2). The diff is not even
> > > correct any longer since recent changes to socket locking have
> > > introduced new lock order constraints that conflict with it.
> > 
> > Updated diff below does that.  It also uses a single per-socket mutex as
> > suggested by bluhm@.
> > 
> > Note that as long as poll(2) & select(2) use the current implementation a
> > KERNEL_LOCK()/UNLOCK() dance is necessary in sowakeup().  The goal of
> > this change combined with the poll/select rewrite is to get rid of this
> > dance.
> 
> Updated diff after recent commits, more comments?  Oks?

Previous diff had a double mtx_enter() in filt_fifowrite_common(), this
one uses the *locked() version of sbspace() to prevent it.


Index: kern/uipc_socket.c
===================================================================
RCS file: /cvs/src/sys/kern/uipc_socket.c,v
retrieving revision 1.264
diff -u -p -r1.264 uipc_socket.c
--- kern/uipc_socket.c  26 Jul 2021 05:51:13 -0000      1.264
+++ kern/uipc_socket.c  26 Jul 2021 07:20:58 -0000
@@ -84,7 +84,7 @@ int   filt_solistenprocess(struct knote *k
 int    filt_solisten_common(struct knote *kn, struct socket *so);
 
 const struct filterops solisten_filtops = {
-       .f_flags        = FILTEROP_ISFD,
+       .f_flags        = FILTEROP_ISFD | FILTEROP_MPSAFE,
        .f_attach       = NULL,
        .f_detach       = filt_sordetach,
        .f_event        = filt_solisten,
@@ -93,7 +93,7 @@ const struct filterops solisten_filtops 
 };
 
 const struct filterops soread_filtops = {
-       .f_flags        = FILTEROP_ISFD,
+       .f_flags        = FILTEROP_ISFD | FILTEROP_MPSAFE,
        .f_attach       = NULL,
        .f_detach       = filt_sordetach,
        .f_event        = filt_soread,
@@ -102,7 +102,7 @@ const struct filterops soread_filtops = 
 };
 
 const struct filterops sowrite_filtops = {
-       .f_flags        = FILTEROP_ISFD,
+       .f_flags        = FILTEROP_ISFD | FILTEROP_MPSAFE,
        .f_attach       = NULL,
        .f_detach       = filt_sowdetach,
        .f_event        = filt_sowrite,
@@ -111,7 +111,7 @@ const struct filterops sowrite_filtops =
 };
 
 const struct filterops soexcept_filtops = {
-       .f_flags        = FILTEROP_ISFD,
+       .f_flags        = FILTEROP_ISFD | FILTEROP_MPSAFE,
        .f_attach       = NULL,
        .f_detach       = filt_sordetach,
        .f_event        = filt_soread,
@@ -181,6 +181,9 @@ socreate(int dom, struct socket **aso, i
        so->so_egid = p->p_ucred->cr_gid;
        so->so_cpid = p->p_p->ps_pid;
        so->so_proto = prp;
+       mtx_init(&so->so_mtx, IPL_MPFLOOR);
+       klist_init_mutex(&so->so_snd.sb_sel.si_note, &so->so_mtx);
+       klist_init_mutex(&so->so_rcv.sb_sel.si_note, &so->so_mtx);
        so->so_snd.sb_timeo_nsecs = INFSLP;
        so->so_rcv.sb_timeo_nsecs = INFSLP;
 
@@ -276,7 +279,9 @@ sofree(struct socket *so, int s)
                }
        }
 #endif /* SOCKET_SPLICE */
+       mtx_enter(&so->so_mtx);
        sbrelease(so, &so->so_snd);
+       mtx_leave(&so->so_mtx);
        sorflush(so);
        sounlock(so, s);
 #ifdef SOCKET_SPLICE
@@ -1019,8 +1024,10 @@ dontblock:
                                        *mp = m_copym(m, 0, len, M_WAIT);
                                m->m_data += len;
                                m->m_len -= len;
+                               mtx_enter(&so->so_mtx);
                                so->so_rcv.sb_cc -= len;
                                so->so_rcv.sb_datacc -= len;
+                               mtx_leave(&so->so_mtx);
                        }
                }
                if (so->so_oobmark) {
@@ -1537,8 +1544,10 @@ somove(struct socket *so, int wait)
                        }
                        so->so_rcv.sb_mb->m_data += size;
                        so->so_rcv.sb_mb->m_len -= size;
+                       mtx_enter(&so->so_mtx);
                        so->so_rcv.sb_cc -= size;
                        so->so_rcv.sb_datacc -= size;
+                       mtx_leave(&so->so_mtx);
                } else {
                        *mp = so->so_rcv.sb_mb;
                        sbfree(so, &so->so_rcv, *mp);
@@ -1777,30 +1786,40 @@ sosetopt(struct socket *so, int level, i
                        case SO_SNDBUF:
                                if (so->so_state & SS_CANTSENDMORE)
                                        return (EINVAL);
+                               mtx_enter(&so->so_mtx);
                                if (sbcheckreserve(cnt, so->so_snd.sb_wat) ||
                                    sbreserve(so, &so->so_snd, cnt))
-                                       return (ENOBUFS);
-                               so->so_snd.sb_wat = cnt;
+                                       error = ENOBUFS;
+                               if (error == 0)
+                                       so->so_snd.sb_wat = cnt;
+                               mtx_leave(&so->so_mtx);
                                break;
 
                        case SO_RCVBUF:
                                if (so->so_state & SS_CANTRCVMORE)
                                        return (EINVAL);
+                               mtx_enter(&so->so_mtx);
                                if (sbcheckreserve(cnt, so->so_rcv.sb_wat) ||
                                    sbreserve(so, &so->so_rcv, cnt))
-                                       return (ENOBUFS);
-                               so->so_rcv.sb_wat = cnt;
+                                       error = ENOBUFS;
+                               if (error == 0)
+                                       so->so_rcv.sb_wat = cnt;
+                               mtx_leave(&so->so_mtx);
                                break;
 
                        case SO_SNDLOWAT:
+                               mtx_enter(&so->so_mtx);
                                so->so_snd.sb_lowat =
                                    (cnt > so->so_snd.sb_hiwat) ?
                                    so->so_snd.sb_hiwat : cnt;
+                               mtx_leave(&so->so_mtx);
                                break;
                        case SO_RCVLOWAT:
+                               mtx_enter(&so->so_mtx);
                                so->so_rcv.sb_lowat =
                                    (cnt > so->so_rcv.sb_hiwat) ?
                                    so->so_rcv.sb_hiwat : cnt;
+                               mtx_leave(&so->so_mtx);
                                break;
                        }
                        break;
@@ -2028,7 +2047,12 @@ void
 sohasoutofband(struct socket *so)
 {
        pgsigio(&so->so_sigio, SIGURG, 0);
+       /* XXX KERNEL_LOCK() needed because of legacy poll/select */
+       KERNEL_LOCK();
+       mtx_enter(&so->so_mtx);
        selwakeup(&so->so_rcv.sb_sel);
+       mtx_leave(&so->so_mtx);
+       KERNEL_UNLOCK();
 }
 
 int
@@ -2037,8 +2061,6 @@ soo_kqfilter(struct file *fp, struct kno
        struct socket *so = kn->kn_fp->f_data;
        struct sockbuf *sb;
 
-       KERNEL_ASSERT_LOCKED();
-
        switch (kn->kn_filter) {
        case EVFILT_READ:
                if (so->so_options & SO_ACCEPTCONN)
@@ -2059,7 +2081,7 @@ soo_kqfilter(struct file *fp, struct kno
                return (EINVAL);
        }
 
-       klist_insert_locked(&sb->sb_sel.si_note, kn);
+       klist_insert(&sb->sb_sel.si_note, kn);
 
        return (0);
 }
@@ -2069,17 +2091,16 @@ filt_sordetach(struct knote *kn)
 {
        struct socket *so = kn->kn_fp->f_data;
 
-       KERNEL_ASSERT_LOCKED();
-
-       klist_remove_locked(&so->so_rcv.sb_sel.si_note, kn);
+       klist_remove(&so->so_rcv.sb_sel.si_note, kn);
 }
 
 int
 filt_soread_common(struct knote *kn, struct socket *so)
 {
+       u_int sostate = so->so_state;
        int rv = 0;
 
-       soassertlocked(so);
+       MUTEX_ASSERT_LOCKED(&so->so_mtx);
 
        kn->kn_data = so->so_rcv.sb_cc;
 #ifdef SOCKET_SPLICE
@@ -2088,15 +2109,17 @@ filt_soread_common(struct knote *kn, str
        } else
 #endif /* SOCKET_SPLICE */
        if (kn->kn_sfflags & NOTE_OOB) {
-               if (so->so_oobmark || (so->so_state & SS_RCVATMARK)) {
+               u_long oobmark = so->so_oobmark;
+
+               if (oobmark || (sostate & SS_RCVATMARK)) {
                        kn->kn_fflags |= NOTE_OOB;
-                       kn->kn_data -= so->so_oobmark;
+                       kn->kn_data -= oobmark;
                        rv = 1;
                }
-       } else if (so->so_state & SS_CANTRCVMORE) {
+       } else if (sostate & SS_CANTRCVMORE) {
                kn->kn_flags |= EV_EOF;
                if (kn->kn_flags & __EV_POLL) {
-                       if (so->so_state & SS_ISDISCONNECTED)
+                       if (sostate & SS_ISDISCONNECTED)
                                kn->kn_flags |= __EV_HUP;
                }
                kn->kn_fflags = so->so_error;
@@ -2124,12 +2147,12 @@ int
 filt_soreadmodify(struct kevent *kev, struct knote *kn)
 {
        struct socket *so = kn->kn_fp->f_data;
-       int rv, s;
+       int rv;
 
-       s = solock(so);
+       mtx_enter(&so->so_mtx);
        knote_modify(kev, kn);
        rv = filt_soread_common(kn, so);
-       sounlock(so, s);
+       mtx_leave(&so->so_mtx);
 
        return (rv);
 }
@@ -2138,16 +2161,16 @@ int
 filt_soreadprocess(struct knote *kn, struct kevent *kev)
 {
        struct socket *so = kn->kn_fp->f_data;
-       int rv, s;
+       int rv;
 
-       s = solock(so);
+       mtx_enter(&so->so_mtx);
        if (kev != NULL && (kn->kn_flags & EV_ONESHOT))
                rv = 1;
        else
                rv = filt_soread_common(kn, so);
        if (rv != 0)
                knote_submit(kn, kev);
-       sounlock(so, s);
+       mtx_leave(&so->so_mtx);
 
        return (rv);
 }
@@ -2157,30 +2180,29 @@ filt_sowdetach(struct knote *kn)
 {
        struct socket *so = kn->kn_fp->f_data;
 
-       KERNEL_ASSERT_LOCKED();
-
-       klist_remove_locked(&so->so_snd.sb_sel.si_note, kn);
+       klist_remove(&so->so_snd.sb_sel.si_note, kn);
 }
 
 int
 filt_sowrite_common(struct knote *kn, struct socket *so)
 {
+       u_int sostate = so->so_state;
        int rv;
 
-       soassertlocked(so);
+       MUTEX_ASSERT_LOCKED(&so->so_mtx);
 
-       kn->kn_data = sbspace(so, &so->so_snd);
-       if (so->so_state & SS_CANTSENDMORE) {
+       kn->kn_data = sbspace_locked(so, &so->so_snd);
+       if (sostate & SS_CANTSENDMORE) {
                kn->kn_flags |= EV_EOF;
                if (kn->kn_flags & __EV_POLL) {
-                       if (so->so_state & SS_ISDISCONNECTED)
+                       if (sostate & SS_ISDISCONNECTED)
                                kn->kn_flags |= __EV_HUP;
                }
                kn->kn_fflags = so->so_error;
                rv = 1;
        } else if (so->so_error) {      /* temporary udp error */
                rv = 1;
-       } else if (((so->so_state & SS_ISCONNECTED) == 0) &&
+       } else if (((sostate & SS_ISCONNECTED) == 0) &&
            (so->so_proto->pr_flags & PR_CONNREQUIRED)) {
                rv = 0;
        } else if (kn->kn_sfflags & NOTE_LOWAT) {
@@ -2204,12 +2226,12 @@ int
 filt_sowritemodify(struct kevent *kev, struct knote *kn)
 {
        struct socket *so = kn->kn_fp->f_data;
-       int rv, s;
+       int rv;
 
-       s = solock(so);
+       mtx_enter(&so->so_mtx);
        knote_modify(kev, kn);
        rv = filt_sowrite_common(kn, so);
-       sounlock(so, s);
+       mtx_leave(&so->so_mtx);
 
        return (rv);
 }
@@ -2218,16 +2240,16 @@ int
 filt_sowriteprocess(struct knote *kn, struct kevent *kev)
 {
        struct socket *so = kn->kn_fp->f_data;
-       int rv, s;
+       int rv;
 
-       s = solock(so);
+       mtx_enter(&so->so_mtx);
        if (kev != NULL && (kn->kn_flags & EV_ONESHOT))
                rv = 1;
        else
                rv = filt_sowrite_common(kn, so);
        if (rv != 0)
                knote_submit(kn, kev);
-       sounlock(so, s);
+       mtx_leave(&so->so_mtx);
 
        return (rv);
 }
@@ -2235,8 +2257,6 @@ filt_sowriteprocess(struct knote *kn, st
 int
 filt_solisten_common(struct knote *kn, struct socket *so)
 {
-       soassertlocked(so);
-
        kn->kn_data = so->so_qlen;
 
        return (kn->kn_data != 0);
@@ -2254,12 +2274,12 @@ int
 filt_solistenmodify(struct kevent *kev, struct knote *kn)
 {
        struct socket *so = kn->kn_fp->f_data;
-       int rv, s;
+       int rv;
 
-       s = solock(so);
+       mtx_enter(&so->so_mtx);
        knote_modify(kev, kn);
        rv = filt_solisten_common(kn, so);
-       sounlock(so, s);
+       mtx_leave(&so->so_mtx);
 
        return (rv);
 }
@@ -2268,16 +2288,16 @@ int
 filt_solistenprocess(struct knote *kn, struct kevent *kev)
 {
        struct socket *so = kn->kn_fp->f_data;
-       int rv, s;
+       int rv;
 
-       s = solock(so);
+       mtx_enter(&so->so_mtx);
        if (kev != NULL && (kn->kn_flags & EV_ONESHOT))
                rv = 1;
        else
                rv = filt_solisten_common(kn, so);
        if (rv != 0)
                knote_submit(kn, kev);
-       sounlock(so, s);
+       mtx_leave(&so->so_mtx);
 
        return (rv);
 }
Index: kern/uipc_socket2.c
===================================================================
RCS file: /cvs/src/sys/kern/uipc_socket2.c,v
retrieving revision 1.113
diff -u -p -r1.113 uipc_socket2.c
--- kern/uipc_socket2.c 26 Jul 2021 05:51:13 -0000      1.113
+++ kern/uipc_socket2.c 26 Jul 2021 07:20:58 -0000
@@ -34,7 +34,6 @@
 
 #include <sys/param.h>
 #include <sys/systm.h>
-#include <sys/malloc.h>
 #include <sys/mbuf.h>
 #include <sys/protosw.h>
 #include <sys/domain.h>
@@ -163,6 +162,9 @@ sonewconn(struct socket *head, int conns
        if (so == NULL)
                return (NULL);
        rw_init(&so->so_lock, "solock");
+       mtx_init(&so->so_mtx, IPL_MPFLOOR);
+       klist_init_mutex(&so->so_snd.sb_sel.si_note, &so->so_mtx);
+       klist_init_mutex(&so->so_rcv.sb_sel.si_note, &so->so_mtx);
        so->so_type = head->so_type;
        so->so_options = head->so_options &~ SO_ACCEPTCONN;
        so->so_linger = head->so_linger;
@@ -423,7 +425,12 @@ sowakeup(struct socket *so, struct sockb
        }
        if (sb->sb_flags & SB_ASYNC)
                pgsigio(&so->so_sigio, SIGIO, 0);
+       /* XXX KERNEL_LOCK() needed because of legacy poll/select */
+       KERNEL_LOCK();
+       mtx_enter(&so->so_mtx);
        selwakeup(&sb->sb_sel);
+       mtx_leave(&so->so_mtx);
+       KERNEL_UNLOCK();
 }
 
 /*
@@ -463,6 +470,7 @@ soreserve(struct socket *so, u_long sndc
 {
        soassertlocked(so);
 
+       mtx_enter(&so->so_mtx);
        if (sbreserve(so, &so->so_snd, sndcc))
                goto bad;
        if (sbreserve(so, &so->so_rcv, rcvcc))
@@ -475,10 +483,12 @@ soreserve(struct socket *so, u_long sndc
                so->so_snd.sb_lowat = MCLBYTES;
        if (so->so_snd.sb_lowat > so->so_snd.sb_hiwat)
                so->so_snd.sb_lowat = so->so_snd.sb_hiwat;
+       mtx_leave(&so->so_mtx);
        return (0);
 bad2:
        sbrelease(so, &so->so_snd);
 bad:
+       mtx_leave(&so->so_mtx);
        return (ENOBUFS);
 }
 
@@ -492,6 +502,7 @@ sbreserve(struct socket *so, struct sock
 {
        KASSERT(sb == &so->so_rcv || sb == &so->so_snd);
        soassertlocked(so);
+       MUTEX_ASSERT_LOCKED(&so->so_mtx);
 
        if (cc == 0 || cc > sb_max)
                return (1);
@@ -533,6 +544,7 @@ sbchecklowmem(void)
 void
 sbrelease(struct socket *so, struct sockbuf *sb)
 {
+       MUTEX_ASSERT_LOCKED(&so->so_mtx);
 
        sbflush(so, sb);
        sb->sb_hiwat = sb->sb_mbmax = 0;
@@ -686,6 +698,7 @@ sbcheck(struct socket *so, struct sockbu
        struct mbuf *m, *n;
        u_long len = 0, mbcnt = 0;
 
+       mtx_enter(&so->so_mtx);
        for (m = sb->sb_mb; m; m = m->m_nextpkt) {
                for (n = m; n; n = n->m_next) {
                        len += n->m_len;
@@ -701,6 +714,7 @@ sbcheck(struct socket *so, struct sockbu
                    mbcnt, sb->sb_mbcnt);
                panic("sbcheck");
        }
+       mtx_leave(&so->so_mtx);
 }
 #endif
 
@@ -860,9 +874,11 @@ sbcompress(struct socket *so, struct soc
                        memcpy(mtod(n, caddr_t) + n->m_len, mtod(m, caddr_t),
                            m->m_len);
                        n->m_len += m->m_len;
+                       mtx_enter(&so->so_mtx);
                        sb->sb_cc += m->m_len;
                        if (m->m_type != MT_CONTROL && m->m_type != MT_SONAME)
                                sb->sb_datacc += m->m_len;
+                       mtx_leave(&so->so_mtx);
                        m = m_free(m);
                        continue;
                }
@@ -895,6 +911,7 @@ sbflush(struct socket *so, struct sockbu
 {
        KASSERT(sb == &so->so_rcv || sb == &so->so_snd);
        KASSERT((sb->sb_flags & SB_LOCK) == 0);
+       MUTEX_ASSERT_LOCKED(&so->so_mtx);
 
        while (sb->sb_mbcnt)
                sbdrop(so, sb, (int)sb->sb_cc);
@@ -917,6 +934,7 @@ sbdrop(struct socket *so, struct sockbuf
 
        KASSERT(sb == &so->so_rcv || sb == &so->so_snd);
        soassertlocked(so);
+       MUTEX_ASSERT_LOCKED(&so->so_mtx);
 
        next = (m = sb->sb_mb) ? m->m_nextpkt : NULL;
        while (len > 0) {
@@ -936,12 +954,12 @@ sbdrop(struct socket *so, struct sockbuf
                        break;
                }
                len -= m->m_len;
-               sbfree(so, sb, m);
+               sbfree_locked(so, sb, m);
                mn = m_free(m);
                m = mn;
        }
        while (m && m->m_len == 0) {
-               sbfree(so, sb, m);
+               sbfree_locked(so, sb, m);
                mn = m_free(m);
                m = mn;
        }
Index: kern/uipc_syscalls.c
===================================================================
RCS file: /cvs/src/sys/kern/uipc_syscalls.c,v
retrieving revision 1.193
diff -u -p -r1.193 uipc_syscalls.c
--- kern/uipc_syscalls.c        2 Jul 2021 12:17:41 -0000       1.193
+++ kern/uipc_syscalls.c        26 Jul 2021 07:20:59 -0000
@@ -308,7 +308,9 @@ doaccept(struct proc *p, int sock, struc
            : (flags & SOCK_NONBLOCK ? FNONBLOCK : 0);
 
        /* connection has been removed from the listen queue */
+       mtx_enter(&head->so_mtx);
        KNOTE(&head->so_rcv.sb_sel.si_note, 0);
+       mtx_leave(&head->so_mtx);
 
        fp->f_type = DTYPE_SOCKET;
        fp->f_flag = FREAD | FWRITE | nflag;
Index: kern/uipc_usrreq.c
===================================================================
RCS file: /cvs/src/sys/kern/uipc_usrreq.c,v
retrieving revision 1.148
diff -u -p -r1.148 uipc_usrreq.c
--- kern/uipc_usrreq.c  25 May 2021 22:45:09 -0000      1.148
+++ kern/uipc_usrreq.c  26 Jul 2021 07:20:59 -0000
@@ -208,8 +208,10 @@ uipc_usrreq(struct socket *so, int req, 
                         * Adjust backpressure on sender
                         * and wakeup any waiting to write.
                         */
+                       mtx_enter(&so2->so_mtx);
                        so2->so_snd.sb_mbcnt = so->so_rcv.sb_mbcnt;
                        so2->so_snd.sb_cc = so->so_rcv.sb_cc;
+                       mtx_leave(&so2->so_mtx);
                        sowwakeup(so2);
                        break;
 
@@ -284,8 +286,10 @@ uipc_usrreq(struct socket *so, int req, 
                                sbappendrecord(so2, &so2->so_rcv, m);
                        else
                                sbappend(so2, &so2->so_rcv, m);
+                       mtx_enter(&so->so_mtx);
                        so->so_snd.sb_mbcnt = so2->so_rcv.sb_mbcnt;
                        so->so_snd.sb_cc = so2->so_rcv.sb_cc;
+                       mtx_leave(&so->so_mtx);
                        if (so2->so_rcv.sb_cc > 0)
                                sorwakeup(so2);
                        m = NULL;
@@ -736,12 +740,16 @@ unp_disconnect(struct unpcb *unp)
 
        case SOCK_STREAM:
        case SOCK_SEQPACKET:
+               mtx_enter(&unp->unp_socket->so_mtx);
                unp->unp_socket->so_snd.sb_mbcnt = 0;
                unp->unp_socket->so_snd.sb_cc = 0;
+               mtx_leave(&unp->unp_socket->so_mtx);
                soisdisconnected(unp->unp_socket);
                unp2->unp_conn = NULL;
+               mtx_enter(&unp2->unp_socket->so_mtx);
                unp2->unp_socket->so_snd.sb_mbcnt = 0;
                unp2->unp_socket->so_snd.sb_cc = 0;
+               mtx_leave(&unp2->unp_socket->so_mtx);
                soisdisconnected(unp2->unp_socket);
                break;
        }
Index: miscfs/fifofs/fifo_vnops.c
===================================================================
RCS file: /cvs/src/sys/miscfs/fifofs/fifo_vnops.c,v
retrieving revision 1.80
diff -u -p -r1.80 fifo_vnops.c
--- miscfs/fifofs/fifo_vnops.c  13 Jul 2021 07:37:50 -0000      1.80
+++ miscfs/fifofs/fifo_vnops.c  26 Jul 2021 07:20:59 -0000
@@ -114,7 +114,7 @@ int filt_fifowriteprocess(struct knote *
 int    filt_fifowrite_common(struct knote *kn, struct socket *so);
 
 const struct filterops fiforead_filtops = {
-       .f_flags        = FILTEROP_ISFD,
+       .f_flags        = FILTEROP_ISFD | FILTEROP_MPSAFE,
        .f_attach       = NULL,
        .f_detach       = filt_fifordetach,
        .f_event        = filt_fiforead,
@@ -123,7 +123,7 @@ const struct filterops fiforead_filtops 
 };
 
 const struct filterops fifowrite_filtops = {
-       .f_flags        = FILTEROP_ISFD,
+       .f_flags        = FILTEROP_ISFD | FILTEROP_MPSAFE,
        .f_attach       = NULL,
        .f_detach       = filt_fifowdetach,
        .f_event        = filt_fifowrite,
@@ -542,7 +542,7 @@ fifo_kqfilter(void *v)
 
        ap->a_kn->kn_hook = so;
 
-       klist_insert_locked(&sb->sb_sel.si_note, ap->a_kn);
+       klist_insert(&sb->sb_sel.si_note, ap->a_kn);
 
        return (0);
 }
@@ -552,7 +552,7 @@ filt_fifordetach(struct knote *kn)
 {
        struct socket *so = (struct socket *)kn->kn_hook;
 
-       klist_remove_locked(&so->so_rcv.sb_sel.si_note, kn);
+       klist_remove(&so->so_rcv.sb_sel.si_note, kn);
 }
 
 int
@@ -560,7 +560,7 @@ filt_fiforead_common(struct knote *kn, s
 {
        int rv;
 
-       soassertlocked(so);
+       MUTEX_ASSERT_LOCKED(&so->so_mtx);
 
        kn->kn_data = so->so_rcv.sb_cc;
        if (so->so_state & SS_CANTRCVMORE) {
@@ -590,12 +590,12 @@ int
 filt_fiforeadmodify(struct kevent *kev, struct knote *kn)
 {
        struct socket *so = kn->kn_hook;
-       int rv, s;
+       int rv;
 
-       s = solock(so);
+       mtx_enter(&so->so_mtx);
        knote_modify(kev, kn);
        rv = filt_fiforead_common(kn, so);
-       sounlock(so, s);
+       mtx_leave(&so->so_mtx);
 
        return (rv);
 }
@@ -604,16 +604,16 @@ int
 filt_fiforeadprocess(struct knote *kn, struct kevent *kev)
 {
        struct socket *so = kn->kn_hook;
-       int rv, s;
+       int rv;
 
-       s = solock(so);
+       mtx_enter(&so->so_mtx);
        if (kev != NULL && (kn->kn_flags & EV_ONESHOT))
                rv = 1;
        else
                rv = filt_fiforead_common(kn, so);
        if (rv != 0)
                knote_submit(kn, kev);
-       sounlock(so, s);
+       mtx_leave(&so->so_mtx);
 
        return (rv);
 }
@@ -623,7 +623,7 @@ filt_fifowdetach(struct knote *kn)
 {
        struct socket *so = (struct socket *)kn->kn_hook;
 
-       klist_remove_locked(&so->so_snd.sb_sel.si_note, kn);
+       klist_remove(&so->so_snd.sb_sel.si_note, kn);
 }
 
 int
@@ -631,9 +631,9 @@ filt_fifowrite_common(struct knote *kn, 
 {
        int rv;
 
-       soassertlocked(so);
+       MUTEX_ASSERT_LOCKED(&so->so_mtx);
 
-       kn->kn_data = sbspace(so, &so->so_snd);
+       kn->kn_data = sbspace_locked(so, &so->so_snd);
        if (so->so_state & SS_CANTSENDMORE) {
                kn->kn_flags |= EV_EOF;
                rv = 1;
@@ -657,12 +657,12 @@ int
 filt_fifowritemodify(struct kevent *kev, struct knote *kn)
 {
        struct socket *so = kn->kn_hook;
-       int rv, s;
+       int rv;
 
-       s = solock(so);
+       mtx_enter(&so->so_mtx);
        knote_modify(kev, kn);
        rv = filt_fifowrite_common(kn, so);
-       sounlock(so, s);
+       mtx_leave(&so->so_mtx);
 
        return (rv);
 }
@@ -671,16 +671,16 @@ int
 filt_fifowriteprocess(struct knote *kn, struct kevent *kev)
 {
        struct socket *so = kn->kn_hook;
-       int rv, s;
+       int rv;
 
-       s = solock(so);
+       mtx_enter(&so->so_mtx);
        if (kev != NULL && (kn->kn_flags & EV_ONESHOT))
                rv = 1;
        else
                rv = filt_fifowrite_common(kn, so);
        if (rv != 0)
                knote_submit(kn, kev);
-       sounlock(so, s);
+       mtx_leave(&so->so_mtx);
 
        return (rv);
 }
Index: netinet/tcp_input.c
===================================================================
RCS file: /cvs/src/sys/netinet/tcp_input.c,v
retrieving revision 1.368
diff -u -p -r1.368 tcp_input.c
--- netinet/tcp_input.c 16 Apr 2021 12:08:25 -0000      1.368
+++ netinet/tcp_input.c 26 Jul 2021 07:21:00 -0000
@@ -946,7 +946,9 @@ findpcb:
                                tcpstat_pkt(tcps_rcvackpack, tcps_rcvackbyte,
                                    acked);
                                ND6_HINT(tp);
+                               mtx_enter(&so->so_mtx);
                                sbdrop(so, &so->so_snd, acked);
+                               mtx_leave(&so->so_mtx);
 
                                /*
                                 * If we had a pending ICMP message that
@@ -1714,6 +1716,7 @@ trimthenstep6:
                            TCP_MAXWIN << tp->snd_scale);
                }
                ND6_HINT(tp);
+               mtx_enter(&so->so_mtx);
                if (acked > so->so_snd.sb_cc) {
                        if (tp->snd_wnd > so->so_snd.sb_cc)
                                tp->snd_wnd -= so->so_snd.sb_cc;
@@ -1729,6 +1732,7 @@ trimthenstep6:
                                tp->snd_wnd = 0;
                        ourfinisacked = 0;
                }
+               mtx_leave(&so->so_mtx);
 
                tcp_update_sndspace(tp);
                if (sb_notify(so, &so->so_snd)) {
@@ -2967,7 +2971,9 @@ tcp_mss_update(struct tcpcb *tp)
                bufsize = roundup(bufsize, mss);
                if (bufsize > sb_max)
                        bufsize = sb_max;
+               mtx_enter(&so->so_mtx);
                (void)sbreserve(so, &so->so_snd, bufsize);
+               mtx_leave(&so->so_mtx);
        }
 
        bufsize = so->so_rcv.sb_hiwat;
@@ -2975,7 +2981,9 @@ tcp_mss_update(struct tcpcb *tp)
                bufsize = roundup(bufsize, mss);
                if (bufsize > sb_max)
                        bufsize = sb_max;
+               mtx_enter(&so->so_mtx);
                (void)sbreserve(so, &so->so_rcv, bufsize);
+               mtx_leave(&so->so_mtx);
        }
 
 }
Index: netinet/tcp_usrreq.c
===================================================================
RCS file: /cvs/src/sys/netinet/tcp_usrreq.c,v
retrieving revision 1.181
diff -u -p -r1.181 tcp_usrreq.c
--- netinet/tcp_usrreq.c        30 Apr 2021 13:52:48 -0000      1.181
+++ netinet/tcp_usrreq.c        26 Jul 2021 07:21:00 -0000
@@ -688,7 +688,9 @@ tcp_disconnect(struct tcpcb *tp)
                tp = tcp_drop(tp, 0);
        else {
                soisdisconnecting(so);
+               mtx_enter(&so->so_mtx);
                sbflush(so, &so->so_rcv);
+               mtx_leave(&so->so_mtx);
                tp = tcp_usrclosed(tp);
                if (tp)
                        (void) tcp_output(tp);
@@ -1115,6 +1117,7 @@ tcp_update_sndspace(struct tcpcb *tp)
        struct socket *so = tp->t_inpcb->inp_socket;
        u_long nmax = so->so_snd.sb_hiwat;
 
+       mtx_enter(&so->so_mtx);
        if (sbchecklowmem()) {
                /* low on memory try to get rid of some */
                if (tcp_sendspace < nmax)
@@ -1128,7 +1131,7 @@ tcp_update_sndspace(struct tcpcb *tp)
                    tp->snd_una);
 
        /* a writable socket must be preserved because of poll(2) semantics */
-       if (sbspace(so, &so->so_snd) >= so->so_snd.sb_lowat) {
+       if (sbspace_locked(so, &so->so_snd) >= so->so_snd.sb_lowat) {
                if (nmax < so->so_snd.sb_cc + so->so_snd.sb_lowat)
                        nmax = so->so_snd.sb_cc + so->so_snd.sb_lowat;
                /* keep in sync with sbreserve() calculation */
@@ -1141,6 +1144,7 @@ tcp_update_sndspace(struct tcpcb *tp)
 
        if (nmax != so->so_snd.sb_hiwat)
                sbreserve(so, &so->so_snd, nmax);
+       mtx_leave(&so->so_mtx);
 }
 
 /*
@@ -1179,5 +1183,7 @@ tcp_update_rcvspace(struct tcpcb *tp)
 
        /* round to MSS boundary */
        nmax = roundup(nmax, tp->t_maxseg);
+       mtx_enter(&so->so_mtx);
        sbreserve(so, &so->so_rcv, nmax);
+       mtx_leave(&so->so_mtx);
 }
Index: sys/socketvar.h
===================================================================
RCS file: /cvs/src/sys/sys/socketvar.h,v
retrieving revision 1.100
diff -u -p -r1.100 socketvar.h
--- sys/socketvar.h     26 Jul 2021 05:51:13 -0000      1.100
+++ sys/socketvar.h     26 Jul 2021 07:21:00 -0000
@@ -33,10 +33,12 @@
  */
 
 #include <sys/selinfo.h>                       /* for struct selinfo */
+#include <sys/systm.h>                         /* panicstr for MUTEX_ASSERT */
 #include <sys/queue.h>
 #include <sys/sigio.h>                         /* for struct sigio_ref */
 #include <sys/task.h>
 #include <sys/timeout.h>
+#include <sys/mutex.h>
 #include <sys/rwlock.h>
 
 #ifndef        _SOCKLEN_T_DEFINED_
@@ -51,10 +53,15 @@ TAILQ_HEAD(soqhead, socket);
  * Contains send and receive buffer queues,
  * handle on protocol and pointer to protocol
  * private data and error information.
+ *
+ * Locks used to protect struct members in this file:
+ *     s       this socket solock
+ *     m       this socket `so_mtx'
  */
 struct socket {
        const struct protosw *so_proto; /* protocol handle */
        struct rwlock so_lock;          /* this socket lock */
+       struct mutex so_mtx;
        void    *so_pcb;                /* protocol control block */
        u_int   so_state;               /* internal state flags SS_*, below */
        short   so_type;                /* generic type, see socket.h */
@@ -101,13 +108,13 @@ struct socket {
        struct  sockbuf {
 /* The following fields are all zeroed on flush. */
 #define        sb_startzero    sb_cc
-               u_long  sb_cc;          /* actual chars in buffer */
-               u_long  sb_datacc;      /* data only chars in buffer */
-               u_long  sb_hiwat;       /* max actual char count */
-               u_long  sb_wat;         /* default watermark */
-               u_long  sb_mbcnt;       /* chars of mbufs used */
-               u_long  sb_mbmax;       /* max chars of mbufs to use */
-               long    sb_lowat;       /* low water mark */
+               u_long  sb_cc;          /* [s|m] actual chars in buffer */
+               u_long  sb_datacc;      /* [s|m] data only chars in buffer */
+               u_long  sb_hiwat;       /* [s|m] max actual char count */
+               u_long  sb_wat;         /* [s|m] default watermark */
+               u_long  sb_mbcnt;       /* [s|m] chars of mbufs used */
+               u_long  sb_mbmax;       /* [s|m] max chars of mbufs to use */
+               long    sb_lowat;       /* [s|m] low water mark */
                struct mbuf *sb_mb;     /* the mbuf chain */
                struct mbuf *sb_mbtail; /* the last mbuf in the chain */
                struct mbuf *sb_lastrecord;/* first mbuf of last record in
@@ -189,13 +196,27 @@ sb_notify(struct socket *so, struct sock
  * overflow and return 0.
  */
 static inline long
-sbspace(struct socket *so, struct sockbuf *sb)
+sbspace_locked(struct socket *so, struct sockbuf *sb)
 {
        KASSERT(sb == &so->so_rcv || sb == &so->so_snd);
-       soassertlocked(so);
+       MUTEX_ASSERT_LOCKED(&so->so_mtx);
+
        return lmin(sb->sb_hiwat - sb->sb_cc, sb->sb_mbmax - sb->sb_mbcnt);
 }
 
+static inline long
+sbspace(struct socket *so, struct sockbuf *sb)
+{
+       long space;
+
+       mtx_enter(&so->so_mtx);
+       space = sbspace_locked(so, sb);
+       mtx_leave(&so->so_mtx);
+
+       return space;
+}
+
+
 /* do we have to send all at once on a socket? */
 #define        sosendallatonce(so) \
     ((so)->so_proto->pr_flags & PR_ATOMIC)
@@ -224,22 +245,31 @@ soreadable(struct socket *so)
 
 /* adjust counters in sb reflecting allocation of m */
 #define	sballoc(so, sb, m) do {						\
+       mtx_enter(&(so)->so_mtx);                                       \
        (sb)->sb_cc += (m)->m_len;                                      \
        if ((m)->m_type != MT_CONTROL && (m)->m_type != MT_SONAME)      \
                (sb)->sb_datacc += (m)->m_len;                          \
        (sb)->sb_mbcnt += MSIZE;                                        \
        if ((m)->m_flags & M_EXT)                                       \
                (sb)->sb_mbcnt += (m)->m_ext.ext_size;                  \
+       mtx_leave(&(so)->so_mtx);                                       \
 } while (/* CONSTCOND */ 0)
 
 /* adjust counters in sb reflecting freeing of m */
-#define	sbfree(so, sb, m) do {						\
+#define	sbfree_locked(so, sb, m) do {					\
+	MUTEX_ASSERT_LOCKED(&(so)->so_mtx);				\
        (sb)->sb_cc -= (m)->m_len;                                      \
        if ((m)->m_type != MT_CONTROL && (m)->m_type != MT_SONAME)      \
                (sb)->sb_datacc -= (m)->m_len;                          \
        (sb)->sb_mbcnt -= MSIZE;                                        \
        if ((m)->m_flags & M_EXT)                                       \
                (sb)->sb_mbcnt -= (m)->m_ext.ext_size;                  \
+} while (/* CONSTCOND */ 0)
+
+#define	sbfree(so, sb, m) do {						\
+       mtx_enter(&(so)->so_mtx);                                       \
+       sbfree_locked((so), (sb), (m));                                 \
+       mtx_leave(&(so)->so_mtx);                                       \
 } while (/* CONSTCOND */ 0)
 
 /*

Reply via email to