Diff below is the last version of my kqueue diff to allow grabbing the
solock() and possibly sleeping, inside kqueue_scan().
Sleeping in kqueue_scan() is not a problem since threads are already
doing it when no events are available; in that case `kq_count' is 0.
When this happens, a thread going to sleep first removes its own marker
from the given kqueue's TAILQ.
The diff below allows threads to sleep *inside* the event collection
loop. For that, markers are now flagged with EVFILT_MARKER and threads
are taught how to skip other threads' markers.
Now if a thread, tA, is sleeping inside `f_event' it "owns" the corres-
ponding knote. That means the knote is no longer present in the kqueue's
TAILQ and kq_count no longer accounts for it.
However a sibling thread, tB, could decide it's a good time to close the
file descriptor associated to this knote. So to make sure tA finished
processing the knote before tB frees it, we use the KN_PROCESSING flag.
That means fdrelease() will sleep until it can acquire the knote.
This logic is taken from DragonflyBSD where it is used as part of a
finer grained locking scheme. We still rely on the KERNEL_LOCK() for
serializing accesses to the data structures.
The sys_close() vs sys_kevent() race described above has been triggered
by abieber@ with www/gitea and homestead with a previous version of this
diff. Base doesn't seem to be good enough to exercise it.
Second part of the diff below enables the solock() inside the filters.
Comments and tests welcome.
Index: kern/kern_event.c
===================================================================
RCS file: /cvs/src/sys/kern/kern_event.c,v
retrieving revision 1.81
diff -u -p -r1.81 kern_event.c
--- kern/kern_event.c 11 Oct 2017 08:06:56 -0000 1.81
+++ kern/kern_event.c 11 Oct 2017 10:40:10 -0000
@@ -84,6 +84,8 @@ void knote_attach(struct knote *kn, stru
void knote_drop(struct knote *kn, struct proc *p, struct filedesc *fdp);
void knote_enqueue(struct knote *kn);
void knote_dequeue(struct knote *kn);
+int knote_acquire(struct knote *kn);
+int knote_release(struct knote *kn);
#define knote_alloc() ((struct knote *)pool_get(&knote_pool, PR_WAITOK))
#define knote_free(kn) pool_put(&knote_pool, (kn))
@@ -759,27 +761,43 @@ start:
goto done;
}
+ marker.kn_filter = EVFILT_MARKER;
+ marker.kn_status = KN_PROCESSING;
TAILQ_INSERT_TAIL(&kq->kq_head, &marker, kn_tqe);
while (count) {
kn = TAILQ_FIRST(&kq->kq_head);
if (kn == &marker) {
- TAILQ_REMOVE(&kq->kq_head, kn, kn_tqe);
+ TAILQ_REMOVE(&kq->kq_head, &marker, kn_tqe);
splx(s);
if (count == maxevents)
goto retry;
goto done;
}
+ if (kn->kn_filter == EVFILT_MARKER) {
+ struct knote *other_marker = kn;
+
+ /* Move some other threads marker past this kn */
+ kn = TAILQ_NEXT(other_marker, kn_tqe);
+ TAILQ_REMOVE(&kq->kq_head, kn, kn_tqe);
+ TAILQ_INSERT_BEFORE(other_marker, kn, kn_tqe);
+ continue;
+ }
+
+ if (!knote_acquire(kn))
+ continue;
TAILQ_REMOVE(&kq->kq_head, kn, kn_tqe);
kq->kq_count--;
if (kn->kn_status & KN_DISABLED) {
kn->kn_status &= ~KN_QUEUED;
+ knote_release(kn);
continue;
}
if ((kn->kn_flags & EV_ONESHOT) == 0 &&
kn->kn_fop->f_event(kn, 0) == 0) {
kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE);
+ knote_release(kn);
continue;
}
*kevp = kn->kn_kevent;
@@ -799,9 +817,11 @@ start:
if (kn->kn_flags & EV_DISPATCH)
kn->kn_status |= KN_DISABLED;
kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE);
+ knote_release(kn);
} else {
TAILQ_INSERT_TAIL(&kq->kq_head, kn, kn_tqe);
kq->kq_count++;
+ knote_release(kn);
}
count--;
if (nkev == KQ_NEVENTS) {
@@ -956,6 +976,41 @@ kqueue_wakeup(struct kqueue *kq)
}
/*
+ * Acquire a knote, return non-zero on success, 0 on failure.
+ *
+ * If we cannot acquire the knote we sleep and return 0. The knote
+ * may be stale on return in this case and the caller must restart
+ * whatever loop they are in.
+ */
+int
+knote_acquire(struct knote *kn)
+{
+ if (kn->kn_status & KN_PROCESSING) {
+ kn->kn_status |= KN_WAITING;
+ tsleep(kn, 0, "kqepts", hz);
+ /* knote may be stale now */
+ return (0);
+ }
+ kn->kn_status |= KN_PROCESSING;
+ return (1);
+}
+
+/*
+ * Release an acquired knote, clearing KN_PROCESSING.
+ */
+int
+knote_release(struct knote *kn)
+{
+ if (kn->kn_status & KN_WAITING) {
+ kn->kn_status &= ~KN_WAITING;
+ wakeup(kn);
+ }
+ kn->kn_status &= ~KN_PROCESSING;
+ /* kn should not be accessed anymore */
+ return (0);
+}
+
+/*
* activate one knote.
*/
void
@@ -986,6 +1041,8 @@ knote_remove(struct proc *p, struct klis
struct knote *kn;
while ((kn = SLIST_FIRST(list)) != NULL) {
+ if (!knote_acquire(kn))
+ continue;
kn->kn_fop->f_detach(kn);
knote_drop(kn, p, p->p_fd);
}
Index: kern/uipc_socket.c
===================================================================
RCS file: /cvs/src/sys/kern/uipc_socket.c,v
retrieving revision 1.205
diff -u -p -r1.205 uipc_socket.c
--- kern/uipc_socket.c 15 Sep 2017 19:29:28 -0000 1.205
+++ kern/uipc_socket.c 11 Oct 2017 10:40:11 -0000
@@ -1921,8 +1921,10 @@ int
filt_soread(struct knote *kn, long hint)
{
struct socket *so = kn->kn_fp->f_data;
- int rv;
+ int s, rv;
+ if (!(hint & NOTE_SUBMIT))
+ s = solock(so);
kn->kn_data = so->so_rcv.sb_cc;
#ifdef SOCKET_SPLICE
if (isspliced(so)) {
@@ -1940,6 +1942,8 @@ filt_soread(struct knote *kn, long hint)
} else {
rv = (kn->kn_data >= so->so_rcv.sb_lowat);
}
+ if (!(hint & NOTE_SUBMIT))
+ sounlock(s);
return rv;
}
@@ -1960,8 +1964,10 @@ int
filt_sowrite(struct knote *kn, long hint)
{
struct socket *so = kn->kn_fp->f_data;
- int rv;
+ int s, rv;
+ if (!(hint & NOTE_SUBMIT))
+ s = solock(so);
kn->kn_data = sbspace(so, &so->so_snd);
if (so->so_state & SS_CANTSENDMORE) {
kn->kn_flags |= EV_EOF;
@@ -1977,6 +1983,8 @@ filt_sowrite(struct knote *kn, long hint
} else {
rv = (kn->kn_data >= so->so_snd.sb_lowat);
}
+ if (!(hint & NOTE_SUBMIT))
+ sounlock(s);
return (rv);
}
@@ -1985,8 +1993,13 @@ int
filt_solisten(struct knote *kn, long hint)
{
struct socket *so = kn->kn_fp->f_data;
+ int s;
+ if (!(hint & NOTE_SUBMIT))
+ s = solock(so);
kn->kn_data = so->so_qlen;
+ if (!(hint & NOTE_SUBMIT))
+ sounlock(s);
return (kn->kn_data != 0);
}
Index: miscfs/fifofs/fifo_vnops.c
===================================================================
RCS file: /cvs/src/sys/miscfs/fifofs/fifo_vnops.c,v
retrieving revision 1.58
diff -u -p -r1.58 fifo_vnops.c
--- miscfs/fifofs/fifo_vnops.c 24 Jul 2017 15:07:39 -0000 1.58
+++ miscfs/fifofs/fifo_vnops.c 11 Oct 2017 10:40:11 -0000
@@ -545,8 +545,10 @@ int
filt_fiforead(struct knote *kn, long hint)
{
struct socket *so = (struct socket *)kn->kn_hook;
- int rv;
+ int s, rv;
+ if (!(hint & NOTE_SUBMIT))
+ s = solock(so);
kn->kn_data = so->so_rcv.sb_cc;
if (so->so_state & SS_CANTRCVMORE) {
kn->kn_flags |= EV_EOF;
@@ -555,6 +557,8 @@ filt_fiforead(struct knote *kn, long hin
kn->kn_flags &= ~EV_EOF;
rv = (kn->kn_data > 0);
}
+ if (!(hint & NOTE_SUBMIT))
+ sounlock(s);
return (rv);
}
@@ -573,8 +577,10 @@ int
filt_fifowrite(struct knote *kn, long hint)
{
struct socket *so = (struct socket *)kn->kn_hook;
- int rv;
+ int s, rv;
+ if (!(hint & NOTE_SUBMIT))
+ s = solock(so);
kn->kn_data = sbspace(so, &so->so_snd);
if (so->so_state & SS_CANTSENDMORE) {
kn->kn_flags |= EV_EOF;
@@ -583,6 +589,8 @@ filt_fifowrite(struct knote *kn, long hi
kn->kn_flags &= ~EV_EOF;
rv = (kn->kn_data >= so->so_snd.sb_lowat);
}
+ if (!(hint & NOTE_SUBMIT))
+ sounlock(s);
return (rv);
}
Index: sys/event.h
===================================================================
RCS file: /cvs/src/sys/sys/event.h,v
retrieving revision 1.26
diff -u -p -r1.26 event.h
--- sys/event.h 26 Jun 2017 09:32:32 -0000 1.26
+++ sys/event.h 11 Oct 2017 10:40:11 -0000
@@ -80,13 +80,6 @@ struct kevent {
#define EV_ERROR 0x4000 /* error, data contains errno */
/*
- * hint flag for in-kernel use - must not equal any existing note
- */
-#ifdef _KERNEL
-#define NOTE_SUBMIT 0x01000000 /* initial knote submission */
-#endif
-
-/*
* data/hint flags for EVFILT_{READ|WRITE}, shared with userspace
*/
#define NOTE_LOWAT 0x0001 /* low water mark */
@@ -128,6 +121,13 @@ SLIST_HEAD(klist, knote);
#ifdef _KERNEL
+#define EVFILT_MARKER 0xF /* placemarker for tailq */
+
+/*
+ * hint flag for in-kernel use - must not equal any existing note
+ */
+#define NOTE_SUBMIT 0x01000000 /* initial knote submission */
+
#define KNOTE(list_, hint) do { \
struct klist *list = (list_); \
if ((list) != NULL) \
@@ -164,10 +164,12 @@ struct knote {
} kn_ptr;
const struct filterops *kn_fop;
void *kn_hook;
-#define KN_ACTIVE 0x01 /* event has been triggered */
-#define KN_QUEUED 0x02 /* event is on queue */
-#define KN_DISABLED 0x04 /* event is disabled */
-#define KN_DETACHED 0x08 /* knote is detached */
+#define KN_ACTIVE 0x0001 /* event has been triggered */
+#define KN_QUEUED 0x0002 /* event is on queue */
+#define KN_DISABLED 0x0004 /* event is disabled */
+#define KN_DETACHED 0x0008 /* knote is detached */
+#define KN_PROCESSING 0x0010 /* event processing in prog */
+#define KN_WAITING 0x0020 /* waiting on processing */
#define kn_id kn_kevent.ident
#define kn_filter kn_kevent.filter
Index: sys/socketvar.h
===================================================================
RCS file: /cvs/src/sys/sys/socketvar.h,v
retrieving revision 1.76
diff -u -p -r1.76 socketvar.h
--- sys/socketvar.h 1 Sep 2017 15:05:31 -0000 1.76
+++ sys/socketvar.h 11 Oct 2017 10:40:11 -0000
@@ -186,10 +186,7 @@ static inline long
sbspace(struct socket *so, struct sockbuf *sb)
{
KASSERT(sb == &so->so_rcv || sb == &so->so_snd);
-#if 0
- /* XXXSMP kqueue_scan() calling filt_sowrite() cannot sleep. */
soassertlocked(so);
-#endif
return lmin(sb->sb_hiwat - sb->sb_cc, sb->sb_mbmax - sb->sb_mbcnt);
}