On Thu, Jul 29, 2021 at 09:51:43AM +0200, Martin Pieuchot wrote:
> On 26/07/21(Mon) 09:23, Martin Pieuchot wrote:
> > On 26/07/21(Mon) 08:55, Martin Pieuchot wrote:
> > > On 21/07/21(Wed) 10:18, Martin Pieuchot wrote:
> > > > On 11/07/21(Sun) 14:45, Visa Hankala wrote:
> > > > > On Sat, Jul 10, 2021 at 05:26:57PM +0200, Martin Pieuchot wrote:
> > > > > > One of the reasons for the drop of performances in the kqueue-based
> > > > > > poll/select is the fact that kqueue filters are called up to 3 times
> > > > > > per syscall and that they all spin on the NET_LOCK() for TCP/UDP
> > > > > > packets.
> > > > > >
> > > > > > Diff below is a RFC for improving the situation.
> > > > > >
> > > > > > socket kqueue filters mainly check for the amount of available
> > > > > > items to
> > > > > > read/write. This involves comparing various socket buffer fields
> > > > > > (sb_cc,
> > > > > > sb_lowat, etc). The diff below introduces a new mutex to serialize
> > > > > > updates of those fields with reads in the kqueue filters.
> > > > > >
> > > > > > Since these fields are always modified with the socket lock held,
> > > > > > either
> > > > > > the mutex or the solock are enough to have a coherent view of them.
> > > > > > Note that either of these locks is necessary only if multiple fields
> > > > > > have to be read (like in sbspace()).
> > > > > >
> > > > > > Other per-socket fields accessed in the kqueue filters are never
> > > > > > combined (with &&) to determine a condition. So assuming it is
> > > > > > fine to
> > > > > > read register-sized fields w/o the socket lock we can safely remove
> > > > > > it
> > > > > > there.
> > > > > >
> > > > > > Could such mutex also be used to serialize klist updates?
> > > > >
> > > > > I think the lock should be such that it can serialize socket klists.
> > > > >
> > > > > As the main motivator for this change is kqueue, the viability of
> > > > > using
> > > > > the mutex for the klist locking should be checked now. The mutex has
> > > > > to
> > > > > be held whenever calling KNOTE() on sb_sel.si_note, or selwakeup() on
> > > > > sb_sel. Then the socket f_event callbacks will not need to lock the
> > > > > mutex themselves.
> > > > >
> > > > > I had a diff that serialized socket klists using solock(). It did not
> > > > > work well because it increased lock contention, especially when using
> > > > > kqueue as backend for poll(2) and select(2). The diff is not even
> > > > > correct any longer since recent changes to socket locking have
> > > > > introduced new lock order constraints that conflict with it.
> > > >
> > > > Updated diff below does that. It also uses a single per-socket mutex as
> > > > suggested by bluhm@.
> > > >
> > > > Note that as long poll(2) & select(2) use the current implementation a
> > > > KERNEL_LOCK()/UNLOCK() dance is necessary in sowakeup(). The goal of
> > > > this change combined with the poll/select rewrite is to get rid of this
> > > > dance.
> > >
> > > Updated diff after recent commits, more comments? Oks?
> >
> > Previous diff had a double mtx_enter() in filt_fifowrite_common(), this
> > one use the *locked() version of sbspace() to prevent it.
>
> New diff fixing a locking dance pointed out by visa@.
I think the diff is fine. It does show a few places that should be
improved on, e.g. some of those macros in socketvar.h should be
implemented as functions or inline functions, and the fact that high-level
code (e.g. TCP) is directly manipulating socket buffer internals.
I'm not a big fan of either-this-lock-or-that-lock locking schemes. They
are just confusing. I guess in the long run the mutex should be used all
the time.
Maybe it is best to commit it and look for any fallout.
> Index: kern/uipc_socket.c
> ===================================================================
> RCS file: /cvs/src/sys/kern/uipc_socket.c,v
> retrieving revision 1.264
> diff -u -p -r1.264 uipc_socket.c
> --- kern/uipc_socket.c 26 Jul 2021 05:51:13 -0000 1.264
> +++ kern/uipc_socket.c 29 Jul 2021 07:31:32 -0000
> @@ -84,7 +84,7 @@ int filt_solistenprocess(struct knote *k
> int filt_solisten_common(struct knote *kn, struct socket *so);
>
> const struct filterops solisten_filtops = {
> - .f_flags = FILTEROP_ISFD,
> + .f_flags = FILTEROP_ISFD | FILTEROP_MPSAFE,
> .f_attach = NULL,
> .f_detach = filt_sordetach,
> .f_event = filt_solisten,
> @@ -93,7 +93,7 @@ const struct filterops solisten_filtops
> };
>
> const struct filterops soread_filtops = {
> - .f_flags = FILTEROP_ISFD,
> + .f_flags = FILTEROP_ISFD | FILTEROP_MPSAFE,
> .f_attach = NULL,
> .f_detach = filt_sordetach,
> .f_event = filt_soread,
> @@ -102,7 +102,7 @@ const struct filterops soread_filtops =
> };
>
> const struct filterops sowrite_filtops = {
> - .f_flags = FILTEROP_ISFD,
> + .f_flags = FILTEROP_ISFD | FILTEROP_MPSAFE,
> .f_attach = NULL,
> .f_detach = filt_sowdetach,
> .f_event = filt_sowrite,
> @@ -111,7 +111,7 @@ const struct filterops sowrite_filtops =
> };
>
> const struct filterops soexcept_filtops = {
> - .f_flags = FILTEROP_ISFD,
> + .f_flags = FILTEROP_ISFD | FILTEROP_MPSAFE,
> .f_attach = NULL,
> .f_detach = filt_sordetach,
> .f_event = filt_soread,
> @@ -181,6 +181,9 @@ socreate(int dom, struct socket **aso, i
> so->so_egid = p->p_ucred->cr_gid;
> so->so_cpid = p->p_p->ps_pid;
> so->so_proto = prp;
> + mtx_init(&so->so_mtx, IPL_MPFLOOR);
> + klist_init_mutex(&so->so_snd.sb_sel.si_note, &so->so_mtx);
> + klist_init_mutex(&so->so_rcv.sb_sel.si_note, &so->so_mtx);
> so->so_snd.sb_timeo_nsecs = INFSLP;
> so->so_rcv.sb_timeo_nsecs = INFSLP;
>
> @@ -276,7 +279,9 @@ sofree(struct socket *so, int s)
> }
> }
> #endif /* SOCKET_SPLICE */
> + mtx_enter(&so->so_mtx);
> sbrelease(so, &so->so_snd);
> + mtx_leave(&so->so_mtx);
> sorflush(so);
> sounlock(so, s);
> #ifdef SOCKET_SPLICE
> @@ -1019,8 +1024,10 @@ dontblock:
> *mp = m_copym(m, 0, len, M_WAIT);
> m->m_data += len;
> m->m_len -= len;
> + mtx_enter(&so->so_mtx);
> so->so_rcv.sb_cc -= len;
> so->so_rcv.sb_datacc -= len;
> + mtx_leave(&so->so_mtx);
> }
> }
> if (so->so_oobmark) {
> @@ -1537,8 +1544,10 @@ somove(struct socket *so, int wait)
> }
> so->so_rcv.sb_mb->m_data += size;
> so->so_rcv.sb_mb->m_len -= size;
> + mtx_enter(&so->so_mtx);
> so->so_rcv.sb_cc -= size;
> so->so_rcv.sb_datacc -= size;
> + mtx_leave(&so->so_mtx);
> } else {
> *mp = so->so_rcv.sb_mb;
> sbfree(so, &so->so_rcv, *mp);
> @@ -1777,30 +1786,40 @@ sosetopt(struct socket *so, int level, i
> case SO_SNDBUF:
> if (so->so_state & SS_CANTSENDMORE)
> return (EINVAL);
> + mtx_enter(&so->so_mtx);
> if (sbcheckreserve(cnt, so->so_snd.sb_wat) ||
> sbreserve(so, &so->so_snd, cnt))
> - return (ENOBUFS);
> - so->so_snd.sb_wat = cnt;
> + error = ENOBUFS;
> + if (error == 0)
> + so->so_snd.sb_wat = cnt;
> + mtx_leave(&so->so_mtx);
> break;
>
> case SO_RCVBUF:
> if (so->so_state & SS_CANTRCVMORE)
> return (EINVAL);
> + mtx_enter(&so->so_mtx);
> if (sbcheckreserve(cnt, so->so_rcv.sb_wat) ||
> sbreserve(so, &so->so_rcv, cnt))
> - return (ENOBUFS);
> - so->so_rcv.sb_wat = cnt;
> + error = ENOBUFS;
> + if (error == 0)
> + so->so_rcv.sb_wat = cnt;
> + mtx_leave(&so->so_mtx);
> break;
>
> case SO_SNDLOWAT:
> + mtx_enter(&so->so_mtx);
> so->so_snd.sb_lowat =
> (cnt > so->so_snd.sb_hiwat) ?
> so->so_snd.sb_hiwat : cnt;
> + mtx_leave(&so->so_mtx);
> break;
> case SO_RCVLOWAT:
> + mtx_enter(&so->so_mtx);
> so->so_rcv.sb_lowat =
> (cnt > so->so_rcv.sb_hiwat) ?
> so->so_rcv.sb_hiwat : cnt;
> + mtx_leave(&so->so_mtx);
> break;
> }
> break;
> @@ -2028,7 +2047,12 @@ void
> sohasoutofband(struct socket *so)
> {
> pgsigio(&so->so_sigio, SIGURG, 0);
> + /* XXX KERNEL_LOCK() needed because of legacy poll/select */
> + KERNEL_LOCK();
> + mtx_enter(&so->so_mtx);
> selwakeup(&so->so_rcv.sb_sel);
> + mtx_leave(&so->so_mtx);
> + KERNEL_UNLOCK();
> }
>
> int
> @@ -2037,8 +2061,6 @@ soo_kqfilter(struct file *fp, struct kno
> struct socket *so = kn->kn_fp->f_data;
> struct sockbuf *sb;
>
> - KERNEL_ASSERT_LOCKED();
> -
> switch (kn->kn_filter) {
> case EVFILT_READ:
> if (so->so_options & SO_ACCEPTCONN)
> @@ -2059,7 +2081,7 @@ soo_kqfilter(struct file *fp, struct kno
> return (EINVAL);
> }
>
> - klist_insert_locked(&sb->sb_sel.si_note, kn);
> + klist_insert(&sb->sb_sel.si_note, kn);
>
> return (0);
> }
> @@ -2069,17 +2091,16 @@ filt_sordetach(struct knote *kn)
> {
> struct socket *so = kn->kn_fp->f_data;
>
> - KERNEL_ASSERT_LOCKED();
> -
> - klist_remove_locked(&so->so_rcv.sb_sel.si_note, kn);
> + klist_remove(&so->so_rcv.sb_sel.si_note, kn);
> }
>
> int
> filt_soread_common(struct knote *kn, struct socket *so)
> {
> + u_int sostate = so->so_state;
> int rv = 0;
>
> - soassertlocked(so);
> + MUTEX_ASSERT_LOCKED(&so->so_mtx);
>
> kn->kn_data = so->so_rcv.sb_cc;
> #ifdef SOCKET_SPLICE
> @@ -2088,15 +2109,17 @@ filt_soread_common(struct knote *kn, str
> } else
> #endif /* SOCKET_SPLICE */
> if (kn->kn_sfflags & NOTE_OOB) {
> - if (so->so_oobmark || (so->so_state & SS_RCVATMARK)) {
> + u_long oobmark = so->so_oobmark;
> +
> + if (oobmark || (sostate & SS_RCVATMARK)) {
> kn->kn_fflags |= NOTE_OOB;
> - kn->kn_data -= so->so_oobmark;
> + kn->kn_data -= oobmark;
> rv = 1;
> }
> - } else if (so->so_state & SS_CANTRCVMORE) {
> + } else if (sostate & SS_CANTRCVMORE) {
> kn->kn_flags |= EV_EOF;
> if (kn->kn_flags & __EV_POLL) {
> - if (so->so_state & SS_ISDISCONNECTED)
> + if (sostate & SS_ISDISCONNECTED)
> kn->kn_flags |= __EV_HUP;
> }
> kn->kn_fflags = so->so_error;
> @@ -2124,12 +2147,12 @@ int
> filt_soreadmodify(struct kevent *kev, struct knote *kn)
> {
> struct socket *so = kn->kn_fp->f_data;
> - int rv, s;
> + int rv;
>
> - s = solock(so);
> + mtx_enter(&so->so_mtx);
> knote_modify(kev, kn);
> rv = filt_soread_common(kn, so);
> - sounlock(so, s);
> + mtx_leave(&so->so_mtx);
>
> return (rv);
> }
> @@ -2138,16 +2161,16 @@ int
> filt_soreadprocess(struct knote *kn, struct kevent *kev)
> {
> struct socket *so = kn->kn_fp->f_data;
> - int rv, s;
> + int rv;
>
> - s = solock(so);
> + mtx_enter(&so->so_mtx);
> if (kev != NULL && (kn->kn_flags & EV_ONESHOT))
> rv = 1;
> else
> rv = filt_soread_common(kn, so);
> if (rv != 0)
> knote_submit(kn, kev);
> - sounlock(so, s);
> + mtx_leave(&so->so_mtx);
>
> return (rv);
> }
> @@ -2157,30 +2180,29 @@ filt_sowdetach(struct knote *kn)
> {
> struct socket *so = kn->kn_fp->f_data;
>
> - KERNEL_ASSERT_LOCKED();
> -
> - klist_remove_locked(&so->so_snd.sb_sel.si_note, kn);
> + klist_remove(&so->so_snd.sb_sel.si_note, kn);
> }
>
> int
> filt_sowrite_common(struct knote *kn, struct socket *so)
> {
> + u_int sostate = so->so_state;
> int rv;
>
> - soassertlocked(so);
> + MUTEX_ASSERT_LOCKED(&so->so_mtx);
>
> - kn->kn_data = sbspace(so, &so->so_snd);
> - if (so->so_state & SS_CANTSENDMORE) {
> + kn->kn_data = sbspace_locked(so, &so->so_snd);
> + if (sostate & SS_CANTSENDMORE) {
> kn->kn_flags |= EV_EOF;
> if (kn->kn_flags & __EV_POLL) {
> - if (so->so_state & SS_ISDISCONNECTED)
> + if (sostate & SS_ISDISCONNECTED)
> kn->kn_flags |= __EV_HUP;
> }
> kn->kn_fflags = so->so_error;
> rv = 1;
> } else if (so->so_error) { /* temporary udp error */
> rv = 1;
> - } else if (((so->so_state & SS_ISCONNECTED) == 0) &&
> + } else if (((sostate & SS_ISCONNECTED) == 0) &&
> (so->so_proto->pr_flags & PR_CONNREQUIRED)) {
> rv = 0;
> } else if (kn->kn_sfflags & NOTE_LOWAT) {
> @@ -2204,12 +2226,12 @@ int
> filt_sowritemodify(struct kevent *kev, struct knote *kn)
> {
> struct socket *so = kn->kn_fp->f_data;
> - int rv, s;
> + int rv;
>
> - s = solock(so);
> + mtx_enter(&so->so_mtx);
> knote_modify(kev, kn);
> rv = filt_sowrite_common(kn, so);
> - sounlock(so, s);
> + mtx_leave(&so->so_mtx);
>
> return (rv);
> }
> @@ -2218,16 +2240,16 @@ int
> filt_sowriteprocess(struct knote *kn, struct kevent *kev)
> {
> struct socket *so = kn->kn_fp->f_data;
> - int rv, s;
> + int rv;
>
> - s = solock(so);
> + mtx_enter(&so->so_mtx);
> if (kev != NULL && (kn->kn_flags & EV_ONESHOT))
> rv = 1;
> else
> rv = filt_sowrite_common(kn, so);
> if (rv != 0)
> knote_submit(kn, kev);
> - sounlock(so, s);
> + mtx_leave(&so->so_mtx);
>
> return (rv);
> }
> @@ -2235,8 +2257,6 @@ filt_sowriteprocess(struct knote *kn, st
> int
> filt_solisten_common(struct knote *kn, struct socket *so)
> {
> - soassertlocked(so);
> -
> kn->kn_data = so->so_qlen;
>
> return (kn->kn_data != 0);
> @@ -2254,12 +2274,12 @@ int
> filt_solistenmodify(struct kevent *kev, struct knote *kn)
> {
> struct socket *so = kn->kn_fp->f_data;
> - int rv, s;
> + int rv;
>
> - s = solock(so);
> + mtx_enter(&so->so_mtx);
> knote_modify(kev, kn);
> rv = filt_solisten_common(kn, so);
> - sounlock(so, s);
> + mtx_leave(&so->so_mtx);
>
> return (rv);
> }
> @@ -2268,16 +2288,16 @@ int
> filt_solistenprocess(struct knote *kn, struct kevent *kev)
> {
> struct socket *so = kn->kn_fp->f_data;
> - int rv, s;
> + int rv;
>
> - s = solock(so);
> + mtx_enter(&so->so_mtx);
> if (kev != NULL && (kn->kn_flags & EV_ONESHOT))
> rv = 1;
> else
> rv = filt_solisten_common(kn, so);
> if (rv != 0)
> knote_submit(kn, kev);
> - sounlock(so, s);
> + mtx_leave(&so->so_mtx);
>
> return (rv);
> }
> Index: kern/uipc_socket2.c
> ===================================================================
> RCS file: /cvs/src/sys/kern/uipc_socket2.c,v
> retrieving revision 1.113
> diff -u -p -r1.113 uipc_socket2.c
> --- kern/uipc_socket2.c 26 Jul 2021 05:51:13 -0000 1.113
> +++ kern/uipc_socket2.c 26 Jul 2021 05:52:29 -0000
> @@ -34,7 +34,6 @@
>
> #include <sys/param.h>
> #include <sys/systm.h>
> -#include <sys/malloc.h>
> #include <sys/mbuf.h>
> #include <sys/protosw.h>
> #include <sys/domain.h>
> @@ -163,6 +162,9 @@ sonewconn(struct socket *head, int conns
> if (so == NULL)
> return (NULL);
> rw_init(&so->so_lock, "solock");
> + mtx_init(&so->so_mtx, IPL_MPFLOOR);
> + klist_init_mutex(&so->so_snd.sb_sel.si_note, &so->so_mtx);
> + klist_init_mutex(&so->so_rcv.sb_sel.si_note, &so->so_mtx);
> so->so_type = head->so_type;
> so->so_options = head->so_options &~ SO_ACCEPTCONN;
> so->so_linger = head->so_linger;
> @@ -423,7 +425,12 @@ sowakeup(struct socket *so, struct sockb
> }
> if (sb->sb_flags & SB_ASYNC)
> pgsigio(&so->so_sigio, SIGIO, 0);
> + /* XXX KERNEL_LOCK() needed because of legacy poll/select */
> + KERNEL_LOCK();
> + mtx_enter(&so->so_mtx);
> selwakeup(&sb->sb_sel);
> + mtx_leave(&so->so_mtx);
> + KERNEL_UNLOCK();
> }
>
> /*
> @@ -463,6 +470,7 @@ soreserve(struct socket *so, u_long sndc
> {
> soassertlocked(so);
>
> + mtx_enter(&so->so_mtx);
> if (sbreserve(so, &so->so_snd, sndcc))
> goto bad;
> if (sbreserve(so, &so->so_rcv, rcvcc))
> @@ -475,10 +483,12 @@ soreserve(struct socket *so, u_long sndc
> so->so_snd.sb_lowat = MCLBYTES;
> if (so->so_snd.sb_lowat > so->so_snd.sb_hiwat)
> so->so_snd.sb_lowat = so->so_snd.sb_hiwat;
> + mtx_leave(&so->so_mtx);
> return (0);
> bad2:
> sbrelease(so, &so->so_snd);
> bad:
> + mtx_leave(&so->so_mtx);
> return (ENOBUFS);
> }
>
> @@ -492,6 +502,7 @@ sbreserve(struct socket *so, struct sock
> {
> KASSERT(sb == &so->so_rcv || sb == &so->so_snd);
> soassertlocked(so);
> + MUTEX_ASSERT_LOCKED(&so->so_mtx);
>
> if (cc == 0 || cc > sb_max)
> return (1);
> @@ -533,6 +544,7 @@ sbchecklowmem(void)
> void
> sbrelease(struct socket *so, struct sockbuf *sb)
> {
> + MUTEX_ASSERT_LOCKED(&so->so_mtx);
>
> sbflush(so, sb);
> sb->sb_hiwat = sb->sb_mbmax = 0;
> @@ -686,6 +698,7 @@ sbcheck(struct socket *so, struct sockbu
> struct mbuf *m, *n;
> u_long len = 0, mbcnt = 0;
>
> + mtx_enter(&so->so_mtx);
> for (m = sb->sb_mb; m; m = m->m_nextpkt) {
> for (n = m; n; n = n->m_next) {
> len += n->m_len;
> @@ -701,6 +714,7 @@ sbcheck(struct socket *so, struct sockbu
> mbcnt, sb->sb_mbcnt);
> panic("sbcheck");
> }
> + mtx_leave(&so->so_mtx);
> }
> #endif
>
> @@ -860,9 +874,11 @@ sbcompress(struct socket *so, struct soc
> memcpy(mtod(n, caddr_t) + n->m_len, mtod(m, caddr_t),
> m->m_len);
> n->m_len += m->m_len;
> + mtx_enter(&so->so_mtx);
> sb->sb_cc += m->m_len;
> if (m->m_type != MT_CONTROL && m->m_type != MT_SONAME)
> sb->sb_datacc += m->m_len;
> + mtx_leave(&so->so_mtx);
> m = m_free(m);
> continue;
> }
> @@ -895,6 +911,7 @@ sbflush(struct socket *so, struct sockbu
> {
> KASSERT(sb == &so->so_rcv || sb == &so->so_snd);
> KASSERT((sb->sb_flags & SB_LOCK) == 0);
> + MUTEX_ASSERT_LOCKED(&so->so_mtx);
>
> while (sb->sb_mbcnt)
> sbdrop(so, sb, (int)sb->sb_cc);
> @@ -917,6 +934,7 @@ sbdrop(struct socket *so, struct sockbuf
>
> KASSERT(sb == &so->so_rcv || sb == &so->so_snd);
> soassertlocked(so);
> + MUTEX_ASSERT_LOCKED(&so->so_mtx);
>
> next = (m = sb->sb_mb) ? m->m_nextpkt : NULL;
> while (len > 0) {
> @@ -936,12 +954,12 @@ sbdrop(struct socket *so, struct sockbuf
> break;
> }
> len -= m->m_len;
> - sbfree(so, sb, m);
> + sbfree_locked(so, sb, m);
> mn = m_free(m);
> m = mn;
> }
> while (m && m->m_len == 0) {
> - sbfree(so, sb, m);
> + sbfree_locked(so, sb, m);
> mn = m_free(m);
> m = mn;
> }
> Index: kern/uipc_syscalls.c
> ===================================================================
> RCS file: /cvs/src/sys/kern/uipc_syscalls.c,v
> retrieving revision 1.193
> diff -u -p -r1.193 uipc_syscalls.c
> --- kern/uipc_syscalls.c 2 Jul 2021 12:17:41 -0000 1.193
> +++ kern/uipc_syscalls.c 26 Jul 2021 05:51:52 -0000
> @@ -308,7 +308,9 @@ doaccept(struct proc *p, int sock, struc
> : (flags & SOCK_NONBLOCK ? FNONBLOCK : 0);
>
> /* connection has been removed from the listen queue */
> + mtx_enter(&head->so_mtx);
> KNOTE(&head->so_rcv.sb_sel.si_note, 0);
> + mtx_leave(&head->so_mtx);
>
> fp->f_type = DTYPE_SOCKET;
> fp->f_flag = FREAD | FWRITE | nflag;
> Index: kern/uipc_usrreq.c
> ===================================================================
> RCS file: /cvs/src/sys/kern/uipc_usrreq.c,v
> retrieving revision 1.148
> diff -u -p -r1.148 uipc_usrreq.c
> --- kern/uipc_usrreq.c 25 May 2021 22:45:09 -0000 1.148
> +++ kern/uipc_usrreq.c 26 Jul 2021 05:51:52 -0000
> @@ -208,8 +208,10 @@ uipc_usrreq(struct socket *so, int req,
> * Adjust backpressure on sender
> * and wakeup any waiting to write.
> */
> + mtx_enter(&so2->so_mtx);
> so2->so_snd.sb_mbcnt = so->so_rcv.sb_mbcnt;
> so2->so_snd.sb_cc = so->so_rcv.sb_cc;
> + mtx_leave(&so2->so_mtx);
> sowwakeup(so2);
> break;
>
> @@ -284,8 +286,10 @@ uipc_usrreq(struct socket *so, int req,
> sbappendrecord(so2, &so2->so_rcv, m);
> else
> sbappend(so2, &so2->so_rcv, m);
> + mtx_enter(&so->so_mtx);
> so->so_snd.sb_mbcnt = so2->so_rcv.sb_mbcnt;
> so->so_snd.sb_cc = so2->so_rcv.sb_cc;
> + mtx_leave(&so->so_mtx);
> if (so2->so_rcv.sb_cc > 0)
> sorwakeup(so2);
> m = NULL;
> @@ -736,12 +740,16 @@ unp_disconnect(struct unpcb *unp)
>
> case SOCK_STREAM:
> case SOCK_SEQPACKET:
> + mtx_enter(&unp->unp_socket->so_mtx);
> unp->unp_socket->so_snd.sb_mbcnt = 0;
> unp->unp_socket->so_snd.sb_cc = 0;
> + mtx_leave(&unp->unp_socket->so_mtx);
> soisdisconnected(unp->unp_socket);
> unp2->unp_conn = NULL;
> + mtx_enter(&unp2->unp_socket->so_mtx);
> unp2->unp_socket->so_snd.sb_mbcnt = 0;
> unp2->unp_socket->so_snd.sb_cc = 0;
> + mtx_leave(&unp2->unp_socket->so_mtx);
> soisdisconnected(unp2->unp_socket);
> break;
> }
> Index: miscfs/fifofs/fifo_vnops.c
> ===================================================================
> RCS file: /cvs/src/sys/miscfs/fifofs/fifo_vnops.c,v
> retrieving revision 1.80
> diff -u -p -r1.80 fifo_vnops.c
> --- miscfs/fifofs/fifo_vnops.c 13 Jul 2021 07:37:50 -0000 1.80
> +++ miscfs/fifofs/fifo_vnops.c 26 Jul 2021 07:02:12 -0000
> @@ -114,7 +114,7 @@ int filt_fifowriteprocess(struct knote *
> int filt_fifowrite_common(struct knote *kn, struct socket *so);
>
> const struct filterops fiforead_filtops = {
> - .f_flags = FILTEROP_ISFD,
> + .f_flags = FILTEROP_ISFD | FILTEROP_MPSAFE,
> .f_attach = NULL,
> .f_detach = filt_fifordetach,
> .f_event = filt_fiforead,
> @@ -123,7 +123,7 @@ const struct filterops fiforead_filtops
> };
>
> const struct filterops fifowrite_filtops = {
> - .f_flags = FILTEROP_ISFD,
> + .f_flags = FILTEROP_ISFD | FILTEROP_MPSAFE,
> .f_attach = NULL,
> .f_detach = filt_fifowdetach,
> .f_event = filt_fifowrite,
> @@ -542,7 +542,7 @@ fifo_kqfilter(void *v)
>
> ap->a_kn->kn_hook = so;
>
> - klist_insert_locked(&sb->sb_sel.si_note, ap->a_kn);
> + klist_insert(&sb->sb_sel.si_note, ap->a_kn);
>
> return (0);
> }
> @@ -552,7 +552,7 @@ filt_fifordetach(struct knote *kn)
> {
> struct socket *so = (struct socket *)kn->kn_hook;
>
> - klist_remove_locked(&so->so_rcv.sb_sel.si_note, kn);
> + klist_remove(&so->so_rcv.sb_sel.si_note, kn);
> }
>
> int
> @@ -560,7 +560,7 @@ filt_fiforead_common(struct knote *kn, s
> {
> int rv;
>
> - soassertlocked(so);
> + MUTEX_ASSERT_LOCKED(&so->so_mtx);
>
> kn->kn_data = so->so_rcv.sb_cc;
> if (so->so_state & SS_CANTRCVMORE) {
> @@ -590,12 +590,12 @@ int
> filt_fiforeadmodify(struct kevent *kev, struct knote *kn)
> {
> struct socket *so = kn->kn_hook;
> - int rv, s;
> + int rv;
>
> - s = solock(so);
> + mtx_enter(&so->so_mtx);
> knote_modify(kev, kn);
> rv = filt_fiforead_common(kn, so);
> - sounlock(so, s);
> + mtx_leave(&so->so_mtx);
>
> return (rv);
> }
> @@ -604,16 +604,16 @@ int
> filt_fiforeadprocess(struct knote *kn, struct kevent *kev)
> {
> struct socket *so = kn->kn_hook;
> - int rv, s;
> + int rv;
>
> - s = solock(so);
> + mtx_enter(&so->so_mtx);
> if (kev != NULL && (kn->kn_flags & EV_ONESHOT))
> rv = 1;
> else
> rv = filt_fiforead_common(kn, so);
> if (rv != 0)
> knote_submit(kn, kev);
> - sounlock(so, s);
> + mtx_leave(&so->so_mtx);
>
> return (rv);
> }
> @@ -623,7 +623,7 @@ filt_fifowdetach(struct knote *kn)
> {
> struct socket *so = (struct socket *)kn->kn_hook;
>
> - klist_remove_locked(&so->so_snd.sb_sel.si_note, kn);
> + klist_remove(&so->so_snd.sb_sel.si_note, kn);
> }
>
> int
> @@ -631,9 +631,9 @@ filt_fifowrite_common(struct knote *kn,
> {
> int rv;
>
> - soassertlocked(so);
> + MUTEX_ASSERT_LOCKED(&so->so_mtx);
>
> - kn->kn_data = sbspace(so, &so->so_snd);
> + kn->kn_data = sbspace_locked(so, &so->so_snd);
> if (so->so_state & SS_CANTSENDMORE) {
> kn->kn_flags |= EV_EOF;
> rv = 1;
> @@ -657,12 +657,12 @@ int
> filt_fifowritemodify(struct kevent *kev, struct knote *kn)
> {
> struct socket *so = kn->kn_hook;
> - int rv, s;
> + int rv;
>
> - s = solock(so);
> + mtx_enter(&so->so_mtx);
> knote_modify(kev, kn);
> rv = filt_fifowrite_common(kn, so);
> - sounlock(so, s);
> + mtx_leave(&so->so_mtx);
>
> return (rv);
> }
> @@ -671,16 +671,16 @@ int
> filt_fifowriteprocess(struct knote *kn, struct kevent *kev)
> {
> struct socket *so = kn->kn_hook;
> - int rv, s;
> + int rv;
>
> - s = solock(so);
> + mtx_enter(&so->so_mtx);
> if (kev != NULL && (kn->kn_flags & EV_ONESHOT))
> rv = 1;
> else
> rv = filt_fifowrite_common(kn, so);
> if (rv != 0)
> knote_submit(kn, kev);
> - sounlock(so, s);
> + mtx_leave(&so->so_mtx);
>
> return (rv);
> }
> Index: netinet/tcp_input.c
> ===================================================================
> RCS file: /cvs/src/sys/netinet/tcp_input.c,v
> retrieving revision 1.368
> diff -u -p -r1.368 tcp_input.c
> --- netinet/tcp_input.c 16 Apr 2021 12:08:25 -0000 1.368
> +++ netinet/tcp_input.c 26 Jul 2021 05:51:52 -0000
> @@ -946,7 +946,9 @@ findpcb:
> tcpstat_pkt(tcps_rcvackpack, tcps_rcvackbyte,
> acked);
> ND6_HINT(tp);
> + mtx_enter(&so->so_mtx);
> sbdrop(so, &so->so_snd, acked);
> + mtx_leave(&so->so_mtx);
>
> /*
> * If we had a pending ICMP message that
> @@ -1714,6 +1716,7 @@ trimthenstep6:
> TCP_MAXWIN << tp->snd_scale);
> }
> ND6_HINT(tp);
> + mtx_enter(&so->so_mtx);
> if (acked > so->so_snd.sb_cc) {
> if (tp->snd_wnd > so->so_snd.sb_cc)
> tp->snd_wnd -= so->so_snd.sb_cc;
> @@ -1729,6 +1732,7 @@ trimthenstep6:
> tp->snd_wnd = 0;
> ourfinisacked = 0;
> }
> + mtx_leave(&so->so_mtx);
>
> tcp_update_sndspace(tp);
> if (sb_notify(so, &so->so_snd)) {
> @@ -2967,7 +2971,9 @@ tcp_mss_update(struct tcpcb *tp)
> bufsize = roundup(bufsize, mss);
> if (bufsize > sb_max)
> bufsize = sb_max;
> + mtx_enter(&so->so_mtx);
> (void)sbreserve(so, &so->so_snd, bufsize);
> + mtx_leave(&so->so_mtx);
> }
>
> bufsize = so->so_rcv.sb_hiwat;
> @@ -2975,7 +2981,9 @@ tcp_mss_update(struct tcpcb *tp)
> bufsize = roundup(bufsize, mss);
> if (bufsize > sb_max)
> bufsize = sb_max;
> + mtx_enter(&so->so_mtx);
> (void)sbreserve(so, &so->so_rcv, bufsize);
> + mtx_leave(&so->so_mtx);
> }
>
> }
> Index: netinet/tcp_usrreq.c
> ===================================================================
> RCS file: /cvs/src/sys/netinet/tcp_usrreq.c,v
> retrieving revision 1.181
> diff -u -p -r1.181 tcp_usrreq.c
> --- netinet/tcp_usrreq.c 30 Apr 2021 13:52:48 -0000 1.181
> +++ netinet/tcp_usrreq.c 26 Jul 2021 05:51:52 -0000
> @@ -688,7 +688,9 @@ tcp_disconnect(struct tcpcb *tp)
> tp = tcp_drop(tp, 0);
> else {
> soisdisconnecting(so);
> + mtx_enter(&so->so_mtx);
> sbflush(so, &so->so_rcv);
> + mtx_leave(&so->so_mtx);
> tp = tcp_usrclosed(tp);
> if (tp)
> (void) tcp_output(tp);
> @@ -1115,6 +1117,7 @@ tcp_update_sndspace(struct tcpcb *tp)
> struct socket *so = tp->t_inpcb->inp_socket;
> u_long nmax = so->so_snd.sb_hiwat;
>
> + mtx_enter(&so->so_mtx);
> if (sbchecklowmem()) {
> /* low on memory try to get rid of some */
> if (tcp_sendspace < nmax)
> @@ -1128,7 +1131,7 @@ tcp_update_sndspace(struct tcpcb *tp)
> tp->snd_una);
>
> /* a writable socket must be preserved because of poll(2) semantics */
> - if (sbspace(so, &so->so_snd) >= so->so_snd.sb_lowat) {
> + if (sbspace_locked(so, &so->so_snd) >= so->so_snd.sb_lowat) {
> if (nmax < so->so_snd.sb_cc + so->so_snd.sb_lowat)
> nmax = so->so_snd.sb_cc + so->so_snd.sb_lowat;
> /* keep in sync with sbreserve() calculation */
> @@ -1141,6 +1144,7 @@ tcp_update_sndspace(struct tcpcb *tp)
>
> if (nmax != so->so_snd.sb_hiwat)
> sbreserve(so, &so->so_snd, nmax);
> + mtx_leave(&so->so_mtx);
> }
>
> /*
> @@ -1179,5 +1183,7 @@ tcp_update_rcvspace(struct tcpcb *tp)
>
> /* round to MSS boundary */
> nmax = roundup(nmax, tp->t_maxseg);
> + mtx_enter(&so->so_mtx);
> sbreserve(so, &so->so_rcv, nmax);
> + mtx_leave(&so->so_mtx);
> }
> Index: sys/socketvar.h
> ===================================================================
> RCS file: /cvs/src/sys/sys/socketvar.h,v
> retrieving revision 1.100
> diff -u -p -r1.100 socketvar.h
> --- sys/socketvar.h 26 Jul 2021 05:51:13 -0000 1.100
> +++ sys/socketvar.h 26 Jul 2021 05:57:31 -0000
> @@ -33,10 +33,12 @@
> */
>
> #include <sys/selinfo.h> /* for struct selinfo */
> +#include <sys/systm.h> /* panicstr for
> MUTEX_ASSERT */
> #include <sys/queue.h>
> #include <sys/sigio.h> /* for struct sigio_ref
> */
> #include <sys/task.h>
> #include <sys/timeout.h>
> +#include <sys/mutex.h>
> #include <sys/rwlock.h>
>
> #ifndef _SOCKLEN_T_DEFINED_
> @@ -51,10 +53,15 @@ TAILQ_HEAD(soqhead, socket);
> * Contains send and receive buffer queues,
> * handle on protocol and pointer to protocol
> * private data and error information.
> + *
> + * Locks used to protect struct members in this file:
> + * s this socket solock
> + * m this socket `so_mtx'
> */
> struct socket {
> const struct protosw *so_proto; /* protocol handle */
> struct rwlock so_lock; /* this socket lock */
> + struct mutex so_mtx;
> void *so_pcb; /* protocol control block */
> u_int so_state; /* internal state flags SS_*, below */
> short so_type; /* generic type, see socket.h */
> @@ -101,13 +108,13 @@ struct socket {
> struct sockbuf {
> /* The following fields are all zeroed on flush. */
> #define sb_startzero sb_cc
> - u_long sb_cc; /* actual chars in buffer */
> - u_long sb_datacc; /* data only chars in buffer */
> - u_long sb_hiwat; /* max actual char count */
> - u_long sb_wat; /* default watermark */
> - u_long sb_mbcnt; /* chars of mbufs used */
> - u_long sb_mbmax; /* max chars of mbufs to use */
> - long sb_lowat; /* low water mark */
> + u_long sb_cc; /* [s|m] actual chars in buffer */
> + u_long sb_datacc; /* [s|m] data only chars in buffer */
> + u_long sb_hiwat; /* [s|m] max actual char count */
> + u_long sb_wat; /* [s|m] default watermark */
> + u_long sb_mbcnt; /* [s|m] chars of mbufs used */
> + u_long sb_mbmax; /* [s|m] max chars of mbufs to use */
> + long sb_lowat; /* [s|m] low water mark */
> struct mbuf *sb_mb; /* the mbuf chain */
> struct mbuf *sb_mbtail; /* the last mbuf in the chain */
> struct mbuf *sb_lastrecord;/* first mbuf of last record in
> @@ -189,13 +196,27 @@ sb_notify(struct socket *so, struct sock
> * overflow and return 0.
> */
> static inline long
> -sbspace(struct socket *so, struct sockbuf *sb)
> +sbspace_locked(struct socket *so, struct sockbuf *sb)
> {
> KASSERT(sb == &so->so_rcv || sb == &so->so_snd);
> - soassertlocked(so);
> + MUTEX_ASSERT_LOCKED(&so->so_mtx);
> +
> return lmin(sb->sb_hiwat - sb->sb_cc, sb->sb_mbmax - sb->sb_mbcnt);
> }
>
> +static inline long
> +sbspace(struct socket *so, struct sockbuf *sb)
> +{
> + long space;
> +
> + mtx_enter(&so->so_mtx);
> + space = sbspace_locked(so, sb);
> + mtx_leave(&so->so_mtx);
> +
> + return space;
> +}
> +
> +
> /* do we have to send all at once on a socket? */
> #define sosendallatonce(so) \
> ((so)->so_proto->pr_flags & PR_ATOMIC)
> @@ -224,22 +245,31 @@ soreadable(struct socket *so)
>
> /* adjust counters in sb reflecting allocation of m */
> #define sballoc(so, sb, m) do {
> \
> + mtx_enter(&(so)->so_mtx); \
> (sb)->sb_cc += (m)->m_len; \
> if ((m)->m_type != MT_CONTROL && (m)->m_type != MT_SONAME) \
> (sb)->sb_datacc += (m)->m_len; \
> (sb)->sb_mbcnt += MSIZE; \
> if ((m)->m_flags & M_EXT) \
> (sb)->sb_mbcnt += (m)->m_ext.ext_size; \
> + mtx_leave(&(so)->so_mtx); \
> } while (/* CONSTCOND */ 0)
>
> /* adjust counters in sb reflecting freeing of m */
> -#define sbfree(so, sb, m) do {
> \
> +#define sbfree_locked(so, sb, m) do {
> \
> + MUTEX_ASSERT_LOCKED(&so->so_mtx); \
> (sb)->sb_cc -= (m)->m_len; \
> if ((m)->m_type != MT_CONTROL && (m)->m_type != MT_SONAME) \
> (sb)->sb_datacc -= (m)->m_len; \
> (sb)->sb_mbcnt -= MSIZE; \
> if ((m)->m_flags & M_EXT) \
> (sb)->sb_mbcnt -= (m)->m_ext.ext_size; \
> +} while (/* CONSTCOND */ 0)
> +
> +#define sbfree(so, sb, m) do {
> \
> + mtx_enter(&(so)->so_mtx); \
> + sbfree_locked((so), (sb), (m)); \
> + mtx_leave(&(so)->so_mtx); \
> } while (/* CONSTCOND */ 0)
>
> /*
--
:wq Claudio