This patch uses the original fdqueue.[ch] logic to implement the single acceptor,
multiple
worker logic in the threaded MPM. The original fdqueue logic can be found in
apache-apr, but
I have attached a copy to this message.
Ryan
Index: server/mpm/threaded/Makefile.in
===================================================================
RCS file: /home/cvs/httpd-2.0/server/mpm/threaded/Makefile.in,v
retrieving revision 1.1
diff -u -d -b -w -u -r1.1 Makefile.in
--- server/mpm/threaded/Makefile.in 2001/02/16 19:00:23 1.1
+++ server/mpm/threaded/Makefile.in 2001/07/24 15:37:59
@@ -1,5 +1,5 @@
LTLIBRARY_NAME = libthreaded.la
-LTLIBRARY_SOURCES = threaded.c
+LTLIBRARY_SOURCES = threaded.c fdqueue.c
include $(top_srcdir)/build/ltlib.mk
Index: server/mpm/threaded/threaded.c
===================================================================
RCS file: /home/cvs/httpd-2.0/server/mpm/threaded/threaded.c,v
retrieving revision 1.47
diff -u -d -b -w -u -r1.47 threaded.c
--- server/mpm/threaded/threaded.c 2001/07/24 05:19:47 1.47
+++ server/mpm/threaded/threaded.c 2001/07/24 15:37:59
@@ -96,6 +96,7 @@
#include "mpm_common.h"
#include "ap_listen.h"
#include "scoreboard.h"
+#include "fdqueue.h"
#include <signal.h>
#include <limits.h> /* for INT_MAX */
@@ -116,6 +117,7 @@
static int requests_this_child;
static int num_listensocks = 0;
static apr_socket_t **listensocks;
+static FDQueue *worker_queue;
/* The structure used to pass unique initialization info to each thread */
typedef struct {
@@ -520,7 +522,7 @@
apr_lock_release(pipe_of_death_mutex);
}
-static void * worker_thread(apr_thread_t *thd, void * dummy)
+static void *listener_thread(apr_thread_t *thd, void * dummy)
{
proc_info * ti = dummy;
int process_slot = ti->pid;
@@ -546,6 +548,9 @@
for(n=0 ; n <= num_listensocks ; ++n)
apr_poll_socket_add(pollset, listensocks[n], APR_POLLIN);
+ worker_queue = apr_pcalloc(pchild, sizeof(*worker_queue));
+ ap_queue_init(worker_queue, ap_threads_per_child, pchild);
+
/* TODO: Switch to a system where threads reuse the results from earlier
poll calls - manoj */
while (1) {
@@ -554,8 +559,6 @@
}
if (workers_may_exit) break;
- (void) ap_update_child_status(process_slot, thread_slot, SERVER_READY,
- (request_rec *) NULL);
if ((rv = SAFE_ACCEPT(apr_lock_acquire(accept_mutex)))
!= APR_SUCCESS) {
ap_log_error(APLOG_MARK, APLOG_EMERG, rv, ap_server_conf,
@@ -628,8 +631,8 @@
workers_may_exit = 1;
}
if (csd != NULL) {
- process_socket(ptrans, csd, process_slot, thread_slot);
- requests_this_child--;
+ ap_queue_push(worker_queue, csd, ptrans);
+ ap_block_on_queue(worker_queue);
}
}
else {
@@ -642,6 +645,44 @@
}
break;
}
+ }
+
+ apr_pool_destroy(tpool);
+ ap_update_child_status(process_slot, thread_slot, (dying) ? SERVER_DEAD :
+SERVER_GRACEFUL,
+ (request_rec *) NULL);
+ dying = 1;
+ apr_lock_acquire(worker_thread_count_mutex);
+ worker_thread_count--;
+ if (worker_thread_count == 0) {
+ /* All the threads have exited, now finish the shutdown process
+ * by signalling the sigwait thread */
+ kill(ap_my_pid, SIGTERM);
+ }
+ apr_lock_release(worker_thread_count_mutex);
+
+ return NULL;
+}
+
+static void *worker_thread(apr_thread_t *thd, void * dummy)
+{
+ proc_info * ti = dummy;
+ int process_slot = ti->pid;
+ int thread_slot = ti->tid;
+ apr_pool_t *tpool = ti->tpool;
+ apr_socket_t *csd = NULL;
+ apr_pool_t *ptrans; /* Pool for per-transaction stuff */
+ apr_socket_t *sd = NULL;
+ int n;
+ int curr_pollfd, last_pollfd = 0;
+ apr_pollfd_t *pollset;
+ apr_status_t rv;
+
+ free(ti);
+
+ while (!workers_may_exit) {
+ ap_queue_pop(worker_queue, &csd, &ptrans, 1);
+ process_socket(ptrans, csd, process_slot, thread_slot);
+ requests_this_child--;
apr_pool_clear(ptrans);
}
@@ -682,8 +723,15 @@
proc_info *my_info = NULL;
apr_status_t rv;
int threads_created = 0;
+ apr_thread_t *listener;
while (1) {
+ my_info = (proc_info *)malloc(sizeof(proc_info));
+ my_info->pid = my_child_num;
+ my_info->tid = i;
+ my_info->sd = 0;
+ apr_pool_create(&my_info->tpool, pchild);
+ apr_thread_create(&listener, thread_attr, listener_thread, my_info, pchild);
for (i=0; i < ap_threads_per_child; i++) {
int status = ap_scoreboard_image->servers[child_num_arg][i].status;
_____________________________________________________________________________
Ryan Bloom [EMAIL PROTECTED]
Covalent Technologies [EMAIL PROTECTED]
-----------------------------------------------------------------------------
#ifndef FDQUEUE_H
#define FDQUEUE_H
#include "httpd.h"
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
/* Bleccch. 0 on success always rubbed me the wrong way */
/* All failures are unrecoverable */
#define FD_QUEUE_SUCCESS 0
#define FD_QUEUE_FAILURE -1 /* Needs to be an invalid file descriptor because
of queue_pop semantics */
typedef struct fd_queue_elem {
apr_socket_t *sd;
apr_pool_t *p;
} FDQueueElement;
typedef struct fd_queue {
int head;
int tail;
FDQueueElement *data;
int bounds;
int blanks;
pthread_mutex_t one_big_mutex;
pthread_cond_t not_empty;
pthread_cond_t not_full;
} FDQueue;
int ap_queue_init(FDQueue *queue, int queue_size, apr_pool_t *a);
int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p);
apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p, int block_if_empty);
int ap_queue_size(FDQueue *queue);
int ap_queue_full(FDQueue *queue);
int ap_block_on_queue(FDQueue *queue);
#endif /* FDQUEUE_H */
#include "fdqueue.h"
#include "apr_pools.h"
/* Assumption: queue itself is allocated by the user */
/* Assumption: increment and decrement are atomic on int */
int ap_queue_size(FDQueue *queue) {
return ((queue->tail - queue->head + queue->bounds) % queue->bounds);
}
int ap_queue_full(FDQueue *queue) {
return(queue->blanks <= 0);
}
int ap_block_on_queue(FDQueue *queue) {
#if 0
if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
return FD_QUEUE_FAILURE;
}
#endif
if (ap_queue_full(queue)) {
pthread_cond_wait(&queue->not_full, &queue->one_big_mutex);
}
#if 0
if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
return FD_QUEUE_FAILURE;
}
#endif
return FD_QUEUE_SUCCESS;
}
static int increase_blanks(FDQueue *queue) {
queue->blanks++;
return FD_QUEUE_SUCCESS;
}
static apr_status_t ap_queue_destroy(void *data) {
FDQueue *queue = data;
/* Ignore errors here, we can't do anything about them anyway */
pthread_cond_destroy(&queue->not_empty);
pthread_cond_destroy(&queue->not_full);
pthread_mutex_destroy(&queue->one_big_mutex);
return FD_QUEUE_SUCCESS;
}
int ap_queue_init(FDQueue *queue, int queue_capacity, apr_pool_t *a) {
int i;
int bounds = queue_capacity + 1;
pthread_mutex_init(&queue->one_big_mutex, NULL);
pthread_cond_init(&queue->not_empty, NULL);
pthread_cond_init(&queue->not_full, NULL);
queue->head = queue->tail = 0;
queue->data = apr_palloc(a, bounds * sizeof(FDQueueElement));
queue->bounds = bounds;
queue->blanks = queue_capacity;
apr_pool_cleanup_register(a, queue, ap_queue_destroy, apr_pool_cleanup_null);
for (i=0; i < bounds; ++i)
queue->data[i].sd = NULL;
return FD_QUEUE_SUCCESS;
}
int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p) {
queue->data[queue->tail].sd = sd;
queue->data[queue->tail].p = p;
queue->tail = (queue->tail + 1) % queue->bounds;
queue->blanks--;
pthread_cond_signal(&queue->not_empty);
#if 0
if (queue->head == (queue->tail + 1) % queue->bounds) {
#endif
if (ap_queue_full(queue)) {
pthread_cond_wait(&queue->not_full, &queue->one_big_mutex);
}
return FD_QUEUE_SUCCESS;
}
apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p, int block_if_empty) {
increase_blanks(queue);
/* We have just removed one from the queue. By definition, it is
* no longer full. We can ALWAYS signal the listener thread at
* this point. However, the original code didn't do it this way,
* so I am leaving the original code in, just commented out. BTW,
* originally, the increase_blanks wasn't in this function either.
*
if (queue->blanks > 0) {
*/
pthread_cond_signal(&queue->not_full);
/* } */
if (queue->head == queue->tail) {
if (block_if_empty) {
pthread_cond_wait(&queue->not_empty, &queue->one_big_mutex);
}
}
*sd = queue->data[queue->head].sd;
*p = queue->data[queue->head].p;
queue->data[queue->head].sd = NULL;
if (*sd != NULL) {
queue->head = (queue->head + 1) % queue->bounds;
}
return APR_SUCCESS;
}