On 30/04/18(Mon) 14:36, Paul Irofti wrote: > On Sat, Apr 28, 2018 at 07:40:38PM +0300, Paul Irofti wrote: > > On Sun, Apr 22, 2018 at 03:34:45PM +0300, Paul Irofti wrote: > > > Here is a new semaphore implementation that uses atomic operations, > > > where available, and futexes for locking.
I'm happy to see more rthread internals based on futex(2). > > > The reason we need this is to make semaphores safe for asynchronous > > > signals. Could you describe with words what is currently broken and how the version below fixes it? > > > [...] > > Here is the same diff adapted to what happened in -current this week. > > All required bits are now in, so the current diff neatly contains just the > > implementation. > > People started testing this, thank you! > > Here is an updated diff that: > > Add barriers, debug printfs and handle EAGAIN. You can't use printf() for this purpose. Please use _rthread_debug() like the rest of the library :) It also has the advantage of being enabled via the RTHREAD_DEBUG env variable. > The barriers bit is mostly from visa@, thanks! > > tb@ found offlineimap faulting because the futex syscall returned EAGAIN > and sem_wait exited. Loop again on EAGAIN. > > Debug printfs are for future debugging. > > Martin, is handling EAGAIN like this correct? I don't know, what is it supposed to do? > new file mode 100644 > index 00000000000..e5c8015d27c > --- /dev/null > +++ lib/librthread/rthread_sem_atomic.c I'm not fan of the _atomic suffix. What about rthread_semaphore.c? > @@ -0,0 +1,445 @@ > +/* $OpenBSD$ */ > +/* > + * Copyright (c) 2004,2005,2013 Ted Unangst <t...@openbsd.org> > + * Copyright (c) 2018 Paul Irofti <piro...@openbsd.org> > + * All Rights Reserved. > + * > + * Permission to use, copy, modify, and distribute this software for any > + * purpose with or without fee is hereby granted, provided that the above > + * copyright notice and this permission notice appear in all copies. > + * > + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES > + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF > + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR > + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES > + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN > + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF > + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. > + */ > + > +#include <sys/types.h> > +#include <sys/mman.h> > +#include <sys/stat.h> > +#include <sys/atomic.h> > +#include <sys/time.h> > +#include <sys/futex.h> > + > +#include <errno.h> > +#include <fcntl.h> > +#include <sha2.h> > +#include <stdarg.h> > +#include <stdlib.h> > +#include <stdio.h> > +#include <string.h> > +#include <unistd.h> > + > +#include <pthread.h> > + > +#include "rthread.h" > +#include "cancel.h" /* in libc/include */ > +#include "synch.h" > + > +#ifdef SEM_ATOMIC_DEBUG > +#define DPRINTF(x) printf x > +#else > +#define DPRINTF(x) > +#endif > + > +#define SHARED_IDENT ((void *)-1) > + > +/* SHA256_DIGEST_STRING_LENGTH includes nul */ > +/* "/tmp/" + sha256 + ".sem" */ > +#define SEM_PATH_SIZE (5 + SHA256_DIGEST_STRING_LENGTH + 4) > + > +/* long enough to be hard to guess */ > +#define SEM_RANDOM_NAME_LEN 10 > + > +/* > + * Size of memory to be mmap()'ed by named semaphores. > + * Should be >= SEM_PATH_SIZE and page-aligned. > + */ > +#define SEM_MMAP_SIZE _thread_pagesize > + > +/* > + * Internal implementation of semaphores > + */ > +int > +_sem_wait(sem_t sem, int can_eintr, const struct timespec *abstime, > + int *delayed_cancel) > +{ > + int r = 0; > + int v = sem->value, ov; Reading `sem->value' here is superfluous. > + > + for (;;) { > + while ((v = sem->value) > 0) { > + ov = atomic_cas_uint(&sem->value, v, v - 1); > + if (ov == v) { > + membar_enter_after_atomic(); > + return 0; > + } > + } > + if (r) > + break; > + > + atomic_inc_int(&sem->waitcount); Can you keep the destroy check without adding two atomic operations here? > + r = _twait(&sem->value, 0, CLOCK_REALTIME, abstime); > + /* ignore interruptions other than cancelation */ > + if ((r == ECANCELED && *delayed_cancel == 0) || > + (r == EINTR && !can_eintr) || r == EAGAIN) > + r = 0; > + atomic_dec_int(&sem->waitcount); > + } > + > + return r; > +} > + > +/* always increment count */ > +int > +_sem_post(sem_t sem) > +{ This function is called only once, so you can fold it in sem_post() :) > + membar_exit_before_atomic(); Why is the membar needed here? Does that mean we're missing two in pthread_cond_signal() and pthread_cond_broadcast()? > + atomic_inc_int(&sem->value); > + _wake(&sem->value, 1); > + return 0; > +} > + > +/* > + * exported semaphores > + */ > +int > +sem_init(sem_t *semp, int pshared, unsigned int value) > +{ > + sem_t sem; > + > + if (value > SEM_VALUE_MAX) { > + errno = EINVAL; > + return (-1); > + } > + > + if (pshared) { > + errno = EPERM; > + return (-1); > +#ifdef notyet > + char name[SEM_RANDOM_NAME_LEN]; > + sem_t *sempshared; > + int i; > + > + for (;;) { > + for (i = 0; i < SEM_RANDOM_NAME_LEN - 1; i++) > + name[i] = arc4random_uniform(255) + 1; > + name[SEM_RANDOM_NAME_LEN - 1] = '\0'; > + sempshared = sem_open(name, O_CREAT | O_EXCL, 0, value); > + if (sempshared != SEM_FAILED) > + break; > + if (errno == EEXIST) > + continue; > + if (errno != EPERM) > + errno = ENOSPC; > + return (-1); > + } > + > + /* unnamed semaphore should not be opened twice */ > + if (sem_unlink(name) == -1) { > + sem_close(sempshared); > + errno = ENOSPC; > + return (-1); > + } > + > + *semp = *sempshared; > + free(sempshared); > + return (0); > +#endif > + } > + > + sem = calloc(1, sizeof(*sem)); > + if (!sem) { > + errno = ENOSPC; > + return (-1); > + } > + sem->lock = _SPINLOCK_UNLOCKED; `sem->lock' is no longer unused and can be #ifdef out. > + sem->value = value; > + *semp = sem; > + > + return (0); > +} > + > +int > +sem_destroy(sem_t *semp) > +{ > + sem_t sem; > + > + if (!_threads_ready) /* for SEM_MMAP_SIZE */ > + _rthread_init(); > + > + if (!semp || !(sem = *semp)) { > + errno = EINVAL; > + return (-1); > + } > + > + if (sem->waitcount) { > +#define MSG "sem_destroy on semaphore with waiters!\n" > + write(2, MSG, sizeof(MSG) - 1); > +#undef MSG > + errno = EBUSY; > + return (-1); > + } > + > + *semp = NULL; > + if (sem->shared) > + munmap(sem, SEM_MMAP_SIZE); > + else > + free(sem); > + > + return (0); > +} > + > +int > +sem_getvalue(sem_t *semp, int *sval) > +{ > + sem_t sem; > + > + if (!semp || !(sem = *semp)) { > + errno = EINVAL; > + return (-1); > + } > + > + //membar_exit_before_atomic(); > + //*sval = atomic_add_int_nv(&sem->value, 0); What does that mean? > + *sval = sem->value; > + > + return (0); > +}