Hi,

Here is a new semaphore implementation that uses atomic operations,
where available, and futexes for blocking and wakeups.

We need this to make semaphores async-signal-safe: POSIX requires
sem_post(3) to be callable from a signal handler.


All posixsuite tests (semaphore and sigaction) pass with this; they do
not with our current implementation.  Unfortunately, I cannot get our
sem_timedwait regression test to pass:

  regress/lib/libpthread/semaphore/sem_timedwait

My investigation so far suggests that the current implementation is
flawed: it does not respect ERESTART, and it treats EINTR as if it were
equivalent to EAGAIN.  Both the POSIX standard and other implementations
disagree with that: ERESTART should restart the semaphore wait, and EINTR
should make the call return.  The regression test above relies on our
current misuse of EINTR, and I think that is why it fails.  I added a few
debugging printfs to that test in this diff.

I hope discussions at the upcoming Nantes hackathon will clarify this
issue.


Apart from that, I have been running with this implementation for a
couple of weeks.  LaTeX, octave, chrome, firefox, thunderbird, vim, mutt,
vlc, mplayer, etc. all run just fine.

I would like wider testing to find any defects left in the current
version.


I have also pushed all the changes to a fork on GitHub:

  https://github.com/bulibuta/openbsd-src/tree/sem_atomicfutex


Please test and get back to me if you see any issues.

Thank you,
Paul

diff --git lib/librthread/Makefile lib/librthread/Makefile
index 4c3e127491d..5dfb140290e 100644
--- lib/librthread/Makefile
+++ lib/librthread/Makefile
@@ -30,12 +30,19 @@ SRCS=       rthread.c \
        rthread_rwlock.c \
        rthread_rwlockattr.c \
        rthread_sched.c \
-       rthread_sem.c \
        rthread_sig.c \
        rthread_stack.c \
        rthread_spin_lock.c \
        sched_prio.c
 
+# Architectures that implement atomics
+.if ${MACHINE_ARCH} == "amd64" || ${MACHINE_ARCH} == "i386" || \
+    ${MACHINE_ARCH} == "mips64" || ${MACHINE_ARCH} == "mips64el"
+SRCS+= rthread_sem_atomic.c
+.else
+SRCS+= rthread_sem.c
+.endif
+
 SRCDIR= ${.CURDIR}/../libpthread
 .include "${SRCDIR}/man/Makefile.inc"
 .include <bsd.lib.mk>
diff --git lib/librthread/rthread_rwlock.c lib/librthread/rthread_rwlock.c
index a75e88c52e4..6fccd2fe1bd 100644
--- lib/librthread/rthread_rwlock.c
+++ lib/librthread/rthread_rwlock.c
@@ -143,8 +143,8 @@ int
 pthread_rwlock_timedrdlock(pthread_rwlock_t *lockp,
     const struct timespec *abstime)
 {
-       if (abstime == NULL || abstime->tv_sec < 0 || abstime->tv_nsec < 0 ||
-           abstime->tv_nsec > 1000000000)
+       if (abstime == NULL || abstime->tv_nsec < 0 ||
+           abstime->tv_nsec >= 1000000000)
                return (EINVAL);
        return (_rthread_rwlock_rdlock(lockp, abstime, 0));
 }
@@ -210,8 +210,8 @@ int
 pthread_rwlock_timedwrlock(pthread_rwlock_t *lockp,
     const struct timespec *abstime)
 {
-       if (abstime == NULL || abstime->tv_sec < 0 || abstime->tv_nsec < 0 ||
-           abstime->tv_nsec > 1000000000)
+       if (abstime == NULL || abstime->tv_nsec < 0 ||
+           abstime->tv_nsec >= 1000000000)
                return (EINVAL);
        return (_rthread_rwlock_wrlock(lockp, abstime, 0));
 }
diff --git lib/librthread/rthread_sem_atomic.c lib/librthread/rthread_sem_atomic.c
new file mode 100644
index 00000000000..f1d8eea4f88
--- /dev/null
+++ lib/librthread/rthread_sem_atomic.c
@@ -0,0 +1,434 @@
+/*     $OpenBSD$ */
+/*
+ * Copyright (c) 2004,2005,2013 Ted Unangst <t...@openbsd.org>
+ * Copyright (c) 2018 Paul Irofti <piro...@openbsd.org>
+ * All Rights Reserved.
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include <sys/types.h>
+#include <sys/mman.h>
+#include <sys/stat.h>
+#include <sys/atomic.h>
+#include <sys/time.h>
+#include <sys/futex.h>
+
+#include <errno.h>
+#include <fcntl.h>
+#include <sha2.h>
+#include <stdarg.h>
+#include <stdlib.h>
+#include <stdio.h>
+#include <string.h>
+#include <unistd.h>
+
+#include <pthread.h>
+
+#include "rthread.h"
+#include "cancel.h"            /* in libc/include */
+#include "synch.h"
+
+/*
+ * Size of the buffer holding a named semaphore's backing-file path:
+ * "/tmp/" (5) + SHA-256 digest string + ".sem" (4).
+ * SHA256_DIGEST_STRING_LENGTH already includes the terminating NUL.
+ */
+#define SEM_PATH_SIZE (5 + SHA256_DIGEST_STRING_LENGTH + 4)
+
+/*
+ * Length, including the NUL, of the random names generated for
+ * process-shared unnamed semaphores (sem_init() "notyet" code);
+ * long enough to be hard to guess.
+ */
+#define SEM_RANDOM_NAME_LEN    10
+
+/*
+ * Size of the file and shared mapping backing a named semaphore
+ * (see sem_open()/sem_close()/sem_destroy()).
+ * Should be >= SEM_PATH_SIZE and page-aligned.
+ */
+#define SEM_MMAP_SIZE  _thread_pagesize
+
+/*
+ * Internal implementation of semaphores
+ */
+/*
+ * Atomically take one unit from the semaphore if its value is non-zero.
+ * Returns 1 on success and 0 if the value was 0.  The value is read as
+ * unsigned (the type atomic_cas_uint() works on) so that counts above
+ * INT_MAX are not mistaken for an empty semaphore.
+ */
+static inline int
+_sem_trydec(sem_t sem)
+{
+       unsigned int v;
+
+       while ((v = sem->value) > 0) {
+               /* Retry if another thread changed the value under us. */
+               if (atomic_cas_uint(&sem->value, v, v - 1) == v)
+                       return (1);
+       }
+       return (0);
+}
+
+/*
+ * Take one unit from the semaphore.
+ *
+ * With tryonly set, never sleep: return 0 or EAGAIN.  Otherwise sleep on
+ * the futex at sem->value while the value is 0, until abstime (absolute,
+ * CLOCK_REALTIME; NULL means no timeout).  After a failed sleep one last
+ * decrement is attempted before the sleep's error is returned.
+ *
+ * delayed_cancel, when non-NULL, points at the caller's pending-cancel
+ * flag; while it is set an EAGAIN from the sleep is returned instead of
+ * restarting the wait.
+ *
+ * sem->waitcount counts the threads currently inside the sleep.
+ */
+int
+_sem_wait(sem_t sem, int tryonly, const struct timespec *abstime,
+    int *delayed_cancel)
+{
+       int r = 0;
+
+       if (tryonly)
+               return (_sem_trydec(sem) ? 0 : EAGAIN);
+
+       for (;;) {
+               if (_sem_trydec(sem))
+                       return (0);
+               /* The previous sleep failed and the value is still 0. */
+               if (r)
+                       break;
+
+               atomic_inc_int(&sem->waitcount);
+               r = _twait(&sem->value, 0, CLOCK_REALTIME, abstime);
+               /*
+                * EAGAIN presumably means the value was no longer 0 when we
+                * tried to sleep; with the sys_futex.c change in this diff
+                * an interrupted (ERESTART) sleep is reported the same way.
+                * Unless a cancellation is pending, restart the wait.
+                *
+                * Enable to get old semaphore functionality and pass existing
+                * regression tests. Breaks at least posixsuite and lang/mono.
+                *
+                * if ((r == EAGAIN || r == EINTR) &&
+                *     (delayed_cancel == NULL || *delayed_cancel == 0))
+                */
+               if (r == EAGAIN &&
+                   (delayed_cancel == NULL || *delayed_cancel == 0))
+                       r = 0;
+               atomic_dec_int(&sem->waitcount);
+       }
+
+       return (r);
+}
+
+/*
+ * Release one unit: atomically increment the value, then wake at most
+ * one thread sleeping on the futex at sem->value.  Always returns 0.
+ */
+int
+_sem_post(sem_t sem)
+{
+       /* Publish the new unit first so a woken waiter can take it. */
+       atomic_inc_int(&sem->value);
+       (void)_wake(&sem->value, 1);
+
+       return (0);
+}
+
+/*
+ * exported semaphores
+ */
+/*
+ * Initialize the unnamed semaphore *semp with the given value.
+ * Process-shared semaphores are not supported yet (EPERM).
+ * Returns 0, or -1 with errno set to EINVAL (value too large), EPERM
+ * (pshared requested) or ENOSPC (allocation failure).
+ */
+int
+sem_init(sem_t *semp, int pshared, unsigned int value)
+{
+       sem_t s;
+
+       if (value > SEM_VALUE_MAX) {
+               errno = EINVAL;
+               return (-1);
+       }
+
+       if (pshared) {
+               errno = EPERM;
+               return (-1);
+#ifdef notyet
+               char name[SEM_RANDOM_NAME_LEN];
+               sem_t *sempshared;
+               int i;
+
+               for (;;) {
+                       for (i = 0; i < SEM_RANDOM_NAME_LEN - 1; i++)
+                               name[i] = arc4random_uniform(255) + 1;
+                       name[SEM_RANDOM_NAME_LEN - 1] = '\0';
+                       sempshared = sem_open(name, O_CREAT | O_EXCL, 0, value);
+                       if (sempshared != SEM_FAILED)
+                               break;
+                       if (errno == EEXIST)
+                               continue;
+                       if (errno != EPERM)
+                               errno = ENOSPC;
+                       return (-1);
+               }
+
+               /* unnamed semaphore should not be opened twice */
+               if (sem_unlink(name) == -1) {
+                       sem_close(sempshared);
+                       errno = ENOSPC;
+                       return (-1);
+               }
+
+               *semp = *sempshared;
+               free(sempshared);
+               return (0);
+#endif
+       }
+
+       /* calloc() leaves waitcount and shared at 0. */
+       if ((s = calloc(1, sizeof(*s))) == NULL) {
+               errno = ENOSPC;
+               return (-1);
+       }
+       s->lock = _SPINLOCK_UNLOCKED;
+       s->value = value;
+       *semp = s;
+
+       return (0);
+}
+
+/*
+ * Destroy a semaphore.  Fails with EINVAL for a NULL or already destroyed
+ * semaphore, and with EBUSY (after a diagnostic on stderr) while threads
+ * are still waiting on it.  Shared (named) semaphores are unmapped,
+ * others are freed.
+ */
+int
+sem_destroy(sem_t *semp)
+{
+       static const char busymsg[] =
+           "sem_destroy on semaphore with waiters!\n";
+       sem_t s;
+
+       /* SEM_MMAP_SIZE is _thread_pagesize; make sure we are initialized. */
+       if (!_threads_ready)
+               _rthread_init();
+
+       if (semp == NULL || (s = *semp) == NULL) {
+               errno = EINVAL;
+               return (-1);
+       }
+
+       if (s->waitcount != 0) {
+               write(2, busymsg, sizeof(busymsg) - 1);
+               errno = EBUSY;
+               return (-1);
+       }
+
+       *semp = NULL;
+       if (s->shared)
+               munmap(s, SEM_MMAP_SIZE);
+       else
+               free(s);
+       return (0);
+}
+
+/*
+ * Store the current value of the semaphore in *sval.
+ * Returns 0, or -1 with errno EINVAL for a NULL or destroyed semaphore.
+ */
+int
+sem_getvalue(sem_t *semp, int *sval)
+{
+       sem_t s;
+
+       if (semp == NULL || (s = *semp) == NULL) {
+               errno = EINVAL;
+               return (-1);
+       }
+
+       /* An atomic add of 0 returns a snapshot of the current value. */
+       *sval = atomic_add_int_nv(&s->value, 0);
+       return (0);
+}
+
+/*
+ * Exported wrapper around _sem_post().
+ * Returns 0, or -1 with errno EINVAL for a NULL or destroyed semaphore.
+ */
+int
+sem_post(sem_t *semp)
+{
+       sem_t s;
+
+       if (semp != NULL && (s = *semp) != NULL) {
+               _sem_post(s);
+               return (0);
+       }
+
+       errno = EINVAL;
+       return (-1);
+}
+
+/*
+ * Take one unit from the semaphore, sleeping while its value is 0.
+ * This is a cancellation point: the wait runs between
+ * ENTER_DELAYED_CANCEL_POINT and LEAVE_CANCEL_POINT_INNER, and
+ * self->delayed_cancel is handed to _sem_wait() so it can stop
+ * restarting the wait when a cancel is pending.
+ * Returns 0, or -1 with errno set (EINVAL for a NULL or destroyed
+ * semaphore, otherwise the error returned by _sem_wait()).
+ */
+int
+sem_wait(sem_t *semp)
+{
+       struct tib *tib = TIB_GET();
+       pthread_t self;
+       sem_t sem;
+       int r;
+       PREP_CANCEL_POINT(tib);
+
+       /* Initialize the library before using tib->tib_thread. */
+       if (!_threads_ready)
+               _rthread_init();
+       self = tib->tib_thread;
+
+       if (!semp || !(sem = *semp)) {
+               errno = EINVAL;
+               return (-1);
+       }
+
+       ENTER_DELAYED_CANCEL_POINT(tib, self);
+       r = _sem_wait(sem, 0, NULL, &self->delayed_cancel);
+       LEAVE_CANCEL_POINT_INNER(tib, r);
+
+       if (r) {
+               errno = r;
+               return (-1);
+       }
+
+       return (0);
+}
+
+/*
+ * Like sem_wait(), but give up once the absolute deadline abstime
+ * (CLOCK_REALTIME, as used by _sem_wait()) has passed.  abstime is
+ * validated up front: EINVAL if it is NULL or tv_nsec is outside
+ * [0, 1000000000).  This is a cancellation point.
+ * Returns 0, or -1 with errno set; an EWOULDBLOCK from the wait is
+ * reported as ETIMEDOUT.
+ */
+int
+sem_timedwait(sem_t *semp, const struct timespec *abstime)
+{
+       struct tib *tib = TIB_GET();
+       pthread_t self;
+       sem_t sem;
+       int r;
+       PREP_CANCEL_POINT(tib);
+
+       if (!semp || !(sem = *semp) || abstime == NULL ||
+          abstime->tv_nsec < 0 || abstime->tv_nsec >= 1000000000) {
+               errno = EINVAL;
+               return (-1);
+       }
+
+       if (!_threads_ready)
+               _rthread_init();
+       self = tib->tib_thread;
+
+       ENTER_DELAYED_CANCEL_POINT(tib, self);
+       r = _sem_wait(sem, 0, abstime, &self->delayed_cancel);
+       LEAVE_CANCEL_POINT_INNER(tib, r);
+
+       if (r) {
+               /* sem_timedwait() reports an expired deadline as ETIMEDOUT. */
+               errno = r == EWOULDBLOCK ? ETIMEDOUT : r;
+               return (-1);
+       }
+
+       return (0);
+}
+
+/*
+ * Take one unit from the semaphore without sleeping.
+ * Returns 0, or -1 with errno set to EINVAL (NULL or destroyed
+ * semaphore) or to the error from _sem_wait() (EAGAIN when the
+ * value is 0).
+ */
+int
+sem_trywait(sem_t *semp)
+{
+       sem_t s;
+       int error;
+
+       if (semp == NULL || (s = *semp) == NULL) {
+               errno = EINVAL;
+               return (-1);
+       }
+
+       error = _sem_wait(s, 1, NULL, NULL);
+       if (error == 0)
+               return (0);
+
+       errno = error;
+       return (-1);
+}
+
+
+/*
+ * Build the backing-file path of a named semaphore into sempath (len
+ * bytes): "/tmp/" + the SHA-256 digest string of origpath + ".sem".
+ */
+static void
+makesempath(const char *origpath, char *sempath, size_t len)
+{
+       char digest[SHA256_DIGEST_STRING_LENGTH];
+       size_t namelen = strlen(origpath);
+
+       SHA256Data(origpath, namelen, digest);
+       snprintf(sempath, len, "/tmp/%s.sem", digest);
+}
+
+/*
+ * Open (and with O_CREAT possibly create) the named semaphore "name".
+ * The semaphore lives in a SEM_MMAP_SIZE file under /tmp, owned by the
+ * effective user, which is mapped MAP_SHARED.  Only O_CREAT and O_EXCL
+ * are accepted in oflag; with O_CREAT the variadic arguments are the
+ * (unused) mode and the initial value.
+ * Returns a handle for sem_close(), or SEM_FAILED with errno set.
+ */
+sem_t *
+sem_open(const char *name, int oflag, ...)
+{
+       char sempath[SEM_PATH_SIZE];
+       struct stat sb;
+       sem_t sem, *semp;
+       unsigned int value = 0;
+       int created = 0, fd, error;
+
+       if (!_threads_ready)
+               _rthread_init();
+
+       if (oflag & ~(O_CREAT | O_EXCL)) {
+               errno = EINVAL;
+               return (SEM_FAILED);
+       }
+
+       if (oflag & O_CREAT) {
+               va_list ap;
+               va_start(ap, oflag);
+               /* 3rd parameter mode is not used */
+               va_arg(ap, mode_t);
+               value = va_arg(ap, unsigned);
+               va_end(ap);
+
+               if (value > SEM_VALUE_MAX) {
+                       errno = EINVAL;
+                       return (SEM_FAILED);
+               }
+       }
+
+       makesempath(name, sempath, sizeof(sempath));
+       fd = open(sempath, O_RDWR | O_NOFOLLOW | oflag, 0600);
+       if (fd == -1)
+               return (SEM_FAILED);
+       if (fstat(fd, &sb) == -1 || !S_ISREG(sb.st_mode)) {
+               error = EINVAL;
+               goto fail_close;
+       }
+       if (sb.st_uid != geteuid()) {
+               error = EPERM;
+               goto fail_close;
+       }
+       if (sb.st_size != (off_t)SEM_MMAP_SIZE) {
+               /* Only a new, empty file may be sized, and only with O_CREAT. */
+               if (!(oflag & O_CREAT) || sb.st_size != 0 ||
+                   ftruncate(fd, SEM_MMAP_SIZE) == -1) {
+                       error = EINVAL;
+                       goto fail_close;
+               }
+               created = 1;
+       }
+       sem = mmap(NULL, SEM_MMAP_SIZE, PROT_READ | PROT_WRITE,
+           MAP_SHARED, fd, 0);
+       close(fd);
+       if (sem == MAP_FAILED) {
+               errno = EINVAL;
+               return (SEM_FAILED);
+       }
+       /*
+        * Initialize a freshly created semaphore before anything else can
+        * fail, so the file never holds a zero-filled (shared == 0)
+        * semaphore that later sem_open()/sem_close() calls would misuse.
+        */
+       if (created) {
+               sem->lock = _SPINLOCK_UNLOCKED;
+               sem->value = value;
+               sem->shared = 1;
+       }
+       semp = malloc(sizeof(*semp));
+       if (!semp) {
+               munmap(sem, SEM_MMAP_SIZE);
+               errno = ENOSPC;
+               return (SEM_FAILED);
+       }
+       *semp = sem;
+
+       return (semp);
+
+fail_close:
+       close(fd);
+       errno = error;
+       return (SEM_FAILED);
+}
+
+/*
+ * Close a named semaphore returned by sem_open(): unmap the shared
+ * semaphore and free the handle.  Non-shared semaphores, NULL and
+ * already closed handles are rejected with EINVAL.
+ */
+int
+sem_close(sem_t *semp)
+{
+       sem_t s;
+
+       if (semp == NULL || (s = *semp) == NULL || s->shared == 0) {
+               errno = EINVAL;
+               return (-1);
+       }
+
+       *semp = NULL;
+       munmap(s, SEM_MMAP_SIZE);
+       free(semp);
+       return (0);
+}
+
+/*
+ * Remove the backing file of the named semaphore "name".
+ * Returns the result of unlink(2): 0, or -1 with errno set.
+ */
+int
+sem_unlink(const char *name)
+{
+       char path[SEM_PATH_SIZE];
+       int rv;
+
+       makesempath(name, path, sizeof(path));
+       rv = unlink(path);
+       return (rv);
+}
diff --git lib/librthread/synch.h lib/librthread/synch.h
new file mode 100644
index 00000000000..8ab379530e8
--- /dev/null
+++ lib/librthread/synch.h
@@ -0,0 +1,61 @@
+/*     $OpenBSD: synch.h,v 1.2 2017/09/05 02:40:54 guenther Exp $ */
+/*
+ * Copyright (c) 2017 Martin Pieuchot
+ *
+ * Permission to use, copy, modify, and distribute this software for any
+ * purpose with or without fee is hereby granted, provided that the above
+ * copyright notice and this permission notice appear in all copies.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+ * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+ * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+ * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+ * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+ * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+ */
+
+#include <sys/atomic.h>
+#include <sys/time.h>
+#include <sys/futex.h>
+
+/* Wake up to n threads sleeping on the futex at p; returns futex()'s result. */
+static inline int
+_wake(volatile uint32_t *p, int n)
+{
+       int rv;
+
+       rv = futex(p, FUTEX_WAKE, n, NULL, NULL);
+       return (rv);
+}
+
+/*
+ * Loop until *p equals val, issuing futex(FUTEX_WAIT, val) on each pass
+ * where it differs.
+ * NOTE(review): FUTEX_WAIT normally sleeps only while *p == val, so this
+ * loop may spin while *p != val; confirm the intended semantics.
+ */
+static inline void
+_wait(volatile uint32_t *p, int val)
+{
+       const uint32_t want = (uint32_t)val;
+
+       for (;;) {
+               if (*p == want)
+                       break;
+               futex(p, FUTEX_WAIT, val, NULL, NULL);
+       }
+}
+
+/*
+ * Sleep on the futex at p while *p == val, until woken or until the
+ * absolute deadline abs (measured on clockid) passes; abs == NULL waits
+ * without a timeout.
+ *
+ * Returns the futex() result, EINVAL if abs is malformed (tv_nsec outside
+ * [0, 1000000000)) or clock_gettime() fails, or ETIMEDOUT if the deadline
+ * has already passed.
+ */
+static inline int
+_twait(volatile uint32_t *p, int val, clockid_t clockid,
+    const struct timespec *abs)
+{
+       struct timespec rel;
+
+       if (abs == NULL)
+               return futex(p, FUTEX_WAIT, val, NULL, NULL);
+
+       /* A negative tv_nsec would yield a bogus relative timeout below. */
+       if (abs->tv_nsec < 0 || abs->tv_nsec >= 1000000000 ||
+           clock_gettime(clockid, &rel))
+               return (EINVAL);
+
+       /* Turn the absolute deadline into a timeout relative to now. */
+       rel.tv_sec = abs->tv_sec - rel.tv_sec;
+       if ((rel.tv_nsec = abs->tv_nsec - rel.tv_nsec) < 0) {
+               rel.tv_sec--;
+               rel.tv_nsec += 1000000000;
+       }
+       if (rel.tv_sec < 0)
+               return (ETIMEDOUT);
+
+       return futex(p, FUTEX_WAIT, val, &rel, NULL);
+}
+
+/*
+ * FUTEX_REQUEUE on p with wake count n, requeue count m and target futex
+ * q; presumably wakes up to n waiters and moves up to m others to q.
+ * Returns futex()'s result.
+ */
+static inline int
+_requeue(volatile uint32_t *p, int n, int m, volatile uint32_t *q)
+{
+       /* The requeue count travels in futex()'s timeout argument. */
+       void *mcount = (void *)(long)m;
+
+       return futex(p, FUTEX_REQUEUE, n, mcount, q);
+}
diff --git regress/lib/libpthread/semaphore/sem_timedwait/sem_timedwait.c regress/lib/libpthread/semaphore/sem_timedwait/sem_timedwait.c
index 13a1d34a181..ffeaf026101 100644
--- regress/lib/libpthread/semaphore/sem_timedwait/sem_timedwait.c
+++ regress/lib/libpthread/semaphore/sem_timedwait/sem_timedwait.c
@@ -31,7 +31,10 @@ main(int argc, char **argv)
        pthread_t th;
        struct sigaction sa;
        struct timespec ts, ts2;
+       int r;
 
+       sem_getvalue(&sem, &r);
+       printf("v = %d\n", r);
        CHECKr(clock_gettime(CLOCK_REALTIME, &ts));
        ts.tv_sec += 3;
        CHECKn(sem_timedwait(&sem, &ts));
@@ -47,6 +50,8 @@ main(int argc, char **argv)
        CHECKn(sem_destroy(&sem));
        ASSERT(errno == EBUSY);
 
+       sem_getvalue(&sem, &r);
+       printf("%s: v = %d\n", __func__, r);
        posted = 1;
        CHECKr(sem_post(&sem));
        CHECKr(pthread_join(th, NULL));
@@ -68,12 +73,19 @@ main(int argc, char **argv)
        fprintf(stderr, "posting\n");
        posted = 1;
        eintr_ok = 0;
+       sem_getvalue(&sem, &r);
+       printf("%s: v = %d\n", __func__, r);
        CHECKr(sem_post(&sem));
        CHECKr(pthread_join(th, NULL));
 
+       sem_getvalue(&sem, &r);
+       printf("%s: v = %d\n", __func__, r);
        CHECKr(clock_gettime(CLOCK_REALTIME, &ts));
        ts.tv_sec += 2;
-       CHECKn(sem_timedwait(&sem, &ts));
+       errno = 0;
+       r = sem_timedwait(&sem, &ts);
+       printf("%s: r = %d errno=%d\n", __func__, r, errno);
+       //CHECKn(sem_timedwait(&sem, &ts));
        ASSERT(errno == ETIMEDOUT);
        CHECKr(clock_gettime(CLOCK_REALTIME, &ts2));
        if (timespeccmp(&ts, &ts2, < ))
@@ -96,10 +108,16 @@ waiter(void *arg)
        struct timespec ts;
        int value;
        int r;
+       time_t tic, toc;
 
+       sem_getvalue(&sem, &r);
+       printf("%s: v = %d\n", __func__,  r);
        CHECKr(clock_gettime(CLOCK_REALTIME, &ts));
        ts.tv_sec += 3;
+       time(&tic);
        r = sem_timedwait(semp, &ts);
+       time(&toc);
+       printf("%s: t = %llds passed\n", __func__,  toc - tic);
        CHECKr(sem_getvalue(semp, &value));
        if (r == 0) {
                ASSERT(value == 0);
@@ -114,5 +132,6 @@ waiter(void *arg)
                        ASSERT(value == 0);
        }
 
+       printf("%s: ret v = %d\n", __func__,  value);
        return (NULL);
 }
diff --git sys/kern/kern_synch.c sys/kern/kern_synch.c
index ea28288d375..c65dbbd2817 100644
--- sys/kern/kern_synch.c
+++ sys/kern/kern_synch.c
@@ -614,13 +614,17 @@ sys___thrsleep(struct proc *p, void *v, register_t *retval)
        if (SCARG(uap, tp) != NULL) {
                if ((error = copyin(SCARG(uap, tp), &ts, sizeof(ts)))) {
                        *retval = error;
-                       return (0);
+                       return 0;
+               }
+               if (ts.tv_nsec < 0 || ts.tv_nsec >= 1000000000) {
+                       *retval = EINVAL;
+                       return 0;
                }
                SCARG(uap, tp) = &ts;
        }
 
        *retval = thrsleep(p, uap);
-       return (0);
+       return 0;
 }
 
 int
diff --git sys/kern/sys_futex.c sys/kern/sys_futex.c
index a31dfdd41a5..fdbebe0e3d8 100644
--- sys/kern/sys_futex.c
+++ sys/kern/sys_futex.c
@@ -226,7 +226,7 @@ futex_wait(uint32_t *uaddr, uint32_t val, const struct timespec *timeout)
 
        error = rwsleep(p, &ftlock, PUSER|PCATCH, "fsleep", (int)to_ticks);
        if (error == ERESTART)
-               error = EINTR;
+               error = EAGAIN;
        else if (error == EWOULDBLOCK) {
                /* A race occured between a wakeup and a timeout. */
                if (p->p_futex == NULL)

Reply via email to