This diff introduces a struct for kqueue scan state. It makes it easier to perform scans piecewise by keeping track of the scan's end point. The end point prevents the scan from recollecting events that are already being reported to userspace.
Below is an overview of the goal. It is achieved by combining this diff with mpi@'s kqueue_scan() refactoring. kqueue_scan_setup(&scan, kq); while (room for events) { ... ready = kqueue_scan(&scan, ...); if (ready == 0) break; ... } kqueue_scan_finish(&scan); The diff takes a conservative approach with the scan start marker and reinserts it for each round in kqueue_scan(). As the diff does not provide any immediate improvement, it will wait after release. OK? Index: kern/kern_event.c =================================================================== RCS file: src/sys/kern/kern_event.c,v retrieving revision 1.131 diff -u -p -r1.131 kern_event.c --- kern/kern_event.c 7 Apr 2020 13:27:51 -0000 1.131 +++ kern/kern_event.c 20 Apr 2020 14:17:59 -0000 @@ -62,7 +62,7 @@ void KQREF(struct kqueue *); void KQRELE(struct kqueue *); int kqueue_sleep(struct kqueue *, struct timespec *); -int kqueue_scan(struct kqueue *kq, int maxevents, +int kqueue_scan(struct kqueue_scan_state *scan, int maxevents, struct kevent *ulistp, struct timespec *timeout, struct proc *p, int *retval); @@ -529,6 +529,7 @@ out: int sys_kevent(struct proc *p, void *v, register_t *retval) { + struct kqueue_scan_state scan; struct filedesc* fdp = p->p_fd; struct sys_kevent_args /* { syscallarg(int) fd; @@ -612,8 +613,10 @@ sys_kevent(struct proc *p, void *v, regi KQREF(kq); FRELE(fp, p); - error = kqueue_scan(kq, SCARG(uap, nevents), SCARG(uap, eventlist), + kqueue_scan_setup(&scan, kq); + error = kqueue_scan(&scan, SCARG(uap, nevents), SCARG(uap, eventlist), tsp, p, &n); + kqueue_scan_finish(&scan); KQRELE(kq); *retval = n; return (error); @@ -870,11 +873,12 @@ kqueue_sleep(struct kqueue *kq, struct t } int -kqueue_scan(struct kqueue *kq, int maxevents, struct kevent *ulistp, - struct timespec *tsp, struct proc *p, int *retval) +kqueue_scan(struct kqueue_scan_state *scan, int maxevents, + struct kevent *ulistp, struct timespec *tsp, struct proc *p, int *retval) { struct kevent *kevp; - struct knote mend, 
mstart, *kn; + struct knote *kn; + struct kqueue *kq = scan->kqs_kq; int s, count, nkev = 0, error = 0; struct kevent kev[KQ_NEVENTS]; @@ -882,9 +886,6 @@ kqueue_scan(struct kqueue *kq, int maxev if (count == 0) goto done; - memset(&mstart, 0, sizeof(mstart)); - memset(&mend, 0, sizeof(mend)); - retry: if (kq->kq_state & KQ_DYING) { error = EBADF; @@ -894,7 +895,8 @@ retry: kevp = &kev[0]; s = splhigh(); if (kq->kq_count == 0) { - if (tsp != NULL && !timespecisset(tsp)) { + if ((tsp != NULL && !timespecisset(tsp)) || + scan->kqs_nevent != 0) { splx(s); error = 0; goto done; @@ -910,27 +912,40 @@ retry: goto done; } - mstart.kn_filter = EVFILT_MARKER; - mstart.kn_status = KN_PROCESSING; - TAILQ_INSERT_HEAD(&kq->kq_head, &mstart, kn_tqe); - mend.kn_filter = EVFILT_MARKER; - mend.kn_status = KN_PROCESSING; - TAILQ_INSERT_TAIL(&kq->kq_head, &mend, kn_tqe); + /* + * Put the end marker in the queue to limit the scan to the events + * that are currently active. This prevents events from being + * recollected if they reactivate during scan. + * + * If a partial scan has been performed already but no events have + * been collected, reposition the end marker to make any new events + * reachable. 
+ */ + if (!scan->kqs_queued) { + TAILQ_INSERT_TAIL(&kq->kq_head, &scan->kqs_end, kn_tqe); + scan->kqs_queued = 1; + } else if (scan->kqs_nevent == 0) { + TAILQ_REMOVE(&kq->kq_head, &scan->kqs_end, kn_tqe); + TAILQ_INSERT_TAIL(&kq->kq_head, &scan->kqs_end, kn_tqe); + } + + TAILQ_INSERT_HEAD(&kq->kq_head, &scan->kqs_start, kn_tqe); while (count) { - kn = TAILQ_NEXT(&mstart, kn_tqe); + kn = TAILQ_NEXT(&scan->kqs_start, kn_tqe); if (kn->kn_filter == EVFILT_MARKER) { - if (kn == &mend) { - TAILQ_REMOVE(&kq->kq_head, &mend, kn_tqe); - TAILQ_REMOVE(&kq->kq_head, &mstart, kn_tqe); + if (kn == &scan->kqs_end) { + TAILQ_REMOVE(&kq->kq_head, &scan->kqs_start, + kn_tqe); splx(s); - if (count == maxevents) + if (scan->kqs_nevent == 0) goto retry; goto done; } /* Move start marker past another thread's marker. */ - TAILQ_REMOVE(&kq->kq_head, &mstart, kn_tqe); - TAILQ_INSERT_AFTER(&kq->kq_head, kn, &mstart, kn_tqe); + TAILQ_REMOVE(&kq->kq_head, &scan->kqs_start, kn_tqe); + TAILQ_INSERT_AFTER(&kq->kq_head, kn, &scan->kqs_start, + kn_tqe); continue; } @@ -958,6 +973,9 @@ retry: *kevp = kn->kn_kevent; kevp++; nkev++; + count--; + scan->kqs_nevent++; + if (kn->kn_flags & EV_ONESHOT) { splx(s); kn->kn_fop->f_detach(kn); @@ -983,7 +1001,6 @@ retry: knote_release(kn); } kqueue_check(kq); - count--; if (nkev == KQ_NEVENTS) { splx(s); #ifdef KTRACE @@ -1000,8 +1017,7 @@ retry: break; } } - TAILQ_REMOVE(&kq->kq_head, &mend, kn_tqe); - TAILQ_REMOVE(&kq->kq_head, &mstart, kn_tqe); + TAILQ_REMOVE(&kq->kq_head, &scan->kqs_start, kn_tqe); splx(s); done: if (nkev != 0) { @@ -1014,6 +1030,36 @@ done: } *retval = maxevents - count; return (error); +} + +void +kqueue_scan_setup(struct kqueue_scan_state *scan, struct kqueue *kq) +{ + memset(scan, 0, sizeof(*scan)); + scan->kqs_kq = kq; + scan->kqs_start.kn_filter = EVFILT_MARKER; + scan->kqs_start.kn_status = KN_PROCESSING; + scan->kqs_end.kn_filter = EVFILT_MARKER; + scan->kqs_end.kn_status = KN_PROCESSING; +} + +void +kqueue_scan_finish(struct 
kqueue_scan_state *scan) +{ + struct kqueue *kq = scan->kqs_kq; + int s; + + KASSERT(scan->kqs_start.kn_filter == EVFILT_MARKER); + KASSERT(scan->kqs_start.kn_status == KN_PROCESSING); + KASSERT(scan->kqs_end.kn_filter == EVFILT_MARKER); + KASSERT(scan->kqs_end.kn_status == KN_PROCESSING); + + if (scan->kqs_queued) { + scan->kqs_queued = 0; + s = splhigh(); + TAILQ_REMOVE(&kq->kq_head, &scan->kqs_end, kn_tqe); + splx(s); + } } /* Index: sys/event.h =================================================================== RCS file: src/sys/sys/event.h,v retrieving revision 1.35 diff -u -p -r1.35 event.h --- sys/event.h 7 Apr 2020 13:27:52 -0000 1.35 +++ sys/event.h 20 Apr 2020 14:18:00 -0000 @@ -189,6 +189,15 @@ struct knote { #define kn_fp kn_ptr.p_fp }; +struct kqueue_scan_state { + struct kqueue *kqs_kq; /* kqueue of this scan */ + struct knote kqs_start; /* start marker */ + struct knote kqs_end; /* end marker */ + int kqs_nevent; /* number of events collected */ + int kqs_queued; /* if set, end marker is + * in queue */ +}; + struct proc; extern const struct filterops sig_filtops; @@ -200,6 +209,8 @@ extern void knote_fdclose(struct proc *p extern void knote_processexit(struct proc *); extern int kqueue_register(struct kqueue *kq, struct kevent *kev, struct proc *p); +extern void kqueue_scan_setup(struct kqueue_scan_state *, struct kqueue *); +extern void kqueue_scan_finish(struct kqueue_scan_state *); extern int filt_seltrue(struct knote *kn, long hint); extern int seltrue_kqfilter(dev_t, struct knote *); extern void klist_insert(struct klist *, struct knote *);