Author: rgodfrey
Date: Fri Jun 20 06:17:40 2008
New Revision: 669885
URL: http://svn.apache.org/viewvc?rev=669885&view=rev
Log:
QPID-1101 : Updated Direct Exchange so it does not modify lists of queues
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java?rev=669885&r1=669884&r2=669885&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java Fri Jun 20 06:17:40 2008
@@ -198,7 +198,7 @@
_logger.debug("Publishing message to queue " + queues);
}
- payload.enqueue(queues);
+ payload.enqueue(queues);
}
Modified:
incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java?rev=669885&r1=669884&r2=669885&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Index.java Fri Jun 20 06:17:40 2008
@@ -23,6 +23,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -44,15 +45,15 @@
List<AMQQueue> queues = _index.get(key);
if(queues == null)
{
- queues = new CopyOnWriteArrayList<AMQQueue>();
- //next call is atomic, so there is no race to create the list
- List<AMQQueue> active = _index.putIfAbsent(key, queues);
- if(active != null)
- {
- //someone added the new one in faster than we did, so use theirs
- queues = active;
- }
+ queues = new ArrayList<AMQQueue>();
+ }
+ else
+ {
+ queues = new ArrayList<AMQQueue>(queues);
}
+ //next call is atomic, so there is no race to create the list
+ _index.put(key, queues);
+
if(queues.contains(queue))
{
return false;
@@ -68,10 +69,18 @@
List<AMQQueue> queues = _index.get(key);
if (queues != null)
{
+ queues = new ArrayList<AMQQueue>(queues);
boolean removed = queues.remove(queue);
- if (queues.size() == 0)
+ if(removed)
{
- _index.remove(key);
+ if (queues.size() == 0)
+ {
+ _index.remove(key);
+ }
+ else
+ {
+ _index.put(key, queues);
+ }
}
return removed;
}