This patch revises the way kqueue notifies select(2) about the
closing of monitored file descriptors. Instead of returning EBADF through
kqueue_scan(), the error is conveyed in struct kevent. This is excessive
for select(2) but should be useful with kqueue-based poll(2).
The idea is the following: When a file descriptor is closed, the related
knotes with __EV_POLL are turned into one-shot error notifications and
queued for delivery.
The knotes used for error notification are "unregistered" - they are
reachable only through the queue of active events. This keeps the
implementation relatively simple and does not add much burden on the
normal workings of kqueue. However, the code has to be more careful in
that it will not leak knotes. Unfortunately, the unregistering permits
a potentially unlimited number of pending error notifications. This is
mitigated by flushing the active queue at the start of a kqpoll scan.
Each kqpoll instance can have at most as many registered knotes as there
are open file descriptors in the process' file descriptor table.
At this point it is worth noting that the select(2), poll(2), and kevent(2)
interfaces are not specific about multithreading behaviour. On OpenBSD,
select(2) and poll(2) have historically unblocked when another thread closes
one of the monitored file descriptors. In principle, these interfaces could be
silent about the closing, like kevent(2) does. However, I think it
prudent to keep the existing behaviour for the time being and hence
offer this diff to assist with kqueue-based poll(2).
Index: kern/kern_event.c
===================================================================
RCS file: src/sys/kern/kern_event.c,v
retrieving revision 1.156
diff -u -p -r1.156 kern_event.c
--- kern/kern_event.c 25 Dec 2020 12:59:52 -0000 1.156
+++ kern/kern_event.c 27 Dec 2020 14:47:02 -0000
@@ -86,6 +86,15 @@ int kqueue_stat(struct file *fp, struct
int kqueue_close(struct file *fp, struct proc *p);
void kqueue_wakeup(struct kqueue *kq);
+#ifdef KQUEUE_DEBUG
+void kqueue_do_check(struct kqueue *kq, const char *func, int line);
+#define kqueue_check(kq) kqueue_do_check((kq), __func__, __LINE__)
+#else
+#define kqueue_check(kq) do {} while (0)
+#endif
+
+void kqpoll_dequeue(struct proc *p);
+
static void kqueue_expand_hash(struct kqueue *kq);
static void kqueue_expand_list(struct kqueue *kq, int fd);
static void kqueue_task(void *);
@@ -103,6 +112,7 @@ const struct fileops kqueueops = {
};
void knote_attach(struct knote *kn);
+void knote_detach(struct knote *kn);
void knote_drop(struct knote *kn, struct proc *p);
void knote_enqueue(struct knote *kn);
void knote_dequeue(struct knote *kn);
@@ -513,6 +523,22 @@ const struct filterops dead_filtops = {
.f_event = filt_dead,
};
+static int
+filt_badfd(struct knote *kn, long hint)
+{
+ kn->kn_flags |= (EV_ERROR | EV_ONESHOT);
+ kn->kn_data = EBADF;
+ return (1);
+}
+
+/* For use with kqpoll. */
+const struct filterops badfd_filtops = {
+ .f_flags = FILTEROP_ISFD,
+ .f_attach = NULL,
+ .f_detach = filt_deaddetach,
+ .f_event = filt_badfd,
+};
+
void
kqpoll_init(void)
{
@@ -521,10 +547,12 @@ kqpoll_init(void)
if (p->p_kq != NULL) {
/*
- * Clear any pending error that was raised after
+ * Discard any knotes that have been enqueued after
* previous scan.
+ * This prevents accumulation of enqueued badfd knotes
+ * in case scan does not make progress for some reason.
*/
- p->p_kq->kq_error = 0;
+ kqpoll_dequeue(p);
return;
}
@@ -544,12 +572,44 @@ kqpoll_exit(void)
if (p->p_kq == NULL)
return;
+ kqueue_purge(p, p->p_kq);
+ /* Clear any detached knotes that remain in the queue. */
+ kqpoll_dequeue(p);
kqueue_terminate(p, p->p_kq);
KASSERT(p->p_kq->kq_refs == 1);
KQRELE(p->p_kq);
p->p_kq = NULL;
}
+void
+kqpoll_dequeue(struct proc *p)
+{
+ struct knote *kn;
+ struct kqueue *kq = p->p_kq;
+ int s;
+
+ s = splhigh();
+ while ((kn = TAILQ_FIRST(&kq->kq_head)) != NULL) {
+ /* This kqueue should not be scanned by other threads. */
+ KASSERT(kn->kn_filter != EVFILT_MARKER);
+
+ if (!knote_acquire(kn, NULL, 0))
+ continue;
+
+ kqueue_check(kq);
+ TAILQ_REMOVE(&kq->kq_head, kn, kn_tqe);
+ kn->kn_status &= ~KN_QUEUED;
+ kq->kq_count--;
+
+ splx(s);
+ kn->kn_fop->f_detach(kn);
+ knote_drop(kn, p);
+ s = splhigh();
+ kqueue_check(kq);
+ }
+ splx(s);
+}
+
struct kqueue *
kqueue_alloc(struct filedesc *fdp)
{
@@ -750,9 +810,6 @@ bad:
func, line, kq, kq->kq_count, count, nmarker);
}
}
-#define kqueue_check(kq) kqueue_do_check((kq), __func__, __LINE__)
-#else
-#define kqueue_check(kq) do {} while (0)
#endif
int
@@ -985,15 +1042,6 @@ retry:
}
s = splhigh();
-
- if (kq->kq_error != 0) {
- /* Deliver the pending error. */
- error = kq->kq_error;
- kq->kq_error = 0;
- splx(s);
- goto done;
- }
-
if (kq->kq_count == 0) {
/*
* Successive loops are only necessary if there are more
@@ -1090,6 +1138,7 @@ retry:
kn->kn_status |= KN_DISABLED;
if ((kn->kn_status & KN_QUEUED) == 0)
kn->kn_status &= ~KN_ACTIVE;
+ KASSERT(kn->kn_status & KN_ATTACHED);
knote_release(kn);
} else {
if ((kn->kn_status & KN_QUEUED) == 0) {
@@ -1098,6 +1147,7 @@ retry:
kn->kn_status |= KN_QUEUED;
TAILQ_INSERT_TAIL(&kq->kq_head, kn, kn_tqe);
}
+ KASSERT(kn->kn_status & KN_ATTACHED);
knote_release(kn);
}
kqueue_check(kq);
@@ -1215,7 +1265,8 @@ kqueue_purge(struct proc *p, struct kque
void
kqueue_terminate(struct proc *p, struct kqueue *kq)
{
- kqueue_purge(p, kq);
+ KASSERT(TAILQ_EMPTY(&kq->kq_head));
+
kq->kq_state |= KQ_DYING;
kqueue_wakeup(kq);
@@ -1230,6 +1281,7 @@ kqueue_close(struct file *fp, struct pro
struct kqueue *kq = fp->f_data;
KERNEL_LOCK();
+ kqueue_purge(p, kq);
kqueue_terminate(p, kq);
fp->f_data = NULL;
@@ -1398,7 +1450,6 @@ void
knote_remove(struct proc *p, struct knlist *list, int purge)
{
struct knote *kn;
- struct kqueue *kq;
int s;
while ((kn = SLIST_FIRST(list)) != NULL) {
@@ -1413,15 +1464,26 @@ knote_remove(struct proc *p, struct knli
/*
* Notify poll(2) and select(2) when a monitored
* file descriptor is closed.
+ *
+ * This reuses the original knote for delivering the
+ * notification so as to avoid allocating memory.
+ * The knote will be reachable only through the queue
+ * of active knotes and is freed either by kqueue_scan()
+ * or kqpoll_dequeue().
*/
if (!purge && (kn->kn_flags & __EV_POLL) != 0) {
- kq = kn->kn_kq;
+ KASSERT(kn->kn_fop->f_flags & FILTEROP_ISFD);
+ knote_detach(kn);
+ FRELE(kn->kn_fp, p);
+ kn->kn_fp = NULL;
+
+ kn->kn_fop = &badfd_filtops;
+ kn->kn_fop->f_event(kn, 0);
+ knote_activate(kn);
s = splhigh();
- if (kq->kq_error == 0) {
- kq->kq_error = EBADF;
- kqueue_wakeup(kq);
- }
+ knote_release(kn);
splx(s);
+ continue;
}
knote_drop(kn, p);
@@ -1481,6 +1543,14 @@ knote_attach(struct knote *kn)
{
struct kqueue *kq = kn->kn_kq;
struct knlist *list;
+ int s;
+
+ KASSERT(kn->kn_status & KN_PROCESSING);
+ KASSERT((kn->kn_status & KN_ATTACHED) == 0);
+
+ s = splhigh();
+ kn->kn_status |= KN_ATTACHED;
+ splx(s);
if (kn->kn_fop->f_flags & FILTEROP_ISFD) {
KASSERT(kq->kq_knlistsize > kn->kn_id);
@@ -1492,25 +1562,42 @@ knote_attach(struct knote *kn)
SLIST_INSERT_HEAD(list, kn, kn_link);
}
-/*
- * should be called at spl == 0, since we don't want to hold spl
- * while calling FRELE and pool_put.
- */
void
-knote_drop(struct knote *kn, struct proc *p)
+knote_detach(struct knote *kn)
{
struct kqueue *kq = kn->kn_kq;
struct knlist *list;
int s;
- KASSERT(kn->kn_filter != EVFILT_MARKER);
+ KASSERT(kn->kn_status & KN_PROCESSING);
+
+ if ((kn->kn_status & KN_ATTACHED) == 0)
+ return;
if (kn->kn_fop->f_flags & FILTEROP_ISFD)
list = &kq->kq_knlist[kn->kn_id];
else
list = &kq->kq_knhash[KN_HASH(kn->kn_id, kq->kq_knhashmask)];
-
SLIST_REMOVE(list, kn, knote, kn_link);
+
+ s = splhigh();
+ kn->kn_status &= ~KN_ATTACHED;
+ splx(s);
+}
+
+/*
+ * should be called at spl == 0, since we don't want to hold spl
+ * while calling FRELE and pool_put.
+ */
+void
+knote_drop(struct knote *kn, struct proc *p)
+{
+ int s;
+
+ KASSERT(kn->kn_filter != EVFILT_MARKER);
+
+ knote_detach(kn);
+
s = splhigh();
if (kn->kn_status & KN_QUEUED)
knote_dequeue(kn);
@@ -1519,7 +1606,7 @@ knote_drop(struct knote *kn, struct proc
wakeup(kn);
}
splx(s);
- if (kn->kn_fop->f_flags & FILTEROP_ISFD)
+ if ((kn->kn_fop->f_flags & FILTEROP_ISFD) && kn->kn_fp != NULL)
FRELE(kn->kn_fp, p);
pool_put(&knote_pool, kn);
}
Index: kern/sys_generic.c
===================================================================
RCS file: src/sys/kern/sys_generic.c,v
retrieving revision 1.134
diff -u -p -r1.134 sys_generic.c
--- kern/sys_generic.c 26 Dec 2020 14:26:48 -0000 1.134
+++ kern/sys_generic.c 27 Dec 2020 14:47:02 -0000
@@ -80,7 +80,7 @@ int kqpoll_debug = 0;
}
int pselregister(struct proc *, fd_set *[], int, int *);
-int pselcollect(struct proc *, struct kevent *, fd_set *[]);
+int pselcollect(struct proc *, struct kevent *, fd_set *[], int *);
int pollout(struct pollfd *, struct pollfd *, u_int);
int dopselect(struct proc *, int, fd_set *, fd_set *, fd_set *,
@@ -601,7 +601,7 @@ dopselect(struct proc *p, int nd, fd_set
struct kqueue_scan_state scan;
fd_mask bits[6];
fd_set *pibits[3], *pobits[3];
- int error, nevents = 0;
+ int error, ncollected = 0, nevents = 0;
u_int ni;
if (nd < 0)
@@ -692,8 +692,8 @@ dopselect(struct proc *p, int nd, fd_set
ktrevent(p, kev, ready);
#endif
/* Convert back events that are ready. */
- for (i = 0; i < ready; i++)
- *retval += pselcollect(p, &kev[i], pobits);
+ for (i = 0; i < ready && error == 0; i++)
+ error = pselcollect(p, &kev[i], pobits, &ncollected);
/*
* Stop if there was an error or if we had enough
* space to collect all events that were ready.
@@ -704,6 +704,7 @@ dopselect(struct proc *p, int nd, fd_set
nevents -= ready;
}
kqueue_scan_finish(&scan);
+ *retval = ncollected;
done:
#define putbits(name, x) \
if (name && (error2 = copyout(pobits[x], name, ni))) \
@@ -790,7 +791,8 @@ bad:
* Convert given kqueue event into corresponding select(2) bit.
*/
int
-pselcollect(struct proc *p, struct kevent *kevp, fd_set *pobits[3])
+pselcollect(struct proc *p, struct kevent *kevp, fd_set *pobits[3],
+ int *ncollected)
{
#ifdef DIAGNOSTIC
/* Filter out and lazily delete spurious events */
@@ -803,6 +805,12 @@ pselcollect(struct proc *p, struct keven
}
#endif
+ if (kevp->flags & EV_ERROR) {
+ DPRINTFN(2, "select fd %d filt %d error %d\n",
+ (int)kevp->ident, kevp->filter, (int)kevp->data);
+ return (kevp->data);
+ }
+
switch (kevp->filter) {
case EVFILT_READ:
FD_SET(kevp->ident, pobits[0]);
@@ -816,9 +824,10 @@ pselcollect(struct proc *p, struct keven
default:
KASSERT(0);
}
+ (*ncollected)++;
DPRINTFN(2, "select fd %d filt %d\n", (int)kevp->ident, kevp->filter);
- return (1);
+ return (0);
}
int
Index: sys/event.h
===================================================================
RCS file: src/sys/sys/event.h,v
retrieving revision 1.52
diff -u -p -r1.52 event.h
--- sys/event.h 25 Dec 2020 12:59:53 -0000 1.52
+++ sys/event.h 27 Dec 2020 14:47:02 -0000
@@ -195,6 +195,8 @@ struct knote {
#define KN_DETACHED 0x0008 /* knote is detached */
#define KN_PROCESSING 0x0010 /* knote is being processed */
#define KN_WAITING 0x0020 /* waiting on processing */
+#define KN_ATTACHED 0x0040 /* knote is attached to
+ * a knlist of the kqueue */
#define kn_id kn_kevent.ident
#define kn_filter kn_kevent.filter
Index: sys/eventvar.h
===================================================================
RCS file: src/sys/sys/eventvar.h,v
retrieving revision 1.10
diff -u -p -r1.10 eventvar.h
--- sys/eventvar.h 18 Dec 2020 16:16:14 -0000 1.10
+++ sys/eventvar.h 27 Dec 2020 14:47:02 -0000
@@ -59,7 +59,6 @@ struct kqueue {
#define KQ_SEL 0x01
#define KQ_SLEEP 0x02
#define KQ_DYING 0x04
- int kq_error; /* pending error */
};
#endif /* !_SYS_EVENTVAR_H_ */