Author: asankha
Date: Wed Jan 24 23:27:59 2007
New Revision: 499693
URL: http://svn.apache.org/viewvc?view=rev&rev=499693
Log:
pool outgoing connections at application level
close all pipe channels
update axis2.xml used by sample axis2 server to use NIO server instead of
SimpleHttpServer
Added:
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ConnectionPool.java
Modified:
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientHandler.java
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientWorker.java
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/HttpCoreNIOSender.java
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ServerWorker.java
webservices/synapse/trunk/java/repository/conf/axis2.xml
webservices/synapse/trunk/java/repository/conf/sample/resources/misc/axis2.xml
webservices/synapse/trunk/java/src/main/assembly/bin.xml
Modified:
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientHandler.java
URL:
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientHandler.java?view=diff&rev=499693&r1=499692&r2=499693
==============================================================================
---
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientHandler.java
(original)
+++
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientHandler.java
Wed Jan 24 23:27:59 2007
@@ -92,6 +92,37 @@
}
/**
+ * Submit a new request over an already established connection, which has
been
+ * 'kept alive'
+ * @param conn the connection to use to send the request, which has been
kept open
+ * @param axis2Req the new request
+ */
+ public void submitRequest(final NHttpClientConnection conn,
Axis2HttpRequest axis2Req) {
+
+ try {
+ HttpContext context = conn.getContext();
+
+ context.setAttribute(HttpExecutionContext.HTTP_CONNECTION, conn);
+ context.setAttribute(HttpExecutionContext.HTTP_TARGET_HOST,
axis2Req.getHttpHost());
+
+ context.setAttribute(OUTGOING_MESSAGE_CONTEXT,
axis2Req.getMsgContext());
+ context.setAttribute(REQUEST_SOURCE_CHANNEL,
axis2Req.getSourceChannel());
+
+ HttpRequest request = axis2Req.getRequest();
+ request.getParams().setDefaults(this.params);
+ this.httpProcessor.process(request, context);
+
+ conn.submitRequest(request);
+ context.setAttribute(HttpExecutionContext.HTTP_REQUEST, request);
+
+ } catch (IOException e) {
+ handleException("I/O Error : " + e.getMessage(), e, conn);
+ } catch (HttpException e) {
+ handleException("Unexpected HTTP protocol error: " +
e.getMessage(), e, conn);
+ }
+ }
+
+ /**
* Invoked when the destination is connected
* @param conn the connection being processed
* @param attachment the attachment set previously
@@ -178,8 +209,10 @@
if (decoder.isCompleted()) {
sink.close();
- if (!connStrategy.keepAlive(response, context)) {
+ if (!connStrategy.keepAlive(response, context)) {
conn.close();
+ } else {
+ ConnectionPool.release(conn);
}
}
Modified:
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientWorker.java
URL:
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientWorker.java?view=diff&rev=499693&r1=499692&r2=499693
==============================================================================
---
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientWorker.java
(original)
+++
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientWorker.java
Wed Jan 24 23:27:59 2007
@@ -29,6 +29,7 @@
import org.apache.commons.logging.LogFactory;
import java.io.InputStream;
+import java.io.IOException;
/**
* Performs processing of the HTTP response received for our outgoing request.
An instance of this
@@ -97,9 +98,13 @@
in,
outMsgCtx.getEnvelope().getNamespace().getNamespaceURI());
responseMsgCtx.setEnvelope(envelope);
+
+ in.close();
} catch (AxisFault af) {
log.error("Fault creating response SOAP envelope", af);
return;
+ } catch (IOException e) {
+ log.error("Error closing input stream from which message was
read", e);
}
AxisEngine engine = new AxisEngine(cfgCtx);
Added:
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ConnectionPool.java
URL:
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ConnectionPool.java?view=auto&rev=499693
==============================================================================
---
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ConnectionPool.java
(added)
+++
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ConnectionPool.java
Wed Jan 24 23:27:59 2007
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.axis2.transport.nhttp;
+
+import org.apache.http.nio.NHttpClientConnection;
+import org.apache.http.protocol.HttpExecutionContext;
+import org.apache.http.HttpHost;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.*;
+import java.io.IOException;
+
+public class ConnectionPool {
+
+ private static final Log log = LogFactory.getLog(ConnectionPool.class);
+
+ /** A map of available connections for reuse. The key selects the
host+port of the
+ * connection and the value contains a List of available connections to
destination
+ */
+ private static Map connMap = Collections.synchronizedMap(new HashMap());
+
+ public static NHttpClientConnection getConnection(String host, int port) {
+
+ String key = host + ":" + Integer.toString(port);
+ List connections = (List) connMap.get(key);
+
+ if (connections == null) {
+ log.debug("No connections available for reuse");
+ return null;
+
+ } else {
+ NHttpClientConnection conn = null;
+
+ while (!connections.isEmpty()) {
+ conn = (NHttpClientConnection) connections.remove(0);
+
+ if (conn.isOpen()) {
+ log.debug("A connection to host : " + host + " on port : "
+
+ port + " is available in the pool, and will be
reused");
+ return conn;
+ } else {
+ log.debug("closing stale connection");
+ try {
+ conn.close();
+ } catch (IOException ignore) {
+ ignore.printStackTrace();
+ }
+ }
+ }
+ return null;
+ }
+ }
+
+ public static void release(NHttpClientConnection conn) {
+
+ HttpHost host = (HttpHost) conn.getContext().getAttribute(
+ HttpExecutionContext.HTTP_TARGET_HOST);
+ String key = host.getHostName() + ":" +
Integer.toString(host.getPort());
+
+ List connections = (List) connMap.get(key);
+ if (connections == null) {
+ connections = Collections.synchronizedList(new LinkedList());
+ connMap.put(key, connections);
+ }
+
+ connections.add(conn);
+
+ log.debug("Released a connection to host: " + host.getHostName() + "
on port : " +
+ host.getPort() + " to the connection pool of current size : " +
connections.size());
+ }
+}
Modified:
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/HttpCoreNIOSender.java
URL:
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/HttpCoreNIOSender.java?view=diff&rev=499693&r1=499692&r2=499693
==============================================================================
---
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/HttpCoreNIOSender.java
(original)
+++
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/HttpCoreNIOSender.java
Wed Jan 24 23:27:59 2007
@@ -27,6 +27,7 @@
import org.apache.axis2.transport.OutTransportInfo;
import org.apache.axiom.om.OMOutputFormat;
import org.apache.http.nio.NHttpClientHandler;
+import org.apache.http.nio.NHttpClientConnection;
import org.apache.http.nio.impl.reactor.DefaultConnectingIOReactor;
import org.apache.http.nio.impl.DefaultClientIOEventDispatch;
import org.apache.http.nio.reactor.ConnectingIOReactor;
@@ -61,6 +62,8 @@
private ConfigurationContext cfgCtx;
/** The IOReactor */
private ConnectingIOReactor ioReactor = null;
+ /** The client handler */
+ private NHttpClientHandler handler = null;
/**
* Initialize the transport sender, and execute reactor in new seperate
thread
@@ -93,7 +96,7 @@
log.error("Error starting the IOReactor", e);
}
- NHttpClientHandler handler = new ClientHandler(cfgCtx, params);
+ handler = new ClientHandler(cfgCtx, params);
IOEventDispatch ioEventDispatch = new
DefaultClientIOEventDispatch(handler, params);
try {
@@ -113,7 +116,7 @@
private HttpParams getClientParameters() {
HttpParams params = new DefaultHttpParams(null);
params
- .setIntParameter(HttpConnectionParams.SO_TIMEOUT, 5000)
+ .setIntParameter(HttpConnectionParams.SO_TIMEOUT, 30000)
.setIntParameter(HttpConnectionParams.CONNECTION_TIMEOUT, 10000)
.setIntParameter(HttpConnectionParams.SOCKET_BUFFER_SIZE, 8 * 1024)
.setBooleanParameter(HttpConnectionParams.STALE_CONNECTION_CHECK,
false)
@@ -169,9 +172,18 @@
HttpHost httpHost = new HttpHost(url.getHost(), url.getPort(),
url.getProtocol());
Axis2HttpRequest axis2Req = new Axis2HttpRequest(epr, httpHost,
msgContext);
- SessionRequest req = ioReactor.connect(
- new InetSocketAddress(url.getHost(), url.getPort()), null,
axis2Req);
+ NHttpClientConnection conn =
ConnectionPool.getConnection(url.getHost(), url.getPort());
+
+ if (conn == null) {
+ SessionRequest req = ioReactor.connect(
+ new InetSocketAddress(url.getHost(), url.getPort()), null,
axis2Req);
+ log.debug("A new connection established");
+ } else {
+ ((ClientHandler) handler).submitRequest(conn, axis2Req);
+ log.debug("An existing connection reused");
+ }
+
axis2Req.streamMessageContents();
} catch (MalformedURLException e) {
@@ -209,6 +221,10 @@
} catch (IOException e) {
handleException("IO Error sending response message", e);
}
+
+ try {
+ worker.getIs().close();
+ } catch (IOException ignore) {}
}
private void sendUsingOutputStream(MessageContext msgContext) throws
AxisFault {
Modified:
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ServerWorker.java
URL:
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ServerWorker.java?view=diff&rev=499693&r1=499692&r2=499693
==============================================================================
---
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ServerWorker.java
(original)
+++
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ServerWorker.java
Wed Jan 24 23:27:59 2007
@@ -205,6 +205,10 @@
return os;
}
+ public InputStream getIs() {
+ return is;
+ }
+
public ServerHandler getServiceHandler() {
return serverHandler;
}
Modified: webservices/synapse/trunk/java/repository/conf/axis2.xml
URL:
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/repository/conf/axis2.xml?view=diff&rev=499693&r1=499692&r2=499693
==============================================================================
--- webservices/synapse/trunk/java/repository/conf/axis2.xml (original)
+++ webservices/synapse/trunk/java/repository/conf/axis2.xml Wed Jan 24
23:27:59 2007
@@ -102,7 +102,6 @@
<!-- ================================================= -->
<!-- Transport Ins -->
<!-- ================================================= -->
- <!--<transportReceiver name="http"
class="org.apache.axis2.transport.nhttp.AsyncHTTPListener"/>-->
<!--<transportReceiver name="http"
class="org.apache.axis2.transport.http.SimpleHTTPServer">
<parameter name="port" locked="false">8080</parameter>-->
Modified:
webservices/synapse/trunk/java/repository/conf/sample/resources/misc/axis2.xml
URL:
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/repository/conf/sample/resources/misc/axis2.xml?view=diff&rev=499693&r1=499692&r2=499693
==============================================================================
---
webservices/synapse/trunk/java/repository/conf/sample/resources/misc/axis2.xml
(original)
+++
webservices/synapse/trunk/java/repository/conf/sample/resources/misc/axis2.xml
Wed Jan 24 23:27:59 2007
@@ -102,10 +102,9 @@
<!-- ================================================= -->
<!-- Transport Ins -->
<!-- ================================================= -->
- <!--<transportReceiver name="http"
class="org.apache.axis2.transport.nhttp.AsyncHTTPListener"/>-->
- <transportReceiver name="http"
+ <!--<transportReceiver name="http"
class="org.apache.axis2.transport.http.SimpleHTTPServer">
- <parameter name="port" locked="false">9000</parameter>
+ <parameter name="port" locked="false">9000</parameter>-->
<!-- Here is the complete list of supported parameters (see example
settings further below):
port: the port to listen on (default 6060)
hostname: if non-null, url prefix used in reply-to endpoint
references (default null)
@@ -128,6 +127,12 @@
<!-- <parameter name="RequestMaxThreadPoolSize"
locked="false">100</parameter> -->
<!-- <parameter name="threadKeepAliveTime"
locked="false">240000</parameter> -->
<!-- <parameter name="threadKeepAliveTimeUnit"
locked="false">MILLISECONDS</parameter> -->
+ <!--</transportReceiver>-->
+
+ <!-- the experimental non blocking http transport based on HttpCore + NIO
extensions -->
+ <transportReceiver name="http"
class="org.apache.axis2.transport.nhttp.HttpCoreNIOListener">
+ <parameter name="port" locked="false">8080</parameter>
+ <parameter name="non-blocking" locked="false">true</parameter>
</transportReceiver>
<!--Uncomment this and configure as appropriate for JMS transport support,
after setting up your JMS environment (e.g. ActiveMQ)
@@ -176,13 +181,18 @@
class="org.apache.axis2.transport.tcp.TCPTransportSender"/>
<transportSender name="local"
class="org.apache.axis2.transport.local.LocalTransportSender"/>
- <transportSender name="http"
-
class="org.apache.axis2.transport.http.CommonsHTTPTransportSender"/>
+ <!--<transportSender name="http"
+
class="org.apache.axis2.transport.http.CommonsHTTPTransportSender"/>-->
<!--<transportSender name="http"
class="org.apache.axis2.transport.nhttp.AsyncHTTPSender">
<parameter name="PROTOCOL" locked="false">HTTP/1.1</parameter>
<parameter name="Transfer-Encoding" locked="false">chunked</parameter>
</transportSender>-->
+ <!-- the experimental non-blocking http transport based on HttpCore + NIO
extensions -->
+ <transportSender name="http"
+ class="org.apache.axis2.transport.nhttp.HttpCoreNIOSender">
+ <parameter name="non-blocking" locked="false">true</parameter>
+ </transportSender>
<transportSender name="https"
class="org.apache.axis2.transport.http.CommonsHTTPTransportSender">
<parameter name="PROTOCOL" locked="false">HTTP/1.1</parameter>
Modified: webservices/synapse/trunk/java/src/main/assembly/bin.xml
URL:
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/src/main/assembly/bin.xml?view=diff&rev=499693&r1=499692&r2=499693
==============================================================================
--- webservices/synapse/trunk/java/src/main/assembly/bin.xml (original)
+++ webservices/synapse/trunk/java/src/main/assembly/bin.xml Wed Jan 24
23:27:59 2007
@@ -171,6 +171,13 @@
</includes>
</fileSet>
<fileSet>
+ <directory>modules/nhttp/target</directory>
+ <outputDirectory>synapse-${synapse.version}/lib</outputDirectory>
+ <includes>
+ <include>synapse-nhttp-${synapse.version}.jar</include>
+ </includes>
+ </fileSet>
+ <fileSet>
<directory>src/main/release/docs</directory>
<outputDirectory>synapse-${synapse.version}/lib/endorsed</outputDirectory>
<includes>
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]