This patch uses the original fdqueue.[ch] logic to implement the
single-acceptor, multiple-worker model in the threaded MPM.  The original
fdqueue code can be found in apache-apr, but I have attached a copy to
this message.

Ryan


Index: server/mpm/threaded/Makefile.in
===================================================================
RCS file: /home/cvs/httpd-2.0/server/mpm/threaded/Makefile.in,v
retrieving revision 1.1
diff -u -d -b -w -u -r1.1 Makefile.in
--- server/mpm/threaded/Makefile.in     2001/02/16 19:00:23     1.1
+++ server/mpm/threaded/Makefile.in     2001/07/24 15:37:59
@@ -1,5 +1,5 @@
 
 LTLIBRARY_NAME    = libthreaded.la
-LTLIBRARY_SOURCES = threaded.c
+LTLIBRARY_SOURCES = threaded.c fdqueue.c
 
 include $(top_srcdir)/build/ltlib.mk
Index: server/mpm/threaded/threaded.c
===================================================================
RCS file: /home/cvs/httpd-2.0/server/mpm/threaded/threaded.c,v
retrieving revision 1.47
diff -u -d -b -w -u -r1.47 threaded.c
--- server/mpm/threaded/threaded.c      2001/07/24 05:19:47     1.47
+++ server/mpm/threaded/threaded.c      2001/07/24 15:37:59
@@ -96,6 +96,7 @@
 #include "mpm_common.h"
 #include "ap_listen.h"
 #include "scoreboard.h" 
+#include "fdqueue.h"
 
 #include <signal.h>
 #include <limits.h>             /* for INT_MAX */
@@ -116,6 +117,7 @@
 static int requests_this_child;
 static int num_listensocks = 0;
 static apr_socket_t **listensocks;
+static FDQueue *worker_queue;
 
 /* The structure used to pass unique initialization info to each thread */
 typedef struct {
@@ -520,7 +522,7 @@
     apr_lock_release(pipe_of_death_mutex);
 }
 
-static void * worker_thread(apr_thread_t *thd, void * dummy)
+static void *listener_thread(apr_thread_t *thd, void * dummy)
 {
     proc_info * ti = dummy;
     int process_slot = ti->pid;
@@ -546,6 +548,9 @@
     for(n=0 ; n <= num_listensocks ; ++n)
        apr_poll_socket_add(pollset, listensocks[n], APR_POLLIN);
 
+    worker_queue = apr_pcalloc(pchild, sizeof(*worker_queue));
+    ap_queue_init(worker_queue, ap_threads_per_child, pchild);
+
     /* TODO: Switch to a system where threads reuse the results from earlier
        poll calls - manoj */
     while (1) {
@@ -554,8 +559,6 @@
         }
         if (workers_may_exit) break;
 
-        (void) ap_update_child_status(process_slot, thread_slot, SERVER_READY, 
-                                      (request_rec *) NULL);
         if ((rv = SAFE_ACCEPT(apr_lock_acquire(accept_mutex)))
             != APR_SUCCESS) {
             ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
@@ -628,8 +631,8 @@
                 workers_may_exit = 1;
             }
             if (csd != NULL) {
-                process_socket(ptrans, csd, process_slot, thread_slot);
-                requests_this_child--;
+                ap_queue_push(worker_queue, csd, ptrans);
+                ap_block_on_queue(worker_queue);
             }
         }
         else {
@@ -642,6 +645,44 @@
             }
             break;
         }
+    }
+
+    apr_pool_destroy(tpool);
+    ap_update_child_status(process_slot, thread_slot, (dying) ? SERVER_DEAD : 
+SERVER_GRACEFUL,
+        (request_rec *) NULL);
+    dying = 1;
+    apr_lock_acquire(worker_thread_count_mutex);
+    worker_thread_count--;
+    if (worker_thread_count == 0) {
+        /* All the threads have exited, now finish the shutdown process
+         * by signalling the sigwait thread */
+        kill(ap_my_pid, SIGTERM);
+    }
+    apr_lock_release(worker_thread_count_mutex);
+
+    return NULL;
+}
+
+static void *worker_thread(apr_thread_t *thd, void * dummy)
+{
+    proc_info * ti = dummy;
+    int process_slot = ti->pid;
+    int thread_slot = ti->tid;
+    apr_pool_t *tpool = ti->tpool;
+    apr_socket_t *csd = NULL;
+    apr_pool_t *ptrans;                /* Pool for per-transaction stuff */
+    apr_socket_t *sd = NULL;
+    int n;
+    int curr_pollfd, last_pollfd = 0;
+    apr_pollfd_t *pollset;
+    apr_status_t rv;
+
+    free(ti);
+
+    while (!workers_may_exit) {
+        ap_queue_pop(worker_queue, &csd, &ptrans, 1);
+        process_socket(ptrans, csd, process_slot, thread_slot);
+        requests_this_child--;
         apr_pool_clear(ptrans);
     }
 
@@ -682,8 +723,15 @@
     proc_info *my_info = NULL;
     apr_status_t rv;
     int threads_created = 0;
+    apr_thread_t *listener;
 
     while (1) {
+        my_info = (proc_info *)malloc(sizeof(proc_info));
+        my_info->pid = my_child_num;
+        my_info->tid = i;
+        my_info->sd = 0;
+        apr_pool_create(&my_info->tpool, pchild);
+       apr_thread_create(&listener, thread_attr, listener_thread, my_info, pchild);
         for (i=0; i < ap_threads_per_child; i++) {
             int status = ap_scoreboard_image->servers[child_num_arg][i].status;
 


_____________________________________________________________________________
Ryan Bloom                              [EMAIL PROTECTED]
Covalent Technologies                   [EMAIL PROTECTED]
-----------------------------------------------------------------------------
#ifndef FDQUEUE_H
#define FDQUEUE_H
#include "httpd.h"
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>

/* Bleccch. 0 on success always rubbed me the wrong way */
/* All failures are unrecoverable */
#define FD_QUEUE_SUCCESS 0
#define FD_QUEUE_FAILURE -1 /* Needs to be an invalid file descriptor because
                               of queue_pop semantics */

/* One slot of the queue: a socket together with the pool handed over with it.
 * A NULL sd marks a slot that currently holds no entry (ap_queue_init sets
 * every slot's sd to NULL and ap_queue_pop resets it after reading). */
typedef struct fd_queue_elem {
    apr_socket_t *sd;   /* queued socket, or NULL when the slot is unused */
    apr_pool_t *p;      /* pool pushed alongside sd */
} FDQueueElement;

/* Fixed-size circular queue of FDQueueElement entries, shared between the
 * thread that pushes sockets and the threads that pop them. */
typedef struct fd_queue {
    int head;                       /* index of the slot ap_queue_pop reads next */
    int tail;                       /* index of the slot ap_queue_push writes next */
    FDQueueElement *data;           /* array of `bounds` slots, allocated by ap_queue_init */
    int bounds;                     /* number of slots in data (requested capacity + 1) */
    int blanks;                     /* starts at the requested capacity; decremented by
                                     * push, incremented by pop; <= 0 means "full" */
    pthread_mutex_t one_big_mutex;  /* mutex paired with both condition variables */
    pthread_cond_t not_empty;       /* signalled by push, waited on by pop */
    pthread_cond_t not_full;        /* signalled by pop, waited on by push and
                                     * ap_block_on_queue */
} FDQueue;

/* Initialise a caller-allocated queue with room for queue_size entries;
 * slot storage comes from pool a.  Returns FD_QUEUE_SUCCESS. */
int ap_queue_init(FDQueue *queue, int queue_size, apr_pool_t *a);
/* Append the pair (sd, p) at the tail of the queue and signal not_empty.
 * May wait on not_full if the queue is full after the insertion. */
int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p);
/* Fetch the entry at the head of the queue into *sd and *p.  When
 * block_if_empty is non-zero the call waits on not_empty if the queue is
 * empty.  *sd may be NULL if no entry was available. */
apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p, int block_if_empty);
/* Number of slots between head and tail, modulo the ring size. */
int ap_queue_size(FDQueue *queue);
/* Non-zero when the queue has no blanks left. */
int ap_queue_full(FDQueue *queue);
/* Wait on not_full if the queue is currently full. */
int ap_block_on_queue(FDQueue *queue);

#endif /* FDQUEUE_H */
#include "fdqueue.h"
#include "apr_pools.h"

/* Assumption: queue itself is allocated by the user */
/* Assumption: increment and decrement are atomic on int */

/*
 * Return the distance from head to tail around the ring, i.e.
 * (tail - head + bounds) modulo bounds.
 */
int ap_queue_size(FDQueue *queue) {
    int span = queue->tail - queue->head;

    /* Shift by the ring size before the modulo so the result is taken
     * from a non-negative value when tail has wrapped behind head. */
    span += queue->bounds;
    return span % queue->bounds;
}

/*
 * Report whether the queue is full: returns 1 when the blanks counter
 * is zero or negative, 0 otherwise.
 */
int ap_queue_full(FDQueue *queue) {
    if (queue->blanks > 0) {
        return 0;
    }
    return 1;
}

/*
 * Block the calling thread until the queue is no longer full.
 *
 * The mutex is taken around the wait because pthread_cond_wait() requires
 * the calling thread to hold the mutex it is given.  The fullness check is
 * repeated in a loop so that a spurious wakeup does not let the caller
 * continue while the queue is still full.
 *
 * The caller must NOT already hold queue->one_big_mutex.
 *
 * Returns FD_QUEUE_SUCCESS once the queue has room, or FD_QUEUE_FAILURE if
 * any of the pthread calls reports an error.
 */
int ap_block_on_queue(FDQueue *queue) {
    int result = FD_QUEUE_SUCCESS;

    if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
        return FD_QUEUE_FAILURE;
    }

    while (ap_queue_full(queue)) {
        if (pthread_cond_wait(&queue->not_full, &queue->one_big_mutex) != 0) {
            result = FD_QUEUE_FAILURE;
            break;
        }
    }

    if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
        return FD_QUEUE_FAILURE;
    }
    return result;
}

/*
 * Bump the queue's blanks counter by one.
 * Always returns FD_QUEUE_SUCCESS.
 */
static int increase_blanks(FDQueue *queue) {
    queue->blanks += 1;
    return FD_QUEUE_SUCCESS;
}

static apr_status_t ap_queue_destroy(void *data) {
    FDQueue *queue = data;
    /* Ignore errors here, we can't do anything about them anyway */
    pthread_cond_destroy(&queue->not_empty);
    pthread_cond_destroy(&queue->not_full);
    pthread_mutex_destroy(&queue->one_big_mutex);
    return FD_QUEUE_SUCCESS;
}

/*
 * Initialise a caller-allocated queue.
 *
 * queue           the queue structure to set up
 * queue_capacity  maximum number of entries the queue should hold; must be
 *                 at least 1
 * a               pool that supplies the slot array and on which the
 *                 teardown callback (ap_queue_destroy) is registered
 *
 * The ring is given queue_capacity + 1 slots.  Every slot starts out with
 * both sd and p set to NULL so that a pop on an empty queue never reads
 * uninitialised memory.
 *
 * Returns FD_QUEUE_SUCCESS on success.  Returns FD_QUEUE_FAILURE if
 * queue_capacity is not positive, if a mutex or condition variable cannot
 * be initialised, or if the slot array cannot be allocated; in that case
 * any primitive that was already initialised is destroyed again and no
 * cleanup is registered.
 */
int ap_queue_init(FDQueue *queue, int queue_capacity, apr_pool_t *a) {
    int i;
    int bounds;

    /* A non-positive capacity would yield a useless (or zero/negative
     * sized) ring and a modulo by a non-positive bound. */
    if (queue_capacity < 1) {
        return FD_QUEUE_FAILURE;
    }
    bounds = queue_capacity + 1;

    if (pthread_mutex_init(&queue->one_big_mutex, NULL) != 0) {
        return FD_QUEUE_FAILURE;
    }
    if (pthread_cond_init(&queue->not_empty, NULL) != 0) {
        pthread_mutex_destroy(&queue->one_big_mutex);
        return FD_QUEUE_FAILURE;
    }
    if (pthread_cond_init(&queue->not_full, NULL) != 0) {
        pthread_cond_destroy(&queue->not_empty);
        pthread_mutex_destroy(&queue->one_big_mutex);
        return FD_QUEUE_FAILURE;
    }

    queue->data = apr_palloc(a, bounds * sizeof(FDQueueElement));
    if (queue->data == NULL) {
        pthread_cond_destroy(&queue->not_full);
        pthread_cond_destroy(&queue->not_empty);
        pthread_mutex_destroy(&queue->one_big_mutex);
        return FD_QUEUE_FAILURE;
    }

    queue->head = queue->tail = 0;
    queue->bounds = bounds;
    queue->blanks = queue_capacity;
    for (i = 0; i < bounds; ++i) {
        queue->data[i].sd = NULL;
        queue->data[i].p = NULL;
    }

    /* Register teardown only once everything it destroys really exists. */
    apr_pool_cleanup_register(a, queue, ap_queue_destroy, apr_pool_cleanup_null);
    return FD_QUEUE_SUCCESS;
}

/*
 * Append a socket and its pool to the tail of the queue.
 *
 * queue  the queue to push onto
 * sd     socket to hand over
 * p      pool to hand over together with sd
 *
 * The whole operation runs with queue->one_big_mutex held: the ring
 * indices and the blanks counter are shared with the popping threads, and
 * pthread_cond_wait() requires the calling thread to hold the mutex it is
 * given.  After inserting, the call signals not_empty and then, as before,
 * waits on not_full for as long as the queue is full; the check is a loop
 * so a spurious wakeup cannot end the wait early.
 *
 * The caller must NOT already hold queue->one_big_mutex.
 *
 * Returns FD_QUEUE_SUCCESS, or FD_QUEUE_FAILURE if a pthread call fails.
 */
int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p) {
    int result = FD_QUEUE_SUCCESS;

    if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
        return FD_QUEUE_FAILURE;
    }

    queue->data[queue->tail].sd = sd;
    queue->data[queue->tail].p  = p;
    queue->tail = (queue->tail + 1) % queue->bounds;
    queue->blanks--;
    pthread_cond_signal(&queue->not_empty);

    while (ap_queue_full(queue)) {
        if (pthread_cond_wait(&queue->not_full, &queue->one_big_mutex) != 0) {
            result = FD_QUEUE_FAILURE;
            break;
        }
    }

    if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
        return FD_QUEUE_FAILURE;
    }
    return result;
}

/*
 * Remove the entry at the head of the queue.
 *
 * queue           the queue to pop from
 * sd              receives the dequeued socket, or NULL if nothing was
 *                 dequeued
 * p               receives the pool that was pushed with the socket, or
 *                 NULL if nothing was dequeued
 * block_if_empty  non-zero: wait on not_empty until an entry is available;
 *                 zero: return immediately when the queue is empty
 *
 * The whole operation runs with queue->one_big_mutex held, because the
 * ring indices and the blanks counter are shared with the pushing thread
 * and pthread_cond_wait() requires the calling thread to hold the mutex.
 * The emptiness check is a loop so that a spurious wakeup, or another
 * thread taking the entry first, does not make this call read an empty
 * slot.
 *
 * The blanks counter is increased, and not_full signalled, only after an
 * entry has actually been removed; doing it on entry would count a slot as
 * free while it is still occupied (or when nothing is removed at all).
 *
 * The caller must NOT already hold queue->one_big_mutex.
 *
 * Returns APR_SUCCESS both when an entry was dequeued and when the queue
 * was empty in non-blocking mode (check *sd to tell them apart).  If a
 * pthread call fails, its non-zero error code is returned instead.
 */
apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p, int block_if_empty) {
    int rv;

    *sd = NULL;
    *p = NULL;

    rv = pthread_mutex_lock(&queue->one_big_mutex);
    if (rv != 0) {
        return rv;
    }

    while (block_if_empty && queue->head == queue->tail) {
        rv = pthread_cond_wait(&queue->not_empty, &queue->one_big_mutex);
        if (rv != 0) {
            pthread_mutex_unlock(&queue->one_big_mutex);
            return rv;
        }
    }

    if (queue->head != queue->tail) {
        *sd = queue->data[queue->head].sd;
        *p  = queue->data[queue->head].p;
        queue->data[queue->head].sd = NULL;
        queue->data[queue->head].p  = NULL;
        queue->head = (queue->head + 1) % queue->bounds;

        /* One slot has really been freed: account for it and wake a
         * thread waiting for room. */
        increase_blanks(queue);
        pthread_cond_signal(&queue->not_full);
    }

    rv = pthread_mutex_unlock(&queue->one_big_mutex);
    if (rv != 0) {
        return rv;
    }
    return APR_SUCCESS;
}

Reply via email to