Hello,

there seems to be a scheduler bug (or a bug in my understanding
of the documentation) related to function events.

I attach a small test program (I know I should rather post an
HTTP location, but it's only 10k) that uses the various event
types to signal an event from one thread to another. One method
is to have a global variable and a function event that checks the
value of the variable. However, when you try this on a Linux
system, the scheduler no longer schedules the other threads. On
Solaris 8, other threads are scheduled, but the check function of
the function event is only called after a pth_sleep in some other
thread completes.

To see for yourself what is happening, compile the program (you
may want to modify the linker flags in the accompanying Makefile
to match your installation), and compare the output of the
commands

Does anybody know how to fix this behaviour (or how to correctly
use function events...)?

Maybe the function event stuff is just not as thoroughly tested
as it should be, not too surprising as the test directory does
not contain any program testing function events. Maybe
test_signaling could be useful for this?

Mit herzlichem Gruss

Andreas Mueller

------------------------------------------------------------
Dr. Andreas Mueller Beratung und Entwicklung
Bubental 53, CH - 8852 Altendorf <[EMAIL PROTECTED]>
Voice: +41 55 462 1483 Fax/Data: +41 55 462 1485
------------------------------------------------------------
/*
 * test_signaling.c -- pth signaling event test program
 *
 * call with the names of the signaling facilities to check as arguments:
 *
 *     ./test_signaling [ -i iterations ] { signal | fd | func | 1 | 2 | 3 }
 *
 * (c) 2000 Dr. Andreas Mueller, Beratung und Entwicklung
 *     Software distributed under the GNU General Public License
 */
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <unistd.h>

#include <pth.h>

/* number of send/receive rounds each test runs; overridden by option -i */
int     iterations = 5;

/************************************************************************
* global definitions                                                    *
************************************************************************/
/*
 * Descriptor of one signaling mechanism: two counters, the four
 * operations that implement the mechanism, and a pointer to whatever
 * data the operations share.
 */
typedef struct sigmech_t {
        int             sent;           /* incremented by the sender thread */
        int             received;       /* incremented by the receiver thread */
        int             (*init)(struct sigmech_t *);    /* set up before a test */
        int             (*send)(struct sigmech_t *);    /* raise the signal */
        int             (*recv)(struct sigmech_t *);    /* consume the signal */
        pth_event_t     (*getev)(struct sigmech_t *);   /* event to wait on */
        volatile void   *private;       /* mechanism specific data */
} sigmech_t;

/************************************************************************
* signal event signalization                                            *
*                                                                       *
* a thread waits for a signal to be delivered, which some other thread  *
* sends                                                                 *
************************************************************************/
/* handed to pth_event(PTH_EVENT_SIGS, ...) as the place for the signal number */
int     retsignal;

/*
 * Prepare the signal mechanism: reset the counters, publish &retsignal
 * through sm->private and add SIGUSR1 to the blocked signal mask.
 *
 * Returns 0 on success, -1 if the signal mask could not be changed.
 */
int     sig_signal_init(sigmech_t *sm) {
        sigset_t        set;

        sm->sent = sm->received = 0;
        sm->private = &retsignal;
        sigemptyset(&set);
        sigaddset(&set, SIGUSR1);
        /* the previous mask is never restored, so it is not requested */
        if (sigprocmask(SIG_BLOCK, &set, NULL) < 0) {
                perror("sigprocmask");
                return -1;
        }
        return 0;
}

/*
 * Signal the receiver by sending SIGUSR1 to the own process.
 * The descriptor is not needed for this mechanism. Always returns 0.
 */
int     sig_signal_send(sigmech_t *sm) {
        pid_t   self = getpid();

        (void)sm;
        kill(self, SIGUSR1);
        return 0;
}

/*
 * Nothing has to be consumed after a signal event; present only to
 * fill the recv slot of the method table. Always returns 0.
 */
int     sig_signal_recv(sigmech_t *sm) {
        (void)sm;
        return 0;
}

/*
 * Build a PTH_EVENT_SIGS event for the signal set { SIGUSR1 }; the int
 * that sm->private points to is passed along as third argument.
 */
pth_event_t     sig_signal_getev(sigmech_t *sm) {
        sigset_t        usr1;
        int             *signop = (int *)sm->private;
        pth_event_t     ev;

        sigemptyset(&usr1);
        sigaddset(&usr1, SIGUSR1);
        printf("\ncreating signal event");
        fflush(stdout);
        ev = pth_event(PTH_EVENT_SIGS, &usr1, signop);
        return ev;
}

/************************************************************************
* file descriptor event signalization                                   *
*                                                                       *
* a thread waits for the read end of a pipe to become readable, some    *
* other thread writes into the write end of the pipe                    *
************************************************************************/
/* the pipe used by the fd mechanism: fd[0] is read from, fd[1] written to */
int     fd[2];

/*
 * Prepare the file descriptor mechanism: reset the counters, publish
 * the descriptor pair through sm->private and create the pipe.
 *
 * Returns 0 on success, -1 if the pipe could not be created.
 */
int     sig_fd_init(sigmech_t *sm) {
        sm->sent = sm->received = 0;
        /* published first so that sm->private is never left unset */
        sm->private = fd;
        if (pipe(fd) < 0) {
                perror("pipe");
                return -1;
        }
        return 0;
}

/*
 * Signal the receiver by writing the current send counter into the
 * write end of the pipe.
 *
 * Returns 0 if the complete int was written, -1 otherwise.
 */
int     sig_fd_send(sigmech_t *sm) {
        int     *pipefd = (int *)sm->private;
        ssize_t written;

        written = write(pipefd[1], &sm->sent, sizeof sm->sent);
        if (written < 0)
                perror("write");
        return (written == (ssize_t)sizeof sm->sent) ? 0 : -1;
}

/*
 * Consume one signal: read back the int that sig_fd_send wrote, so
 * that the read end of the pipe is drained again. The value itself is
 * not used.
 *
 * Returns 0 if a complete int was read, -1 otherwise.
 */
int     sig_fd_recv(sigmech_t *sm) {
        int     *pipefd = (int *)sm->private;
        int     token;
        ssize_t got;

        got = read(pipefd[0], &token, sizeof token);
        if (got < 0)
                perror("read");
        return (got == (ssize_t)sizeof token) ? 0 : -1;
}

pth_event_t     sig_fd_getev(sigmech_t *sm) {
        int     *fd = (int *)sm->private;
        printf("\ncreating fd event"); fflush(stdout);
        return pth_event(PTH_EVENT_FD|PTH_UNTIL_FD_READABLE, fd[0]);
}

/************************************************************************
* function event signalization                                          *
*                                                                       *
* one thread sets a flag to 1, some other thread is waiting for a       *
* function event, where the function checks whether the flag is set.    *
* This causes the scheduler to malfunction in pth 1.3.5 under           *
* Linux 2.2.13.                                                         *
************************************************************************/
/* the flag the sender sets and the check function tests and clears */
volatile int    flag = 0;
/* microseconds argument of the pth_time() given to the function event */
int     delay = 100000;

/*
 * Check function of the function event: prints "." while the flag is
 * clear and "\n+\n" once it is set, then clears the flag.
 *
 * Returns the value the flag had on entry (nonzero = signal seen).
 *
 * NOTE(review): Pth documents the callback of PTH_EVENT_FUNC as
 * int (*)(void *); this function takes a sigmech_t * instead -- confirm
 * that this is acceptable on all target platforms.
 */
int     flagcheck(sigmech_t *sm) {
        /* keep the volatile qualifier, the object is defined volatile */
        volatile int    *flagp = (volatile int *)sm->private;
        int             rc = *flagp;

        fputs(rc ? "\n+\n" : ".", stdout);
        fflush(stdout);
        *flagp = 0;
        return rc;
}

/*
 * Prepare the function event mechanism: clear the global flag, publish
 * its address through sm->private and reset the counters.
 * Always returns 0.
 */
int     sig_func_init(sigmech_t *sm) {
        sm->private = &flag;
        flag = 0;
        sm->received = 0;
        sm->sent = 0;
        return 0;
}

/*
 * Signal the receiver by setting the flag to 1.
 *
 * Returns the previous value of the flag, i.e. nonzero if the flag was
 * still set when this function was called.
 */
int     sig_func_send(sigmech_t *sm) {
        /* keep the volatile qualifier, the object is defined volatile */
        volatile int    *flagp = (volatile int *)sm->private;
        int             previous = *flagp;

        *flagp = 1;
        return previous;
}

/*
 * Nothing to consume here: flagcheck already clears the flag.
 * Always returns 0.
 */
int     sig_func_recv(sigmech_t *sm) {
        (void)sm;
        return 0;
}

/*
 * Build a PTH_EVENT_FUNC event that uses flagcheck as check function,
 * sm as its argument and pth_time(0, delay) as time value.
 */
pth_event_t     sig_func_getev(sigmech_t *sm) {
        pth_event_t     ev;

        printf("\ncreating function event");
        fflush(stdout);
        ev = pth_event(PTH_EVENT_FUNC, flagcheck, sm, pth_time(0, delay));
        return ev;
}

/************************************************************************
* method table                                                          *
************************************************************************/
/* entries are selected by index, in the same order as mechname[] */
sigmech_t       mechtab[3] = {
        /* 0: function event */
        { 0, 0,
          sig_func_init, sig_func_send, sig_func_recv, sig_func_getev,
          NULL },
        /* 1: file descriptor event */
        { 0, 0,
          sig_fd_init, sig_fd_send, sig_fd_recv, sig_fd_getev,
          NULL },
        /* 2: signal event */
        { 0, 0,
          sig_signal_init, sig_signal_send, sig_signal_recv, sig_signal_getev,
          NULL },
};

/************************************************************************
* thread procedures                                                     *
************************************************************************/

/*
 * Receiver thread: until `iterations` signals were received, create the
 * event of the mechanism, wait for it, log the time at which it
 * occurred, let the mechanism consume the signal and free the event.
 *
 * Always returns NULL; ends early if no event could be created.
 */
void    *thread0(sigmech_t *sm) {
        pth_event_t     ev;
        struct timeval  tv;

        printf("\n0: starting receiver thread"); fflush(stdout);
        while (sm->received < iterations) {
                printf("\n0: creating event for flagcheck ");
                fflush(stdout);
                ev = sm->getev(sm);
                if (ev == NULL) {
                        /* waiting on no event at all cannot succeed */
                        perror("pth_event");
                        return NULL;
                }
                printf("\n0: waiting for event "); fflush(stdout);
                pth_wait(ev);
                gettimeofday(&tv, NULL);
                /* the timeval members need not be long, %ld requires it */
                printf("\n0: event occured at %ld.%06ld ", (long)tv.tv_sec,
                        (long)tv.tv_usec);
                sm->recv(sm);
                sm->received++;
                fflush(stdout);
                pth_event_free(ev, PTH_FREE_ALL);
        }
        return NULL;
}

/*
 * Sender thread: until `iterations` signals were sent, sleep for one
 * second, log the time, let the mechanism send its signal and sleep
 * for another three seconds.
 *
 * Always returns NULL.
 */
void    *thread1(sigmech_t *sm) {
        struct timeval  tv;

        printf("\n1: starting sender thread"); fflush(stdout);
        while (sm->sent < iterations) {
                printf("\n1: sleeping for 1 second "); fflush(stdout);
                pth_sleep(1);
                gettimeofday(&tv, NULL);
                /* the timeval members need not be long, %ld requires it */
                printf("\n1: setting flag at %ld.%06ld ", (long)tv.tv_sec,
                        (long)tv.tv_usec);
                fflush(stdout);
                sm->send(sm);
                sm->sent++;
                printf("\n1: sleeping for 3 seconds "); fflush(stdout);
                pth_sleep(3);
        }
        return NULL;
}

/* buffer type for the name given to a thread */
typedef char    threadname_t[16];
/* entry points of the two test threads: 0 = receiver, 1 = sender */
void    *(*threadentries[2])(sigmech_t *) = {
        thread0,
        thread1
};

/************************************************************************
* test body                                                             *
************************************************************************/

/*
 * Run one test: initialize the mechanism, spawn the sender (thread 1)
 * and then the receiver (thread 0), and join each thread as soon as its
 * termination event has occurred.
 *
 * Returns 0 when both threads were joined, -1 if the mechanism could
 * not be initialized, a thread could not be spawned or no termination
 * event could be created.
 */
int     testbody(sigmech_t *sm) {
        pth_attr_t      attr;
        pth_t           th[2];
        pth_event_t     ev[2], cev;
        int             nthreads = 0, i;
        void            *result;
        threadname_t    threadname[2];
        int             running[2] = { 0, 0 };

        if (sm->init(sm) != 0)
                return -1;

        for (i = 1; i >= 0; i--) {
                attr = pth_attr_new();
                sprintf(threadname[i], "thread%d", i);
                pth_attr_set(attr, PTH_ATTR_NAME, threadname[i]);
                pth_attr_set(attr, PTH_ATTR_STACK_SIZE, 128 * 1024);
                printf("\nm: spawning thread %d ", i); fflush(stdout);
                th[i] = pth_spawn(attr, (void *(*)(void *))threadentries[i],
                        sm);
                /* the attribute object is only needed for the spawn */
                pth_attr_destroy(attr);
                if (th[i] == NULL) {
                        perror("pth_spawn");
                        return -1;
                }
                running[i] = 1;
                nthreads++;
        }

        while (nthreads > 0) {
                /* build a fresh ring from the threads still running */
                cev = NULL;
                for (i = 0; i < 2; i++) {
                        ev[i] = NULL;
                        if (!running[i])
                                continue;
                        ev[i] = pth_event(PTH_EVENT_TID | PTH_UNTIL_TID_DEAD,
                                th[i]);
                        if (ev[i] == NULL)
                                continue;
                        if (cev == NULL)
                                cev = ev[i];
                        else
                                pth_event_concat(cev, ev[i], NULL);
                }
                if (cev == NULL) {
                        perror("pth_event");
                        return -1;
                }
                printf("\nm: waiting for any thread to terminate ");
                /* pth_wait reports a count of events, not a boolean */
                if (pth_wait(cev) < 1) {
                        perror("pth_wait");
                }
                fflush(stdout);
                for (i = 0; i < 2; i++) {
                        if (ev[i] == NULL || !pth_event_occurred(ev[i]))
                                continue;
                        pth_join(th[i], &result);
                        printf("\nm: thread %d returns pointer "
                                "%p ", i, result);
                        fflush(stdout);
                        nthreads--;
                        running[i] = 0;
                }
                /* free only after every event of the ring was inspected */
                pth_event_free(cev, PTH_FREE_ALL);
        }
        printf("\nm: both threads terminated\n"); fflush(stdout);
        return 0;
}

/************************************************************************
* main function                                                         *
************************************************************************/

/* command line names of the mechanisms, indexed like mechtab[] */
char    *mechname[3] = {
        "func",
        "fd",
        "signal"
};

/*
 * Parse the options, then run testbody for every mechanism named on
 * the command line. A mechanism is given either by name (see mechname)
 * or by its number, starting at 1. Arguments that name no mechanism
 * are reported and skipped.
 *
 * Exits with EXIT_FAILURE as soon as a test fails, EXIT_SUCCESS
 * otherwise.
 */
int     main(int argc, char *argv[]) {
        const int       nmech = (int)(sizeof mechtab / sizeof mechtab[0]);
        int             i, j, c, rc, mechindex;

        pth_init();
        while ((c = getopt(argc, argv, "i:")) != -1) {
                switch (c) {
                case 'i':
                        iterations = atoi(optarg);
                        break;
                default:
                        /* getopt has already complained, carry on */
                        break;
                }
        }

        for (i = optind; i < argc; i++) {
                mechindex = -1;
                printf("\n==> testing mechanism %s\n", argv[i]);
                for (j = 0; j < nmech; j++) {
                        if (strcmp(argv[i], mechname[j]) == 0) {
                                mechindex = j;
                        }
                }
                if (mechindex < 0) {
                        int     number = atoi(argv[i]);

                        /* never index beyond the end of mechtab */
                        if (number >= 1 && number <= nmech)
                                mechindex = number - 1;
                }
                if (mechindex < 0) {
                        fprintf(stderr, "!!! unknown mechanism %s ignored\n",
                                argv[i]);
                        continue;
                }
                rc = testbody(&mechtab[mechindex]);
                if (rc == 0) {
                        printf("==> test of mechanism %s succeeded\n",
                                argv[i]);
                } else {
                        printf("!!! test of mechanism %s FAILED\n",
                                argv[i]);
                        exit(EXIT_FAILURE);
                }
        }
        exit(EXIT_SUCCESS);
}
#
# compile and link the test_signaling pth program, which is used to test
# and/or demonstrate the various signaling facilities that can be used
# to synchronize two threads.
#
# For Pth 1.3.6, test_signaling exhibits a scheduler bug where the
# scheduler stops scheduling other threads as soon as a PTH_EVENT_FUNC
# event is waited upon by some other thread
#
# (c) 2000 Dr. Andreas Mueller, Beratung und Entwicklung
#     Software distributed under the GNU General Public License
#
# default target: build the test program
all:    test_signaling

# compile the source with warnings, debug information and optimization
test_signaling.o:       test_signaling.c
        gcc -Wall -pedantic -g -O2 -c test_signaling.c -o test_signaling.o
# link the object file against the Pth library
test_signaling: test_signaling.o
        gcc -o test_signaling test_signaling.o -lpth

Reply via email to