Author: rhs
Date: Fri Aug 8 23:03:24 2008
New Revision: 684182
URL: http://svn.apache.org/viewvc?rev=684182&view=rev
Log:
QPID-1218: cleaned up the interface to IoTransport a bit; added IoAcceptor;
fixed Session tracking of sync point; default JAVA to "java" inside qpid-run when it is unset
Added:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
(with props)
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java
(with props)
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
incubator/qpid/trunk/qpid/java/common/bin/qpid-run
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java
incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?rev=684182&r1=684181&r2=684182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Fri Aug 8 23:03:24 2008
@@ -50,7 +50,7 @@
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.apache.qpid.transport.network.io.IoSender;
+import org.apache.qpid.transport.Sender;
import javax.management.JMException;
import javax.security.sasl.SaslServer;
@@ -847,7 +847,7 @@
return (_clientVersion == null) ? null : _clientVersion.toString();
}
- public void setSender(IoSender sender)
+ public void setSender(Sender<java.nio.ByteBuffer> sender)
{
// No-op, interface munging between this and AMQProtocolSession
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java?rev=684182&r1=684181&r2=684182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQIoTransportProtocolSession.java Fri Aug 8 23:03:24 2008
@@ -16,12 +16,12 @@
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ProtocolInitiation;
import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.transport.network.io.IoSender;
+import org.apache.qpid.transport.Sender;
public class AMQIoTransportProtocolSession extends AMQProtocolSession
{
- protected IoSender _ioSender;
+ protected Sender<java.nio.ByteBuffer> _ioSender;
private SaslClient _saslClient;
private ConnectionTuneParameters _connectionTuneParameters;
@@ -102,7 +102,7 @@
}
@Override
- public void setSender(IoSender sender)
+ public void setSender(Sender<java.nio.ByteBuffer> sender)
{
_ioSender = sender;
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=684182&r1=684181&r2=684182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Fri Aug 8 23:03:24 2008
@@ -44,7 +44,7 @@
import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-import org.apache.qpid.transport.network.io.IoSender;
+import org.apache.qpid.transport.Sender;
import org.apache.qpid.client.handler.ClientMethodDispatcherImpl;
/**
@@ -538,7 +538,7 @@
_protocolHandler.propagateExceptionToAllWaiters(error);
}
- public void setSender(IoSender sender)
+ public void setSender(Sender<java.nio.ByteBuffer> sender)
{
// No-op, interface munging
}
Modified: incubator/qpid/trunk/qpid/java/common/bin/qpid-run
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/bin/qpid-run?rev=684182&r1=684181&r2=684182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/bin/qpid-run (original)
+++ incubator/qpid/trunk/qpid/java/common/bin/qpid-run Fri Aug 8 23:03:24 2008
@@ -66,6 +66,10 @@
QPID_WORK=$HOME
fi
+if [ -z "$JAVA" ]; then
+ JAVA=java
+fi
+
if $cygwin; then
QPID_HOME=$(cygpath -w $QPID_HOME)
QPID_WORK=$(cygpath -w $QPID_WORK)
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java?rev=684182&r1=684181&r2=684182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQVersionAwareProtocolSession.java Fri Aug 8 23:03:24 2008
@@ -21,9 +21,12 @@
package org.apache.qpid.protocol;
import org.apache.qpid.framing.*;
-import org.apache.qpid.transport.network.io.IoSender;
+import org.apache.qpid.transport.Sender;
import org.apache.qpid.AMQException;
+import java.nio.ByteBuffer;
+
+
/**
* AMQVersionAwareProtocolSession is implemented by all AMQP session classes, that need to provide an awareness to
* callers of the version of the AMQP protocol that they are able to work with.
@@ -55,7 +58,7 @@
public void heartbeatBodyReceived(int channelId, HeartbeatBody body)
throws AMQException;
- public void setSender(IoSender sender);
+ public void setSender(Sender<ByteBuffer> sender);
public void init();
}
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java?rev=684182&r1=684181&r2=684182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java Fri Aug 8 23:03:24 2008
@@ -248,6 +248,11 @@
context.connectionOpenOk(hosts);
}
+ @Override public void connectionClose(Channel ch, ConnectionClose close)
+ {
+ ch.connectionCloseOk();
+ }
+
public String getPassword()
{
return _password;
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java?rev=684182&r1=684181&r2=684182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java Fri Aug 8 23:03:24 2008
@@ -23,7 +23,8 @@
import java.io.IOException;
import java.nio.ByteBuffer;
-import org.apache.qpid.transport.network.mina.MinaHandler;
+import org.apache.qpid.transport.network.ConnectionBinding;
+import org.apache.qpid.transport.network.io.IoAcceptor;
/**
@@ -62,7 +63,9 @@
delegate.setUsername("guest");
delegate.setPassword("guest");
- MinaHandler.accept("0.0.0.0", 5672, delegate);
+ IoAcceptor ioa = new IoAcceptor
+ ("0.0.0.0", 5672, new ConnectionBinding(delegate));
+ ioa.start();
}
}
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java?rev=684182&r1=684181&r2=684182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/RangeSet.java Fri Aug 8 23:03:24 2008
@@ -47,6 +47,11 @@
return ranges.iterator();
}
+ public Range getFirst()
+ {
+ return ranges.getFirst();
+ }
+
public boolean includes(Range range)
{
for (Range r : this)
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=684182&r1=684181&r2=684182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java Fri Aug 8 23:03:24 2008
@@ -76,7 +76,8 @@
// completed incoming commands
private final Object processedLock = new Object();
private RangeSet processed = new RangeSet();
- private Range syncPoint = null;
+ private int maxProcessed = commandsIn - 1;
+ private int syncPoint = maxProcessed;
// outgoing command count
private int commandsOut = 0;
@@ -165,7 +166,16 @@
synchronized (processedLock)
{
processed.add(range);
- flush = syncPoint != null && processed.includes(syncPoint);
+ Range first = processed.getFirst();
+ int lower = first.getLower();
+ int upper = first.getUpper();
+ int old = maxProcessed;
+ if (le(lower, maxProcessed + 1))
+ {
+ maxProcessed = max(maxProcessed, upper);
+ }
+ flush = lt(old, syncPoint) && ge(maxProcessed, syncPoint);
+ syncPoint = maxProcessed;
}
if (flush)
{
@@ -206,15 +216,11 @@
{
int id = getCommandsIn() - 1;
log.debug("%s synced to %d", this, id);
- Range range = new Range(0, id - 1);
boolean flush;
synchronized (processedLock)
{
- flush = processed.includes(range);
- if (!flush)
- {
- syncPoint = range;
- }
+ syncPoint = id;
+ flush = ge(maxProcessed, syncPoint);
}
if (flush)
{
Added:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java?rev=684182&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java Fri Aug 8 23:03:24 2008
@@ -0,0 +1,60 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.transport.Binding;
+import org.apache.qpid.transport.Connection;
+import org.apache.qpid.transport.ConnectionDelegate;
+import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+
+/**
+ * ConnectionBinding
+ *
+ */
+
+public class ConnectionBinding implements Binding<Connection,ByteBuffer>
+{
+
+ private static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
+
+ private final ConnectionDelegate delegate;
+
+ public ConnectionBinding(ConnectionDelegate delegate)
+ {
+ this.delegate = delegate;
+ }
+
+ public Connection endpoint(Sender<ByteBuffer> sender)
+ {
+ // XXX: hardcoded max-frame
+ return new Connection
+ (new Disassembler(sender, MAX_FRAME_SIZE), delegate);
+ }
+
+ public Receiver<ByteBuffer> receiver(Connection conn)
+ {
+ return new InputHandler(new Assembler(conn));
+ }
+
+}
Propchange:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/ConnectionBinding.java
------------------------------------------------------------------------------
svn:eol-style = native
Added:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java?rev=684182&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java Fri Aug 8 23:03:24 2008
@@ -0,0 +1,81 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transport.network.io;
+
+import org.apache.qpid.transport.Binding;
+import org.apache.qpid.transport.TransportException;
+
+import java.io.IOException;
+
+import java.net.InetSocketAddress;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.SocketAddress;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * IoAcceptor
+ *
+ */
+
+public class IoAcceptor<E> extends Thread
+{
+
+
+ private ServerSocket socket;
+ private Binding<E,ByteBuffer> binding;
+
+ public IoAcceptor(SocketAddress address, Binding<E,ByteBuffer> binding)
+ throws IOException
+ {
+ socket = new ServerSocket();
+ socket.setReuseAddress(true);
+ socket.bind(address);
+ this.binding = binding;
+
+ setName(String.format("IoAcceptor - %s", socket.getInetAddress()));
+ }
+
+ public IoAcceptor(String host, int port, Binding<E,ByteBuffer> binding)
+ throws IOException
+ {
+ this(new InetSocketAddress(host, port), binding);
+ }
+
+ public void run()
+ {
+ while (true)
+ {
+ try
+ {
+ Socket sock = socket.accept();
+ IoTransport<E> transport = new IoTransport<E>(sock, binding);
+ }
+ catch (IOException e)
+ {
+ throw new TransportException(e);
+ }
+ }
+ }
+
+}
Propchange:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoAcceptor.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java?rev=684182&r1=684181&r2=684182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoTransport.java Fri Aug 8 23:03:24 2008
@@ -27,11 +27,14 @@
import java.nio.ByteBuffer;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.transport.Binding;
import org.apache.qpid.transport.Connection;
import org.apache.qpid.transport.ConnectionDelegate;
import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.TransportException;
import org.apache.qpid.transport.network.Assembler;
+import org.apache.qpid.transport.network.ConnectionBinding;
import org.apache.qpid.transport.network.Disassembler;
import org.apache.qpid.transport.network.InputHandler;
import org.apache.qpid.transport.util.Logger;
@@ -45,7 +48,7 @@
* SO_RCVBUF - amqj.receiveBufferSize
* SO_SNDBUF - amqj.sendBufferSize
*/
-public final class IoTransport
+public final class IoTransport<E>
{
static
@@ -59,47 +62,90 @@
private static final Logger log = Logger.get(IoTransport.class);
private static int DEFAULT_READ_WRITE_BUFFER_SIZE = 64 * 1024;
+ private static int readBufferSize = Integer.getInteger
+ ("amqj.receiveBufferSize", DEFAULT_READ_WRITE_BUFFER_SIZE);
+ private static int writeBufferSize = Integer.getInteger
+ ("amqj.sendBufferSize", DEFAULT_READ_WRITE_BUFFER_SIZE);
- private IoReceiver receiver;
- private IoSender sender;
private Socket socket;
- private int readBufferSize;
- private int writeBufferSize;
- private final long timeout = 60000;
+ private IoSender sender;
+ private E endpoint;
+ private IoReceiver receiver;
+ private long timeout = 60000;
+
+ IoTransport(Socket socket, Binding<E,ByteBuffer> binding)
+ {
+ this.socket = socket;
+ this.sender = new IoSender(this, 2*writeBufferSize, timeout);
+ this.endpoint = binding.endpoint(sender);
+ this.receiver = new IoReceiver(this, binding.receiver(endpoint),
+ 2*readBufferSize, timeout);
+ }
+
+ IoSender getSender()
+ {
+ return sender;
+ }
+
+ IoReceiver getReceiver()
+ {
+ return receiver;
+ }
+
+ Socket getSocket()
+ {
+ return socket;
+ }
- private IoTransport()
+ public static final <E> E connect(String host, int port,
+ Binding<E,ByteBuffer> binding)
{
- readBufferSize = Integer.getInteger("amqj.receiveBufferSize",DEFAULT_READ_WRITE_BUFFER_SIZE);
- writeBufferSize = Integer.getInteger("amqj.sendBufferSize",DEFAULT_READ_WRITE_BUFFER_SIZE);
+ Socket socket = createSocket(host, port);
+ IoTransport<E> transport = new IoTransport<E>(socket, binding);
+ return transport.endpoint;
}
public static final Connection connect(String host, int port,
- ConnectionDelegate delegate)
+ ConnectionDelegate delegate)
+ {
+ return connect(host, port, new ConnectionBinding(delegate));
+ }
+
+ public static void connect_0_9(AMQVersionAwareProtocolSession session, String host, int port)
{
- IoTransport handler = new IoTransport();
- return handler.connectInternal(host,port,delegate);
+ connect(host, port, new Binding_0_9(session));
}
- private Connection connectInternal(String host, int port,
- ConnectionDelegate delegate)
+ private static class Binding_0_9
+ implements Binding<AMQVersionAwareProtocolSession,ByteBuffer>
{
- createSocket(host, port);
- sender = new IoSender(this, 2*writeBufferSize, timeout);
- Connection conn = new Connection
- (new Disassembler(sender, 64*1024 - 1), delegate);
- receiver = new IoReceiver(this, new InputHandler(new Assembler(conn)),
- 2*readBufferSize, timeout);
+ private AMQVersionAwareProtocolSession session;
+
+ Binding_0_9(AMQVersionAwareProtocolSession session)
+ {
+ this.session = session;
+ }
+
+ public AMQVersionAwareProtocolSession endpoint(Sender<ByteBuffer> sender)
+ {
+ session.setSender(sender);
+ return session;
+ }
+
+ public Receiver<ByteBuffer> receiver(AMQVersionAwareProtocolSession ssn)
+ {
+ return new InputHandler_0_9(ssn);
+ }
- return conn;
}
- private void createSocket(String host, int port)
+ private static Socket createSocket(String host, int port)
{
try
{
InetAddress address = InetAddress.getByName(host);
- socket = new Socket();
+ Socket socket = new Socket();
socket.setReuseAddress(true);
socket.setTcpNoDelay(Boolean.getBoolean("amqj.tcpNoDelay"));
@@ -113,6 +159,7 @@
log.debug("new-SO_SNDBUF : %s", socket.getSendBufferSize());
socket.connect(new InetSocketAddress(address, port));
+ return socket;
}
catch (SocketException e)
{
@@ -124,36 +171,4 @@
}
}
- IoSender getSender()
- {
- return sender;
- }
-
- IoReceiver getReceiver()
- {
- return receiver;
- }
-
- Socket getSocket()
- {
- return socket;
- }
-
- public static void connect_0_9 (AMQVersionAwareProtocolSession session, String host, int port)
- {
- IoTransport handler = new IoTransport();
- handler.connectInternal_0_9(session, host, port);
- }
-
- public void connectInternal_0_9(AMQVersionAwareProtocolSession session, String host, int port)
- {
-
- createSocket(host, port);
-
- sender = new IoSender(this, 2*writeBufferSize, timeout);
- receiver = new IoReceiver(this, new InputHandler_0_9(session),
- 2*readBufferSize, timeout);
- session.setSender(sender);
- }
-
}
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java?rev=684182&r1=684181&r2=684182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/mina/MinaHandler.java Fri Aug 8 23:03:24 2008
@@ -38,6 +38,7 @@
import org.apache.qpid.transport.ConnectionDelegate;
import org.apache.qpid.transport.Receiver;
import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.ConnectionBinding;
import org.apache.qpid.transport.util.Logger;
@@ -55,7 +56,6 @@
//RA making this public until we sort out the package issues
public class MinaHandler<E> implements IoHandler
{
- private static final int MAX_FRAME_SIZE = 64 * 1024 - 1;
/** Default buffer size for pending messages reads */
private static final String DEFAULT_READ_BUFFER_LIMIT = "262144";
/** Default buffer size for pending messages writes */
@@ -201,7 +201,7 @@
IoAcceptor acceptor = new SocketAcceptor();
acceptor.bind(address, new MinaHandler<E>(binding));
}
-
+
public static final <E> E connect(String host, int port,
Binding<E,java.nio.ByteBuffer> binding)
{
@@ -262,43 +262,13 @@
ConnectionDelegate delegate)
throws IOException
{
- accept(host, port, new ConnectionBinding
- (delegate, InputHandler.State.PROTO_HDR));
+ accept(host, port, new ConnectionBinding(delegate));
}
public static final Connection connect(String host, int port,
ConnectionDelegate delegate)
{
- return connect(host, port, new ConnectionBinding
- (delegate, InputHandler.State.PROTO_HDR));
- }
-
- private static class ConnectionBinding
- implements Binding<Connection,java.nio.ByteBuffer>
- {
-
- private final ConnectionDelegate delegate;
- private final InputHandler.State state;
-
- ConnectionBinding(ConnectionDelegate delegate,
- InputHandler.State state)
- {
- this.delegate = delegate;
- this.state = state;
- }
-
- public Connection endpoint(Sender<java.nio.ByteBuffer> sender)
- {
- // XXX: hardcoded max-frame
- return new Connection
- (new Disassembler(sender, MAX_FRAME_SIZE), delegate);
- }
-
- public Receiver<java.nio.ByteBuffer> receiver(Connection conn)
- {
- return new InputHandler(new Assembler(conn), state);
- }
-
+ return connect(host, port, new ConnectionBinding(delegate));
}
}
Modified:
incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java?rev=684182&r1=684181&r2=684182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java Fri Aug 8 23:03:24 2008
@@ -24,8 +24,9 @@
import org.apache.qpid.util.concurrent.Condition;
+import org.apache.qpid.transport.network.ConnectionBinding;
+import org.apache.qpid.transport.network.io.IoAcceptor;
import org.apache.qpid.transport.network.io.IoTransport;
-import org.apache.qpid.transport.network.mina.MinaHandler;
import org.apache.qpid.transport.util.Logger;
import junit.framework.TestCase;
@@ -63,7 +64,9 @@
public void closed() {}
};
- MinaHandler.accept("localhost", port, server);
+ IoAcceptor ioa = new IoAcceptor
+ ("localhost", port, new ConnectionBinding(server));
+ ioa.start();
}
private Connection connect(final Condition closed)
Modified:
incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java?rev=684182&r1=684181&r2=684182&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MockProtocolSession.java Fri Aug 8 23:03:24 2008
@@ -29,7 +29,7 @@
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.transport.network.io.IoSender;
+import org.apache.qpid.transport.Sender;
import javax.security.sasl.SaslServer;
import java.util.HashMap;
@@ -248,7 +248,7 @@
return null; //To change body of implemented methods use File | Settings | File Templates.
}
- public void setSender(IoSender sender)
+ public void setSender(Sender<java.nio.ByteBuffer> sender)
{
// FIXME AS TODO