On 16/05/2013 12:15, Stefan Hajnoczi wrote:
> On Wed, May 15, 2013 at 05:48:47PM +0200, Paolo Bonzini wrote:
>> This emulates Win32 manual-reset events using futexes or conditional
>> variables.  Typical ways to use them are with multi-producer,
>> single-consumer data structures, to test for a complex condition whose
>> elements come from different threads:
>>
>>     for (;;) {
>>         qemu_event_reset(ev);
>>         ... test complex condition ...
>>         if (condition is true) {
>>             break;
>>         }
>>         qemu_event_wait(ev);
>>     }
>>
>> Alternatively:
>>
>>     ... compute condition ...
>>     if (condition) {
>>         do {
>>             qemu_event_wait(ev);
>>             qemu_event_reset(ev);
>>             ... compute condition ...
>>         } while(condition);
>>         qemu_event_set(ev);
>>     }
>>
>> QemuEvent provides a very fast userspace path in the common case when
>> no other thread is waiting, or the event is not changing state.  It
>> is used to report RCU quiescent states to the thread calling
>> synchronize_rcu (the latter being the single consumer), and to report
>> call_rcu invocations to the thread that receives them.
> 
> It would be nice to describe the need for the Linux futex code.  pthread
> mutex/condvars are implemented in terms of futexes already, so how much
> benefit is there - I thought they stay in userspace in the non-contended
> case too?

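Yes, but they are still measurably slower, by around 20%.  I no longer
have the program I wrote for QemuEvent, because I did that measurement
about two years ago.  However, here is one that tests a similar
synchronization primitive (not exactly the same as QemuEvent).  Its
arguments are the average consumer think time (ms), the average distance
between events (ms), the number of consumer threads and the run length
(s); -c selects the mutex/condvar implementation.  You can run it like this: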

$ ./a.out -c 100 10 4 40 # with mutex/condvar
4 child processes, avg think time 100 msec
Avg event distance 10 msec. Running for 40 sec
........................................
waits            1404   0.663 us (fast)
  slow waits     152
signal           3890   0.784 us
  fast path      0                             <<<< (meaningless: the condvar version has no fast path, so this is never incremented)

$ ./a.out 100 10 4 40 # with futex
4 child processes, avg think time 100 msec
Avg event distance 10 msec. Running for 40 sec
........................................
waits            1383   0.635 us (fast)
  slow waits     147
signal           3924   0.640 us
  fast path      3791


Paolo
#define _GNU_SOURCE 1
#include "pthread.h"
#include <limits.h>
#include <string.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <math.h>

#ifdef __linux__
#include <sys/syscall.h>
#include <linux/futex.h>
#define HAVE_FUTEX 1
#endif

#ifdef _WIN32
#include <windows.h>
#include <mmsystem.h>
#else
#define HAVE_AFFINITY 1
#endif

/*
 * Event counter: a generation count that waiters snapshot and then block on
 * until a signaller advances it.  Two implementations follow, one built on
 * Linux futexes (_evcounter_*) and one on a pthread mutex/condvar pair
 * (_evcounter_*_cond); not every field is used by both.
 */
typedef struct EvCounter {
    int ctr;                /* generation count, incremented by every signal */
    int waiters;            /* futex variant: threads currently inside wait */
    int fast_signal;        /* futex variant: signals that found no waiters
                               and therefore skipped FUTEX_WAKE */
    pthread_mutex_t lock;   /* condvar variant: protects ctr */
    pthread_cond_t cond;    /* condvar variant: broadcast on every signal */
} EvCounter;

/* A waiter's snapshot of EvCounter.ctr (filled by evcounter_get and
   refreshed by each successful wait). */
typedef int EvCounterState;

/*
 * Record the current generation of EVCOUNTER in *STATE, so that a later
 * wait can tell whether a signal has happened since this snapshot.
 */
void evcounter_get(EvCounterState *state, EvCounter *evcounter)
{
    const int generation = evcounter->ctr;

    *state = generation;
}

#ifdef HAVE_FUTEX
/* Invoke the Linux futex system call directly through syscall(). */
#define futex(...)      syscall(__NR_futex, __VA_ARGS__)

/*
 * Futex variant of evcounter_init: generation 0 and no waiters.
 * fast_signal, lock and cond are not touched here.
 */
void _evcounter_init(EvCounter *evcounter)
{
    evcounter->ctr = evcounter->waiters = 0;
}

/*
 * Futex variant of evcounter_wait: block until evcounter->ctr differs from
 * the snapshot in *state, then store the new value in *state.
 *
 * Returns 1 if ctr had already changed at the first check (no futex call was
 * made), 0 if at least one FUTEX_WAIT was issued -- even one that returned
 * immediately because ctr had just changed.
 *
 * Ordering: waiters is incremented with a __sync builtin (documented by GCC
 * as a full barrier) *before* ctr is re-read, and _evcounter_signal
 * increments ctr with a full barrier *before* reading waiters.  So either
 * the signaller sees waiters != 0 and issues FUTEX_WAKE, or this thread sees
 * the new ctr and never sleeps.  FUTEX_WAIT only sleeps if ctr still equals
 * the expected value when the kernel checks it (see futex(2)), so a signal
 * landing between the loop test and the syscall is not lost.
 */
int _evcounter_wait(EvCounterState *state, EvCounter *evcounter)
{
    int fast = 1;
    /* Register as a waiter before re-reading ctr (see ordering note). */
    __sync_fetch_and_add(&evcounter->waiters, 1);
    while (*state == evcounter->ctr) {
        /* Loop: FUTEX_WAIT can also return early (e.g. on a signal). */
        futex(&evcounter->ctr, FUTEX_WAIT_PRIVATE, *state, NULL);
        fast = 0;
    }
    __sync_fetch_and_add(&evcounter->waiters, -1);
    *state = evcounter->ctr;
    return fast;
}

/*
 * Futex variant of evcounter_signal: advance the generation, then wake every
 * sleeper (INT_MAX) -- but only when some thread is registered in waiters.
 * Otherwise the syscall is skipped and fast_signal is counted instead.
 * fast_signal is updated non-atomically; in this program only the generator
 * thread signals.
 */
void _evcounter_signal(EvCounter *evcounter)
{
    __sync_fetch_and_add(&evcounter->ctr, 1);
    if (evcounter->waiters != 0) {
        futex(&evcounter->ctr, FUTEX_WAKE_PRIVATE, INT_MAX);
    } else {
	evcounter->fast_signal++;
    }
}
#endif

/*
 * Condvar variant of evcounter_init: reset the generation and create the
 * mutex/condvar pair with default attributes.  waiters and fast_signal are
 * left as they are.
 */
void _evcounter_init_cond(EvCounter *evcounter)
{
    pthread_mutex_t *mutex = &evcounter->lock;
    pthread_cond_t *condvar = &evcounter->cond;

    evcounter->ctr = 0;
    pthread_mutex_init(mutex, NULL);
    pthread_cond_init(condvar, NULL);
}

/*
 * Condvar variant of evcounter_wait: block until evcounter->ctr differs from
 * the snapshot in *state, then store the new value in *state.
 *
 * Returns 1 if ctr had already changed when first checked, 0 if the thread
 * had to sleep on the condition variable at least once.
 */
int _evcounter_wait_cond(EvCounterState *state, EvCounter *evcounter)
{
    int fast = 1;

    pthread_mutex_lock(&evcounter->lock);
    while (*state == evcounter->ctr) {
        pthread_cond_wait(&evcounter->cond, &evcounter->lock);
        fast = 0;
    }
    /* Read ctr while still holding the lock: _evcounter_signal_cond writes
       it under the same mutex, so reading it after unlocking is a race. */
    *state = evcounter->ctr;
    pthread_mutex_unlock(&evcounter->lock);
    return fast;
}

/*
 * Condvar variant of evcounter_signal: advance the generation under the lock
 * and wake every thread blocked in _evcounter_wait_cond.  This variant has
 * no fast path, so fast_signal is never incremented.
 */
void _evcounter_signal_cond(EvCounter *evcounter)
{
    pthread_mutex_t *mutex = &evcounter->lock;

    pthread_mutex_lock(mutex);
    evcounter->ctr += 1;
    pthread_cond_broadcast(&evcounter->cond);
    pthread_mutex_unlock(mutex);
}


/*
 * Selected EvCounter implementation.  main() points these at the futex
 * functions in HAVE_FUTEX builds, and at the mutex/condvar functions
 * otherwise or when the -c option is given.
 */
void (*evcounter_init)(EvCounter *);
int (*evcounter_wait)(EvCounterState *, EvCounter *);
void (*evcounter_signal)(EvCounter *);

/* The single event counter shared by the generator and all consumers. */
EvCounter evc;
/*
 * Shutdown flag polled by the threads: 0 = running; 1 = consumers exit
 * (the generator keeps signalling while main joins them); 2 = generator
 * exits too.
 * NOTE(review): volatile gives no inter-thread ordering guarantee in C11;
 * an atomic int would be the portable choice.
 */
volatile int stop = 0;
/* Set by -a: create_thread pins threads to CPUs (HAVE_AFFINITY builds). */
int affinity = 0;

/*
 * Statistics.  t_* fields are in getclk() ticks (CLOCK_RES per second).
 * t_wait/n_wait cover only waits that returned without sleeping;
 * n_slow_wait counts the other waits.  Consumers update these three with
 * __sync atomics; t_signal/n_signal are written only by the generator.
 */
long t_wait;
long n_slow_wait;
long n_wait;
long t_signal;
long n_signal;

/*
 * getclk() ticks per second: nanoseconds when long is 64 bits, otherwise
 * microseconds (presumably so the long t_* accumulators do not overflow on
 * 32-bit hosts).
 */
#define CLOCK_RES (sizeof(long) == 8 ? 1000000000LL : 1000000LL)

/*
 * Monotonic timestamp in CLOCK_RES ticks per second.  POSIX builds read
 * CLOCK_MONOTONIC; Windows builds read the performance counter relative to
 * the value seen on the first call.
 */
static inline long long getclk()
{
#ifdef _WIN32
    static LARGE_INTEGER freq, init;
    LARGE_INTEGER now;

    if (init.QuadPart == 0) {
        QueryPerformanceFrequency(&freq);
        QueryPerformanceCounter(&init);
    }
    QueryPerformanceCounter(&now);
    return (now.QuadPart - init.QuadPart) * CLOCK_RES / freq.QuadPart;
#else
    struct timespec now;
    long long nsecs;

    clock_gettime(CLOCK_MONOTONIC, &now);
    nsecs = now.tv_sec * 1000000000LL + now.tv_nsec;
    return nsecs / (1000000000LL / CLOCK_RES);
#endif
}

#ifdef _WIN32
/* Windows stand-in for POSIX sleep(): block for SECS seconds. */
void sleep(int secs)
{
    int millis = secs * 1000;

    Sleep(millis);
}
#endif

void exp_usleep(int avg)
{
    if (avg) {
	double x = rand() / ((double)RAND_MAX + 1.0);
	int usecs = (int) -avg * log(x);
#ifdef _WIN32
	Sleep(usecs / 1000);
#else
	usleep(usecs);
#endif
    }
}

/*
 * Producer thread: after an exponentially distributed pause (mean *PAVG
 * microseconds) signal evc, accumulating the time spent inside
 * evcounter_signal in t_signal/n_signal.  Runs until stop reaches 2.
 */
void *generator(void *pavg)
{
    const int mean_usecs = *(const int *)pavg;

    for (;;) {
        long long start;

        if (stop == 2) {
            break;
        }
        exp_usleep(mean_usecs);
        start = getclk();
        evcounter_signal(&evc);
        t_signal += getclk() - start;
        n_signal++;
    }
    return NULL;
}

/*
 * Consumer thread: wait for evc to move past the last generation seen, then
 * think for an exponentially distributed time (mean *PAVG microseconds).
 * Waits that returned without sleeping add their latency to t_wait/n_wait;
 * the rest are only counted in n_slow_wait.  Exits once stop is nonzero.
 */
void *consumer(void *pavg)
{
    const int mean_usecs = *(const int *)pavg;
    EvCounterState seen;

    evcounter_get(&seen, &evc);
    while (!stop) {
        long long start = getclk();
        int was_fast = evcounter_wait(&seen, &evc);

        if (!was_fast) {
            __sync_fetch_and_add(&n_slow_wait, 1);
        } else {
            __sync_fetch_and_add(&t_wait, getclk() - start);
            __sync_fetch_and_add(&n_wait, 1);
        }
        exp_usleep(mean_usecs);
    }
    return NULL;
}

/*
 * Start thread *P running F(ARG).
 *
 * Exits the program if the thread cannot be created: callers later
 * pthread_join() *P, which is indeterminate after a failed pthread_create.
 * With HAVE_AFFINITY and the -a option, successive threads are pinned
 * round-robin to the online CPUs; affinity errors are ignored (best effort).
 */
void
create_thread(pthread_t *p, void *(*f)(void *), void *arg)
{
    int rc = pthread_create(p, NULL, f, arg);

    if (rc != 0) {
        /* pthread_create returns an error number instead of setting errno. */
        fprintf(stderr, "pthread_create: %s\n", strerror(rc));
        exit(1);
    }
#ifdef HAVE_AFFINITY
    if (affinity) {
        static int nproc, i;
        cpu_set_t cpu_set;

        if (nproc <= 0) {
            nproc = sysconf(_SC_NPROCESSORS_ONLN);
            if (nproc < 1) {
                nproc = 1;      /* sysconf failed: fall back to CPU 0 */
            }
        }

        CPU_ZERO(&cpu_set);
        CPU_SET(i % nproc, &cpu_set);
        i++;

        pthread_setaffinity_np(*p, sizeof(cpu_set), &cpu_set);
    }
#endif
}

/*
 * Average of TOTAL getclk() ticks over COUNT samples, in microseconds.
 * Returns 0 when COUNT is 0 instead of printing nan/inf.
 */
static double avg_us(long total, long count)
{
    if (count == 0) {
        return 0.0;
    }
    return (double)total / (CLOCK_RES / 1000000.0) / count;
}

/*
 * Benchmark driver.
 *
 * Usage: a.out [-c] [-a] [THINK_MS [DISTANCE_MS [NTHREADS [SECONDS]]]]
 *   -c  use the mutex/condvar EvCounter instead of the default one
 *   -a  pin threads round-robin to CPUs (HAVE_AFFINITY builds only)
 *   Option letters may be combined, e.g. "-ca".
 *
 * One generator thread signals evc on average every DISTANCE_MS (default 10)
 * and NTHREADS consumers (default 10) wait on it, thinking on average
 * THINK_MS (default 100) between waits.  After SECONDS (default 10) the
 * threads are stopped and wait/signal statistics are printed.
 */
int main(int argc, char **argv)
{
#ifdef HAVE_FUTEX
    evcounter_init = _evcounter_init;
    evcounter_wait = _evcounter_wait;
    evcounter_signal = _evcounter_signal;
#else
    evcounter_init = _evcounter_init_cond;
    evcounter_wait = _evcounter_wait_cond;
    evcounter_signal = _evcounter_signal_cond;
#endif
    /* Handle each option letter separately, so "-ac" enables both. */
    while (argv[1] && argv[1][0] == '-') {
        const char *opt;

        for (opt = argv[1] + 1; *opt; opt++) {
            if (*opt == 'c') {
                evcounter_init = _evcounter_init_cond;
                evcounter_wait = _evcounter_wait_cond;
                evcounter_signal = _evcounter_signal_cond;
            } else if (*opt == 'a') {
                affinity = 1;
            }
        }
        argc--, argv++;
    }

    evcounter_init(&evc);

    srand(time(NULL));

    int think_avg = argc > 1 ? atoi(argv[1]) * 1000 : 100000;
    int throughput_avg = argc > 2 ? atoi(argv[2]) * 1000 : 10000;
    int n = argc > 3 ? atoi(argv[3]) : 10;
    int len = argc > 4 ? atoi(argv[4]) : 10;

    /* n sizes the VLA below and must be positive; negative averages would
       become huge unsigned sleeps in exp_usleep. */
    if (n < 1 || len < 0 || think_avg < 0 || throughput_avg < 0) {
        fprintf(stderr, "times must be non-negative and the thread count positive\n");
        return 1;
    }

    printf("%d child processes, avg think time %d msec\n", n, think_avg/1000);
    printf("Avg event distance %d msec. Running for %d sec\n",
           throughput_avg/1000, len);

    pthread_t g, c[n];
    create_thread(&g, generator, &throughput_avg);

    int i;
    for (i = 0; i < n; i++) {
        create_thread(&c[i], consumer, &think_avg);
    }

    /* Progress dots go through stdio and are flushed, so they cannot
       overtake the still-buffered header when stdout is not a terminal. */
    for (i = 0; i < len; i++) {
        putchar('.');
        fflush(stdout);
        sleep(1);
    }
    putchar('\n');

    /* Stop consumers first; the generator keeps signalling meanwhile so
       none of them stays blocked in evcounter_wait. */
    stop = 1;
    for (i = 0; i < n; i++) {
        pthread_join(c[i], NULL);
    }
    stop = 2;
    pthread_join(g, NULL);

    printf("waits\t\t %ld\t%.3f us (fast case)\n", n_wait,
           avg_us(t_wait, n_wait));
    printf("  slow waits\t %ld\n", n_slow_wait);
    printf("signal\t\t %ld\t%.3f us\n", n_signal,
           avg_us(t_signal, n_signal));
    /* fast_signal is an int: %ld here was undefined behavior. */
    printf("  fast path\t %d\n", evc.fast_signal);
    return 0;
}

Reply via email to