Author: ritchiem
Date: Tue Feb 12 09:36:07 2008
New Revision: 620876
URL: http://svn.apache.org/viewvc?rev=620876&view=rev
Log:
QPID-784 : Added the ability to provide an existing Socket to the Qpid Client
Libraries to use for the connection.
Modified based on review by Robert Godfrey due to thread-safety concerns around
SocketTransportConnection.java and TransportConnection.java.
TransportConnection.java now uses a thread-safe Map to store all registered
sockets; these are then removed as they are used or on request.
Modified:
incubator/qpid/branches/M2.1/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
Modified:
incubator/qpid/branches/M2.1/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java?rev=620876&r1=620875&r2=620876&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java
(original)
+++
incubator/qpid/branches/M2.1/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java
Tue Feb 12 09:36:07 2008
@@ -23,6 +23,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.jms.ConnectionListener;
import org.apache.qpid.url.URLSyntaxException;
@@ -36,6 +37,7 @@
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SocketChannel;
+import java.util.UUID;
/**
* This is a simple application that demonstrates how you can use the Qpid
AMQP interfaces to use existing sockets as
@@ -66,9 +68,14 @@
MessageProducer _producer;
Session _session;
+ String Socket1_ID = UUID.randomUUID().toString();
+ String Socket2_ID = UUID.randomUUID().toString();
+
+
/** Here we can see the broker we are connecting to is set to be
'socket:///' signifying we will provide the socket. */
- public static final String CONNECTION = "amqp://guest:[EMAIL
PROTECTED]/test?brokerlist='socket:///'";
+ public final String CONNECTION = "amqp://guest:[EMAIL
PROTECTED]/test?brokerlist='socket://" + Socket1_ID + ";socket://" + Socket2_ID
+ "'";
+
public ExistingSocketConnectorDemo() throws IOException,
URLSyntaxException, AMQException, JMSException
{
@@ -76,7 +83,10 @@
Socket socket = SocketChannel.open().socket();
socket.connect(new InetSocketAddress("localhost", 5672));
- _connection = new AMQConnection(CONNECTION, socket);
+ TransportConnection.registerOpenSocket(Socket1_ID, socket);
+
+
+ _connection = new AMQConnection(CONNECTION);
_session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -130,7 +140,7 @@
socket.connect(new InetSocketAddress("localhost", 5673));
// This is the new method to pass in an open socket for the
connection to use.
- ((AMQConnection) _connection).setOpenSocket(socket);
+ TransportConnection.registerOpenSocket(Socket2_ID, socket);
}
catch (IOException e)
{
Modified:
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java?rev=620876&r1=620875&r2=620876&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
(original)
+++
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
Tue Feb 12 09:36:07 2008
@@ -157,7 +157,10 @@
}
else
{
- setPort(port);
+ if (!_transport.equalsIgnoreCase(SOCKET))
+ {
+ setPort(port);
+ }
}
String queryString = connection.getQuery();
@@ -264,13 +267,16 @@
sb.append(_transport);
sb.append("://");
- if (!(_transport.equalsIgnoreCase("vm")))
+ if (!(_transport.equalsIgnoreCase(VM)))
{
sb.append(_host);
}
- sb.append(':');
- sb.append(_port);
+ if (!(_transport.equalsIgnoreCase(SOCKET)))
+ {
+ sb.append(':');
+ sb.append(_port);
+ }
sb.append(printOptionsURL());
Modified:
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=620876&r1=620875&r2=620876&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
(original)
+++
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
Tue Feb 12 09:36:07 2008
@@ -30,8 +30,6 @@
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.client.transport.ITransportConnection;
-import org.apache.qpid.client.transport.SocketTransportConnection;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.*;
@@ -64,7 +62,6 @@
import javax.naming.StringRefAddr;
import java.io.IOException;
import java.net.ConnectException;
-import java.net.Socket;
import java.nio.channels.UnresolvedAddressException;
import java.text.MessageFormat;
import java.util.*;
@@ -160,8 +157,6 @@
private static final long DEFAULT_TIMEOUT = 1000 * 30;
private ProtocolVersion _protocolVersion;
- /** The active socket that is to be used as a value for connection */
- private Socket _openSocket;
/**
* @param broker brokerdetails
@@ -179,7 +174,7 @@
this(new AMQConnectionURL(
ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password
+ "@"
+ ((clientName == null) ? "" : clientName) + "/" + virtualHost
+ "?brokerlist='"
- + AMQBrokerDetails.checkTransport(broker) + "'"), null, null);
+ + AMQBrokerDetails.checkTransport(broker) + "'"), null);
}
/**
@@ -198,7 +193,7 @@
this(new AMQConnectionURL(
ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password
+ "@"
+ ((clientName == null) ? "" : clientName) + "/" + virtualHost
+ "?brokerlist='"
- + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig,
null);
+ + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig);
}
public AMQConnection(String host, int port, String username, String
password, String clientName, String virtualHost)
@@ -223,38 +218,26 @@
+ "'" + "," + ConnectionURL.OPTIONS_SSL + "='true'")
: (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" +
password + "@"
+ ((clientName == null) ? "" : clientName) + virtualHost +
"?brokerlist='tcp://" + host + ":" + port
- + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'")),
sslConfig, null);
- }
-
- public AMQConnection(String connection, Socket socket) throws
AMQException, URLSyntaxException
- {
- this(new AMQConnectionURL(connection), null, socket);
+ + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'")),
sslConfig);
}
public AMQConnection(String connection) throws AMQException,
URLSyntaxException
{
- this(new AMQConnectionURL(connection), null, null);
+ this(new AMQConnectionURL(connection), null);
}
public AMQConnection(String connection, SSLConfiguration sslConfig) throws
AMQException, URLSyntaxException
{
- this(new AMQConnectionURL(connection), sslConfig, null);
+ this(new AMQConnectionURL(connection), sslConfig);
}
public AMQConnection(ConnectionURL connectionURL, SSLConfiguration
sslConfig) throws AMQException
{
- this(connectionURL, sslConfig, null);
- }
-
- public AMQConnection(ConnectionURL connectionURL, SSLConfiguration
sslConfig, Socket socket) throws AMQException
- {
if (_logger.isInfoEnabled())
{
_logger.info("Connection:" + connectionURL);
}
- _openSocket = socket;
-
_sslConfiguration = sslConfig;
if (connectionURL == null)
{
@@ -414,23 +397,7 @@
try
{
- ITransportConnection connection =
TransportConnection.getInstance(brokerDetail);
-
- if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET))
- {
- if (_openSocket != null)
- {
- ((SocketTransportConnection)
connection).setOpenSocket(_openSocket);
- }
- else
- {
- throw new IllegalArgumentException("Active Socket must be
provided for broker " +
- "with 'socket'
transport:" + brokerDetail);
- }
-
- }
-
- connection.connect(_protocolHandler, brokerDetail);
+
TransportConnection.getInstance(brokerDetail).connect(_protocolHandler,
brokerDetail);
// this blocks until the connection has been set up or when an
error
// has prevented the connection being set up
@@ -1325,11 +1292,6 @@
public AMQSession getSession(int channelId)
{
return _sessions.get(channelId);
- }
-
- public void setOpenSocket(Socket socket)
- {
- _openSocket = socket;
}
public ProtocolVersion getProtocolVersion()
Modified:
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java?rev=620876&r1=620875&r2=620876&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
(original)
+++
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
Tue Feb 12 09:36:07 2008
@@ -38,6 +38,8 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
public class SocketTransportConnection implements ITransportConnection
{
@@ -46,8 +48,6 @@
private SocketConnectorFactory _socketConnectorFactory;
- private Socket _openSocket;
-
static interface SocketConnectorFactory
{
IoConnector newSocketConnector();
@@ -58,11 +58,6 @@
_socketConnectorFactory = socketConnectorFactory;
}
- public void setOpenSocket(Socket openSocket)
- {
- _openSocket = openSocket;
- }
-
public void connect(AMQProtocolHandler protocolHandler, BrokerDetails
brokerDetail) throws IOException
{
ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers"));
@@ -99,15 +94,18 @@
{
address = null;
- if (_openSocket != null)
+ Socket socket =
TransportConnection.removeOpenSocket(brokerDetail.getHost());
+
+ if (socket != null)
{
- _logger.info("Using existing Socket:" + _openSocket);
- ((ExistingSocketConnector)
ioConnector).setOpenSocket(_openSocket);
+ _logger.info("Using existing Socket:" + socket);
+
+ ((ExistingSocketConnector) ioConnector).setOpenSocket(socket);
}
else
{
throw new IllegalArgumentException("Active Socket must be
provided for broker " +
- "with 'socket' transport:"
+ brokerDetail);
+ "with 'socket://<SocketID>'
transport:" + brokerDetail);
}
}
else
Modified:
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=620876&r1=620875&r2=620876&view=diff
==============================================================================
---
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
(original)
+++
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
Tue Feb 12 09:36:07 2008
@@ -37,6 +37,9 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.net.Socket;
+
/**
* The TransportConnection is a helper class responsible for connecting to an
AMQ server. It sets up the underlying
@@ -61,6 +64,18 @@
private static final String DEFAULT_QPID_SERVER =
"org.apache.qpid.server.protocol.AMQPFastProtocolHandler";
+ private static Map<String, Socket> _openSocketRegister = new
ConcurrentHashMap<String, Socket>();
+
+ public static void registerOpenSocket(String socketID, Socket openSocket)
+ {
+ _openSocketRegister.put(socketID, openSocket);
+ }
+
+ public static Socket removeOpenSocket(String socketID)
+ {
+ return _openSocketRegister.remove(socketID);
+ }
+
public static ITransportConnection getInstance(BrokerDetails details)
throws AMQTransportConnectionException
{
int transport = getTransport(details.getTransport());
@@ -305,7 +320,7 @@
synchronized (_inVmPipeAddress)
{
_inVmPipeAddress.clear();
- }
+ }
_acceptor = null;
_currentInstance = -1;
_currentVMPort = -1;