The diff below consists of three logical parts that together change the current *select(2) implementation:
- Create & destroy a per-thread kqueue in fork1() and exit1(). - Change the kqueue_scan() interface to keep track of the end point of a scan, this is mostly from visa@. - Change dopselect() to translate inputs "fd_set" into "struct kevent", register them via kqueue_register(), wait for events via kqueue_scan() then translate them back. A visible change is that programs waiting in *select(2) will now sleep on the "kqread" or "kqsel" channel. Please test, comment and report back. Index: kern/kern_event.c =================================================================== RCS file: /cvs/src/sys/kern/kern_event.c,v retrieving revision 1.140 diff -u -p -r1.140 kern_event.c --- kern/kern_event.c 22 Jun 2020 13:14:32 -0000 1.140 +++ kern/kern_event.c 23 Jun 2020 08:44:47 -0000 @@ -64,9 +64,6 @@ void KQREF(struct kqueue *); void KQRELE(struct kqueue *); int kqueue_sleep(struct kqueue *, struct timespec *); -int kqueue_scan(struct kqueue *kq, int maxevents, - struct kevent *ulistp, struct timespec *timeout, - struct proc *p, int *retval); int kqueue_read(struct file *, struct uio *, int); int kqueue_write(struct file *, struct uio *, int); @@ -521,6 +518,14 @@ kqueue_alloc(struct filedesc *fdp) return (kq); } +void +kqueue_exit(struct proc *p) +{ + kqueue_terminate(p, p->p_kq); + kqueue_free(p->p_kq); + p->p_kq = NULL; /* the exiting thread no longer has a select/poll kqueue */ +} + int sys_kqueue(struct proc *p, void *v, register_t *retval) { @@ -554,6 +559,7 @@ out: int sys_kevent(struct proc *p, void *v, register_t *retval) { + struct kqueue_scan_state scan; struct filedesc* fdp = p->p_fd; struct sys_kevent_args /* { syscallarg(int) fd; @@ -569,6 +575,7 @@ sys_kevent(struct proc *p, void *v, regi struct timespec ts; struct timespec *tsp = NULL; int i, n, nerrors, error; + int ready, total; struct kevent kev[KQ_NEVENTS]; if ((fp = fd_getfile(fdp, SCARG(uap, fd))) == NULL) @@ -597,9 +604,9 @@ sys_kevent(struct proc *p, void *v, regi kq = fp->f_data; nerrors = 0; - while (SCARG(uap, nchanges) > 0) { - n = SCARG(uap, nchanges) >
KQ_NEVENTS ? - KQ_NEVENTS : SCARG(uap, nchanges); + while ((n = SCARG(uap, nchanges)) > 0) { + if (n > nitems(kev)) + n = nitems(kev); error = copyin(SCARG(uap, changelist), kev, n * sizeof(struct kevent)); if (error) @@ -635,12 +642,39 @@ sys_kevent(struct proc *p, void *v, regi goto done; } + KQREF(kq); FRELE(fp, p); - error = kqueue_scan(kq, SCARG(uap, nevents), SCARG(uap, eventlist), - tsp, p, &n); + /* + * Collect as many events as we can. The timeout on successive + * loops is disabled (kqueue_scan() becomes non-blocking). + */ + total = 0; + error = 0; + kqueue_scan_setup(&scan, kq); + while ((n = SCARG(uap, nevents) - total) > 0) { + if (n > nitems(kev)) + n = nitems(kev); + ready = kqueue_scan(&scan, n, kev, tsp, p, &error); + if (ready == 0) + break; + error = copyout(kev, SCARG(uap, eventlist) + total, + sizeof(struct kevent) * ready); +#ifdef KTRACE + if (KTRPOINT(p, KTR_STRUCT)) + ktrevent(p, kev, ready); +#endif + total += ready; + if (error || ready < n) + break; + tsp = &ts; /* successive loops non-blocking */ + timespecclear(tsp); + } + kqueue_scan_finish(&scan); KQRELE(kq); - *retval = n; + if (error == EWOULDBLOCK) + error = 0; + *retval = total; return (error); done: @@ -894,32 +928,32 @@ kqueue_sleep(struct kqueue *kq, struct t return (error); } +/* + * Scan the kqueue, blocking if necessary until the target time is reached. + * If tsp is NULL we block indefinitely. If tsp->ts_secs/nsecs are both + * 0 we do not block at all.
+ */ int -kqueue_scan(struct kqueue *kq, int maxevents, struct kevent *ulistp, - struct timespec *tsp, struct proc *p, int *retval) +kqueue_scan(struct kqueue_scan_state *scan, int maxevents, + struct kevent *kevp, struct timespec *tsp, struct proc *p, int *errorp) { - struct kevent *kevp; - struct knote mend, mstart, *kn; + struct knote *kn; + struct kqueue *kq = scan->kqs_kq; int s, count, nkev = 0, error = 0; - struct kevent kev[KQ_NEVENTS]; count = maxevents; if (count == 0) goto done; - - memset(&mstart, 0, sizeof(mstart)); - memset(&mend, 0, sizeof(mend)); - retry: if (kq->kq_state & KQ_DYING) { error = EBADF; goto done; } - kevp = &kev[0]; s = splhigh(); if (kq->kq_count == 0) { - if (tsp != NULL && !timespecisset(tsp)) { + if ((tsp != NULL && !timespecisset(tsp)) || + scan->kqs_nevent != 0) { /* never sleep once this scan has collected events */ splx(s); error = 0; goto done; @@ -927,7 +961,7 @@ retry: kq->kq_state |= KQ_SLEEP; error = kqueue_sleep(kq, tsp); splx(s); - if (error == 0 || error == EWOULDBLOCK) + if (error == 0) goto retry; /* don't restart after signals... */ if (error == ERESTART) @@ -935,27 +969,40 @@ retry: goto done; } - mstart.kn_filter = EVFILT_MARKER; - mstart.kn_status = KN_PROCESSING; - TAILQ_INSERT_HEAD(&kq->kq_head, &mstart, kn_tqe); - mend.kn_filter = EVFILT_MARKER; - mend.kn_status = KN_PROCESSING; - TAILQ_INSERT_TAIL(&kq->kq_head, &mend, kn_tqe); + /* + * Put the end marker in the queue to limit the scan to the events + * that are currently active. This prevents events from being + * recollected if they reactivate during scan. + * + * If a partial scan has been performed already but no events have + * been collected, reposition the end marker to make any new events + * reachable.
+ */ + if (!scan->kqs_queued) { + TAILQ_INSERT_TAIL(&kq->kq_head, &scan->kqs_end, kn_tqe); + scan->kqs_queued = 1; + } else if (scan->kqs_nevent == 0) { + TAILQ_REMOVE(&kq->kq_head, &scan->kqs_end, kn_tqe); + TAILQ_INSERT_TAIL(&kq->kq_head, &scan->kqs_end, kn_tqe); + } + + TAILQ_INSERT_HEAD(&kq->kq_head, &scan->kqs_start, kn_tqe); while (count) { - kn = TAILQ_NEXT(&mstart, kn_tqe); + kn = TAILQ_NEXT(&scan->kqs_start, kn_tqe); if (kn->kn_filter == EVFILT_MARKER) { - if (kn == &mend) { - TAILQ_REMOVE(&kq->kq_head, &mend, kn_tqe); - TAILQ_REMOVE(&kq->kq_head, &mstart, kn_tqe); + if (kn == &scan->kqs_end) { + TAILQ_REMOVE(&kq->kq_head, &scan->kqs_start, + kn_tqe); splx(s); - if (count == maxevents) + if (scan->kqs_nevent == 0) goto retry; goto done; } /* Move start marker past another thread's marker. */ - TAILQ_REMOVE(&kq->kq_head, &mstart, kn_tqe); - TAILQ_INSERT_AFTER(&kq->kq_head, kn, &mstart, kn_tqe); + TAILQ_REMOVE(&kq->kq_head, &scan->kqs_start, kn_tqe); + TAILQ_INSERT_AFTER(&kq->kq_head, kn, &scan->kqs_start, + kn_tqe); continue; } @@ -983,6 +1030,12 @@ retry: *kevp = kn->kn_kevent; kevp++; nkev++; + count--; + scan->kqs_nevent++; + + /* + * Post-event action on the note + */ if (kn->kn_flags & EV_ONESHOT) { splx(s); kn->kn_fop->f_detach(kn); @@ -1008,37 +1061,44 @@ retry: knote_release(kn); } kqueue_check(kq); - count--; - if (nkev == KQ_NEVENTS) { - splx(s); -#ifdef KTRACE - if (KTRPOINT(p, KTR_STRUCT)) - ktrevent(p, kev, nkev); -#endif - error = copyout(kev, ulistp, - sizeof(struct kevent) * nkev); - ulistp += nkev; - nkev = 0; - kevp = &kev[0]; - s = splhigh(); - if (error) - break; - } } - TAILQ_REMOVE(&kq->kq_head, &mend, kn_tqe); - TAILQ_REMOVE(&kq->kq_head, &mstart, kn_tqe); + TAILQ_REMOVE(&kq->kq_head, &scan->kqs_start, kn_tqe); splx(s); + if (scan->kqs_nevent == 0) + goto retry; done: - if (nkev != 0) { -#ifdef KTRACE - if (KTRPOINT(p, KTR_STRUCT)) - ktrevent(p, kev, nkev); -#endif - error = copyout(kev, ulistp, - sizeof(struct kevent) * nkev); +
*errorp = error; + return (nkev); +} + +void +kqueue_scan_setup(struct kqueue_scan_state *scan, struct kqueue *kq) +{ + memset(scan, 0, sizeof(*scan)); /* kqs_nevent = kqs_queued = 0 */ + scan->kqs_kq = kq; + scan->kqs_start.kn_filter = EVFILT_MARKER; + scan->kqs_start.kn_status = KN_PROCESSING; + scan->kqs_end.kn_filter = EVFILT_MARKER; + scan->kqs_end.kn_status = KN_PROCESSING; +} + +void +kqueue_scan_finish(struct kqueue_scan_state *scan) +{ + struct kqueue *kq = scan->kqs_kq; + int s; + + KASSERT(scan->kqs_start.kn_filter == EVFILT_MARKER); + KASSERT(scan->kqs_start.kn_status == KN_PROCESSING); + KASSERT(scan->kqs_end.kn_filter == EVFILT_MARKER); + KASSERT(scan->kqs_end.kn_status == KN_PROCESSING); + + if (scan->kqs_queued) { /* end marker is still linked on kq_head */ + scan->kqs_queued = 0; + s = splhigh(); + TAILQ_REMOVE(&kq->kq_head, &scan->kqs_end, kn_tqe); + splx(s); } - *retval = maxevents - count; - return (error); } /* @@ -1095,7 +1155,7 @@ kqueue_stat(struct file *fp, struct stat } void -kqueue_terminate(struct proc *p, struct kqueue *kq) +kqueue_purge(struct proc *p, struct kqueue *kq) { int i; @@ -1107,6 +1167,12 @@ kqueue_terminate(struct proc *p, struct for (i = 0; i < kq->kq_knhashmask + 1; i++) knote_remove(p, &kq->kq_knhash[i]); } +} + +void +kqueue_terminate(struct proc *p, struct kqueue *kq) +{ + kqueue_purge(p, kq); kq->kq_state |= KQ_DYING; kqueue_wakeup(kq); Index: kern/kern_exit.c =================================================================== RCS file: /cvs/src/sys/kern/kern_exit.c,v retrieving revision 1.188 diff -u -p -r1.188 kern_exit.c --- kern/kern_exit.c 18 Mar 2020 15:48:21 -0000 1.188 +++ kern/kern_exit.c 23 Jun 2020 08:44:07 -0000 @@ -184,6 +184,8 @@ exit1(struct proc *p, int xexit, int xsi if ((p->p_flag & P_THREAD) == 0) pr->ps_siglist = 0; + kqueue_exit(p); + #if NKCOV > 0 kcov_exit(p); #endif Index: kern/kern_fork.c =================================================================== RCS file: /cvs/src/sys/kern/kern_fork.c,v retrieving revision 1.225 diff -u -p -r1.225 kern_fork.c --- 
kern/kern_fork.c 20 Mar 2020 08:14:07 -0000 1.225 +++ kern/kern_fork.c 23 Jun 2020 08:44:07 -0000 @@ -422,6 +422,8 @@ fork1(struct proc *curp, int flags, void newptstat = malloc(sizeof(*newptstat), M_SUBPROC, M_WAITOK); p->p_tid = alloctid(); + p->p_kq = kqueue_alloc(p->p_fd); + p->p_kq_serial = arc4random(); LIST_INSERT_HEAD(&allproc, p, p_list); LIST_INSERT_HEAD(TIDHASH(p->p_tid), p, p_hash); @@ -553,6 +555,8 @@ thread_fork(struct proc *curp, void *sta cpu_fork(curp, p, stack, tcb, child_return, p); p->p_tid = alloctid(); + p->p_kq = kqueue_alloc(p->p_fd); + p->p_kq_serial = arc4random(); LIST_INSERT_HEAD(&allproc, p, p_list); LIST_INSERT_HEAD(TIDHASH(p->p_tid), p, p_hash); Index: kern/sys_generic.c =================================================================== RCS file: /cvs/src/sys/kern/sys_generic.c,v retrieving revision 1.131 diff -u -p -r1.131 sys_generic.c --- kern/sys_generic.c 20 Mar 2020 04:11:05 -0000 1.131 +++ kern/sys_generic.c 23 Jun 2020 08:45:00 -0000 @@ -55,6 +55,7 @@ #include <sys/time.h> #include <sys/malloc.h> #include <sys/poll.h> +#include <sys/eventvar.h> #ifdef KTRACE #include <sys/ktrace.h> #endif @@ -66,8 +67,20 @@ #include <uvm/uvm_extern.h> -int selscan(struct proc *, fd_set *, fd_set *, int, int, register_t *); -void pollscan(struct proc *, struct pollfd *, u_int, register_t *); +/* + * Debug values: + * 1 - print implementation errors, things that should not happen. + * 2 - print ppoll(2) information, somewhat verbose + * 3 - print pselect(2) and ppoll(2) information, very verbose + */ +int kqpoll_debug = 0; +#define DPRINTFN(v, x...)
do { if (kqpoll_debug > (v)) { \ + printf("%s(%d): ", curproc->p_p->ps_comm, curproc->p_tid); \ + printf(x); \ +} } while (0) +int pselregister(struct proc *, fd_set *, int, int, int *); +int pselcollect(struct proc *, struct kevent *, fd_set *[]); + int pollout(struct pollfd *, struct pollfd *, u_int); int dopselect(struct proc *, int, fd_set *, fd_set *, fd_set *, struct timespec *, const sigset_t *, register_t *); @@ -584,11 +597,11 @@ int dopselect(struct proc *p, int nd, fd_set *in, fd_set *ou, fd_set *ex, struct timespec *timeout, const sigset_t *sigmask, register_t *retval) { + struct kqueue_scan_state scan; fd_mask bits[6]; fd_set *pibits[3], *pobits[3]; - struct timespec elapsed, start, stop; - uint64_t nsecs; - int s, ncoll, error = 0; + struct timespec ts; + int error, nevents = 0; u_int ni; if (nd < 0) @@ -636,43 +649,61 @@ dopselect(struct proc *p, int nd, fd_set if (sigmask) dosigsuspend(p, *sigmask &~ sigcantmask); -retry: - ncoll = nselcoll; - atomic_setbits_int(&p->p_flag, P_SELECT); - error = selscan(p, pibits[0], pobits[0], nd, ni, retval); - if (error || *retval) + /* Register kqueue events */ + if ((error = pselregister(p, pibits[0], nd, ni, &nevents)) != 0) goto done; - if (timeout == NULL || timespecisset(timeout)) { - if (timeout != NULL) { - getnanouptime(&start); + + /* + * The poll/select family of syscalls has been designed to + * block when file descriptors are not available, even if + * there's nothing to wait for, unless given a zero timeout.
+ */ + if (nevents == 0 && (timeout == NULL || timespecisset(timeout))) { + uint64_t nsecs = INFSLP; + + if (timeout != NULL) nsecs = MIN(TIMESPEC_TO_NSEC(timeout), MAXTSLP); - } else - nsecs = INFSLP; - s = splhigh(); - if ((p->p_flag & P_SELECT) == 0 || nselcoll != ncoll) { - splx(s); - goto retry; - } - atomic_clearbits_int(&p->p_flag, P_SELECT); - error = tsleep_nsec(&selwait, PSOCK | PCATCH, "select", nsecs); - splx(s); - if (timeout != NULL) { - getnanouptime(&stop); - timespecsub(&stop, &start, &elapsed); - timespecsub(timeout, &elapsed, timeout); - if (timeout->tv_sec < 0) - timespecclear(timeout); - } - if (error == 0 || error == EWOULDBLOCK) - goto retry; + + error = tsleep_nsec(&p->p_kq, PSOCK | PCATCH, "kqsel", nsecs); + /* select is not restarted after signals... */ + if (error == ERESTART) + error = EINTR; } -done: - atomic_clearbits_int(&p->p_flag, P_SELECT); - /* select is not restarted after signals... */ - if (error == ERESTART) - error = EINTR; + + /* Collect at most `nevents' possibly waiting in kqueue_scan() */ + kqueue_scan_setup(&scan, p->p_kq); + while (nevents > 0) { + struct kevent kev[KQ_NEVENTS]; + int i, ready, count; + + /* Maximum number of events per iteration */ + count = MIN(nitems(kev), nevents); + ready = kqueue_scan(&scan, count, kev, timeout, p, &error); +#ifdef KTRACE + if (KTRPOINT(p, KTR_STRUCT)) + ktrevent(p, kev, ready); +#endif + /* Convert back events that are ready. */ + for (i = 0; i < ready; i++) + *retval += pselcollect(p, &kev[i], pobits); + + /* + * Stop if there was an error or if we had enough + * room to collect all events that were ready.
+ */ + if (error || ready < count) + break; + + timeout = &ts; /* successive loops non-blocking */ + timespecclear(timeout); + + nevents -= ready; + } + kqueue_scan_finish(&scan); + if (error == EWOULDBLOCK) error = 0; + done: #define putbits(name, x) \ if (name && (error2 = copyout(pobits[x], name, ni))) \ error = error2; @@ -694,41 +725,100 @@ done: if (pibits[0] != (fd_set *)&bits[0]) free(pibits[0], M_TEMP, 6 * ni); + + kqueue_purge(p, p->p_kq); + p->p_kq_serial += nd; + return (error); } +/* + * Convert fd_set into kqueue events and register them on the + * per-thread queue. + */ int -selscan(struct proc *p, fd_set *ibits, fd_set *obits, int nfd, int ni, - register_t *retval) +pselregister(struct proc *p, fd_set *ibits, int nfd, int ni, int *nregistered) { - caddr_t cibits = (caddr_t)ibits, cobits = (caddr_t)obits; - struct filedesc *fdp = p->p_fd; - int msk, i, j, fd; + static const int evf[] = { EVFILT_READ, EVFILT_WRITE, EVFILT_EXCEPT }; + caddr_t cibits = (caddr_t)ibits; + int msk, i, j, fd, nevents = 0, error = 0; + struct kevent kev; fd_mask bits; - struct file *fp; - int n = 0; - static const int flag[3] = { POLLIN, POLLOUT|POLL_NOHUP, POLLPRI }; for (msk = 0; msk < 3; msk++) { fd_set *pibits = (fd_set *)&cibits[msk*ni]; - fd_set *pobits = (fd_set *)&cobits[msk*ni]; for (i = 0; i < nfd; i += NFDBITS) { bits = pibits->fds_bits[i/NFDBITS]; while ((j = ffs(bits)) && (fd = i + --j) < nfd) { bits &= ~(1 << j); - if ((fp = fd_getfile(fdp, fd)) == NULL) - return (EBADF); - if ((*fp->f_ops->fo_poll)(fp, flag[msk], p)) { - FD_SET(fd, pobits); - n++; + + DPRINTFN(2, "select fd %d mask %d serial %lu\n", + fd, msk, p->p_kq_serial); + EV_SET(&kev, fd, evf[msk], + EV_ADD|EV_ENABLE|__EV_POLL, 0, 0, + (void *)(p->p_kq_serial)); +#ifdef KTRACE + if (KTRPOINT(p, KTR_STRUCT)) + ktrevent(p, &kev, 1); +#endif + error = kqueue_register(p->p_kq, &kev, p); + switch (error) { + case 0: + nevents++; /* FALLTHROUGH */ + case EOPNOTSUPP:/* No underlying kqfilter */ + case EINVAL: /* Unimplemented filter */ +
error = 0; + break; + case ENXIO: /* Device has been detached */ + default: + goto bad; } - FRELE(fp, p); } } } - *retval = n; + + *nregistered = nevents; return (0); +bad: + DPRINTFN(0, "select fd %d filt %d error %d\n", (int)kev.ident, + kev.filter, error); + return (error); +} + +/* + * Convert given kqueue event into corresponding select(2) bit. + */ +int +pselcollect(struct proc *p, struct kevent *kevp, fd_set *pobits[3]) +{ +#ifdef DIAGNOSTIC + /* Filter out and lazily delete spurious events */ + if ((unsigned long)kevp->udata != p->p_kq_serial) { + DPRINTFN(0, "select fd %d mismatched serial %lu\n", + (int)kevp->ident, p->p_kq_serial); + kevp->flags = EV_DISABLE|EV_DELETE; + kqueue_register(p->p_kq, kevp, p); + return (0); + } +#endif + + switch (kevp->filter) { + case EVFILT_READ: + FD_SET(kevp->ident, pobits[0]); + break; + case EVFILT_WRITE: + FD_SET(kevp->ident, pobits[1]); + break; + case EVFILT_EXCEPT: + FD_SET(kevp->ident, pobits[2]); + break; + default: + KASSERT(0); + } + + DPRINTFN(2, "select fd %d filt %d\n", (int)kevp->ident, kevp->filter); + return (1); } int Index: sys/event.h =================================================================== RCS file: /cvs/src/sys/sys/event.h,v retrieving revision 1.44 diff -u -p -r1.44 event.h --- sys/event.h 22 Jun 2020 13:14:32 -0000 1.44 +++ sys/event.h 23 Jun 2020 08:44:47 -0000 @@ -200,7 +200,18 @@ struct knote { #define kn_fp kn_ptr.p_fp }; +struct kqueue_scan_state { + struct kqueue *kqs_kq; /* kqueue of this scan */ + struct knote kqs_start; /* start marker */ + struct knote kqs_end; /* end marker */ + int kqs_nevent; /* number of events collected */ + int kqs_queued; /* if set, end marker is + * in queue */ +}; + struct proc; +struct filedesc; +struct timespec; extern const struct filterops sig_filtops; extern const struct filterops dead_filtops; @@ -210,8 +221,15 @@ extern void knote_activate(struct knote extern void knote_remove(struct proc *p, struct knlist *list); extern void
knote_fdclose(struct proc *p, int fd); extern void knote_processexit(struct proc *); +extern struct kqueue *kqueue_alloc(struct filedesc *); +extern void kqueue_exit(struct proc *); extern int kqueue_register(struct kqueue *kq, struct kevent *kev, struct proc *p); +extern void kqueue_scan_setup(struct kqueue_scan_state *, struct kqueue *); +extern void kqueue_scan_finish(struct kqueue_scan_state *); +extern int kqueue_scan(struct kqueue_scan_state *, int, struct kevent *, + struct timespec *, struct proc *, int *); +extern void kqueue_purge(struct proc *, struct kqueue *); extern int filt_seltrue(struct knote *kn, long hint); extern int seltrue_kqfilter(dev_t, struct knote *); extern void klist_insert(struct klist *, struct knote *); Index: sys/proc.h =================================================================== RCS file: /cvs/src/sys/sys/proc.h,v retrieving revision 1.295 diff -u -p -r1.295 proc.h --- sys/proc.h 28 Apr 2020 08:29:40 -0000 1.295 +++ sys/proc.h 23 Jun 2020 08:44:07 -0000 @@ -315,6 +315,7 @@ struct process { struct kcov_dev; struct lock_list_entry; +struct kqueue; struct p_inentry { u_long ie_serial; @@ -377,6 +378,8 @@ struct proc { struct plimit *p_limit; /* [l] read ref. of p_p->ps_limit */ struct kcov_dev *p_kd; /* kcov device handle */ struct lock_list_entry *p_sleeplocks; /* WITNESS lock tracking */ + struct kqueue *p_kq; /* per-thread queue for select/poll */ + unsigned long p_kq_serial; /* select/poll udata tag */ int p_siglist; /* Signals arrived but not delivered. */