Now that the kqueue refactoring has been committed, here's once again the diff to modify the internal implementation of {p,}select(2) to query kqfilter handlers instead of poll ones.
{p,}poll(2) are left untouched to ease the transition. I plan to commit this in 3 steps, to ease a possible revert: - kevent(2) refactoring - introduction of newer kq* APIs - dopselect rewrite A mid-term goal of this change would be to get rid of the poll handlers in order to have a single event system in the kernel to maintain and turn mp-safe. The logic is as follows: - With this change every thread gets a "private" kqueue, usable by the kernel only, to register events for select(2) and later poll(2). - Events specified via FD_SET(2) are converted to their kqueue equivalent. - kqueue_scan() has been modified to be restartable and work with a given kqueue. - At the end of every {p,}select(2) syscall the private kqueue is purged. This version should include your previous feedback. Comments, tests and oks are welcome! Thanks, Martin Index: kern/kern_event.c =================================================================== RCS file: /cvs/src/sys/kern/kern_event.c,v retrieving revision 1.145 diff -u -p -r1.145 kern_event.c --- kern/kern_event.c 25 Nov 2020 13:49:00 -0000 1.145 +++ kern/kern_event.c 30 Nov 2020 15:30:40 -0000 @@ -57,6 +57,7 @@ #include <sys/timeout.h> #include <sys/wait.h> +struct kqueue *kqueue_alloc(struct filedesc *); void kqueue_terminate(struct proc *p, struct kqueue *); void kqueue_free(struct kqueue *); void kqueue_init(void); @@ -504,6 +505,27 @@ const struct filterops dead_filtops = { .f_event = filt_dead, }; +void +kqpoll_init(struct proc *p) +{ + if (p->p_kq != NULL) + return; + + p->p_kq = kqueue_alloc(p->p_fd); + p->p_kq_serial = arc4random(); +} + +void +kqpoll_exit(struct proc *p) +{ + if (p->p_kq == NULL) + return; + + kqueue_terminate(p, p->p_kq); + kqueue_free(p->p_kq); + p->p_kq = NULL; +} + struct kqueue * kqueue_alloc(struct filedesc *fdp) { @@ -567,6 +589,7 @@ sys_kevent(struct proc *p, void *v, regi struct timespec ts; struct timespec *tsp = NULL; int i, n, nerrors, error; + int ready, total; struct kevent kev[KQ_NEVENTS]; if 
((fp = fd_getfile(fdp, SCARG(uap, fd))) == NULL) @@ -595,9 +618,9 @@ sys_kevent(struct proc *p, void *v, regi kq = fp->f_data; nerrors = 0; - while (SCARG(uap, nchanges) > 0) { - n = SCARG(uap, nchanges) > KQ_NEVENTS ? - KQ_NEVENTS : SCARG(uap, nchanges); + while ((n = SCARG(uap, nchanges)) > 0) { + if (n > nitems(kev)) + n = nitems(kev); error = copyin(SCARG(uap, changelist), kev, n * sizeof(struct kevent)); if (error) @@ -635,11 +658,36 @@ sys_kevent(struct proc *p, void *v, regi kqueue_scan_setup(&scan, kq); FRELE(fp, p); - error = kqueue_scan(&scan, SCARG(uap, nevents), SCARG(uap, eventlist), - tsp, kev, p, &n); + /* + * Collect as many events as we can. The timeout on successive + * loops is disabled (kqueue_scan() becomes non-blocking). + */ + total = 0; + error = 0; + while ((n = SCARG(uap, nevents) - total) > 0) { + if (n > nitems(kev)) + n = nitems(kev); + ready = kqueue_scan(&scan, n, kev, tsp, p, &error); + if (ready == 0) + break; + error = copyout(kev, SCARG(uap, eventlist) + total, + sizeof(struct kevent) * ready); +#ifdef KTRACE + if (KTRPOINT(p, KTR_STRUCT)) + ktrevent(p, kev, ready); +#endif + total += ready; + if (error || ready < n) + break; + /* + * Successive loops are only necessary if there are more + * ready events to gather, so they don't need to block. + */ + tsp = &ts; + timespecclear(tsp); + } kqueue_scan_finish(&scan); - - *retval = n; + *retval = total; return (error); done: @@ -893,22 +941,22 @@ kqueue_sleep(struct kqueue *kq, struct t return (error); } +/* + * Scan the kqueue, blocking if necessary until the target time is reached. + * If tsp is NULL we block indefinitely. If tsp->ts_secs/nsecs are both + * 0 we do not block at all. 
+ */ int kqueue_scan(struct kqueue_scan_state *scan, int maxevents, - struct kevent *ulistp, struct timespec *tsp, struct kevent *kev, - struct proc *p, int *retval) + struct kevent *kevp, struct timespec *tsp, struct proc *p, int *errorp) { struct kqueue *kq = scan->kqs_kq; - struct kevent *kevp; struct knote *kn; - int s, count, nkev, error = 0; + int s, count, nkev = 0, error = 0; - nkev = 0; - kevp = kev; count = maxevents; if (count == 0) goto done; - retry: KASSERT(count == maxevents); KASSERT(nkev == 0); @@ -958,14 +1006,8 @@ retry: while (count) { kn = TAILQ_NEXT(&scan->kqs_start, kn_tqe); if (kn->kn_filter == EVFILT_MARKER) { - if (kn == &scan->kqs_end) { - TAILQ_REMOVE(&kq->kq_head, &scan->kqs_start, - kn_tqe); - splx(s); - if (scan->kqs_nevent == 0) - goto retry; - goto done; - } + if (kn == &scan->kqs_end) + break; /* Move start marker past another thread's marker. */ TAILQ_REMOVE(&kq->kq_head, &scan->kqs_start, kn_tqe); @@ -1001,6 +1043,9 @@ retry: count--; scan->kqs_nevent++; + /* + * Post-event action on the note + */ if (kn->kn_flags & EV_ONESHOT) { splx(s); kn->kn_fop->f_detach(kn); @@ -1026,35 +1071,14 @@ retry: knote_release(kn); } kqueue_check(kq); - if (nkev == KQ_NEVENTS) { - splx(s); -#ifdef KTRACE - if (KTRPOINT(p, KTR_STRUCT)) - ktrevent(p, kev, nkev); -#endif - error = copyout(kev, ulistp, - sizeof(struct kevent) * nkev); - ulistp += nkev; - nkev = 0; - kevp = kev; - s = splhigh(); - if (error) - break; - } } TAILQ_REMOVE(&kq->kq_head, &scan->kqs_start, kn_tqe); splx(s); + if (scan->kqs_nevent == 0) + goto retry; done: - if (nkev != 0) { -#ifdef KTRACE - if (KTRPOINT(p, KTR_STRUCT)) - ktrevent(p, kev, nkev); -#endif - error = copyout(kev, ulistp, - sizeof(struct kevent) * nkev); - } - *retval = maxevents - count; - return (error); + *errorp = error; + return (nkev); } void @@ -1144,7 +1168,7 @@ kqueue_stat(struct file *fp, struct stat } void -kqueue_terminate(struct proc *p, struct kqueue *kq) +kqueue_purge(struct proc *p, struct kqueue 
*kq) { int i; @@ -1156,6 +1180,12 @@ kqueue_terminate(struct proc *p, struct for (i = 0; i < kq->kq_knhashmask + 1; i++) knote_remove(p, &kq->kq_knhash[i]); } +} + +void +kqueue_terminate(struct proc *p, struct kqueue *kq) +{ + kqueue_purge(p, kq); kq->kq_state |= KQ_DYING; kqueue_wakeup(kq); Index: kern/kern_exit.c =================================================================== RCS file: /cvs/src/sys/kern/kern_exit.c,v retrieving revision 1.191 diff -u -p -r1.191 kern_exit.c --- kern/kern_exit.c 16 Nov 2020 18:37:06 -0000 1.191 +++ kern/kern_exit.c 30 Nov 2020 15:30:40 -0000 @@ -184,6 +184,8 @@ exit1(struct proc *p, int xexit, int xsi if ((p->p_flag & P_THREAD) == 0) pr->ps_siglist = 0; + kqpoll_exit(p); + #if NKCOV > 0 kcov_exit(p); #endif Index: kern/sys_generic.c =================================================================== RCS file: /cvs/src/sys/kern/sys_generic.c,v retrieving revision 1.132 diff -u -p -r1.132 sys_generic.c --- kern/sys_generic.c 2 Oct 2020 15:45:22 -0000 1.132 +++ kern/sys_generic.c 30 Nov 2020 15:30:40 -0000 @@ -55,6 +55,7 @@ #include <sys/time.h> #include <sys/malloc.h> #include <sys/poll.h> +#include <sys/eventvar.h> #ifdef KTRACE #include <sys/ktrace.h> #endif @@ -66,8 +67,21 @@ #include <uvm/uvm_extern.h> -int selscan(struct proc *, fd_set *, fd_set *, int, int, register_t *); -void pollscan(struct proc *, struct pollfd *, u_int, register_t *); +/* + * Debug values: + * 1 - print implementation errors, things that should not happen. + * 2 - print ppoll(2) information, somewhat verbose + * 3 - print pselect(2) and ppoll(2) information, very verbose + */ +int kqpoll_debug = 0; +#define DPRINTFN(v, x...) 
if (kqpoll_debug > v) do { \ + printf("%s(%d): ", curproc->p_p->ps_comm, curproc->p_tid); \ + printf(x); \ +} while (0) + +int pselregister(struct proc *, fd_set *, int, int, int *); +int pselcollect(struct proc *, struct kevent *, fd_set *[]); + int pollout(struct pollfd *, struct pollfd *, u_int); int dopselect(struct proc *, int, fd_set *, fd_set *, fd_set *, struct timespec *, const sigset_t *, register_t *); @@ -584,11 +598,11 @@ int dopselect(struct proc *p, int nd, fd_set *in, fd_set *ou, fd_set *ex, struct timespec *timeout, const sigset_t *sigmask, register_t *retval) { + struct kqueue_scan_state scan; fd_mask bits[6]; fd_set *pibits[3], *pobits[3]; - struct timespec elapsed, start, stop; - uint64_t nsecs; - int s, ncoll, error = 0; + struct timespec ts; + int error, nevents = 0; u_int ni; if (nd < 0) @@ -618,6 +632,8 @@ dopselect(struct proc *p, int nd, fd_set pobits[2] = (fd_set *)&bits[5]; } + kqpoll_init(p); + #define getbits(name, x) \ if (name && (error = copyin(name, pibits[x], ni))) \ goto done; @@ -636,43 +652,63 @@ dopselect(struct proc *p, int nd, fd_set if (sigmask) dosigsuspend(p, *sigmask &~ sigcantmask); -retry: - ncoll = nselcoll; - atomic_setbits_int(&p->p_flag, P_SELECT); - error = selscan(p, pibits[0], pobits[0], nd, ni, retval); - if (error || *retval) + /* Register kqueue events */ + if ((error = pselregister(p, pibits[0], nd, ni, &nevents) != 0)) goto done; - if (timeout == NULL || timespecisset(timeout)) { - if (timeout != NULL) { - getnanouptime(&start); - nsecs = MIN(TIMESPEC_TO_NSEC(timeout), MAXTSLP); - } else - nsecs = INFSLP; - s = splhigh(); - if ((p->p_flag & P_SELECT) == 0 || nselcoll != ncoll) { - splx(s); - goto retry; - } - atomic_clearbits_int(&p->p_flag, P_SELECT); - error = tsleep_nsec(&selwait, PSOCK | PCATCH, "select", nsecs); - splx(s); - if (timeout != NULL) { - getnanouptime(&stop); - timespecsub(&stop, &start, &elapsed); - timespecsub(timeout, &elapsed, timeout); - if (timeout->tv_sec < 0) - 
timespecclear(timeout); - } - if (error == 0 || error == EWOULDBLOCK) - goto retry; + + /* + * The poll/select family of syscalls has been designed to + * block when file descriptors are not available, even if + * there's nothing to wait for. + */ + if (nevents == 0) { + uint64_t nsecs = INFSLP; + + if (timeout != NULL) + nsecs = MAX(1, MIN(TIMESPEC_TO_NSEC(timeout), MAXTSLP)); + + error = tsleep_nsec(&p->p_kq, PSOCK | PCATCH, "kqsel", nsecs); + /* select is not restarted after signals... */ + if (error == ERESTART) + error = EINTR; + if (error == EWOULDBLOCK) + error = 0; + goto done; + } + + /* Collect at most `nevents' possibly waiting in kqueue_scan() */ + kqueue_scan_setup(&scan, p->p_kq); + while (nevents > 0) { + struct kevent kev[KQ_NEVENTS]; + int i, ready, count; + + /* Maximum number of events per iteration */ + count = MIN(nitems(kev), nevents); + ready = kqueue_scan(&scan, count, kev, timeout, p, &error); +#ifdef KTRACE + if (KTRPOINT(p, KTR_STRUCT)) + ktrevent(p, kev, ready); +#endif + /* Convert back events that are ready. */ + for (i = 0; i < ready; i++) + *retval += pselcollect(p, &kev[i], pobits); + /* + * Stop if there was an error or if we had enough + * space to collect all events that were ready. + */ + if (error || ready < count) + break; + /* + * Successive loops are only necessary if there are more + * ready events to gather, so they don't need to block. + */ + timeout = &ts; + timespecclear(timeout); + + nevents -= ready; } -done: - atomic_clearbits_int(&p->p_flag, P_SELECT); - /* select is not restarted after signals... 
*/ - if (error == ERESTART) - error = EINTR; - if (error == EWOULDBLOCK) - error = 0; + kqueue_scan_finish(&scan); + done: #define putbits(name, x) \ if (name && (error2 = copyout(pobits[x], name, ni))) \ error = error2; @@ -694,41 +730,101 @@ done: if (pibits[0] != (fd_set *)&bits[0]) free(pibits[0], M_TEMP, 6 * ni); + + kqueue_purge(p, p->p_kq); + p->p_kq_serial += nd; + return (error); } +/* + * Convert fd_set into kqueue events and register them on the + * per-thread queue. + */ int -selscan(struct proc *p, fd_set *ibits, fd_set *obits, int nfd, int ni, - register_t *retval) +pselregister(struct proc *p, fd_set *ibits, int nfd, int ni, int *nregistered) { - caddr_t cibits = (caddr_t)ibits, cobits = (caddr_t)obits; - struct filedesc *fdp = p->p_fd; - int msk, i, j, fd; + int evf[] = { EVFILT_READ, EVFILT_WRITE, EVFILT_EXCEPT }; + int evff[] = { 0, 0, NOTE_OOB }; + caddr_t cibits = (caddr_t)ibits; + int msk, i, j, fd, nevents = 0, error = 0; + struct kevent kev; fd_mask bits; - struct file *fp; - int n = 0; - static const int flag[3] = { POLLIN, POLLOUT|POLL_NOHUP, POLLPRI }; for (msk = 0; msk < 3; msk++) { fd_set *pibits = (fd_set *)&cibits[msk*ni]; - fd_set *pobits = (fd_set *)&cobits[msk*ni]; for (i = 0; i < nfd; i += NFDBITS) { bits = pibits->fds_bits[i/NFDBITS]; while ((j = ffs(bits)) && (fd = i + --j) < nfd) { bits &= ~(1 << j); - if ((fp = fd_getfile(fdp, fd)) == NULL) - return (EBADF); - if ((*fp->f_ops->fo_poll)(fp, flag[msk], p)) { - FD_SET(fd, pobits); - n++; + + DPRINTFN(2, "select fd %d mask %d serial %lu\n", + fd, msk, p->p_kq_serial); + EV_SET(&kev, fd, evf[msk], + EV_ADD|EV_ENABLE|__EV_POLL, evff[msk], 0, + (void *)(p->p_kq_serial)); +#ifdef KTRACE + if (KTRPOINT(p, KTR_STRUCT)) + ktrevent(p, &kev, 1); +#endif + error = kqueue_register(p->p_kq, &kev, p); + switch (error) { + case 0: + nevents++; + case EOPNOTSUPP:/* No underlying kqfilter */ + case EINVAL: /* Unimplemented filter */ + error = 0; + break; + case ENXIO: /* Device has been detached 
*/ + default: + goto bad; } - FRELE(fp, p); } } } - *retval = n; + + *nregistered = nevents; return (0); +bad: + DPRINTFN(0, "select fd %u filt %d error %d\n", (int)kev.ident, + kev.filter, error); + return (error); +} + +/* + * Convert given kqueue event into corresponding select(2) bit. + */ +int +pselcollect(struct proc *p, struct kevent *kevp, fd_set *pobits[3]) +{ +#ifdef DIAGNOSTIC + /* Filter out and lazily delete spurious events */ + if ((unsigned long)kevp->udata != p->p_kq_serial) { + DPRINTFN(0, "select fd %u mismatched serial %lu\n", + (int)kevp->ident, p->p_kq_serial); + kevp->flags = EV_DISABLE|EV_DELETE; + kqueue_register(p->p_kq, kevp, p); + return (0); + } +#endif + + switch (kevp->filter) { + case EVFILT_READ: + FD_SET(kevp->ident, pobits[0]); + break; + case EVFILT_WRITE: + FD_SET(kevp->ident, pobits[1]); + break; + case EVFILT_EXCEPT: + FD_SET(kevp->ident, pobits[2]); + break; + default: + KASSERT(0); + } + + DPRINTFN(2, "select fd %d filt %d\n", (int)kevp->ident, kevp->filter); + return (1); } int @@ -935,6 +1031,8 @@ doppoll(struct proc *p, struct pollfd *f } sz = nfds * sizeof(*pl); + + kqpoll_init(p); if ((error = copyin(fds, pl, sz)) != 0) goto bad; Index: sys/event.h =================================================================== RCS file: /cvs/src/sys/sys/event.h,v retrieving revision 1.47 diff -u -p -r1.47 event.h --- sys/event.h 25 Nov 2020 13:49:00 -0000 1.47 +++ sys/event.h 30 Nov 2020 15:30:41 -0000 @@ -215,6 +215,8 @@ struct timespec; extern const struct filterops sig_filtops; extern const struct filterops dead_filtops; +extern void kqpoll_init(struct proc *); +extern void kqpoll_exit(struct proc *); extern void knote(struct klist *list, long hint); extern void knote_activate(struct knote *); extern void knote_remove(struct proc *p, struct knlist *list); @@ -223,9 +225,10 @@ extern void knote_processexit(struct pro extern int kqueue_register(struct kqueue *kq, struct kevent *kev, struct proc *p); extern int kqueue_scan(struct 
kqueue_scan_state *, int, struct kevent *, - struct timespec *, struct kevent *, struct proc *, int *); + struct timespec *, struct proc *, int *); extern void kqueue_scan_setup(struct kqueue_scan_state *, struct kqueue *); extern void kqueue_scan_finish(struct kqueue_scan_state *); +extern void kqueue_purge(struct proc *, struct kqueue *); extern int filt_seltrue(struct knote *kn, long hint); extern int seltrue_kqfilter(dev_t, struct knote *); extern void klist_insert(struct klist *, struct knote *); Index: sys/proc.h =================================================================== RCS file: /cvs/src/sys/sys/proc.h,v retrieving revision 1.301 diff -u -p -r1.301 proc.h --- sys/proc.h 10 Nov 2020 17:26:54 -0000 1.301 +++ sys/proc.h 30 Nov 2020 15:30:41 -0000 @@ -320,6 +320,7 @@ struct process { struct kcov_dev; struct lock_list_entry; +struct kqueue; struct p_inentry { u_long ie_serial; @@ -382,6 +383,8 @@ struct proc { struct plimit *p_limit; /* [l] read ref. of p_p->ps_limit */ struct kcov_dev *p_kd; /* kcov device handle */ struct lock_list_entry *p_sleeplocks; /* WITNESS lock tracking */ + struct kqueue *p_kq; /* for select/poll */ + unsigned long p_kq_serial; /* for select/poll */ int p_siglist; /* [a] Signals arrived & not delivered*/