Author: rhs
Date: Tue Jul 22 11:33:00 2008
New Revision: 678848

URL: http://svn.apache.org/viewvc?rev=678848&view=rev
Log:
Updated the io transport to use a separate write thread with a circular buffer 
that does opportunistic write batching. Fixed error handling and shutdown for 
the io transport. Switched default from mina to the io transport for the 0-10 
client. Modified InputHandler to accumulate bytes in the outer loop and 
simplified the state machine accordingly. These changes should address 
QPID-1188, prevent the Java client from running out of memory when writing 
messages faster than the network and/or broker can keep up, and in general 
improve performance.

Added:
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoReceiver.java
   (with props)
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoTransport.java
   (with props)
Removed:
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoHandler.java
Modified:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java
    
incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=678848&r1=678847&r2=678848&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
 Tue Jul 22 11:33:00 2008
@@ -59,6 +59,7 @@
 import javax.naming.StringRefAddr;
 import java.io.IOException;
 import java.net.ConnectException;
+import java.net.UnknownHostException;
 import java.nio.channels.UnresolvedAddressException;
 import java.text.MessageFormat;
 import java.util.*;
@@ -467,6 +468,7 @@
                 if (connectionException.getCause() != null)
                 {
                     message = connectionException.getCause().getMessage();
+                    connectionException.getCause().printStackTrace();
                 }
                 else
                 {
@@ -486,18 +488,19 @@
                 }
             }
 
-            AMQException e = new AMQConnectionFailureException(message, 
connectionException);
-
-            if (connectionException != null)
+            for (Throwable th = connectionException; th != null; th = 
th.getCause())
             {
-                if (connectionException instanceof UnresolvedAddressException)
+                if (th instanceof UnresolvedAddressException ||
+                    th instanceof UnknownHostException)
                 {
-                    e = new AMQUnresolvedAddressException(message, 
_failoverPolicy.getCurrentBrokerDetails().toString(),
-                                                          null);
+                    throw new AMQUnresolvedAddressException
+                        (message,
+                         _failoverPolicy.getCurrentBrokerDetails().toString(),
+                         connectionException);
                 }
-
             }
-            throw e;
+
+            throw new AMQConnectionFailureException(message, 
connectionException);
         }
 
         _connectionMetaData = new QpidConnectionMetaData(this);

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java?rev=678848&r1=678847&r2=678848&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
 Tue Jul 22 11:33:00 2008
@@ -43,7 +43,7 @@
 import org.apache.qpidity.transport.TransportConstants;
 import org.apache.qpidity.transport.ProtocolHeader;
 import org.apache.qpidity.transport.SessionDelegate;
-import org.apache.qpidity.transport.network.io.IoHandler;
+import org.apache.qpidity.transport.network.io.IoTransport;
 import org.apache.qpidity.transport.network.mina.MinaHandler;
 import org.apache.qpidity.transport.network.nio.NioHandler;
 import org.slf4j.Logger;
@@ -167,15 +167,16 @@
         connectionDelegate.setPassword(password);
         connectionDelegate.setVirtualHost(virtualHost);
 
-        if (System.getProperty("transport","mina").equalsIgnoreCase("nio"))
+        String transport = System.getProperty("transport","io");
+        if (transport.equalsIgnoreCase("nio"))
         {
             _logger.info("using NIO Transport");
             _conn = NioHandler.connect(host, port,connectionDelegate);
         }
-        else if (System.getProperty("transport","mina").equalsIgnoreCase("io"))
+        else if (transport.equalsIgnoreCase("io"))
         {
             _logger.info("using Plain IO Transport");
-            _conn = IoHandler.connect(host, port,connectionDelegate);
+            _conn = IoTransport.connect(host, port,connectionDelegate);
         }
         else
         {
@@ -287,20 +288,6 @@
         ssn.attach(ch);
         ssn.sessionAttach(ssn.getName());
         ssn.sessionRequestTimeout(expiryInSeconds);
-        String transport = System.getProperty("transport","mina");
-
-        try
-        {
-            if (Boolean.getBoolean("batch") && 
("io".equalsIgnoreCase(transport) || "nio".equalsIgnoreCase(transport)))
-            {
-                _logger.debug("using batch mode in transport " + transport);
-                IoHandler.startBatchingFrames(_conn.getConnectionId());
-            }
-        }
-        catch(Exception e)
-        {
-            e.printStackTrace();
-        }
         return ssn;
     }
 

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java?rev=678848&r1=678847&r2=678848&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Session.java
 Tue Jul 22 11:33:00 2008
@@ -249,7 +249,7 @@
         }
     }
 
-    protected void invoke(Method m)
+    public void invoke(Method m)
     {
         if (m.getEncodedTrack() == Frame.L4)
         {

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java?rev=678848&r1=678847&r2=678848&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
 Tue Jul 22 11:33:00 2008
@@ -21,6 +21,7 @@
 package org.apache.qpidity.transport.network;
 
 import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
 
 import org.apache.qpidity.transport.ProtocolError;
 import org.apache.qpidity.transport.ProtocolHeader;
@@ -38,53 +39,41 @@
  * @author Rafael H. Schloming
  */
 
-public class InputHandler implements Receiver<ByteBuffer>
+public final class InputHandler implements Receiver<ByteBuffer>
 {
 
     public enum State
     {
         PROTO_HDR,
-        PROTO_HDR_M,
-        PROTO_HDR_Q,
-        PROTO_HDR_P,
-        PROTO_HDR_CLASS,
-        PROTO_HDR_INSTANCE,
-        PROTO_HDR_MAJOR,
-        PROTO_HDR_MINOR,
         FRAME_HDR,
-        FRAME_HDR_TYPE,
-        FRAME_HDR_SIZE1,
-        FRAME_HDR_SIZE2,
-        FRAME_HDR_RSVD1,
-        FRAME_HDR_TRACK,
-        FRAME_HDR_CH1,
-        FRAME_HDR_CH2,
-        FRAME_HDR_RSVD2,
-        FRAME_HDR_RSVD3,
-        FRAME_HDR_RSVD4,
-        FRAME_HDR_RSVD5,
-        FRAME_FRAGMENT,
+        FRAME_BODY,
         ERROR;
     }
 
     private final Receiver<NetworkEvent> receiver;
     private State state;
-
-    private byte instance;
-    private byte major;
-    private byte minor;
+    private ByteBuffer input = null;
+    private int needed;
 
     private byte flags;
     private SegmentType type;
     private byte track;
     private int channel;
-    private int size;
-    private ByteBuffer body;
 
     public InputHandler(Receiver<NetworkEvent> receiver, State state)
     {
         this.receiver = receiver;
         this.state = state;
+
+        switch (state)
+        {
+        case PROTO_HDR:
+            needed = 8;
+            break;
+        case FRAME_HDR:
+            needed = Frame.HEADER_SIZE;
+            break;
+        }
     }
 
     public InputHandler(Receiver<NetworkEvent> receiver)
@@ -92,18 +81,6 @@
         this(receiver, PROTO_HDR);
     }
 
-    private void init()
-    {
-        receiver.received(new ProtocolHeader(instance, major, minor));
-    }
-
-    private void frame()
-    {
-        Frame frame = new Frame(flags, type, track, channel, body);
-        assert size == frame.getSize();
-        receiver.received(frame);
-    }
-
     private void error(String fmt, Object ... args)
     {
         receiver.received(new ProtocolError(Frame.L1, fmt, args));
@@ -111,157 +88,109 @@
 
     public void received(ByteBuffer buf)
     {
-        while (buf.hasRemaining())
+        int limit = buf.limit();
+        int remaining = buf.remaining();
+        while (remaining > 0)
         {
-            state = next(buf);
+            if (remaining >= needed)
+            {
+                int consumed = needed;
+                int pos = buf.position();
+                if (input == null)
+                {
+                    buf.limit(pos + needed);
+                    input = buf;
+                    state = next(pos);
+                    buf.limit(limit);
+                    buf.position(pos + consumed);
+                }
+                else
+                {
+                    buf.limit(pos + needed);
+                    input.put(buf);
+                    buf.limit(limit);
+                    input.flip();
+                    state = next(0);
+                }
+
+                remaining -= consumed;
+                input = null;
+            }
+            else
+            {
+                if (input == null)
+                {
+                    input = ByteBuffer.allocate(needed);
+                }
+                input.put(buf);
+                needed -= remaining;
+                remaining = 0;
+            }
         }
     }
 
-    private State next(ByteBuffer buf)
+    private State next(int pos)
     {
+        input.order(ByteOrder.BIG_ENDIAN);
+
         switch (state) {
         case PROTO_HDR:
-            return expect(buf, 'A', PROTO_HDR_M);
-        case PROTO_HDR_M:
-            return expect(buf, 'M', PROTO_HDR_Q);
-        case PROTO_HDR_Q:
-            return expect(buf, 'Q', PROTO_HDR_P);
-        case PROTO_HDR_P:
-            return expect(buf, 'P', PROTO_HDR_CLASS);
-        case PROTO_HDR_CLASS:
-            return expect(buf, 1, PROTO_HDR_INSTANCE);
-        case PROTO_HDR_INSTANCE:
-            instance = buf.get();
-            return PROTO_HDR_MAJOR;
-        case PROTO_HDR_MAJOR:
-            major = buf.get();
-            return PROTO_HDR_MINOR;
-        case PROTO_HDR_MINOR:
-            minor = buf.get();
-            init();
+            if (input.get(pos) != 'A' &&
+                input.get(pos + 1) != 'M' &&
+                input.get(pos + 2) != 'Q' &&
+                input.get(pos + 3) != 'P')
+            {
+                error("bad protocol header: %s", str(input));
+                return ERROR;
+            }
+
+            byte instance = input.get(pos + 5);
+            byte major = input.get(pos + 6);
+            byte minor = input.get(pos + 7);
+            receiver.received(new ProtocolHeader(instance, major, minor));
+            needed = Frame.HEADER_SIZE;
             return FRAME_HDR;
         case FRAME_HDR:
-            flags = buf.get();
-            return FRAME_HDR_TYPE;
-        case FRAME_HDR_TYPE:
-            type = SegmentType.get(buf.get());
-            return FRAME_HDR_SIZE1;
-        case FRAME_HDR_SIZE1:
-            size = (0xFF & buf.get()) << 8;
-            return FRAME_HDR_SIZE2;
-        case FRAME_HDR_SIZE2:
-            size += 0xFF & buf.get();
-            size -= 12;
+            flags = input.get(pos);
+            type = SegmentType.get(input.get(pos + 1));
+            int size = (0xFFFF & input.getShort(pos + 2));
+            size -= Frame.HEADER_SIZE;
             if (size < 0 || size > (64*1024 - 12))
             {
                 error("bad frame size: %d", size);
                 return ERROR;
             }
-            else
-            {
-                return FRAME_HDR_RSVD1;
-            }
-        case FRAME_HDR_RSVD1:
-            return expect(buf, 0, FRAME_HDR_TRACK);
-        case FRAME_HDR_TRACK:
-            byte b = buf.get();
+            byte b = input.get(pos + 5);
             if ((b & 0xF0) != 0) {
                 error("non-zero reserved bits in upper nibble of " +
                       "frame header byte 5: '%x'", b);
                 return ERROR;
             } else {
                 track = (byte) (b & 0xF);
-                return FRAME_HDR_CH1;
             }
-        case FRAME_HDR_CH1:
-            channel = (0xFF & buf.get()) << 8;
-            return FRAME_HDR_CH2;
-        case FRAME_HDR_CH2:
-            channel += 0xFF & buf.get();
-            return FRAME_HDR_RSVD2;
-        case FRAME_HDR_RSVD2:
-            return expect(buf, 0, FRAME_HDR_RSVD3);
-        case FRAME_HDR_RSVD3:
-            return expect(buf, 0, FRAME_HDR_RSVD4);
-        case FRAME_HDR_RSVD4:
-            return expect(buf, 0, FRAME_HDR_RSVD5);
-        case FRAME_HDR_RSVD5:
-            if (!expect(buf, 0))
+            channel = (0xFFFF & input.getShort(pos + 6));
+            if (size == 0)
             {
-                return ERROR;
-            }
-
-            if (size > buf.remaining()) {
-                body = ByteBuffer.allocate(size);
-                body.put(buf);
-                return FRAME_FRAGMENT;
-            } else {
-                body = buf.slice();
-                body.limit(size);
-                buf.position(buf.position() + size);
-                frame();
+                Frame frame = new Frame(flags, type, track, channel, 
ByteBuffer.allocate(0));
+                receiver.received(frame);
+                needed = Frame.HEADER_SIZE;
                 return FRAME_HDR;
             }
-        case FRAME_FRAGMENT:
-            int delta = body.remaining();
-            if (delta > buf.remaining()) {
-                body.put(buf);
-                return FRAME_FRAGMENT;
-            } else {
-                ByteBuffer fragment = buf.slice();
-                fragment.limit(delta);
-                buf.position(buf.position() + delta);
-                body.put(fragment);
-                body.flip();
-                frame();
-                return FRAME_HDR;
+            else
+            {
+                needed = size;
+                return FRAME_BODY;
             }
+        case FRAME_BODY:
+            Frame frame = new Frame(flags, type, track, channel, 
input.slice());
+            receiver.received(frame);
+            needed = Frame.HEADER_SIZE;
+            return FRAME_HDR;
         default:
             throw new IllegalStateException();
         }
     }
 
-    private State expect(ByteBuffer buf, int expected, State next)
-    {
-        return expect(buf, (byte) expected, next);
-    }
-
-    private State expect(ByteBuffer buf, char expected, State next)
-    {
-        return expect(buf, (byte) expected, next);
-    }
-
-    private State expect(ByteBuffer buf, byte expected, State next)
-    {
-        if (expect(buf, expected))
-        {
-            return next;
-        }
-        else
-        {
-            return ERROR;
-        }
-    }
-
-    private boolean expect(ByteBuffer buf, int expected)
-    {
-        return expect(buf, (byte) expected);
-    }
-
-    private boolean expect(ByteBuffer buf, byte expected)
-    {
-        byte b = buf.get();
-        if (b == expected)
-        {
-            return true;
-        }
-        else
-        {
-            error("expecting '%x', got '%x'", expected, b);
-            return false;
-        }
-    }
-
     public void exception(Throwable t)
     {
         receiver.exception(t);

Added: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoReceiver.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoReceiver.java?rev=678848&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoReceiver.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoReceiver.java
 Tue Jul 22 11:33:00 2008
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpidity.transport.network.io;
+
+import org.apache.qpidity.transport.Receiver;
+import org.apache.qpidity.transport.TransportException;
+import org.apache.qpidity.transport.util.Logger;
+
+import java.io.InputStream;
+import java.io.IOException;
+import java.net.Socket;
+import java.nio.ByteBuffer;
+
+/**
+ * IoReceiver
+ *
+ */
+
+final class IoReceiver extends Thread
+{
+
+    private static final Logger log = Logger.get(IoReceiver.class);
+
+    private final IoTransport transport;
+    private final Receiver<ByteBuffer> receiver;
+    private final int bufferSize;
+    private final Socket socket;
+
+    public IoReceiver(IoTransport transport, Receiver<ByteBuffer> receiver, 
int bufferSize)
+    {
+        this.transport = transport;
+        this.receiver = receiver;
+        this.bufferSize = bufferSize;
+        this.socket = transport.getSocket();
+
+        setName(String.format("IoReceive - %s", 
socket.getRemoteSocketAddress()));
+        start();
+    }
+
+    public void run()
+    {
+        final int threshold = bufferSize / 2;
+
+        // I set the read buffer size simillar to SO_RCVBUF
+        // Haven't tested with a lower value to see if it's better or worse
+        byte[] buffer = new byte[bufferSize];
+        try
+        {
+            InputStream in = socket.getInputStream();
+            int read = 0;
+            int offset = 0;
+            while ((read = in.read(buffer, offset, bufferSize-offset)) != -1)
+            {
+                if (read > 0)
+                {
+                    ByteBuffer b = ByteBuffer.wrap(buffer,offset,read);
+                    receiver.received(b);
+                    offset+=read;
+                    if (offset > threshold)
+                    {
+                        offset = 0;
+                        buffer = new byte[bufferSize];
+                    }
+                }
+            }
+        }
+        catch (Throwable t)
+        {
+            receiver.exception(new TransportException("error in read thread", 
t));
+        }
+        finally
+        {
+            try
+            {
+                transport.getSender().close();
+            }
+            catch (TransportException e)
+            {
+                log.error(e, "error closing");
+            }
+            finally
+            {
+                receiver.closed();
+            }
+        }
+    }
+
+}

Propchange: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoReceiver.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java?rev=678848&r1=678847&r2=678848&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoSender.java
 Tue Jul 22 11:33:00 2008
@@ -21,106 +21,243 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.net.Socket;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.qpidity.transport.Sender;
 import org.apache.qpidity.transport.TransportException;
+import org.apache.qpidity.transport.util.Logger;
 
-public class IoSender implements Sender<java.nio.ByteBuffer>
+import static org.apache.qpidity.transport.util.Functions.*;
+
+
+final class IoSender extends Thread implements Sender<ByteBuffer>
 {
-    private final Object lock = new Object();
-    private Socket _socket;
-    private OutputStream _outStream;
 
-    public IoSender(Socket socket)
-    {
-        this._socket = socket;
+    private static final Logger log = Logger.get(IoSender.class);
+
+    // by starting here, we ensure that we always test the wraparound
+    // case, we should probably make this configurable somehow so that
+    // we can test other cases as well
+    private final static int START = Integer.MAX_VALUE - 10;
+
+    private final IoTransport transport;
+    private final long timeout;
+    private final Socket socket;
+    private final OutputStream out;
+
+    private final byte[] buffer;
+    private final AtomicInteger head = new AtomicInteger(START);
+    private final AtomicInteger tail = new AtomicInteger(START);
+    private final Object notFull = new Object();
+    private final Object notEmpty = new Object();
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+
+    private IOException exception = null;
+
+
+    public IoSender(IoTransport transport, int bufferSize, long timeout)
+    {
+        this.transport = transport;
+        this.socket = transport.getSocket();
+        this.buffer = new byte[bufferSize];
+        this.timeout = timeout;
+
         try
         {
-            _outStream = _socket.getOutputStream();
+            out = socket.getOutputStream();
         }
-        catch(IOException e)
+        catch (IOException e)
         {
-            throw new TransportException("Error getting output stream for 
socket",e);
+            throw new TransportException("Error getting output stream for 
socket", e);
         }
-    }
 
-    /*
-     * Currently I don't implement any in memory buffering
-     * and just write straight to the wire.
-     * I want to experiment with buffering and see if I can
-     * get more performance, all though latency will suffer
-     * a bit.
-     */
-    public void send(java.nio.ByteBuffer buf)
-    {
-        write(buf);
+        setName(String.format("IoSender - %s", 
socket.getRemoteSocketAddress()));
+        start();
     }
 
-    public void flush()
+    private static final int mod(int n, int m)
     {
-        // pass
+        int r = n % m;
+        return r < 0 ? m + r : r;
     }
 
-    /* The extra copying sucks.
-     * If I know for sure that the buf is backed
-     * by an array then I could do buf.array()
-     */
-    private void write(java.nio.ByteBuffer buf)
+    public void send(ByteBuffer buf)
     {
-        byte[] array = null;
-
-        if (buf.hasArray())
-        {
-            array = buf.array();
-        }
-        else
+        if (closed.get())
         {
-            array = new byte[buf.remaining()];
-            buf.get(array);
+            throw new TransportException("sender is closed", exception);
         }
 
-        if( _socket.isConnected())
+        final int size = buffer.length;
+        int remaining = buf.remaining();
+
+        while (remaining > 0)
         {
-            synchronized (lock)
+            final int hd = head.get();
+            final int tl = tail.get();
+
+            if (hd - tl >= size)
             {
-                try
+                synchronized (notFull)
                 {
-                    _outStream.write(array);
+                    long start = System.currentTimeMillis();
+                    long elapsed = 0;
+                    while (head.get() - tail.get() >= size && elapsed < 
timeout)
+                    {
+                        try
+                        {
+                            notFull.wait(timeout - elapsed);
+                        }
+                        catch (InterruptedException e)
+                        {
+                            // pass
+                        }
+                        elapsed += System.currentTimeMillis() - start;
+                    }
+
+                    if (head.get() - tail.get() >= size)
+                    {
+                        throw new TransportException(String.format("write 
timed out: %s, %s", head.get(), tail.get()));
+                    }
                 }
-                catch(Exception e)
+                continue;
+            }
+
+            final int hd_idx = mod(hd, size);
+            final int tl_idx = mod(tl, size);
+            final int length;
+
+            if (tl_idx > hd_idx)
+            {
+                length = Math.min(tl_idx - hd_idx, remaining);
+            }
+            else
+            {
+                length = Math.min(size - hd_idx, remaining);
+            }
+
+            buf.get(buffer, hd_idx, length);
+            head.getAndAdd(length);
+            if (hd == tail.get())
+            {
+                synchronized (notEmpty)
                 {
-                    throw new TransportException("Error trying to write to the 
socket",e);
+                    notEmpty.notify();
                 }
             }
-        }
-        else
-        {
-            throw new TransportException("Trying to write on a closed socket");
+            remaining -= length;
         }
     }
 
-    /*
-     * Haven't used this, but the intention is
-     * to experiment with it in the future.
-     * Also need to make sure the buffer size
-     * is configurable
-     */
-    public void setStartBatching()
+    public void flush()
     {
+        // pass
     }
 
     public void close()
     {
-        synchronized (lock)
+        if (closed.getAndSet(true))
         {
+            synchronized (notEmpty)
+            {
+                notEmpty.notify();
+            }
+
             try
             {
-                _socket.close();
+                join(timeout);
+                if (isAlive())
+                {
+                    throw new TransportException("join timed out");
+                }
+                socket.close();
+            }
+            catch (InterruptedException e)
+            {
+                throw new TransportException(e);
             }
-            catch(Exception e)
+            catch (IOException e)
             {
-                e.printStackTrace();
+                throw new TransportException(e);
+            }
+
+            if (exception != null)
+            {
+                throw new TransportException(exception);
+            }
+        }
+    }
+
+    public void run()
+    {
+        final int size = buffer.length;
+
+        while (true)
+        {
+            final int hd = head.get();
+            final int tl = tail.get();
+
+            if (hd == tl)
+            {
+                if (closed.get())
+                {
+                    break;
+                }
+
+                synchronized (notEmpty)
+                {
+                    while (head.get() == tail.get() && !closed.get())
+                    {
+                        try
+                        {
+                            notEmpty.wait();
+                        }
+                        catch (InterruptedException e)
+                        {
+                            // pass
+                        }
+                    }
+                }
+
+                continue;
+            }
+
+            final int hd_idx = mod(hd, size);
+            final int tl_idx = mod(tl, size);
+
+            final int length;
+            if (tl_idx < hd_idx)
+            {
+                length = hd_idx - tl_idx;
+            }
+            else
+            {
+                length = size - tl_idx;
+            }
+
+            try
+            {
+                out.write(buffer, tl_idx, length);
+            }
+            catch (IOException e)
+            {
+                log.error(e, "error in read thread");
+                exception = e;
+                closed.set(true);
+                break;
+            }
+            tail.getAndAdd(length);
+            if (head.get() - tl >= size)
+            {
+                synchronized (notFull)
+                {
+                    notFull.notify();
+                }
             }
         }
     }
+
 }

Added: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoTransport.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoTransport.java?rev=678848&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoTransport.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoTransport.java
 Tue Jul 22 11:33:00 2008
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpidity.transport.network.io;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.net.SocketException;
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.transport.Connection;
+import org.apache.qpidity.transport.ConnectionDelegate;
+import org.apache.qpidity.transport.Receiver;
+import org.apache.qpidity.transport.TransportException;
+import org.apache.qpidity.transport.network.Assembler;
+import org.apache.qpidity.transport.network.Disassembler;
+import org.apache.qpidity.transport.network.InputHandler;
+import org.apache.qpidity.transport.network.OutputHandler;
+import org.apache.qpidity.transport.util.Logger;
+
+/**
+ * This class provides a socket based transport using the java.io
+ * classes.
+ *
+ * The following params are configurable via JVM arguments
+ * TCP_NO_DELAY - amqj.tcpNoDelay
+ * SO_RCVBUF    - amqj.receiveBufferSize
+ * SO_SNDBUF    - amqj.sendBufferSize
+ */
+public final class IoTransport
+{
+
+    private static final Logger log = Logger.get(IoTransport.class);
+
+    private static int DEFAULT_READ_WRITE_BUFFER_SIZE = 64 * 1024;
+
+    private IoReceiver receiver;
+    private IoSender sender;
+    private Socket socket;
+    private int readBufferSize;
+    private int writeBufferSize;
+    private final long timeout = 60000;
+
+    private IoTransport()
+    {
+        readBufferSize = 
Integer.getInteger("amqj.receiveBufferSize",DEFAULT_READ_WRITE_BUFFER_SIZE);
+        writeBufferSize = 
Integer.getInteger("amqj.sendBufferSize",DEFAULT_READ_WRITE_BUFFER_SIZE);
+    }
+
+    public static final Connection connect(String host, int port,
+            ConnectionDelegate delegate)
+    {
+        IoTransport handler = new IoTransport();
+        return handler.connectInternal(host,port,delegate);
+    }
+
+    private Connection connectInternal(String host, int port,
+            ConnectionDelegate delegate)
+    {
+        try
+        {
+            InetAddress address = InetAddress.getByName(host);
+            socket = new Socket();
+            socket.setReuseAddress(true);
+            socket.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay"));
+
+            log.debug("default-SO_RCVBUF : %s", socket.getReceiveBufferSize());
+            log.debug("default-SO_SNDBUF : %s", socket.getSendBufferSize());
+
+            socket.setSendBufferSize(writeBufferSize);
+            socket.setReceiveBufferSize(readBufferSize);
+
+            log.debug("new-SO_RCVBUF : %s", socket.getReceiveBufferSize());
+            log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize());
+
+            socket.connect(new InetSocketAddress(address, port));
+        }
+        catch (SocketException e)
+        {
+            throw new TransportException("Error connecting to broker", e);
+        }
+        catch (IOException e)
+        {
+            throw new TransportException("Error connecting to broker", e);
+        }
+
+        sender = new IoSender(this, 2*writeBufferSize, timeout);
+        Connection conn = new Connection
+            (new Disassembler(new OutputHandler(sender), 64*1024 - 1),
+             delegate);
+        receiver = new IoReceiver(this, new InputHandler(new Assembler(conn)), 
2*readBufferSize);
+
+        return conn;
+    }
+
+    IoSender getSender()
+    {
+        return sender;
+    }
+
+    IoReceiver getReceiver()
+    {
+        return receiver;
+    }
+
+    Socket getSocket()
+    {
+        return socket;
+    }
+
+}

Propchange: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/io/IoTransport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java?rev=678848&r1=678847&r2=678848&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpidity/transport/ConnectionTest.java
 Tue Jul 22 11:33:00 2008
@@ -24,6 +24,7 @@
 
 import org.apache.qpid.util.concurrent.Condition;
 
+import org.apache.qpidity.transport.network.io.IoTransport;
 import org.apache.qpidity.transport.network.mina.MinaHandler;
 import org.apache.qpidity.transport.util.Logger;
 
@@ -67,7 +68,7 @@
 
     private Connection connect(final Condition closed)
     {
-        Connection conn = MinaHandler.connect("localhost", port, new 
ConnectionDelegate()
+        Connection conn = IoTransport.connect("localhost", port, new 
ConnectionDelegate()
         {
             public SessionDelegate getSessionDelegate()
             {


Reply via email to