This patch revises how kqueue notifies select(2) about the closing of
monitored file descriptors. Instead of returning EBADF through
kqueue_scan(), the error is conveyed in struct kevent. This is excessive
for select(2) but should be useful with kqueue-based poll(2).

The idea is the following: When a file descriptor is closed, the related
knotes with __EV_POLL are turned into one-shot error notifications and
queued for delivery.

The knotes used for error notification are "unregistered" - they are
reachable only through the queue of active events. This keeps the
implementation relatively simple and does not place much burden on the
normal workings of kqueue. However, the code has to be more careful
so that it does not leak knotes. Unfortunately, the unregistering permits
a potentially unlimited number of pending error notifications. This is
mitigated by flushing the active queue at the start of a kqpoll scan.
Each kqpoll instance can have at most as many registered knotes as there
are open file descriptors in the process' file descriptor table.

At this point it is good to note that the select(2), poll(2), and
kevent(2) interfaces do not specify behaviour with regard to
multithreading. On OpenBSD, select(2) and poll(2) have unblocked when
another thread closes one of the monitored file descriptors. In
principle, these interfaces could be
silent about the closing, like kevent(2) does. However, I think it
prudent to keep the existing behaviour for the time being and hence
offer this diff to assist with kqueue-based poll(2).

Index: kern/kern_event.c
===================================================================
RCS file: src/sys/kern/kern_event.c,v
retrieving revision 1.156
diff -u -p -r1.156 kern_event.c
--- kern/kern_event.c   25 Dec 2020 12:59:52 -0000      1.156
+++ kern/kern_event.c   27 Dec 2020 14:47:02 -0000
@@ -86,6 +86,15 @@ int  kqueue_stat(struct file *fp, struct 
 int    kqueue_close(struct file *fp, struct proc *p);
 void   kqueue_wakeup(struct kqueue *kq);
 
+#ifdef KQUEUE_DEBUG
+void   kqueue_do_check(struct kqueue *kq, const char *func, int line);
+#define kqueue_check(kq)       kqueue_do_check((kq), __func__, __LINE__)
+#else
+#define kqueue_check(kq)       do {} while (0)
+#endif
+
+void   kqpoll_dequeue(struct proc *p);
+
 static void    kqueue_expand_hash(struct kqueue *kq);
 static void    kqueue_expand_list(struct kqueue *kq, int fd);
 static void    kqueue_task(void *);
@@ -103,6 +112,7 @@ const struct fileops kqueueops = {
 };
 
 void   knote_attach(struct knote *kn);
+void   knote_detach(struct knote *kn);
 void   knote_drop(struct knote *kn, struct proc *p);
 void   knote_enqueue(struct knote *kn);
 void   knote_dequeue(struct knote *kn);
@@ -513,6 +523,22 @@ const struct filterops dead_filtops = {
        .f_event        = filt_dead,
 };
 
+static int
+filt_badfd(struct knote *kn, long hint)
+{
+       kn->kn_flags |= (EV_ERROR | EV_ONESHOT);
+       kn->kn_data = EBADF;
+       return (1);
+}
+
+/* For use with kqpoll. */
+const struct filterops badfd_filtops = {
+       .f_flags        = FILTEROP_ISFD,
+       .f_attach       = NULL,
+       .f_detach       = filt_deaddetach,
+       .f_event        = filt_badfd,
+};
+
 void
 kqpoll_init(void)
 {
@@ -521,10 +547,12 @@ kqpoll_init(void)
 
        if (p->p_kq != NULL) {
                /*
-                * Clear any pending error that was raised after
+                * Discard any knotes that have been enqueued after
                 * previous scan.
+                * This prevents accumulation of enqueued badfd knotes
+                * in case scan does not make progress for some reason.
                 */
-               p->p_kq->kq_error = 0;
+               kqpoll_dequeue(p);
                return;
        }
 
@@ -544,12 +572,44 @@ kqpoll_exit(void)
        if (p->p_kq == NULL)
                return;
 
+       kqueue_purge(p, p->p_kq);
+       /* Clear any detached knotes that remain in the queue. */
+       kqpoll_dequeue(p);
        kqueue_terminate(p, p->p_kq);
        KASSERT(p->p_kq->kq_refs == 1);
        KQRELE(p->p_kq);
        p->p_kq = NULL;
 }
 
+void
+kqpoll_dequeue(struct proc *p)
+{
+       struct knote *kn;
+       struct kqueue *kq = p->p_kq;
+       int s;
+
+       s = splhigh();
+       while ((kn = TAILQ_FIRST(&kq->kq_head)) != NULL) {
+               /* This kqueue should not be scanned by other threads. */
+               KASSERT(kn->kn_filter != EVFILT_MARKER);
+
+               if (!knote_acquire(kn, NULL, 0))
+                       continue;
+
+               kqueue_check(kq);
+               TAILQ_REMOVE(&kq->kq_head, kn, kn_tqe);
+               kn->kn_status &= ~KN_QUEUED;
+               kq->kq_count--;
+
+               splx(s);
+               kn->kn_fop->f_detach(kn);
+               knote_drop(kn, p);
+               s = splhigh();
+               kqueue_check(kq);
+       }
+       splx(s);
+}
+
 struct kqueue *
 kqueue_alloc(struct filedesc *fdp)
 {
@@ -750,9 +810,6 @@ bad:
                    func, line, kq, kq->kq_count, count, nmarker);
        }
 }
-#define kqueue_check(kq)       kqueue_do_check((kq), __func__, __LINE__)
-#else
-#define kqueue_check(kq)       do {} while (0)
 #endif
 
 int
@@ -985,15 +1042,6 @@ retry:
        }
 
        s = splhigh();
-
-       if (kq->kq_error != 0) {
-               /* Deliver the pending error. */
-               error = kq->kq_error;
-               kq->kq_error = 0;
-               splx(s);
-               goto done;
-       }
-
        if (kq->kq_count == 0) {
                /*
                 * Successive loops are only necessary if there are more
@@ -1090,6 +1138,7 @@ retry:
                                kn->kn_status |= KN_DISABLED;
                        if ((kn->kn_status & KN_QUEUED) == 0)
                                kn->kn_status &= ~KN_ACTIVE;
+                       KASSERT(kn->kn_status & KN_ATTACHED);
                        knote_release(kn);
                } else {
                        if ((kn->kn_status & KN_QUEUED) == 0) {
@@ -1098,6 +1147,7 @@ retry:
                                kn->kn_status |= KN_QUEUED;
                                TAILQ_INSERT_TAIL(&kq->kq_head, kn, kn_tqe);
                        }
+                       KASSERT(kn->kn_status & KN_ATTACHED);
                        knote_release(kn);
                }
                kqueue_check(kq);
@@ -1215,7 +1265,8 @@ kqueue_purge(struct proc *p, struct kque
 void
 kqueue_terminate(struct proc *p, struct kqueue *kq)
 {
-       kqueue_purge(p, kq);
+       KASSERT(TAILQ_EMPTY(&kq->kq_head));
+
        kq->kq_state |= KQ_DYING;
        kqueue_wakeup(kq);
 
@@ -1230,6 +1281,7 @@ kqueue_close(struct file *fp, struct pro
        struct kqueue *kq = fp->f_data;
 
        KERNEL_LOCK();
+       kqueue_purge(p, kq);
        kqueue_terminate(p, kq);
        fp->f_data = NULL;
 
@@ -1398,7 +1450,6 @@ void
 knote_remove(struct proc *p, struct knlist *list, int purge)
 {
        struct knote *kn;
-       struct kqueue *kq;
        int s;
 
        while ((kn = SLIST_FIRST(list)) != NULL) {
@@ -1413,15 +1464,26 @@ knote_remove(struct proc *p, struct knli
                /*
                 * Notify poll(2) and select(2) when a monitored
                 * file descriptor is closed.
+                *
+                * This reuses the original knote for delivering the
+                * notification so as to avoid allocating memory.
+                * The knote will be reachable only through the queue
+                * of active knotes and is freed either by kqueue_scan()
+                * or kqpoll_dequeue().
                 */
                if (!purge && (kn->kn_flags & __EV_POLL) != 0) {
-                       kq = kn->kn_kq;
+                       KASSERT(kn->kn_fop->f_flags & FILTEROP_ISFD);
+                       knote_detach(kn);
+                       FRELE(kn->kn_fp, p);
+                       kn->kn_fp = NULL;
+
+                       kn->kn_fop = &badfd_filtops;
+                       kn->kn_fop->f_event(kn, 0);
+                       knote_activate(kn);
                        s = splhigh();
-                       if (kq->kq_error == 0) {
-                               kq->kq_error = EBADF;
-                               kqueue_wakeup(kq);
-                       }
+                       knote_release(kn);
                        splx(s);
+                       continue;
                }
 
                knote_drop(kn, p);
@@ -1481,6 +1543,14 @@ knote_attach(struct knote *kn)
 {
        struct kqueue *kq = kn->kn_kq;
        struct knlist *list;
+       int s;
+
+       KASSERT(kn->kn_status & KN_PROCESSING);
+       KASSERT((kn->kn_status & KN_ATTACHED) == 0);
+
+       s = splhigh();
+       kn->kn_status |= KN_ATTACHED;
+       splx(s);
 
        if (kn->kn_fop->f_flags & FILTEROP_ISFD) {
                KASSERT(kq->kq_knlistsize > kn->kn_id);
@@ -1492,25 +1562,42 @@ knote_attach(struct knote *kn)
        SLIST_INSERT_HEAD(list, kn, kn_link);
 }
 
-/*
- * should be called at spl == 0, since we don't want to hold spl
- * while calling FRELE and pool_put.
- */
 void
-knote_drop(struct knote *kn, struct proc *p)
+knote_detach(struct knote *kn)
 {
        struct kqueue *kq = kn->kn_kq;
        struct knlist *list;
        int s;
 
-       KASSERT(kn->kn_filter != EVFILT_MARKER);
+       KASSERT(kn->kn_status & KN_PROCESSING);
+
+       if ((kn->kn_status & KN_ATTACHED) == 0)
+               return;
 
        if (kn->kn_fop->f_flags & FILTEROP_ISFD)
                list = &kq->kq_knlist[kn->kn_id];
        else
                list = &kq->kq_knhash[KN_HASH(kn->kn_id, kq->kq_knhashmask)];
-
        SLIST_REMOVE(list, kn, knote, kn_link);
+
+       s = splhigh();
+       kn->kn_status &= ~KN_ATTACHED;
+       splx(s);
+}
+
+/*
+ * should be called at spl == 0, since we don't want to hold spl
+ * while calling FRELE and pool_put.
+ */
+void
+knote_drop(struct knote *kn, struct proc *p)
+{
+       int s;
+
+       KASSERT(kn->kn_filter != EVFILT_MARKER);
+
+       knote_detach(kn);
+
        s = splhigh();
        if (kn->kn_status & KN_QUEUED)
                knote_dequeue(kn);
@@ -1519,7 +1606,7 @@ knote_drop(struct knote *kn, struct proc
                wakeup(kn);
        }
        splx(s);
-       if (kn->kn_fop->f_flags & FILTEROP_ISFD)
+       if ((kn->kn_fop->f_flags & FILTEROP_ISFD) && kn->kn_fp != NULL)
                FRELE(kn->kn_fp, p);
        pool_put(&knote_pool, kn);
 }
Index: kern/sys_generic.c
===================================================================
RCS file: src/sys/kern/sys_generic.c,v
retrieving revision 1.134
diff -u -p -r1.134 sys_generic.c
--- kern/sys_generic.c  26 Dec 2020 14:26:48 -0000      1.134
+++ kern/sys_generic.c  27 Dec 2020 14:47:02 -0000
@@ -80,7 +80,7 @@ int kqpoll_debug = 0;
 }
 
 int pselregister(struct proc *, fd_set *[], int, int *);
-int pselcollect(struct proc *, struct kevent *, fd_set *[]);
+int pselcollect(struct proc *, struct kevent *, fd_set *[], int *);
 
 int pollout(struct pollfd *, struct pollfd *, u_int);
 int dopselect(struct proc *, int, fd_set *, fd_set *, fd_set *,
@@ -601,7 +601,7 @@ dopselect(struct proc *p, int nd, fd_set
        struct kqueue_scan_state scan;
        fd_mask bits[6];
        fd_set *pibits[3], *pobits[3];
-       int error, nevents = 0;
+       int error, ncollected = 0, nevents = 0;
        u_int ni;
 
        if (nd < 0)
@@ -692,8 +692,8 @@ dopselect(struct proc *p, int nd, fd_set
                        ktrevent(p, kev, ready);
 #endif
                /* Convert back events that are ready. */
-               for (i = 0; i < ready; i++)
-                       *retval += pselcollect(p, &kev[i], pobits);
+               for (i = 0; i < ready && error == 0; i++)
+                       error = pselcollect(p, &kev[i], pobits, &ncollected);
                /*
                 * Stop if there was an error or if we had enough
                 * space to collect all events that were ready.
@@ -704,6 +704,7 @@ dopselect(struct proc *p, int nd, fd_set
                nevents -= ready;
        }
        kqueue_scan_finish(&scan);
+       *retval = ncollected;
  done:
 #define        putbits(name, x) \
        if (name && (error2 = copyout(pobits[x], name, ni))) \
@@ -790,7 +791,8 @@ bad:
  * Convert given kqueue event into corresponding select(2) bit.
  */
 int
-pselcollect(struct proc *p, struct kevent *kevp, fd_set *pobits[3])
+pselcollect(struct proc *p, struct kevent *kevp, fd_set *pobits[3],
+    int *ncollected)
 {
 #ifdef DIAGNOSTIC
        /* Filter out and lazily delete spurious events */
@@ -803,6 +805,12 @@ pselcollect(struct proc *p, struct keven
        }
 #endif
 
+       if (kevp->flags & EV_ERROR) {
+               DPRINTFN(2, "select fd %d filt %d error %d\n",
+                   (int)kevp->ident, kevp->filter, (int)kevp->data);
+               return (kevp->data);
+       }
+
        switch (kevp->filter) {
        case EVFILT_READ:
                FD_SET(kevp->ident, pobits[0]);
@@ -816,9 +824,10 @@ pselcollect(struct proc *p, struct keven
        default:
                KASSERT(0);
        }
+       (*ncollected)++;
 
        DPRINTFN(2, "select fd %d filt %d\n", (int)kevp->ident, kevp->filter);
-       return (1);
+       return (0);
 }
 
 int
Index: sys/event.h
===================================================================
RCS file: src/sys/sys/event.h,v
retrieving revision 1.52
diff -u -p -r1.52 event.h
--- sys/event.h 25 Dec 2020 12:59:53 -0000      1.52
+++ sys/event.h 27 Dec 2020 14:47:02 -0000
@@ -195,6 +195,8 @@ struct knote {
 #define KN_DETACHED    0x0008                  /* knote is detached */
 #define KN_PROCESSING  0x0010                  /* knote is being processed */
 #define KN_WAITING     0x0020                  /* waiting on processing */
+#define KN_ATTACHED    0x0040                  /* knote is attached to
+                                                * a knlist of the kqueue */
 
 #define kn_id          kn_kevent.ident
 #define kn_filter      kn_kevent.filter
Index: sys/eventvar.h
===================================================================
RCS file: src/sys/sys/eventvar.h,v
retrieving revision 1.10
diff -u -p -r1.10 eventvar.h
--- sys/eventvar.h      18 Dec 2020 16:16:14 -0000      1.10
+++ sys/eventvar.h      27 Dec 2020 14:47:02 -0000
@@ -59,7 +59,6 @@ struct kqueue {
 #define KQ_SEL         0x01
 #define KQ_SLEEP       0x02
 #define KQ_DYING       0x04
-       int             kq_error;               /* pending error */
 };
 
 #endif /* !_SYS_EVENTVAR_H_ */

Reply via email to