On Mar 10, 2005, at 9:05 PM, Charles Lockhart wrote:

Jim Thompson wrote:

pthread_cond_broadcast() will attempt to waken *ALL* threads waiting on that condition variable. If there is more than one, they'll race, and the first one that gets past the mutex 'wins'. pthread_cond_signal() will attempt to waken *one* thread (typically the first
thread waiting on the condition variable).

What do you mean 'wins'? Maybe I don't understand correctly, but if I have three threads waiting on a conditional, and I do a broadcast on that conditional, they should ALL wake up, right?

Yes, but then two of them won't get (the outer) mutex, which essentially forces my former code to execute single-threaded. The one that has or gets the mutex 'wins' (and gets to continue running).

Of course, before you said:

One of the threads sleeps on a conditional wait (pthread_cond_wait(&my_cond, &my_mutex)), and when a different thread calls pthread_cond_broadcast(&my_cond), I kind of expect the first thread to wake its shiny smiling face and do some work. But it doesn't. Shouldn't it? It used to.

And I'll have to ask, "Are you 100% sure that the first thread is sleeping on the CV?" The pthread_cond_broadcast() and pthread_cond_signal() functions have no effect if there are no threads currently blocked on the CV.

If all (worker?) threads are busy handling previous requests, when a new one arrives, the signaling of the condition variable will do nothing (since all threads are busy doing other things, NOT waiting on the condition variable), and after all the worker (?) threads finish handling their current request, they come back to wait on the variable, which won't necessarily be signaled again (for example, if no new requests arrive). Thus, there is at least one request pending, while all handling threads are blocked, waiting for a signal, which will never arrive.

I can't read your code, but this describes one interpretation of the problem you may be having, given what you've said, though it violates the assumption that the thread is, indeed, sleeping on the CV.

The typical response to this problem is to set some integer variable to denote the number of pending requests, and have each thread check the value of this variable before waiting on the CV. If this variable's value is positive, some request is pending, and the thread should go and handle it, instead of going to sleep on the CV. Additionally, a thread that handles a request, should reduce the value of this variable by one, to keep the count correct.

I've attached a more complete example, though the code here is still imperfect.

I mean, I don't see anything to indicate that there's any particular order, but they should all wake up, right? Wait, uh, yeah, that's even what it says in the man page. Whereas pthread_cond_signal() wakes up one thread, but if there are multiple threads waiting, there's no provision for determining which one.

Typically you get the thread at the top of the list associated with that CV. Now, please pardon me while I get pedantic.

The first version of the pthreads standard (IEEE Std 1003.1c-1995) says pthread_cond_signal() will wake "one" thread, but it doesn't say which one, as you note.

The Single UNIX Specification (IEEE Std 1003.1-2001, and 2004) says "at least one", which would allow a conforming implementation to wake *all* threads waiting on a given CV, just like pthread_cond_broadcast(). To see why this was changed, read this little tidbit from the 2001 (and 2004) versions of 1003.1:

------
On a multi-processor, it may be impossible for an implementation of pthread_cond_signal() to avoid the unblocking of more than one thread blocked on a condition variable. For example, consider the following partial implementation of pthread_cond_wait() and pthread_cond_signal(), executed by two threads in the order given. One thread is trying to wait on the condition variable, another is concurrently executing pthread_cond_signal(), while a third thread is already waiting.

pthread_cond_wait(mutex, cond):
    value = cond->value; /* 1 */
    pthread_mutex_unlock(mutex); /* 2 */
    pthread_mutex_lock(cond->mutex); /* 10 */
    if (value == cond->value) { /* 11 */
        me->next_cond = cond->waiter;
        cond->waiter = me;
        pthread_mutex_unlock(cond->mutex);
        unable_to_run(me);
    } else
        pthread_mutex_unlock(cond->mutex); /* 12 */
    pthread_mutex_lock(mutex); /* 13 */


pthread_cond_signal(cond):
    pthread_mutex_lock(cond->mutex); /* 3 */
    cond->value++; /* 4 */
    if (cond->waiter) { /* 5 */
        sleeper = cond->waiter; /* 6 */
        cond->waiter = sleeper->next_cond; /* 7 */
        able_to_run(sleeper); /* 8 */
    }
    pthread_mutex_unlock(cond->mutex); /* 9 */


The effect is that more than one thread can return from its call to pthread_cond_wait() or pthread_cond_timedwait() as a result of one call to pthread_cond_signal(). This effect is called "spurious wakeup". Note that the situation is self-correcting in that the number of threads that are so awakened is finite; for example, the next thread to call pthread_cond_wait() after the sequence of events above blocks.

While this problem could be resolved, the loss of efficiency for a fringe condition that occurs only rarely is unacceptable, especially given that one has to check the predicate associated with a condition variable anyway. Correcting this problem would unnecessarily reduce the degree of concurrency in this basic building block for all higher-level synchronization operations.

An added benefit of allowing spurious wakeups is that applications are forced to code a predicate-testing-loop around the condition wait. This also makes the application tolerate superfluous condition broadcasts or signals on the same condition variable that may be coded in some other part of the application. The resulting applications are thus more robust. Therefore, IEEE Std 1003.1-2001 explicitly documents that spurious wakeups may occur.
--------

Heh.

Perhaps you're just running into side-effects of the scheduler. It seems to work here (gentoo, 2.6.10 kernel). See the code and output below.
(I'm not bragging on the code, it was quick-n-dirty.)

Ah, man, your code worked on my machine too. So I must be doing something screwy. I guess I should be happy, because that means I can fix the bug. On the other hand, that means I have to fix the bug.

In my experience, fixing bugs in your code is more fun than finding workarounds for bugs in other people's code.

The Fedora Project officially ended support for Fedora Core 1 (FC1) on September 20th, 2004. FC3 was released November 8, 2004. Maybe you should run FC3 (which is current) vs. a release that is now known as "Fedora Legacy". (http://fedora.redhat.com/)

Yeah, that would be nice. But some of the hardware we have in "the machine" currently only has drivers for the 2.4 kernel, and it's kind of late in the game to start porting and testing, and I am way too busy (and LAZY). Probably on the next instrument.

Jim, you The Man, thank you very much for your time and feedback.

No problem. Here's the more complete code (the previous thing was a hack). This could actually be used to do something, though there are still changes that would need to be made if it were part of a long-running program. (I wouldn't accept an exit on an out-of-memory condition if I were in the code review. Still, this is a mailing list, and it's unlikely that I'll be working for anyone soon, so perfection isn't required.)

Note that the scheduling behavior is still quite different if you use pthread_cond_signal() vs. pthread_cond_broadcast().

/home/jim> cat thread-pool-server.c
#define _GNU_SOURCE

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <time.h>       /* nanosleep() */
#include <unistd.h>     /* sleep() */

/* number of threads used to service requests */
#define NUM_HANDLER_THREADS 3

/* global mutex for our program. note that we use a recursive mutex,
 * since a handler thread might try to lock it twice consecutively.
 */

pthread_mutex_t request_mutex = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;

/* global condition variable for our program.*/
pthread_cond_t  got_request   = PTHREAD_COND_INITIALIZER;

int num_requests = 0;

/* a single request. */
struct request {
    struct request* next;   /* pointer to next request, NULL if none */
    int number;
};

struct request* requests = NULL; /* head of linked list of requests. */
struct request* last_request = NULL; /* pointer to last request */

/*
 * add a request to the requests list
 *
 * creates a request structure, adds to the list, and
 * increases number of pending requests by one.
 */
void
add_request(int request_num, pthread_mutex_t* p_mutex, pthread_cond_t* p_cond_var)
{
    int rc;                         /* return code of pthreads functions, UNCHECKED */
    struct request* a_request;      /* pointer to newly added request */

    /* create structure with new request */
    a_request = (struct request*)malloc(sizeof(struct request));
    if (!a_request) { /* malloc failed?? */
        fprintf(stderr, "add_request: out of memory\n");
        exit(1);
    }
    a_request->number = request_num;
    a_request->next = NULL;

    /* lock the mutex, to assure exclusive access to the list */
    rc = pthread_mutex_lock(p_mutex);

    /* add new request to the end of the list, updating list pointers as required */
    if (num_requests == 0) { /* special case - list is empty */
        requests = a_request;
        last_request = a_request;
    }  else {
        last_request->next = a_request;
        last_request = a_request;
    }

    /* increase total number of pending requests by one. */
    num_requests++;

#ifdef DEBUG
    printf("add_request: added request with id '%d'\n", a_request->number);
    fflush(stdout);
#endif /* DEBUG */

    /* unlock mutex */
    rc = pthread_mutex_unlock(p_mutex);

/* signal the condition variable - there's a new request to handle */
#ifdef COND_SIGNAL
    rc = pthread_cond_signal(p_cond_var);
#else
    rc = pthread_cond_broadcast(p_cond_var);
#endif
}

/*
 * get the first pending request from the requests list, removing it from the list.
 * the returned request must be freed by the caller.
 */
struct request*
get_request(pthread_mutex_t* p_mutex)
{
    int rc;                         /* return code of pthreads functions, UNCHECKED */
    struct request* a_request;      /* pointer to request */

    /* lock the mutex, to assure exclusive access to the list */
    rc = pthread_mutex_lock(p_mutex);

    if (num_requests > 0) {
        a_request = requests;
        requests = a_request->next;
        if (requests == NULL) { /* this was the last request on the list */
            last_request = NULL;
        }
        /* decrease the total number of pending requests */
        num_requests--;
    }  else { /* requests list is empty */
        a_request = NULL;
    }

    /* unlock mutex */
    rc = pthread_mutex_unlock(p_mutex);

    /* return the request to the caller. */
    return a_request;
}

void
handle_request(struct request* a_request, int thread_id)
{
    if (a_request) {
        printf("Thread '%d' handled request '%d'\n", thread_id, a_request->number);
        fflush(stdout);
    }
}

/*
 * infinite loop of request handling
 */
void*
handle_requests_loop(void* data)
{
    int rc;                         /* return code UNCHECKED */
    struct request* a_request;      /* pointer to a request */
    int thread_id = *((int*)data);  /* thread id */

#ifdef DEBUG
    printf("Starting thread '%d'\n", thread_id);
    fflush(stdout);
#endif /* DEBUG */

    /* access the requests list exclusively */
    rc = pthread_mutex_lock(&request_mutex);

#ifdef DEBUG
    printf("thread '%d' after pthread_mutex_lock\n", thread_id);
    fflush(stdout);
#endif /* DEBUG */

    while (1) {
#ifdef DEBUG
        printf("thread '%d', num_requests = %d\n", thread_id, num_requests);
        fflush(stdout);
#endif /* DEBUG */
        if (num_requests > 0) { /* a request is pending */
            a_request = get_request(&request_mutex);
            if (a_request) { /* got a request - handle and free it */
                /* unlock mutex - so other threads can handle
                 * other requests waiting in the queue in parallel
                 */
                rc = pthread_mutex_unlock(&request_mutex);
                handle_request(a_request, thread_id);
                free(a_request);
                /* and lock the mutex again */
                rc = pthread_mutex_lock(&request_mutex);
            }
        } else {
            /* wait for a request to arrive. note the mutex will be
             * unlocked here, thus allowing other threads access to
             * requests list
             */
#ifdef DEBUG
            printf("thread '%d' before pthread_cond_wait\n", thread_id);
            fflush(stdout);
#endif /* DEBUG */
            rc = pthread_cond_wait(&got_request, &request_mutex);
            /* after we return from pthread_cond_wait, the mutex
             * is locked again, so we don't need to lock it ourselves
             */
#ifdef DEBUG
            printf("thread '%d' after pthread_cond_wait\n", thread_id);
            fflush(stdout);
#endif /* DEBUG */
        }
    }
}

int
main(int argc, char* argv[])
{
    int i;                                       /* loop control */
    int thr_id[NUM_HANDLER_THREADS];             /* thread IDs */
    pthread_t p_threads[NUM_HANDLER_THREADS];    /* thread structures */
    struct timespec delay;                       /* used for wasting time */

    /* create the request-handling threads */
    for (i=0; i<NUM_HANDLER_THREADS; i++) {
        thr_id[i] = i;  /* this should actually store the value returned from
                         * pthread_create() for a future pthread_join() */
        pthread_create(&p_threads[i], NULL, handle_requests_loop, (void*)&thr_id[i]);
    }

    /* generate requests */
    for (i=0; i<600; i++) {
        add_request(i, &request_mutex, &got_request);
        /* pause execution for a little bit, to allow
         * other threads to run and handle some requests.
         */
        if (rand() > 3*(RAND_MAX/4)) { /* approx 25% of the time */
            delay.tv_sec = 0;
            delay.tv_nsec = 10;
            nanosleep(&delay, NULL);
        }
    }

    /* now wait till there are no more requests to process */
    sleep(5);

    printf("Check ya later, bra\n");

    return 0;
}
