Module Name:    src
Committed By:   riastradh
Date:           Mon Sep  7 01:15:25 UTC 2020

Modified Files:
        src/sys/net: if_wg.c

Log Message:
wg: Use threadpool(9) and workqueue(9) for asynchronous tasks.

- Using a threadpool(9) job per interface to receive incoming handshake
  messages gives the same concurrency for active interfaces but
  doesn't waste kthreads on inactive ones.  (Both scheduling paths are
  sketched in code after this list.)

  => Can't really do this with a global workqueue(9) because there's
     no bound on the amount of time wg_receive_packets() might run
     for; we really need separate threads or threadpool jobs in order
     to avoid having one interface starve all the others.

- Using a global workqueue(9) for asynchronous peer tasks avoids
  creating unnecessary kthreads.

  => Each task does a more or less bounded amount of work, so it's OK
     to share a global workqueue -- there's no advantage to adding
     concurrency for what is almost certainly going to be CPU-bound
     asymmetric crypto.

  => This way we don't need a thread per peer or iteration over a
     list of all peers, so the task mechanism should no longer be a
     bottleneck to scaling to thousands of peers.
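
In outline, the two scheduling paths look like this (condensed from the
diff below; locking setup, error handling, and debug logging are
elided):

	/*
	 * Socket upcall: record which address family has data pending
	 * and kick the per-interface threadpool job (condensed from
	 * wg_so_upcall in the diff).
	 */
	static void
	wg_so_upcall(struct socket *so, void *cookie, int events, int waitflag)
	{
		struct wg_softc *wg = cookie;
		int reason = (so->so_proto->pr_domain->dom_family == AF_INET)
		    ? WG_UPCALL_INET : WG_UPCALL_INET6;

		mutex_enter(wg->wg_intr_lock);
		wg->wg_upcalls |= reason;
		threadpool_schedule_job(wg->wg_threadpool, &wg->wg_job);
		mutex_exit(wg->wg_intr_lock);
	}

	/*
	 * Peer task: set the task bit and enqueue the peer's struct work
	 * on the shared per-CPU workqueue, but only on the 0 -> nonzero
	 * transition so each peer is queued at most once (condensed from
	 * wg_schedule_peer_task in the diff).
	 */
	static void
	wg_schedule_peer_task(struct wg_peer *wgp, int task)
	{
		mutex_enter(wgp->wgp_intr_lock);
		if (wgp->wgp_tasks == 0)
			workqueue_enqueue(wg_wq, &wgp->wgp_work, NULL);
		wgp->wgp_tasks |= task;
		mutex_exit(wgp->wgp_intr_lock);
	}

On the consumer side, wg_job() and wg_peer_work() loop under the same
intr lock, clearing the pending bits before dropping the lock to do the
actual work, so anything scheduled while a batch is being processed is
picked up before the job or work item completes.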

XXX This doesn't distribute the load across CPUs -- it keeps it on
the same CPU where the packet came in.  Should consider doing
something to balance the load -- e.g., note whether the current CPU is
loaded, and if so, sort CPUs by queue length or some other measure of
load and pick the least loaded one (a rough sketch of that idea
follows).


To generate a diff of this commit:
cvs rdiff -u -r1.54 -r1.55 src/sys/net/if_wg.c

Please note that diffs are not public domain; they are subject to the
copyright notices on the relevant files.

Modified files:

Index: src/sys/net/if_wg.c
diff -u src/sys/net/if_wg.c:1.54 src/sys/net/if_wg.c:1.55
--- src/sys/net/if_wg.c:1.54	Mon Sep  7 01:14:42 2020
+++ src/sys/net/if_wg.c	Mon Sep  7 01:15:25 2020
@@ -1,4 +1,4 @@
-/*	$NetBSD: if_wg.c,v 1.54 2020/09/07 01:14:42 riastradh Exp $	*/
+/*	$NetBSD: if_wg.c,v 1.55 2020/09/07 01:15:25 riastradh Exp $	*/
 
 /*
  * Copyright (C) Ryota Ozaki <ozaki.ry...@gmail.com>
@@ -41,7 +41,7 @@
  */
 
 #include <sys/cdefs.h>
-__KERNEL_RCSID(0, "$NetBSD: if_wg.c,v 1.54 2020/09/07 01:14:42 riastradh Exp $");
+__KERNEL_RCSID(0, "$NetBSD: if_wg.c,v 1.55 2020/09/07 01:15:25 riastradh Exp $");
 
 #ifdef _KERNEL_OPT
 #include "opt_inet.h"
@@ -61,7 +61,6 @@ __KERNEL_RCSID(0, "$NetBSD: if_wg.c,v 1.
 #include <sys/ioctl.h>
 #include <sys/kernel.h>
 #include <sys/kmem.h>
-#include <sys/kthread.h>
 #include <sys/mbuf.h>
 #include <sys/module.h>
 #include <sys/mutex.h>
@@ -77,8 +76,10 @@ __KERNEL_RCSID(0, "$NetBSD: if_wg.c,v 1.
 #include <sys/syslog.h>
 #include <sys/systm.h>
 #include <sys/thmap.h>
+#include <sys/threadpool.h>
 #include <sys/time.h>
 #include <sys/timespec.h>
+#include <sys/workqueue.h>
 
 #include <net/bpf.h>
 #include <net/if.h>
@@ -120,10 +121,11 @@ __KERNEL_RCSID(0, "$NetBSD: if_wg.c,v 1.
  * Data structures
  * - struct wg_softc is an instance of wg interfaces
  *   - It has a list of peers (struct wg_peer)
- *   - It has a kthread that sends/receives handshake messages and
+ *   - It has a threadpool job that sends/receives handshake messages and
  *     runs event handlers
  *   - It has its own two routing tables: one is for IPv4 and the other IPv6
  * - struct wg_peer is a representative of a peer
+ *   - It has a struct work to handle handshakes and timer tasks
  *   - It has a pair of session instances (struct wg_session)
  *   - It has a pair of endpoint instances (struct wg_sockaddr)
  *     - Normally one endpoint is used and the second one is used only on
@@ -446,18 +448,6 @@ sliwin_update(struct sliwin *W, uint64_t
 	return 0;
 }
 
-struct wg_worker {
-	kmutex_t	wgw_lock;
-	kcondvar_t	wgw_cv;
-	bool		wgw_todie;
-	struct socket	*wgw_so4;
-	struct socket	*wgw_so6;
-	int		wgw_wakeup_reasons;
-#define WG_WAKEUP_REASON_RECEIVE_PACKETS_IPV4	__BIT(0)
-#define WG_WAKEUP_REASON_RECEIVE_PACKETS_IPV6	__BIT(1)
-#define WG_WAKEUP_REASON_PEER			__BIT(2)
-};
-
 struct wg_session {
 	struct wg_peer	*wgs_peer;
 	struct psref_target
@@ -550,6 +540,7 @@ struct wg_peer {
 	pserialize_t		wgp_psz;
 	struct psref_target	wgp_psref;
 	kmutex_t		*wgp_lock;
+	kmutex_t		*wgp_intr_lock;
 
 	uint8_t	wgp_pubkey[WG_STATIC_KEY_LEN];
 	struct wg_sockaddr	*wgp_endpoint;
@@ -594,7 +585,8 @@ struct wg_peer {
 
 	struct wg_ppsratecheck	wgp_ppsratecheck;
 
-	volatile unsigned int	wgp_tasks;
+	struct work		wgp_work;
+	unsigned int		wgp_tasks;
 #define WGP_TASK_SEND_INIT_MESSAGE		__BIT(0)
 #define WGP_TASK_RETRY_HANDSHAKE		__BIT(1)
 #define WGP_TASK_ESTABLISH_SESSION		__BIT(2)
@@ -609,6 +601,7 @@ struct wg_softc {
 	struct ifnet	wg_if;
 	LIST_ENTRY(wg_softc) wg_list;
 	kmutex_t	*wg_lock;
+	kmutex_t	*wg_intr_lock;
 	krwlock_t	*wg_rwlock;
 
 	uint8_t		wg_privkey[WG_STATIC_KEY_LEN];
@@ -621,11 +614,21 @@ struct wg_softc {
 	struct thmap	*wg_sessions_byindex;
 	uint16_t	wg_listen_port;
 
-	struct wg_worker	*wg_worker;
-	lwp_t			*wg_worker_lwp;
+	struct threadpool	*wg_threadpool;
 
+	struct threadpool_job	wg_job;
+	int			wg_upcalls;
+#define	WG_UPCALL_INET	__BIT(0)
+#define	WG_UPCALL_INET6	__BIT(1)
+
+#ifdef INET
+	struct socket		*wg_so4;
 	struct radix_node_head	*wg_rtable_ipv4;
+#endif
+#ifdef INET6
+	struct socket		*wg_so6;
 	struct radix_node_head	*wg_rtable_ipv6;
+#endif
 
 	struct wg_ppsratecheck	wg_ppsratecheck;
 
@@ -659,8 +662,6 @@ static unsigned wg_keepalive_timeout = W
 static struct mbuf *
 		wg_get_mbuf(size_t, size_t);
 
-static void	wg_wakeup_worker(struct wg_worker *, int);
-
 static int	wg_send_data_msg(struct wg_peer *, struct wg_session *,
 		    struct mbuf *);
 static int	wg_send_cookie_msg(struct wg_softc *, struct wg_peer *,
@@ -704,6 +705,8 @@ static int	wg_bind_port(struct wg_softc 
 static int	wg_init(struct ifnet *);
 static void	wg_stop(struct ifnet *, int);
 
+static void	wg_peer_work(struct work *, void *);
+static void	wg_job(struct threadpool_job *);
 static void	wgintr(void *);
 static void	wg_purge_pending_packets(struct wg_peer *);
 
@@ -788,6 +791,7 @@ static struct if_clone wg_cloner =
     IF_CLONE_INITIALIZER("wg", wg_clone_create, wg_clone_destroy);
 
 static struct pktqueue *wg_pktq __read_mostly;
+static struct workqueue *wg_wq __read_mostly;
 
 void wgattach(int);
 /* ARGSUSED */
@@ -803,6 +807,7 @@ wgattach(int count)
 static void
 wginit(void)
 {
+	int error __diagused;
 
 	wg_psref_class = psref_class_create("wg", IPL_SOFTNET);
 
@@ -812,6 +817,10 @@ wginit(void)
 	wg_pktq = pktq_create(IFQ_MAXLEN, wgintr, NULL);
 	KASSERT(wg_pktq != NULL);
 
+	error = workqueue_create(&wg_wq, "wgpeer", wg_peer_work, NULL,
+	    PRI_NONE, IPL_SOFTNET, WQ_MPSAFE|WQ_PERCPU);
+	KASSERT(error == 0);
+
 	if_clone_attach(&wg_cloner);
 }
 
@@ -1555,17 +1564,17 @@ out:
 }
 
 static struct socket *
-wg_get_so_by_af(struct wg_worker *wgw, const int af)
+wg_get_so_by_af(struct wg_softc *wg, const int af)
 {
 
-	return (af == AF_INET) ? wgw->wgw_so4 : wgw->wgw_so6;
+	return (af == AF_INET) ? wg->wg_so4 : wg->wg_so6;
 }
 
 static struct socket *
 wg_get_so_by_peer(struct wg_peer *wgp, struct wg_sockaddr *wgsa)
 {
 
-	return wg_get_so_by_af(wgp->wgp_sc->wg_worker, wgsa_family(wgsa));
+	return wg_get_so_by_af(wgp->wgp_sc, wgsa_family(wgsa));
 }
 
 static struct wg_sockaddr *
@@ -2246,9 +2255,18 @@ static void
 wg_schedule_peer_task(struct wg_peer *wgp, int task)
 {
 
-	atomic_or_uint(&wgp->wgp_tasks, task);
+	mutex_enter(wgp->wgp_intr_lock);
 	WG_DLOG("tasks=%d, task=%d\n", wgp->wgp_tasks, task);
-	wg_wakeup_worker(wgp->wgp_sc->wg_worker, WG_WAKEUP_REASON_PEER);
+	if (wgp->wgp_tasks == 0)
+		/*
+		 * XXX If the current CPU is already loaded -- e.g., if
+		 * there's already a bunch of handshakes queued up --
+		 * consider tossing this over to another CPU to
+		 * distribute the load.
+		 */
+		workqueue_enqueue(wg_wq, &wgp->wgp_work, NULL);
+	wgp->wgp_tasks |= task;
+	mutex_exit(wgp->wgp_intr_lock);
 }
 
 static void
@@ -2783,7 +2801,7 @@ wg_receive_packets(struct wg_softc *wg, 
 		struct mbuf *paddr = NULL;
 		struct sockaddr *src;
 
-		so = wg_get_so_by_af(wg->wg_worker, af);
+		so = wg_get_so_by_af(wg, af);
 		flags = MSG_DONTWAIT;
 		dummy_uio.uio_resid = 1000000000;
 
@@ -2987,28 +3005,16 @@ wg_task_destroy_prev_session(struct wg_s
 }
 
 static void
-wg_process_peer_tasks(struct wg_softc *wg)
+wg_peer_work(struct work *wk, void *cookie)
 {
-	struct wg_peer *wgp;
-	int s;
-
-	/* XXX should avoid checking all peers */
-	s = pserialize_read_enter();
-	WG_PEER_READER_FOREACH(wgp, wg) {
-		struct psref psref;
-		unsigned int tasks;
-
-		if (wgp->wgp_tasks == 0)
-			continue;
-
-		wg_get_peer(wgp, &psref);
-		pserialize_read_exit(s);
-
-	restart:
-		tasks = atomic_swap_uint(&wgp->wgp_tasks, 0);
-		KASSERT(tasks != 0);
+	struct wg_peer *wgp = container_of(wk, struct wg_peer, wgp_work);
+	struct wg_softc *wg = wgp->wgp_sc;
+	int tasks;
 
-		WG_DLOG("tasks=%x\n", tasks);
+	mutex_enter(wgp->wgp_intr_lock);
+	while ((tasks = wgp->wgp_tasks) != 0) {
+		wgp->wgp_tasks = 0;
+		mutex_exit(wgp->wgp_intr_lock);
 
 		mutex_enter(wgp->wgp_lock);
 		if (ISSET(tasks, WGP_TASK_SEND_INIT_MESSAGE))
@@ -3025,66 +3031,37 @@ wg_process_peer_tasks(struct wg_softc *w
 			wg_task_destroy_prev_session(wg, wgp);
 		mutex_exit(wgp->wgp_lock);
 
-		/* New tasks may be scheduled during processing tasks */
-		WG_DLOG("wgp_tasks=%d\n", wgp->wgp_tasks);
-		if (wgp->wgp_tasks != 0)
-			goto restart;
-
-		s = pserialize_read_enter();
-		wg_put_peer(wgp, &psref);
+		mutex_enter(wgp->wgp_intr_lock);
 	}
-	pserialize_read_exit(s);
+	mutex_exit(wgp->wgp_intr_lock);
 }
 
 static void
-wg_worker(void *arg)
+wg_job(struct threadpool_job *job)
 {
-	struct wg_softc *wg = arg;
-	struct wg_worker *wgw = wg->wg_worker;
-	bool todie = false;
-
-	KASSERT(wg != NULL);
-	KASSERT(wgw != NULL);
-
-	while (!todie) {
-		int reasons;
-		int bound;
-
-		mutex_enter(&wgw->wgw_lock);
-		/* New tasks may come during task handling */
-		while ((reasons = wgw->wgw_wakeup_reasons) == 0 &&
-		    !(todie = wgw->wgw_todie))
-			cv_wait(&wgw->wgw_cv, &wgw->wgw_lock);
-		wgw->wgw_wakeup_reasons = 0;
-		mutex_exit(&wgw->wgw_lock);
+	struct wg_softc *wg = container_of(job, struct wg_softc, wg_job);
+	int bound, upcalls;
 
+	mutex_enter(wg->wg_intr_lock);
+	while ((upcalls = wg->wg_upcalls) != 0) {
+		wg->wg_upcalls = 0;
+		mutex_exit(wg->wg_intr_lock);
 		bound = curlwp_bind();
-		if (ISSET(reasons, WG_WAKEUP_REASON_RECEIVE_PACKETS_IPV4))
+		if (ISSET(upcalls, WG_UPCALL_INET))
 			wg_receive_packets(wg, AF_INET);
-		if (ISSET(reasons, WG_WAKEUP_REASON_RECEIVE_PACKETS_IPV6))
+		if (ISSET(upcalls, WG_UPCALL_INET6))
 			wg_receive_packets(wg, AF_INET6);
-		if (ISSET(reasons, WG_WAKEUP_REASON_PEER))
-			wg_process_peer_tasks(wg);
 		curlwp_bindx(bound);
+		mutex_enter(wg->wg_intr_lock);
 	}
-	kthread_exit(0);
-}
-
-static void
-wg_wakeup_worker(struct wg_worker *wgw, const int reason)
-{
-
-	mutex_enter(&wgw->wgw_lock);
-	wgw->wgw_wakeup_reasons |= reason;
-	cv_broadcast(&wgw->wgw_cv);
-	mutex_exit(&wgw->wgw_lock);
+	threadpool_job_done(job);
+	mutex_exit(wg->wg_intr_lock);
 }
 
 static int
 wg_bind_port(struct wg_softc *wg, const uint16_t port)
 {
 	int error;
-	struct wg_worker *wgw = wg->wg_worker;
 	uint16_t old_port = wg->wg_listen_port;
 
 	if (port != 0 && old_port == port)
@@ -3096,7 +3073,7 @@ wg_bind_port(struct wg_softc *wg, const 
 	sin->sin_addr.s_addr = INADDR_ANY;
 	sin->sin_port = htons(port);
 
-	error = sobind(wgw->wgw_so4, sintosa(sin), curlwp);
+	error = sobind(wg->wg_so4, sintosa(sin), curlwp);
 	if (error != 0)
 		return error;
 
@@ -3107,7 +3084,7 @@ wg_bind_port(struct wg_softc *wg, const 
 	sin6->sin6_addr = in6addr_any;
 	sin6->sin6_port = htons(port);
 
-	error = sobind(wgw->wgw_so6, sin6tosa(sin6), curlwp);
+	error = sobind(wg->wg_so6, sin6tosa(sin6), curlwp);
 	if (error != 0)
 		return error;
 #endif
@@ -3118,15 +3095,19 @@ wg_bind_port(struct wg_softc *wg, const 
 }
 
 static void
-wg_so_upcall(struct socket *so, void *arg, int events, int waitflag)
+wg_so_upcall(struct socket *so, void *cookie, int events, int waitflag)
 {
-	struct wg_worker *wgw = arg;
+	struct wg_softc *wg = cookie;
 	int reason;
 
 	reason = (so->so_proto->pr_domain->dom_family == AF_INET) ?
-	    WG_WAKEUP_REASON_RECEIVE_PACKETS_IPV4 :
-	    WG_WAKEUP_REASON_RECEIVE_PACKETS_IPV6;
-	wg_wakeup_worker(wgw, reason);
+	    WG_UPCALL_INET :
+	    WG_UPCALL_INET6;
+
+	mutex_enter(wg->wg_intr_lock);
+	wg->wg_upcalls |= reason;
+	threadpool_schedule_job(wg->wg_threadpool, &wg->wg_job);
+	mutex_exit(wg->wg_intr_lock);
 }
 
 static int
@@ -3184,8 +3165,7 @@ wg_overudp_cb(struct mbuf **mp, int offs
 }
 
 static int
-wg_worker_socreate(struct wg_softc *wg, struct wg_worker *wgw, const int af,
-    struct socket **sop)
+wg_socreate(struct wg_softc *wg, int af, struct socket **sop)
 {
 	int error;
 	struct socket *so;
@@ -3195,7 +3175,7 @@ wg_worker_socreate(struct wg_softc *wg, 
 		return error;
 
 	solock(so);
-	so->so_upcallarg = wgw;
+	so->so_upcallarg = wg;
 	so->so_upcall = wg_so_upcall;
 	so->so_rcv.sb_flags |= SB_UPCALL;
 	if (af == AF_INET)
@@ -3211,79 +3191,6 @@ wg_worker_socreate(struct wg_softc *wg, 
 	return 0;
 }
 
-static int
-wg_worker_init(struct wg_softc *wg)
-{
-	int error;
-	struct wg_worker *wgw;
-	const char *ifname = wg->wg_if.if_xname;
-	struct socket *so;
-
-	wgw = kmem_zalloc(sizeof(*wgw), KM_SLEEP);
-
-	mutex_init(&wgw->wgw_lock, MUTEX_DEFAULT, IPL_SOFTNET);
-	cv_init(&wgw->wgw_cv, ifname);
-	wgw->wgw_todie = false;
-	wgw->wgw_wakeup_reasons = 0;
-
-	error = wg_worker_socreate(wg, wgw, AF_INET, &so);
-	if (error != 0)
-		goto error;
-	wgw->wgw_so4 = so;
-#ifdef INET6
-	error = wg_worker_socreate(wg, wgw, AF_INET6, &so);
-	if (error != 0)
-		goto error;
-	wgw->wgw_so6 = so;
-#endif
-
-	wg->wg_worker = wgw;
-
-	error = kthread_create(PRI_NONE, KTHREAD_MPSAFE | KTHREAD_MUSTJOIN,
-	    NULL, wg_worker, wg, &wg->wg_worker_lwp, "%s", ifname);
-	if (error != 0)
-		goto error;
-
-	return 0;
-
-error:
-#ifdef INET6
-	if (wgw->wgw_so6 != NULL)
-		soclose(wgw->wgw_so6);
-#endif
-	if (wgw->wgw_so4 != NULL)
-		soclose(wgw->wgw_so4);
-	cv_destroy(&wgw->wgw_cv);
-	mutex_destroy(&wgw->wgw_lock);
-
-	kmem_free(wgw, sizeof(*wgw));
-
-	return error;
-}
-
-static void
-wg_worker_destroy(struct wg_softc *wg)
-{
-	struct wg_worker *wgw = wg->wg_worker;
-
-	mutex_enter(&wgw->wgw_lock);
-	wgw->wgw_todie = true;
-	wgw->wgw_wakeup_reasons = 0;
-	cv_broadcast(&wgw->wgw_cv);
-	mutex_exit(&wgw->wgw_lock);
-
-	kthread_join(wg->wg_worker_lwp);
-
-#ifdef INET6
-	soclose(wgw->wgw_so6);
-#endif
-	soclose(wgw->wgw_so4);
-	cv_destroy(&wgw->wgw_cv);
-	mutex_destroy(&wgw->wgw_lock);
-	kmem_free(wg->wg_worker, sizeof(struct wg_worker));
-	wg->wg_worker = NULL;
-}
-
 static bool
 wg_session_hit_limits(struct wg_session *wgs)
 {
@@ -3385,6 +3292,7 @@ wg_alloc_peer(struct wg_softc *wg)
 	wgp->wgp_endpoint_changing = false;
 	wgp->wgp_endpoint_available = false;
 	wgp->wgp_lock = mutex_obj_alloc(MUTEX_DEFAULT, IPL_NONE);
+	wgp->wgp_intr_lock = mutex_obj_alloc(MUTEX_DEFAULT, IPL_SOFTNET);
 	wgp->wgp_psz = pserialize_create();
 	psref_target_init(&wgp->wgp_psref, wg_psref_class);
 
@@ -3454,6 +3362,9 @@ wg_destroy_peer(struct wg_peer *wgp)
 	callout_halt(&wgp->wgp_handshake_timeout_timer, NULL);
 	callout_halt(&wgp->wgp_session_dtor_timer, NULL);
 
+	/* Wait for any queued work to complete.  */
+	workqueue_wait(wg_wq, &wgp->wgp_work);
+
 	wgs = wgp->wgp_session_unstable;
 	if (wgs->wgs_state != WGS_STATE_UNKNOWN) {
 		mutex_enter(wgp->wgp_lock);
@@ -3486,6 +3397,7 @@ wg_destroy_peer(struct wg_peer *wgp)
 	kmem_free(wgp->wgp_endpoint0, sizeof(*wgp->wgp_endpoint0));
 
 	pserialize_destroy(wgp->wgp_psz);
+	mutex_obj_free(wgp->wgp_intr_lock);
 	mutex_obj_free(wgp->wgp_lock);
 
 	kmem_free(wgp, sizeof(*wgp));
@@ -3618,28 +3530,39 @@ wg_clone_create(struct if_clone *ifc, in
 
 	if_initname(&wg->wg_if, ifc->ifc_name, unit);
 
-	error = wg_worker_init(wg);
+	PSLIST_INIT(&wg->wg_peers);
+	wg->wg_peers_bypubkey = thmap_create(0, NULL, THMAP_NOCOPY);
+	wg->wg_peers_byname = thmap_create(0, NULL, THMAP_NOCOPY);
+	wg->wg_sessions_byindex = thmap_create(0, NULL, THMAP_NOCOPY);
+	wg->wg_lock = mutex_obj_alloc(MUTEX_DEFAULT, IPL_NONE);
+	wg->wg_intr_lock = mutex_obj_alloc(MUTEX_DEFAULT, IPL_SOFTNET);
+	wg->wg_rwlock = rw_obj_alloc();
+	threadpool_job_init(&wg->wg_job, wg_job, wg->wg_intr_lock,
+	    "%s", if_name(&wg->wg_if));
+	wg->wg_ops = &wg_ops_rumpkernel;
+
+	error = threadpool_get(&wg->wg_threadpool, PRI_NONE);
 	if (error)
 		goto fail0;
 
+#ifdef INET
+	error = wg_socreate(wg, AF_INET, &wg->wg_so4);
+	if (error)
+		goto fail1;
 	rn_inithead((void **)&wg->wg_rtable_ipv4,
 	    offsetof(struct sockaddr_in, sin_addr) * NBBY);
+#endif
 #ifdef INET6
+	error = wg_socreate(wg, AF_INET6, &wg->wg_so6);
+	if (error)
+		goto fail2;
 	rn_inithead((void **)&wg->wg_rtable_ipv6,
 	    offsetof(struct sockaddr_in6, sin6_addr) * NBBY);
 #endif
 
-	PSLIST_INIT(&wg->wg_peers);
-	wg->wg_peers_bypubkey = thmap_create(0, NULL, THMAP_NOCOPY);
-	wg->wg_peers_byname = thmap_create(0, NULL, THMAP_NOCOPY);
-	wg->wg_sessions_byindex = thmap_create(0, NULL, THMAP_NOCOPY);
-	wg->wg_lock = mutex_obj_alloc(MUTEX_DEFAULT, IPL_NONE);
-	wg->wg_rwlock = rw_obj_alloc();
-	wg->wg_ops = &wg_ops_rumpkernel;
-
 	error = wg_if_attach(wg);
 	if (error)
-		goto fail1;
+		goto fail3;
 
 	mutex_enter(&wg_softcs.lock);
 	LIST_INSERT_HEAD(&wg_softcs.list, wg, wg_list);
@@ -3647,24 +3570,47 @@ wg_clone_create(struct if_clone *ifc, in
 
 	return 0;
 
-fail2: __unused
+fail4: __unused
 	mutex_enter(&wg_softcs.lock);
 	LIST_REMOVE(wg, wg_list);
 	mutex_exit(&wg_softcs.lock);
 	wg_if_detach(wg);
-	wg_destroy_all_peers(wg);
+fail3:	wg_destroy_all_peers(wg);
+#ifdef INET6
+	solock(wg->wg_so6);
+	wg->wg_so6->so_rcv.sb_flags &= ~SB_UPCALL;
+	sounlock(wg->wg_so6);
+#endif
+#ifdef INET
+	solock(wg->wg_so4);
+	wg->wg_so4->so_rcv.sb_flags &= ~SB_UPCALL;
+	sounlock(wg->wg_so4);
+#endif
+	mutex_enter(wg->wg_intr_lock);
+	threadpool_cancel_job(wg->wg_threadpool, &wg->wg_job);
+	mutex_exit(wg->wg_intr_lock);
+#ifdef INET6
+	if (wg->wg_rtable_ipv6 != NULL)
+		free(wg->wg_rtable_ipv6, M_RTABLE);
+	soclose(wg->wg_so6);
+fail2:
+#endif
+#ifdef INET
+	if (wg->wg_rtable_ipv4 != NULL)
+		free(wg->wg_rtable_ipv4, M_RTABLE);
+	soclose(wg->wg_so4);
+fail1:
+#endif
+	threadpool_put(wg->wg_threadpool, PRI_NONE);
+fail0:	threadpool_job_destroy(&wg->wg_job);
 	rw_obj_free(wg->wg_rwlock);
+	mutex_obj_free(wg->wg_intr_lock);
 	mutex_obj_free(wg->wg_lock);
 	thmap_destroy(wg->wg_sessions_byindex);
 	thmap_destroy(wg->wg_peers_byname);
 	thmap_destroy(wg->wg_peers_bypubkey);
 	PSLIST_DESTROY(&wg->wg_peers);
-	if (wg->wg_rtable_ipv6 != NULL)
-		free(wg->wg_rtable_ipv6, M_RTABLE);
-	if (wg->wg_rtable_ipv4 != NULL)
-		free(wg->wg_rtable_ipv4, M_RTABLE);
-fail1:	wg_worker_destroy(wg);
-fail0:	kmem_free(wg, sizeof(*wg));
+	kmem_free(wg, sizeof(*wg));
 	return error;
 }
 
@@ -3685,17 +3631,38 @@ wg_clone_destroy(struct ifnet *ifp)
 	mutex_exit(&wg_softcs.lock);
 	wg_if_detach(wg);
 	wg_destroy_all_peers(wg);
+#ifdef INET6
+	solock(wg->wg_so6);
+	wg->wg_so6->so_rcv.sb_flags &= ~SB_UPCALL;
+	sounlock(wg->wg_so6);
+#endif
+#ifdef INET
+	solock(wg->wg_so4);
+	wg->wg_so4->so_rcv.sb_flags &= ~SB_UPCALL;
+	sounlock(wg->wg_so4);
+#endif
+	mutex_enter(wg->wg_intr_lock);
+	threadpool_cancel_job(wg->wg_threadpool, &wg->wg_job);
+	mutex_exit(wg->wg_intr_lock);
+#ifdef INET6
+	if (wg->wg_rtable_ipv6 != NULL)
+		free(wg->wg_rtable_ipv6, M_RTABLE);
+	soclose(wg->wg_so6);
+#endif
+#ifdef INET
+	if (wg->wg_rtable_ipv4 != NULL)
+		free(wg->wg_rtable_ipv4, M_RTABLE);
+	soclose(wg->wg_so4);
+#endif
+	threadpool_put(wg->wg_threadpool, PRI_NONE);
+	threadpool_job_destroy(&wg->wg_job);
 	rw_obj_free(wg->wg_rwlock);
+	mutex_obj_free(wg->wg_intr_lock);
 	mutex_obj_free(wg->wg_lock);
 	thmap_destroy(wg->wg_sessions_byindex);
 	thmap_destroy(wg->wg_peers_byname);
 	thmap_destroy(wg->wg_peers_bypubkey);
 	PSLIST_DESTROY(&wg->wg_peers);
-	if (wg->wg_rtable_ipv4 != NULL)
-		free(wg->wg_rtable_ipv4, M_RTABLE);
-	if (wg->wg_rtable_ipv6 != NULL)
-		free(wg->wg_rtable_ipv6, M_RTABLE);
-	wg_worker_destroy(wg);
 	kmem_free(wg, sizeof(*wg));
 
 	return 0;
