On 30/04/18(Mon) 14:36, Paul Irofti wrote:
> On Sat, Apr 28, 2018 at 07:40:38PM +0300, Paul Irofti wrote:
> > On Sun, Apr 22, 2018 at 03:34:45PM +0300, Paul Irofti wrote:
> > > Here is a new semaphore implementation that uses atomic operations,
> > > where available, and futexes for locking. 

I'm happy to see more rthread internals based on futex(2).

> > > The reason we need this is to make semaphores safe for asynchronous
> > > signals.

Could you describe with words what is currently broken and how the
version below fixes it?

> > > [...]
> > Here is the same diff adapted to what happened in -current this week.
> > All required bits are now in, so the current diff neatly contains just the
> > implementation.
> 
> People started testing this, thank you!
> 
> Here is an updated diff that:
> 
>   Add barriers, debug printfs and handle EAGAIN.

You can't use printf() for this purpose.  Please use _rthread_debug()
like the rest of the library :)  It also has the advantage of being
enabled via the RTHREAD_DEBUG env variable.

>   The barriers bit is mostly from visa@, thanks!
> 
>   tb@ found offlineimap faulting because the futex syscall returned EAGAIN
>   and sem_wait exited. Loop again on EAGAIN.
> 
>   Debug printfs are for future debugging.
> 
> Martin, is handling EAGAIN like this correct?

I don't know, what is it supposed to do? 

> new file mode 100644
> index 00000000000..e5c8015d27c
> --- /dev/null
> +++ lib/librthread/rthread_sem_atomic.c

I'm not fan of the _atomic suffix.  What about rthread_semaphore.c?

> @@ -0,0 +1,445 @@
> +/*   $OpenBSD$ */
> +/*
> + * Copyright (c) 2004,2005,2013 Ted Unangst <t...@openbsd.org>
> + * Copyright (c) 2018 Paul Irofti <piro...@openbsd.org>
> + * All Rights Reserved.
> + *
> + * Permission to use, copy, modify, and distribute this software for any
> + * purpose with or without fee is hereby granted, provided that the above
> + * copyright notice and this permission notice appear in all copies.
> + *
> + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
> + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
> + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
> + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
> + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
> + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
> + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
> + */
> +
> +#include <sys/types.h>
> +#include <sys/mman.h>
> +#include <sys/stat.h>
> +#include <sys/atomic.h>
> +#include <sys/time.h>
> +#include <sys/futex.h>
> +
> +#include <errno.h>
> +#include <fcntl.h>
> +#include <sha2.h>
> +#include <stdarg.h>
> +#include <stdlib.h>
> +#include <stdio.h>
> +#include <string.h>
> +#include <unistd.h>
> +
> +#include <pthread.h>
> +
> +#include "rthread.h"
> +#include "cancel.h"          /* in libc/include */
> +#include "synch.h"
> +
> +#ifdef SEM_ATOMIC_DEBUG
> +#define DPRINTF(x)   printf x
> +#else
> +#define DPRINTF(x)
> +#endif
> +
> +#define SHARED_IDENT ((void *)-1)
> +
> +/* SHA256_DIGEST_STRING_LENGTH includes nul */
> +/* "/tmp/" + sha256 + ".sem" */
> +#define SEM_PATH_SIZE (5 + SHA256_DIGEST_STRING_LENGTH + 4)
> +
> +/* long enough to be hard to guess */
> +#define SEM_RANDOM_NAME_LEN  10
> +
> +/*
> + * Size of memory to be mmap()'ed by named semaphores.
> + * Should be >= SEM_PATH_SIZE and page-aligned.
> + */
> +#define SEM_MMAP_SIZE        _thread_pagesize
> +
> +/*
> + * Internal implementation of semaphores
> + */
> +int
> +_sem_wait(sem_t sem, int can_eintr, const struct timespec *abstime,
> +    int *delayed_cancel)
> +{
> +     int r = 0;
> +     int v = sem->value, ov;

Reading `sem->value' here is superfluous.

> +
> +     for (;;) {
> +             while ((v = sem->value) > 0) {
> +                     ov = atomic_cas_uint(&sem->value, v, v - 1);
> +                     if (ov == v) {
> +                             membar_enter_after_atomic();
> +                             return 0;
> +                     }
> +             }
> +             if (r)
> +                     break;
> +
> +             atomic_inc_int(&sem->waitcount);

Can you keep the destroy check without adding two atomic operations here?

> +             r = _twait(&sem->value, 0, CLOCK_REALTIME, abstime);
> +             /* ignore interruptions other than cancelation */
> +             if ((r == ECANCELED && *delayed_cancel == 0) ||
> +                 (r == EINTR && !can_eintr) || r == EAGAIN)
> +                     r = 0;
> +             atomic_dec_int(&sem->waitcount);
> +     }
> +
> +     return r;
> +}
> +
> +/* always increment count */
> +int
> +_sem_post(sem_t sem)
> +{

This function is called only once, so you can fold it in sem_post() :)

> +     membar_exit_before_atomic();

Why is the membar needed here?  Does that mean we're missing two in 
pthread_cond_signal() and pthread_cond_broadcast()?

> +     atomic_inc_int(&sem->value);
> +     _wake(&sem->value, 1);
> +     return 0;
> +}
> +
> +/*
> + * exported semaphores
> + */
> +int
> +sem_init(sem_t *semp, int pshared, unsigned int value)
> +{
> +     sem_t sem;
> +
> +     if (value > SEM_VALUE_MAX) {
> +             errno = EINVAL;
> +             return (-1);
> +     }
> +
> +     if (pshared) {
> +             errno = EPERM;
> +             return (-1);
> +#ifdef notyet
> +             char name[SEM_RANDOM_NAME_LEN];
> +             sem_t *sempshared;
> +             int i;
> +
> +             for (;;) {
> +                     for (i = 0; i < SEM_RANDOM_NAME_LEN - 1; i++)
> +                             name[i] = arc4random_uniform(255) + 1;
> +                     name[SEM_RANDOM_NAME_LEN - 1] = '\0';
> +                     sempshared = sem_open(name, O_CREAT | O_EXCL, 0, value);
> +                     if (sempshared != SEM_FAILED)
> +                             break;
> +                     if (errno == EEXIST)
> +                             continue;
> +                     if (errno != EPERM)
> +                             errno = ENOSPC;
> +                     return (-1);
> +             }
> +
> +             /* unnamed semaphore should not be opened twice */
> +             if (sem_unlink(name) == -1) {
> +                     sem_close(sempshared);
> +                     errno = ENOSPC;
> +                     return (-1);
> +             }
> +
> +             *semp = *sempshared;
> +             free(sempshared);
> +             return (0);
> +#endif
> +     }
> +
> +     sem = calloc(1, sizeof(*sem));
> +     if (!sem) {
> +             errno = ENOSPC;
> +             return (-1);
> +     }
> +     sem->lock = _SPINLOCK_UNLOCKED;

`sem->lock' is no longer unused and can be #ifdef out.

> +     sem->value = value;
> +     *semp = sem;
> +
> +     return (0);
> +}
> +
> +int
> +sem_destroy(sem_t *semp)
> +{
> +     sem_t sem;
> +
> +     if (!_threads_ready)             /* for SEM_MMAP_SIZE */
> +             _rthread_init();
> +
> +     if (!semp || !(sem = *semp)) {
> +             errno = EINVAL;
> +             return (-1);
> +     }
> +
> +     if (sem->waitcount) {
> +#define MSG "sem_destroy on semaphore with waiters!\n"
> +             write(2, MSG, sizeof(MSG) - 1);
> +#undef MSG
> +             errno = EBUSY;
> +             return (-1);
> +     }
> +
> +     *semp = NULL;
> +     if (sem->shared)
> +             munmap(sem, SEM_MMAP_SIZE);
> +     else
> +             free(sem);
> +
> +     return (0);
> +}
> +
> +int
> +sem_getvalue(sem_t *semp, int *sval)
> +{
> +     sem_t sem;
> +
> +     if (!semp || !(sem = *semp)) {
> +             errno = EINVAL;
> +             return (-1);
> +     }
> +
> +     //membar_exit_before_atomic();
> +     //*sval = atomic_add_int_nv(&sem->value, 0);

What does that mean?

> +     *sval = sem->value;
> +
> +     return (0);
> +}

Reply via email to