Author: rhs
Date: Fri Aug  8 23:03:24 2008
New Revision: 684182

URL: http://svn.apache.org/viewvc?rev=684182&view=rev
Log:
QPID-1218: cleaned up the interface to IoTransport a bit; added IoAcceptor; 
fixed Session tracking of sync point; default JAVA inside qpid-run

Added:
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
   (with props)
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java
   (with props)
Modified:
    
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
    incubator/qpid/trunk/qpid/java/common/bin/qpid-run
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java
    
incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
    
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java

Modified: 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?rev=684182&r1=684181&r2=684182&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
 Fri Aug  8 23:03:24 2008
@@ -50,7 +50,7 @@
 import org.apache.qpid.server.state.AMQStateManager;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.apache.qpid.transport.network.io.IoSender;
+import org.apache.qpid.transport.Sender;
 
 import javax.management.JMException;
 import javax.security.sasl.SaslServer;
@@ -847,7 +847,7 @@
         return (_clientVersion == null) ? null : _clientVersion.toString();
     }
 
-    public void setSender(IoSender sender)
+    public void setSender(Sender<java.nio.ByteBuffer> sender)
     {
        // No-op, interface munging between this and AMQProtocolSession
     }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java?rev=684182&r1=684181&r2=684182&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java
 Fri Aug  8 23:03:24 2008
@@ -16,12 +16,12 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.ProtocolInitiation;
 import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.transport.network.io.IoSender;
+import org.apache.qpid.transport.Sender;
 
 public class AMQIoTransportProtocolSession extends AMQProtocolSession
 {
 
-    protected IoSender _ioSender;
+    protected Sender<java.nio.ByteBuffer> _ioSender;
     private SaslClient _saslClient;
     private ConnectionTuneParameters _connectionTuneParameters;
     
@@ -102,7 +102,7 @@
     }
     
     @Override
-    public void setSender(IoSender sender)
+    public void setSender(Sender<java.nio.ByteBuffer> sender)
     {
         _ioSender = sender;
     }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=684182&r1=684181&r2=684182&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
 Fri Aug  8 23:03:24 2008
@@ -44,7 +44,7 @@
 import org.apache.qpid.framing.*;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-import org.apache.qpid.transport.network.io.IoSender;
+import org.apache.qpid.transport.Sender;
 import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
 
 /**
@@ -538,7 +538,7 @@
         _protocolHandler.propagateExceptionToAllWaiters(error);
     }
 
-    public void setSender(IoSender sender)
+    public void setSender(Sender<java.nio.ByteBuffer> sender)
     {
         // No-op, interface munging
     }

Modified: incubator/qpid/trunk/qpid/java/common/bin/qpid-run
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/bin/qpid-run?rev=684182&r1=684181&r2=684182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/bin/qpid-run (original)
+++ incubator/qpid/trunk/qpid/java/common/bin/qpid-run Fri Aug  8 23:03:24 2008
@@ -66,6 +66,10 @@
     QPID_WORK=$HOME
 fi
 
+if [ -z "$JAVA" ]; then
+    JAVA=java
+fi
+
 if $cygwin; then
   QPID_HOME=$(cygpath -w $QPID_HOME)
   QPID_WORK=$(cygpath -w $QPID_WORK)

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java?rev=684182&r1=684181&r2=684182&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
 Fri Aug  8 23:03:24 2008
@@ -21,9 +21,12 @@
 package org.apache.qpid.protocol;
 
 import org.apache.qpid.framing.*;
-import org.apache.qpid.transport.network.io.IoSender;
+import org.apache.qpid.transport.Sender;
 import org.apache.qpid.AMQException;
 
+import java.nio.ByteBuffer;
+
+
 /**
  * AMQVersionAwareProtocolSession is implemented by all AMQP session classes, 
that need to provide an awareness to
  * callers of the version of the AMQP protocol that they are able to work with.
@@ -55,7 +58,7 @@
     public void heartbeatBodyReceived(int channelId, HeartbeatBody body) 
throws AMQException;
 
 
-    public void setSender(IoSender sender);
+    public void setSender(Sender<ByteBuffer> sender);
     public void init();
 
 }

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java?rev=684182&r1=684181&r2=684182&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
 Fri Aug  8 23:03:24 2008
@@ -248,6 +248,11 @@
         context.connectionOpenOk(hosts);
     }
 
+    @Override public void connectionClose(Channel ch, ConnectionClose close)
+    {
+        ch.connectionCloseOk();
+    }
+
     public String getPassword()
     {
         return _password;

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java?rev=684182&r1=684181&r2=684182&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
 Fri Aug  8 23:03:24 2008
@@ -23,7 +23,8 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
-import org.apache.qpid.transport.network.mina.MinaHandler;
+import org.apache.qpid.transport.network.ConnectionBinding;
+import org.apache.qpid.transport.network.io.IoAcceptor;
 
 
 /**
@@ -62,7 +63,9 @@
         delegate.setUsername("guest");
         delegate.setPassword("guest");
 
-        MinaHandler.accept("0.0.0.0", 5672, delegate);
+        IoAcceptor ioa = new IoAcceptor
+            ("0.0.0.0", 5672, new ConnectionBinding(delegate));
+        ioa.start();
     }
 
 }

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java?rev=684182&r1=684181&r2=684182&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java
 Fri Aug  8 23:03:24 2008
@@ -47,6 +47,11 @@
         return ranges.iterator();
     }
 
+    public Range getFirst()
+    {
+        return ranges.getFirst();
+    }
+
     public boolean includes(Range range)
     {
         for (Range r : this)

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=684182&r1=684181&r2=684182&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
 Fri Aug  8 23:03:24 2008
@@ -76,7 +76,8 @@
     // completed incoming commands
     private final Object processedLock = new Object();
     private RangeSet processed = new RangeSet();
-    private Range syncPoint = null;
+    private int maxProcessed = commandsIn - 1;
+    private int syncPoint = maxProcessed;
 
     // outgoing command count
     private int commandsOut = 0;
@@ -165,7 +166,16 @@
         synchronized (processedLock)
         {
             processed.add(range);
-            flush = syncPoint != null && processed.includes(syncPoint);
+            Range first = processed.getFirst();
+            int lower = first.getLower();
+            int upper = first.getUpper();
+            int old = maxProcessed;
+            if (le(lower, maxProcessed + 1))
+            {
+                maxProcessed = max(maxProcessed, upper);
+            }
+            flush = lt(old, syncPoint) && ge(maxProcessed, syncPoint);
+            syncPoint = maxProcessed;
         }
         if (flush)
         {
@@ -206,15 +216,11 @@
     {
         int id = getCommandsIn() - 1;
         log.debug("%s synced to %d", this, id);
-        Range range = new Range(0, id - 1);
         boolean flush;
         synchronized (processedLock)
         {
-            flush = processed.includes(range);
-            if (!flush)
-            {
-                syncPoint = range;
-            }
+            syncPoint = id;
+            flush = ge(maxProcessed, syncPoint);
         }
         if (flush)
         {

Added: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java?rev=684182&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
 Fri Aug  8 23:03:24 2008
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.transport.Binding;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionDelegate;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+
+/**
+ * ConnectionBinding
+ *
+ */
+
+public class ConnectionBinding implements Binding<Connection,ByteBuffer>
+{
+
+    private static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
+
+    private final ConnectionDelegate delegate;
+
+    public ConnectionBinding(ConnectionDelegate delegate)
+    {
+        this.delegate = delegate;
+    }
+
+    public Connection endpoint(Sender<ByteBuffer> sender)
+    {
+        // XXX: hardcoded max-frame
+        return new Connection
+            (new Disassembler(sender, MAX_FRAME_SIZE), delegate);
+    }
+
+    public Receiver<ByteBuffer> receiver(Connection conn)
+    {
+        return new InputHandler(new Assembler(conn));
+    }
+
+}

Propchange: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java?rev=684182&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java
 Fri Aug  8 23:03:24 2008
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.io;
+
+import org.apache.qpid.transport.Binding;
+import org.apache.qpid.transport.TransportException;
+
+import java.io.IOException;
+
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketAddress;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * IoAcceptor
+ *
+ */
+
+public class IoAcceptor<E> extends Thread
+{
+
+
+    private ServerSocket socket;
+    private Binding<E,ByteBuffer> binding;
+
+    public IoAcceptor(SocketAddress address, Binding<E,ByteBuffer> binding)
+        throws IOException
+    {
+        socket = new ServerSocket();
+        socket.setReuseAddress(true);
+        socket.bind(address);
+        this.binding = binding;
+
+        setName(String.format("IoAcceptor - %s", socket.getInetAddress()));
+    }
+
+    public IoAcceptor(String host, int port, Binding<E,ByteBuffer> binding)
+        throws IOException
+    {
+        this(new InetSocketAddress(host, port), binding);
+    }
+
+    public void run()
+    {
+        while (true)
+        {
+            try
+            {
+                Socket sock = socket.accept();
+                IoTransport<E> transport = new IoTransport<E>(sock, binding);
+            }
+            catch (IOException e)
+            {
+                throw new TransportException(e);
+            }
+        }
+    }
+
+}

Propchange: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java?rev=684182&r1=684181&r2=684182&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
 Fri Aug  8 23:03:24 2008
@@ -27,11 +27,14 @@
 import java.nio.ByteBuffer;
 
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.Binding;
 import org.apache.qpid.transport.Connection;
 import org.apache.qpid.transport.ConnectionDelegate;
 import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
 import org.apache.qpid.transport.TransportException;
 import org.apache.qpid.transport.network.Assembler;
+import org.apache.qpid.transport.network.ConnectionBinding;
 import org.apache.qpid.transport.network.Disassembler;
 import org.apache.qpid.transport.network.InputHandler;
 import org.apache.qpid.transport.util.Logger;
@@ -45,7 +48,7 @@
  * SO_RCVBUF    - amqj.receiveBufferSize
  * SO_SNDBUF    - amqj.sendBufferSize
  */
-public final class IoTransport
+public final class IoTransport<E>
 {
 
     static
@@ -59,47 +62,90 @@
     private static final Logger log = Logger.get(IoTransport.class);
 
     private static int DEFAULT_READ_WRITE_BUFFER_SIZE = 64 * 1024;
+    private static int readBufferSize = Integer.getInteger
+        ("amqj.receiveBufferSize", DEFAULT_READ_WRITE_BUFFER_SIZE);
+    private static int writeBufferSize = Integer.getInteger
+        ("amqj.sendBufferSize", DEFAULT_READ_WRITE_BUFFER_SIZE);
 
-    private IoReceiver receiver;
-    private IoSender sender;
     private Socket socket;
-    private int readBufferSize;
-    private int writeBufferSize;
-    private final long timeout = 60000;
+    private IoSender sender;
+    private E endpoint;
+    private IoReceiver receiver;
+    private long timeout = 60000;
+
+    IoTransport(Socket socket, Binding<E,ByteBuffer> binding)
+    {
+        this.socket = socket;
+        this.sender = new IoSender(this, 2*writeBufferSize, timeout);
+        this.endpoint = binding.endpoint(sender);
+        this.receiver = new IoReceiver(this, binding.receiver(endpoint),
+                                       2*readBufferSize, timeout);
+    }
+
+    IoSender getSender()
+    {
+        return sender;
+    }
+
+    IoReceiver getReceiver()
+    {
+        return receiver;
+    }
+
+    Socket getSocket()
+    {
+        return socket;
+    }
 
-    private IoTransport()
+    public static final <E> E connect(String host, int port,
+                                      Binding<E,ByteBuffer> binding)
     {
-        readBufferSize = 
Integer.getInteger("amqj.receiveBufferSize",DEFAULT_READ_WRITE_BUFFER_SIZE);
-        writeBufferSize = 
Integer.getInteger("amqj.sendBufferSize",DEFAULT_READ_WRITE_BUFFER_SIZE);
+        Socket socket = createSocket(host, port);
+        IoTransport<E> transport = new IoTransport<E>(socket, binding);
+        return transport.endpoint;
     }
 
     public static final Connection connect(String host, int port,
-            ConnectionDelegate delegate)
+                                           ConnectionDelegate delegate)
+    {
+        return connect(host, port, new ConnectionBinding(delegate));
+    }
+
+    public static void connect_0_9(AMQVersionAwareProtocolSession session, 
String host, int port)
     {
-        IoTransport handler = new IoTransport();
-        return handler.connectInternal(host,port,delegate);
+        connect(host, port, new Binding_0_9(session));
     }
 
-    private Connection connectInternal(String host, int port,
-            ConnectionDelegate delegate)
+    private static class Binding_0_9
+        implements Binding<AMQVersionAwareProtocolSession,ByteBuffer>
     {
-        createSocket(host, port);
 
-        sender = new IoSender(this, 2*writeBufferSize, timeout);
-        Connection conn = new Connection
-            (new Disassembler(sender, 64*1024 - 1), delegate);
-        receiver = new IoReceiver(this, new InputHandler(new Assembler(conn)),
-                                  2*readBufferSize, timeout);
+        private AMQVersionAwareProtocolSession session;
+
+        Binding_0_9(AMQVersionAwareProtocolSession session)
+        {
+            this.session = session;
+        }
+
+        public AMQVersionAwareProtocolSession endpoint(Sender<ByteBuffer> 
sender)
+        {
+            session.setSender(sender);
+            return session;
+        }
+
+        public Receiver<ByteBuffer> receiver(AMQVersionAwareProtocolSession 
ssn)
+        {
+            return new InputHandler_0_9(ssn);
+        }
 
-        return conn;
     }
 
-    private void createSocket(String host, int port)
+    private static Socket createSocket(String host, int port)
     {
         try
         {
             InetAddress address = InetAddress.getByName(host);
-            socket = new Socket();
+            Socket socket = new Socket();
             socket.setReuseAddress(true);
             socket.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay"));
 
@@ -113,6 +159,7 @@
             log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize());
 
             socket.connect(new InetSocketAddress(address, port));
+            return socket;
         }
         catch (SocketException e)
         {
@@ -124,36 +171,4 @@
         }
     }
 
-    IoSender getSender()
-    {
-        return sender;
-    }
-
-    IoReceiver getReceiver()
-    {
-        return receiver;
-    }
-
-    Socket getSocket()
-    {
-        return socket;
-    }
-
-    public static void connect_0_9 (AMQVersionAwareProtocolSession session, 
String host, int port)
-    {
-        IoTransport handler = new IoTransport();
-        handler.connectInternal_0_9(session, host, port);
-    }
-    
-    public void connectInternal_0_9(AMQVersionAwareProtocolSession session, 
String host, int port)
-    {
-
-        createSocket(host, port);
-
-        sender = new IoSender(this, 2*writeBufferSize, timeout);
-        receiver = new IoReceiver(this, new InputHandler_0_9(session),
-                    2*readBufferSize, timeout);
-        session.setSender(sender);
-    }
-
 }

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java?rev=684182&r1=684181&r2=684182&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java
 Fri Aug  8 23:03:24 2008
@@ -38,6 +38,7 @@
 import org.apache.qpid.transport.ConnectionDelegate;
 import org.apache.qpid.transport.Receiver;
 import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.ConnectionBinding;
 
 import org.apache.qpid.transport.util.Logger;
 
@@ -55,7 +56,6 @@
 //RA making this public until we sort out the package issues
 public class MinaHandler<E> implements IoHandler
 {
-    private static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
     /** Default buffer size for pending messages reads */
     private static final String DEFAULT_READ_BUFFER_LIMIT = "262144";
     /** Default buffer size for pending messages writes */
@@ -201,7 +201,7 @@
         IoAcceptor acceptor = new SocketAcceptor();
         acceptor.bind(address, new MinaHandler<E>(binding));
     }
-                   
+
     public static final <E> E connect(String host, int port,
                                       Binding<E,java.nio.ByteBuffer> binding)
     {
@@ -262,43 +262,13 @@
                                     ConnectionDelegate delegate)
         throws IOException
     {
-        accept(host, port, new ConnectionBinding
-               (delegate, InputHandler.State.PROTO_HDR));
+        accept(host, port, new ConnectionBinding(delegate));
     }
 
     public static final Connection connect(String host, int port,
                                            ConnectionDelegate delegate)
     {
-        return connect(host, port, new ConnectionBinding
-                       (delegate, InputHandler.State.PROTO_HDR));
-    }
-
-    private static class ConnectionBinding
-        implements Binding<Connection,java.nio.ByteBuffer>
-    {
-
-        private final ConnectionDelegate delegate;
-        private final InputHandler.State state;
-
-        ConnectionBinding(ConnectionDelegate delegate,
-                          InputHandler.State state)
-        {
-            this.delegate = delegate;
-            this.state = state;
-        }
-
-        public Connection endpoint(Sender<java.nio.ByteBuffer> sender)
-        {
-            // XXX: hardcoded max-frame
-            return new Connection
-                (new Disassembler(sender, MAX_FRAME_SIZE), delegate);
-        }
-
-        public Receiver<java.nio.ByteBuffer> receiver(Connection conn)
-        {
-            return new InputHandler(new Assembler(conn), state);
-        }
-
+        return connect(host, port, new ConnectionBinding(delegate));
     }
 
 }

Modified: 
incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java?rev=684182&r1=684181&r2=684182&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
 Fri Aug  8 23:03:24 2008
@@ -24,8 +24,9 @@
 
 import org.apache.qpid.util.concurrent.Condition;
 
+import org.apache.qpid.transport.network.ConnectionBinding;
+import org.apache.qpid.transport.network.io.IoAcceptor;
 import org.apache.qpid.transport.network.io.IoTransport;
-import org.apache.qpid.transport.network.mina.MinaHandler;
 import org.apache.qpid.transport.util.Logger;
 
 import junit.framework.TestCase;
@@ -63,7 +64,9 @@
             public void closed() {}
         };
 
-        MinaHandler.accept("localhost", port, server);
+        IoAcceptor ioa = new IoAcceptor
+            ("localhost", port, new ConnectionBinding(server));
+        ioa.start();
     }
 
     private Connection connect(final Condition closed)

Modified: 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java?rev=684182&r1=684181&r2=684182&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
 Fri Aug  8 23:03:24 2008
@@ -29,7 +29,7 @@
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.server.protocol.AMQProtocolSession;
 import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.transport.network.io.IoSender;
+import org.apache.qpid.transport.Sender;
 
 import javax.security.sasl.SaslServer;
 import java.util.HashMap;
@@ -248,7 +248,7 @@
         return null;  //To change body of implemented methods use File | 
Settings | File Templates.
     }
 
-    public void setSender(IoSender sender)
+    public void setSender(Sender<java.nio.ByteBuffer> sender)
     {
         // FIXME AS TODO
         


Reply via email to