Author: rajith
Date: Fri Nov 30 08:16:14 2007
New Revision: 599856

URL: http://svn.apache.org/viewvc?rev=599856&view=rev
Log:
Added a very basic plain nio transport. You could flip between the transports 
using -Dtransport="nio". By default it's the MINA transport.
You can also turn on batching for the nio transport by using -Dbatch="true". 
This option has no effect on MINA.

Added:
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioHandler.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java
Modified:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java?rev=599856&r1=599855&r2=599856&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
 Fri Nov 30 08:16:14 2007
@@ -103,7 +103,7 @@
         
origMessage.setJMSExpiration(message.get010Message().getDeliveryProperties().getExpiration());
         origMessage.setJMSMessageID(message.getJMSMessageID());
         origMessage.setJMSDeliveryMode(deliveryMode);
-        
+
         BasicContentHeaderProperties contentHeaderProperties = 
message.getContentHeaderProperties();
         if (contentHeaderProperties.reset())
         {
@@ -159,6 +159,7 @@
         catch (RuntimeException rte)
         {
             JMSException ex = new JMSException("Exception when sending 
message");
+            rte.printStackTrace();
             ex.setLinkedException(rte);
             throw ex;
         }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java?rev=599856&r1=599855&r2=599856&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
 Fri Nov 30 08:16:14 2007
@@ -19,6 +19,7 @@
 import org.apache.qpidity.transport.ProtocolHeader;
 import org.apache.qpidity.transport.SessionDelegate;
 import org.apache.qpidity.transport.network.mina.MinaHandler;
+import org.apache.qpidity.transport.network.nio.NioHandler;
 
 
 public class Client implements org.apache.qpidity.nclient.Connection
@@ -72,7 +73,16 @@
         connectionDelegate.setPassword(password);
         connectionDelegate.setVirtualHost(virtualHost);
 
-        _conn = MinaHandler.connect(host, port,connectionDelegate);
+        if (System.getProperty("transport","mina").equalsIgnoreCase("nio"))
+        {
+            System.out.println("using NIO");
+            _conn = NioHandler.connect(host, port,connectionDelegate);
+        }
+        else
+        {
+            System.out.println("using MINA");
+            _conn = MinaHandler.connect(host, port,connectionDelegate);
+        }
 
         // XXX: hardcoded version numbers
         _conn.send(new ConnectionEvent(0, new ProtocolHeader(1, 0, 10)));
@@ -119,6 +129,11 @@
         ClientSession ssn = new ClientSession();
         ssn.attach(ch);
         ssn.sessionOpen(expiryInSeconds);
+        if (Boolean.getBoolean("batch") && 
System.getProperty("transport").equalsIgnoreCase("nio"))
+        {
+            System.out.println("using batching");
+            NioHandler.startBatchingFrames(_conn.getConnectionId());
+        }
         return ssn;
     }
 

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java?rev=599856&r1=599855&r2=599856&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
 Fri Nov 30 08:16:14 2007
@@ -21,7 +21,7 @@
 {
     static
     {
-        String max = "message_size_before_sync";
+            String max = "message_size_before_sync"; // KB's
             try
             {
                 MAX_NOT_SYNC_DATA_LENGH = new 
Long(System.getProperties().getProperty(max, "200000000"));
@@ -132,7 +132,7 @@
     public void endData()
     {
         super.endData();
-        if( MAX_NOT_SYNC_DATA_LENGH != -1 && _currentDataSizeNotSynced >= 
MAX_NOT_SYNC_DATA_LENGH)
+    /*    if( MAX_NOT_SYNC_DATA_LENGH != -1 && _currentDataSizeNotSynced >= 
MAX_NOT_SYNC_DATA_LENGH)
         {
             sync();
         }
@@ -140,7 +140,7 @@
         {
            executionFlush();
             _currentDataSizeNotFlushed = 0;
-        }
+        }*/
     }
 
     public RangeSet getAccquiredMessages()

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java?rev=599856&r1=599855&r2=599856&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Connection.java
 Fri Nov 30 08:16:14 2007
@@ -48,6 +48,8 @@
 
     final private Sender<ConnectionEvent> sender;
     final private ConnectionDelegate delegate;
+    // want to make this final
+    private int _connectionId;
 
     final private Map<Integer,Channel> channels = new 
HashMap<Integer,Channel>();
 
@@ -56,6 +58,16 @@
     {
         this.sender = sender;
         this.delegate = delegate;
+    }
+
+    public void setConnectionId(int id)
+    {
+        _connectionId = id;
+    }
+
+    public int getConnectionId()
+    {
+        return _connectionId;
     }
 
     public ConnectionDelegate getConnectionDelegate()

Added: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioHandler.java?rev=599856&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioHandler.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioHandler.java
 Fri Nov 30 08:16:14 2007
@@ -0,0 +1,118 @@
+package org.apache.qpidity.transport.network.nio;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpidity.transport.Connection;
+import org.apache.qpidity.transport.ConnectionDelegate;
+import org.apache.qpidity.transport.Receiver;
+import org.apache.qpidity.transport.network.Assembler;
+import org.apache.qpidity.transport.network.Disassembler;
+import org.apache.qpidity.transport.network.InputHandler;
+import org.apache.qpidity.transport.network.OutputHandler;
+
+public class NioHandler implements Runnable
+{
+    private Receiver<ByteBuffer> _receiver;
+    private SocketChannel _ch;
+    private ByteBuffer _readBuf;
+    private static Map<Integer,NioSender> _handlers = new 
ConcurrentHashMap<Integer,NioSender>();
+    private AtomicInteger _count = new AtomicInteger();
+
+    private NioHandler(){}
+
+    public static final Connection connect(String host, int port,
+            ConnectionDelegate delegate)
+    {
+        NioHandler handler = new NioHandler();
+        return handler.connectInternal(host,port,delegate);
+    }
+
+    private Connection connectInternal(String host, int port,
+            ConnectionDelegate delegate)
+    {
+        try
+        {
+            SocketAddress address = new InetSocketAddress(host,port);
+            _ch = SocketChannel.open();
+            _ch.socket().setReuseAddress(true);
+            _ch.configureBlocking(true);
+            _ch.socket().setTcpNoDelay(true);
+            if (address != null)
+            {
+                _ch.socket().connect(address);
+            }
+            while (_ch.isConnectionPending())
+            {
+
+            }
+
+        }
+        catch (SocketException e)
+        {
+
+            e.printStackTrace();
+        }
+        catch (IOException e)
+        {
+            e.printStackTrace();
+        }
+
+        NioSender sender = new NioSender(_ch);
+        Connection con = new Connection
+            (new Disassembler(new OutputHandler(sender), 64*1024 - 1),
+             delegate);
+
+        con.setConnectionId(_count.incrementAndGet());
+        _handlers.put(con.getConnectionId(),sender);
+
+        _receiver = new InputHandler(new Assembler(con), 
InputHandler.State.FRAME_HDR);
+
+        Thread t = new Thread(this);
+        t.start();
+
+        return con;
+    }
+
+    public void run()
+    {
+        _readBuf = ByteBuffer.allocate(1024);
+        long read = 0;
+        while(_ch.isConnected() && _ch.isOpen())
+        {
+            try
+            {
+                read = _ch.read(_readBuf);
+                if (read > 0)
+                {
+                    ByteBuffer b = _readBuf;
+                    b.flip();
+                    _receiver.received(b);
+                    _readBuf.clear();
+                }
+            }
+            catch(Exception e)
+            {
+                e.printStackTrace();
+            }
+        }
+
+        //throw new EOFException("The underlying socket/channel has closed");
+    }
+
+    public static void startBatchingFrames(int connectionId)
+    {
+        NioSender sender = _handlers.get(connectionId);
+        sender.setStartBatching();
+    }
+
+
+}

Added: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java?rev=599856&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/nio/NioSender.java
 Fri Nov 30 08:16:14 2007
@@ -0,0 +1,95 @@
+package org.apache.qpidity.transport.network.nio;
+
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import org.apache.qpidity.transport.Sender;
+
+public class NioSender implements Sender<java.nio.ByteBuffer>
+{
+    private final Object lock = new Object();
+    private SocketChannel _ch;
+    private boolean _batch =  false;
+    private ByteBuffer _batcher;
+
+    public NioSender(SocketChannel ch)
+    {
+        this._ch = ch;
+    }
+
+    public void send(java.nio.ByteBuffer buf)
+    {
+        if (_batch)
+        {
+            //System.out.println(_batcher.position() + " , " +  
buf.remaining() + " , " + buf.position() + ","+_batcher.capacity());
+            if (_batcher.position() + buf.remaining() >= _batcher.capacity())
+            {
+                _batcher.flip();
+                write(_batcher);
+                _batcher.clear();
+                if (buf.remaining() > _batcher.capacity())
+                {
+                    write(buf);
+                }
+                else
+                {
+                    _batcher.put(buf);
+                }
+            }
+            else
+            {
+                _batcher.put(buf);
+            }
+        }
+        else
+        {
+            write(buf);
+        }
+    }
+
+    private void write(java.nio.ByteBuffer buf)
+    {
+        synchronized (lock)
+        {
+            if( _ch.isConnected() && _ch.isOpen())
+            {
+                try
+                {
+                    _ch.write(buf);
+                }
+                catch(Exception e)
+                {
+                    e.fillInStackTrace();
+                }
+            }
+            else
+            {
+                throw new RuntimeException("Trying to write on a closed 
socket");
+            }
+
+        }
+    }
+
+    public void setStartBatching()
+    {
+        _batch = true;
+        _batcher = ByteBuffer.allocate(1024);
+    }
+
+    public void close()
+    {
+        // MINA will sometimes throw away in-progress writes when you
+        // ask it to close
+        synchronized (lock)
+        {
+            try
+            {
+                _ch.close();
+            }
+            catch(Exception e)
+            {
+                e.printStackTrace();
+            }
+        }
+    }
+}


Reply via email to