mturk 2005/04/18 05:33:42
Modified: jni/native/src poll.c
Log:
Make poll thread-safe, with an add queue whose entries are added to the
pollset before the actual poll is called.
Revision Changes Path
1.6 +101 -20 jakarta-tomcat-connectors/jni/native/src/poll.c
Index: poll.c
===================================================================
RCS file: /home/cvs/jakarta-tomcat-connectors/jni/native/src/poll.c,v
retrieving revision 1.5
retrieving revision 1.6
diff -u -r1.5 -r1.6
--- poll.c 15 Apr 2005 10:14:46 -0000 1.5
+++ poll.c 18 Apr 2005 12:33:42 -0000 1.6
@@ -16,6 +16,7 @@
#include "apr.h"
#include "apr_pools.h"
#include "apr_poll.h"
+#include "apr_thread_mutex.h"
#include "tcn.h"
/* Internal poll structure for queryset
@@ -24,10 +25,13 @@
typedef struct tcn_pollset {
apr_pool_t *pool;
apr_int32_t nelts;
+ apr_int32_t nadds;
apr_int32_t nalloc;
apr_pollset_t *pollset;
+ apr_thread_mutex_t *mutex;
apr_pollfd_t *query_set;
- apr_time_t *query_add;
+ apr_pollfd_t *query_add;
+ apr_time_t *query_ttl;
apr_interval_time_t max_ttl;
} tcn_pollset_t;
@@ -38,24 +42,44 @@
apr_pool_t *p = J2P(pool, apr_pool_t *);
apr_pollset_t *pollset = NULL;
tcn_pollset_t *tps = NULL;
+ apr_thread_mutex_t *mutex = NULL;
+ apr_uint32_t f = (apr_uint32_t)flags;
UNREFERENCED(o);
+ TCN_ASSERT(pool != 0);
- TCN_THROW_IF_ERR(apr_pollset_create(&pollset,
- (apr_uint32_t)size, p, (apr_uint32_t)flags),
- pollset);
+ TCN_THROW_IF_ERR(apr_thread_mutex_create(&mutex,
+ APR_THREAD_MUTEX_DEFAULT, p), mutex);
+
+ if (f & APR_POLLSET_THREADSAFE) {
+ apr_status_t rv = apr_pollset_create(&pollset, (apr_uint32_t)size,
p, f);
+ if (rv == APR_ENOTIMPL)
+ f &= ~APR_POLLSET_THREADSAFE;
+ else if (rv != APR_SUCCESS) {
+ tcn_ThrowAPRException(e, rv);
+ goto cleanup;
+ }
+ }
+ if (pollset == NULL) {
+ TCN_THROW_IF_ERR(apr_pollset_create(&pollset,
+ (apr_uint32_t)size, p, f), pollset);
+ }
tps = apr_palloc(p, sizeof(tcn_pollset_t));
tps->pollset = pollset;
+ tps->mutex = mutex;
tps->query_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
- tps->query_add = apr_palloc(p, size * sizeof(apr_time_t));
+ tps->query_add = apr_palloc(p, size * sizeof(apr_pollfd_t));
+ tps->query_ttl = apr_palloc(p, size * sizeof(apr_time_t));
tps->nelts = 0;
+ tps->nadds = 0;
tps->nalloc = size;
tps->pool = p;
tps->max_ttl = J2T(ttl);
-
-cleanup:
return P2J(tps);
-
+cleanup:
+ if (mutex)
+ apr_thread_mutex_destroy(mutex);
+ return 0;
}
TCN_IMPLEMENT_CALL(jint, Poll, destroy)(TCN_STDARGS, jlong pollset)
@@ -63,6 +87,8 @@
tcn_pollset_t *p = J2P(pollset, tcn_pollset_t *);
UNREFERENCED_STDARGS;
+ TCN_ASSERT(pollset != 0);
+ apr_thread_mutex_destroy(p->mutex);
return (jint)apr_pollset_destroy(p->pollset);
}
@@ -75,20 +101,20 @@
apr_status_t rv;
UNREFERENCED_STDARGS;
- if (p->nelts == p->nalloc) {
- return APR_ENOMEM;
- }
+ TCN_ASSERT(socket != 0);
+ if (p->nadds == p->nalloc)
+ return APR_ENOMEM;
+ if ((rv = apr_thread_mutex_lock(p->mutex)) != APR_SUCCESS)
+ return rv;
memset(&fd, 0, sizeof(apr_pollfd_t));
fd.desc_type = APR_POLL_SOCKET;
fd.reqevents = (apr_int16_t)reqevents;
fd.desc.s = J2P(socket, apr_socket_t *);
fd.client_data = J2P(data, void *);
- if ((rv = apr_pollset_add(p->pollset, &fd)) == APR_SUCCESS) {
- p->query_set[p->nelts] = fd;
- p->query_add[p->nelts] = apr_time_now();
- p->nelts++;
- }
+ p->query_add[p->nadds] = fd;
+ p->nadds++;
+ apr_thread_mutex_unlock(p->mutex);
return (jint)rv;
}
@@ -98,13 +124,17 @@
tcn_pollset_t *p = J2P(pollset, tcn_pollset_t *);
apr_pollfd_t fd;
apr_int32_t i;
+ apr_status_t rv;
UNREFERENCED_STDARGS;
+ TCN_ASSERT(socket != 0);
memset(&fd, 0, sizeof(apr_pollfd_t));
fd.desc_type = APR_POLL_SOCKET;
fd.desc.s = J2P(socket, apr_socket_t *);
+ if ((rv = apr_thread_mutex_lock(p->mutex)) != APR_SUCCESS)
+ return (jint)rv;
for (i = 0; i < p->nelts; i++) {
if (fd.desc.s == p->query_set[i].desc.s) {
/* Found an instance of the fd: remove this and any other copies
*/
@@ -123,8 +153,31 @@
break;
}
}
+ /* Remove from add queue if present
+ * This is unlikely to happen, but do it anyway.
+ */
+ for (i = 0; i < p->nadds; i++) {
+ if (fd.desc.s == p->query_add[i].desc.s) {
+ /* Found an instance of the fd: remove this and any other copies
*/
+ apr_int32_t dst = i;
+ apr_int32_t old_nelts = p->nadds;
+ p->nadds--;
+ for (i++; i < old_nelts; i++) {
+ if (fd.desc.s == p->query_add[i].desc.s) {
+ p->nadds--;
+ }
+ else {
+ p->query_add[dst] = p->query_add[i];
+ dst++;
+ }
+ }
+ break;
+ }
+ }
- return (jint)apr_pollset_remove(p->pollset, &fd);
+ rv = apr_pollset_remove(p->pollset, &fd);
+ apr_thread_mutex_unlock(p->mutex);
+ return (jint)rv;
}
TCN_IMPLEMENT_CALL(jint, Poll, poll)(TCN_STDARGS, jlong pollset,
@@ -134,12 +187,35 @@
tcn_pollset_t *p = J2P(pollset, tcn_pollset_t *);
jlong *pset = (*e)->GetLongArrayElements(e, set, NULL);
apr_int32_t n, i = 0, num = 0;
- apr_status_t rv;
+ apr_status_t rv = APR_SUCCESS;
UNREFERENCED(o);
+ TCN_ASSERT(pollset != 0);
+
+ if ((rv = apr_thread_mutex_lock(p->mutex)) != APR_SUCCESS)
+ return (jint)(-rv);
+ /* Add what is present in add queue */
+ for (n = 0; n < p->nadds; n++) {
+ apr_pollfd_t pf = p->query_add[n];
+ if (p->nelts == p->nalloc) {
+ rv = APR_ENOMEM;
+ break;
+ }
+ if ((rv = apr_pollset_add(p->pollset, &pf)) != APR_SUCCESS)
+ break;
+ p->query_ttl[p->nelts] = apr_time_now();
+ p->query_set[p->nelts] = pf;
+ p->nelts++;
+ }
+ p->nadds = 0;
+ apr_thread_mutex_unlock(p->mutex);
+ if (rv != APR_SUCCESS)
+ return (jint)(-rv);
rv = apr_pollset_poll(p->pollset, J2T(timeout), &num, &fd);
+ if (rv != APR_SUCCESS)
+ num = 0;
- if (rv == APR_SUCCESS && num > 0) {
+ if (num > 0) {
for (i = 0; i < num; i++) {
pset[i] = P2J(fd);
fd ++;
@@ -151,8 +227,9 @@
/* TODO: Add thread mutex protection
* or make sure the Java part is synchronized.
*/
+ apr_thread_mutex_lock(p->mutex);
for (n = 0; n < p->nelts; n++) {
- if ((now - p->query_add[n]) > p->max_ttl) {
+ if ((now - p->query_ttl[n]) > p->max_ttl) {
p->query_set[n].rtnevents = APR_POLLHUP | APR_POLLIN;
if (i < p->nelts) {
pset[i++] = P2J(&(p->query_set[n]));
@@ -160,6 +237,7 @@
}
}
}
+ apr_thread_mutex_unlock(p->mutex);
}
if (num)
(*e)->ReleaseLongArrayElements(e, set, pset, 0);
@@ -173,6 +251,7 @@
{
apr_pollfd_t *fd = J2P(pollfd, apr_pollfd_t *);
UNREFERENCED_STDARGS;
+ TCN_ASSERT(pollfd != 0);
return P2J(fd->desc.s);
}
@@ -180,6 +259,7 @@
{
apr_pollfd_t *fd = J2P(pollfd, apr_pollfd_t *);
UNREFERENCED_STDARGS;
+ TCN_ASSERT(pollfd != 0);
return P2J(fd->client_data);
}
@@ -187,6 +267,7 @@
{
apr_pollfd_t *fd = J2P(pollfd, apr_pollfd_t *);
UNREFERENCED_STDARGS;
+ TCN_ASSERT(pollfd != 0);
return (jint)fd->rtnevents;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]