Author: rgodfrey
Date: Wed Oct 17 12:59:58 2007
New Revision: 585655
URL: http://svn.apache.org/viewvc?rev=585655&view=rev
Log:
Merged revisions
573738-573739,573741-574077,574079-574236,574238-574265,574267-574503,574505-574554,574556-574584,574586-574873,574875-574901,574903-575737,575739-575787,575789-575810,575812-577772,577774-577940,577942-578057,578059-578732,578734,578736-578744,578746-578827,578829-578844,578846-579114,579116-579146,579148-579197,579199-579228,579230-579573,579575-579576,579579-579601,579603-579613,579615-579708,579710-580021,580023-580039,580042-580060,580062-580065,580067-580080,580082-580257,580259-580264,580266-580350,580352-580984,580986-580991,580994-581001,581003-581170,581172-581188,581190-581206,581208-581245,581247-581292,581294-581539,581541-581565,581567-581620,581622-581626,581628-581646,581648-581967,581969-582197,582199-582200,582203-582204,582206-582262,582264,582267-583084,583087,583089-583104,583106-583146,583148-583153,583155-583169,583171-583172,583174-583398,583400-583414,583416-583417,583419-583437,583439-583482,583484-583517,583519-583545,583547,583549-
583774,583777-583807,583809-583881,583883-584107,584109-584112,584114-584123,584125-585653
via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1
........
r585565 | ritchiem | 2007-10-17 17:39:20 +0100 (Wed, 17 Oct 2007) | 1 line
QPID-643 : CSDM causes duplicate message delivery. Don't deliver messages
that have been taken.
........
r585570 | ritchiem | 2007-10-17 17:48:01 +0100 (Wed, 17 Oct 2007) | 1 line
Update to AMQMessage to reset the deliveredToConsumer flag(false) when the
message is released. This flag is now used by more than the immediate delivery.
It is also used to understand if the message has been delivered so that we can
tell the message should not be purged.
........
r585575 | ritchiem | 2007-10-17 17:59:42 +0100 (Wed, 17 Oct 2007) | 1 line
QPID-647 : Update to ConcurrentSelectorDeliveryManager to restart async
process if a msg is queued that has the potential to be delivered.
........
r585642 | rgodfrey | 2007-10-17 20:42:14 +0100 (Wed, 17 Oct 2007) | 1 line
QPID-645 : TxnBuffer should rethrow exceptions encountered on commit
........
Modified:
incubator/qpid/branches/M2/ (props changed)
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java
Propchange: incubator/qpid/branches/M2/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Wed Oct 17 12:59:58 2007
@@ -1 +1 @@
-/incubator/qpid/branches/M2.1:1-573736,573738-577772,577774-578732,578734,578736-578744,578746-578827,578829-584113,584124
+/incubator/qpid/branches/M2.1:1-573736,573738-577772,577774-578732,578734,578736-578744,578746-578827,578829-585653
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?rev=585655&r1=585654&r2=585655&view=diff
==============================================================================
---
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
(original)
+++
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
Wed Oct 17 12:59:58 2007
@@ -133,7 +133,7 @@
public boolean isReferenced()
{
return _referenceCount.get() > 0;
- }
+ }
/**
* Used to iterate through all the body frames associated with this
message. Will not keep all the data in memory
@@ -558,6 +558,7 @@
taken.set(false);
}
+ _deliveredToConsumer = false;
_takenMap.put(queue, taken);
_takenBySubcriptionMap.put(queue, null);
}
@@ -694,7 +695,10 @@
return false;
}
- /** Called when this message is delivered to a consumer. (used to
implement the 'immediate' flag functionality). */
+ /**
+ * Called when this message is delivered to a consumer. (used to implement
the 'immediate' flag functionality).
+ * And for selector efficiency.
+ */
public void setDeliveredToConsumer()
{
_deliveredToConsumer = true;
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=585655&r1=585654&r2=585655&view=diff
==============================================================================
---
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
(original)
+++
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Wed Oct 17 12:59:58 2007
@@ -212,6 +212,15 @@
}
/**
+ *
+ * @return the state of the async processor.
+ */
+ public boolean isProcessingAsync()
+ {
+ return _processing.get();
+ }
+
+ /**
* Returns all the messages in the Queue
*
* @return List of messages
@@ -821,6 +830,12 @@
{
addMessageToQueue(msg, deliverFirst);
+ //if we have a non-filtering subscriber but queued
messages && we're not Async && we have other Active subs then something is
wrong!
+ if ((s != null && hasQueuedMessages()) &&
!isProcessingAsync() && _subscriptions.hasActiveSubscribers())
+ {
+ _queue.deliverAsync();
+ }
+
//release lock now message is on queue.
_lock.unlock();
@@ -883,7 +898,36 @@
_log.trace(debugIdentity() + "Delivering Message:"
+ msg.debugIdentity() + " to(" +
System.identityHashCode(s) + ") :" + s);
}
- msg.taken(_queue, s);
+
+ if (msg.taken(_queue, s))
+ {
+ //Message has been delivered so don't redeliver.
+ // This can currently occur because of the
recursive call below
+ // During unit tests the send can occur
+ // client then rejects
+ // this reject then releases the message by the
time the
+ // if(!msg.isTaken()) call is made below
+ // the message has been released so that thread
loops to send the message again
+ // of course by the time it gets back to here. the
thread that released the
+ // message is now ready to send it. Here is a
sample trace for reference
+//1192627162613:Thread[pool-917-thread-4,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738
ID:145 Ref:1)]: 145; ref count: 1; taken for queues:
{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by
Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel:
id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41,
session=anonymous(5050419), resendQueue=false]
+//1192627162613:Thread[pool-917-thread-4,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel:
id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41,
session=anonymous(5050419), resendQueue=false]:this:Message[(HC:5529738 ID:145
Ref:1)]: 145; ref count: 1; taken for queues:
{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by
Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}
+//1192627162613:Thread[pool-917-thread-4,5,main]:28398657 Sent :dt:214
msg:(HC:5529738 ID:145 Ref:1)
+//1192627162613:Thread[pool-917-thread-2,5,main]:Reject message
by:[channel=Channel: id 1, transaction mode: true, prefetch marks: 2500/5000,
consumerTag=41, session=anonymous(5050419), resendQueue=false]
+//1192627162613:Thread[pool-917-thread-2,5,main]:Releasing Message:(HC:5529738
ID:145 Ref:1)
+//1192627162613:Thread[pool-917-thread-2,5,main]:Msg:Release:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:This:Message[(HC:5529738
ID:145 Ref:1)]: 145; ref count: 1; taken for queues:
{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by
Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=[channel=Channel:
id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=41,
session=anonymous(5050419), resendQueue=false]}
+//1192627162613:Thread[pool-917-thread-2,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738
ID:145 Ref:1)]: 145; ref count: 1; taken for queues:
{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by
Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel:
id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33,
session=anonymous(26960027), resendQueue=false]
+//1192627162629:Thread[pool-917-thread-4,5,main]:CSDM:suspended:
Message((HC:5529738 ID:145 Ref:1)) has not been taken so recursing!:
Subscriber:28398657
+//1192627162629:Thread[pool-917-thread-4,5,main]:CSDM:delivery:(true)message:Message[(HC:5529738
ID:145 Ref:1)]: 145; ref count: 1; taken for queues:
{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by
Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}:sub:[channel=Channel:
id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33,
session=anonymous(26960027), resendQueue=false]
+//1192627162629:Thread[pool-917-thread-2,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel:
id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33,
session=anonymous(26960027), resendQueue=false]:this:Message[(HC:5529738 ID:145
Ref:1)]: 145; ref count: 1; taken for queues:
{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=false} by
Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=null}
+//1192627162629:Thread[pool-917-thread-2,5,main]:25386607 Sent :dt:172
msg:(HC:5529738 ID:145 Ref:1)
+//1192627162629:Thread[pool-917-thread-4,5,main]:Msg:taken:Q:Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326:sub:[channel=Channel:
id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33,
session=anonymous(26960027), resendQueue=false]:this:Message[(HC:5529738 ID:145
Ref:1)]: 145; ref count: 1; taken for queues:
{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=true} by
Subs:{Queue(queue-596fb10e-2968-4e51-a751-1e6643bf9dd6)@16017326=[channel=Channel:
id 1, transaction mode: true, prefetch marks: 2500/5000, consumerTag=33,
session=anonymous(26960027), resendQueue=false]}
+ // Note: In the last request to take the message
from thread 4,5 the message has been
+ // taken by the previous call done by thread 2,5
+
+
+ return;
+ }
//Deliver the message
s.send(msg, _queue);
}
@@ -897,6 +941,10 @@
}
}
+ //
+ // Why do we do this? What was the reasoning? We should have a
better approach
+ // than recursion and rejecting if someone else sends it
before we do.
+ //
if (!msg.isTaken(_queue))
{
if (debugEnabled)
@@ -942,6 +990,8 @@
{
public void run()
{
+ String startName = Thread.currentThread().getName();
+ Thread.currentThread().setName("CSDM-AsyncDelivery:" + startName);
boolean running = true;
while (running && !_movingMessages.get())
{
@@ -957,6 +1007,7 @@
_processing.set(false);
}
}
+ Thread.currentThread().setName(startName);
}
}
@@ -983,8 +1034,9 @@
private String currentStatus()
{
- return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(M:H)")
+
- "(" + _messages.size() + ":" +
((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize() + ") " +
+ return " Queued:" + (_messages.isEmpty() ? "Empty " : "Contains(H:M)")
+
+ "(" + ((ConcurrentLinkedMessageQueueAtomicSize)
_messages).headSize() +
+ ":" + (_messages.size() -
((ConcurrentLinkedMessageQueueAtomicSize) _messages).headSize()) + ") " +
" Extra: " + (_hasContent.isEmpty() ? "Empty " : "Contains") +
"(" + _hasContent.size() + ":" + _extraMessages.get() + ") " +
" Active:" + _subscriptions.hasActiveSubscribers() +
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java?rev=585655&r1=585654&r2=585655&view=diff
==============================================================================
---
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
(original)
+++
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
Wed Oct 17 12:59:58 2007
@@ -54,7 +54,7 @@
_ops.clear();
}
- private boolean prepare(StoreContext context)
+ private boolean prepare(StoreContext context) throws AMQException
{
for (int i = 0; i < _ops.size(); i++)
{
@@ -63,19 +63,31 @@
{
op.prepare(context);
}
- catch (Exception e)
+ catch (AMQException e)
{
- //compensate previously prepared ops
- for (int j = 0; j < i; j++)
- {
- _ops.get(j).undoPrepare();
- }
- return false;
+ undoPrepare(i);
+ throw e;
+ }
+ catch (RuntimeException e)
+ {
+ undoPrepare(i);
+ throw e;
}
}
return true;
}
+ private void undoPrepare(int lastPrepared)
+ {
+ //compensate previously prepared ops
+ for (int j = 0; j < lastPrepared; j++)
+ {
+ _ops.get(j).undoPrepare();
+ }
+ }
+
+
+
public void rollback(StoreContext context) throws AMQException
{
for (TxnOp op : _ops)
Modified:
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java?rev=585655&r1=585654&r2=585655&view=diff
==============================================================================
---
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java
(original)
+++
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/txn/TxnBufferTest.java
Wed Oct 17 12:59:58 2007
@@ -27,6 +27,7 @@
import org.apache.qpid.server.store.StoreContext;
import java.util.LinkedList;
+import java.util.NoSuchElementException;
public class TxnBufferTest extends TestCase
{
@@ -78,7 +79,16 @@
buffer.enlist(new FailedPrepare());
buffer.enlist(new MockOp());
- buffer.commit(null);
+ try
+ {
+ buffer.commit(null);
+
+ }
+ catch (NoSuchElementException e)
+ {
+
+ }
+
validateOps();
store.validate();
}