Author: asankha
Date: Sun Oct 15 22:05:05 2006
New Revision: 464379
URL: http://svn.apache.org/viewvc?view=rev&rev=464379
Log:
prepare to take out SK and SC dependencies out from read and write handlers
Modified:
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpMessage.java
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpRequest.java
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpResponse.java
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IncomingHandler.java
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/OutgoingHandler.java
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReactorTester.java
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReadHandler.java
Modified:
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpMessage.java
URL:
http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpMessage.java?view=diff&rev=464379&r1=464378&r2=464379
==============================================================================
---
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpMessage.java
(original)
+++
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpMessage.java
Sun Oct 15 22:05:05 2006
@@ -38,6 +38,8 @@
*/
public abstract class HttpMessage {
+ private static final int DEFAULT_BUFFER_SIZE = 4096;
+
private static final Log log = LogFactory.getLog(HttpMessage.class);
/**
@@ -47,11 +49,7 @@
/**
* holder of the body content of this message
*/
- protected ByteBuffer buffer = ByteBuffer.allocate(4096);
- /**
- * position at the main buffer where the body starts (e.g. for requests)
- */
- protected int bodyStart;
+ protected ByteBuffer buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
/**
* A flag to detect if the getOutputStream() caller did not properly
close() the stream
@@ -106,7 +104,7 @@
*/
public InputStream getInputStream() {
// position to the start of the body
- buffer.position(bodyStart);
+ buffer.position(0);
// Returns an input stream for a ByteBuffer.
// The read() methods use the relative ByteBuffer get() methods.
@@ -135,7 +133,7 @@
// position for body
buffer.clear();
outputStreamOpened = true;
- buffer.position(bodyStart);
+ buffer.position(0);
// Returns an output stream for a ByteBuffer.
// The write() methods use the relative ByteBuffer put() methods.
@@ -180,19 +178,6 @@
}
/**
- * Set the given buffer and the start position within that buffer as the
- * body of this httpMessage
- *
- * @param buffer an externally allocated [and populated] buffer containing
the message body
- * @param bodyStart the start position of the actual body content within
the buffer (default 0)
- */
- public void setBuffer(ByteBuffer buffer, int bodyStart) {
- log.debug("HttpMessage.setBuffer() - buffer : " + buffer + "
bodyStart: " + bodyStart);
- this.buffer = buffer;
- this.bodyStart = bodyStart;
- }
-
- /**
* Return a string representation of the message in HTTP wire-format
* @return a String representation of the message in HTTP wire-format
*/
@@ -213,16 +198,12 @@
sb.append(Constants.CRLF);
if (buffer.limit() > 0) {
- buffer.position(bodyStart);
- ByteBuffer bodyBuf = buffer;
- if (bodyStart > 0) {
- bodyBuf = buffer.slice();
- }
+ buffer.position(0);
Charset set = Charset.forName("us-ascii");
CharsetDecoder dec = set.newDecoder();
try {
- sb.append(dec.decode(bodyBuf));
+ sb.append(dec.decode(buffer));
} catch (CharacterCodingException e) {
e.printStackTrace();
}
@@ -240,4 +221,31 @@
* @return the first line of text for the toString()
*/
public abstract String toStringLine();
+
+ /**
+ * Reset the internal state of this message to be reused
+ */
+ public void reset() {
+ if (buffer.capacity() > DEFAULT_BUFFER_SIZE) {
+ buffer = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
+ } else {
+ buffer.clear();
+ }
+ }
+
+ /**
+ * Return a reference to the internal ByteBuffer of this message
+ * @return the reference to the internal ByteBuffer used
+ */
+ public ByteBuffer getBuffer() {
+ return buffer;
+ }
+
+ /**
+ * Set the internal buffer to the given ByteBuffer
+ * @param buffer
+ */
+ public void setBuffer(ByteBuffer buffer) {
+ this.buffer = buffer;
+ }
}
Modified:
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpRequest.java
URL:
http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpRequest.java?view=diff&rev=464379&r1=464378&r2=464379
==============================================================================
---
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpRequest.java
(original)
+++
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpRequest.java
Sun Oct 15 22:05:05 2006
@@ -165,7 +165,7 @@
* Return a ByteBuffer representation of this message in HTTP wire-format
* @return the ByteBuffer representation of this message
*/
- public ByteBuffer getBuffer() {
+ public ByteBuffer getWireBuffer() {
return ByteBuffer.wrap(toString().getBytes());
}
@@ -186,7 +186,7 @@
* Causes the request to contain an empty body (i.e. for a GET etc)
*/
public void setEmptyBody() {
- buffer.position(bodyStart);
+ buffer.position(0);
buffer.flip();
}
@@ -196,7 +196,7 @@
* @param body
*/
public void setBody(String body) {
- buffer.position(bodyStart);
+ buffer.position(0);
buffer.put(body.getBytes());
buffer.flip();
}
Modified:
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpResponse.java
URL:
http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpResponse.java?view=diff&rev=464379&r1=464378&r2=464379
==============================================================================
---
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpResponse.java
(original)
+++
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/HttpResponse.java
Sun Oct 15 22:05:05 2006
@@ -101,7 +101,7 @@
* Return a ByteBuffer representation of this message in HTTP wire-format
for transmission
* @return the ByteBuffer representation of this message
*/
- public ByteBuffer getBuffer() {
+ public ByteBuffer getWireBuffer() {
if (buffer.limit() > 0) {
headers.put(Constants.CONTENT_LENGTH,
Integer.toString(buffer.limit()));
}
Modified:
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IncomingHandler.java
URL:
http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IncomingHandler.java?view=diff&rev=464379&r1=464378&r2=464379
==============================================================================
---
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IncomingHandler.java
(original)
+++
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/IncomingHandler.java
Sun Oct 15 22:05:05 2006
@@ -41,8 +41,8 @@
private SelectionKey sk;
private SocketChannel socket;
- private ReadHandler incomingHandler = new ReadHandler(true);
- private WriteHandler responseHandler = new WriteHandler();
+ private ReadHandler readHandler = new ReadHandler(true);
+ private WriteHandler writeHandler = new WriteHandler();
private HttpService httpService;
@@ -60,7 +60,7 @@
}
public void setResponse(HttpResponse response) {
- responseHandler.setMessage(response.getBuffer(),
response.isConnectionClose());
+ writeHandler.setMessage(response.getWireBuffer(),
response.isConnectionClose());
sk.interestOps(SelectionKey.OP_WRITE);
sk.selector().wakeup();
log.debug("\tIncomingHandler.setResponse()");
@@ -73,27 +73,27 @@
if (sk.isReadable()) {
log.debug("\tIncomingHandler run() - READABLE");
- if (incomingHandler.handle(socket, sk)) {
+ if (readHandler.handle(socket, sk)) {
log.debug("\tA httpMessage has been read completely");
// if httpMessage processing is complete
- HttpRequest request = (HttpRequest)
incomingHandler.getHttpMessage();
+ HttpRequest request = (HttpRequest)
readHandler.getHttpMessage();
request.setHandler(this);
log.debug("\tFire event for received httpMessage");
httpService.handleRequest(request);
// if pipelining is used
- if (!incomingHandler.isConnectionClose()) {
+ if (!readHandler.isConnectionClose()) {
// prepare to read another httpMessage - reset and reuse
- incomingHandler.reset();
+ readHandler.reset();
log.debug("\tReset read handler to read next pipelined
httpMessage");
}
}
} else if (sk.isWritable()) {
log.debug("\tIncomingHandler run() - WRITEABLE");
- if (responseHandler.handle(socket)) {
+ if (writeHandler.handle(socket)) {
log.debug("\tThe response has been written completely");
// response has been written completely
- if (responseHandler.isConnectionClose()) {
+ if (writeHandler.isConnectionClose()) {
log.debug("\tClosing connection normally");
sk.cancel();
try {
Modified:
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/OutgoingHandler.java
URL:
http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/OutgoingHandler.java?view=diff&rev=464379&r1=464378&r2=464379
==============================================================================
---
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/OutgoingHandler.java
(original)
+++
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/OutgoingHandler.java
Sun Oct 15 22:05:05 2006
@@ -42,7 +42,11 @@
this.httpService = httpService;
this.socket = socket;
this.sk = sk;
- writeHandler.setMessage(request.getBuffer(), true /* connection close
*/);
+ request.getWireBuffer().position(0);
+ if (!request.isChunked()) {
+ request.addHeader(Constants.CONTENT_LENGTH,
Integer.toString(request.getBuffer().limit()));
+ }
+ writeHandler.setMessage(request.getWireBuffer(), true /* connection
close */);
}
public Runnable getCallback() {
Modified:
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReactorTester.java
URL:
http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReactorTester.java?view=diff&rev=464379&r1=464378&r2=464379
==============================================================================
---
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReactorTester.java
(original)
+++
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReactorTester.java
Sun Oct 15 22:05:05 2006
@@ -24,15 +24,15 @@
public static void main(String[] args) throws Exception {
ReactorTester rt = new ReactorTester();
- //rt.runDemo();
- rt.simpleGet();
+ rt.runDemo();
+ //rt.simpleGet();
}
private void simpleGet() throws IOException {
HttpRequest request = new HttpRequest(
- new URL("https://localhost:8443/"));
+ new URL("http://localhost:8080/"));
request.setMethod(Constants.GET);
- request.addHeader("Host", "127.0.0.1:8443");
+ request.addHeader("Host", "127.0.0.1:8080");
request.setEmptyBody();
request.setSecure(true);
request.setConnectionClose();
@@ -59,7 +59,6 @@
public void handleRequest(HttpRequest request) {
try {
- System.out.println("Processing Request : " + request);
// create new HttpRequest
HttpRequest forwardReq = new HttpRequest(
new
URL("http://localhost:9000/axis2/services/SimpleStockQuoteService"));
Modified:
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReadHandler.java
URL:
http://svn.apache.org/viewvc/incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReadHandler.java?view=diff&rev=464379&r1=464378&r2=464379
==============================================================================
---
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReadHandler.java
(original)
+++
incubator/synapse/branches/NIO/modules/niohttp/src/org/apache/axis2/transport/niohttp/impl/ReadHandler.java
Sun Oct 15 22:05:05 2006
@@ -52,9 +52,8 @@
CharsetDecoder asciiDecoder = Charset.forName("us-ascii").newDecoder();
- ByteBuffer buffer = ByteBuffer.allocate(4096);
- ByteBuffer chunkedBuffer = ByteBuffer.allocate(4096);
HttpMessage httpMessage;
+ ByteBuffer buffer;
// where should new bytes read from the incoming channel be stored
int readPos;
@@ -63,7 +62,6 @@
// holders for parsed data
String curHeaderName = null;
StringBuffer curHeaderValue = new StringBuffer();
- int bodyStart;
int contentLength;
int currentChunkRemainder;
boolean lastChunkReceived = false;
@@ -76,8 +74,8 @@
private boolean messageComplete = false;
public void reset() {
- buffer.clear();
- chunkedBuffer.clear();
+ httpMessage.reset();
+ buffer = httpMessage.getBuffer();
if (requestMode) {
httpMessage = new HttpRequest();
} else {
@@ -87,7 +85,6 @@
processPos = 0;
curHeaderName = null;
curHeaderValue = new StringBuffer();
- bodyStart = 0;
contentLength = 0;
currentChunkRemainder = 0;
lastChunkReceived = false;
@@ -104,6 +101,7 @@
} else {
httpMessage = new HttpResponse();
}
+ buffer = httpMessage.getBuffer();
}
public boolean handle(SocketChannel socket, SelectionKey sk) {
@@ -386,9 +384,15 @@
debug("\t\t\theaders parsed");
parsingHeader = false;
- // prepare to parse body
- bodyStart = processPos;
- debug("\t\t\tparsed headers. begin parsing body to buffer
position:" + bodyStart);
+ buffer.position(processPos);
+ ByteBuffer body = buffer.slice();
+ buffer.position(0);
+ buffer.put(body);
+
+ readPos -= processPos;
+ processPos = 0;
+
+ debug("\t\t\tparsed headers. begin parsing body to buffer");
if (httpMessage.isChunked()) {
parsingChunks = true;
@@ -402,36 +406,20 @@
return true;
}
- private String parseHeaderName(ByteBuffer buf) {
- return readToColon(buf);
- }
-
- private String parseHeaderValue(ByteBuffer buf) {
- int firstChar;
- do {
- String value = readToCRLF(buf);
- if (value != null) {
- curHeaderValue.append(value);
- }
- firstChar = buf.get(buf.position());
- } while (firstChar == Constants.SP || firstChar == Constants.HT);
- return curHeaderValue.toString();
- }
-
private boolean parseNextChunk() {
debug("\t\t\tparseNextChunk(currentChunkRemainder: " +
currentChunkRemainder +
" processPos: " + processPos + " readPos: " + readPos);
- if (currentChunkRemainder > 0) {
- // now start processing from where we left off until we reach the
end
- buffer.position(processPos);
- byte b;
- while (currentChunkRemainder > 0 && buffer.position() < readPos) {
- b = buffer.get();
- chunkedBuffer.put(b);
- processPos++;
- currentChunkRemainder--;
+ if (currentChunkRemainder > 0) {
+ // have we read the full chunk?
+ if (readPos > processPos + currentChunkRemainder) {
+ processPos += currentChunkRemainder;
+ currentChunkRemainder = 0;
+ } else {
+ currentChunkRemainder -= (readPos - processPos);
+ processPos = readPos;
}
+ buffer.position(processPos);
if (currentChunkRemainder == 0) {
// read to end of data CRLF and discard trailing CRLF
@@ -440,6 +428,9 @@
}
}
if (currentChunkRemainder == 0) {
+ // save the position we are at
+ int chunkHeadStart = processPos;
+
// is there another chunk?
String chunkHead = readToCRLF(buffer);
debug("\t\t\treading chunkHead : " + chunkHead);
@@ -456,11 +447,20 @@
return true;
}
+ // we need to discard the chunk head from our buffer now
+ buffer.position(processPos);
+ ByteBuffer remainder = buffer.slice();
+ buffer.position(chunkHeadStart);
+ buffer.put(remainder);
+ readPos -= (processPos - chunkHeadStart);
+ processPos = chunkHeadStart;
+
+ //System.out.println(Util.dumpAsHex(buffer.array(),
buffer.limit()));
+
// did we encounter the "0" chunk?
if (currentChunkRemainder == 0) {
debug("\t\t\tall chunks received");
- chunkedBuffer.flip();
// read upto end of next CRLF
String footer;
@@ -496,25 +496,21 @@
}
if (lastChunkReceived && messageComplete) {
- // copy chunked body to main buffer, to start at the bodyStart
position
- buffer.position(bodyStart);
- chunkedBuffer.position(0);
- buffer.put(chunkedBuffer);
buffer.flip();
- httpMessage.setBuffer(buffer, bodyStart);
+ httpMessage.setBuffer(buffer);
+ debug("\t\t\tfinish reading. body ends at : " + processPos + "
in buffer : " + buffer);
}
} else {
- if (readPos >= bodyStart + contentLength) {
+ if (readPos >= contentLength) {
// do we have the whole body in our buffer?
processPos = readPos;
buffer.position(processPos);
buffer.flip();
+ httpMessage.setBuffer(buffer);
- debug("\t\t\tfinish reading. body starts from: " +
- bodyStart + " and ends: " + processPos + " in buffer : " +
buffer);
- httpMessage.setBuffer(buffer, bodyStart);
+ debug("\t\t\tfinish reading. body ends at : " + processPos + "
in buffer : " + buffer);
messageComplete = true;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]