Author: rajith
Date: Thu May 29 20:25:24 2008
New Revision: 661561

URL: http://svn.apache.org/viewvc?rev=661561&view=rev
Log:
This check in is for QPID-1102.
IoHandler and IoSender uses the java.io classes for IO operations and have 
shown very good improvement in latency and memory usage over MINA.
For certain tests with pub/sub it gives a 2X improvement in throughput.


Added:
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoHandler.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java
Modified:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java?rev=661561&r1=661560&r2=661561&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
 Thu May 29 20:25:24 2008
@@ -1,3 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 package org.apache.qpidity.nclient;
 
 import java.util.List;
@@ -25,6 +44,7 @@
 import org.apache.qpidity.transport.TransportConstants;
 import org.apache.qpidity.transport.ProtocolHeader;
 import org.apache.qpidity.transport.SessionDelegate;
+import org.apache.qpidity.transport.network.io.IoHandler;
 import org.apache.qpidity.transport.network.mina.MinaHandler;
 import org.apache.qpidity.transport.network.nio.NioHandler;
 import org.slf4j.Logger;
@@ -52,6 +72,7 @@
 
     public void connect(String host, int port,String virtualHost,String 
username, String password) throws QpidException
     {
+
         final Condition negotiationComplete = _lock.newCondition();
         closeOk = _lock.newCondition();
         _lock.lock();
@@ -122,7 +143,7 @@
             @Override public void init(Channel ch, ProtocolHeader hdr)
             {
                 // TODO: once the merge is done we'll need to update this code
-                // for handling 0.8 protocol version type i.e. major=8 and 
minor=0 :( 
+                // for handling 0.8 protocol version type i.e. major=8 and 
minor=0 :(
                 if (hdr.getMajor() != TransportConstants.getVersionMajor()
                         || hdr.getMinor() != 
TransportConstants.getVersionMinor())
                 {
@@ -148,19 +169,18 @@
         connectionDelegate.setVirtualHost(virtualHost);
 
         if (System.getProperty("transport","mina").equalsIgnoreCase("nio"))
-        {            
-            if( _logger.isDebugEnabled())
-            {
-                _logger.debug("using NIO");
-            }
+        {
+            _logger.info("using NIO Transport");
             _conn = NioHandler.connect(host, port,connectionDelegate);
         }
+        else if (System.getProperty("transport","mina").equalsIgnoreCase("io"))
+        {
+            _logger.info("using Plain IO Transport");
+            _conn = IoHandler.connect(host, port,connectionDelegate);
+        }
         else
         {
-            if( _logger.isDebugEnabled())
-            {
-                _logger.debug("using MINA");
-            }
+            _logger.info("using MINA Transport");
             _conn = MinaHandler.connect(host, port,connectionDelegate);
            // _conn = NativeHandler.connect(host, port,connectionDelegate);
         }
@@ -260,10 +280,19 @@
         ssn.attach(ch);
         ssn.sessionAttach(ssn.getName());
         ssn.sessionRequestTimeout(expiryInSeconds);
-        if (Boolean.getBoolean("batch") && 
System.getProperty("transport").equalsIgnoreCase("nio"))
+        String transport = System.getProperty("transport","mina");
+
+        try
+        {
+            if (Boolean.getBoolean("batch") && 
("io".equalsIgnoreCase(transport) || "nio".equalsIgnoreCase(transport)))
+            {
+                _logger.debug("using batch mode in transport " + transport);
+                IoHandler.startBatchingFrames(_conn.getConnectionId());
+            }
+        }
+        catch(Exception e)
         {
-            System.out.println("using batching");
-            NioHandler.startBatchingFrames(_conn.getConnectionId());
+            e.printStackTrace();
         }
         return ssn;
     }

Added: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoHandler.java?rev=661561&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoHandler.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoHandler.java
 Thu May 29 20:25:24 2008
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpidity.transport.network.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpidity.transport.Connection;
+import org.apache.qpidity.transport.ConnectionDelegate;
+import org.apache.qpidity.transport.Receiver;
+import org.apache.qpidity.transport.network.Assembler;
+import org.apache.qpidity.transport.network.Disassembler;
+import org.apache.qpidity.transport.network.InputHandler;
+import org.apache.qpidity.transport.network.OutputHandler;
+import org.apache.qpidity.transport.util.Logger;
+
+/**
+ * This class provides a synchronous IO implementation using
+ * the java.io classes. The IoHandler runs in its own thread.
+ * The following params are configurable via JVM arguments
+ * TCP_NO_DELAY - amqj.tcpNoDelay
+ * SO_RCVBUF    - amqj.receiveBufferSize
+ * SO_SNDBUF    - amqj.sendBufferSize
+ */
+public class IoHandler implements Runnable
+{
+    private static int DEFAULT_READ_WRITE_BUFFER_SIZE = 64 * 1024;
+
+    private Receiver<ByteBuffer> _receiver;
+    private Socket _socket;
+    private byte[] _readBuf;
+    private static Map<Integer,IoSender> _handlers = new 
ConcurrentHashMap<Integer,IoSender>();
+    private AtomicInteger _count = new AtomicInteger();
+    private int _readBufferSize;
+    private int _writeBufferSize;
+
+    private static final Logger log = Logger.get(IoHandler.class);
+
+    private IoHandler()
+    {
+        _readBufferSize = 
Integer.getInteger("amqj.receiveBufferSize",DEFAULT_READ_WRITE_BUFFER_SIZE);
+        _writeBufferSize = 
Integer.getInteger("amqj.sendBufferSize",DEFAULT_READ_WRITE_BUFFER_SIZE);
+    }
+
+    public static final Connection connect(String host, int port,
+            ConnectionDelegate delegate)
+    {
+        IoHandler handler = new IoHandler();
+        return handler.connectInternal(host,port,delegate);
+    }
+
+    private Connection connectInternal(String host, int port,
+            ConnectionDelegate delegate)
+    {
+        try
+        {
+            InetAddress address = InetAddress.getByName(host);
+            _socket = new Socket();
+            _socket.setReuseAddress(true);
+            _socket.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay"));
+
+            log.debug("default-SO_RCVBUF : " + _socket.getReceiveBufferSize());
+            log.debug("default-SO_SNDBUF : " + _socket.getSendBufferSize());
+
+            _socket.setSendBufferSize(_writeBufferSize);
+            _socket.setReceiveBufferSize(_readBufferSize);
+
+            log.debug("new-SO_RCVBUF : " + _socket.getReceiveBufferSize());
+            log.debug("new-SO_SNDBUF : " + _socket.getSendBufferSize());
+
+            if (address != null)
+            {
+                _socket.connect(new InetSocketAddress(address, port));
+            }
+            while (!_socket.isConnected())
+            {
+
+            }
+
+        }
+        catch (SocketException e)
+        {
+            log.error(e,"Error connecting to broker");
+        }
+        catch (IOException e)
+        {
+            log.error(e,"Error connecting to broker");
+        }
+
+        IoSender sender = new IoSender(_socket);
+        Connection con = new Connection
+            (new Disassembler(new OutputHandler(sender), 64*1024 - 1),
+             delegate);
+
+        con.setConnectionId(_count.incrementAndGet());
+        _handlers.put(con.getConnectionId(),sender);
+
+        _receiver = new InputHandler(new Assembler(con), 
InputHandler.State.PROTO_HDR);
+
+        Thread t = new Thread(this);
+        t.setName("IO Handler Thread-" + _count.get());
+        t.start();
+
+        return con;
+    }
+
+    public void run()
+    {
+        // I set the read buffer size simillar to SO_RCVBUF
+        // Haven't tested with a lower value to see its better or worse
+        _readBuf = new byte[_readBufferSize];
+        try
+        {
+            InputStream in = _socket.getInputStream();
+            int read = 0;
+            while(_socket.isConnected())
+            {
+                try
+                {
+                    read = in.read(_readBuf);
+                    if (read > 0)
+                    {
+                        ByteBuffer b = ByteBuffer.allocate(read);
+                        b.put(_readBuf,0,read);
+                        b.flip();
+                        //byte[] temp = new byte[read];
+                        //System.arraycopy(_readBuf, 0,temp, 0, read);
+                        //ByteBuffer b = ByteBuffer.wrap(temp);
+                        _receiver.received(b);
+                    }
+                }
+                catch(Exception e)
+                {
+                    throw new RuntimeException("Error reading from socket 
input stream",e);
+                }
+            }
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException("Error getting input stream from the 
socket",e);
+        }
+        finally
+        {
+            try
+            {
+                _socket.close();
+            }
+            catch(Exception e)
+            {
+                log.error(e,"Error closing socket");
+            }
+        }
+    }
+
+    public static void startBatchingFrames(int connectionId)
+    {
+        IoSender sender = _handlers.get(connectionId);
+        sender.setStartBatching();
+    }
+
+
+}

Added: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java?rev=661561&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java
 Thu May 29 20:25:24 2008
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.transport.network.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.transport.Sender;
+import org.apache.qpidity.transport.util.Logger;
+
+public class IoSender implements Sender<java.nio.ByteBuffer>
+{
+    private final Object lock = new Object();
+    private Socket _socket;
+    private OutputStream _outStream;
+    private boolean _batch =  false;
+    private ByteBuffer _buffer;
+
+    private static final Logger log = Logger.get(IoHandler.class);
+
+    public IoSender(Socket socket)
+    {
+        this._socket = socket;
+        try
+        {
+            _outStream = _socket.getOutputStream();
+        }
+        catch(IOException e)
+        {
+            throw new RuntimeException("Error getting output stream for 
socket",e);
+        }
+    }
+
+    /*
+     * Currently I don't implement any in memory buffering
+     * and just write straight to the wire.
+     * I want to experiment with buffering and see if I can
+     * get more performance, all though latency will suffer
+     * a bit.
+     */
+    public void send(java.nio.ByteBuffer buf)
+    {
+        write(buf);
+    }
+
+    /* The extra copying sucks.
+     * If I know for sure that the buf is backed
+     * by an array then I could do buf.array()
+     */
+    private void write(java.nio.ByteBuffer buf)
+    {
+        byte[] array = new byte[buf.remaining()];
+        buf.get(array);
+        if( _socket.isConnected())
+        {
+            synchronized (lock)
+            {
+                try
+                {
+                    _outStream.write(array);
+                }
+                catch(Exception e)
+                {
+                    e.fillInStackTrace();
+                    throw new RuntimeException("Error trying to write to the 
socket",e);
+                }
+            }
+        }
+        else
+        {
+            throw new RuntimeException("Trying to write on a closed socket");
+        }
+    }
+
+    /*
+     * Haven't used this, but the intention is
+     * to experiment with it yet.
+     * Also need to make sure the buffer size
+     * is configurable
+     */
+    public void setStartBatching()
+    {
+        _batch = true;
+        try
+        {
+            _buffer = ByteBuffer.allocate(2048);
+        }
+        catch(Exception e)
+        {
+            throw new RuntimeException("Unable to set SO_SNDBUF due to socket 
error",e);
+        }
+    }
+
+    public void close()
+    {
+          synchronized (lock)
+        {
+            try
+            {
+                _socket.close();
+            }
+            catch(Exception e)
+            {
+                e.printStackTrace();
+            }
+        }
+    }
+}


Reply via email to