Author: asankha
Date: Wed Jan 24 23:27:59 2007
New Revision: 499693

URL: http://svn.apache.org/viewvc?view=rev&rev=499693
Log:
pool outgoing connections at application level
close all pipe channels
update axis2.xml used by sample axis2 server to use NIO server instead of 
SimpleHttpServer

Added:
    
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ConnectionPool.java
Modified:
    
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientHandler.java
    
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientWorker.java
    
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/HttpCoreNIOSender.java
    
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ServerWorker.java
    webservices/synapse/trunk/java/repository/conf/axis2.xml
    
webservices/synapse/trunk/java/repository/conf/sample/resources/misc/axis2.xml
    webservices/synapse/trunk/java/src/main/assembly/bin.xml

Modified: 
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientHandler.java
URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientHandler.java?view=diff&rev=499693&r1=499692&r2=499693
==============================================================================
--- 
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientHandler.java
 (original)
+++ 
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientHandler.java
 Wed Jan 24 23:27:59 2007
@@ -92,6 +92,37 @@
     }
 
     /**
+     * Submit a new request over an already established connection, which has 
been 
+     * 'kept alive'
+     * @param conn the connection to use to send the request, which has been 
kept open
+     * @param axis2Req the new request
+     */
+    public void submitRequest(final NHttpClientConnection conn, 
Axis2HttpRequest axis2Req) {
+
+        try {
+            HttpContext context = conn.getContext();
+
+            context.setAttribute(HttpExecutionContext.HTTP_CONNECTION, conn);
+            context.setAttribute(HttpExecutionContext.HTTP_TARGET_HOST, 
axis2Req.getHttpHost());
+
+            context.setAttribute(OUTGOING_MESSAGE_CONTEXT, 
axis2Req.getMsgContext());
+            context.setAttribute(REQUEST_SOURCE_CHANNEL, 
axis2Req.getSourceChannel());
+
+            HttpRequest request = axis2Req.getRequest();
+            request.getParams().setDefaults(this.params);
+            this.httpProcessor.process(request, context);
+
+            conn.submitRequest(request);
+            context.setAttribute(HttpExecutionContext.HTTP_REQUEST, request);
+
+        } catch (IOException e) {
+            handleException("I/O Error : " + e.getMessage(), e, conn);
+        } catch (HttpException e) {
+            handleException("Unexpected HTTP protocol error: " + 
e.getMessage(), e, conn);
+        }
+    }
+
+    /**
      * Invoked when the destination is connected
      * @param conn the connection being processed
      * @param attachment the attachment set previously
@@ -178,8 +209,10 @@
 
             if (decoder.isCompleted()) {
                 sink.close();
-                 if (!connStrategy.keepAlive(response, context)) {
+                if (!connStrategy.keepAlive(response, context)) {
                     conn.close();
+                } else {
+                    ConnectionPool.release(conn);
                 }
             }
 

Modified: 
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientWorker.java
URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientWorker.java?view=diff&rev=499693&r1=499692&r2=499693
==============================================================================
--- 
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientWorker.java
 (original)
+++ 
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ClientWorker.java
 Wed Jan 24 23:27:59 2007
@@ -29,6 +29,7 @@
 import org.apache.commons.logging.LogFactory;
 
 import java.io.InputStream;
+import java.io.IOException;
 
 /**
  * Performs processing of the HTTP response received for our outgoing request. 
An instance of this
@@ -97,9 +98,13 @@
                 in,
                 outMsgCtx.getEnvelope().getNamespace().getNamespaceURI());
             responseMsgCtx.setEnvelope(envelope);
+
+            in.close();
         } catch (AxisFault af) {
             log.error("Fault creating response SOAP envelope", af);
             return;
+        } catch (IOException e) {
+            log.error("Error closing input stream from which message was 
read", e);
         }
 
         AxisEngine engine = new AxisEngine(cfgCtx);

Added: 
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ConnectionPool.java
URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ConnectionPool.java?view=auto&rev=499693
==============================================================================
--- 
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ConnectionPool.java
 (added)
+++ 
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ConnectionPool.java
 Wed Jan 24 23:27:59 2007
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ *  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.axis2.transport.nhttp;
+
+import org.apache.http.nio.NHttpClientConnection;
+import org.apache.http.protocol.HttpExecutionContext;
+import org.apache.http.HttpHost;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.util.*;
+import java.io.IOException;
+
+public class ConnectionPool {
+
+    private static final Log log = LogFactory.getLog(ConnectionPool.class);
+
+    /** A map of available connections for reuse. The key selects the 
host+port of the
+     * connection and the value contains a List of available connections to 
destination
+     */
+    private static Map connMap = Collections.synchronizedMap(new HashMap());
+
+    public static NHttpClientConnection getConnection(String host, int port) {
+
+        String key = host + ":" + Integer.toString(port);
+        List connections = (List) connMap.get(key);
+
+        if (connections == null) {
+            log.debug("No connections available for reuse");
+            return null;
+
+        } else {
+            NHttpClientConnection conn = null;
+
+            while (!connections.isEmpty()) {
+                conn = (NHttpClientConnection) connections.remove(0);
+
+                if (conn.isOpen()) {
+                    log.debug("A connection to host : " + host + " on port : " 
+
+                        port + " is available in the pool, and will be 
reused");
+                    return conn;
+                } else {
+                    log.debug("closing stale connection");
+                    try {
+                        conn.close();
+                    } catch (IOException ignore) {
+                        ignore.printStackTrace();
+                    }
+                }
+            }
+            return null;
+        }
+    }
+
+    public static void release(NHttpClientConnection conn) {
+
+        HttpHost host = (HttpHost) conn.getContext().getAttribute(
+            HttpExecutionContext.HTTP_TARGET_HOST);
+        String key = host.getHostName() + ":" + 
Integer.toString(host.getPort());
+
+        List connections = (List) connMap.get(key);
+        if (connections == null) {
+            connections = Collections.synchronizedList(new LinkedList());
+            connMap.put(key, connections);
+        }
+
+        connections.add(conn);
+
+        log.debug("Released a connection to host: " + host.getHostName() + " 
on port : " +
+            host.getPort() + " to the connection pool of current size : " + 
connections.size());
+    }
+}

Modified: 
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/HttpCoreNIOSender.java
URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/HttpCoreNIOSender.java?view=diff&rev=499693&r1=499692&r2=499693
==============================================================================
--- 
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/HttpCoreNIOSender.java
 (original)
+++ 
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/HttpCoreNIOSender.java
 Wed Jan 24 23:27:59 2007
@@ -27,6 +27,7 @@
 import org.apache.axis2.transport.OutTransportInfo;
 import org.apache.axiom.om.OMOutputFormat;
 import org.apache.http.nio.NHttpClientHandler;
+import org.apache.http.nio.NHttpClientConnection;
 import org.apache.http.nio.impl.reactor.DefaultConnectingIOReactor;
 import org.apache.http.nio.impl.DefaultClientIOEventDispatch;
 import org.apache.http.nio.reactor.ConnectingIOReactor;
@@ -61,6 +62,8 @@
     private ConfigurationContext cfgCtx;
     /** The IOReactor */
     private ConnectingIOReactor ioReactor = null;
+    /** The client handler */
+    private NHttpClientHandler handler = null;
 
     /**
      * Initialize the transport sender, and execute reactor in new seperate 
thread
@@ -93,7 +96,7 @@
             log.error("Error starting the IOReactor", e);
         }
 
-        NHttpClientHandler handler = new ClientHandler(cfgCtx, params);
+        handler = new ClientHandler(cfgCtx, params);
         IOEventDispatch ioEventDispatch = new 
DefaultClientIOEventDispatch(handler, params);
 
         try {
@@ -113,7 +116,7 @@
     private HttpParams getClientParameters() {
         HttpParams params = new DefaultHttpParams(null);
         params
-            .setIntParameter(HttpConnectionParams.SO_TIMEOUT, 5000)
+            .setIntParameter(HttpConnectionParams.SO_TIMEOUT, 30000)
             .setIntParameter(HttpConnectionParams.CONNECTION_TIMEOUT, 10000)
             .setIntParameter(HttpConnectionParams.SOCKET_BUFFER_SIZE, 8 * 1024)
             .setBooleanParameter(HttpConnectionParams.STALE_CONNECTION_CHECK, 
false)
@@ -169,9 +172,18 @@
             HttpHost httpHost = new HttpHost(url.getHost(), url.getPort(), 
url.getProtocol());
 
             Axis2HttpRequest axis2Req = new Axis2HttpRequest(epr, httpHost, 
msgContext);
-            SessionRequest req = ioReactor.connect(
-                new InetSocketAddress(url.getHost(), url.getPort()), null, 
axis2Req);
 
+            NHttpClientConnection conn = 
ConnectionPool.getConnection(url.getHost(), url.getPort());
+
+            if (conn == null) {
+                SessionRequest req = ioReactor.connect(
+                    new InetSocketAddress(url.getHost(), url.getPort()), null, 
axis2Req);
+                log.debug("A new connection established");
+            } else {
+                ((ClientHandler) handler).submitRequest(conn, axis2Req);
+                log.debug("An existing connection reused");
+            }
+            
             axis2Req.streamMessageContents();
 
         } catch (MalformedURLException e) {
@@ -209,6 +221,10 @@
         } catch (IOException e) {
             handleException("IO Error sending response message", e);
         }
+
+        try {
+            worker.getIs().close();
+        } catch (IOException ignore) {}        
     }
 
     private void sendUsingOutputStream(MessageContext msgContext) throws 
AxisFault {

Modified: 
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ServerWorker.java
URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ServerWorker.java?view=diff&rev=499693&r1=499692&r2=499693
==============================================================================
--- 
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ServerWorker.java
 (original)
+++ 
webservices/synapse/trunk/java/modules/nhttp/src/org/apache/axis2/transport/nhttp/ServerWorker.java
 Wed Jan 24 23:27:59 2007
@@ -205,6 +205,10 @@
         return os;
     }
 
+    public InputStream getIs() {
+        return is;
+    }
+
     public ServerHandler getServiceHandler() {
         return serverHandler;
     }

Modified: webservices/synapse/trunk/java/repository/conf/axis2.xml
URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/repository/conf/axis2.xml?view=diff&rev=499693&r1=499692&r2=499693
==============================================================================
--- webservices/synapse/trunk/java/repository/conf/axis2.xml (original)
+++ webservices/synapse/trunk/java/repository/conf/axis2.xml Wed Jan 24 
23:27:59 2007
@@ -102,7 +102,6 @@
     <!-- ================================================= -->
     <!-- Transport Ins -->
     <!-- ================================================= -->
-    <!--<transportReceiver name="http" 
class="org.apache.axis2.transport.nhttp.AsyncHTTPListener"/>-->
     <!--<transportReceiver name="http"
                        
class="org.apache.axis2.transport.http.SimpleHTTPServer">
         <parameter name="port" locked="false">8080</parameter>-->

Modified: 
webservices/synapse/trunk/java/repository/conf/sample/resources/misc/axis2.xml
URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/repository/conf/sample/resources/misc/axis2.xml?view=diff&rev=499693&r1=499692&r2=499693
==============================================================================
--- 
webservices/synapse/trunk/java/repository/conf/sample/resources/misc/axis2.xml 
(original)
+++ 
webservices/synapse/trunk/java/repository/conf/sample/resources/misc/axis2.xml 
Wed Jan 24 23:27:59 2007
@@ -102,10 +102,9 @@
     <!-- ================================================= -->
     <!-- Transport Ins -->
     <!-- ================================================= -->
-    <!--<transportReceiver name="http" 
class="org.apache.axis2.transport.nhttp.AsyncHTTPListener"/>-->
-    <transportReceiver name="http"
+    <!--<transportReceiver name="http"
                        
class="org.apache.axis2.transport.http.SimpleHTTPServer">
-        <parameter name="port" locked="false">9000</parameter>
+        <parameter name="port" locked="false">9000</parameter>-->
     <!-- Here is the complete list of supported parameters (see example 
settings further below):
         port: the port to listen on (default 6060)
         hostname:  if non-null, url prefix used in reply-to endpoint 
references                                 (default null)
@@ -128,6 +127,12 @@
         <!-- <parameter name="RequestMaxThreadPoolSize"  
locked="false">100</parameter>                     -->
         <!-- <parameter name="threadKeepAliveTime"       
locked="false">240000</parameter>                  -->
         <!-- <parameter name="threadKeepAliveTimeUnit"   
locked="false">MILLISECONDS</parameter>            -->
+    <!--</transportReceiver>-->
+
+    <!-- the experimental non blocking http transport based on HttpCore + NIO 
extensions -->
+    <transportReceiver name="http" 
class="org.apache.axis2.transport.nhttp.HttpCoreNIOListener">
+       <parameter name="port" locked="false">8080</parameter>
+       <parameter name="non-blocking" locked="false">true</parameter>
     </transportReceiver>
     
     <!--Uncomment this and configure as appropriate for JMS transport support, 
after setting up your JMS environment (e.g. ActiveMQ)
@@ -176,13 +181,18 @@
                      
class="org.apache.axis2.transport.tcp.TCPTransportSender"/>
     <transportSender name="local"
                      
class="org.apache.axis2.transport.local.LocalTransportSender"/>
-    <transportSender name="http"
-                     
class="org.apache.axis2.transport.http.CommonsHTTPTransportSender"/>
+    <!--<transportSender name="http"
+                     
class="org.apache.axis2.transport.http.CommonsHTTPTransportSender"/>-->
     <!--<transportSender name="http"
                      class="org.apache.axis2.transport.nhttp.AsyncHTTPSender">
         <parameter name="PROTOCOL" locked="false">HTTP/1.1</parameter>
         <parameter name="Transfer-Encoding" locked="false">chunked</parameter>
     </transportSender>-->
+    <!-- the experimental non-blocking http transport based on HttpCore + NIO 
extensions -->
+    <transportSender name="http"
+                 class="org.apache.axis2.transport.nhttp.HttpCoreNIOSender">
+        <parameter name="non-blocking" locked="false">true</parameter>
+    </transportSender>
     <transportSender name="https"
                      
class="org.apache.axis2.transport.http.CommonsHTTPTransportSender">
         <parameter name="PROTOCOL" locked="false">HTTP/1.1</parameter>

Modified: webservices/synapse/trunk/java/src/main/assembly/bin.xml
URL: 
http://svn.apache.org/viewvc/webservices/synapse/trunk/java/src/main/assembly/bin.xml?view=diff&rev=499693&r1=499692&r2=499693
==============================================================================
--- webservices/synapse/trunk/java/src/main/assembly/bin.xml (original)
+++ webservices/synapse/trunk/java/src/main/assembly/bin.xml Wed Jan 24 
23:27:59 2007
@@ -171,6 +171,13 @@
             </includes>
         </fileSet>
         <fileSet>
+            <directory>modules/nhttp/target</directory>
+            <outputDirectory>synapse-${synapse.version}/lib</outputDirectory>
+            <includes>
+                <include>synapse-nhttp-${synapse.version}.jar</include>
+            </includes>
+        </fileSet>
+        <fileSet>
             <directory>src/main/release/docs</directory>
             
<outputDirectory>synapse-${synapse.version}/lib/endorsed</outputDirectory>
             <includes>



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to