Module Name: src Committed By: riastradh Date: Mon Sep 7 01:14:42 UTC 2020
Modified Files: src/sys/net: if_wg.c Log Message: wg: Use a global pktqueue rather than a per-peer pcq. - Improves scalability -- won't hit limit on softints no matter how many peers there are. - Improves parallelism -- softint was kernel-locked to serialize access to the pcq. - Requires per-peer queue on handshake init to avoid dropping first packet. (Per-peer queue is currently a single packet -- should serve well enough for pings, dns queries, tcp connections, &c.) To generate a diff of this commit: cvs rdiff -u -r1.53 -r1.54 src/sys/net/if_wg.c Please note that diffs are not public domain; they are subject to the copyright notices on the relevant files.
Modified files: Index: src/sys/net/if_wg.c diff -u src/sys/net/if_wg.c:1.53 src/sys/net/if_wg.c:1.54 --- src/sys/net/if_wg.c:1.53 Mon Sep 7 00:33:08 2020 +++ src/sys/net/if_wg.c Mon Sep 7 01:14:42 2020 @@ -1,4 +1,4 @@ -/* $NetBSD: if_wg.c,v 1.53 2020/09/07 00:33:08 riastradh Exp $ */ +/* $NetBSD: if_wg.c,v 1.54 2020/09/07 01:14:42 riastradh Exp $ */ /* * Copyright (C) Ryota Ozaki <ozaki.ry...@gmail.com> @@ -41,7 +41,7 @@ */ #include <sys/cdefs.h> -__KERNEL_RCSID(0, "$NetBSD: if_wg.c,v 1.53 2020/09/07 00:33:08 riastradh Exp $"); +__KERNEL_RCSID(0, "$NetBSD: if_wg.c,v 1.54 2020/09/07 01:14:42 riastradh Exp $"); #ifdef _KERNEL_OPT #include "opt_inet.h" @@ -65,7 +65,6 @@ __KERNEL_RCSID(0, "$NetBSD: if_wg.c,v 1. #include <sys/mbuf.h> #include <sys/module.h> #include <sys/mutex.h> -#include <sys/pcq.h> #include <sys/percpu.h> #include <sys/pserialize.h> #include <sys/psref.h> @@ -85,6 +84,7 @@ __KERNEL_RCSID(0, "$NetBSD: if_wg.c,v 1. #include <net/if.h> #include <net/if_types.h> #include <net/if_wg.h> +#include <net/pktqueue.h> #include <net/route.h> #include <netinet/in.h> @@ -124,8 +124,6 @@ __KERNEL_RCSID(0, "$NetBSD: if_wg.c,v 1. 
* runs event handlers * - It has its own two routing tables: one is for IPv4 and the other IPv6 * - struct wg_peer is a representative of a peer - * - It has a softint that is used to send packets over an wg interface - * to a peer * - It has a pair of session instances (struct wg_session) * - It has a pair of endpoint instances (struct wg_sockaddr) * - Normally one endpoint is used and the second one is used only on @@ -562,12 +560,12 @@ struct wg_peer { /* The preshared key (optional) */ uint8_t wgp_psk[WG_PRESHARED_KEY_LEN]; - void *wgp_si; - pcq_t *wgp_q; - struct wg_session *wgp_session_stable; struct wg_session *wgp_session_unstable; + /* first outgoing packet awaiting session initiation */ + struct mbuf *wgp_pending; + /* timestamp in big-endian */ wg_timestamp_t wgp_timestamp_latest_init; @@ -706,6 +704,7 @@ static int wg_bind_port(struct wg_softc static int wg_init(struct ifnet *); static void wg_stop(struct ifnet *, int); +static void wgintr(void *); static void wg_purge_pending_packets(struct wg_peer *); static int wg_clone_create(struct if_clone *, int); @@ -788,6 +787,7 @@ struct psref_class *wg_psref_class __rea static struct if_clone wg_cloner = IF_CLONE_INITIALIZER("wg", wg_clone_create, wg_clone_destroy); +static struct pktqueue *wg_pktq __read_mostly; void wgattach(int); /* ARGSUSED */ @@ -808,6 +808,10 @@ wginit(void) mutex_init(&wg_softcs.lock, MUTEX_DEFAULT, IPL_NONE); LIST_INIT(&wg_softcs.list); + + wg_pktq = pktq_create(IFQ_MAXLEN, wgintr, NULL); + KASSERT(wg_pktq != NULL); + if_clone_attach(&wg_cloner); } @@ -1650,6 +1654,9 @@ wg_send_handshake_msg_init(struct wg_sof MIN(wg_rekey_timeout, INT_MAX/hz) * hz); } else { wg_put_session_index(wg, wgs); + /* Initiation failed; toss packet waiting for it if any. 
*/ + if ((m = atomic_swap_ptr(&wgp->wgp_pending, NULL)) != NULL) + m_freem(m); } return error; @@ -1780,6 +1787,7 @@ wg_handle_msg_resp(struct wg_softc *wg, int error; uint8_t mac1[WG_MAC_LEN]; struct wg_session *wgs_prev; + struct mbuf *m; wg_algo_mac_mac1(mac1, sizeof(mac1), wg->wg_pubkey, sizeof(wg->wg_pubkey), @@ -1932,17 +1940,21 @@ wg_handle_msg_resp(struct wg_softc *wg, wg_update_endpoint_if_necessary(wgp, src); /* - * Send something immediately (same as the official implementation) - * XXX if there are pending data packets, we don't need to send - * a keepalive message. + * If we had a data packet queued up, send it; otherwise send a + * keepalive message -- either way we have to send something + * immediately or else the responder will never answer. */ - wg_send_keepalive_msg(wgp, wgs); + if ((m = atomic_swap_ptr(&wgp->wgp_pending, NULL)) != NULL) { + const uint32_t h = curcpu()->ci_index; // pktq_rps_hash(m) - /* Anyway run a softint to flush pending packets */ - kpreempt_disable(); - softint_schedule(wgp->wgp_si); - kpreempt_enable(); - WG_TRACE("softint scheduled"); + M_SETCTX(m, wgp); + if (__predict_false(!pktq_enqueue(wg_pktq, m, h))) { + WGLOG(LOG_ERR, "pktq full, dropping\n"); + m_freem(m); + } + } else { + wg_send_keepalive_msg(wgp, wgs); + } if (wgs_prev->wgs_state == WGS_STATE_ESTABLISHED) { /* Wait for wg_get_stable_session to drain. */ @@ -2874,6 +2886,7 @@ static void wg_task_establish_session(struct wg_softc *wg, struct wg_peer *wgp) { struct wg_session *wgs, *wgs_prev; + struct mbuf *m; KASSERT(mutex_owned(wgp->wgp_lock)); @@ -2896,6 +2909,17 @@ wg_task_establish_session(struct wg_soft wgp->wgp_last_sent_mac1_valid = false; wgp->wgp_last_sent_cookie_valid = false; + /* If we had a data packet queued up, send it. 
*/ + if ((m = atomic_swap_ptr(&wgp->wgp_pending, NULL)) != NULL) { + const uint32_t h = curcpu()->ci_index; // pktq_rps_hash(m) + + M_SETCTX(m, wgp); + if (__predict_false(!pktq_enqueue(wg_pktq, m, h))) { + WGLOG(LOG_ERR, "pktq full, dropping\n"); + m_freem(m); + } + } + if (wgs_prev->wgs_state == WGS_STATE_ESTABLISHED) { /* Wait for wg_get_stable_session to drain. */ pserialize_perform(wgp->wgp_psz); @@ -2911,11 +2935,6 @@ wg_task_establish_session(struct wg_soft wg_clear_states(wgs_prev); wgs_prev->wgs_state = WGS_STATE_UNKNOWN; } - - /* Anyway run a softint to flush pending packets */ - kpreempt_disable(); - softint_schedule(wgp->wgp_si); - kpreempt_enable(); } static void @@ -3290,29 +3309,32 @@ wg_session_hit_limits(struct wg_session } static void -wg_peer_softint(void *arg) +wgintr(void *cookie) { - struct wg_peer *wgp = arg; + struct wg_peer *wgp; struct wg_session *wgs; struct mbuf *m; struct psref psref; - if ((wgs = wg_get_stable_session(wgp, &psref)) == NULL) { - /* XXX how to treat? */ - WG_TRACE("skipped"); - return; - } - if (wg_session_hit_limits(wgs)) { - wg_schedule_peer_task(wgp, WGP_TASK_SEND_INIT_MESSAGE); - goto out; - } - WG_TRACE("running"); - - while ((m = pcq_get(wgp->wgp_q)) != NULL) { + while ((m = pktq_dequeue(wg_pktq)) != NULL) { + wgp = M_GETCTX(m, struct wg_peer *); + if ((wgs = wg_get_stable_session(wgp, &psref)) == NULL) { + WG_TRACE("no stable session"); + wg_schedule_peer_task(wgp, WGP_TASK_SEND_INIT_MESSAGE); + goto next0; + } + if (__predict_false(wg_session_hit_limits(wgs))) { + WG_TRACE("stable session hit limits"); + wg_schedule_peer_task(wgp, WGP_TASK_SEND_INIT_MESSAGE); + goto next1; + } wg_send_data_msg(wgp, wgs, m); + m = NULL; /* consumed */ +next1: wg_put_session(wgs, &psref); +next0: if (m) + m_freem(m); + /* XXX Yield to avoid userland starvation? 
*/ } -out: - wg_put_session(wgs, &psref); } static void @@ -3328,9 +3350,9 @@ wg_purge_pending_packets(struct wg_peer { struct mbuf *m; - while ((m = pcq_get(wgp->wgp_q)) != NULL) { + if ((m = atomic_swap_ptr(&wgp->wgp_pending, NULL)) != NULL) m_freem(m); - } + pktq_barrier(wg_pktq); } static void @@ -3351,8 +3373,6 @@ wg_alloc_peer(struct wg_softc *wg) wgp = kmem_zalloc(sizeof(*wgp), KM_SLEEP); wgp->wgp_sc = wg; - wgp->wgp_q = pcq_create(1024, KM_SLEEP); - wgp->wgp_si = softint_establish(SOFTINT_NET, wg_peer_softint, wgp); callout_init(&wgp->wgp_rekey_timer, CALLOUT_MPSAFE); callout_setfunc(&wgp->wgp_rekey_timer, wg_rekey_timer, wgp); callout_init(&wgp->wgp_handshake_timeout_timer, CALLOUT_MPSAFE); @@ -3430,7 +3450,6 @@ wg_destroy_peer(struct wg_peer *wgp) wg_purge_pending_packets(wgp); /* Halt all packet processing and timeouts. */ - softint_disestablish(wgp->wgp_si); callout_halt(&wgp->wgp_rekey_timer, NULL); callout_halt(&wgp->wgp_handshake_timeout_timer, NULL); callout_halt(&wgp->wgp_session_dtor_timer, NULL); @@ -3467,7 +3486,6 @@ wg_destroy_peer(struct wg_peer *wgp) kmem_free(wgp->wgp_endpoint0, sizeof(*wgp->wgp_endpoint0)); pserialize_destroy(wgp->wgp_psz); - pcq_destroy(wgp->wgp_q); mutex_obj_free(wgp->wgp_lock); kmem_free(wgp, sizeof(*wgp)); @@ -3581,21 +3599,28 @@ wg_if_attach(struct wg_softc *wg) return 0; } +static void +wg_if_detach(struct wg_softc *wg) +{ + struct ifnet *ifp = &wg->wg_if; + + bpf_detach(ifp); + if_detach(ifp); +} + static int wg_clone_create(struct if_clone *ifc, int unit) { struct wg_softc *wg; int error; - wg = kmem_zalloc(sizeof(struct wg_softc), KM_SLEEP); + wg = kmem_zalloc(sizeof(*wg), KM_SLEEP); if_initname(&wg->wg_if, ifc->ifc_name, unit); error = wg_worker_init(wg); - if (error != 0) { - kmem_free(wg, sizeof(struct wg_softc)); - return error; - } + if (error) + goto fail0; rn_inithead((void **)&wg->wg_rtable_ipv4, offsetof(struct sockaddr_in, sin_addr) * NBBY); @@ -3613,26 +3638,34 @@ wg_clone_create(struct if_clone *ifc, in 
wg->wg_ops = &wg_ops_rumpkernel; error = wg_if_attach(wg); - if (error != 0) { - wg_worker_destroy(wg); - if (wg->wg_rtable_ipv4 != NULL) - free(wg->wg_rtable_ipv4, M_RTABLE); - if (wg->wg_rtable_ipv6 != NULL) - free(wg->wg_rtable_ipv6, M_RTABLE); - PSLIST_DESTROY(&wg->wg_peers); - mutex_obj_free(wg->wg_lock); - thmap_destroy(wg->wg_sessions_byindex); - thmap_destroy(wg->wg_peers_byname); - thmap_destroy(wg->wg_peers_bypubkey); - kmem_free(wg, sizeof(struct wg_softc)); - return error; - } + if (error) + goto fail1; mutex_enter(&wg_softcs.lock); LIST_INSERT_HEAD(&wg_softcs.list, wg, wg_list); mutex_exit(&wg_softcs.lock); return 0; + +fail2: __unused + mutex_enter(&wg_softcs.lock); + LIST_REMOVE(wg, wg_list); + mutex_exit(&wg_softcs.lock); + wg_if_detach(wg); + wg_destroy_all_peers(wg); + rw_obj_free(wg->wg_rwlock); + mutex_obj_free(wg->wg_lock); + thmap_destroy(wg->wg_sessions_byindex); + thmap_destroy(wg->wg_peers_byname); + thmap_destroy(wg->wg_peers_bypubkey); + PSLIST_DESTROY(&wg->wg_peers); + if (wg->wg_rtable_ipv6 != NULL) + free(wg->wg_rtable_ipv6, M_RTABLE); + if (wg->wg_rtable_ipv4 != NULL) + free(wg->wg_rtable_ipv4, M_RTABLE); +fail1: wg_worker_destroy(wg); +fail0: kmem_free(wg, sizeof(*wg)); + return error; } static int @@ -3640,10 +3673,6 @@ wg_clone_destroy(struct ifnet *ifp) { struct wg_softc *wg = container_of(ifp, struct wg_softc, wg_if); - mutex_enter(&wg_softcs.lock); - LIST_REMOVE(wg, wg_list); - mutex_exit(&wg_softcs.lock); - #ifdef WG_RUMPKERNEL if (wg_user_mode(wg)) { rumpuser_wg_destroy(wg->wg_user); @@ -3651,23 +3680,23 @@ wg_clone_destroy(struct ifnet *ifp) } #endif - bpf_detach(ifp); - if_detach(ifp); - wg_worker_destroy(wg); + mutex_enter(&wg_softcs.lock); + LIST_REMOVE(wg, wg_list); + mutex_exit(&wg_softcs.lock); + wg_if_detach(wg); wg_destroy_all_peers(wg); + rw_obj_free(wg->wg_rwlock); + mutex_obj_free(wg->wg_lock); + thmap_destroy(wg->wg_sessions_byindex); + thmap_destroy(wg->wg_peers_byname); + thmap_destroy(wg->wg_peers_bypubkey); + 
PSLIST_DESTROY(&wg->wg_peers); if (wg->wg_rtable_ipv4 != NULL) free(wg->wg_rtable_ipv4, M_RTABLE); if (wg->wg_rtable_ipv6 != NULL) free(wg->wg_rtable_ipv6, M_RTABLE); - - PSLIST_DESTROY(&wg->wg_peers); - thmap_destroy(wg->wg_sessions_byindex); - thmap_destroy(wg->wg_peers_byname); - thmap_destroy(wg->wg_peers_bypubkey); - mutex_obj_free(wg->wg_lock); - rw_obj_free(wg->wg_rwlock); - - kmem_free(wg, sizeof(struct wg_softc)); + wg_worker_destroy(wg); + kmem_free(wg, sizeof(*wg)); return 0; } @@ -3739,7 +3768,7 @@ wg_output(struct ifnet *ifp, struct mbuf error = if_tunnel_check_nesting(ifp, m, 1); if (error) { WGLOG(LOG_ERR, "tunneling loop detected and packet dropped\n"); - goto out; + goto out0; } IFQ_CLASSIFY(&ifp->if_snd, m, dst->sa_family); @@ -3752,37 +3781,50 @@ wg_output(struct ifnet *ifp, struct mbuf if (wgp == NULL) { WG_TRACE("peer not found"); error = EHOSTUNREACH; - goto out; + goto out0; } /* Clear checksum-offload flags. */ m->m_pkthdr.csum_flags = 0; m->m_pkthdr.csum_data = 0; - if (!pcq_put(wgp->wgp_q, m)) { - error = ENOBUFS; - goto out; + /* Check whether there's an established session. */ + wgs = wg_get_stable_session(wgp, &wgs_psref); + if (wgs == NULL) { + /* + * No established session. If we're the first to try + * sending data, schedule a handshake and queue the + * packet for when the handshake is done; otherwise + * just drop the packet and let the ongoing handshake + * attempt continue. We could queue more data packets + * but it's not clear that's worthwhile. 
+ */ + if (atomic_cas_ptr(&wgp->wgp_pending, NULL, m) == NULL) { + m = NULL; /* consume */ + WG_TRACE("queued first packet; init handshake"); + wg_schedule_peer_task(wgp, WGP_TASK_SEND_INIT_MESSAGE); + } else { + WG_TRACE("first packet already queued, dropping"); + } + goto out1; } - m = NULL; /* consumed */ - wgs = wg_get_stable_session(wgp, &wgs_psref); - if (wgs != NULL && !wg_session_hit_limits(wgs)) { - kpreempt_disable(); - softint_schedule(wgp->wgp_si); - kpreempt_enable(); - WG_TRACE("softint scheduled"); - } else { - wg_schedule_peer_task(wgp, WGP_TASK_SEND_INIT_MESSAGE); - WG_TRACE("softint NOT scheduled"); + /* There's an established session. Toss it in the queue. */ + kpreempt_disable(); + const uint32_t h = curcpu()->ci_index; // pktq_rps_hash(m) + M_SETCTX(m, wgp); + if (__predict_false(!pktq_enqueue(wg_pktq, m, h))) { + WGLOG(LOG_ERR, "pktq full, dropping\n"); + error = ENOBUFS; + goto out2; } + m = NULL; /* consumed */ error = 0; +out2: kpreempt_enable(); -out: - if (wgs != NULL) - wg_put_session(wgs, &wgs_psref); - if (wgp != NULL) - wg_put_peer(wgp, &wgp_psref); - if (m != NULL) + wg_put_session(wgs, &wgs_psref); +out1: wg_put_peer(wgp, &wgp_psref); +out0: if (m) m_freem(m); curlwp_bindx(bound); return error;