The diff below is the latest version of my kqueue diff that allows grabbing
solock(), and possibly sleeping, inside kqueue_scan().

Sleeping in kqueue_scan() is not a problem in itself, since threads already
do it when no events are available; in that case `kq_count' is 0.
When this happens, a thread going to sleep first removes its own marker
from the given kqueue's TAILQ.

The diff below allows threads to sleep *inside* the event collection
loop.  For that, markers are now flagged with EVFILT_MARKER and threads
are taught how to skip other threads' markers.

Now, if a thread tA is sleeping inside `f_event', it "owns" the
corresponding knote.  That means the knote is no longer present in the
kqueue's TAILQ and kq_count no longer accounts for it.

However, a sibling thread tB could decide it's a good time to close the
file descriptor associated with this knote.  To make sure tA has finished
processing the knote before tB frees it, we use the KN_PROCESSING flag.
That means fdrelease() will sleep until it can acquire the knote (the
waiter sets KN_WAITING and is woken up by knote_release()).

This logic is taken from DragonFlyBSD, where it is used as part of a
finer-grained locking scheme.  We still rely on the KERNEL_LOCK() for
serializing accesses to the data structures.

The sys_close() vs sys_kevent() race described above has been triggered
by abieber@ with www/gitea and homestead with a previous version of this
diff.  Base alone doesn't seem to exercise it.

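The second part of the diff below enables solock() inside the socket and
FIFO filters (skipped when the hint contains NOTE_SUBMIT) and re-enables
the soassertlocked() check in sbspace().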

Comments and tests welcome.

Index: kern/kern_event.c
===================================================================
RCS file: /cvs/src/sys/kern/kern_event.c,v
retrieving revision 1.81
diff -u -p -r1.81 kern_event.c
--- kern/kern_event.c   11 Oct 2017 08:06:56 -0000      1.81
+++ kern/kern_event.c   11 Oct 2017 10:40:10 -0000
@@ -84,6 +84,8 @@ void  knote_attach(struct knote *kn, stru
 void   knote_drop(struct knote *kn, struct proc *p, struct filedesc *fdp);
 void   knote_enqueue(struct knote *kn);
 void   knote_dequeue(struct knote *kn);
+int    knote_acquire(struct knote *kn);
+int    knote_release(struct knote *kn);
 #define knote_alloc() ((struct knote *)pool_get(&knote_pool, PR_WAITOK))
 #define knote_free(kn) pool_put(&knote_pool, (kn))
 
@@ -759,27 +761,43 @@ start:
                goto done;
        }
 
+       marker.kn_filter = EVFILT_MARKER;
+       marker.kn_status = KN_PROCESSING;
        TAILQ_INSERT_TAIL(&kq->kq_head, &marker, kn_tqe);
        while (count) {
                kn = TAILQ_FIRST(&kq->kq_head);
                if (kn == &marker) {
-                       TAILQ_REMOVE(&kq->kq_head, kn, kn_tqe);
+                       TAILQ_REMOVE(&kq->kq_head, &marker, kn_tqe);
                        splx(s);
                        if (count == maxevents)
                                goto retry;
                        goto done;
                }
+               if (kn->kn_filter == EVFILT_MARKER) {
+                       struct knote *other_marker = kn;
+
+                       /* Move some other threads marker past this kn */
+                       kn = TAILQ_NEXT(other_marker, kn_tqe);
+                       TAILQ_REMOVE(&kq->kq_head, kn, kn_tqe);
+                       TAILQ_INSERT_BEFORE(other_marker, kn, kn_tqe);
+                       continue;
+               }
+
+               if (!knote_acquire(kn))
+                       continue;
 
                TAILQ_REMOVE(&kq->kq_head, kn, kn_tqe);
                kq->kq_count--;
 
                if (kn->kn_status & KN_DISABLED) {
                        kn->kn_status &= ~KN_QUEUED;
+                       knote_release(kn);
                        continue;
                }
                if ((kn->kn_flags & EV_ONESHOT) == 0 &&
                    kn->kn_fop->f_event(kn, 0) == 0) {
                        kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE);
+                       knote_release(kn);
                        continue;
                }
                *kevp = kn->kn_kevent;
@@ -799,9 +817,11 @@ start:
                        if (kn->kn_flags & EV_DISPATCH)
                                kn->kn_status |= KN_DISABLED;
                        kn->kn_status &= ~(KN_QUEUED | KN_ACTIVE);
+                       knote_release(kn);
                } else {
                        TAILQ_INSERT_TAIL(&kq->kq_head, kn, kn_tqe);
                        kq->kq_count++;
+                       knote_release(kn);
                }
                count--;
                if (nkev == KQ_NEVENTS) {
@@ -956,6 +976,41 @@ kqueue_wakeup(struct kqueue *kq)
 }
 
 /*
+ * Acquire a knote by setting KN_PROCESSING; return 1 on success, 0 if busy.
+ *
+ * If another thread holds KN_PROCESSING we set KN_WAITING, sleep for at
+ * most hz ticks and return 0.  The knote may be stale on return in this
+ * case and the caller must restart whatever loop it is in.
+ */
+int
+knote_acquire(struct knote *kn)
+{
+       if (kn->kn_status & KN_PROCESSING) {
+               kn->kn_status |= KN_WAITING;
+               tsleep(kn, 0, "kqepts", hz);
+               /* knote may be stale now: do not touch kn, restart instead */
+               return (0);
+       }
+       kn->kn_status |= KN_PROCESSING;
+       return (1);
+}
+
+/*
+ * Release an acquired knote: clear KN_PROCESSING and wake any waiters.
+ */
+int
+knote_release(struct knote *kn)
+{
+       if (kn->kn_status & KN_WAITING) {
+               kn->kn_status &= ~KN_WAITING;
+               wakeup(kn);
+       }
+       kn->kn_status &= ~KN_PROCESSING;
+       /* kn should not be accessed anymore */
+       return (0);
+}
+
+/*
  * activate one knote.
  */
 void
@@ -986,6 +1041,8 @@ knote_remove(struct proc *p, struct klis
        struct knote *kn;
 
        while ((kn = SLIST_FIRST(list)) != NULL) {
+               if (!knote_acquire(kn))
+                       continue;
                kn->kn_fop->f_detach(kn);
                knote_drop(kn, p, p->p_fd);
        }
Index: kern/uipc_socket.c
===================================================================
RCS file: /cvs/src/sys/kern/uipc_socket.c,v
retrieving revision 1.205
diff -u -p -r1.205 uipc_socket.c
--- kern/uipc_socket.c  15 Sep 2017 19:29:28 -0000      1.205
+++ kern/uipc_socket.c  11 Oct 2017 10:40:11 -0000
@@ -1921,8 +1921,10 @@ int
 filt_soread(struct knote *kn, long hint)
 {
        struct socket *so = kn->kn_fp->f_data;
-       int rv;
+       int s, rv;
 
+       if (!(hint & NOTE_SUBMIT))
+               s = solock(so);
        kn->kn_data = so->so_rcv.sb_cc;
 #ifdef SOCKET_SPLICE
        if (isspliced(so)) {
@@ -1940,6 +1942,8 @@ filt_soread(struct knote *kn, long hint)
        } else {
                rv = (kn->kn_data >= so->so_rcv.sb_lowat);
        }
+       if (!(hint & NOTE_SUBMIT))
+               sounlock(s);
 
        return rv;
 }
@@ -1960,8 +1964,10 @@ int
 filt_sowrite(struct knote *kn, long hint)
 {
        struct socket *so = kn->kn_fp->f_data;
-       int rv;
+       int s, rv;
 
+       if (!(hint & NOTE_SUBMIT))
+               s = solock(so);
        kn->kn_data = sbspace(so, &so->so_snd);
        if (so->so_state & SS_CANTSENDMORE) {
                kn->kn_flags |= EV_EOF;
@@ -1977,6 +1983,8 @@ filt_sowrite(struct knote *kn, long hint
        } else {
                rv = (kn->kn_data >= so->so_snd.sb_lowat);
        }
+       if (!(hint & NOTE_SUBMIT))
+               sounlock(s);
 
        return (rv);
 }
@@ -1985,8 +1993,13 @@ int
 filt_solisten(struct knote *kn, long hint)
 {
        struct socket *so = kn->kn_fp->f_data;
+       int s;
 
+       if (!(hint & NOTE_SUBMIT))
+               s = solock(so);
        kn->kn_data = so->so_qlen;
+       if (!(hint & NOTE_SUBMIT))
+               sounlock(s);
 
        return (kn->kn_data != 0);
 }
Index: miscfs/fifofs/fifo_vnops.c
===================================================================
RCS file: /cvs/src/sys/miscfs/fifofs/fifo_vnops.c,v
retrieving revision 1.58
diff -u -p -r1.58 fifo_vnops.c
--- miscfs/fifofs/fifo_vnops.c  24 Jul 2017 15:07:39 -0000      1.58
+++ miscfs/fifofs/fifo_vnops.c  11 Oct 2017 10:40:11 -0000
@@ -545,8 +545,10 @@ int
 filt_fiforead(struct knote *kn, long hint)
 {
        struct socket *so = (struct socket *)kn->kn_hook;
-       int rv;
+       int s, rv;
 
+       if (!(hint & NOTE_SUBMIT))
+               s = solock(so);
        kn->kn_data = so->so_rcv.sb_cc;
        if (so->so_state & SS_CANTRCVMORE) {
                kn->kn_flags |= EV_EOF;
@@ -555,6 +557,8 @@ filt_fiforead(struct knote *kn, long hin
                kn->kn_flags &= ~EV_EOF;
                rv = (kn->kn_data > 0);
        }
+       if (!(hint & NOTE_SUBMIT))
+               sounlock(s);
 
        return (rv);
 }
@@ -573,8 +577,10 @@ int
 filt_fifowrite(struct knote *kn, long hint)
 {
        struct socket *so = (struct socket *)kn->kn_hook;
-       int rv;
+       int s, rv;
 
+       if (!(hint & NOTE_SUBMIT))
+               s = solock(so);
        kn->kn_data = sbspace(so, &so->so_snd);
        if (so->so_state & SS_CANTSENDMORE) {
                kn->kn_flags |= EV_EOF;
@@ -583,6 +589,8 @@ filt_fifowrite(struct knote *kn, long hi
                kn->kn_flags &= ~EV_EOF;
                rv = (kn->kn_data >= so->so_snd.sb_lowat);
        }
+       if (!(hint & NOTE_SUBMIT))
+               sounlock(s);
 
        return (rv);
 }
Index: sys/event.h
===================================================================
RCS file: /cvs/src/sys/sys/event.h,v
retrieving revision 1.26
diff -u -p -r1.26 event.h
--- sys/event.h 26 Jun 2017 09:32:32 -0000      1.26
+++ sys/event.h 11 Oct 2017 10:40:11 -0000
@@ -80,13 +80,6 @@ struct kevent {
 #define EV_ERROR       0x4000          /* error, data contains errno */
 
 /*
- * hint flag for in-kernel use - must not equal any existing note
- */
-#ifdef _KERNEL
-#define NOTE_SUBMIT    0x01000000              /* initial knote submission */
-#endif
-
-/*
  * data/hint flags for EVFILT_{READ|WRITE}, shared with userspace
  */
 #define NOTE_LOWAT     0x0001                  /* low water mark */
@@ -128,6 +121,13 @@ SLIST_HEAD(klist, knote);
 
 #ifdef _KERNEL
 
+#define EVFILT_MARKER  0xF                     /* placemarker for tailq */
+
+/*
+ * hint flag for in-kernel use - must not equal any existing note
+ */
+#define NOTE_SUBMIT    0x01000000              /* initial knote submission */
+
 #define KNOTE(list_, hint)     do { \
                                        struct klist *list = (list_); \
                                        if ((list) != NULL) \
@@ -164,10 +164,12 @@ struct knote {
        } kn_ptr;
        const struct            filterops *kn_fop;
        void                    *kn_hook;
-#define KN_ACTIVE      0x01                    /* event has been triggered */
-#define KN_QUEUED      0x02                    /* event is on queue */
-#define KN_DISABLED    0x04                    /* event is disabled */
-#define KN_DETACHED    0x08                    /* knote is detached */
+#define KN_ACTIVE      0x0001                  /* event has been triggered */
+#define KN_QUEUED      0x0002                  /* event is on queue */
+#define KN_DISABLED    0x0004                  /* event is disabled */
+#define KN_DETACHED    0x0008                  /* knote is detached */
+#define KN_PROCESSING  0x0010                  /* event processing in prog */
+#define KN_WAITING     0x0020                  /* waiting on processing */
 
 #define kn_id          kn_kevent.ident
 #define kn_filter      kn_kevent.filter
Index: sys/socketvar.h
===================================================================
RCS file: /cvs/src/sys/sys/socketvar.h,v
retrieving revision 1.76
diff -u -p -r1.76 socketvar.h
--- sys/socketvar.h     1 Sep 2017 15:05:31 -0000       1.76
+++ sys/socketvar.h     11 Oct 2017 10:40:11 -0000
@@ -186,10 +186,7 @@ static inline long
 sbspace(struct socket *so, struct sockbuf *sb)
 {
        KASSERT(sb == &so->so_rcv || sb == &so->so_snd);
-#if 0
-       /* XXXSMP kqueue_scan() calling filt_sowrite() cannot sleep. */
        soassertlocked(so);
-#endif
        return lmin(sb->sb_hiwat - sb->sb_cc, sb->sb_mbmax - sb->sb_mbcnt);
 }
 

Reply via email to