Modified: incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java?view=diff&rev=496666&r1=496665&r2=496666 ============================================================================== --- incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java (original) +++ incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java Tue Jan 16 04:13:19 2007 @@ -21,12 +21,12 @@ package org.apache.qpid.server.queue; import java.util.List; +import java.util.LinkedList; import java.util.concurrent.CopyOnWriteArrayList; /** * Distributes messages among a list of subsscription managers, using their * weighting. - * */ class NestedSubscriptionManager implements SubscriptionManager { @@ -44,11 +44,24 @@ _subscribers.remove(s); } + + public List<Subscription> getSubscriptions() + { + List<Subscription> allSubs = new LinkedList<Subscription>(); + + for (WeightedSubscriptionManager subMans : _subscribers) + { + allSubs.addAll(subMans.getSubscriptions()); + } + + return allSubs; + } + public boolean hasActiveSubscribers() { - for(WeightedSubscriptionManager s : _subscribers) + for (WeightedSubscriptionManager s : _subscribers) { - if(s.hasActiveSubscribers()) + if (s.hasActiveSubscribers()) { return true; } @@ -59,9 +72,9 @@ public Subscription nextSubscriber(AMQMessage msg) { WeightedSubscriptionManager start = current(); - for(WeightedSubscriptionManager s = start; s != null; s = next(start)) + for (WeightedSubscriptionManager s = start; s != null; s = next(start)) { - if(hasMore(s)) + if (hasMore(s)) { return nextSubscriber(s); } @@ -94,7 +107,7 @@ private WeightedSubscriptionManager next() { _iterations = 0; - if(++_index >= _subscribers.size()) + if (++_index >= _subscribers.size()) { _index = 0; }
Modified: incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java?view=diff&rev=496666&r1=496665&r2=496666 ============================================================================== --- incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java (original) +++ incubator/qpid/branches/perftesting/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java Tue Jan 16 04:13:19 2007 @@ -25,6 +25,9 @@ import org.apache.qpid.server.cluster.SimpleSendable; import org.apache.qpid.AMQException; +import java.util.Queue; +import java.util.List; + class RemoteSubscriptionImpl implements Subscription, WeightedSubscriptionManager { private final GroupManager _groupMgr; @@ -76,6 +79,11 @@ return _count; } + public List<Subscription> getSubscriptions() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + public boolean hasActiveSubscribers() { return getWeight() == 0; @@ -88,9 +96,49 @@ public void queueDeleted(AMQQueue queue) { - if(queue instanceof ClusteredQueue) + if (queue instanceof ClusteredQueue) { ((ClusteredQueue) queue).removeAllRemoteSubscriber(_peer); } + } + + public boolean hasFilters() + { + return false; + } + + public boolean hasInterest(AMQMessage msg) + { + return true; + } + + public Queue<AMQMessage> getPreDeliveryQueue() + { + return null; + } + + public void enqueueForPreDelivery(AMQMessage msg) + { + //no-op -- if selectors are implemented on RemoteSubscriptions then look at SubscriptionImpl + } + + public boolean isAutoClose() + { + return false; + } + + public void close() + { + //no-op + } + + public boolean isBrowser() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public void sendNextMessage(AMQQueue queue) + { + } } Modified: incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java?view=diff&rev=496666&r1=496665&r2=496666 ============================================================================== --- incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java (original) +++ incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/framing/PropertyFieldTable.java Tue Jan 16 04:13:19 2007 @@ -129,7 +129,7 @@ } catch (Exception e) { - _logger.warn("Unable to decode PropertyFieldTable format:" + textFormat, e); + _logger.warn("Unable to decode PropertyFieldTable format:" + textFormat); throw new IllegalArgumentException("Unable to decode PropertyFieldTable format:" + textFormat); } } @@ -483,7 +483,7 @@ { return _properties.containsKey(name) && (_properties.get(name) == null) && _propertyNamesTypeMap.get(name).equals(Prefix.AMQP_NULL_STRING_PROPERTY_PREFIX); - + } @@ -606,7 +606,8 @@ // AMQ start character if (!(Character.isLetter(propertyName.charAt(0)) || propertyName.charAt(0) == '$' - || propertyName.charAt(0) == '#')) + || propertyName.charAt(0) == '#' + || propertyName.charAt(0) == '_')) // Not official AMQP added for JMS. { throw new IllegalArgumentException("Identifier '" + propertyName + "' does not start with a valid AMQP start character"); } @@ -1156,9 +1157,9 @@ if (type == null) { String msg = "Field '" + key + "' - unsupported field table type: " + type + "."; - //some extra trace information... - msg += " (" + iType + "), length=" + length + ", sizeRead=" + sizeRead + ", sizeRemaining=" + sizeRemaining; - throw new AMQFrameDecodingException(msg); + //some extra trace information... + msg += " (" + iType + "), length=" + length + ", sizeRead=" + sizeRead + ", sizeRemaining=" + sizeRemaining; + throw new AMQFrameDecodingException(msg); } Object value; @@ -1203,7 +1204,7 @@ value = EncodingUtils.readBytes(buffer); break; default: - String msg = "Internal error, the following type identifier is not handled: " + type; + String msg = "Internal error, the following type identifier is not handled: " + type; throw new AMQFrameDecodingException(msg); } Modified: incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java?view=diff&rev=496666&r1=496665&r2=496666 ============================================================================== --- incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java (original) +++ incubator/qpid/branches/perftesting/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java Tue Jan 16 04:13:19 2007 @@ -55,7 +55,7 @@ { return _name; } - + public static final AMQConstant FRAME_MIN_SIZE = new AMQConstant(4096, "frame min size", true); public static final AMQConstant FRAME_END = new AMQConstant(206, "frame end", true); @@ -73,6 +73,8 @@ public static final AMQConstant CONTEXT_IN_USE = new AMQConstant(320, "context in use", true); public static final AMQConstant CONTEXT_UNKNOWN = new AMQConstant(321, "context unknown", true); + + public static final AMQConstant INVALID_SELECTOR = new AMQConstant(322, "selector invalid", true); public static final AMQConstant INVALID_PATH = new AMQConstant(402, "invalid path", true); Modified: incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java?view=diff&rev=496666&r1=496665&r2=496666 ============================================================================== --- incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java (original) +++ incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java Tue Jan 16 04:13:19 2007 @@ -43,8 +43,6 @@ TestQueue q6 = bindDefault("F0000=Aardvark", "F0001=Bear"); TestQueue q7 = bindDefault("F0000", "F0001=Bear"); TestQueue q8 = bindDefault("F0000=Aardvark", "F0001"); - TestQueue q9 = bindDefault("F0000=Apple", "F0001=Banana"); - TestQueue q10 = bindDefault("F0000=Apple", "F0001"); routeAndTest(new Message("Message1", "F0000"), q1); routeAndTest(new Message("Message2", "F0000=Aardvark"), q1, q2); @@ -74,7 +72,6 @@ TestQueue q2 = bindDefault("F0000=Aardvark", "F0001=Bear", "X-match=any"); TestQueue q3 = bindDefault("F0000", "F0001=Bear", "X-match=any"); TestQueue q4 = bindDefault("F0000=Aardvark", "F0001", "X-match=any"); - TestQueue q5 = bindDefault("F0000=Apple", "F0001=Banana", "X-match=any"); TestQueue q6 = bindDefault("F0000=Apple", "F0001", "X-match=any"); routeAndTest(new Message("Message1", "F0000"), q1, q3); @@ -87,16 +84,14 @@ public void testMandatory() throws AMQException { - TestQueue q1 = bindDefault("F0000"); + bindDefault("F0000"); Message m1 = new Message("Message1", "XXXXX"); Message m2 = new Message("Message2", "F0000"); BasicPublishBody pb1 = m1.getPublishBody(); pb1.mandatory = true; - BasicPublishBody pb2 = m1.getPublishBody(); + BasicPublishBody pb2 = m2.getPublishBody(); pb2.mandatory = true; routeAndTest(m1,true); - - } public static junit.framework.Test suite() Modified: incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java?view=diff&rev=496666&r1=496665&r2=496666 ============================================================================== --- incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java (original) +++ incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java Tue Jan 16 04:13:19 2007 @@ -5,8 +5,8 @@ import org.apache.qpid.test.VMBrokerSetup; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.util.TestApplicationRegistry; -import org.apache.qpid.server.store.TestableMemoryMessageStore; import org.apache.qpid.client.*; +import org.apache.qpid.client.transport.TransportConnection; import org.apache.qpid.url.AMQBindingURL; import org.apache.qpid.url.BindingURL; import org.apache.qpid.exchange.ExchangeDefaults; @@ -39,12 +39,14 @@ protected void setUp() throws Exception { super.setUp(); + TransportConnection.createVMBroker(1); ApplicationRegistry.initialise(new TestApplicationRegistry(), 1); } protected void tearDown() throws Exception { super.tearDown(); + TransportConnection.killAllVMBrokers(); } /** @@ -57,17 +59,14 @@ _bouncedMessageList.clear(); Connection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "/test"); - TestableMemoryMessageStore store = (TestableMemoryMessageStore) ApplicationRegistry.getInstance().getMessageStore(); AMQSession consumerSession = (AMQSession) con.createSession(false, Session.CLIENT_ACKNOWLEDGE); - - AMQHeadersExchange queue = new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS+"://"+ExchangeDefaults.HEADERS_EXCHANGE_NAME+"/test/queue1?"+ BindingURL.OPTION_ROUTING_KEY+"='F0000=1'")); + AMQHeadersExchange queue = new AMQHeadersExchange(new AMQBindingURL(ExchangeDefaults.HEADERS_EXCHANGE_CLASS + "://" + ExchangeDefaults.HEADERS_EXCHANGE_NAME + "/test/queue1?" + BindingURL.OPTION_ROUTING_KEY + "='F0000=1'")); FieldTable ft = new PropertyFieldTable(); - ft.setString("F1000","1"); - MessageConsumer consumer = consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String)null, ft); + ft.setString("F1000", "1"); + MessageConsumer consumer = consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String) null, ft); - //force synch to ensure the consumer has resulted in a bound queue ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS); @@ -81,49 +80,45 @@ con2.start(); - MessageProducer nonMandatoryProducer = producerSession.createProducer(queue,false,false); + MessageProducer nonMandatoryProducer = producerSession.createProducer(queue, false, false); MessageProducer mandatoryProducer = producerSession.createProducer(queue); - // First test - should neither be bounced nor routed _logger.info("Sending non-routable non-mandatory message"); - TextMessage msg1 = producerSession.createTextMessage("msg1"); + TextMessage msg1 = producerSession.createTextMessage("msg1"); nonMandatoryProducer.send(msg1); // Second test - should be bounced _logger.info("Sending non-routable mandatory message"); - TextMessage msg2 = producerSession.createTextMessage("msg2"); + TextMessage msg2 = producerSession.createTextMessage("msg2"); mandatoryProducer.send(msg2); // Third test - should be routed _logger.info("Sending routable message"); - TextMessage msg3 = producerSession.createTextMessage("msg3"); - msg3.setStringProperty("F1000","1"); + TextMessage msg3 = producerSession.createTextMessage("msg3"); + msg3.setStringProperty("F1000", "1"); mandatoryProducer.send(msg3); - _logger.info("Starting consumer connection"); con.start(); TextMessage tm = (TextMessage) consumer.receive(1000L); - assertTrue("No message routed to receiver",tm != null); - assertTrue("Wrong message routed to receiver: "+tm.getText(),"msg3".equals(tm.getText())); + assertTrue("No message routed to receiver", tm != null); + assertTrue("Wrong message routed to receiver: " + tm.getText(), "msg3".equals(tm.getText())); try { Thread.sleep(1000L); } - catch(InterruptedException e) + catch (InterruptedException e) { ; } - assertTrue("Wrong number of messages bounced (expect 1): "+_bouncedMessageList.size(),_bouncedMessageList.size()==1); + assertTrue("Wrong number of messages bounced (expect 1): " + _bouncedMessageList.size(), _bouncedMessageList.size() == 1); Message m = _bouncedMessageList.get(0); - assertTrue("Wrong message bounced: "+m.toString(),m.toString().contains("msg2")); - - + assertTrue("Wrong message bounced: " + m.toString(), m.toString().contains("msg2")); con.close(); @@ -134,18 +129,23 @@ public static junit.framework.Test suite() { - return new VMBrokerSetup(new junit.framework.TestSuite(ReturnUnroutableMandatoryMessageTest.class)); + return new junit.framework.TestSuite(ReturnUnroutableMandatoryMessageTest.class); } public void onException(JMSException jmsException) { - _logger.warn("Caught exception on producer: ",jmsException); + Exception linkedException = jmsException.getLinkedException(); - if(linkedException instanceof AMQNoRouteException) + if (linkedException instanceof AMQNoRouteException) { AMQNoRouteException noRoute = (AMQNoRouteException) linkedException; Message bounced = (Message) noRoute.getUndeliveredMessage(); _bouncedMessageList.add(bounced); + _logger.info("Caught expected NoRouteException"); + } + else + { + _logger.warn("Caught exception on producer: ", jmsException); } } } Modified: incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java?view=diff&rev=496666&r1=496665&r2=496666 ============================================================================== --- incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java (original) +++ incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/protocol/MockIoSession.java Tue Jan 16 04:13:19 2007 @@ -25,6 +25,7 @@ import org.apache.mina.common.support.DefaultWriteFuture; import java.net.SocketAddress; +import java.net.InetSocketAddress; import java.util.Set; public class MockIoSession implements IoSession @@ -151,7 +152,7 @@ public SocketAddress getRemoteAddress() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return new InetSocketAddress("127.0.0.1", 1234); //To change body of implemented methods use File | Settings | File Templates. } public SocketAddress getLocalAddress() Modified: incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java?view=diff&rev=496666&r1=496665&r2=496666 ============================================================================== --- incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java (original) +++ incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/MockProtocolSession.java Tue Jan 16 04:13:19 2007 @@ -22,6 +22,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQDataBlock; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.store.MessageStore; @@ -119,6 +120,15 @@ } public void setSaslServer(SaslServer saslServer) + { + } + + public FieldTable getClientProperties() + { + return null; + } + + public void setClientProperties(FieldTable clientProperties) { } } Modified: incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java?view=diff&rev=496666&r1=496665&r2=496666 ============================================================================== --- incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java (original) +++ incubator/qpid/branches/perftesting/qpid/java/systests/src/test/java/org/apache/qpid/server/queue/SubscriptionTestHelper.java Tue Jan 16 04:13:19 2007 @@ -22,6 +22,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.Queue; public class SubscriptionTestHelper implements Subscription { @@ -68,6 +69,41 @@ public void queueDeleted(AMQQueue queue) { + } + + public boolean hasFilters() + { + return false; + } + + public boolean hasInterest(AMQMessage msg) + { + return true; + } + + public Queue<AMQMessage> getPreDeliveryQueue() + { + return null; + } + + public void enqueueForPreDelivery(AMQMessage msg) + { + //no-op + } + + public boolean isAutoClose() + { + return false; + } + + public void close() + { + //no-op + } + + public boolean isBrowser() + { + return false; //To change body of implemented methods use File | Settings | File Templates. } public int hashCode() Modified: incubator/qpid/branches/perftesting/qpid/specs/amqp-8.0.xml URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/specs/amqp-8.0.xml?view=diff&rev=496666&r1=496665&r2=496666 ============================================================================== --- incubator/qpid/branches/perftesting/qpid/specs/amqp-8.0.xml (original) +++ incubator/qpid/branches/perftesting/qpid/specs/amqp-8.0.xml Tue Jan 16 04:13:19 2007 @@ -2105,6 +2105,14 @@ method it will raise a channel or connection exception. </doc> </field> + + <field name="arguments" type="table" label="arguments for consuming"> + <doc> + A set of arguments for the consume. The syntax and semantics + of these arguments depends on the server implementation. This + field is ignored if passive is 1. + </doc> + </field> </method> <method name = "consume-ok" synchronous = "1" index = "21">
