Author: asankha
Date: Fri Oct 27 00:03:50 2006
New Revision: 468299
URL: http://svn.apache.org/viewvc?view=rev&rev=468299
Log:
refactoring and unit testing code added
Modified:
incubator/synapse/branches/NIO/modules/niohttp/src/log4j.properties
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/GenericIOHandler.java
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpMessage.java
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpRequest.java
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpResponse.java
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IncomingHandler.java
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/NHttpException.java
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/OutgoingHandler.java
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Reactor.java
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReactorTester.java
Modified: incubator/synapse/branches/NIO/modules/niohttp/src/log4j.properties
URL:
http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/log4j.properties?view=diff&rev=468299&r1=468298&r2=468299
==============================================================================
--- incubator/synapse/branches/NIO/modules/niohttp/src/log4j.properties
(original)
+++ incubator/synapse/branches/NIO/modules/niohttp/src/log4j.properties Fri Oct
27 00:03:50 2006
@@ -3,7 +3,7 @@
# Set the level to DEBUG if you want to log all SlideExceptions (some of them
aren't errors)
#log4j.category.org.apache.axis2=DEBUG
log4j.category.org.apache.synapse=DEBUG
-log4j.category.org.apache.axis2.transport.niohttp=TRACE
+log4j.category.org.apache.axis2.transport.niohttp=DEBUG
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
Modified:
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/GenericIOHandler.java
URL:
http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/GenericIOHandler.java?view=diff&rev=468299&r1=468298&r2=468299
==============================================================================
---
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/GenericIOHandler.java
(original)
+++
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/GenericIOHandler.java
Fri Oct 27 00:03:50 2006
@@ -58,7 +58,7 @@
log.trace("attempting to read into NW a maximum of "
+ msgReader.availableForWrite() + " bytes");
- if (readFromPhysicalNetwork() > 0) {
+ if (readFromPhysicalNetwork(msgReader.availableForWrite()) >= 0) {
if (log.isTraceEnabled()) {
log.trace("read into NW buffer : \n" +
@@ -80,9 +80,10 @@
/** acp
* Read from the physical NW until the NW read buffer is filled up
+ * @param maxBytes maximum number of bytes to read from the NW
* @return the number of bytes actually read
*/
- protected int readFromPhysicalNetwork() {
+ protected int readFromPhysicalNetwork(int maxBytes) {
log.debug("attempting to read from the physical NW");
@@ -91,7 +92,15 @@
int bytesRead = 0;
try {
- bytesRead = socket.read(nwReadBuffer);
+ // dont bite more than we can chew.. if nwReadBuffer cannot
accomodate what
+ // can digest, read only what we can
+ if (maxBytes < appReadPos + nwReadPos) {
+ return 0;
+
+ } else {
+ bytesRead = socket.read(nwReadBuffer);
+ }
+
} catch (IOException e) {
handleException("Error reading into NW buffer from socket : " +
e.getMessage(), e);
}
@@ -149,6 +158,8 @@
/** acp
* Process a ready write event
+ * @param closeConnectionIfDone close the connection if all data is written
+ * to the wire and a connection close has been requested after the write
*/
protected void processReadyWrite(boolean closeConnectionIfDone) {
@@ -209,6 +220,12 @@
log.debug("attempting to write from message body into App buffer");
InputStream is = msgWriter.getInputStream();
+ // does this message have a body? i.e. this could be a GET
+ if (is == null) {
+ log.debug("message has an empty body");
+ msgWriter.setStreamingBody(false); // indicate end of body
+ return;
+ }
appWriteBuffer.position(appWritePos);
try {
@@ -305,7 +322,7 @@
}
} catch (IOException e1) {}
- //TODO throw new ();
+ //throw new NHttpException(msg, e);
}
}
Modified:
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpMessage.java
URL:
http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpMessage.java?view=diff&rev=468299&r1=468298&r2=468299
==============================================================================
---
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpMessage.java
(original)
+++
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpMessage.java
Fri Oct 27 00:03:50 2006
@@ -43,8 +43,6 @@
*/
public abstract class HttpMessage {
- private static final int DEFAULT_BUFFER_SIZE = 4096;
-
private static final Log log = LogFactory.getLog(HttpMessage.class);
private InputStream inputStream;
@@ -71,6 +69,8 @@
outputStream = new ChunkedOutputStream(pipedOS);
} else if (getContentLength() > 0) {
outputStream = new ContentLengthOutputStream(pipedOS,
getContentLength());
+ } else if (getContentLength() == 0) {
+ return null; // no body.. e.g. a GET
} else {
throw new UnsupportedOperationException("Unsupported body
streaming");
}
@@ -88,10 +88,6 @@
* http headers of this message
*/
protected Map headers = new HashMap();
- /**
- * holder of the body content of this message
- */
- protected ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
/**
* A flag to detect if the getOutputStream() caller did not properly
close() the stream
@@ -140,58 +136,6 @@
}
/**
- * Get an OutputStream to write the body of this httpMessage
- * @return an OutputStream to write the body
- */
-/* public OutputStream getOutputStream() {
- // position for body
- buffer.clear();
- outputStreamOpened = true;
- buffer.position(0);
-
- // Returns an output stream for a ByteBuffer.
- // The write() methods use the relative ByteBuffer put() methods.
- return new OutputStream() {
- public synchronized void write(int b) throws IOException {
- while (true) {
- try {
- buffer.put((byte) b);
- return;
- } catch (BufferOverflowException bo) {
- expandBuffer();
- }
- }
- }
-
- public synchronized void write(byte[] bytes, int off, int len)
throws IOException {
- while (true) {
- try {
- buffer.put(bytes, off, len);
- return;
- } catch (BufferOverflowException bo) {
- expandBuffer();
- }
- }
- }
-
- public void close() throws IOException {
- buffer.flip();
- outputStreamOpened = false;
- }
- };
- }*/
-
- /**
- * Expand (double) the main ByteBuffer of this message
- */
- private void expandBuffer() {
- ByteBuffer newBuf = ByteBuffer.allocate(buffer.capacity() * 2);
- log.debug("Expanding ByteBuffer to " + newBuf.capacity() + " bytes");
- buffer.flip();
- buffer = newBuf.put(buffer);
- }
-
- /**
* Return a string representation of the message inputStream HTTP
wire-format
* @return a String representation of the message inputStream HTTP
wire-format
*/
@@ -221,31 +165,4 @@
* @return the first line of text for the toString()
*/
public abstract String toStringLine();
-
- /**
- * Reset the internal state of this message to be reused
- */
- public void reset() {
- if (buffer.capacity() > DEFAULT_BUFFER_SIZE) {
- buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
- } else {
- buffer.clear();
- }
- }
-
- /**
- * Return a reference to the internal ByteBuffer of this message
- * @return the reference to the internal ByteBuffer used
- */
- public ByteBuffer getBuffer() {
- return buffer;
- }
-
- /**
- * Set the internal buffer to the given ByteBuffer
- * @param buffer
- */
- public void setBuffer(ByteBuffer buffer) {
- this.buffer = buffer;
- }
}
Modified:
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpRequest.java
URL:
http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpRequest.java?view=diff&rev=468299&r1=468298&r2=468299
==============================================================================
---
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpRequest.java
(original)
+++
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpRequest.java
Fri Oct 27 00:03:50 2006
@@ -182,22 +182,4 @@
}
}
- /**
- * Causes the request to contain an empty body (i.e. for a GET etc)
- */
- public void setEmptyBody() {
- buffer.position(0);
- buffer.flip();
- }
-
- //------------------------------ TESTING CODE ------------------------
- /**
- * Convenience method for testing etc
- * @param body
- */
- public void setBody(String body) {
- buffer.position(0);
- buffer.put(body.getBytes());
- buffer.flip();
- }
}
Modified:
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpResponse.java
URL:
http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpResponse.java?view=diff&rev=468299&r1=468298&r2=468299
==============================================================================
---
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpResponse.java
(original)
+++
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpResponse.java
Fri Oct 27 00:03:50 2006
@@ -45,7 +45,6 @@
if (request.isConnectionClose()) {
addHeader(Constants.CONNECTION, Constants.CLOSE);
}
- buffer.flip();
}
/**
@@ -76,7 +75,6 @@
public void commit() {
if (outputStreamOpened) {
// if someone didnt properly close the OutputStream after writing,
flip buffer
- buffer.flip();
outputStreamOpened = false;
}
if (request != null) {
@@ -95,17 +93,6 @@
public ResponseStatus getStatus() {
return status;
- }
-
- /**
- * Return a ByteBuffer representation of this message in HTTP wire-format
for transmission
- * @return the ByteBuffer representation of this message
- */
- public ByteBuffer getWireBuffer() {
- if (buffer.limit() > 0) {
- headers.put(Constants.CONTENT_LENGTH,
Integer.toString(buffer.limit()));
- }
- return ByteBuffer.wrap(toString().getBytes());
}
public String toStringLine() {
Modified:
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IncomingHandler.java
URL:
http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IncomingHandler.java?view=diff&rev=468299&r1=468298&r2=468299
==============================================================================
---
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IncomingHandler.java
(original)
+++
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IncomingHandler.java
Fri Oct 27 00:03:50 2006
@@ -52,6 +52,9 @@
}
public void setResponse(HttpResponse response) {
+ if (msgWriter == null) {
+ msgWriter = new MessageWriter(false, response);
+ }
sk.interestOps(SelectionKey.OP_WRITE);
sk.selector().wakeup();
}
@@ -61,6 +64,11 @@
*/
public void run() {
+ if (!sk.isValid()) {
+ sk.cancel();
+ return;
+ }
+
try {
if (sk.isReadable()) {
log.debug("readable");
Modified:
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/NHttpException.java
URL:
http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/NHttpException.java?view=diff&rev=468299&r1=468298&r2=468299
==============================================================================
---
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/NHttpException.java
(original)
+++
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/NHttpException.java
Fri Oct 27 00:03:50 2006
@@ -20,4 +20,12 @@
public NHttpException(String msg) {
super(msg);
}
+
+ public NHttpException(String msg, Exception e) {
+ super(msg, e);
+ }
+
+ public NHttpException(Throwable throwable) {
+ super(throwable);
+ }
}
Modified:
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/OutgoingHandler.java
URL:
http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/OutgoingHandler.java?view=diff&rev=468299&r1=468298&r2=468299
==============================================================================
---
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/OutgoingHandler.java
(original)
+++
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/OutgoingHandler.java
Fri Oct 27 00:03:50 2006
@@ -45,10 +45,6 @@
}
public void setRequest(HttpRequest request) {
- if (!request.isChunked()) {
- request.addHeader(Constants.CONTENT_LENGTH,
- Integer.toString(request.getBuffer().limit()));
- }
msgWriter = new MessageWriter(true, request);
}
@@ -64,6 +60,12 @@
* The main handler routing for outgoing messages and responses
*/
public void run() {
+
+ if (!sk.isValid()) {
+ sk.cancel();
+ return;
+ }
+
try {
if (sk.isConnectable() && socket.finishConnect()) {
log.debug("socket was connectable and now connected");
Modified:
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Reactor.java
URL:
http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Reactor.java?view=diff&rev=468299&r1=468298&r2=468299
==============================================================================
---
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Reactor.java
(original)
+++
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/Reactor.java
Fri Oct 27 00:03:50 2006
@@ -19,27 +19,19 @@
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
-import java.io.OutputStream;
-import java.io.InputStream;
import java.net.InetSocketAddress;
-import java.net.MalformedURLException;
-import java.net.URL;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
-import java.util.Map;
-import java.util.HashMap;
import edu.emory.mathcs.backport.java.util.concurrent.ExecutorService;
import edu.emory.mathcs.backport.java.util.concurrent.ThreadPoolExecutor;
import edu.emory.mathcs.backport.java.util.concurrent.LinkedBlockingQueue;
import edu.emory.mathcs.backport.java.util.concurrent.TimeUnit;
-//import org.apache.mina.filter.thread.LeaderFollowersThreadPool;
-
/**
* dynamic buffer expansion on need - done
* TODO socket timeouts, 100 continue, 202 processing - asap
@@ -90,8 +82,6 @@
private ExecutorService workerPool = null;
- //private LeaderFollowersThreadPool lfPool = null;
-
public static synchronized Reactor getInstance(boolean secure) {
if (secure) {
return _httpsReactor;
@@ -103,15 +93,9 @@
public static synchronized Reactor createReactor(
String host, int port, boolean secure, HttpService httpService) throws
IOException {
if (secure) {
- if (_httpsReactor != null) {
- _httpsReactor.setShutdownRequested(true);
- }
_httpsReactor = new Reactor(host, port, secure, httpService);
return _httpsReactor;
} else {
- if (_httpReactor != null) {
- _httpReactor.setShutdownRequested(true);
- }
_httpReactor = new Reactor(host, port, secure, httpService);
return _httpReactor;
}
@@ -148,9 +132,6 @@
new org.apache.axis2.util.threadpool.DefaultThreadFactory(
new ThreadGroup("NioHttp Worker thread group"),
"NioHttpWorker"));
-
- //lfPool = new LeaderFollowersThreadPool("LeaderFollower", 10);
- //lfPool.init();
}
/**
@@ -168,8 +149,21 @@
selected.clear();
}
}
+ log.info("Reactor shutting down as a shutdown has been
requested..");
+
} catch (IOException e) {
- e.printStackTrace();
+ log.fatal("Reactor encountered an error while selecting : " +
e.getMessage(), e);
+
+ } finally {
+ if (serverSocketChannel != null && serverSocketChannel.isOpen()) {
+ try {
+ serverSocketChannel.close();
+ } catch (IOException e) {}
+ try {
+ selector.close();
+ } catch (IOException e) {}
+ }
+ log.info("Reactor shutdown. Server socket channel and the main
selector closed..");
}
}
@@ -185,9 +179,7 @@
IOHandler h = (IOHandler) r;
if (!h.isBeingProcessed()) {
h.lock();
- //r.run();
workerPool.execute(r);
- //lfPool.submit(r);
}
}
}
@@ -212,7 +204,7 @@
new IncomingHandler(socket, selector, httpService);
}
} catch (IOException e) {
- handleException("Exception while accepting a connection : " +
e.getMessage(), e);
+ log.warn("Error accepting a connection : " + e.getMessage(),
e);
}
}
}
@@ -227,6 +219,7 @@
* for this request
*/
public void send(HttpRequest request, Runnable callback) {
+
SocketChannel socket = null;
try {
InetSocketAddress addr = new InetSocketAddress(
@@ -248,22 +241,17 @@
socket.close();
} catch (IOException ioe) {}
}
- handleException("IO Exception : " + e.getMessage() +
+ log.error("IO Exception : " + e.getMessage() +
" sending request : " + request + " : " , e);
}
}
- private static void handleException(String msg, Exception e) {
- // todo decide how to handle exceptions and cleanup resources etc
- log.error(msg, e);
- e.printStackTrace();
- }
-
/**
* Request to shutdown the reactor
* @param shutdownRequested if true, will request the reactor to shutdown
*/
public void setShutdownRequested(boolean shutdownRequested) {
+ log.info("reactor shudown requested..");
this.shutdownRequested = shutdownRequested;
}
}
Modified:
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReactorTester.java
URL:
http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReactorTester.java?view=diff&rev=468299&r1=468298&r2=468299
==============================================================================
---
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReactorTester.java
(original)
+++
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReactorTester.java
Fri Oct 27 00:03:50 2006
@@ -15,73 +15,106 @@
*/
package org.apache.axis2.transport.niohttp.impl;
+import junit.framework.TestCase;
+import junit.framework.Test;
+import junit.framework.TestSuite;
+import junit.extensions.TestSetup;
+import junit.extensions.RepeatedTest;
+
import java.net.URL;
import java.io.*;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.ByteBuffer;
-public class ReactorTester {
+public class ReactorTester extends TestCase {
- private Reactor r = null;
+ private static Reactor reactor, synapseReactor = null;
+ private static Thread rt = null;
- public static void main(String[] args) throws Exception {
- ReactorTester rt = new ReactorTester();
- //rt.runDemo();
- //rt.simpleGet();
- rt.simplePost();
+ public ReactorTester(String name) {
+ super(name);
}
- private void simplePost() throws IOException {
+ public static Test suite() {
+ TestSuite suite = new TestSuite();
+ suite.addTest(new RepeatedTest(new
ReactorTester("testChunkedPostWithStreaming"), 100));
+ suite.addTest(new RepeatedTest(new
ReactorTester("testChunkedPostWithoutStreaming"), 100));
+ suite.addTest(new RepeatedTest(new
ReactorTester("testContentLengthPostWithStreaming"), 100));
+ suite.addTest(new RepeatedTest(new
ReactorTester("testContentLengthPostWithoutStreaming"), 100));
+ //suite.addTest(new RepeatedTest(new ReactorTester("testSimpleGet"),
100));
+
+ TestSetup setup = new TestSetup(suite) {
+ protected void setUp() throws Exception {
+ System.out.println("One time setup of the reactors");
+ reactor = createReactor(9001);
+ rt = new Thread(reactor);
+ rt.setDaemon(true);
+ rt.start();
+ }
- HttpRequest request = new HttpRequest(
- new
URL("http://localhost:9000/axis2/services/SimpleStockQuoteService"));
+ protected void tearDown() throws Exception {
+ System.out.println("One time shutdown of the reactors");
+ rt.join();
+ System.out.println("Reactor shutdown..");
+ }
+ };
+ return setup;
+ }
+
+ public void testChunkedPostWithStreaming() throws IOException {
+
+ HttpRequest request = new HttpRequest(new
URL("http://localhost:9000/axis2/services/SimpleStockQuoteService"));
request.setMethod(Constants.POST);
request.addHeader("Host", "localhost:9000");
- request.setSecure(true);
request.setConnectionClose();
request.addHeader(Constants.TRANSFER_ENCODING, Constants.CHUNKED);
request.addHeader(Constants.CONTENT_TYPE, "text/xml; charset=utf-8");
OutputStream os = request.getOutputStream();
-
- r = Reactor.createReactor(null, 9001, false, new HttpService() {
- public void handleRequest(HttpRequest request) {
- System.out.println("?");
- }
- public void handleResponse(final HttpResponse response, Runnable
callback) {
+ reactor.send(request, null);
- Runnable r = new Runnable() {
- public void run() {
- System.out.println("Response : " + response);
- InputStream in = response.getInputStream();
+ byte[] bodyBytes = body.getBytes();
+ int incr = 32;
+ for (int i=0; i<bodyBytes.length ; i += incr) {
+ os.write(bodyBytes, i,
+ bodyBytes.length - i < incr ? bodyBytes.length - i : incr);
+ }
+ os.flush();
+ os.close();
+ }
- try {
- byte[] buf = new byte[1024];
- int len;
+ public void testChunkedPostWithoutStreaming() throws IOException {
- Charset set = Charset.forName("us-ascii");
- CharsetDecoder dec = set.newDecoder();
+ HttpRequest request = new HttpRequest(new
URL("http://localhost:9000/axis2/services/SimpleStockQuoteService"));
+ request.setMethod(Constants.POST);
+ request.addHeader("Host", "localhost:9000");
+ request.setConnectionClose();
+ request.addHeader(Constants.TRANSFER_ENCODING, Constants.CHUNKED);
+ request.addHeader(Constants.CONTENT_TYPE, "text/xml; charset=utf-8");
+ OutputStream os = request.getOutputStream();
- while ((len = in.read(buf)) > 0) {
- //System.out.println("Stream : " +
Util.dumpAsHex(buf, len));
- System.out.println("Stream Chunk : " +
dec.decode(ByteBuffer.wrap(buf, 0, len)));
- }
- in.close();
+ byte[] bodyBytes = body.getBytes();
+ int incr = 32;
+ for (int i=0; i<bodyBytes.length ; i += incr) {
+ os.write(bodyBytes, i,
+ bodyBytes.length - i < incr ? bodyBytes.length - i : incr);
+ }
+ os.flush();
+ os.close();
- } catch (IOException e) {
- e.printStackTrace();
- }
+ reactor.send(request, null);
+ }
- System.out.println("*** TEST COMPLETED ***");
- }
- };
- Thread t = new Thread(r);
- t.start();
- }
- });
- new Thread(r).start();
- //r.send(request, null);
+ public void testContentLengthPostWithoutStreaming() throws IOException {
+
+ HttpRequest request = new HttpRequest(new
URL("http://localhost:9000/axis2/services/SimpleStockQuoteService"));
+ request.setMethod(Constants.POST);
+ request.addHeader("Host", "localhost:9000");
+ request.setConnectionClose();
+ request.addHeader(Constants.CONTENT_LENGTH,
Integer.toString(body.getBytes().length));
+ request.addHeader(Constants.CONTENT_TYPE, "text/xml; charset=utf-8");
+ OutputStream os = request.getOutputStream();
byte[] bodyBytes = body.getBytes();
int incr = 32;
@@ -92,21 +125,47 @@
os.flush();
os.close();
- r.send(request, null);
+ reactor.send(request, null);
+ }
+
+ public void testContentLengthPostWithStreaming() throws IOException {
+
+ HttpRequest request = new HttpRequest(new
URL("http://localhost:9000/axis2/services/SimpleStockQuoteService"));
+ request.setMethod(Constants.POST);
+ request.addHeader("Host", "localhost:9000");
+ request.setConnectionClose();
+ request.addHeader(Constants.CONTENT_LENGTH,
Integer.toString(body.getBytes().length));
+ request.addHeader(Constants.CONTENT_TYPE, "text/xml; charset=utf-8");
+ OutputStream os = request.getOutputStream();
+
+ reactor.send(request, null);
+
+ byte[] bodyBytes = body.getBytes();
+ int incr = 32;
+ for (int i=0; i<bodyBytes.length ; i += incr) {
+ os.write(bodyBytes, i,
+ bodyBytes.length - i < incr ? bodyBytes.length - i : incr);
+ }
+ os.flush();
+ os.close();
}
- private void simpleGet() throws IOException {
+ public void testSimpleGet() throws IOException {
HttpRequest request = new HttpRequest(
//new URL("http://localhost:8080/data.jsp")); // content length
new URL("http://www.asankha.com:80/data.php")); // chunked
request.setMethod(Constants.GET);
//request.addHeader("Host", "localhost:8080");
request.addHeader("Host", "www.asankha.com:80");
- request.setEmptyBody();
- request.setSecure(true);
request.setConnectionClose();
- r = Reactor.createReactor(null, 9001, false, new HttpService() {
+ reactor.send(request, null);
+ }
+
+ private static Reactor createReactor(int port) throws IOException {
+
+ return Reactor.createReactor(null, port, false, new HttpService() {
+
public void handleRequest(HttpRequest request) {
System.out.println("?");
}
@@ -131,38 +190,42 @@
}
in.close();
+ System.out.println("*** TEST PASSED ***");
+
} catch (IOException e) {
+ System.out.println("*** TEST FAILED ***");
e.printStackTrace();
}
- System.out.println("*** TEST COMPLETED ***");
+
}
};
Thread t = new Thread(r);
t.start();
}
});
- new Thread(r).start();
- r.send(request, null);
}
+ public void runGenericSynapseUseCase() throws IOException {
+ synapseReactor = Reactor.createReactor(null, 9002, false,
- private void runDemo() throws IOException {
-
- r = Reactor.createReactor(null, 9001, false,
new HttpService() {
public void handleRequest(HttpRequest request) {
try {
- // create new HttpRequest
HttpRequest forwardReq = new HttpRequest(
new
URL("http://localhost:9000/axis2/services/SimpleStockQuoteService"));
-
- //Util.copyStreams(request.getInputStream(),
forwardReq.getOutputStream());
+ forwardReq.setMethod(Constants.POST);
+ forwardReq.addHeader("Host", "localhost:9000");
+ forwardReq.setConnectionClose();
+ forwardReq.addHeader(Constants.TRANSFER_ENCODING,
Constants.CHUNKED);
+ forwardReq.addHeader(Constants.CONTENT_TYPE,
"text/xml; charset=utf-8");
SimpleCallback cb = new SimpleCallback(request);
- r.send(forwardReq, cb);
+ synapseReactor.send(forwardReq, cb);
+
+ Util.copyStreams(request.getInputStream(),
forwardReq.getOutputStream());
} catch (Exception e) {
e.printStackTrace();
@@ -170,14 +233,22 @@
}
public void handleResponse(HttpResponse response, Runnable
callback) {
- System.out.println("Received Response : " + response);
+ System.out.println("Received Response : \n" + response);
SimpleCallback cb = (SimpleCallback) callback;
cb.setResponse(response);
cb.run();
}
});
- new Thread(r).start();
+ Thread t = new Thread(synapseReactor);
+ t.start();
+
+ /*// wait till the reactor is done
+ try {
+ t.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }*/
}
@@ -200,13 +271,18 @@
public void run() {
HttpResponse newResponse = request.createResponse();
try {
- Util.copyStreams(response.getInputStream(),
newResponse.getOutputStream());
newResponse.setStatus(ResponseStatus.OK);
newResponse.addHeader(Constants.CONTENT_TYPE,
Constants.TEXT_XML);
+ newResponse.addHeader(Constants.TRANSFER_ENCODING,
Constants.CHUNKED);
+ newResponse.setConnectionClose();
+
+ newResponse.commit();
+
+ Util.copyStreams(response.getInputStream(),
newResponse.getOutputStream());
+
} catch (IOException e) {
e.printStackTrace();
}
- newResponse.commit();
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]