Author: aidan
Date: Wed Feb 13 06:39:10 2008
New Revision: 627427

URL: http://svn.apache.org/viewvc?rev=627427&view=rev
Log:
Merged revisions 620767,620858,620876,627354,627416 via svnmerge from 
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1

........
  r620767 | ritchiem | 2008-02-12 11:29:19 +0000 (Tue, 12 Feb 2008) | 9 lines
  
  QPID-784 : Added ability to provide an existing Socket to the Qpid Client 
Libraries to use for the connection.
  AMQBrokerDetails.java, BrokerDetails.java and ConnectionURLTest.java 
augmented to allow the new transport type 'socket'.
  New ExistingSocketConnector, which utilises a given Socket() rather than 
creating its own from a SocketChannel. This code was taken from the Mina 
library v1.0.0.
  Changes to AMQConnection.java and SocketTransportConnection.java were 
required to allow the new Socket object to be passed through to the 
ExistingSocketConnector.
  TransportConnection.java was updated to return an ExistingSocketConnector 
when the 'socket' transport is used.
  AMQConnection.makeBrokerConnection was changed for the case where the 
'socket' transport is being used. This allows the supplied Socket to be passed 
down to the ExistingSocketConnector for the transport to be run over.
........
  r620858 | rgodfrey | 2008-02-12 16:44:59 +0000 (Tue, 12 Feb 2008) | 1 line
  
  QPID-787 : Allow for quoting of identifiers in selectors
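
  A minimal sketch of the decoding step for quoted identifiers, mirroring the 
loop added to SelectorParser.jj in this revision: the outer quotes are dropped 
and each doubled quote is collapsed to a single one. The class and method 
names (QuotedIdentifierSketch, decode) are hypothetical and exist only for 
illustration; the real logic lives inside the generated parser.

```java
final class QuotedIdentifierSketch
{
    // Strip the surrounding quotes and collapse each escaped pair "" into ".
    // Assumes the input matched the QUOTED_ID token, so it starts and ends with a quote.
    static String decode(String image)
    {
        StringBuilder rc = new StringBuilder();
        for (int i = 1; i < image.length() - 1; i++)
        {
            char c = image.charAt(i);
            if (c == '"')
            {
                i++; // skip the second quote of an escaped pair
            }
            rc.append(c);
        }
        return rc.toString();
    }

    public static void main(String[] args)
    {
        System.out.println(decode("\"property-with-hyphen\""));
        System.out.println(decode("\"say \"\"hi\"\"\""));
    }
}
```

  With this in place a selector such as the one used in SelectorTest.java, 
Cost = 2 AND "property-with-hyphen" = 'wibble', can reference a property name 
that the plain ID token would reject.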
........
  r620876 | ritchiem | 2008-02-12 17:36:07 +0000 (Tue, 12 Feb 2008) | 2 lines
  
  QPID-784 : Added ability to provide an existing Socket to the Qpid Client 
Libraries to use for the connection.
  Modified following review by Robert Godfrey to address thread safety in 
SocketTransportConnection.java and TransportConnection.java. 
TransportConnection.java now uses a thread-safe Map to store all registered 
sockets; these are removed as they are used or on request.
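
  The register described above can be sketched roughly as follows. This is a 
stand-alone illustration, not the Qpid class itself: SocketRegisterSketch is a 
hypothetical name, while the two method names and the ConcurrentHashMap follow 
the diff to TransportConnection.java in this revision.

```java
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class SocketRegisterSketch
{
    // ConcurrentHashMap makes put/remove safe to call from several threads.
    private static final Map<String, Socket> OPEN_SOCKETS =
            new ConcurrentHashMap<String, Socket>();

    static void registerOpenSocket(String socketID, Socket openSocket)
    {
        OPEN_SOCKETS.put(socketID, openSocket);
    }

    // remove() is atomic, so a registered socket is handed out at most once.
    static Socket removeOpenSocket(String socketID)
    {
        return OPEN_SOCKETS.remove(socketID);
    }

    public static void main(String[] args) throws Exception
    {
        Socket socket = new Socket(); // unconnected; no network access needed
        registerOpenSocket("demo", socket);

        Socket first = removeOpenSocket("demo");
        Socket second = removeOpenSocket("demo");

        System.out.println(first == socket);
        System.out.println(second == null);
        socket.close();
    }
}
```

  Because the entry is removed when it is claimed, a second connection attempt 
with the same ID finds nothing, which is the case where 
SocketTransportConnection throws IllegalArgumentException.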
........
  r627354 | ritchiem | 2008-02-13 11:00:13 +0000 (Wed, 13 Feb 2008) | 1 line
  
  QPID-788 : Changed MAXIMUM_STATE_WAIT_TIME to pick up its value from a 
system property, e.g. -Damqj.MaximumStateWait=90000, defaulting to 30000.
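
  As a rough illustration of the lookup, the sketch below reads the same 
property with the same default. StateWaitSketch and maximumStateWait are 
hypothetical names; in AMQStateManager.java the value is a static final 
constant, so it is read once at class load rather than on every call as here.

```java
final class StateWaitSketch
{
    // Returns the configured wait time, or 30000 when the property is unset.
    // Long.parseLong throws NumberFormatException if the value is not a number.
    static long maximumStateWait()
    {
        return Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000"));
    }

    public static void main(String[] args)
    {
        System.out.println(maximumStateWait());
    }
}
```

  Since the real constant is initialised at class load, the property needs to 
be set on the command line (or before AMQStateManager is first loaded) to take 
effect.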
........
  r627416 | rgodfrey | 2008-02-13 14:01:26 +0000 (Wed, 13 Feb 2008) | 1 line
  
  QPID-789 : FieldTable putDataInBuffer method not thread safe
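
  The fix relies on duplicate() giving each caller its own position and limit 
over the same underlying bytes, so flipping no longer disturbs the shared 
buffer. The sketch below shows that behaviour using java.nio.ByteBuffer as a 
stand-in, which is an assumption: FieldTable.java may use a different 
ByteBuffer class. DuplicateBufferSketch and copyOut are hypothetical names.

```java
import java.nio.ByteBuffer;

final class DuplicateBufferSketch
{
    // Copy the written portion of a shared buffer without moving its cursors.
    static byte[] copyOut(ByteBuffer shared)
    {
        // Same content, but independent position, limit and mark.
        ByteBuffer view = shared.duplicate();
        if (view.position() != 0)
        {
            view.flip(); // only the duplicate is flipped
        }
        byte[] out = new byte[view.remaining()];
        view.get(out);
        return out;
    }

    public static void main(String[] args)
    {
        ByteBuffer shared = ByteBuffer.allocate(8);
        shared.put((byte) 1).put((byte) 2).put((byte) 3);

        byte[] out = copyOut(shared);

        System.out.println(out.length);
        System.out.println(shared.position());
    }
}
```

  Note that the duplicate still shares its content with the original, so this 
protects only the cursor state; concurrent writes to the bytes themselves would 
need separate coordination.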
........

Added:
    
incubator/qpid/branches/thegreatmerge/qpid/java/client/example/src/main/java/org/apache/qpid/example/transport/
      - copied from r627416, 
incubator/qpid/branches/M2.1/java/client/example/src/main/java/org/apache/qpid/example/transport/
    
incubator/qpid/branches/thegreatmerge/qpid/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java
      - copied unchanged from r627416, 
incubator/qpid/branches/M2.1/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java
    
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/mina/
      - copied from r627416, 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/mina/
    
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/mina/transport/
      - copied from r627416, 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/mina/transport/
    
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/mina/transport/socket/
      - copied from r627416, 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/mina/transport/socket/
    
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/mina/transport/socket/nio/
      - copied from r627416, 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/mina/transport/socket/nio/
    
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java
      - copied unchanged from r627416, 
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java
Modified:
    incubator/qpid/branches/thegreatmerge/qpid/   (props changed)
    
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/grammar/SelectorParser.jj
    
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
    
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
    
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
    
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
    
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
    
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
    
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
    
incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java

Propchange: incubator/qpid/branches/thegreatmerge/qpid/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/grammar/SelectorParser.jj
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/grammar/SelectorParser.jj?rev=627427&r1=627426&r2=627427&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/grammar/SelectorParser.jj
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/grammar/SelectorParser.jj
 Wed Feb 13 06:39:10 2008
@@ -172,6 +172,7 @@
 TOKEN [IGNORE_CASE] :
 {
     < ID : ["a"-"z", "_", "$"] (["a"-"z","0"-"9","_", "$"])* >
+    | < QUOTED_ID : "\"" ( ("\"\"") | ~["\""] )*  "\""  >
 }
 
 // ----------------------------------------------------------------------------
@@ -589,6 +590,7 @@
 PropertyExpression variable() :
 {
     Token t;
+    StringBuffer rc = new StringBuffer();
     PropertyExpression left=null;
 }
 {
@@ -597,6 +599,21 @@
         {
             left = new PropertyExpression(t.image);
         }
+        |
+        t = <QUOTED_ID>
+        {
+            // Decode the string value.
+            String image = t.image;
+            for( int i=1; i < image.length()-1; i++ ) {
+                char c = image.charAt(i);
+                if( c == '"' )
+                    i++;
+                rc.append(c);
+            }
+            return new PropertyExpression(rc.toString());
+        }
+
+
     )
     {
         return left;

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java?rev=627427&r1=627426&r2=627427&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
 Wed Feb 13 06:39:10 2008
@@ -59,7 +59,8 @@
             {
                 //todo this list of valid transports should be enumerated somewhere
                 if ((!(transport.equalsIgnoreCase("vm") ||
-                       transport.equalsIgnoreCase("tcp"))))
+                       transport.equalsIgnoreCase("tcp") ||
+                       transport.equalsIgnoreCase("socket"))))
                 {
                     if (transport.equalsIgnoreCase("localhost"))
                     {
@@ -157,7 +158,10 @@
             }
             else
             {
-                setPort(port);
+                if (!_transport.equalsIgnoreCase(SOCKET))
+                {
+                    setPort(port);
+                }
             }
 
             String queryString = connection.getQuery();
@@ -264,13 +268,16 @@
         sb.append(_transport);
         sb.append("://");
 
-        if (!(_transport.equalsIgnoreCase("vm")))
+        if (!(_transport.equalsIgnoreCase(VM)))
         {
             sb.append(_host);
         }
 
-        sb.append(':');
-        sb.append(_port);
+        if (!(_transport.equalsIgnoreCase(SOCKET)))
+        {
+            sb.append(':');
+            sb.append(_port);
+        }
 
         sb.append(printOptionsURL());
 

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?rev=627427&r1=627426&r2=627427&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
 Wed Feb 13 06:39:10 2008
@@ -53,7 +53,7 @@
 
     private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet();
     private final Object _stateLock = new Object();
-    private static final long MAXIMUM_STATE_WAIT_TIME = 30000L;
+    private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000"));
 
     public AMQStateManager()
     {

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java?rev=627427&r1=627426&r2=627427&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
 Wed Feb 13 06:39:10 2008
@@ -24,6 +24,7 @@
 import org.apache.mina.common.ConnectFuture;
 import org.apache.mina.common.IoConnector;
 import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.transport.socket.nio.ExistingSocketConnector;
 import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
 import org.apache.mina.transport.socket.nio.SocketSessionConfig;
 
@@ -36,6 +37,9 @@
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class SocketTransportConnection implements ITransportConnection
 {
@@ -83,8 +87,34 @@
         _logger.info("send-buffer-size = " + scfg.getSendBufferSize());
         scfg.setReceiveBufferSize(Integer.getInteger("amqj.receiveBufferSize", DEFAULT_BUFFER_SIZE));
         _logger.info("recv-buffer-size = " + scfg.getReceiveBufferSize());
-        final InetSocketAddress address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort());
-        _logger.info("Attempting connection to " + address);
+
+        final InetSocketAddress address;
+
+        if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET))
+        {
+            address = null;
+
+            Socket socket = TransportConnection.removeOpenSocket(brokerDetail.getHost());
+
+            if (socket != null)
+            {
+                _logger.info("Using existing Socket:" + socket);
+
+                ((ExistingSocketConnector) ioConnector).setOpenSocket(socket);
+            }
+            else
+            {
+                throw new IllegalArgumentException("Active Socket must be provided for broker " +
+                                                   "with 'socket://<SocketID>' transport:" + brokerDetail);
+            }
+        }
+        else
+        {
+            address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort());
+            _logger.info("Attempting connection to " + address);
+        }
+
+
         ConnectFuture future = ioConnector.connect(address, protocolHandler);
 
         // wait for connection to complete

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=627427&r1=627426&r2=627427&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
 Wed Feb 13 06:39:10 2008
@@ -23,6 +23,7 @@
 import org.apache.mina.common.IoConnector;
 import org.apache.mina.common.IoHandlerAdapter;
 import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.transport.socket.nio.ExistingSocketConnector;
 import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector;
 import org.apache.mina.transport.socket.nio.SocketConnector;
 import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
@@ -36,6 +37,9 @@
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.net.Socket;
+
 
 /**
  * The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up the underlying
@@ -54,11 +58,24 @@
 
     private static final int TCP = 0;
     private static final int VM = 1;
+    private static final int SOCKET = 2;
 
     private static Logger _logger = LoggerFactory.getLogger(TransportConnection.class);
 
     private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQPFastProtocolHandler";
 
+    private static Map<String, Socket> _openSocketRegister = new ConcurrentHashMap<String, Socket>();
+
+    public static void registerOpenSocket(String socketID, Socket openSocket)
+    {
+        _openSocketRegister.put(socketID, openSocket);
+    }
+
+    public static Socket removeOpenSocket(String socketID)
+    {
+        return _openSocketRegister.remove(socketID);
+    }
+
     public static ITransportConnection getInstance(BrokerDetails details) throws AMQTransportConnectionException
     {
         int transport = getTransport(details.getTransport());
@@ -87,7 +104,15 @@
 
         switch (transport)
         {
-
+            case SOCKET:
+                _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+                {
+                    public IoConnector newSocketConnector()
+                    {
+                        return new ExistingSocketConnector();
+                    }
+                });
+                break;
             case TCP:
                 _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
                 {
@@ -127,6 +152,11 @@
 
     private static int getTransport(String transport)
     {
+        if (transport.equals(BrokerDetails.SOCKET))
+        {
+            return SOCKET;
+        }
+
         if (transport.equals(BrokerDetails.TCP))
         {
             return TCP;
@@ -294,7 +324,7 @@
         synchronized (_inVmPipeAddress)
         {
             _inVmPipeAddress.clear();
-        }        
+        }
         _acceptor = null;
         _currentInstance = -1;
         _currentVMPort = -1;

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java?rev=627427&r1=627426&r2=627427&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
 Wed Feb 13 06:39:10 2008
@@ -36,6 +36,7 @@
     public static final String OPTIONS_CONNECT_DELAY = "connectdelay";
     public static final int DEFAULT_PORT = 5672;
 
+    public static final String SOCKET = "socket";
     public static final String TCP = "tcp";
     public static final String VM = "vm";
 

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java?rev=627427&r1=627426&r2=627427&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
 Wed Feb 13 06:39:10 2008
@@ -71,7 +71,7 @@
         connection.start();
 
         String selector = null;
-        // selector = "Cost = 2 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT;
+         selector = "Cost = 2 AND \"property-with-hyphen\" = 'wibble'";
         // selector = "JMSType = Special AND Cost = 2 AND AMQMessageID > 0 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT;
 
         _session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
@@ -86,6 +86,7 @@
             Message msg = _session.createTextMessage("Message");
             msg.setJMSPriority(1);
             msg.setIntProperty("Cost", 2);
+            msg.setStringProperty("property-with-hyphen","wibble");
             msg.setJMSType("Special");
 
             _logger.info("Sending Message:" + msg);

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java?rev=627427&r1=627426&r2=627427&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
 Wed Feb 13 06:39:10 2008
@@ -473,6 +473,23 @@
 
     }
 
+    public void testSocketProtocol() throws URLSyntaxException
+    {
+        String url = "amqp://guest:[EMAIL PROTECTED]/test" + "?brokerlist='socket:///'";
+
+        try
+        {
+            AMQConnectionURL curl = new AMQConnectionURL(url);
+            assertNotNull(curl);
+            assertEquals(1, curl.getBrokerCount());
+            assertNotNull(curl.getBrokerDetails(0));
+            assertEquals("socket", curl.getBrokerDetails(0).getTransport());
+        }
+        catch (URLSyntaxException e)
+        {
+            fail(e.getMessage());
+        }
+    }
 
     public static junit.framework.Test suite()
     {

Modified: 
incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java?rev=627427&r1=627426&r2=627427&view=diff
==============================================================================
--- 
incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
 (original)
+++ 
incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
 Wed Feb 13 06:39:10 2008
@@ -898,13 +898,15 @@
         if (_encodedForm != null)
         {
 
-            if (_encodedForm.position() != 0)
+            ByteBuffer encodedForm = _encodedForm.duplicate();
+
+            if (encodedForm.position() != 0)
             {
-                _encodedForm.flip();
+                encodedForm.flip();
             }
             // _encodedForm.limit((int)getEncodedSize());
 
-            buffer.put(_encodedForm);
+            buffer.put(encodedForm);
         }
         else if (_properties != null)
         {

