Author: rajith
Date: Fri Nov 30 08:16:14 2007
New Revision: 599856
URL: http://svn.apache.org/viewvc?rev=599856&view=rev
Log:
Added a very basic plain nio transport. You could flip between the transports
using -Dtransport="nio". By default it's the MINA transport.
You can also turn on batching for the nio transport by using -Dbatch="true".
This option has no effect on MINA.
Added:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioHandler.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=599856&r1=599855&r2=599856&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
Fri Nov 30 08:16:14 2007
@@ -103,7 +103,7 @@
origMessage.setJMSExpiration(message.get010Message().getDeliveryProperties().getExpiration());
origMessage.setJMSMessageID(message.getJMSMessageID());
origMessage.setJMSDeliveryMode(deliveryMode);
-
+
BasicContentHeaderProperties contentHeaderProperties =
message.getContentHeaderProperties();
if (contentHeaderProperties.reset())
{
@@ -159,6 +159,7 @@
catch (RuntimeException rte)
{
JMSException ex = new JMSException("Exception when sending
message");
+ rte.printStackTrace();
ex.setLinkedException(rte);
throw ex;
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java?rev=599856&r1=599855&r2=599856&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
Fri Nov 30 08:16:14 2007
@@ -19,6 +19,7 @@
import org.apache.qpidity.transport.ProtocolHeader;
import org.apache.qpidity.transport.SessionDelegate;
import org.apache.qpidity.transport.network.mina.MinaHandler;
+import org.apache.qpidity.transport.network.nio.NioHandler;
public class Client implements org.apache.qpidity.nclient.Connection
@@ -72,7 +73,16 @@
connectionDelegate.setPassword(password);
connectionDelegate.setVirtualHost(virtualHost);
- _conn = MinaHandler.connect(host, port,connectionDelegate);
+ if (System.getProperty("transport","mina").equalsIgnoreCase("nio"))
+ {
+ System.out.println("using NIO");
+ _conn = NioHandler.connect(host, port,connectionDelegate);
+ }
+ else
+ {
+ System.out.println("using MINA");
+ _conn = MinaHandler.connect(host, port,connectionDelegate);
+ }
// XXX: hardcoded version numbers
_conn.send(new ConnectionEvent(0, new ProtocolHeader(1, 0, 10)));
@@ -119,6 +129,11 @@
ClientSession ssn = new ClientSession();
ssn.attach(ch);
ssn.sessionOpen(expiryInSeconds);
+ if (Boolean.getBoolean("batch") &&
System.getProperty("transport").equalsIgnoreCase("nio"))
+ {
+ System.out.println("using batching");
+ NioHandler.startBatchingFrames(_conn.getConnectionId());
+ }
return ssn;
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java?rev=599856&r1=599855&r2=599856&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
Fri Nov 30 08:16:14 2007
@@ -21,7 +21,7 @@
{
static
{
- String max = "message_size_before_sync";
+ String max = "message_size_before_sync"; // KB's
try
{
MAX_NOT_SYNC_DATA_LENGH = new
Long(System.getProperties().getProperty(max, "200000000"));
@@ -132,7 +132,7 @@
public void endData()
{
super.endData();
- if( MAX_NOT_SYNC_DATA_LENGH != -1 && _currentDataSizeNotSynced >=
MAX_NOT_SYNC_DATA_LENGH)
+ /* if( MAX_NOT_SYNC_DATA_LENGH != -1 && _currentDataSizeNotSynced >=
MAX_NOT_SYNC_DATA_LENGH)
{
sync();
}
@@ -140,7 +140,7 @@
{
executionFlush();
_currentDataSizeNotFlushed = 0;
- }
+ }*/
}
public RangeSet getAccquiredMessages()
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java?rev=599856&r1=599855&r2=599856&view=diff
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java
Fri Nov 30 08:16:14 2007
@@ -48,6 +48,8 @@
final private Sender<ConnectionEvent> sender;
final private ConnectionDelegate delegate;
+ // want to make this final
+ private int _connectionId;
final private Map<Integer,Channel> channels = new
HashMap<Integer,Channel>();
@@ -56,6 +58,16 @@
{
this.sender = sender;
this.delegate = delegate;
+ }
+
+ public void setConnectionId(int id)
+ {
+ _connectionId = id;
+ }
+
+ public int getConnectionId()
+ {
+ return _connectionId;
}
public ConnectionDelegate getConnectionDelegate()
Added:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioHandler.java?rev=599856&view=auto
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioHandler.java
(added)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioHandler.java
Fri Nov 30 08:16:14 2007
@@ -0,0 +1,118 @@
+package org.apache.qpidity.transport.network.nio;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpidity.transport.Connection;
+import org.apache.qpidity.transport.ConnectionDelegate;
+import org.apache.qpidity.transport.Receiver;
+import org.apache.qpidity.transport.network.Assembler;
+import org.apache.qpidity.transport.network.Disassembler;
+import org.apache.qpidity.transport.network.InputHandler;
+import org.apache.qpidity.transport.network.OutputHandler;
+
+public class NioHandler implements Runnable
+{
+ private Receiver<ByteBuffer> _receiver;
+ private SocketChannel _ch;
+ private ByteBuffer _readBuf;
+ private static Map<Integer,NioSender> _handlers = new
ConcurrentHashMap<Integer,NioSender>();
+ private AtomicInteger _count = new AtomicInteger();
+
+ private NioHandler(){}
+
+ public static final Connection connect(String host, int port,
+ ConnectionDelegate delegate)
+ {
+ NioHandler handler = new NioHandler();
+ return handler.connectInternal(host,port,delegate);
+ }
+
+ private Connection connectInternal(String host, int port,
+ ConnectionDelegate delegate)
+ {
+ try
+ {
+ SocketAddress address = new InetSocketAddress(host,port);
+ _ch = SocketChannel.open();
+ _ch.socket().setReuseAddress(true);
+ _ch.configureBlocking(true);
+ _ch.socket().setTcpNoDelay(true);
+ if (address != null)
+ {
+ _ch.socket().connect(address);
+ }
+ while (_ch.isConnectionPending())
+ {
+
+ }
+
+ }
+ catch (SocketException e)
+ {
+
+ e.printStackTrace();
+ }
+ catch (IOException e)
+ {
+ e.printStackTrace();
+ }
+
+ NioSender sender = new NioSender(_ch);
+ Connection con = new Connection
+ (new Disassembler(new OutputHandler(sender), 64*1024 - 1),
+ delegate);
+
+ con.setConnectionId(_count.incrementAndGet());
+ _handlers.put(con.getConnectionId(),sender);
+
+ _receiver = new InputHandler(new Assembler(con),
InputHandler.State.FRAME_HDR);
+
+ Thread t = new Thread(this);
+ t.start();
+
+ return con;
+ }
+
+ public void run()
+ {
+ _readBuf = ByteBuffer.allocate(1024);
+ long read = 0;
+ while(_ch.isConnected() && _ch.isOpen())
+ {
+ try
+ {
+ read = _ch.read(_readBuf);
+ if (read > 0)
+ {
+ ByteBuffer b = _readBuf;
+ b.flip();
+ _receiver.received(b);
+ _readBuf.clear();
+ }
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+ //throw new EOFException("The underlying socket/channel has closed");
+ }
+
+ public static void startBatchingFrames(int connectionId)
+ {
+ NioSender sender = _handlers.get(connectionId);
+ sender.setStartBatching();
+ }
+
+
+}
Added:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java?rev=599856&view=auto
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java
(added)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java
Fri Nov 30 08:16:14 2007
@@ -0,0 +1,95 @@
+package org.apache.qpidity.transport.network.nio;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import org.apache.qpidity.transport.Sender;
+
+public class NioSender implements Sender<java.nio.ByteBuffer>
+{
+ private final Object lock = new Object();
+ private SocketChannel _ch;
+ private boolean _batch = false;
+ private ByteBuffer _batcher;
+
+ public NioSender(SocketChannel ch)
+ {
+ this._ch = ch;
+ }
+
+ public void send(java.nio.ByteBuffer buf)
+ {
+ if (_batch)
+ {
+ //System.out.println(_batcher.position() + " , " +
buf.remaining() + " , " + buf.position() + ","+_batcher.capacity());
+ if (_batcher.position() + buf.remaining() >= _batcher.capacity())
+ {
+ _batcher.flip();
+ write(_batcher);
+ _batcher.clear();
+ if (buf.remaining() > _batcher.capacity())
+ {
+ write(buf);
+ }
+ else
+ {
+ _batcher.put(buf);
+ }
+ }
+ else
+ {
+ _batcher.put(buf);
+ }
+ }
+ else
+ {
+ write(buf);
+ }
+ }
+
+ private void write(java.nio.ByteBuffer buf)
+ {
+ synchronized (lock)
+ {
+ if( _ch.isConnected() && _ch.isOpen())
+ {
+ try
+ {
+ _ch.write(buf);
+ }
+ catch(Exception e)
+ {
+ e.fillInStackTrace();
+ }
+ }
+ else
+ {
+ throw new RuntimeException("Trying to write on a closed
socket");
+ }
+
+ }
+ }
+
+ public void setStartBatching()
+ {
+ _batch = true;
+ _batcher = ByteBuffer.allocate(1024);
+ }
+
+ public void close()
+ {
+ // MINA will sometimes throw away in-progress writes when you
+ // ask it to close
+ synchronized (lock)
+ {
+ try
+ {
+ _ch.close();
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+}