Author: ritchiem
Date: Sun Dec 17 08:23:38 2006
New Revision: 488002
URL: http://svn.apache.org/viewvc?view=rev&rev=488002
Log:
AMQMessage.java - added take/release
AMQQueue.java - enabled parameterisation of DeliveryManager
Subscription.java - added PreDeliveryQueues
SubscriptionImpl.java - Implemented PDQs and added Selector/filter hooks
SubscriptionManager.java - added ability to get all Subscriptions
SubscriptionSet.java - updated getNextSub*Impl to take filters into account.
SynchronizedDeliveryManager.java - Fixed Logger Class
AMQSession.java - FieldTable changes
Modified:
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
incubator/qpid/branches/jmsselectors/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/branches/jmsselectors/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
incubator/qpid/branches/jmsselectors/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
incubator/qpid/branches/jmsselectors/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java
incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java
incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java
Modified:
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=488002&r1=488001&r2=488002
==============================================================================
---
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
(original)
+++
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
Sun Dec 17 08:23:38 2006
@@ -35,6 +35,7 @@
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.ConcurrentHashMap;
/**
@@ -89,6 +90,7 @@
*/
private boolean _deliveredToConsumer;
private ConcurrentHashMap<String, MessageDecorator> _decodedMessages;
+ private AtomicBoolean _taken;
public AMQMessage(MessageStore messageStore, BasicPublishBody publishBody)
@@ -104,6 +106,7 @@
_contentBodies = new LinkedList<ContentBody>();
_decodedMessages = new ConcurrentHashMap<String, MessageDecorator>();
_storeWhenComplete = storeWhenComplete;
+ _taken = new AtomicBoolean(false);
}
public AMQMessage(MessageStore store, long messageId, BasicPublishBody
publishBody,
@@ -371,12 +374,22 @@
/**
* Called when this message is delivered to a consumer. (used to
* implement the 'immediate' flag functionality).
+ * And by selectors to determin if the message has already been sent
*/
public void setDeliveredToConsumer()
{
_deliveredToConsumer = true;
}
+ /**
+ * Called selectors to determin if the message has already been sent
+ * @return _deliveredToConsumer
+ */
+ public boolean getDeliveredToConsumer()
+ {
+ return _deliveredToConsumer;
+ }
+
public MessageDecorator getDecodedMessage(String type)
{
@@ -410,5 +423,15 @@
}
return msgdec;
+ }
+
+ public boolean taken()
+ {
+ return _taken.getAndSet(true);
+ }
+
+ public void release()
+ {
+ _taken.set(false);
}
}
Modified:
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?view=diff&rev=488002&r1=488001&r2=488002
==============================================================================
---
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
(original)
+++
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
Sun Dec 17 08:23:38 2006
@@ -46,6 +46,7 @@
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.List;
+import java.util.Properties;
import java.util.concurrent.Executor;
/**
@@ -518,16 +519,30 @@
_subscriptionFactory = subscriptionFactory;
//fixme - Pick one.
- if (Boolean.getBoolean("concurrentdeliverymanager"))
+ if (System.getProperties().getProperty("deliverymanager") != null)
{
- _logger.info("Using ConcurrentDeliveryManager");
- _deliveryMgr = new ConcurrentDeliveryManager(_subscribers, this);
+ if
(System.getProperties().getProperty("deliverymanager").equals("ConcurrentSelectorDeliveryManager"))
+ {
+ _logger.warn("Using ConcurrentSelectorDeliveryManager");
+ _deliveryMgr = new
ConcurrentSelectorDeliveryManager(_subscribers, this);
+ }
+ else if
(System.getProperties().getProperty("deliverymanager").equals("ConcurrentDeliveryManager"))
+ {
+ _logger.warn("Using ConcurrentDeliveryManager");
+ _deliveryMgr = new ConcurrentDeliveryManager(_subscribers,
this);
+ }
+ else
+ {
+ _logger.warn("Using SynchronizedDeliveryManager");
+ _deliveryMgr = new SynchronizedDeliveryManager(_subscribers,
this);
+ }
}
else
{
- _logger.info("Using SynchronizedDeliveryManager");
+ _logger.warn("Using SynchronizedDeliveryManager");
_deliveryMgr = new SynchronizedDeliveryManager(_subscribers, this);
}
+
}
private AMQQueueMBean createMBean() throws AMQException
Modified:
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java?view=diff&rev=488002&r1=488001&r2=488002
==============================================================================
---
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
(original)
+++
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
Sun Dec 17 08:23:38 2006
@@ -23,6 +23,8 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.server.filter.MessageFilter;
+import java.util.Queue;
+
public interface Subscription
{
void send(AMQMessage msg, AMQQueue queue) throws FailedDequeueException;
@@ -34,4 +36,9 @@
boolean hasFilters();
boolean hasInterest(AMQMessage msg);
+
+ Queue<AMQMessage> getPreDeliveryQueue();
+
+ void enqueueForPreDelivery(AMQMessage msg);
+
}
Modified:
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=488002&r1=488001&r2=488002
==============================================================================
---
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
(original)
+++
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
Sun Dec 17 08:23:38 2006
@@ -23,6 +23,7 @@
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
+import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.BasicDeliverBody;
@@ -32,6 +33,8 @@
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import java.util.Queue;
+
/**
* Encapsulation of a supscription to a queue.
* <p/>
@@ -51,6 +54,9 @@
private final Object sessionKey;
+ private Queue<AMQMessage> _messages;
+
+
/**
* True if messages need to be acknowledged
*/
@@ -100,6 +106,16 @@
sessionKey = protocolSession.getKey();
_acks = acks;
_filters = FilterManagerFactory.createManager(filters);
+
+ if (_filters != null)
+ {
+ _messages = new ConcurrentLinkedQueueAtomicSize<AMQMessage>();
+ }
+ else
+ {
+ // Reference the DeliveryManager
+ _messages = null;
+ }
}
@@ -198,6 +214,22 @@
{
return _filters.allAllow(msg);
}
+
+ public Queue<AMQMessage> getPreDeliveryQueue()
+ {
+ return _messages;
+ }
+
+ public void enqueueForPreDelivery(AMQMessage msg)
+ {
+ if (_messages != null)
+ {
+ _messages.offer(msg);
+ }
+ }
+
+
+
private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String
routingKey, String exchange)
{
Modified:
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java?view=diff&rev=488002&r1=488001&r2=488002
==============================================================================
---
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
(original)
+++
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionManager.java
Sun Dec 17 08:23:38 2006
@@ -20,12 +20,15 @@
*/
package org.apache.qpid.server.queue;
+import java.util.List;
+
/**
* Abstraction of actor that will determine the subscriber to whom
* a message will be sent.
*/
public interface SubscriptionManager
{
+ public List<Subscription> getSubscriptions();
public boolean hasActiveSubscribers();
public Subscription nextSubscriber(AMQMessage msg);
}
Modified:
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?view=diff&rev=488002&r1=488001&r2=488002
==============================================================================
---
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
(original)
+++
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
Sun Dec 17 08:23:38 2006
@@ -139,30 +139,27 @@
if (!subscription.isSuspended())
{
-
- if ((!subscription.hasFilters()) || (subscription.hasFilters()
&& subscription.hasInterest(msg)))
+ if (!subscription.hasFilters())
{
return subscription;
}
- // 2006-12-04 : It is fairer to simply skip the person who
isn't interested.
- // Although it does need to be looked at again.
-
-// else
-// {
-// //Don't take penalise a subscriber for not wanting this
message.
-// // This would introduce unfairness sticking with the
current subscriber
-// // will allow the next message to match.. although could
lead to unfairness if:
-// // subscribers: a(bin) b(text) c(text)
-// // msgs : 1(text) 2(text) 3(bin)
-// // subscriber c won't get any messages. as the first two
text msgs will go to b and then a will get
-// // the bin msg.
-// // Never said this was fair round-robin-ing.
-// //FIXME - Make a fair round robin.
-//
-// --_currentSubscriber;
-// }
+ else
+ {
+ if (subscription.hasInterest(msg))
+ {
+ // if the queue is not empty then this client is ready
to receive a message.
+ //FIXME the queue could be full of sent messages.
+ // Either need to clean all PDQs after sending a
message
+ // OR have a clean up thread that runs the PDQs
expunging the messages.
+ if (subscription.getPreDeliveryQueue().isEmpty())
+ {
+ return subscription;
+ }
+ }
+ }
}
}
+
return null;
}
@@ -176,6 +173,11 @@
public boolean isEmpty()
{
return _subscriptions.isEmpty();
+ }
+
+ public List<Subscription> getSubscriptions()
+ {
+ return _subscriptions;
}
public boolean hasActiveSubscribers()
Modified:
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java?view=diff&rev=488002&r1=488001&r2=488002
==============================================================================
---
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
(original)
+++
incubator/qpid/branches/jmsselectors/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
Sun Dec 17 08:23:38 2006
@@ -35,7 +35,7 @@
*/
class SynchronizedDeliveryManager implements DeliveryManager
{
- private static final Logger _log =
Logger.getLogger(ConcurrentDeliveryManager.class);
+ private static final Logger _log =
Logger.getLogger(SynchronizedDeliveryManager.class);
/**
* Holds any queued messages
Modified:
incubator/qpid/branches/jmsselectors/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=488002&r1=488001&r2=488002
==============================================================================
---
incubator/qpid/branches/jmsselectors/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
incubator/qpid/branches/jmsselectors/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Sun Dec 17 08:23:38 2006
@@ -942,7 +942,7 @@
//need to generate a consumer tag on the client so we can exploit the
nowait flag
String tag = Integer.toString(_nextTag++);
- FieldTable ft = new FieldTable();
+ FieldTable ft = FieldTableFactory.newFieldTable();
if (messageSelector != null)
{
Modified:
incubator/qpid/branches/jmsselectors/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java?view=diff&rev=488002&r1=488001&r2=488002
==============================================================================
---
incubator/qpid/branches/jmsselectors/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
(original)
+++
incubator/qpid/branches/jmsselectors/java/cluster/src/main/java/org/apache/qpid/server/queue/ClusteredSubscriptionManager.java
Sun Dec 17 08:23:38 2006
@@ -23,6 +23,8 @@
import org.apache.log4j.Logger;
import org.apache.qpid.server.cluster.util.LogMessage;
+import java.util.List;
+
class ClusteredSubscriptionManager extends SubscriptionSet
{
private static final Logger _logger =
Logger.getLogger(ClusteredSubscriptionManager.class);
@@ -80,6 +82,11 @@
public int getWeight()
{
return ClusteredSubscriptionManager.this.getWeight();
+ }
+
+ public List<Subscription> getSubscriptions()
+ {
+ return ClusteredSubscriptionManager.super.getSubscriptions();
}
public boolean hasActiveSubscribers()
Modified:
incubator/qpid/branches/jmsselectors/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java?view=diff&rev=488002&r1=488001&r2=488002
==============================================================================
---
incubator/qpid/branches/jmsselectors/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
(original)
+++
incubator/qpid/branches/jmsselectors/java/cluster/src/main/java/org/apache/qpid/server/queue/NestedSubscriptionManager.java
Sun Dec 17 08:23:38 2006
@@ -21,12 +21,12 @@
package org.apache.qpid.server.queue;
import java.util.List;
+import java.util.LinkedList;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* Distributes messages among a list of subsscription managers, using their
* weighting.
- *
*/
class NestedSubscriptionManager implements SubscriptionManager
{
@@ -44,11 +44,24 @@
_subscribers.remove(s);
}
+
+ public List<Subscription> getSubscriptions()
+ {
+ List<Subscription> allSubs = new LinkedList<Subscription>();
+
+ for (WeightedSubscriptionManager subMans : _subscribers)
+ {
+ allSubs.addAll(subMans.getSubscriptions());
+ }
+
+ return allSubs;
+ }
+
public boolean hasActiveSubscribers()
{
- for(WeightedSubscriptionManager s : _subscribers)
+ for (WeightedSubscriptionManager s : _subscribers)
{
- if(s.hasActiveSubscribers())
+ if (s.hasActiveSubscribers())
{
return true;
}
@@ -59,9 +72,9 @@
public Subscription nextSubscriber(AMQMessage msg)
{
WeightedSubscriptionManager start = current();
- for(WeightedSubscriptionManager s = start; s != null; s = next(start))
+ for (WeightedSubscriptionManager s = start; s != null; s = next(start))
{
- if(hasMore(s))
+ if (hasMore(s))
{
return nextSubscriber(s);
}
@@ -94,7 +107,7 @@
private WeightedSubscriptionManager next()
{
_iterations = 0;
- if(++_index >= _subscribers.size())
+ if (++_index >= _subscribers.size())
{
_index = 0;
}
Modified:
incubator/qpid/branches/jmsselectors/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java?view=diff&rev=488002&r1=488001&r2=488002
==============================================================================
---
incubator/qpid/branches/jmsselectors/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
(original)
+++
incubator/qpid/branches/jmsselectors/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteSubscriptionImpl.java
Sun Dec 17 08:23:38 2006
@@ -25,6 +25,9 @@
import org.apache.qpid.server.cluster.SimpleSendable;
import org.apache.qpid.AMQException;
+import java.util.Queue;
+import java.util.List;
+
class RemoteSubscriptionImpl implements Subscription,
WeightedSubscriptionManager
{
private final GroupManager _groupMgr;
@@ -76,6 +79,11 @@
return _count;
}
+ public List<Subscription> getSubscriptions()
+ {
+ return null; //To change body of implemented methods use File |
Settings | File Templates.
+ }
+
public boolean hasActiveSubscribers()
{
return getWeight() == 0;
@@ -102,5 +110,20 @@
public boolean hasInterest(AMQMessage msg)
{
return true;
+ }
+
+ public Queue<AMQMessage> getPreDeliveryQueue()
+ {
+ return null;
+ }
+
+ public void enqueueForPreDelivery(AMQMessage msg)
+ {
+ //no-op -- if selectors are implemented on RemoteSubscriptions then
look at SubscriptionImpl
+ }
+
+ public void sendNextMessage(AMQQueue queue)
+ {
+
}
}
Modified:
incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java?view=diff&rev=488002&r1=488001&r2=488002
==============================================================================
---
incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java
(original)
+++
incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/SendPerfTest.java
Sun Dec 17 08:23:38 2006
@@ -90,7 +90,8 @@
/**
* Delivers messages to a number of queues.
- * @param count the number of messages to deliver
+ *
+ * @param count the number of messages to deliver
* @param queues the list of queues
* @throws NoConsumersException
*/
@@ -121,7 +122,7 @@
q.bind("routingKey", exchange);
try
{
- q.registerProtocolSession(createSession(), 1, "1", false);
+ q.registerProtocolSession(createSession(), 1, "1", false,
null);
}
catch (Exception e)
{
@@ -135,7 +136,7 @@
static AMQQueue createQueue(String name) throws AMQException
{
return new AMQQueue(name, false, null, false,
ApplicationRegistry.getInstance().getQueueRegistry(),
- new OnCurrentThreadExecutor());
+ new OnCurrentThreadExecutor());
}
static AMQProtocolSession createSession() throws Exception
Modified:
incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java?view=diff&rev=488002&r1=488001&r2=488002
==============================================================================
---
incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java
(original)
+++
incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/SynchronizedDeliveryManagerTest.java
Sun Dec 17 08:23:38 2006
@@ -23,7 +23,6 @@
import org.apache.qpid.server.queue.SynchronizedDeliveryManager;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.DefaultQueueRegistry;
-import org.apache.qpid.server.queue.ConcurrentDeliveryManager;
import org.apache.qpid.server.queue.DeliveryManagerTest;
import org.apache.qpid.AMQException;
Modified:
incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java?view=diff&rev=488002&r1=488001&r2=488002
==============================================================================
---
incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java
(original)
+++
incubator/qpid/branches/jmsselectors/java/systests/src/test/java/org/apache/qpid/server/queue/TestSubscription.java
Sun Dec 17 08:23:38 2006
@@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Queue;
public class TestSubscription implements Subscription
{
@@ -78,6 +79,16 @@
public boolean hasInterest(AMQMessage msg)
{
return true;
+ }
+
+ public Queue<AMQMessage> getPreDeliveryQueue()
+ {
+ return null;
+ }
+
+ public void enqueueForPreDelivery(AMQMessage msg)
+ {
+ //no-op -- if selectors are implemented here then look at
SubscriptionImpl
}
public int hashCode()