Author: bryanduxbury
Date: Fri Mar 26 05:12:27 2010
New Revision: 927695
URL: http://svn.apache.org/viewvc?rev=927695&view=rev
Log:
THRIFT-719. java: Update Nonblocking and HsHa server to avoid an extra buffer copy
This patch causes Nonblocking and HsHa servers to explicitly enforce use of
TFramedTransport and make sure that the actual invoker is deserializing from a
TMemoryInputTransport. This should provide a substantial boost in performance.
Modified:
incubator/thrift/trunk/lib/java/src/org/apache/thrift/server/THsHaServer.java
incubator/thrift/trunk/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
Modified:
incubator/thrift/trunk/lib/java/src/org/apache/thrift/server/THsHaServer.java
URL:
http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/java/src/org/apache/thrift/server/THsHaServer.java?rev=927695&r1=927694&r2=927695&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/java/src/org/apache/thrift/server/THsHaServer.java (original)
+++ incubator/thrift/trunk/lib/java/src/org/apache/thrift/server/THsHaServer.java Fri Mar 26 05:12:27 2010
@@ -112,8 +112,7 @@ public class THsHaServer extends TNonblo
TNonblockingServerTransport serverTransport,
TProtocolFactory protocolFactory,
Options options) {
- this(new TProcessorFactory(processor), serverTransport,
- new TFramedTransport.Factory(),
+ this(new TProcessorFactory(processor), serverTransport,
new TFramedTransport.Factory(),
protocolFactory, protocolFactory,
options);
@@ -142,7 +141,7 @@ public class THsHaServer extends TNonblo
TFramedTransport.Factory transportFactory,
TProtocolFactory protocolFactory) {
this(processorFactory, serverTransport,
- transportFactory, transportFactory,
+ transportFactory,
protocolFactory, protocolFactory, new Options());
}
@@ -156,7 +155,7 @@ public class THsHaServer extends TNonblo
TProtocolFactory protocolFactory,
Options options) {
this(processorFactory, serverTransport,
- transportFactory, transportFactory,
+ transportFactory,
protocolFactory, protocolFactory,
options);
}
@@ -166,12 +165,11 @@ public class THsHaServer extends TNonblo
*/
public THsHaServer( TProcessor processor,
TNonblockingServerTransport serverTransport,
- TFramedTransport.Factory inputTransportFactory,
TFramedTransport.Factory outputTransportFactory,
TProtocolFactory inputProtocolFactory,
TProtocolFactory outputProtocolFactory) {
this(new TProcessorFactory(processor), serverTransport,
- inputTransportFactory, outputTransportFactory,
+ outputTransportFactory,
inputProtocolFactory, outputProtocolFactory);
}
@@ -180,13 +178,12 @@ public class THsHaServer extends TNonblo
*/
public THsHaServer( TProcessorFactory processorFactory,
TNonblockingServerTransport serverTransport,
- TFramedTransport.Factory inputTransportFactory,
TFramedTransport.Factory outputTransportFactory,
TProtocolFactory inputProtocolFactory,
TProtocolFactory outputProtocolFactory)
{
this(processorFactory, serverTransport,
- inputTransportFactory, outputTransportFactory,
+ outputTransportFactory,
inputProtocolFactory, outputProtocolFactory, new Options());
}
@@ -195,14 +192,13 @@ public class THsHaServer extends TNonblo
*/
public THsHaServer( TProcessorFactory processorFactory,
TNonblockingServerTransport serverTransport,
- TFramedTransport.Factory inputTransportFactory,
TFramedTransport.Factory outputTransportFactory,
TProtocolFactory inputProtocolFactory,
TProtocolFactory outputProtocolFactory,
Options options)
{
super(processorFactory, serverTransport,
- inputTransportFactory, outputTransportFactory,
+ outputTransportFactory,
inputProtocolFactory, outputProtocolFactory,
options);
Modified:
incubator/thrift/trunk/lib/java/src/org/apache/thrift/server/TNonblockingServer.java
URL:
http://svn.apache.org/viewvc/incubator/thrift/trunk/lib/java/src/org/apache/thrift/server/TNonblockingServer.java?rev=927695&r1=927694&r2=927695&view=diff
==============================================================================
--- incubator/thrift/trunk/lib/java/src/org/apache/thrift/server/TNonblockingServer.java (original)
+++ incubator/thrift/trunk/lib/java/src/org/apache/thrift/server/TNonblockingServer.java Fri Mar 26 05:12:27 2010
@@ -20,7 +20,6 @@
package org.apache.thrift.server;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
@@ -30,9 +29,6 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import org.apache.thrift.TByteArrayOutputStream;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
@@ -42,10 +38,13 @@ import org.apache.thrift.protocol.TProto
import org.apache.thrift.protocol.TProtocolFactory;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TIOStreamTransport;
+import org.apache.thrift.transport.TMemoryInputTransport;
import org.apache.thrift.transport.TNonblockingServerTransport;
import org.apache.thrift.transport.TNonblockingTransport;
import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A nonblocking TServer implementation. This allows for fairness amongst all
@@ -100,7 +99,7 @@ public class TNonblockingServer extends
public TNonblockingServer(TProcessorFactory processorFactory,
TNonblockingServerTransport serverTransport) {
this(processorFactory, serverTransport,
- new TFramedTransport.Factory(), new TFramedTransport.Factory(),
+ new TFramedTransport.Factory(),
new TBinaryProtocol.Factory(), new TBinaryProtocol.Factory());
}
@@ -108,7 +107,7 @@ public class TNonblockingServer extends
TNonblockingServerTransport serverTransport,
TProtocolFactory protocolFactory) {
this(processor, serverTransport,
- new TFramedTransport.Factory(), new TFramedTransport.Factory(),
+ new TFramedTransport.Factory(),
protocolFactory, protocolFactory);
}
@@ -117,7 +116,7 @@ public class TNonblockingServer extends
TFramedTransport.Factory transportFactory,
TProtocolFactory protocolFactory) {
this(processor, serverTransport,
- transportFactory, transportFactory,
+ transportFactory,
protocolFactory, protocolFactory);
}
@@ -126,42 +125,39 @@ public class TNonblockingServer extends
TFramedTransport.Factory transportFactory,
TProtocolFactory protocolFactory) {
this(processorFactory, serverTransport,
- transportFactory, transportFactory,
+ transportFactory,
protocolFactory, protocolFactory);
}
public TNonblockingServer(TProcessor processor,
TNonblockingServerTransport serverTransport,
- TFramedTransport.Factory inputTransportFactory,
TFramedTransport.Factory outputTransportFactory,
TProtocolFactory inputProtocolFactory,
TProtocolFactory outputProtocolFactory) {
this(new TProcessorFactory(processor), serverTransport,
- inputTransportFactory, outputTransportFactory,
+ outputTransportFactory,
inputProtocolFactory, outputProtocolFactory);
}
public TNonblockingServer(TProcessorFactory processorFactory,
TNonblockingServerTransport serverTransport,
- TFramedTransport.Factory inputTransportFactory,
TFramedTransport.Factory outputTransportFactory,
TProtocolFactory inputProtocolFactory,
TProtocolFactory outputProtocolFactory) {
this(processorFactory, serverTransport,
- inputTransportFactory, outputTransportFactory,
+ outputTransportFactory,
inputProtocolFactory, outputProtocolFactory,
new Options());
}
public TNonblockingServer(TProcessorFactory processorFactory,
TNonblockingServerTransport serverTransport,
- TFramedTransport.Factory inputTransportFactory,
TFramedTransport.Factory outputTransportFactory,
TProtocolFactory inputProtocolFactory,
TProtocolFactory outputProtocolFactory,
Options options) {
super(processorFactory, serverTransport,
- inputTransportFactory, outputTransportFactory,
+ null, outputTransportFactory,
inputProtocolFactory, outputProtocolFactory);
options_ = options;
options_.validate();
@@ -522,7 +518,7 @@ public class TNonblockingServer extends
// if this frame will always be too large for this server, log the
// error and close the connection.
- if (frameSize + 4 > MAX_READ_BUFFER_BYTES) {
+ if (frameSize > MAX_READ_BUFFER_BYTES) {
LOGGER.error("Read a frame size of " + frameSize
+ ", which is bigger than the maximum allowable buffer size for ALL connections.");
return false;
@@ -530,17 +526,15 @@ public class TNonblockingServer extends
// if this frame will push us over the memory limit, then return.
// with luck, more memory will free up the next time around.
- if (readBufferBytesAllocated + frameSize + 4 > MAX_READ_BUFFER_BYTES) {
+ if (readBufferBytesAllocated + frameSize > MAX_READ_BUFFER_BYTES) {
return true;
}
// incremement the amount of memory allocated to read buffers
- readBufferBytesAllocated += frameSize + 4;
+ readBufferBytesAllocated += frameSize;
// reallocate the readbuffer as a frame-sized buffer
- buffer_ = ByteBuffer.allocate(frameSize + 4);
- // put the frame size at the head of the buffer
- buffer_.putInt(frameSize);
+ buffer_ = ByteBuffer.allocate(frameSize);
state_ = READING_FRAME;
} else {
@@ -699,8 +693,7 @@ public class TNonblockingServer extends
* the data it needs to handle an invocation.
*/
private TTransport getInputTransport() {
- return inputTransportFactory_.getTransport(new TIOStreamTransport(
- new ByteArrayInputStream(buffer_.array())));
+ return new TMemoryInputTransport(buffer_.array());
}
/**