Author: ritchiem
Date: Tue Feb 12 09:36:07 2008
New Revision: 620876

URL: http://svn.apache.org/viewvc?rev=620876&view=rev
Log:
QPID-784 : Added ability to provide existing Socket to Qpid Client Libraries to 
use as for connection.
Modified based on review by Robert Godfrey due to Thread safety around 
SocketTransportConnection.java and TransportConnection.java. Now use a safe Map 
to store all registered sockets in the TransportConnection.java these are then 
removed as used or on request.

Modified:
    
incubator/qpid/branches/M2.1/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java
    
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
    
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
    
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java

Modified: 
incubator/qpid/branches/M2.1/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java?rev=620876&r1=620875&r2=620876&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java
 Tue Feb 12 09:36:07 2008
@@ -23,6 +23,7 @@
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.jms.ConnectionListener;
 import org.apache.qpid.url.URLSyntaxException;
 
@@ -36,6 +37,7 @@
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.channels.SocketChannel;
+import java.util.UUID;
 
 /**
  * This is a simple application that demonstrates how you can use the Qpid 
AMQP interfaces to use existing sockets as
@@ -66,9 +68,14 @@
     MessageProducer _producer;
     Session _session;
 
+    String Socket1_ID = UUID.randomUUID().toString();
+    String Socket2_ID = UUID.randomUUID().toString();
+
+
 
     /** Here we can see the broker we are connecting to is set to be 
'socket:///' signifying we will provide the socket. */
-    public static final String CONNECTION = "amqp://guest:[EMAIL 
PROTECTED]/test?brokerlist='socket:///'";
+    public final String CONNECTION = "amqp://guest:[EMAIL 
PROTECTED]/test?brokerlist='socket://" + Socket1_ID + ";socket://" + Socket2_ID 
+ "'";
+
 
     public ExistingSocketConnectorDemo() throws IOException, 
URLSyntaxException, AMQException, JMSException
     {
@@ -76,7 +83,10 @@
         Socket socket = SocketChannel.open().socket();
         socket.connect(new InetSocketAddress("localhost", 5672));
 
-        _connection = new AMQConnection(CONNECTION, socket);
+        TransportConnection.registerOpenSocket(Socket1_ID, socket);
+
+
+        _connection = new AMQConnection(CONNECTION);
 
         _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
@@ -130,7 +140,7 @@
             socket.connect(new InetSocketAddress("localhost", 5673));
 
             // This is the new method to pass in an open socket for the 
connection to use.
-            ((AMQConnection) _connection).setOpenSocket(socket);
+            TransportConnection.registerOpenSocket(Socket2_ID, socket);
         }
         catch (IOException e)
         {

Modified: 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java?rev=620876&r1=620875&r2=620876&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
 Tue Feb 12 09:36:07 2008
@@ -157,7 +157,10 @@
             }
             else
             {
-                setPort(port);
+                if (!_transport.equalsIgnoreCase(SOCKET))
+                {
+                    setPort(port);
+                }
             }
 
             String queryString = connection.getQuery();
@@ -264,13 +267,16 @@
         sb.append(_transport);
         sb.append("://");
 
-        if (!(_transport.equalsIgnoreCase("vm")))
+        if (!(_transport.equalsIgnoreCase(VM)))
         {
             sb.append(_host);
         }
 
-        sb.append(':');
-        sb.append(_port);
+        if (!(_transport.equalsIgnoreCase(SOCKET)))
+        {
+            sb.append(':');
+            sb.append(_port);
+        }
 
         sb.append(printOptionsURL());
 

Modified: 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=620876&r1=620875&r2=620876&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
 Tue Feb 12 09:36:07 2008
@@ -30,8 +30,6 @@
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.state.AMQState;
 import org.apache.qpid.client.state.AMQStateManager;
-import org.apache.qpid.client.transport.ITransportConnection;
-import org.apache.qpid.client.transport.SocketTransportConnection;
 import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.exchange.ExchangeDefaults;
 import org.apache.qpid.framing.*;
@@ -64,7 +62,6 @@
 import javax.naming.StringRefAddr;
 import java.io.IOException;
 import java.net.ConnectException;
-import java.net.Socket;
 import java.nio.channels.UnresolvedAddressException;
 import java.text.MessageFormat;
 import java.util.*;
@@ -160,8 +157,6 @@
     private static final long DEFAULT_TIMEOUT = 1000 * 30;
     private ProtocolVersion _protocolVersion;
 
-    /** The active socket that is to be used as a value for connection */
-    private Socket _openSocket;
 
     /**
      * @param broker      brokerdetails
@@ -179,7 +174,7 @@
         this(new AMQConnectionURL(
                 ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password 
+ "@"
                 + ((clientName == null) ? "" : clientName) + "/" + virtualHost 
+ "?brokerlist='"
-                + AMQBrokerDetails.checkTransport(broker) + "'"), null, null);
+                + AMQBrokerDetails.checkTransport(broker) + "'"), null);
     }
 
     /**
@@ -198,7 +193,7 @@
         this(new AMQConnectionURL(
                 ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + password 
+ "@"
                 + ((clientName == null) ? "" : clientName) + "/" + virtualHost 
+ "?brokerlist='"
-                + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig, 
null);
+                + AMQBrokerDetails.checkTransport(broker) + "'"), sslConfig);
     }
 
     public AMQConnection(String host, int port, String username, String 
password, String clientName, String virtualHost)
@@ -223,38 +218,26 @@
                    + "'" + "," + ConnectionURL.OPTIONS_SSL + "='true'")
                 : (ConnectionURL.AMQ_PROTOCOL + "://" + username + ":" + 
password + "@"
                    + ((clientName == null) ? "" : clientName) + virtualHost + 
"?brokerlist='tcp://" + host + ":" + port
-                   + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'")), 
sslConfig, null);
-    }
-
-    public AMQConnection(String connection, Socket socket) throws 
AMQException, URLSyntaxException
-    {
-        this(new AMQConnectionURL(connection), null, socket);
+                   + "'" + "," + ConnectionURL.OPTIONS_SSL + "='false'")), 
sslConfig);
     }
 
     public AMQConnection(String connection) throws AMQException, 
URLSyntaxException
     {
-        this(new AMQConnectionURL(connection), null, null);
+        this(new AMQConnectionURL(connection), null);
     }
 
     public AMQConnection(String connection, SSLConfiguration sslConfig) throws 
AMQException, URLSyntaxException
     {
-        this(new AMQConnectionURL(connection), sslConfig, null);
+        this(new AMQConnectionURL(connection), sslConfig);
     }
 
     public AMQConnection(ConnectionURL connectionURL, SSLConfiguration 
sslConfig) throws AMQException
     {
-        this(connectionURL, sslConfig, null);
-    }
-
-    public AMQConnection(ConnectionURL connectionURL, SSLConfiguration 
sslConfig, Socket socket) throws AMQException
-    {
         if (_logger.isInfoEnabled())
         {
             _logger.info("Connection:" + connectionURL);
         }
 
-        _openSocket = socket;
-
         _sslConfiguration = sslConfig;
         if (connectionURL == null)
         {
@@ -414,23 +397,7 @@
         try
         {
 
-            ITransportConnection connection = 
TransportConnection.getInstance(brokerDetail);
-
-            if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET))
-            {
-                if (_openSocket != null)
-                {
-                    ((SocketTransportConnection) 
connection).setOpenSocket(_openSocket);
-                }
-                else
-                {
-                    throw new IllegalArgumentException("Active Socket must be 
provided for broker " +
-                                                       "with 'socket' 
transport:" + brokerDetail);
-                }
-
-            }
-
-            connection.connect(_protocolHandler, brokerDetail);
+            
TransportConnection.getInstance(brokerDetail).connect(_protocolHandler, 
brokerDetail);
              // this blocks until the connection has been set up or when an 
error
              // has prevented the connection being set up
 
@@ -1325,11 +1292,6 @@
     public AMQSession getSession(int channelId)
     {
         return _sessions.get(channelId);
-    }
-
-    public void setOpenSocket(Socket socket)
-    {
-        _openSocket = socket;
     }
 
     public ProtocolVersion getProtocolVersion()

Modified: 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java?rev=620876&r1=620875&r2=620876&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
 Tue Feb 12 09:36:07 2008
@@ -38,6 +38,8 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class SocketTransportConnection implements ITransportConnection
 {
@@ -46,8 +48,6 @@
 
     private SocketConnectorFactory _socketConnectorFactory;
 
-    private Socket _openSocket;
-
     static interface SocketConnectorFactory
     {
         IoConnector newSocketConnector();
@@ -58,11 +58,6 @@
         _socketConnectorFactory = socketConnectorFactory;
     }
 
-    public void setOpenSocket(Socket openSocket)
-    {
-        _openSocket = openSocket;
-    }
-
     public void connect(AMQProtocolHandler protocolHandler, BrokerDetails 
brokerDetail) throws IOException
     {
         
ByteBuffer.setUseDirectBuffers(Boolean.getBoolean("amqj.enableDirectBuffers"));
@@ -99,15 +94,18 @@
         {
             address = null;
 
-            if (_openSocket != null)
+            Socket socket = 
TransportConnection.removeOpenSocket(brokerDetail.getHost());
+
+            if (socket != null)
             {
-                _logger.info("Using existing Socket:" + _openSocket);
-                ((ExistingSocketConnector) 
ioConnector).setOpenSocket(_openSocket);
+                _logger.info("Using existing Socket:" + socket);
+
+                ((ExistingSocketConnector) ioConnector).setOpenSocket(socket);
             }
             else
             {
                 throw new IllegalArgumentException("Active Socket must be 
provided for broker " +
-                                                   "with 'socket' transport:" 
+ brokerDetail);
+                                                   "with 'socket://<SocketID>' 
transport:" + brokerDetail);
             }
         }
         else

Modified: 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=620876&r1=620875&r2=620876&view=diff
==============================================================================
--- 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
 (original)
+++ 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
 Tue Feb 12 09:36:07 2008
@@ -37,6 +37,9 @@
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.net.Socket;
+
 
 /**
  * The TransportConnection is a helper class responsible for connecting to an 
AMQ server. It sets up the underlying
@@ -61,6 +64,18 @@
 
     private static final String DEFAULT_QPID_SERVER = 
"org.apache.qpid.server.protocol.AMQPFastProtocolHandler";
 
+    private static Map<String, Socket> _openSocketRegister = new 
ConcurrentHashMap<String, Socket>();
+
+    public static void registerOpenSocket(String socketID, Socket openSocket)
+    {
+        _openSocketRegister.put(socketID, openSocket);
+    }
+
+    public static Socket removeOpenSocket(String socketID)
+    {
+        return _openSocketRegister.remove(socketID);
+    }
+
     public static ITransportConnection getInstance(BrokerDetails details) 
throws AMQTransportConnectionException
     {
         int transport = getTransport(details.getTransport());
@@ -305,7 +320,7 @@
         synchronized (_inVmPipeAddress)
         {
             _inVmPipeAddress.clear();
-        }        
+        }
         _acceptor = null;
         _currentInstance = -1;
         _currentVMPort = -1;


Reply via email to