On 23/06/21(Wed) 15:53, Alexander Bluhm wrote:
> On Wed, Jun 23, 2021 at 11:40:18AM +0200, Martin Pieuchot wrote:
> > Our previous attempt [0] to replace the current select(2) implementation
> > has been reverted due to non-acceptable latency increase on sockets [1].
>
> I have measured the performance difference.
>
> http://bluhm.genua.de/perform/results/2021-06-21T09%3A44%3A18Z/perform.html
>
> The worst throughput drop, about 20%, is in 'iperf3 -c10.3.45.35 -u
> -b10G -w1m -t10 -R' and can be seen here.
>
> http://bluhm.genua.de/perform/results/2021-06-21T09%3A44%3A18Z/gnuplot/udp.html
>
> Note that iperf3 calls select(2) multiple times per UDP packet.
>
> As a new feature, I have added links to btrace kstack flame graphs
> to the table.
Thanks a lot for the tests.  The flame graphs showed that the lazy
removal wasn't working correctly.  The updated diff below now works as
expected.

I'm aware of the throughput drop in the UDP iperf3 test.  That is not
a real-world scenario, so I don't consider it a blocker; it is,
however, very useful for measuring contention on the NET_LOCK() in
select(2).  I'm working on that issue in a separate thread, but there
is an interdependency between the two diffs due to lock ordering.
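
For reviewers who haven't followed the earlier iterations: the lazy
removal works by stamping every knote registered on behalf of
select(2) with the thread's current `p_kq_serial' and bumping the
serial when the syscall returns (the "p->p_kq_serial += nd" below).
An event that resurfaces with a stale stamp is a leftover from a
previous call and is deleted on sight in pselcollect(), so no eager
cleanup pass is needed.  The scheme in miniature (illustration only,
not kernel code):

	/*
	 * Generation-counter lazy invalidation.  Registrations are
	 * stamped with the current serial; once the serial is bumped,
	 * old registrations test as stale and are discarded the next
	 * time they are seen.
	 */
	#include <stdio.h>

	struct ev {
		unsigned long stamp;	/* serial at registration time */
		int fd;
	};

	static unsigned long serial = 1;

	int
	main(void)
	{
		struct ev e = { .stamp = serial, .fd = 0 };

		printf("stale before bump: %d\n", e.stamp != serial);
		serial += 1;	/* what "p->p_kq_serial += nd" does */
		printf("stale after bump:  %d\n", e.stamp != serial);
		return 0;
	}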
Comments?
Index: kern/kern_event.c
===================================================================
RCS file: /cvs/src/sys/kern/kern_event.c,v
retrieving revision 1.167
diff -u -p -r1.167 kern_event.c
--- kern/kern_event.c 16 Jun 2021 14:26:30 -0000 1.167
+++ kern/kern_event.c 13 Jul 2021 07:21:03 -0000
@@ -92,7 +92,7 @@ void kqueue_do_check(struct kqueue *kq,
#define kqueue_check(kq) do {} while (0)
#endif
-void kqpoll_dequeue(struct proc *p);
+void kqpoll_dequeue(struct proc *p, int all);
static int filter_attach(struct knote *kn);
static void filter_detach(struct knote *kn);
@@ -720,12 +720,12 @@ kqpoll_init(void)
if (p->p_kq != NULL) {
/*
- * Discard any knotes that have been enqueued after
+ * Discard any badfd knotes that have been enqueued after
* previous scan.
- * This prevents accumulation of enqueued badfd knotes
- * in case scan does not make progress for some reason.
+ * This prevents them from accumulating in case
+ * scan does not make progress for some reason.
*/
- kqpoll_dequeue(p);
+ kqpoll_dequeue(p, 0);
return;
}
@@ -747,7 +747,7 @@ kqpoll_exit(void)
kqueue_purge(p, p->p_kq);
/* Clear any detached knotes that remain in the queue. */
- kqpoll_dequeue(p);
+ kqpoll_dequeue(p, 1);
kqueue_terminate(p, p->p_kq);
KASSERT(p->p_kq->kq_refs == 1);
KQRELE(p->p_kq);
@@ -755,33 +755,50 @@ kqpoll_exit(void)
}
void
-kqpoll_dequeue(struct proc *p)
+kqpoll_dequeue(struct proc *p, int all)
{
+ struct knote marker;
struct knote *kn;
struct kqueue *kq = p->p_kq;
+ /* Bail out early without locking if the queue appears empty. */
+ if (kq->kq_count == 0)
+ return;
+
+ memset(&marker, 0, sizeof(marker));
+ marker.kn_filter = EVFILT_MARKER;
+ marker.kn_status = KN_PROCESSING;
+
mtx_enter(&kq->kq_lock);
- while ((kn = TAILQ_FIRST(&kq->kq_head)) != NULL) {
+ kn = TAILQ_FIRST(&kq->kq_head);
+ while (kn != NULL) {
/* This kqueue should not be scanned by other threads. */
KASSERT(kn->kn_filter != EVFILT_MARKER);
- if (!knote_acquire(kn, NULL, 0)) {
- /* knote_acquire() has released kq_lock. */
- mtx_enter(&kq->kq_lock);
+ if (all == 0 && (kn->kn_status & KN_ATTACHED)) {
+ kn = TAILQ_NEXT(kn, kn_tqe);
continue;
}
- kqueue_check(kq);
- TAILQ_REMOVE(&kq->kq_head, kn, kn_tqe);
- kn->kn_status &= ~KN_QUEUED;
- kq->kq_count--;
- mtx_leave(&kq->kq_lock);
+ TAILQ_INSERT_BEFORE(kn, &marker, kn_tqe);
+
+ if (!knote_acquire(kn, NULL, 0)) {
+ /* knote_acquire() has released kq_lock. */
+ } else {
+ kqueue_check(kq);
+ TAILQ_REMOVE(&kq->kq_head, kn, kn_tqe);
+ kn->kn_status &= ~KN_QUEUED;
+ kq->kq_count--;
+ mtx_leave(&kq->kq_lock);
- filter_detach(kn);
- knote_drop(kn, p);
+ filter_detach(kn);
+ knote_drop(kn, p);
+ }
mtx_enter(&kq->kq_lock);
kqueue_check(kq);
+ kn = TAILQ_NEXT(&marker, kn_tqe);
+ TAILQ_REMOVE(&kq->kq_head, &marker, kn_tqe);
}
mtx_leave(&kq->kq_lock);
}
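
(Aside for reviewers, not part of the diff: the marker knote above
follows the usual pattern for walking a mutex-protected queue while
dropping the lock around per-node work.  A minimal userland sketch of
the same idea, assuming nodes themselves stay pinned while the lock is
dropped, as knote_acquire() ensures in the kernel:)

	#include <pthread.h>
	#include <sys/queue.h>

	struct node {
		TAILQ_ENTRY(node) entry;
		int is_marker;
	};
	TAILQ_HEAD(nodelist, node);

	void
	walk(struct nodelist *list, pthread_mutex_t *mtx)
	{
		struct node marker = { .is_marker = 1 };
		struct node *n;

		pthread_mutex_lock(mtx);
		n = TAILQ_FIRST(list);
		while (n != NULL) {
			if (n->is_marker) {
				/* Skip markers of concurrent walkers. */
				n = TAILQ_NEXT(n, entry);
				continue;
			}
			/* The marker holds our place while unlocked. */
			TAILQ_INSERT_BEFORE(n, &marker, entry);
			pthread_mutex_unlock(mtx);

			/* ... expensive work on `n' without the lock ... */

			pthread_mutex_lock(mtx);
			/* Resume from wherever the marker now sits. */
			n = TAILQ_NEXT(&marker, entry);
			TAILQ_REMOVE(list, &marker, entry);
		}
		pthread_mutex_unlock(mtx);
	}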
Index: kern/sys_generic.c
===================================================================
RCS file: /cvs/src/sys/kern/sys_generic.c,v
retrieving revision 1.135
diff -u -p -r1.135 sys_generic.c
--- kern/sys_generic.c 8 Jan 2021 09:29:04 -0000 1.135
+++ kern/sys_generic.c 13 Jul 2021 07:21:03 -0000
@@ -55,6 +55,7 @@
#include <sys/time.h>
#include <sys/malloc.h>
#include <sys/poll.h>
+#include <sys/eventvar.h>
#ifdef KTRACE
#include <sys/ktrace.h>
#endif
@@ -66,8 +67,21 @@
#include <uvm/uvm_extern.h>
-int selscan(struct proc *, fd_set *, fd_set *, int, int, register_t *);
-void pollscan(struct proc *, struct pollfd *, u_int, register_t *);
+/*
+ * Debug values:
+ * 1 - print implementation errors, things that should not happen.
+ * 2 - print ppoll(2) information, somewhat verbose
+ * 3 - print pselect(2) and ppoll(2) information, very verbose
+ */
+int kqpoll_debug = 0;
+#define DPRINTFN(v, x...) if (kqpoll_debug > v) { \
+ printf("%s(%d): ", curproc->p_p->ps_comm, curproc->p_tid); \
+ printf(x); \
+}
+
+int pselregister(struct proc *, fd_set *[], int, int *);
+int pselcollect(struct proc *, struct kevent *, fd_set *[], int *);
+
int pollout(struct pollfd *, struct pollfd *, u_int);
int dopselect(struct proc *, int, fd_set *, fd_set *, fd_set *,
struct timespec *, const sigset_t *, register_t *);
@@ -582,13 +596,12 @@ sys_pselect(struct proc *p, void *v, reg
int
dopselect(struct proc *p, int nd, fd_set *in, fd_set *ou, fd_set *ex,
- struct timespec *timeout, const sigset_t *sigmask, register_t *retval)
+ struct timespec *tsp, const sigset_t *sigmask, register_t *retval)
{
+ struct kqueue_scan_state scan;
fd_mask bits[6];
fd_set *pibits[3], *pobits[3];
- struct timespec elapsed, start, stop;
- uint64_t nsecs;
- int s, ncoll, error = 0;
+ int error, n, ncollected = 0, nevents = 0;
u_int ni;
if (nd < 0)
@@ -618,6 +631,8 @@ dopselect(struct proc *p, int nd, fd_set
pobits[2] = (fd_set *)&bits[5];
}
+ kqpoll_init();
+
#define getbits(name, x) \
if (name && (error = copyin(name, pibits[x], ni))) \
goto done;
@@ -636,43 +651,65 @@ dopselect(struct proc *p, int nd, fd_set
if (sigmask)
dosigsuspend(p, *sigmask &~ sigcantmask);
-retry:
- ncoll = nselcoll;
- atomic_setbits_int(&p->p_flag, P_SELECT);
- error = selscan(p, pibits[0], pobits[0], nd, ni, retval);
- if (error || *retval)
+ /* Register kqueue events */
+ error = pselregister(p, pibits, nd, &nevents);
+ if (error != 0)
goto done;
- if (timeout == NULL || timespecisset(timeout)) {
- if (timeout != NULL) {
- getnanouptime(&start);
- nsecs = MIN(TIMESPEC_TO_NSEC(timeout), MAXTSLP);
- } else
- nsecs = INFSLP;
- s = splhigh();
- if ((p->p_flag & P_SELECT) == 0 || nselcoll != ncoll) {
- splx(s);
- goto retry;
- }
- atomic_clearbits_int(&p->p_flag, P_SELECT);
- error = tsleep_nsec(&selwait, PSOCK | PCATCH, "select", nsecs);
- splx(s);
- if (timeout != NULL) {
- getnanouptime(&stop);
- timespecsub(&stop, &start, &elapsed);
- timespecsub(timeout, &elapsed, timeout);
- if (timeout->tv_sec < 0)
- timespecclear(timeout);
+
+ /*
+ * The poll/select family of syscalls has been designed to
+ * block when file descriptors are not available, even if
+ * there's nothing to wait for.
+ */
+ if (nevents == 0) {
+ uint64_t nsecs = INFSLP;
+
+ if (tsp != NULL) {
+ if (!timespecisset(tsp))
+ goto done;
+ nsecs = MAX(1, MIN(TIMESPEC_TO_NSEC(tsp), MAXTSLP));
}
- if (error == 0 || error == EWOULDBLOCK)
- goto retry;
+ error = tsleep_nsec(&p->p_kq, PSOCK | PCATCH, "kqsel", nsecs);
+ /* select is not restarted after signals... */
+ if (error == ERESTART)
+ error = EINTR;
+ if (error == EWOULDBLOCK)
+ error = 0;
+ goto done;
+ }
+
+ /* Collect at most `nevents' possibly waiting in kqueue_scan() */
+ kqueue_scan_setup(&scan, p->p_kq);
+ while ((n = nevents - ncollected) > 0) {
+ struct kevent kev[KQ_NEVENTS];
+ int i, ready;
+
+ if (n > nitems(kev))
+ n = nitems(kev);
+ ready = kqueue_scan(&scan, nitems(kev), kev, tsp, p, &error);
+ if (ready == 0)
+ break;
+#ifdef KTRACE
+ if (KTRPOINT(p, KTR_STRUCT))
+ ktrevent(p, kev, ready);
+#endif
+ /* Convert back events that are ready/delete spurious ones. */
+ for (i = 0; i < ready && error == 0; i++)
+ error = pselcollect(p, &kev[i], pobits, &ncollected);
+
+ if (error)
+ break;
+
+ /*
+ * If we only got spurious events, try again to wait at
+ * least once.
+ */
+ if (ncollected == 0 && ready > 0)
+ scan.kqs_nevent = 0;
}
-done:
- atomic_clearbits_int(&p->p_flag, P_SELECT);
- /* select is not restarted after signals... */
- if (error == ERESTART)
- error = EINTR;
- if (error == EWOULDBLOCK)
- error = 0;
+ kqueue_scan_finish(&scan);
+ *retval = ncollected;
+ done:
#define putbits(name, x) \
if (name && (error2 = copyout(pobits[x], name, ni))) \
error = error2;
@@ -694,40 +731,108 @@ done:
if (pibits[0] != (fd_set *)&bits[0])
free(pibits[0], M_TEMP, 6 * ni);
+
+ /* Needed to remove events lazily. */
+ p->p_kq_serial += nd;
+
return (error);
}
+/*
+ * Convert fd_set into kqueue events and register them on the
+ * per-thread queue.
+ */
int
-selscan(struct proc *p, fd_set *ibits, fd_set *obits, int nfd, int ni,
- register_t *retval)
+pselregister(struct proc *p, fd_set *pibits[3], int nfd, int *nregistered)
{
- caddr_t cibits = (caddr_t)ibits, cobits = (caddr_t)obits;
- struct filedesc *fdp = p->p_fd;
- int msk, i, j, fd;
+ static const int evf[] = { EVFILT_READ, EVFILT_WRITE, EVFILT_EXCEPT };
+ static const int evff[] = { 0, 0, NOTE_OOB };
+ int msk, i, j, fd, nevents = 0, error = 0;
+ struct kevent kev;
fd_mask bits;
- struct file *fp;
- int n = 0;
- static const int flag[3] = { POLLIN, POLLOUT|POLL_NOHUP, POLLPRI };
for (msk = 0; msk < 3; msk++) {
- fd_set *pibits = (fd_set *)&cibits[msk*ni];
- fd_set *pobits = (fd_set *)&cobits[msk*ni];
-
for (i = 0; i < nfd; i += NFDBITS) {
- bits = pibits->fds_bits[i/NFDBITS];
+ bits = pibits[msk]->fds_bits[i / NFDBITS];
while ((j = ffs(bits)) && (fd = i + --j) < nfd) {
bits &= ~(1 << j);
- if ((fp = fd_getfile(fdp, fd)) == NULL)
- return (EBADF);
- if ((*fp->f_ops->fo_poll)(fp, flag[msk], p)) {
- FD_SET(fd, pobits);
- n++;
+
+ DPRINTFN(2, "select fd %d mask %d serial %lu\n",
+ fd, msk, p->p_kq_serial);
+ EV_SET(&kev, fd, evf[msk],
+ EV_ADD|EV_ENABLE|__EV_POLL,
+ evff[msk], 0, (void *)(p->p_kq_serial));
+#ifdef KTRACE
+ if (KTRPOINT(p, KTR_STRUCT))
+ ktrevent(p, &kev, 1);
+#endif
+ error = kqueue_register(p->p_kq, &kev, p);
+ switch (error) {
+ case 0:
+ nevents++;
+ /* FALLTHROUGH */
+ case EOPNOTSUPP:/* No underlying kqfilter */
+ case EINVAL: /* Unimplemented filter */
+ error = 0;
+ break;
+ case ENXIO: /* Device has been detached */
+ default:
+ goto bad;
}
- FRELE(fp, p);
}
}
}
- *retval = n;
+
+ *nregistered = nevents;
+ return (0);
+bad:
+ DPRINTFN(0, "select fd %u filt %d error %d\n", (int)kev.ident,
+ kev.filter, error);
+ return (error);
+}
+
+/*
+ * Convert given kqueue event into corresponding select(2) bit.
+ */
+int
+pselcollect(struct proc *p, struct kevent *kevp, fd_set *pobits[3],
+ int *ncollected)
+{
+#ifdef DIAGNOSTIC
+ /* Filter out and lazily delete spurious events */
+ if ((unsigned long)kevp->udata != p->p_kq_serial) {
+ DPRINTFN(0, "select fd %u lazily deleted (%lu != %lu)\n",
+ (int)kevp->ident, p->p_kq_serial,
+ (unsigned long)kevp->udata);
+ kevp->flags = EV_DISABLE|EV_DELETE;
+ kqueue_register(p->p_kq, kevp, p);
+ return (0);
+ }
+#endif
+ if (kevp->flags & EV_ERROR) {
+ DPRINTFN(2, "select fd %d filt %d error %d\n",
+ (int)kevp->ident, kevp->filter, (int)kevp->data);
+ kevp->flags = EV_DISABLE|EV_DELETE;
+ kqueue_register(p->p_kq, kevp, p);
+ return (kevp->data);
+ }
+
+ switch (kevp->filter) {
+ case EVFILT_READ:
+ FD_SET(kevp->ident, pobits[0]);
+ break;
+ case EVFILT_WRITE:
+ FD_SET(kevp->ident, pobits[1]);
+ break;
+ case EVFILT_EXCEPT:
+ FD_SET(kevp->ident, pobits[2]);
+ break;
+ default:
+ KASSERT(0);
+ }
+ (*ncollected)++;
+
+ DPRINTFN(2, "select fd %d filt %d\n", (int)kevp->ident, kevp->filter);
return (0);
}
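
To exercise the new path from userland, a trivial select(2) smoke test
(plain POSIX, illustration only): one readable pipe end plus stdin,
with a timeout.

	#include <sys/select.h>
	#include <err.h>
	#include <stdio.h>
	#include <unistd.h>

	int
	main(void)
	{
		struct timeval tv = { .tv_sec = 1, .tv_usec = 0 };
		fd_set rfds;
		int pfd[2], n;

		if (pipe(pfd) == -1)
			err(1, "pipe");
		if (write(pfd[1], "x", 1) != 1)
			err(1, "write");

		FD_ZERO(&rfds);
		FD_SET(STDIN_FILENO, &rfds);
		FD_SET(pfd[0], &rfds);

		/* pfd[0] is the highest descriptor in the set. */
		n = select(pfd[0] + 1, &rfds, NULL, NULL, &tv);
		if (n == -1)
			err(1, "select");

		printf("ready: %d, pipe readable: %d\n",
		    n, FD_ISSET(pfd[0], &rfds) != 0);
		return 0;
	}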