Module Name:    src
Committed By:   riastradh
Date:           Mon Sep  7 01:14:42 UTC 2020

Modified Files:
        src/sys/net: if_wg.c

Log Message:
wg: Use a global pktqueue rather than a per-peer pcq.

- Improves scalability -- won't hit the limit on softints no matter how
  many peers there are.
- Improves parallelism -- the per-peer softint was kernel-locked to
  serialize access to the pcq.
- Requires a per-peer queue on handshake init to avoid dropping the
  first packet.
  . The per-peer queue currently holds a single packet -- should serve
    well enough for pings, DNS queries, TCP connections, &c.


To generate a diff of this commit:
cvs rdiff -u -r1.53 -r1.54 src/sys/net/if_wg.c

Please note that diffs are not public domain; they are subject to the
copyright notices on the relevant files.

Modified files:

Index: src/sys/net/if_wg.c
diff -u src/sys/net/if_wg.c:1.53 src/sys/net/if_wg.c:1.54
--- src/sys/net/if_wg.c:1.53	Mon Sep  7 00:33:08 2020
+++ src/sys/net/if_wg.c	Mon Sep  7 01:14:42 2020
@@ -1,4 +1,4 @@
-/*	$NetBSD: if_wg.c,v 1.53 2020/09/07 00:33:08 riastradh Exp $	*/
+/*	$NetBSD: if_wg.c,v 1.54 2020/09/07 01:14:42 riastradh Exp $	*/
 
 /*
  * Copyright (C) Ryota Ozaki <ozaki.ry...@gmail.com>
@@ -41,7 +41,7 @@
  */
 
 #include <sys/cdefs.h>
-__KERNEL_RCSID(0, "$NetBSD: if_wg.c,v 1.53 2020/09/07 00:33:08 riastradh Exp $");
+__KERNEL_RCSID(0, "$NetBSD: if_wg.c,v 1.54 2020/09/07 01:14:42 riastradh Exp $");
 
 #ifdef _KERNEL_OPT
 #include "opt_inet.h"
@@ -65,7 +65,6 @@ __KERNEL_RCSID(0, "$NetBSD: if_wg.c,v 1.
 #include <sys/mbuf.h>
 #include <sys/module.h>
 #include <sys/mutex.h>
-#include <sys/pcq.h>
 #include <sys/percpu.h>
 #include <sys/pserialize.h>
 #include <sys/psref.h>
@@ -85,6 +84,7 @@ __KERNEL_RCSID(0, "$NetBSD: if_wg.c,v 1.
 #include <net/if.h>
 #include <net/if_types.h>
 #include <net/if_wg.h>
+#include <net/pktqueue.h>
 #include <net/route.h>
 
 #include <netinet/in.h>
@@ -124,8 +124,6 @@ __KERNEL_RCSID(0, "$NetBSD: if_wg.c,v 1.
  *     runs event handlers
  *   - It has its own two routing tables: one is for IPv4 and the other IPv6
  * - struct wg_peer is a representative of a peer
- *   - It has a softint that is used to send packets over an wg interface
- *     to a peer
  *   - It has a pair of session instances (struct wg_session)
  *   - It has a pair of endpoint instances (struct wg_sockaddr)
  *     - Normally one endpoint is used and the second one is used only on
@@ -562,12 +560,12 @@ struct wg_peer {
 			/* The preshared key (optional) */
 	uint8_t		wgp_psk[WG_PRESHARED_KEY_LEN];
 
-	void		*wgp_si;
-	pcq_t		*wgp_q;
-
 	struct wg_session	*wgp_session_stable;
 	struct wg_session	*wgp_session_unstable;
 
+	/* first outgoing packet awaiting session initiation */
+	struct mbuf		*wgp_pending;
+
 	/* timestamp in big-endian */
 	wg_timestamp_t	wgp_timestamp_latest_init;
 
@@ -706,6 +704,7 @@ static int	wg_bind_port(struct wg_softc 
 static int	wg_init(struct ifnet *);
 static void	wg_stop(struct ifnet *, int);
 
+static void	wgintr(void *);
 static void	wg_purge_pending_packets(struct wg_peer *);
 
 static int	wg_clone_create(struct if_clone *, int);
@@ -788,6 +787,7 @@ struct psref_class *wg_psref_class __rea
 static struct if_clone wg_cloner =
     IF_CLONE_INITIALIZER("wg", wg_clone_create, wg_clone_destroy);
 
+static struct pktqueue *wg_pktq __read_mostly;
 
 void wgattach(int);
 /* ARGSUSED */
@@ -808,6 +808,10 @@ wginit(void)
 
 	mutex_init(&wg_softcs.lock, MUTEX_DEFAULT, IPL_NONE);
 	LIST_INIT(&wg_softcs.list);
+
+	wg_pktq = pktq_create(IFQ_MAXLEN, wgintr, NULL);
+	KASSERT(wg_pktq != NULL);
+
 	if_clone_attach(&wg_cloner);
 }
 
@@ -1650,6 +1654,9 @@ wg_send_handshake_msg_init(struct wg_sof
 		    MIN(wg_rekey_timeout, INT_MAX/hz) * hz);
 	} else {
 		wg_put_session_index(wg, wgs);
+		/* Initiation failed; toss packet waiting for it if any.  */
+		if ((m = atomic_swap_ptr(&wgp->wgp_pending, NULL)) != NULL)
+			m_freem(m);
 	}
 
 	return error;
@@ -1780,6 +1787,7 @@ wg_handle_msg_resp(struct wg_softc *wg, 
 	int error;
 	uint8_t mac1[WG_MAC_LEN];
 	struct wg_session *wgs_prev;
+	struct mbuf *m;
 
 	wg_algo_mac_mac1(mac1, sizeof(mac1),
 	    wg->wg_pubkey, sizeof(wg->wg_pubkey),
@@ -1932,17 +1940,21 @@ wg_handle_msg_resp(struct wg_softc *wg, 
 	wg_update_endpoint_if_necessary(wgp, src);
 
 	/*
-	 * Send something immediately (same as the official implementation)
-	 * XXX if there are pending data packets, we don't need to send
-	 *     a keepalive message.
+	 * If we had a data packet queued up, send it; otherwise send a
+	 * keepalive message -- either way we have to send something
+	 * immediately or else the responder will never answer.
 	 */
-	wg_send_keepalive_msg(wgp, wgs);
+	if ((m = atomic_swap_ptr(&wgp->wgp_pending, NULL)) != NULL) {
+		const uint32_t h = curcpu()->ci_index; // pktq_rps_hash(m)
 
-	/* Anyway run a softint to flush pending packets */
-	kpreempt_disable();
-	softint_schedule(wgp->wgp_si);
-	kpreempt_enable();
-	WG_TRACE("softint scheduled");
+		M_SETCTX(m, wgp);
+		if (__predict_false(!pktq_enqueue(wg_pktq, m, h))) {
+			WGLOG(LOG_ERR, "pktq full, dropping\n");
+			m_freem(m);
+		}
+	} else {
+		wg_send_keepalive_msg(wgp, wgs);
+	}
 
 	if (wgs_prev->wgs_state == WGS_STATE_ESTABLISHED) {
 		/* Wait for wg_get_stable_session to drain.  */
@@ -2874,6 +2886,7 @@ static void
 wg_task_establish_session(struct wg_softc *wg, struct wg_peer *wgp)
 {
 	struct wg_session *wgs, *wgs_prev;
+	struct mbuf *m;
 
 	KASSERT(mutex_owned(wgp->wgp_lock));
 
@@ -2896,6 +2909,17 @@ wg_task_establish_session(struct wg_soft
 	wgp->wgp_last_sent_mac1_valid = false;
 	wgp->wgp_last_sent_cookie_valid = false;
 
+	/* If we had a data packet queued up, send it.  */
+	if ((m = atomic_swap_ptr(&wgp->wgp_pending, NULL)) != NULL) {
+		const uint32_t h = curcpu()->ci_index; // pktq_rps_hash(m)
+
+		M_SETCTX(m, wgp);
+		if (__predict_false(!pktq_enqueue(wg_pktq, m, h))) {
+			WGLOG(LOG_ERR, "pktq full, dropping\n");
+			m_freem(m);
+		}
+	}
+
 	if (wgs_prev->wgs_state == WGS_STATE_ESTABLISHED) {
 		/* Wait for wg_get_stable_session to drain.  */
 		pserialize_perform(wgp->wgp_psz);
@@ -2911,11 +2935,6 @@ wg_task_establish_session(struct wg_soft
 		wg_clear_states(wgs_prev);
 		wgs_prev->wgs_state = WGS_STATE_UNKNOWN;
 	}
-
-	/* Anyway run a softint to flush pending packets */
-	kpreempt_disable();
-	softint_schedule(wgp->wgp_si);
-	kpreempt_enable();
 }
 
 static void
@@ -3290,29 +3309,32 @@ wg_session_hit_limits(struct wg_session 
 }
 
 static void
-wg_peer_softint(void *arg)
+wgintr(void *cookie)
 {
-	struct wg_peer *wgp = arg;
+	struct wg_peer *wgp;
 	struct wg_session *wgs;
 	struct mbuf *m;
 	struct psref psref;
 
-	if ((wgs = wg_get_stable_session(wgp, &psref)) == NULL) {
-		/* XXX how to treat? */
-		WG_TRACE("skipped");
-		return;
-	}
-	if (wg_session_hit_limits(wgs)) {
-		wg_schedule_peer_task(wgp, WGP_TASK_SEND_INIT_MESSAGE);
-		goto out;
-	}
-	WG_TRACE("running");
-
-	while ((m = pcq_get(wgp->wgp_q)) != NULL) {
+	while ((m = pktq_dequeue(wg_pktq)) != NULL) {
+		wgp = M_GETCTX(m, struct wg_peer *);
+		if ((wgs = wg_get_stable_session(wgp, &psref)) == NULL) {
+			WG_TRACE("no stable session");
+			wg_schedule_peer_task(wgp, WGP_TASK_SEND_INIT_MESSAGE);
+			goto next0;
+		}
+		if (__predict_false(wg_session_hit_limits(wgs))) {
+			WG_TRACE("stable session hit limits");
+			wg_schedule_peer_task(wgp, WGP_TASK_SEND_INIT_MESSAGE);
+			goto next1;
+		}
 		wg_send_data_msg(wgp, wgs, m);
+		m = NULL;	/* consumed */
+next1:		wg_put_session(wgs, &psref);
+next0:		if (m)
+			m_freem(m);
+		/* XXX Yield to avoid userland starvation?  */
 	}
-out:
-	wg_put_session(wgs, &psref);
 }
 
 static void
@@ -3328,9 +3350,9 @@ wg_purge_pending_packets(struct wg_peer 
 {
 	struct mbuf *m;
 
-	while ((m = pcq_get(wgp->wgp_q)) != NULL) {
+	if ((m = atomic_swap_ptr(&wgp->wgp_pending, NULL)) != NULL)
 		m_freem(m);
-	}
+	pktq_barrier(wg_pktq);
 }
 
 static void
@@ -3351,8 +3373,6 @@ wg_alloc_peer(struct wg_softc *wg)
 	wgp = kmem_zalloc(sizeof(*wgp), KM_SLEEP);
 
 	wgp->wgp_sc = wg;
-	wgp->wgp_q = pcq_create(1024, KM_SLEEP);
-	wgp->wgp_si = softint_establish(SOFTINT_NET, wg_peer_softint, wgp);
 	callout_init(&wgp->wgp_rekey_timer, CALLOUT_MPSAFE);
 	callout_setfunc(&wgp->wgp_rekey_timer, wg_rekey_timer, wgp);
 	callout_init(&wgp->wgp_handshake_timeout_timer, CALLOUT_MPSAFE);
@@ -3430,7 +3450,6 @@ wg_destroy_peer(struct wg_peer *wgp)
 	wg_purge_pending_packets(wgp);
 
 	/* Halt all packet processing and timeouts.  */
-	softint_disestablish(wgp->wgp_si);
 	callout_halt(&wgp->wgp_rekey_timer, NULL);
 	callout_halt(&wgp->wgp_handshake_timeout_timer, NULL);
 	callout_halt(&wgp->wgp_session_dtor_timer, NULL);
@@ -3467,7 +3486,6 @@ wg_destroy_peer(struct wg_peer *wgp)
 	kmem_free(wgp->wgp_endpoint0, sizeof(*wgp->wgp_endpoint0));
 
 	pserialize_destroy(wgp->wgp_psz);
-	pcq_destroy(wgp->wgp_q);
 	mutex_obj_free(wgp->wgp_lock);
 
 	kmem_free(wgp, sizeof(*wgp));
@@ -3581,21 +3599,28 @@ wg_if_attach(struct wg_softc *wg)
 	return 0;
 }
 
+static void
+wg_if_detach(struct wg_softc *wg)
+{
+	struct ifnet *ifp = &wg->wg_if;
+
+	bpf_detach(ifp);
+	if_detach(ifp);
+}
+
 static int
 wg_clone_create(struct if_clone *ifc, int unit)
 {
 	struct wg_softc *wg;
 	int error;
 
-	wg = kmem_zalloc(sizeof(struct wg_softc), KM_SLEEP);
+	wg = kmem_zalloc(sizeof(*wg), KM_SLEEP);
 
 	if_initname(&wg->wg_if, ifc->ifc_name, unit);
 
 	error = wg_worker_init(wg);
-	if (error != 0) {
-		kmem_free(wg, sizeof(struct wg_softc));
-		return error;
-	}
+	if (error)
+		goto fail0;
 
 	rn_inithead((void **)&wg->wg_rtable_ipv4,
 	    offsetof(struct sockaddr_in, sin_addr) * NBBY);
@@ -3613,26 +3638,34 @@ wg_clone_create(struct if_clone *ifc, in
 	wg->wg_ops = &wg_ops_rumpkernel;
 
 	error = wg_if_attach(wg);
-	if (error != 0) {
-		wg_worker_destroy(wg);
-		if (wg->wg_rtable_ipv4 != NULL)
-			free(wg->wg_rtable_ipv4, M_RTABLE);
-		if (wg->wg_rtable_ipv6 != NULL)
-			free(wg->wg_rtable_ipv6, M_RTABLE);
-		PSLIST_DESTROY(&wg->wg_peers);
-		mutex_obj_free(wg->wg_lock);
-		thmap_destroy(wg->wg_sessions_byindex);
-		thmap_destroy(wg->wg_peers_byname);
-		thmap_destroy(wg->wg_peers_bypubkey);
-		kmem_free(wg, sizeof(struct wg_softc));
-		return error;
-	}
+	if (error)
+		goto fail1;
 
 	mutex_enter(&wg_softcs.lock);
 	LIST_INSERT_HEAD(&wg_softcs.list, wg, wg_list);
 	mutex_exit(&wg_softcs.lock);
 
 	return 0;
+
+fail2: __unused
+	mutex_enter(&wg_softcs.lock);
+	LIST_REMOVE(wg, wg_list);
+	mutex_exit(&wg_softcs.lock);
+	wg_if_detach(wg);
+	wg_destroy_all_peers(wg);
+	rw_obj_free(wg->wg_rwlock);
+	mutex_obj_free(wg->wg_lock);
+	thmap_destroy(wg->wg_sessions_byindex);
+	thmap_destroy(wg->wg_peers_byname);
+	thmap_destroy(wg->wg_peers_bypubkey);
+	PSLIST_DESTROY(&wg->wg_peers);
+	if (wg->wg_rtable_ipv6 != NULL)
+		free(wg->wg_rtable_ipv6, M_RTABLE);
+	if (wg->wg_rtable_ipv4 != NULL)
+		free(wg->wg_rtable_ipv4, M_RTABLE);
+fail1:	wg_worker_destroy(wg);
+fail0:	kmem_free(wg, sizeof(*wg));
+	return error;
 }
 
 static int
@@ -3640,10 +3673,6 @@ wg_clone_destroy(struct ifnet *ifp)
 {
 	struct wg_softc *wg = container_of(ifp, struct wg_softc, wg_if);
 
-	mutex_enter(&wg_softcs.lock);
-	LIST_REMOVE(wg, wg_list);
-	mutex_exit(&wg_softcs.lock);
-
 #ifdef WG_RUMPKERNEL
 	if (wg_user_mode(wg)) {
 		rumpuser_wg_destroy(wg->wg_user);
@@ -3651,23 +3680,23 @@ wg_clone_destroy(struct ifnet *ifp)
 	}
 #endif
 
-	bpf_detach(ifp);
-	if_detach(ifp);
-	wg_worker_destroy(wg);
+	mutex_enter(&wg_softcs.lock);
+	LIST_REMOVE(wg, wg_list);
+	mutex_exit(&wg_softcs.lock);
+	wg_if_detach(wg);
 	wg_destroy_all_peers(wg);
+	rw_obj_free(wg->wg_rwlock);
+	mutex_obj_free(wg->wg_lock);
+	thmap_destroy(wg->wg_sessions_byindex);
+	thmap_destroy(wg->wg_peers_byname);
+	thmap_destroy(wg->wg_peers_bypubkey);
+	PSLIST_DESTROY(&wg->wg_peers);
 	if (wg->wg_rtable_ipv4 != NULL)
 		free(wg->wg_rtable_ipv4, M_RTABLE);
 	if (wg->wg_rtable_ipv6 != NULL)
 		free(wg->wg_rtable_ipv6, M_RTABLE);
-
-	PSLIST_DESTROY(&wg->wg_peers);
-	thmap_destroy(wg->wg_sessions_byindex);
-	thmap_destroy(wg->wg_peers_byname);
-	thmap_destroy(wg->wg_peers_bypubkey);
-	mutex_obj_free(wg->wg_lock);
-	rw_obj_free(wg->wg_rwlock);
-
-	kmem_free(wg, sizeof(struct wg_softc));
+	wg_worker_destroy(wg);
+	kmem_free(wg, sizeof(*wg));
 
 	return 0;
 }
@@ -3739,7 +3768,7 @@ wg_output(struct ifnet *ifp, struct mbuf
 	error = if_tunnel_check_nesting(ifp, m, 1);
 	if (error) {
 		WGLOG(LOG_ERR, "tunneling loop detected and packet dropped\n");
-		goto out;
+		goto out0;
 	}
 
 	IFQ_CLASSIFY(&ifp->if_snd, m, dst->sa_family);
@@ -3752,37 +3781,50 @@ wg_output(struct ifnet *ifp, struct mbuf
 	if (wgp == NULL) {
 		WG_TRACE("peer not found");
 		error = EHOSTUNREACH;
-		goto out;
+		goto out0;
 	}
 
 	/* Clear checksum-offload flags. */
 	m->m_pkthdr.csum_flags = 0;
 	m->m_pkthdr.csum_data = 0;
 
-	if (!pcq_put(wgp->wgp_q, m)) {
-		error = ENOBUFS;
-		goto out;
+	/* Check whether there's an established session.  */
+	wgs = wg_get_stable_session(wgp, &wgs_psref);
+	if (wgs == NULL) {
+		/*
+		 * No established session.  If we're the first to try
+		 * sending data, schedule a handshake and queue the
+		 * packet for when the handshake is done; otherwise
+		 * just drop the packet and let the ongoing handshake
+		 * attempt continue.  We could queue more data packets
+		 * but it's not clear that's worthwhile.
+		 */
+		if (atomic_cas_ptr(&wgp->wgp_pending, NULL, m) == NULL) {
+			m = NULL; /* consume */
+			WG_TRACE("queued first packet; init handshake");
+			wg_schedule_peer_task(wgp, WGP_TASK_SEND_INIT_MESSAGE);
+		} else {
+			WG_TRACE("first packet already queued, dropping");
+		}
+		goto out1;
 	}
-	m = NULL;		/* consumed */
 
-	wgs = wg_get_stable_session(wgp, &wgs_psref);
-	if (wgs != NULL && !wg_session_hit_limits(wgs)) {
-		kpreempt_disable();
-		softint_schedule(wgp->wgp_si);
-		kpreempt_enable();
-		WG_TRACE("softint scheduled");
-	} else {
-		wg_schedule_peer_task(wgp, WGP_TASK_SEND_INIT_MESSAGE);
-		WG_TRACE("softint NOT scheduled");
+	/* There's an established session.  Toss it in the queue.  */
+	kpreempt_disable();
+	const uint32_t h = curcpu()->ci_index;	// pktq_rps_hash(m)
+	M_SETCTX(m, wgp);
+	if (__predict_false(!pktq_enqueue(wg_pktq, m, h))) {
+		WGLOG(LOG_ERR, "pktq full, dropping\n");
+		error = ENOBUFS;
+		goto out2;
 	}
+	m = NULL;		/* consumed */
 	error = 0;
+out2:	kpreempt_enable();
 
-out:
-	if (wgs != NULL)
-		wg_put_session(wgs, &wgs_psref);
-	if (wgp != NULL)
-		wg_put_peer(wgp, &wgp_psref);
-	if (m != NULL)
+	wg_put_session(wgs, &wgs_psref);
+out1:	wg_put_peer(wgp, &wgp_psref);
+out0:	if (m)
 		m_freem(m);
 	curlwp_bindx(bound);
 	return error;

Reply via email to