On 22.8.2022. 15:07, Alexander Bluhm wrote:
> On Sun, Aug 21, 2022 at 07:07:29PM +0200, Alexander Bluhm wrote:
>> On Fri, Aug 19, 2022 at 10:54:42PM +0200, Alexander Bluhm wrote:
>>> This diff allows to run udp_input() in parallel.
> 
> Diff rebased to -current.


Hi,

Is this diff still active? I have been running it in production for
two months with WireGuard, remote syslog, a DHCP server, and an NTP
client, and everything seems fine.



> 
> Index: kern/uipc_socket.c
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/sys/kern/uipc_socket.c,v
> retrieving revision 1.284
> diff -u -p -r1.284 uipc_socket.c
> --- kern/uipc_socket.c        21 Aug 2022 16:22:17 -0000      1.284
> +++ kern/uipc_socket.c        22 Aug 2022 12:01:58 -0000
> @@ -822,10 +822,10 @@ bad:
>       if (mp)
>               *mp = NULL;
>  
> -     solock(so);
> +     solock_shared(so);
>  restart:
>       if ((error = sblock(so, &so->so_rcv, SBLOCKWAIT(flags))) != 0) {
> -             sounlock(so);
> +             sounlock_shared(so);
>               return (error);
>       }
>  
> @@ -893,7 +893,7 @@ restart:
>               sbunlock(so, &so->so_rcv);
>               error = sbwait(so, &so->so_rcv);
>               if (error) {
> -                     sounlock(so);
> +                     sounlock_shared(so);
>                       return (error);
>               }
>               goto restart;
> @@ -962,11 +962,11 @@ dontblock:
>                       sbsync(&so->so_rcv, nextrecord);
>                       if (controlp) {
>                               if (pr->pr_domain->dom_externalize) {
> -                                     sounlock(so);
> +                                     sounlock_shared(so);
>                                       error =
>                                           (*pr->pr_domain->dom_externalize)
>                                           (cm, controllen, flags);
> -                                     solock(so);
> +                                     solock_shared(so);
>                               }
>                               *controlp = cm;
>                       } else {
> @@ -1040,9 +1040,9 @@ dontblock:
>                       SBLASTRECORDCHK(&so->so_rcv, "soreceive uiomove");
>                       SBLASTMBUFCHK(&so->so_rcv, "soreceive uiomove");
>                       resid = uio->uio_resid;
> -                     sounlock(so);
> +                     sounlock_shared(so);
>                       uio_error = uiomove(mtod(m, caddr_t) + moff, len, uio);
> -                     solock(so);
> +                     solock_shared(so);
>                       if (uio_error)
>                               uio->uio_resid = resid - len;
>               } else
> @@ -1126,7 +1126,7 @@ dontblock:
>                       error = sbwait(so, &so->so_rcv);
>                       if (error) {
>                               sbunlock(so, &so->so_rcv);
> -                             sounlock(so);
> +                             sounlock_shared(so);
>                               return (0);
>                       }
>                       if ((m = so->so_rcv.sb_mb) != NULL)
> @@ -1171,7 +1171,7 @@ dontblock:
>               *flagsp |= flags;
>  release:
>       sbunlock(so, &so->so_rcv);
> -     sounlock(so);
> +     sounlock_shared(so);
>       return (error);
>  }
>  
> Index: kern/uipc_socket2.c
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/sys/kern/uipc_socket2.c,v
> retrieving revision 1.127
> diff -u -p -r1.127 uipc_socket2.c
> --- kern/uipc_socket2.c       13 Aug 2022 21:01:46 -0000      1.127
> +++ kern/uipc_socket2.c       22 Aug 2022 12:01:58 -0000
> @@ -360,6 +360,24 @@ solock(struct socket *so)
>       }
>  }
>  
> +void
> +solock_shared(struct socket *so)
> +{
> +     switch (so->so_proto->pr_domain->dom_family) {
> +     case PF_INET:
> +     case PF_INET6:
> +             if (so->so_proto->pr_usrreqs->pru_lock != NULL) {
> +                     NET_LOCK_SHARED();
> +                     pru_lock(so);
> +             } else
> +                     NET_LOCK();
> +             break;
> +     default:
> +             rw_enter_write(&so->so_lock);
> +             break;
> +     }
> +}
> +
>  int
>  solock_persocket(struct socket *so)
>  {
> @@ -403,6 +421,24 @@ sounlock(struct socket *so)
>  }
>  
>  void
> +sounlock_shared(struct socket *so)
> +{
> +     switch (so->so_proto->pr_domain->dom_family) {
> +     case PF_INET:
> +     case PF_INET6:
> +             if (so->so_proto->pr_usrreqs->pru_unlock != NULL) {
> +                     pru_unlock(so);
> +                     NET_UNLOCK_SHARED();
> +             } else
> +                     NET_UNLOCK();
> +             break;
> +     default:
> +             rw_exit_write(&so->so_lock);
> +             break;
> +     }
> +}
> +
> +void
>  soassertlocked(struct socket *so)
>  {
>       switch (so->so_proto->pr_domain->dom_family) {
> @@ -425,7 +461,15 @@ sosleep_nsec(struct socket *so, void *id
>       switch (so->so_proto->pr_domain->dom_family) {
>       case PF_INET:
>       case PF_INET6:
> +             if (so->so_proto->pr_usrreqs->pru_unlock != NULL &&
> +                 rw_status(&netlock) == RW_READ) {
> +                     pru_unlock(so);
> +             }
>               ret = rwsleep_nsec(ident, &netlock, prio, wmesg, nsecs);
> +             if (so->so_proto->pr_usrreqs->pru_lock != NULL &&
> +                 rw_status(&netlock) == RW_READ) {
> +                     pru_lock(so);
> +             }
>               break;
>       default:
>               ret = rwsleep_nsec(ident, &so->so_lock, prio, wmesg, nsecs);
> Index: net/if_bridge.c
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/sys/net/if_bridge.c,v
> retrieving revision 1.364
> diff -u -p -r1.364 if_bridge.c
> --- net/if_bridge.c   7 Aug 2022 00:57:43 -0000       1.364
> +++ net/if_bridge.c   22 Aug 2022 12:01:58 -0000
> @@ -1590,7 +1590,7 @@ bridge_ipsec(struct ifnet *ifp, struct e
>                           off);
>                       tdb_unref(tdb);
>                       if (prot != IPPROTO_DONE)
> -                             ip_deliver(&m, &hlen, prot, af);
> +                             ip_deliver(&m, &hlen, prot, af, 0);
>                       return (1);
>               } else {
>                       tdb_unref(tdb);
> Index: netinet/in_proto.c
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/in_proto.c,v
> retrieving revision 1.99
> diff -u -p -r1.99 in_proto.c
> --- netinet/in_proto.c        15 Aug 2022 09:11:38 -0000      1.99
> +++ netinet/in_proto.c        22 Aug 2022 12:01:58 -0000
> @@ -185,7 +185,7 @@ const struct protosw inetsw[] = {
>    .pr_type   = SOCK_DGRAM,
>    .pr_domain = &inetdomain,
>    .pr_protocol       = IPPROTO_UDP,
> -  .pr_flags  = PR_ATOMIC|PR_ADDR|PR_SPLICE,
> +  .pr_flags  = PR_ATOMIC|PR_ADDR|PR_SPLICE|PR_MPSAFE,
>    .pr_input  = udp_input,
>    .pr_ctlinput       = udp_ctlinput,
>    .pr_ctloutput      = ip_ctloutput,
> Index: netinet/ip_input.c
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/ip_input.c,v
> retrieving revision 1.380
> diff -u -p -r1.380 ip_input.c
> --- netinet/ip_input.c        21 Aug 2022 14:15:55 -0000      1.380
> +++ netinet/ip_input.c        22 Aug 2022 12:01:58 -0000
> @@ -230,6 +230,11 @@ ip_init(void)
>  #endif
>  }
>  
> +struct ip_offnxt {
> +     int     ion_off;
> +     int     ion_nxt;
> +};
> +
>  /*
>   * Enqueue packet for local delivery.  Queuing is used as a boundary
>   * between the network layer (input/forward path) running with
> @@ -246,6 +251,30 @@ ip_ours(struct mbuf **mp, int *offp, int
>       if (af != AF_UNSPEC)
>               return nxt;
>  
> +     nxt = ip_deliver(mp, offp, nxt, AF_INET, 1);
> +     if (nxt == IPPROTO_DONE)
> +             return IPPROTO_DONE;
> +
> +        /* save values for later, use after dequeue */
> +     if (*offp != sizeof(struct ip)) {
> +             struct m_tag *mtag;
> +             struct ip_offnxt *ion;
> +
> +             /* mbuf tags are expensive, but only used for header options */
> +             mtag = m_tag_get(PACKET_TAG_IP_OFFNXT, sizeof(*ion),
> +                 M_NOWAIT);
> +             if (mtag == NULL) {
> +                     ipstat_inc(ips_idropped);
> +                     m_freemp(mp);
> +                     return IPPROTO_DONE;
> +             }
> +             ion = (struct ip_offnxt *)(mtag + 1);
> +             ion->ion_off = *offp;
> +             ion->ion_nxt = nxt;
> +
> +             m_tag_prepend(*mp, mtag);
> +     }
> +
>       niq_enqueue(&ipintrq, *mp);
>       *mp = NULL;
>       return IPPROTO_DONE;
> @@ -261,18 +290,31 @@ ipintr(void)
>       struct mbuf *m;
>  
>       while ((m = niq_dequeue(&ipintrq)) != NULL) {
> -             struct ip *ip;
> +             struct m_tag *mtag;
>               int off, nxt;
>  
>  #ifdef DIAGNOSTIC
>               if ((m->m_flags & M_PKTHDR) == 0)
>                       panic("ipintr no HDR");
>  #endif
> -             ip = mtod(m, struct ip *);
> -             off = ip->ip_hl << 2;
> -             nxt = ip->ip_p;
> +             mtag = m_tag_find(m, PACKET_TAG_IP_OFFNXT, NULL);
> +             if (mtag != NULL) {
> +                     struct ip_offnxt *ion;
> +
> +                     ion = (struct ip_offnxt *)(mtag + 1);
> +                     off = ion->ion_off;
> +                     nxt = ion->ion_nxt;
> +
> +                     m_tag_delete(m, mtag);
> +             } else {
> +                     struct ip *ip;
>  
> -             nxt = ip_deliver(&m, &off, nxt, AF_INET);
> +                     ip = mtod(m, struct ip *);
> +                     off = ip->ip_hl << 2;
> +                     nxt = ip->ip_p;
> +             }
> +
> +             nxt = ip_deliver(&m, &off, nxt, AF_INET, 0);
>               KASSERT(nxt == IPPROTO_DONE);
>       }
>  }
> @@ -673,7 +715,7 @@ ip_fragcheck(struct mbuf **mp, int *offp
>  #endif
>  
>  int
> -ip_deliver(struct mbuf **mp, int *offp, int nxt, int af)
> +ip_deliver(struct mbuf **mp, int *offp, int nxt, int af, int shared)
>  {
>       const struct protosw *psw;
>       int naf = af;
> @@ -681,26 +723,24 @@ ip_deliver(struct mbuf **mp, int *offp, 
>       int nest = 0;
>  #endif /* INET6 */
>  
> -     NET_ASSERT_LOCKED_EXCLUSIVE();
> -
> -     /* pf might have modified stuff, might have to chksum */
> -     switch (af) {
> -     case AF_INET:
> -             in_proto_cksum_out(*mp, NULL);
> -             break;
> -#ifdef INET6
> -     case AF_INET6:
> -             in6_proto_cksum_out(*mp, NULL);
> -             break;
> -#endif /* INET6 */
> -     }
> -
>       /*
>        * Tell launch routine the next header
>        */
>       IPSTAT_INC(delivered);
>  
>       while (nxt != IPPROTO_DONE) {
> +             switch (af) {
> +             case AF_INET:
> +                     psw = &inetsw[ip_protox[nxt]];
> +                     break;
> +#ifdef INET6
> +             case AF_INET6:
> +                     psw = &inet6sw[ip6_protox[nxt]];
> +                     break;
> +#endif /* INET6 */
> +             }
> +             if (shared && !ISSET(psw->pr_flags, PR_MPSAFE))
> +                     break;
>  #ifdef INET6
>               if (af == AF_INET6 &&
>                   ip6_hdrnestlimit && (++nest > ip6_hdrnestlimit)) {
> @@ -737,16 +777,6 @@ ip_deliver(struct mbuf **mp, int *offp, 
>               case IPPROTO_IPV6:
>                       naf = AF_INET6;
>                       ip6stat_inc(ip6s_delivered);
> -                     break;
> -#endif /* INET6 */
> -             }
> -             switch (af) {
> -             case AF_INET:
> -                     psw = &inetsw[ip_protox[nxt]];
> -                     break;
> -#ifdef INET6
> -             case AF_INET6:
> -                     psw = &inet6sw[ip6_protox[nxt]];
>                       break;
>  #endif /* INET6 */
>               }
> Index: netinet/ip_var.h
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/ip_var.h,v
> retrieving revision 1.99
> diff -u -p -r1.99 ip_var.h
> --- netinet/ip_var.h  21 Aug 2022 22:45:55 -0000      1.99
> +++ netinet/ip_var.h  22 Aug 2022 12:01:58 -0000
> @@ -249,7 +249,7 @@ int        ip_sysctl(int *, u_int, void *, siz
>  void  ip_savecontrol(struct inpcb *, struct mbuf **, struct ip *,
>           struct mbuf *);
>  int   ip_input_if(struct mbuf **, int *, int, int, struct ifnet *);
> -int   ip_deliver(struct mbuf **, int *, int, int);
> +int   ip_deliver(struct mbuf **, int *, int, int, int);
>  void  ip_forward(struct mbuf *, struct ifnet *, struct rtentry *, int);
>  int   rip_ctloutput(int, struct socket *, int, int, struct mbuf *);
>  void  rip_init(void);
> Index: netinet/udp_usrreq.c
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/sys/netinet/udp_usrreq.c,v
> retrieving revision 1.287
> diff -u -p -r1.287 udp_usrreq.c
> --- netinet/udp_usrreq.c      22 Aug 2022 10:37:27 -0000      1.287
> +++ netinet/udp_usrreq.c      22 Aug 2022 12:01:58 -0000
> @@ -122,10 +122,15 @@ u_int   udp_sendspace = 9216;           /* really m
>  u_int        udp_recvspace = 40 * (1024 + sizeof(struct sockaddr_in));
>                                       /* 40 1K datagrams */
>  
> +void udp_lock(struct socket *);
> +void udp_unlock(struct socket *);
> +
>  const struct pr_usrreqs udp_usrreqs = {
>       .pru_usrreq     = udp_usrreq,
>       .pru_attach     = udp_attach,
>       .pru_detach     = udp_detach,
> +     .pru_lock       = udp_lock,
> +     .pru_unlock     = udp_unlock,
>       .pru_bind       = udp_bind,
>       .pru_connect    = udp_connect,
>  };
> @@ -653,12 +658,17 @@ udp_sbappend(struct inpcb *inp, struct m
>       }
>  #endif
>       m_adj(m, hlen);
> +
> +     mtx_enter(&inp->inp_mtx);
>       if (sbappendaddr(so, &so->so_rcv, srcaddr, m, opts) == 0) {
> +             mtx_leave(&inp->inp_mtx);
>               udpstat_inc(udps_fullsock);
>               m_freem(m);
>               m_freem(opts);
>               return;
>       }
> +     mtx_leave(&inp->inp_mtx);
> +
>       sorwakeup(so);
>  }
>  
> @@ -1245,6 +1255,24 @@ udp_detach(struct socket *so)
>  
>       in_pcbdetach(inp);
>       return (0);
> +}
> +
> +void
> +udp_lock(struct socket *so)
> +{
> +     struct inpcb *inp = sotoinpcb(so);
> +
> +     NET_ASSERT_LOCKED();
> +     mtx_enter(&inp->inp_mtx);
> +}
> +
> +void
> +udp_unlock(struct socket *so)
> +{
> +     struct inpcb *inp = sotoinpcb(so);
> +
> +     NET_ASSERT_LOCKED();
> +     mtx_leave(&inp->inp_mtx);
>  }
>  
>  int
> Index: netinet6/in6_proto.c
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/sys/netinet6/in6_proto.c,v
> retrieving revision 1.110
> diff -u -p -r1.110 in6_proto.c
> --- netinet6/in6_proto.c      15 Aug 2022 09:11:39 -0000      1.110
> +++ netinet6/in6_proto.c      22 Aug 2022 12:01:58 -0000
> @@ -136,7 +136,7 @@ const struct protosw inet6sw[] = {
>    .pr_type   = SOCK_DGRAM,
>    .pr_domain = &inet6domain,
>    .pr_protocol       = IPPROTO_UDP,
> -  .pr_flags  = PR_ATOMIC|PR_ADDR|PR_SPLICE,
> +  .pr_flags  = PR_ATOMIC|PR_ADDR|PR_SPLICE|PR_MPSAFE,
>    .pr_input  = udp_input,
>    .pr_ctlinput       = udp6_ctlinput,
>    .pr_ctloutput      = ip6_ctloutput,
> Index: netinet6/ip6_input.c
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/sys/netinet6/ip6_input.c,v
> retrieving revision 1.254
> diff -u -p -r1.254 ip6_input.c
> --- netinet6/ip6_input.c      21 Aug 2022 14:15:55 -0000      1.254
> +++ netinet6/ip6_input.c      22 Aug 2022 12:01:58 -0000
> @@ -190,6 +190,10 @@ ip6_ours(struct mbuf **mp, int *offp, in
>       if (af != AF_UNSPEC)
>               return nxt;
>  
> +     nxt = ip_deliver(mp, offp, nxt, AF_INET6, 1);
> +     if (nxt == IPPROTO_DONE)
> +             return IPPROTO_DONE;
> +
>       /* save values for later, use after dequeue */
>       if (*offp != sizeof(struct ip6_hdr)) {
>               struct m_tag *mtag;
> @@ -248,7 +252,7 @@ ip6intr(void)
>                       off = sizeof(struct ip6_hdr);
>                       nxt = ip6->ip6_nxt;
>               }
> -             nxt = ip_deliver(&m, &off, nxt, AF_INET6);
> +             nxt = ip_deliver(&m, &off, nxt, AF_INET6, 0);
>               KASSERT(nxt == IPPROTO_DONE);
>       }
>  }
> Index: sys/mbuf.h
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/sys/sys/mbuf.h,v
> retrieving revision 1.255
> diff -u -p -r1.255 mbuf.h
> --- sys/mbuf.h        15 Aug 2022 16:15:37 -0000      1.255
> +++ sys/mbuf.h        22 Aug 2022 12:01:58 -0000
> @@ -471,6 +471,8 @@ struct m_tag *m_tag_next(struct mbuf *, 
>  #define PACKET_TAG_IPSEC_IN_DONE     0x0001  /* IPsec applied, in */
>  #define PACKET_TAG_IPSEC_OUT_DONE    0x0002  /* IPsec applied, out */
>  #define PACKET_TAG_IPSEC_FLOWINFO    0x0004  /* IPsec flowinfo */
> +#define PACKET_TAG_IP_OFFNXT         0x0010  /* IPv4 offset and next proto */
> +#define PACKET_TAG_IP6_OFFNXT                0x0020  /* IPv6 offset and next proto */
>  #define PACKET_TAG_WIREGUARD         0x0040  /* WireGuard data */
>  #define PACKET_TAG_GRE                       0x0080  /* GRE processing done */
>  #define PACKET_TAG_DLT                       0x0100 /* data link layer type */
> @@ -479,7 +481,6 @@ struct m_tag *m_tag_next(struct mbuf *, 
>  #define PACKET_TAG_SRCROUTE          0x1000 /* IPv4 source routing options */
>  #define PACKET_TAG_TUNNEL            0x2000  /* Tunnel endpoint address */
>  #define PACKET_TAG_CARP_BAL_IP               0x4000  /* carp(4) ip balanced marker */
> -#define PACKET_TAG_IP6_OFFNXT                0x8000  /* IPv6 offset and next proto */
>  
>  #define MTAG_BITS \
>      ("\20\1IPSEC_IN_DONE\2IPSEC_OUT_DONE\3IPSEC_FLOWINFO" \
> Index: sys/protosw.h
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/sys/sys/protosw.h,v
> retrieving revision 1.41
> diff -u -p -r1.41 protosw.h
> --- sys/protosw.h     22 Aug 2022 08:08:47 -0000      1.41
> +++ sys/protosw.h     22 Aug 2022 12:01:58 -0000
> @@ -66,6 +66,8 @@ struct pr_usrreqs {
>  
>       int     (*pru_attach)(struct socket *, int);
>       int     (*pru_detach)(struct socket *);
> +     void    (*pru_lock)(struct socket *);
> +     void    (*pru_unlock)(struct socket *);
>       int     (*pru_bind)(struct socket *, struct mbuf *, struct proc *);
>       int     (*pru_listen)(struct socket *);
>       int     (*pru_connect)(struct socket *, struct mbuf *);
> @@ -116,6 +118,7 @@ struct protosw {
>  #define      PR_ABRTACPTDIS  0x20            /* abort on accept(2) to disconnected
>                                          socket */
>  #define      PR_SPLICE       0x40            /* socket splicing is possible */
> +#define      PR_MPSAFE       0x80            /* input runs with shared netlock */
>  
>  /*
>   * The arguments to usrreq are:
> @@ -263,6 +266,18 @@ static inline int
>  pru_detach(struct socket *so)
>  {
>       return (*so->so_proto->pr_usrreqs->pru_detach)(so);
> +}
> +
> +static inline void
> +pru_lock(struct socket *so)
> +{
> +     (*so->so_proto->pr_usrreqs->pru_lock)(so);
> +}
> +
> +static inline void
> +pru_unlock(struct socket *so)
> +{
> +     (*so->so_proto->pr_usrreqs->pru_unlock)(so);
>  }
>  
>  static inline int
> Index: sys/socketvar.h
> ===================================================================
> RCS file: /data/mirror/openbsd/cvs/src/sys/sys/socketvar.h,v
> retrieving revision 1.108
> diff -u -p -r1.108 socketvar.h
> --- sys/socketvar.h   21 Aug 2022 16:22:18 -0000      1.108
> +++ sys/socketvar.h   22 Aug 2022 12:01:58 -0000
> @@ -349,9 +349,11 @@ int      sockargs(struct mbuf **, const void 
>  
>  int  sosleep_nsec(struct socket *, void *, int, const char *, uint64_t);
>  void solock(struct socket *);
> +void solock_shared(struct socket *);
>  int  solock_persocket(struct socket *);
>  void solock_pair(struct socket *, struct socket *);
>  void sounlock(struct socket *);
> +void sounlock_shared(struct socket *);
>  
>  int  sendit(struct proc *, int, struct msghdr *, int, register_t *);
>  int  recvit(struct proc *, int, struct msghdr *, caddr_t, register_t *);
> 

Reply via email to