On Fri, Jul 01, 2022 at 04:03:21PM +0000, Visa Hankala wrote:
> On Fri, Jul 01, 2022 at 09:59:11AM +0200, Claudio Jeker wrote:
> > On Thu, Jun 30, 2022 at 03:46:35PM +0000, Visa Hankala wrote:
> > > On Thu, Jun 30, 2022 at 11:51:52AM +0200, Claudio Jeker wrote:
> > > > After discussing this with mpi@ and jmatthew@ we came to the conclusion
> > > > that we need to smr_barrier() before refcnt_finalize() to ensure that no
> > > > other CPU is between the SMR_TAILQ_FOREACH, refcnt_take() and
> > > > smr_read_leave().
> > > 
> > > [...]
> > > 
> > > > @@ -509,7 +487,8 @@ route_input(struct mbuf *m0, struct sock
> > > >                 return;
> > > >         }
> > > >  
> > > > -       SRPL_FOREACH(rop, &sr, &rtptable.rtp_list, rop_list) {
> > > > +       smr_read_enter();
> > > > +       SMR_TAILQ_FOREACH(rop, &rtptable.rtp_list, rop_list) {
> > > >                 /*
> > > >                  * If route socket is bound to an address family only 
> > > > send
> > > >                  * messages that match the address family. Address 
> > > > family
> > > > @@ -519,7 +498,8 @@ route_input(struct mbuf *m0, struct sock
> > > >                     rop->rop_proto != sa_family)
> > > >                         continue;
> > > >  
> > > > -
> > > > +               refcnt_take(&rop->rop_refcnt);
> > > > +               smr_read_leave();
> > > >                 so = rop->rop_socket;
> > > >                 solock(so);
> > > >  
> > > > @@ -579,8 +559,10 @@ route_input(struct mbuf *m0, struct sock
> > > >                 rtm_sendup(so, m);
> > > >  next:
> > > >                 sounlock(so);
> > > > +               smr_read_enter();
> > > > +               refcnt_rele_wake(&rop->rop_refcnt);
> > > 
> > > This does not look correct.
> > > 
> > > smr_barrier() can proceed after smr_read_leave(), so refcnt_rele_wake()
> > > might drop the final reference and this thread can no longer access
> > > rop safely (SMR_TAILQ_NEXT() inside SMR_TAILQ_FOREACH()).
> > > 
> > > Also, SMR_TAILQ_NEXT() of rop becomes potentially dangling after
> > > smr_read_leave(). After this thread leaves the read-side critical
> > > section, another thread might free rop's successor.
> > 
> > So we need to either smr_barrier() before and after the refcnt_finalize()
> > to make sure that the rop pointer remains stable in both cases or we alter
> > the SMR_TAILQ_FOREACH() loop so that SMR_TAILQ_NEXT can be grabbed before
> > refcnt_rele_wake().
> > 
> > While the double smr_barrier() is trivial it is not ideal and I think it
> > is better to adjust the loop since SMR loops with sleep points is a
> > somewhat common issue and so we should have a good clear way on how to
> > solve it.
> 
> Adjusting SMR_TAILQ_FOREACH() will not help.
> 
> In general, a reader cannot resume a lockless iteration after it has
> left the read-side critical section and crossed a sleep point. The
> guarantee of consistent(-looking) forward linkage is gone. The reader
> no longer knows if the value of SMR_TAILQ_NEXT() is valid. If the
> reader wants to continue with the list, it has to re-enter the read-side
> critical section and restart the iteration.

This is not a real SMR_TAILQ_FOREACH() use case so trying to use
SMR_TAILQ_FOREACH() here is not right. The code wants to walk the list of
route pcbs linked via rop_list. The code just needs to walk all active
connections and does not care about races with sockets that are concurrently
closed or opened. In the first case SS_CANTRCVMORE will be set and the
socket is skipped and in the second case the socket is simply ignored
because new sockets are inserted at the head of the list.
 
It is not a lockless iteration over the full list. It is not required to
be either. The only thing that matters is that the forward linkage is
consitent (not pointing to invalid objects).

There is no need to restart the iteration because element on the list can
not be reinserted. They can only be removed and a removed element does not
get any message anyway (either by not visiting the object or by skipping
it in the loop).

The refcnt ensures that the currently used pcb is not freed before the
next element is picked. As long as the refcnt is hold the object can't be
removed.
 
> I guess I should finish the sleepable variant of SMR that I was
> tinkering with long ago...

I switched from SMR_TAILQ to SMR_LIST since there is no need to access the
tail. Also the code uses just SMR_LIST_FIRST() and SMR_LIST_NEXT() and not
SMR_LIST_FOREACH().

-- 
:wq Claudio

Index: rtsock.c
===================================================================
RCS file: /cvs/src/sys/net/rtsock.c,v
retrieving revision 1.334
diff -u -p -r1.334 rtsock.c
--- rtsock.c    28 Jun 2022 10:01:13 -0000      1.334
+++ rtsock.c    10 Aug 2022 08:43:31 -0000
@@ -71,7 +71,7 @@
 #include <sys/domain.h>
 #include <sys/pool.h>
 #include <sys/protosw.h>
-#include <sys/srp.h>
+#include <sys/smr.h>
 
 #include <net/if.h>
 #include <net/if_dl.h>
@@ -107,8 +107,6 @@ struct walkarg {
 };
 
 void   route_prinit(void);
-void   rcb_ref(void *, void *);
-void   rcb_unref(void *, void *);
 int    route_output(struct mbuf *, struct socket *, struct sockaddr *,
            struct mbuf *);
 int    route_ctloutput(int, struct socket *, int, int, struct mbuf *);
@@ -149,7 +147,7 @@ int          rt_setsource(unsigned int, struct 
 struct rtpcb {
        struct socket           *rop_socket;            /* [I] */
 
-       SRPL_ENTRY(rtpcb)       rop_list;
+       SMR_LIST_ENTRY(rtpcb)   rop_list;
        struct refcnt           rop_refcnt;
        struct timeout          rop_timeout;
        unsigned int            rop_msgfilter;          /* [s] */
@@ -162,8 +160,7 @@ struct rtpcb {
 #define        sotortpcb(so)   ((struct rtpcb *)(so)->so_pcb)
 
 struct rtptable {
-       SRPL_HEAD(, rtpcb)      rtp_list;
-       struct srpl_rc          rtp_rc;
+       SMR_LIST_HEAD(, rtpcb)  rtp_list;
        struct rwlock           rtp_lk;
        unsigned int            rtp_count;
 };
@@ -185,29 +182,12 @@ struct rtptable rtptable;
 void
 route_prinit(void)
 {
-       srpl_rc_init(&rtptable.rtp_rc, rcb_ref, rcb_unref, NULL);
        rw_init(&rtptable.rtp_lk, "rtsock");
-       SRPL_INIT(&rtptable.rtp_list);
+       SMR_LIST_INIT(&rtptable.rtp_list);
        pool_init(&rtpcb_pool, sizeof(struct rtpcb), 0,
            IPL_SOFTNET, PR_WAITOK, "rtpcb", NULL);
 }
 
-void
-rcb_ref(void *null, void *v)
-{
-       struct rtpcb *rop = v;
-
-       refcnt_take(&rop->rop_refcnt);
-}
-
-void
-rcb_unref(void *null, void *v)
-{
-       struct rtpcb *rop = v;
-
-       refcnt_rele_wake(&rop->rop_refcnt);
-}
-
 int
 route_usrreq(struct socket *so, int req, struct mbuf *m, struct mbuf *nam,
     struct mbuf *control, struct proc *p)
@@ -225,12 +205,6 @@ route_usrreq(struct socket *so, int req,
                goto release;
        }
 
-       rop = sotortpcb(so);
-       if (rop == NULL) {
-               error = EINVAL;
-               goto release;
-       }
-
        switch (req) {
        /* no connect, bind, accept. Socket is connected from the start */
        case PRU_CONNECT:
@@ -266,6 +240,12 @@ route_usrreq(struct socket *so, int req,
                 * If we are in a FLUSH state, check if the buffer is
                 * empty so that we can clear the flag.
                 */
+               rop = sotortpcb(so);
+               if (rop == NULL) {
+                       error = EINVAL;
+                       break;
+               }
+
                if (((rop->rop_flags & ROUTECB_FLAG_FLUSH) != 0) &&
                    ((sbspace(rop->rop_socket, &rop->rop_socket->so_rcv) ==
                    rop->rop_socket->so_rcv.sb_hiwat)))
@@ -311,7 +291,6 @@ route_attach(struct socket *so, int prot
         * and works directly on the raw socket.
         */
        rop = pool_get(&rtpcb_pool, PR_WAITOK|PR_ZERO);
-       so->so_pcb = rop;
        /* Init the timeout structure */
        timeout_set_proc(&rop->rop_timeout, rtm_senddesync_timer, so);
        refcnt_init(&rop->rop_refcnt);
@@ -321,12 +300,12 @@ route_attach(struct socket *so, int prot
 
        rop->rop_rtableid = curproc->p_p->ps_rtableid;
 
+       so->so_pcb = rop;
        soisconnected(so);
        so->so_options |= SO_USELOOPBACK;
 
        rw_enter(&rtptable.rtp_lk, RW_WRITE);
-       SRPL_INSERT_HEAD_LOCKED(&rtptable.rtp_rc, &rtptable.rtp_list, rop,
-           rop_list);
+       SMR_LIST_INSERT_HEAD_LOCKED(&rtptable.rtp_list, rop, rop_list);
        rtptable.rtp_count++;
        rw_exit(&rtptable.rtp_lk);
 
@@ -347,13 +326,13 @@ route_detach(struct socket *so)
        rw_enter(&rtptable.rtp_lk, RW_WRITE);
 
        rtptable.rtp_count--;
-       SRPL_REMOVE_LOCKED(&rtptable.rtp_rc, &rtptable.rtp_list, rop, rtpcb,
-           rop_list);
+       SMR_LIST_REMOVE_LOCKED(rop, rop_list);
        rw_exit(&rtptable.rtp_lk);
 
        sounlock(so);
 
        /* wait for all references to drop */
+       smr_barrier();
        refcnt_finalize(&rop->rop_refcnt, "rtsockrefs");
        timeout_del_barrier(&rop->rop_timeout);
 
@@ -377,6 +356,8 @@ route_ctloutput(int op, struct socket *s
        if (level != AF_ROUTE)
                return (EINVAL);
 
+       soassertlocked(so);
+
        switch (op) {
        case PRCO_SETOPT:
                switch (optname) {
@@ -498,10 +479,9 @@ void
 route_input(struct mbuf *m0, struct socket *so0, sa_family_t sa_family)
 {
        struct socket *so;
-       struct rtpcb *rop;
+       struct rtpcb *rop, *nrop;
        struct rt_msghdr *rtm;
        struct mbuf *m = m0;
-       struct srp_ref sr;
 
        /* ensure that we can access the rtm_type via mtod() */
        if (m->m_len < offsetof(struct rt_msghdr, rtm_type) + 1) {
@@ -509,17 +489,22 @@ route_input(struct mbuf *m0, struct sock
                return;
        }
 
-       SRPL_FOREACH(rop, &sr, &rtptable.rtp_list, rop_list) {
+       smr_read_enter();
+       rop = SMR_LIST_FIRST(&rtptable.rtp_list);
+       while (rop != NULL) {
                /*
                 * If route socket is bound to an address family only send
                 * messages that match the address family. Address family
                 * agnostic messages are always sent.
                 */
                if (sa_family != AF_UNSPEC && rop->rop_proto != AF_UNSPEC &&
-                   rop->rop_proto != sa_family)
+                   rop->rop_proto != sa_family) {
+                       rop = SMR_LIST_NEXT(rop, rop_list);
                        continue;
+               }
 
-
+               refcnt_take(&rop->rop_refcnt);
+               smr_read_leave();
                so = rop->rop_socket;
                solock(so);
 
@@ -579,8 +564,12 @@ route_input(struct mbuf *m0, struct sock
                rtm_sendup(so, m);
 next:
                sounlock(so);
+               smr_read_enter();
+               nrop = SMR_LIST_NEXT(rop, rop_list);
+               refcnt_rele_wake(&rop->rop_refcnt);
+               rop = nrop;
        }
-       SRPL_LEAVE(&sr);
+       smr_read_leave();
 
        m_freem(m);
 }

Reply via email to