mturk 2005/04/13 23:47:56
Modified: jni/java/org/apache/tomcat/jni Poll.java
jni/native/src network.c poll.c
Log:
Add time to live to socket Poller, so we can maintain resources.
The returned descriptor will be returned with
APR_POOLIN + APR_POLLHUP in case the recycle is needed.
Revision Changes Path
1.5 +3 -3
jakarta-tomcat-connectors/jni/java/org/apache/tomcat/jni/Poll.java
Index: Poll.java
===================================================================
RCS file:
/home/cvs/jakarta-tomcat-connectors/jni/java/org/apache/tomcat/jni/Poll.java,v
retrieving revision 1.4
retrieving revision 1.5
diff -u -r1.4 -r1.5
--- Poll.java 13 Apr 2005 13:16:55 -0000 1.4
+++ Poll.java 14 Apr 2005 06:47:56 -0000 1.5
@@ -60,9 +60,10 @@
* @param size The maximum number of descriptors that this pollset can
hold
* @param p The pool from which to allocate the pollset
* @param flags Optional flags to modify the operation of the pollset.
+ * @param ttl Maximum time to live for a particular socket.
* @return The pointer in which to return the newly created object
*/
- public static native long create(int size, long p, int flags)
+ public static native long create(int size, long p, int flags, long ttl)
throws Error;
/**
* Destroy a pollset object
@@ -98,8 +99,7 @@
* @return Number of signalled descriptors (output parameter)
*/
public static native int poll(long pollset, long timeout,
- long [] descriptors)
- throws Error;
+ long [] descriptors);
/**
* Return socket from poll descriptor
1.4 +6 -6 jakarta-tomcat-connectors/jni/native/src/network.c
Index: network.c
===================================================================
RCS file: /home/cvs/jakarta-tomcat-connectors/jni/native/src/network.c,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- network.c 13 Apr 2005 13:17:42 -0000 1.3
+++ network.c 14 Apr 2005 06:47:56 -0000 1.4
@@ -183,7 +183,7 @@
UNREFERENCED(o);
apr_socket_opt_get(s, APR_SO_NONBLOCK, &nb);
if (tosend > 0)
- nbytes = min(nbytes, (apr_size_t)tosend);
+ nbytes = min(nbytes - offset, (apr_size_t)tosend);
if (nb)
bytes = (*e)->GetPrimitiveArrayCritical(e, buf, NULL);
else
@@ -213,7 +213,7 @@
goto cleanup;
}
if (len > 0)
- nbytes = min(nbytes, (apr_size_t)len);
+ nbytes = min(nbytes - offset, (apr_size_t)len);
TCN_THROW_IF_ERR(apr_socket_send(s, bytes + offset, &nbytes), nbytes);
cleanup:
@@ -264,7 +264,7 @@
UNREFERENCED(o);
apr_socket_opt_get(s, APR_SO_NONBLOCK, &nb);
if (tosend > 0)
- nbytes = min(nbytes, (apr_size_t)tosend);
+ nbytes = min(nbytes - offset, (apr_size_t)tosend);
if (nb)
bytes = (*e)->GetPrimitiveArrayCritical(e, buf, NULL);
else
@@ -288,7 +288,7 @@
UNREFERENCED(o);
if (toread > 0)
- nbytes = min(nbytes, (apr_size_t)toread);
+ nbytes = min(nbytes - offset, (apr_size_t)toread);
TCN_THROW_IF_ERR(apr_socket_recv(s, bytes + offset, &nbytes), nbytes);
@@ -314,7 +314,7 @@
goto cleanup;
}
if (len > 0)
- nbytes = min(nbytes, (apr_size_t)len);
+ nbytes = min(nbytes - offset, (apr_size_t)len);
TCN_THROW_IF_ERR(apr_socket_recv(s, bytes + offset, &nbytes), nbytes);
@@ -333,7 +333,7 @@
UNREFERENCED(o);
if (toread > 0)
- nbytes = min(nbytes, (apr_size_t)toread);
+ nbytes = min(nbytes - offset, (apr_size_t)toread);
TCN_THROW_IF_ERR(apr_socket_recvfrom(f, s,
(apr_int32_t)flags, bytes + offset, &nbytes), nbytes);
1.4 +83 -17 jakarta-tomcat-connectors/jni/native/src/poll.c
Index: poll.c
===================================================================
RCS file: /home/cvs/jakarta-tomcat-connectors/jni/native/src/poll.c,v
retrieving revision 1.3
retrieving revision 1.4
diff -u -r1.3 -r1.4
--- poll.c 13 Apr 2005 13:17:42 -0000 1.3
+++ poll.c 14 Apr 2005 06:47:56 -0000 1.4
@@ -18,55 +18,86 @@
#include "apr_poll.h"
#include "tcn.h"
+/* Internal poll structure for queryset
+ */
+
+typedef struct tcn_pollset {
+ apr_pool_t *pool;
+ apr_int32_t nelts;
+ apr_int32_t nalloc;
+ apr_pollset_t *pollset;
+ apr_pollfd_t *query_set;
+ apr_time_t *query_add;
+ apr_interval_time_t max_ttl;
+} tcn_pollset_t;
TCN_IMPLEMENT_CALL(jlong, Poll, create)(TCN_STDARGS, jint size,
- jlong pool, jint flags)
+ jlong pool, jint flags,
+ jlong ttl)
{
apr_pool_t *p = J2P(pool, apr_pool_t *);
apr_pollset_t *pollset = NULL;
-
+ tcn_pollset_t *tps = NULL;
UNREFERENCED(o);
TCN_THROW_IF_ERR(apr_pollset_create(&pollset,
(apr_uint32_t)size, p, (apr_uint32_t)flags),
pollset);
+ tps = apr_palloc(p, sizeof(tcn_pollset_t));
+ tps->pollset = pollset;
+ tps->query_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
+ tps->query_add = apr_palloc(p, size * sizeof(apr_time_t));
+ tps->nelts = 0;
+ tps->nalloc = size;
+ tps->pool = p;
+ tps->max_ttl = J2T(ttl);
+
cleanup:
- return P2J(pollset);
+ return P2J(tps);
}
TCN_IMPLEMENT_CALL(jint, Poll, destroy)(TCN_STDARGS, jlong pollset)
{
- apr_pollset_t *p = J2P(pollset, apr_pollset_t *);
+ tcn_pollset_t *p = J2P(pollset, tcn_pollset_t *);;
UNREFERENCED_STDARGS;;
- return (jint)apr_pollset_destroy(p);
+ return (jint)apr_pollset_destroy(p->pollset);
}
TCN_IMPLEMENT_CALL(jint, Poll, add)(TCN_STDARGS, jlong pollset,
jlong socket, jlong data,
jint reqevents)
{
- apr_pollset_t *p = J2P(pollset, apr_pollset_t *);
+ tcn_pollset_t *p = J2P(pollset, tcn_pollset_t *);
apr_pollfd_t fd;
+ apr_status_t rv;
UNREFERENCED_STDARGS;
+ if (p->nelts == p->nalloc) {
+ return APR_ENOMEM;
+ }
memset(&fd, 0, sizeof(apr_pollfd_t));
fd.desc_type = APR_POLL_SOCKET;
fd.reqevents = (apr_int16_t)reqevents;
fd.desc.s = J2P(socket, apr_socket_t *);
fd.client_data = J2P(data, void *);
-
- return (jint)apr_pollset_add(p, &fd);
+ if ((rv = apr_pollset_add(p->pollset, &fd)) == APR_SUCCESS) {
+ p->query_set[p->nelts] = fd;
+ p->query_add[p->nelts] = apr_time_now();
+ p->nelts++;
+ }
+ return (jint)rv;
}
TCN_IMPLEMENT_CALL(jint, Poll, remove)(TCN_STDARGS, jlong pollset,
jlong socket)
{
- apr_pollset_t *p = J2P(pollset, apr_pollset_t *);
+ tcn_pollset_t *p = J2P(pollset, tcn_pollset_t *);
apr_pollfd_t fd;
+ apr_int32_t i;
UNREFERENCED_STDARGS;;
@@ -74,29 +105,64 @@
fd.desc_type = APR_POLL_SOCKET;
fd.desc.s = J2P(socket, apr_socket_t *);
- return (jint)apr_pollset_remove(p, &fd);
+ for (i = 0; i < p->nelts; i++) {
+ if (fd.desc.s == p->query_set[i].desc.s) {
+ /* Found an instance of the fd: remove this and any other copies
*/
+ apr_int32_t dst = i;
+ apr_int32_t old_nelts = p->nelts;
+ p->nelts--;
+ for (i++; i < old_nelts; i++) {
+ if (fd.desc.s == p->query_set[i].desc.s) {
+ p->nelts--;
+ }
+ else {
+ p->query_set[dst] = p->query_set[i];
+ dst++;
+ }
+ }
+ break;
+ }
+ }
+
+ return (jint)apr_pollset_remove(p->pollset, &fd);
}
TCN_IMPLEMENT_CALL(jint, Poll, poll)(TCN_STDARGS, jlong pollset,
jlong timeout, jlongArray set)
{
const apr_pollfd_t *fd = NULL;
- apr_pollset_t *p = J2P(pollset, apr_pollset_t *);
+ tcn_pollset_t *p = J2P(pollset, tcn_pollset_t *);
jlong *pset = (*e)->GetLongArrayElements(e, set, NULL);
- apr_int32_t i, num = 0;
+ apr_int32_t n, i = 0, num = 0;
+ apr_status_t rv;
UNREFERENCED(o);
- TCN_THROW_IF_ERR(apr_pollset_poll(p, J2T(timeout),
- &num, &fd), num);
+ rv = apr_pollset_poll(p->pollset, J2T(timeout), &num, &fd);
-cleanup:
- if (num) {
+ if (rv == APR_SUCCESS && num > 0) {
for (i = 0; i < num; i++) {
pset[i] = P2J(fd);
fd ++;
}
- (*e)->ReleaseLongArrayElements(e, set, pset, 0);
}
+ /* In any case check for timeout sockets */
+ if (p->max_ttl > 0) {
+ apr_time_t now = apr_time_now();
+ /* TODO: Add thread mutex protection
+ * or make sure the Java part is synchronized.
+ */
+ for (n = 0; n < p->nelts; n++) {
+ if ((now - p->query_add[n]) > p->max_ttl) {
+ p->query_set[n].rtnevents = APR_POLLHUP | APR_POLLIN;
+ if (i < p->nelts) {
+ pset[i++] = P2J(&(p->query_set[n]));
+ num++;
+ }
+ }
+ }
+ }
+ if (num)
+ (*e)->ReleaseLongArrayElements(e, set, pset, 0);
else
(*e)->ReleaseLongArrayElements(e, set, pset, JNI_ABORT);
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]