mturk 2005/04/19 09:32:11 Modified: jni/java/org/apache/tomcat/jni Poll.java Status.java jni/native/src error.c poll.c Log: Add a maintain call to Poll for polling timed-out sockets. Remove thread-safety flags, because thread safety is the responsibility of the Java client. Revision Changes Path 1.8 +22 -2 jakarta-tomcat-connectors/jni/java/org/apache/tomcat/jni/Poll.java Index: Poll.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-connectors/jni/java/org/apache/tomcat/jni/Poll.java,v retrieving revision 1.7 retrieving revision 1.8 diff -u -r1.7 -r1.8 --- Poll.java 18 Apr 2005 15:24:01 -0000 1.7 +++ Poll.java 19 Apr 2005 16:32:11 -0000 1.8 @@ -104,11 +104,31 @@ * descriptors[n + 2] -> client data * descriptors[n + 2] -> reserved * </PRE> + * @param remove Remove signaled descriptors from pollset * @return Number of signalled descriptors (output parameter) * or negative APR error code. */ public static native int poll(long pollset, long timeout, - long [] descriptors); + long [] descriptors, boolean remove); + + /** + * Maintain on the descriptor(s) in a pollset + * @param pollset The pollset to use + * @param descriptors Array of signalled descriptors (output parameter) + * The desctiptor array must be four times the size of pollset. + * and are populated as follows: + * <PRE> + * descriptors[n + 0] -> returned events + * descriptors[n + 1] -> socket + * descriptors[n + 2] -> client data + * descriptors[n + 2] -> reserved + * </PRE> + * @param remove Remove signaled descriptors from pollset + * @return Number of signalled descriptors (output parameter) + * or negative APR error code. + */ + public static native int maintain(long pollset, long [] descriptors, + boolean remove); /** * Set the socket time to live. 
1.6 +2 -1 jakarta-tomcat-connectors/jni/java/org/apache/tomcat/jni/Status.java Index: Status.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-connectors/jni/java/org/apache/tomcat/jni/Status.java,v retrieving revision 1.5 retrieving revision 1.6 diff -u -r1.5 -r1.6 --- Status.java 15 Apr 2005 17:21:23 -0000 1.5 +++ Status.java 19 Apr 2005 16:32:11 -0000 1.6 @@ -253,5 +253,6 @@ public static final boolean APR_STATUS_IS_EINPROGRESS(int s) { return is(s, 94); } public static final boolean APR_STATUS_IS_EINTR(int s) { return is(s, 95); } public static final boolean APR_STATUS_IS_ENOTSOCK(int s) { return is(s, 96); } + public static final boolean APR_STATUS_IS_EINVAL(int s) { return is(s, 97); } } 1.5 +1 -0 jakarta-tomcat-connectors/jni/native/src/error.c Index: error.c =================================================================== RCS file: /home/cvs/jakarta-tomcat-connectors/jni/native/src/error.c,v retrieving revision 1.4 retrieving revision 1.5 diff -u -r1.4 -r1.5 --- error.c 15 Apr 2005 17:21:23 -0000 1.4 +++ error.c 19 Apr 2005 16:32:11 -0000 1.5 @@ -186,6 +186,7 @@ APR_IS(94, APR_STATUS_IS_EINPROGRESS); APR_IS(95, APR_STATUS_IS_EINTR); APR_IS(96, APR_STATUS_IS_ENOTSOCK); + APR_IS(97, APR_STATUS_IS_EINVAL); } return JNI_FALSE; } 1.10 +71 -107 jakarta-tomcat-connectors/jni/native/src/poll.c Index: poll.c =================================================================== RCS file: /home/cvs/jakarta-tomcat-connectors/jni/native/src/poll.c,v retrieving revision 1.9 retrieving revision 1.10 diff -u -r1.9 -r1.10 --- poll.c 18 Apr 2005 15:43:35 -0000 1.9 +++ poll.c 19 Apr 2005 16:32:11 -0000 1.10 @@ -16,7 +16,6 @@ #include "apr.h" #include "apr_pools.h" #include "apr_poll.h" -#include "apr_thread_mutex.h" #include "tcn.h" /* Internal poll structure for queryset @@ -25,13 +24,10 @@ typedef struct tcn_pollset { apr_pool_t *pool; apr_int32_t nelts; - apr_int32_t nadds; apr_int32_t nalloc; apr_pollset_t 
*pollset; - apr_thread_mutex_t *mutex; - apr_pollfd_t *query_set; - apr_pollfd_t *query_add; - apr_time_t *query_ttl; + apr_pollfd_t *socket_set; + apr_interval_time_t *socket_ttl; apr_interval_time_t max_ttl; } tcn_pollset_t; @@ -42,14 +38,10 @@ apr_pool_t *p = J2P(pool, apr_pool_t *); apr_pollset_t *pollset = NULL; tcn_pollset_t *tps = NULL; - apr_thread_mutex_t *mutex = NULL; apr_uint32_t f = (apr_uint32_t)flags; UNREFERENCED(o); TCN_ASSERT(pool != 0); - TCN_THROW_IF_ERR(apr_thread_mutex_create(&mutex, - APR_THREAD_MUTEX_DEFAULT, p), mutex); - if (f & APR_POLLSET_THREADSAFE) { apr_status_t rv = apr_pollset_create(&pollset, (apr_uint32_t)size, p, f); if (rv == APR_ENOTIMPL) @@ -63,23 +55,16 @@ TCN_THROW_IF_ERR(apr_pollset_create(&pollset, (apr_uint32_t)size, p, f), pollset); } - tps = apr_palloc(p, sizeof(tcn_pollset_t)); tps->pollset = pollset; - tps->mutex = mutex; - tps->query_set = apr_palloc(p, size * sizeof(apr_pollfd_t)); - tps->query_add = apr_palloc(p, size * sizeof(apr_pollfd_t)); - tps->query_ttl = apr_palloc(p, size * sizeof(apr_time_t)); + tps->socket_set = apr_palloc(p, size * sizeof(apr_pollfd_t)); + tps->socket_ttl = apr_palloc(p, size * sizeof(apr_interval_time_t)); tps->nelts = 0; - tps->nadds = 0; tps->nalloc = size; tps->pool = p; tps->max_ttl = J2T(ttl); - return P2J(tps); cleanup: - if (mutex) - apr_thread_mutex_destroy(mutex); - return 0; + return P2J(tps); } TCN_IMPLEMENT_CALL(jint, Poll, destroy)(TCN_STDARGS, jlong pollset) @@ -88,7 +73,6 @@ UNREFERENCED_STDARGS; TCN_ASSERT(pollset != 0); - apr_thread_mutex_destroy(p->mutex); return (jint)apr_pollset_destroy(p->pollset); } @@ -98,124 +82,80 @@ { tcn_pollset_t *p = J2P(pollset, tcn_pollset_t *); apr_pollfd_t fd; - apr_status_t rv; UNREFERENCED_STDARGS; TCN_ASSERT(socket != 0); - if (p->nadds == p->nalloc) + if (p->nelts == p->nalloc) return APR_ENOMEM; - if ((rv = apr_thread_mutex_lock(p->mutex)) != APR_SUCCESS) - return rv; + memset(&fd, 0, sizeof(apr_pollfd_t)); fd.desc_type = 
APR_POLL_SOCKET; fd.reqevents = (apr_int16_t)reqevents; fd.desc.s = J2P(socket, apr_socket_t *); fd.client_data = J2P(data, void *); - p->query_add[p->nadds] = fd; - p->nadds++; - apr_thread_mutex_unlock(p->mutex); - return (jint)rv; + p->socket_set[p->nelts++] = fd; + return (jint)apr_pollset_add(p->pollset, &fd); } -TCN_IMPLEMENT_CALL(jint, Poll, remove)(TCN_STDARGS, jlong pollset, - jlong socket) +static apr_status_t do_remove(tcn_pollset_t *p, const apr_pollfd_t *fd) { - tcn_pollset_t *p = J2P(pollset, tcn_pollset_t *); - apr_pollfd_t fd; apr_int32_t i; - apr_status_t rv; - UNREFERENCED_STDARGS; - TCN_ASSERT(socket != 0); - - memset(&fd, 0, sizeof(apr_pollfd_t)); - fd.desc_type = APR_POLL_SOCKET; - fd.desc.s = J2P(socket, apr_socket_t *); - - if ((rv = apr_thread_mutex_lock(p->mutex)) != APR_SUCCESS) - return (jint)rv; for (i = 0; i < p->nelts; i++) { - if (fd.desc.s == p->query_set[i].desc.s) { + if (fd->desc.s == p->socket_set[i].desc.s) { /* Found an instance of the fd: remove this and any other copies */ apr_int32_t dst = i; apr_int32_t old_nelts = p->nelts; p->nelts--; for (i++; i < old_nelts; i++) { - if (fd.desc.s == p->query_set[i].desc.s) { + if (fd->desc.s == p->socket_set[i].desc.s) { p->nelts--; } else { - p->query_set[dst] = p->query_set[i]; - dst++; - } - } - break; - } - } - /* Remove from add queue if present - * This is unlikely to happen, but do it anyway. 
- */ - for (i = 0; i < p->nadds; i++) { - if (fd.desc.s == p->query_add[i].desc.s) { - /* Found an instance of the fd: remove this and any other copies */ - apr_int32_t dst = i; - apr_int32_t old_nelts = p->nadds; - p->nadds--; - for (i++; i < old_nelts; i++) { - if (fd.desc.s == p->query_add[i].desc.s) { - p->nadds--; - } - else { - p->query_add[dst] = p->query_add[i]; + p->socket_set[dst] = p->socket_set[i]; dst++; } } break; } } + return apr_pollset_remove(p->pollset, fd); +} + +TCN_IMPLEMENT_CALL(jint, Poll, remove)(TCN_STDARGS, jlong pollset, + jlong socket) +{ + tcn_pollset_t *p = J2P(pollset, tcn_pollset_t *); + apr_pollfd_t fd; - rv = apr_pollset_remove(p->pollset, &fd); - apr_thread_mutex_unlock(p->mutex); - return (jint)rv; + UNREFERENCED_STDARGS; + TCN_ASSERT(socket != 0); + + memset(&fd, 0, sizeof(apr_pollfd_t)); + fd.desc_type = APR_POLL_SOCKET; + fd.desc.s = J2P(socket, apr_socket_t *); + + return (jint)do_remove(p, &fd); } + TCN_IMPLEMENT_CALL(jint, Poll, poll)(TCN_STDARGS, jlong pollset, - jlong timeout, jlongArray set) + jlong timeout, jlongArray set, + jboolean remove) { const apr_pollfd_t *fd = NULL; tcn_pollset_t *p = J2P(pollset, tcn_pollset_t *); jlong *pset = (*e)->GetLongArrayElements(e, set, NULL); - apr_int32_t n, i = 0, num = 0; + apr_int32_t i, num = 0; apr_status_t rv = APR_SUCCESS; UNREFERENCED(o); TCN_ASSERT(pollset != 0); - if ((rv = apr_thread_mutex_lock(p->mutex)) != APR_SUCCESS) - return (jint)(-rv); - /* Add what is present in add queue */ - for (n = 0; n < p->nadds; n++) { - apr_pollfd_t pf = p->query_add[n]; - if (p->nelts == p->nalloc) { - /* In case the add queue is too large - * skip adding to pollset - */ - break; - } - if ((rv = apr_pollset_add(p->pollset, &pf)) != APR_SUCCESS) - break; - p->query_ttl[p->nelts] = apr_time_now(); - p->query_set[p->nelts] = pf; - p->nelts++; - } - p->nadds = 0; - apr_thread_mutex_unlock(p->mutex); if (rv != APR_SUCCESS) return (jint)(-rv); - rv = apr_pollset_poll(p->pollset, J2T(timeout), 
&num, &fd); - apr_thread_mutex_lock(p->mutex); - if (rv != APR_SUCCESS) + if (apr_pollset_poll(p->pollset, J2T(timeout), &num, &fd) != APR_SUCCESS) num = 0; if (num > 0) { @@ -223,33 +163,57 @@ pset[i*4+0] = (jlong)(fd->rtnevents); pset[i*4+1] = P2J(fd->desc.s); pset[i*4+2] = P2J(fd->client_data); + if (remove) + do_remove(p, fd); fd ++; } + (*e)->ReleaseLongArrayElements(e, set, pset, 0); } - /* In any case check for timeout sockets */ + else + (*e)->ReleaseLongArrayElements(e, set, pset, JNI_ABORT); + + return (jint)num; +} + +TCN_IMPLEMENT_CALL(jint, Poll, maintain)(TCN_STDARGS, jlong pollset, + jlongArray set, jboolean remove) +{ + tcn_pollset_t *p = J2P(pollset, tcn_pollset_t *); + jlong *pset = (*e)->GetLongArrayElements(e, set, NULL); + apr_int32_t i = 0, num = 0; + apr_time_t now = apr_time_now(); + apr_pollfd_t fd; + + UNREFERENCED(o); + TCN_ASSERT(pollset != 0); + + /* Check for timeout sockets */ if (p->max_ttl > 0) { - apr_time_t now = apr_time_now(); - /* TODO: Add thread mutex protection - * or make sure the Java part is synchronized. 
- */ - for (n = 0; n < p->nelts; n++) { - if ((now - p->query_ttl[n]) > p->max_ttl) { - p->query_set[n].rtnevents = APR_POLLHUP | APR_POLLIN; + for (i = 0; i < p->nelts; i++) { + if ((now - p->socket_ttl[i]) > p->max_ttl) { + p->socket_set[i].rtnevents = APR_POLLHUP | APR_POLLIN; if (num < p->nelts) { - pset[num*4+0] = (jlong)(p->query_set[n].rtnevents); - pset[num*4+1] = P2J(p->query_set[n].desc.s); - pset[num*4+2] = P2J(p->query_set[n].client_data); + fd = p->socket_set[i]; + pset[num*4+0] = (jlong)(fd.rtnevents); + pset[num*4+1] = P2J(fd.desc.s); + pset[num*4+2] = P2J(fd.client_data); num++; } } } + if (remove && num) { + memset(&fd, 0, sizeof(apr_pollfd_t)); + for (i = 0; i < num; i++) { + fd.desc_type = APR_POLL_SOCKET; + fd.desc.s = (apr_socket_t *)pset[i*4+1]; + do_remove(p, &fd); + } + } } - apr_thread_mutex_unlock(p->mutex); if (num) (*e)->ReleaseLongArrayElements(e, set, pset, 0); else (*e)->ReleaseLongArrayElements(e, set, pset, JNI_ABORT); - return (jint)num; }
--------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]