Author: ritchiem
Date: Wed Jul 25 05:40:24 2007
New Revision: 559427
URL: http://svn.apache.org/viewvc?view=rev&rev=559427
Log:
AMQMessage - added //todo-s and removed unused parameter StoreContext from
expired() method call.
ConcurrentSelectorDeliveryManager - Update to reflect expired() call change.
Created new _reaperContextStore to be used when performing reaper operations
such as message dequeue due to expiration. Removed old commented code.
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java?view=diff&rev=559427&r1=559426&r2=559427
==============================================================================
---
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
(original)
+++
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
Wed Jul 25 05:40:24 2007
@@ -35,6 +35,7 @@
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.protocol.HeartbeatConfig;
+import org.apache.qpid.server.protocol.AMQMinaProtocolSession;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.security.auth.AuthenticationResult;
@@ -72,7 +73,7 @@
SaslServer ss = null;
try
- {
+ {
ss = authMgr.createSaslServer(String.valueOf(body.mechanism),
session.getLocalFQDN());
if (ss == null)
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?view=diff&rev=559427&r1=559426&r2=559427
==============================================================================
---
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
(original)
+++
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
Wed Jul 25 05:40:24 2007
@@ -204,6 +204,7 @@
if (message instanceof AMQDataBlock)
{
amqProtocolSession.dataBlockReceived((AMQDataBlock) message);
+
}
else if (message instanceof ByteBuffer)
{
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=559427&r1=559426&r2=559427
==============================================================================
---
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
(original)
+++
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
Wed Jul 25 05:40:24 2007
@@ -81,12 +81,17 @@
// private AtomicBoolean _taken = new AtomicBoolean(false);
private TransientMessageData _transientMessageData = new
TransientMessageData();
+ //todo: this should be part of a messageOnQueue object
private Set<Subscription> _rejectedBy = null;
+ //todo: this should be part of a messageOnQueue object
private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue,
AtomicBoolean>();
+ //todo: this should be part of a messageOnQueue object
private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new
HashMap<AMQQueue, Subscription>();
private final int hashcode = System.identityHashCode(this);
+
+ //todo: this should be part of a messageOnQueue object
private long _expiration;
public String debugIdentity()
@@ -652,14 +657,13 @@
/**
* Checks to see if the message has expired. If it has the message is
dequeued.
*
- * @param storecontext
- * @param queue
+ * @param queue The queue to check the expiration against. (Currently not
used)
*
* @return true if the message has expire
*
* @throws AMQException
*/
- public boolean expired(StoreContext storecontext, AMQQueue queue) throws
AMQException
+ public boolean expired(AMQQueue queue) throws AMQException
{
// note: If the storecontext isn't need then we can remove the
getChannel() from Subscription.
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=559427&r1=559426&r2=559427
==============================================================================
---
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
(original)
+++
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
Wed Jul 25 05:40:24 2007
@@ -87,6 +87,10 @@
private final Object _queueHeadLock = new Object();
private String _processingThreadName = "";
+
+ /** Used by any reaping thread to purge messages */
+ private StoreContext _reapingStoreContext = new StoreContext();
+
ConcurrentSelectorDeliveryManager(SubscriptionManager subscriptions,
AMQQueue queue)
{
@@ -463,17 +467,19 @@
assert removed == message;
// if the message expired then the _totalMessageSize needs
adjusting
- if (message.expired(sub.getChannel().getStoreContext(), _queue))
+ if (message.expired(_queue))
{
_totalMessageSize.addAndGet(-message.getSize());
- message.dequeue(sub.getChannel().getStoreContext(), _queue);
+ // Use the reapingStoreContext as any sub(if we have one) may
be in a tx.
+ message.dequeue(_reapingStoreContext, _queue);
if (_log.isInfoEnabled())
{
_log.info(debugIdentity() + " Doing clean up of the main
_message queue.");
}
}
+
//else the clean up is not required as the message has already
been taken for this queue therefore
// it was the responsibility of the code that took the message to
ensure the _totalMessageSize was updated.
@@ -513,15 +519,15 @@
// if the message is null then don't purge as we have no messagse.
if (message != null)
{
+ // Check that the message hasn't expired.
+ if (message.expired(_queue))
+ {
+ return true;
+ }
+
// if we have a subscriber perform message checks
if (sub != null)
{
- // Check that the message hasn't expired.
- if (message.expired(sub.getChannel().getStoreContext(),
_queue))
- {
- return true;
- }
-
// if we have a queue browser(we don't purge) so check mark
the message as taken
purge = ((!sub.isBrowser() || message.isTaken(_queue)));
}
@@ -640,7 +646,14 @@
}
catch (AMQException e)
{
- message.release(_queue);
+ if (message != null)
+ {
+ message.release(_queue);
+ }
+ else
+ {
+ _log.error(debugIdentity() + "Unable to release message as it
is null. " + e, e);
+ }
_log.error(debugIdentity() + "Unable to deliver message as dequeue
failed: " + e, e);
}
}
@@ -719,25 +732,6 @@
}
-// private void sendNextMessage(Subscription sub)
-// {
-// if (sub.filtersMessages())
-// {
-// sendNextMessage(sub, sub.getPreDeliveryQueue());
-// if (sub.isAutoClose())
-// {
-// if (sub.getPreDeliveryQueue().isEmpty())
-// {
-// sub.close();
-// }
-// }
-// }
-// else
-// {
-// sendNextMessage(sub, _messages);
-// }
-// }
-
public void deliver(StoreContext context, AMQShortString name, AMQMessage
msg, boolean deliverFirst) throws AMQException
{
@@ -746,8 +740,6 @@
{
_log.debug(debugIdentity() + "deliver :first(" + deliverFirst + ")
:" + msg);
}
- // This shouldn't be done here.
-// msg.release();
//Check if we have someone to deliver the message to.
_lock.lock();