Author: ritchiem
Date: Fri Oct 5 07:59:21 2007
New Revision: 582296
URL: http://svn.apache.org/viewvc?rev=582296&view=rev
Log:
Merged revisions
573738-573739,573741-574077,574079-574236,574238-574265,574267-574503,574505-574554,574556-574584,574586-574873,574875-574901,574903-575737,575739-575787,575789-575810,575812-577772,577774-577940,577942-578057,578059-578732,578734,578736-578744,578746-578827,578829-578844,578846-579114,579116-579146,579148-579197,579199-579228,579230-579573,579575-579576,579579-579601,579603-579613,579615-579708,579710-580021,580023-580039,580042-580060,580062-580065,580067-580080,580082-580257,580259-580264,580266-580350,580352-580984,580986-580991,580994-581001,581003-581170,581172-581188,581190-581206,581208-581245,581247-581292,581294-581539,581541-581565,581567-581620,581622-581626,581628-581646,581648-582204,582206-582269
via svnmerge from
https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1
........
r581968 | rupertlssmith | 2007-10-04 17:57:40 +0100 (Thu, 04 Oct 2007) | 1
line
Updaded performance tests to better test pub/sub mode with 1:10 fanout.
........
r582198 | ritchiem | 2007-10-05 11:33:14 +0100 (Fri, 05 Oct 2007) | 1 line
QPID-617 : Forgot to commit Test case to validate fix.
........
r582201 | ritchiem | 2007-10-05 11:39:54 +0100 (Fri, 05 Oct 2007) | 1 line
QPID-624: Update to ensure all errors are correctly processed in
BlockingMethodFrameListener.java
........
r582202 | ritchiem | 2007-10-05 11:44:06 +0100 (Fri, 05 Oct 2007) | 1 line
QPID-624 : Forgot to commit updates to test along with
BlockingMethodFrameListener
........
r582263 | ritchiem | 2007-10-05 14:38:13 +0100 (Fri, 05 Oct 2007) | 1 line
Qpid-623 : When only selectors are used on a queue the main _messages queue
causes a leak. Here is a new test provided by Aidan Skinner and a simple fix
that will prevent OOME when only selectors are connected to the Queue.
........
r582265 | ritchiem | 2007-10-05 14:39:03 +0100 (Fri, 05 Oct 2007) | 1 line
Qpid-558 : Patch provided by Aidan Skinner addressing AMQShortString not
autoexpand-ing so when adding content to it would throw an exception
........
r582266 | ritchiem | 2007-10-05 14:39:25 +0100 (Fri, 05 Oct 2007) | 1 line
QPID-551 : Patch provided by Aidan Skinner to address problems in info
logging when stacktraces are short.
........
Added:
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java
- copied unchanged from r582266,
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/queue/QueueDepthWithSelectorTest.java
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/txn/TxnTest.java
- copied unchanged from r582202,
incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/txn/TxnTest.java
Modified:
incubator/qpid/branches/M2/ (props changed)
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
incubator/qpid/branches/M2/java/perftests/pom.xml
incubator/qpid/branches/M2/java/perftests/src/main/java/org/apache/qpid/ping/PingClient.java
Propchange: incubator/qpid/branches/M2/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Fri Oct 5 07:59:21 2007
@@ -1 +1 @@
-/incubator/qpid/branches/M2.1:1-573736,573738-577772,577774-578732,578734,578736-578744,578746-578827,578829-581628,581647,582205
+/incubator/qpid/branches/M2.1:1-573736,573738-577772,577774-578732,578734,578736-578744,578746-578827,578829-582269
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?rev=582296&r1=582295&r2=582296&view=diff
==============================================================================
---
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
(original)
+++
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Fri Oct 5 07:59:21 2007
@@ -422,7 +422,7 @@
//If this causes ref count to hit zero then data will be purged so
message.getSize() will NPE.
message.decrementReference(storeContext);
- }
+ }
_lock.unlock();
}
@@ -462,15 +462,15 @@
*/
private AMQMessage getNextMessage() throws AMQException
{
- return getNextMessage(_messages, null);
+ return getNextMessage(_messages, null, false);
}
- private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription
sub) throws AMQException
+ private AMQMessage getNextMessage(Queue<AMQMessage> messages, Subscription
sub, boolean purgeOnly) throws AMQException
{
AMQMessage message = messages.peek();
//while (we have a message) && ((The subscriber is not a browser or
message is taken ) or we are clearing) && (Check message is taken.)
- while (purgeMessage(message, sub))
+ while (purgeMessage(message, sub, purgeOnly))
{
// if we are purging then ensure we mark this message taken for
the current subscriber
// the current subscriber may be null in the case of a get or a
purge but this is ok.
@@ -527,6 +527,24 @@
*/
private boolean purgeMessage(AMQMessage message, Subscription sub) throws
AMQException
{
+ return purgeMessage(message, sub, false);
+ }
+
+ /**
+ * This method will return true if the message is to be purged from the
queue.
+ * \
+ * SIDE-EFFECT: The msg will be taken by the Subscription(sub) for the
current Queue(_queue) when purgeOnly is false
+ *
+ * @param message
+ * @param sub
+ * @param purgeOnly When set to false the message will be taken by the
given Subscription.
+ *
+ * @return if the msg should be purged
+ *
+ * @throws AMQException
+ */
+ private boolean purgeMessage(AMQMessage message, Subscription sub, boolean
purgeOnly) throws AMQException
+ {
//Original.. complicated while loop control
// (message != null
// && (
@@ -561,9 +579,18 @@
}
}
- // if we are purging then ensure we mark this message taken for the
current subscriber
- // the current subscriber may be null in the case of a get or a purge
but this is ok.
- return purge && message.taken(_queue, sub);
+ if (purgeOnly)
+ {
+ // If we are simply purging the queue don't take the message
+ // just purge up to the next non-taken msg.
+ return purge && message.isTaken(_queue);
+ }
+ else
+ {
+ // if we are purging then ensure we mark this message taken for
the current subscriber
+ // the current subscriber may be null in the case of a get or a
purge but this is ok.
+ return purge && message.taken(_queue, sub);
+ }
}
public void sendNextMessage(Subscription sub, AMQQueue
queue)//Queue<AMQMessage> messageQueue)
@@ -594,7 +621,7 @@
{
synchronized (_queueHeadLock)
{
- message = getNextMessage(messageQueue, sub);
+ message = getNextMessage(messageQueue, sub, false);
// message will be null if we have no messages in the
messageQueue.
if (message == null)
@@ -661,7 +688,7 @@
//fixme - we should do the clean up as the message remains
on the _message queue
// this is resulting in the next consumer receiving the
message and then attempting to purge it
//
- _log.info(debugIdentity() + "We should do clean up of the
main _message queue here");
+ cleanMainQueue(sub);
}
}
@@ -677,6 +704,18 @@
_log.error(debugIdentity() + "Unable to release message as it
is null. " + e, e);
}
_log.error(debugIdentity() + "Unable to deliver message as dequeue
failed: " + e, e);
+ }
+ }
+
+ private void cleanMainQueue(Subscription sub)
+ {
+ try
+ {
+ getNextMessage(_messages, sub, true);
+ }
+ catch (AMQException e)
+ {
+ _log.warn("Problem during main queue purge:" + e.getMessage());
}
}
Modified:
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=582296&r1=582295&r2=582296&view=diff
==============================================================================
---
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Fri Oct 5 07:59:21 2007
@@ -514,8 +514,9 @@
{
if (_logger.isInfoEnabled())
{
+ StackTraceElement[] stackTrace =
Thread.currentThread().getStackTrace();
_logger.info("Closing session: " + this + ":"
- +
Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+ + Arrays.asList(stackTrace).subList(3,
stackTrace.length - 1));
}
synchronized (_messageDeliveryLock)
@@ -669,7 +670,7 @@
startDistpatcherIfNecessary(true);
}
- _dispatcher.rejectPending(consumer);
+ _dispatcher.rejectPending(consumer);
}
else
{
Modified:
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=582296&r1=582295&r2=582296&view=diff
==============================================================================
---
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
(original)
+++
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
Fri Oct 5 07:59:21 2007
@@ -480,15 +480,14 @@
{
if (_logger.isTraceEnabled())
{
+ StackTraceElement[] stackTrace =
Thread.currentThread().getStackTrace();
if (_closedStack != null)
{
- _logger.trace(_consumerTag + " close():"
- +
Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
_logger.trace(_consumerTag + " previously:" +
_closedStack.toString());
}
else
{
- _closedStack =
Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6);
+ _closedStack = Arrays.asList(stackTrace).subList(3,
stackTrace.length - 1);
}
}
@@ -553,15 +552,16 @@
if (_logger.isTraceEnabled())
{
+ StackTraceElement[] stackTrace =
Thread.currentThread().getStackTrace();
if (_closedStack != null)
{
_logger.trace(_consumerTag + " markClosed():"
- +
Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
+ + Arrays.asList(stackTrace).subList(3,
stackTrace.length - 1));
_logger.trace(_consumerTag + " previously:" +
_closedStack.toString());
}
else
{
- _closedStack =
Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8);
+ _closedStack = Arrays.asList(stackTrace).subList(3,
stackTrace.length - 1);
}
}
}
@@ -758,15 +758,16 @@
_closed.set(true);
if (_logger.isTraceEnabled())
{
+ StackTraceElement[] stackTrace =
Thread.currentThread().getStackTrace();
if (_closedStack != null)
{
_logger.trace(_consumerTag + " notifyError():"
- +
Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8));
+ + Arrays.asList(stackTrace).subList(3,
stackTrace.length - 1));
_logger.trace(_consumerTag + " previously" +
_closedStack.toString());
}
else
{
- _closedStack =
Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 8);
+ _closedStack = Arrays.asList(stackTrace).subList(3,
stackTrace.length - 1);
}
}
}
Modified:
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java?rev=582296&r1=582295&r2=582296&view=diff
==============================================================================
---
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
(original)
+++
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
Fri Oct 5 07:59:21 2007
@@ -78,14 +78,22 @@
/** This flag is used to indicate that the blocked for method has been
received. */
private volatile boolean _ready = false;
+ /** This flag is used to indicate that the received error has been
processed. */
+ private volatile boolean _errorAck = false;
+
/** Used to protect the shared event and ready flag between the producer
and consumer. */
private final ReentrantLock _lock = new ReentrantLock();
-
+
/**
* Used to signal that a method has been received
*/
private final Condition _receivedCondition = _lock.newCondition();
+ /**
+ * Used to signal that a error has been processed
+ */
+ private final Condition _errorConditionAck = _lock.newCondition();
+
/** Used to hold the most recent exception that is passed to the [EMAIL
PROTECTED] #error(Exception)} method. */
private volatile Exception _error;
@@ -142,7 +150,7 @@
_ready = ready;
_receivedCondition.signal();
}
- finally
+ finally
{
_lock.unlock();
}
@@ -174,13 +182,15 @@
public AMQMethodEvent blockForFrame(long timeout) throws AMQException,
FailoverException
{
long nanoTimeout = TimeUnit.MILLISECONDS.toNanos(timeout);
-
+
_lock.lock();
+
try
{
while (!_ready)
{
- try {
+ try
+ {
if (timeout == -1)
{
_receivedCondition.await();
@@ -195,7 +205,7 @@
_ready = true;
}
}
- }
+ }
catch (InterruptedException e)
{
// IGNORE -- //fixme this isn't ideal as being
interrupted isn't equivellant to sucess
@@ -206,29 +216,34 @@
// }
}
}
+
+
+ if (_error != null)
+ {
+ if (_error instanceof AMQException)
+ {
+ throw (AMQException) _error;
+ }
+ else if (_error instanceof FailoverException)
+ {
+ // This should ensure that FailoverException is not
wrapped and can be caught.
+ throw (FailoverException) _error; // needed to expose
FailoverException.
+ }
+ else
+ {
+ throw new AMQException("Woken up due to " +
_error.getClass(), _error);
+ }
+ }
+
}
finally
{
+ _errorAck = true;
+ _errorConditionAck.signal();
+ _error = null;
_lock.unlock();
}
- if (_error != null)
- {
- if (_error instanceof AMQException)
- {
- throw (AMQException) _error;
- }
- else if (_error instanceof FailoverException)
- {
- // This should ensure that FailoverException is not wrapped
and can be caught.
- throw (FailoverException) _error; // needed to expose
FailoverException.
- }
- else
- {
- throw new AMQException("Woken up due to " + _error.getClass(),
_error);
- }
- }
-
return _doneEvt;
}
@@ -242,13 +257,36 @@
{
// set the error so that the thread that is blocking (against
blockForFrame())
// can pick up the exception and rethrow to the caller
- _error = e;
+
_lock.lock();
+
+ if (_error == null)
+ {
+ _error = e;
+ }
+ else
+ {
+ System.err.println("WARNING: new error arrived while old one not
yet processed");
+ }
+
try
{
_ready = true;
_receivedCondition.signal();
+
+ while (!_errorAck)
+ {
+ try
+ {
+ _errorConditionAck.await();
+ }
+ catch (InterruptedException e1)
+ {
+ //
+ }
+ }
+ _errorAck = false;
}
finally
{
Modified:
incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java?rev=582296&r1=582295&r2=582296&view=diff
==============================================================================
---
incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
(original)
+++
incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
Fri Oct 5 07:59:21 2007
@@ -26,7 +26,10 @@
import org.apache.qpid.AMQTimeoutException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverProtectedOperation;
+import org.apache.qpid.client.failover.FailoverRetrySupport;
import org.apache.qpid.client.protocol.AMQProtocolSession;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.framing.AMQFrame;
@@ -60,7 +63,7 @@
Connection _connection;
private String _brokerlist = "vm://:1";
private Session _session;
- private static final long SYNC_TIMEOUT = 500;
+ private static final long SYNC_TIMEOUT = 5000;
private int TEST = 0;
protected void setUp() throws Exception
@@ -287,7 +290,7 @@
TEST++;
_logger.info("Test creating producer which will use channel id 1");
- Queue queue = _session.createQueue("CCT_test_validation_queue" + TEST);
+ Queue queue = _session.createTemporaryQueue();
MessageConsumer consumer = _session.createConsumer(queue);
@@ -311,7 +314,7 @@
connection.setConnectionListener(this);
- _session = connection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+ _session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
connection.start();
@@ -332,31 +335,42 @@
return connection;
}
- private void declareExchange(int channelId, String _type, String _name,
boolean nowait)
- throws AMQException, FailoverException
+ private void declareExchange(final int channelId, final String _type,
final String _name, final boolean nowait)
+ throws AMQException, FailoverException
{
- AMQFrame exchangeDeclare =
- ExchangeDeclareBody.createAMQFrame(channelId,
- ((AMQConnection)
_connection).getProtocolHandler().getProtocolMajorVersion(),
- ((AMQConnection)
_connection).getProtocolHandler().getProtocolMinorVersion(), null, // arguments
- false, // autoDelete
- false, // durable
- new AMQShortString(_name), // exchange
- false, // internal
- nowait, // nowait
- true, // passive
- 0, // ticket
- new AMQShortString(_type)); // type
+// new FailoverRetrySupport<Object, AMQException>(new
FailoverProtectedOperation<Object, AMQException>()
+// {
+// public Object execute() throws AMQException, FailoverException
+// {
+
+ AMQProtocolHandler protocolHandler = ((AMQConnection)
_connection).getProtocolHandler();
+
+ AMQFrame exchangeDeclare =
+ ExchangeDeclareBody.createAMQFrame(channelId,
+
protocolHandler.getProtocolMajorVersion(),
+
protocolHandler.getProtocolMinorVersion(), null, // arguments
+ false, // autoDelete
+ false, // durable
+ new
AMQShortString(_name), // exchange
+ false, // internal
+ nowait, // nowait
+ true, // passive
+ 0, // ticket
+ new
AMQShortString(_type)); // type
+
+ if (nowait)
+ {
+ protocolHandler.writeFrame(exchangeDeclare);
+ }
+ else
+ {
+ protocolHandler.syncWrite(exchangeDeclare,
ExchangeDeclareOkBody.class, SYNC_TIMEOUT);
+ }
+
+// return null;
+// }
+// }, (AMQConnection)_connection).execute();
- if (nowait)
- {
- ((AMQConnection)
_connection).getProtocolHandler().writeFrame(exchangeDeclare);
- }
- else
- {
- ((AMQConnection)
_connection).getProtocolHandler().syncWrite(exchangeDeclare,
ExchangeDeclareOkBody.class,
- SYNC_TIMEOUT);
- }
}
private void createChannel(int channelId) throws AMQException,
FailoverException
@@ -375,10 +389,12 @@
}
public void bytesSent(long count)
- { }
+ {
+ }
public void bytesReceived(long count)
- { }
+ {
+ }
public boolean preFailover(boolean redirect)
{
@@ -391,5 +407,6 @@
}
public void failoverComplete()
- { }
+ {
+ }
}
Modified:
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java?rev=582296&r1=582295&r2=582296&view=diff
==============================================================================
---
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
(original)
+++
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
Fri Oct 5 07:59:21 2007
@@ -212,6 +212,7 @@
if (size != 0)
{
+ buffer.setAutoExpand(true);
buffer.put((byte) size);
if (_data.buf().hasArray())
{