Author: aidan
Date: Wed Feb 13 06:39:10 2008
New Revision: 627427
URL: http://svn.apache.org/viewvc?rev=627427&view=rev
Log:
Merged revisions 620767,620858,620876,627354,627416 via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1
........
r620767 | ritchiem | 2008-02-12 11:29:19 +0000 (Tue, 12 Feb 2008) | 9 lines
QPID-784 : Added ability to provide an existing Socket to the Qpid Client
Libraries to use for the connection.
AMQBrokerDetails.java, BrokerDetails.java and ConnectionURLTest.java were
augmented to allow the new transport type 'socket'.
New ExistingSocketConnector, which utilises a given Socket() rather than
creating its own from a SocketChannel. This code was taken from the Mina
library v1.0.0.
Changes to AMQConnection.java, SocketTransportConnection.java were required
to allow the new Socket object to be passed through to the
ExistingSocketConnector.
The TransportConnection.java was updated to return an ExistingSocketConnector
when the 'socket' transport is used.
AMQConnection.makeBrokerConnection was changed for the case where the 'socket'
transport is being used. This allows the supplied Socket to be passed down to
the ExistingSocketConnector for the transport to be run over.
........
r620858 | rgodfrey | 2008-02-12 16:44:59 +0000 (Tue, 12 Feb 2008) | 1 line
QPID-787 : Allow for quoting of identifiers in selectors
........
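The decoding applied to a quoted identifier can be sketched in isolation. This is a minimal stand-alone version of the loop added to SelectorParser.jj in this commit, assuming the token image still carries its surrounding double quotes and that a doubled quote inside the identifier stands for one literal quote. The class name QuotedIdDecoder and the argument check are hypothetical and are not part of the Qpid source.

```java
// Hypothetical helper, not part of Qpid: mirrors the QUOTED_ID decoding loop.
final class QuotedIdDecoder {

    // Strips the surrounding quotes and collapses each "" pair to a single ".
    static String decode(String image) {
        if (image.length() < 2
                || image.charAt(0) != '"'
                || image.charAt(image.length() - 1) != '"') {
            throw new IllegalArgumentException("Not a quoted identifier: " + image);
        }
        StringBuilder decoded = new StringBuilder();
        // Start after the opening quote and stop before the closing quote.
        for (int i = 1; i < image.length() - 1; i++) {
            char c = image.charAt(i);
            if (c == '"') {
                i++; // skip the second quote of an escaped pair
            }
            decoded.append(c);
        }
        return decoded.toString();
    }

    public static void main(String[] args) {
        System.out.println(decode("\"property-with-hyphen\""));
    }
}
```

With this in place a selector such as "Cost = 2 AND \"property-with-hyphen\" = 'wibble'" can refer to a property whose name is not a plain ID token, which is what the updated SelectorTest exercises.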
r620876 | ritchiem | 2008-02-12 17:36:07 +0000 (Tue, 12 Feb 2008) | 2 lines
QPID-784 : Added ability to provide an existing Socket to the Qpid Client
Libraries to use for the connection.
Modified following review by Robert Godfrey, due to thread-safety concerns in
SocketTransportConnection.java and TransportConnection.java. A thread-safe Map
in TransportConnection.java now stores all registered sockets; each socket is
removed when it is used or on request.
........
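The register-then-remove pattern described in r620876 can be sketched on its own. The class name OpenSocketRegistry is hypothetical; in the diff the equivalent methods are TransportConnection.registerOpenSocket and TransportConnection.removeOpenSocket, backed by a ConcurrentHashMap. The sketch assumes that the caller picks a unique socket ID and that each registered socket is meant to be consumed once.

```java
import java.io.IOException;
import java.net.Socket;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the socket register kept in TransportConnection.
final class OpenSocketRegistry {

    // ConcurrentHashMap makes put and remove safe without external locking.
    private static final Map<String, Socket> OPEN_SOCKETS =
            new ConcurrentHashMap<String, Socket>();

    static void register(String socketId, Socket openSocket) {
        OPEN_SOCKETS.put(socketId, openSocket);
    }

    // Returns the socket and forgets it, or returns null if the ID is unknown.
    // remove() is atomic, so two threads cannot both claim the same socket.
    static Socket remove(String socketId) {
        return OPEN_SOCKETS.remove(socketId);
    }

    public static void main(String[] args) throws IOException {
        try (Socket socket = new Socket()) { // unconnected, for illustration only
            register("demo-socket", socket);
            System.out.println(remove("demo-socket") == socket); // first claim
            System.out.println(remove("demo-socket") == null);   // already taken
        }
    }
}
```

In the diff, SocketTransportConnection looks the socket up with the host part of the 'socket://<SocketID>' broker URL and throws IllegalArgumentException when nothing is registered under that ID.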
r627354 | ritchiem | 2008-02-13 11:00:13 +0000 (Wed, 13 Feb 2008) | 1 line
QPID-788 : Changed MAXIMUM_STATE_WAIT_TIME to pick up its value via
-Damqj.MaximumStateWait=90000, defaulting to 30000 when the property is not set.
........
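The property lookup behind QPID-788 can be sketched as follows. The property name and the default of 30000 come from the diff; the class and method names are hypothetical. The value is presumably in milliseconds, although the log message does not state the unit.

```java
// Hypothetical wrapper around the lookup used for MAXIMUM_STATE_WAIT_TIME.
final class StateWaitConfig {

    static final String PROPERTY = "amqj.MaximumStateWait";
    static final String DEFAULT_VALUE = "30000";

    // Reads the system property, falling back to the default when it is unset.
    // Long.parseLong throws NumberFormatException for a non-numeric value.
    static long maximumStateWait() {
        return Long.parseLong(System.getProperty(PROPERTY, DEFAULT_VALUE));
    }

    public static void main(String[] args) {
        System.out.println(maximumStateWait());
    }
}
```

In AMQStateManager the lookup initialises a static final field, so the property is read once when the class is loaded and has to be set before that point, for example with -Damqj.MaximumStateWait=90000 on the JVM command line.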
r627416 | rgodfrey | 2008-02-13 14:01:26 +0000 (Wed, 13 Feb 2008) | 1 line
QPID-789 : FieldTable putDataInBuffer method not thread safe
........
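The QPID-789 fix relies on duplicate() giving each caller its own position and limit over the same bytes, so copying the encoded form no longer flips the shared buffer. A minimal sketch, assuming java.nio.ByteBuffer as a stand-in for the buffer class FieldTable actually uses; the class and method names are hypothetical.

```java
import java.nio.ByteBuffer;

// Hypothetical demo of copying from a shared buffer without mutating it.
final class DuplicateBufferDemo {

    // Copies the written part of 'shared' into 'target'.
    // The duplicate shares content but has an independent position and limit,
    // so flip() and put() below leave 'shared' untouched.
    static void copyInto(ByteBuffer shared, ByteBuffer target) {
        ByteBuffer view = shared.duplicate();
        if (view.position() != 0) {
            view.flip();
        }
        target.put(view);
    }

    public static void main(String[] args) {
        ByteBuffer shared = ByteBuffer.allocate(4);
        shared.put((byte) 1).put((byte) 2).put((byte) 3);
        ByteBuffer target = ByteBuffer.allocate(8);
        copyInto(shared, target);
        copyInto(shared, target); // a second copy sees the same three bytes
        System.out.println(shared.position() + " " + target.position());
    }
}
```

Before the change the method flipped _encodedForm itself, so two threads writing the same FieldTable at once could observe each other's position changes.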
Added:
incubator/qpid/branches/thegreatmerge/qpid/java/client/example/src/main/java/org/apache/qpid/example/transport/
- copied from r627416,
incubator/qpid/branches/M2.1/java/client/example/src/main/java/org/apache/qpid/example/transport/
incubator/qpid/branches/thegreatmerge/qpid/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java
- copied unchanged from r627416,
incubator/qpid/branches/M2.1/java/client/example/src/main/java/org/apache/qpid/example/transport/ExistingSocketConnectorDemo.java
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/mina/
- copied from r627416,
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/mina/
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/mina/transport/
- copied from r627416,
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/mina/transport/
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/mina/transport/socket/
- copied from r627416,
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/mina/transport/socket/
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/mina/transport/socket/nio/
- copied from r627416,
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/mina/transport/socket/nio/
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java
- copied unchanged from r627416,
incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/mina/transport/socket/nio/ExistingSocketConnector.java
Modified:
incubator/qpid/branches/thegreatmerge/qpid/ (props changed)
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/grammar/SelectorParser.jj
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
Propchange: incubator/qpid/branches/thegreatmerge/qpid/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified:
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/grammar/SelectorParser.jj
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/grammar/SelectorParser.jj?rev=627427&r1=627426&r2=627427&view=diff
==============================================================================
---
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/grammar/SelectorParser.jj
(original)
+++
incubator/qpid/branches/thegreatmerge/qpid/java/broker/src/main/grammar/SelectorParser.jj
Wed Feb 13 06:39:10 2008
@@ -172,6 +172,7 @@
TOKEN [IGNORE_CASE] :
{
< ID : ["a"-"z", "_", "$"] (["a"-"z","0"-"9","_", "$"])* >
+ | < QUOTED_ID : "\"" ( ("\"\"") | ~["\""] )* "\"" >
}
// ----------------------------------------------------------------------------
@@ -589,6 +590,7 @@
PropertyExpression variable() :
{
Token t;
+ StringBuffer rc = new StringBuffer();
PropertyExpression left=null;
}
{
@@ -597,6 +599,21 @@
{
left = new PropertyExpression(t.image);
}
+ |
+ t = <QUOTED_ID>
+ {
+ // Decode the string value.
+ String image = t.image;
+ for( int i=1; i < image.length()-1; i++ ) {
+ char c = image.charAt(i);
+ if( c == '"' )
+ i++;
+ rc.append(c);
+ }
+ return new PropertyExpression(rc.toString());
+ }
+
+
)
{
return left;
Modified:
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java?rev=627427&r1=627426&r2=627427&view=diff
==============================================================================
---
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
(original)
+++
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/AMQBrokerDetails.java
Wed Feb 13 06:39:10 2008
@@ -59,7 +59,8 @@
{
//todo this list of valid transports should be enumerated somewhere
if ((!(transport.equalsIgnoreCase("vm") ||
- transport.equalsIgnoreCase("tcp"))))
+ transport.equalsIgnoreCase("tcp") ||
+ transport.equalsIgnoreCase("socket"))))
{
if (transport.equalsIgnoreCase("localhost"))
{
@@ -157,7 +158,10 @@
}
else
{
- setPort(port);
+ if (!_transport.equalsIgnoreCase(SOCKET))
+ {
+ setPort(port);
+ }
}
String queryString = connection.getQuery();
@@ -264,13 +268,16 @@
sb.append(_transport);
sb.append("://");
- if (!(_transport.equalsIgnoreCase("vm")))
+ if (!(_transport.equalsIgnoreCase(VM)))
{
sb.append(_host);
}
- sb.append(':');
- sb.append(_port);
+ if (!(_transport.equalsIgnoreCase(SOCKET)))
+ {
+ sb.append(':');
+ sb.append(_port);
+ }
sb.append(printOptionsURL());
Modified:
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?rev=627427&r1=627426&r2=627427&view=diff
==============================================================================
---
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
(original)
+++
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
Wed Feb 13 06:39:10 2008
@@ -53,7 +53,7 @@
private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet();
private final Object _stateLock = new Object();
- private static final long MAXIMUM_STATE_WAIT_TIME = 30000L;
+ private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000"));
public AMQStateManager()
{
Modified:
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java?rev=627427&r1=627426&r2=627427&view=diff
==============================================================================
---
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
(original)
+++
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/SocketTransportConnection.java
Wed Feb 13 06:39:10 2008
@@ -24,6 +24,7 @@
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.transport.socket.nio.ExistingSocketConnector;
import org.apache.mina.transport.socket.nio.SocketConnectorConfig;
import org.apache.mina.transport.socket.nio.SocketSessionConfig;
@@ -36,6 +37,9 @@
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
public class SocketTransportConnection implements ITransportConnection
{
@@ -83,8 +87,34 @@
_logger.info("send-buffer-size = " + scfg.getSendBufferSize());
scfg.setReceiveBufferSize(Integer.getInteger("amqj.receiveBufferSize", DEFAULT_BUFFER_SIZE));
_logger.info("recv-buffer-size = " + scfg.getReceiveBufferSize());
- final InetSocketAddress address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort());
- _logger.info("Attempting connection to " + address);
+
+ final InetSocketAddress address;
+
+ if (brokerDetail.getTransport().equals(BrokerDetails.SOCKET))
+ {
+ address = null;
+
+ Socket socket = TransportConnection.removeOpenSocket(brokerDetail.getHost());
+
+ if (socket != null)
+ {
+ _logger.info("Using existing Socket:" + socket);
+
+ ((ExistingSocketConnector) ioConnector).setOpenSocket(socket);
+ }
+ else
+ {
+ throw new IllegalArgumentException("Active Socket must be provided for broker " +
+ "with 'socket://<SocketID>' transport:" + brokerDetail);
+ }
+ }
+ else
+ {
+ address = new InetSocketAddress(brokerDetail.getHost(), brokerDetail.getPort());
+ _logger.info("Attempting connection to " + address);
+ }
+
+
ConnectFuture future = ioConnector.connect(address, protocolHandler);
// wait for connection to complete
Modified:
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=627427&r1=627426&r2=627427&view=diff
==============================================================================
---
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
(original)
+++
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
Wed Feb 13 06:39:10 2008
@@ -23,6 +23,7 @@
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoHandlerAdapter;
import org.apache.mina.common.IoServiceConfig;
+import org.apache.mina.transport.socket.nio.ExistingSocketConnector;
import org.apache.mina.transport.socket.nio.MultiThreadSocketConnector;
import org.apache.mina.transport.socket.nio.SocketConnector;
import org.apache.mina.transport.vmpipe.VmPipeAcceptor;
@@ -36,6 +37,9 @@
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.net.Socket;
+
/**
* The TransportConnection is a helper class responsible for connecting to an AMQ server. It sets up the underlying
@@ -54,11 +58,24 @@
private static final int TCP = 0;
private static final int VM = 1;
+ private static final int SOCKET = 2;
private static Logger _logger = LoggerFactory.getLogger(TransportConnection.class);
private static final String DEFAULT_QPID_SERVER = "org.apache.qpid.server.protocol.AMQPFastProtocolHandler";
+ private static Map<String, Socket> _openSocketRegister = new ConcurrentHashMap<String, Socket>();
+
+ public static void registerOpenSocket(String socketID, Socket openSocket)
+ {
+ _openSocketRegister.put(socketID, openSocket);
+ }
+
+ public static Socket removeOpenSocket(String socketID)
+ {
+ return _openSocketRegister.remove(socketID);
+ }
+
public static ITransportConnection getInstance(BrokerDetails details) throws AMQTransportConnectionException
{
int transport = getTransport(details.getTransport());
@@ -87,7 +104,15 @@
switch (transport)
{
-
+ case SOCKET:
+ _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+ {
+ public IoConnector newSocketConnector()
+ {
+ return new ExistingSocketConnector();
+ }
+ });
+ break;
case TCP:
_instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
{
@@ -127,6 +152,11 @@
private static int getTransport(String transport)
{
+ if (transport.equals(BrokerDetails.SOCKET))
+ {
+ return SOCKET;
+ }
+
if (transport.equals(BrokerDetails.TCP))
{
return TCP;
@@ -294,7 +324,7 @@
synchronized (_inVmPipeAddress)
{
_inVmPipeAddress.clear();
- }
+ }
_acceptor = null;
_currentInstance = -1;
_currentVMPort = -1;
Modified:
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java?rev=627427&r1=627426&r2=627427&view=diff
==============================================================================
---
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
(original)
+++
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/BrokerDetails.java
Wed Feb 13 06:39:10 2008
@@ -36,6 +36,7 @@
public static final String OPTIONS_CONNECT_DELAY = "connectdelay";
public static final int DEFAULT_PORT = 5672;
+ public static final String SOCKET = "socket";
public static final String TCP = "tcp";
public static final String VM = "vm";
Modified:
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java?rev=627427&r1=627426&r2=627427&view=diff
==============================================================================
---
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
(original)
+++
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/SelectorTest.java
Wed Feb 13 06:39:10 2008
@@ -71,7 +71,7 @@
connection.start();
String selector = null;
- // selector = "Cost = 2 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT;
+ selector = "Cost = 2 AND \"property-with-hyphen\" = 'wibble'";
// selector = "JMSType = Special AND Cost = 2 AND AMQMessageID > 0 AND JMSDeliveryMode=" + DeliveryMode.NON_PERSISTENT;
_session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
@@ -86,6 +86,7 @@
Message msg = _session.createTextMessage("Message");
msg.setJMSPriority(1);
msg.setIntProperty("Cost", 2);
+ msg.setStringProperty("property-with-hyphen","wibble");
msg.setJMSType("Special");
_logger.info("Sending Message:" + msg);
Modified:
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java?rev=627427&r1=627426&r2=627427&view=diff
==============================================================================
---
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
(original)
+++
incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
Wed Feb 13 06:39:10 2008
@@ -473,6 +473,23 @@
}
+ public void testSocketProtocol() throws URLSyntaxException
+ {
+ String url = "amqp://guest:[EMAIL PROTECTED]/test" + "?brokerlist='socket:///'";
+
+ try
+ {
+ AMQConnectionURL curl = new AMQConnectionURL(url);
+ assertNotNull(curl);
+ assertEquals(1, curl.getBrokerCount());
+ assertNotNull(curl.getBrokerDetails(0));
+ assertEquals("socket", curl.getBrokerDetails(0).getTransport());
+ }
+ catch (URLSyntaxException e)
+ {
+ fail(e.getMessage());
+ }
+ }
public static junit.framework.Test suite()
{
Modified:
incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java?rev=627427&r1=627426&r2=627427&view=diff
==============================================================================
---
incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
(original)
+++
incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
Wed Feb 13 06:39:10 2008
@@ -898,13 +898,15 @@
if (_encodedForm != null)
{
- if (_encodedForm.position() != 0)
+ ByteBuffer encodedForm = _encodedForm.duplicate();
+
+ if (encodedForm.position() != 0)
{
- _encodedForm.flip();
+ encodedForm.flip();
}
// _encodedForm.limit((int)getEncodedSize());
- buffer.put(_encodedForm);
+ buffer.put(encodedForm);
}
else if (_properties != null)
{