Author: rupertlssmith
Date: Mon Jul 2 07:17:45 2007
New Revision: 552499
URL: http://svn.apache.org/viewvc?view=rev&rev=552499
Log:
Added some documentation.
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
incubator/qpid/branches/M2/java/perftests/pom.xml
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=552499&r1=552498&r2=552499
==============================================================================
---
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
(original)
+++
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
Mon Jul 2 07:17:45 2007
@@ -20,45 +20,44 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.log4j.Logger;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-
-/** Combines the information that make up a deliverable message into a more
manageable form. */
-import org.apache.log4j.Logger;
-
-import java.util.Arrays;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import java.util.Set;
-import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-/** Combines the information that make up a deliverable message into a more
manageable form. */
+/**
+ * A deliverable message.
+ */
public class AMQMessage
{
+ /** Used for debugging purposes. */
private static final Logger _log = Logger.getLogger(AMQMessage.class);
- /** Used in clustering */
+ /** Used in clustering. @todo What for? */
private Set<Object> _tokens;
- /** Only use in clustering - should ideally be removed? */
+ /** Only use in clustering. @todo What for? */
private AMQProtocolSession _publisher;
private final Long _messageId;
@@ -67,33 +66,27 @@
private AMQMessageHandle _messageHandle;
- // TODO: ideally this should be able to go into the transient message date
- check this! (RG)
+ /** Holds the transactional context in which this message is being
processed. */
private TransactionalContext _txnContext;
/**
- * Flag to indicate whether message has been delivered to a consumer. Used
in implementing return functionality for
- * messages published with the 'immediate' flag.
+ * Flag to indicate whether this message has been delivered to a consumer.
Used in implementing return functionality
+ * for messages published with the 'immediate' flag.
*/
private boolean _deliveredToConsumer;
- /**
- * We need to keep track of whether the message was 'immediate' as in
extreme circumstances, when the
- * checkDelieveredToConsumer is called, the message may already have been
received and acknowledged, and the body
- * removed from the store.
- */
+
+ /** Flag to indicate that this message requires 'immediate' delivery. */
private boolean _immediate;
- // private Subscription _takenBySubcription;
- // private AtomicBoolean _taken = new AtomicBoolean(false);
+ // private Subscription _takenBySubcription;
+ // private AtomicBoolean _taken = new AtomicBoolean(false);
private TransientMessageData _transientMessageData = new
TransientMessageData();
-
private Set<Subscription> _rejectedBy = null;
-
private Map<AMQQueue, AtomicBoolean> _takenMap = new HashMap<AMQQueue,
AtomicBoolean>();
private Map<AMQQueue, Subscription> _takenBySubcriptionMap = new
HashMap<AMQQueue, Subscription>();
-
private final int hashcode = System.identityHashCode(this);
private long _expiration;
@@ -104,8 +97,10 @@
public void setExpiration()
{
- long expiration = ((BasicContentHeaderProperties)
_transientMessageData.getContentHeaderBody().properties).getExpiration();
- long timestamp = ((BasicContentHeaderProperties)
_transientMessageData.getContentHeaderBody().properties).getTimestamp();
+ long expiration =
+ ((BasicContentHeaderProperties)
_transientMessageData.getContentHeaderBody().properties).getExpiration();
+ long timestamp =
+ ((BasicContentHeaderProperties)
_transientMessageData.getContentHeaderBody().properties).getTimestamp();
if
(ApplicationRegistry.getInstance().getConfiguration().getBoolean("advanced.synced-clocks",
false))
{
@@ -118,10 +113,10 @@
{
if (timestamp != 0L)
{
- //todo perhaps use arrival time
+ // todo perhaps use arrival time
long diff = (System.currentTimeMillis() - timestamp);
- if (diff > 1000L || diff < 1000L)
+ if ((diff > 1000L) || (diff < 1000L))
{
_expiration = expiration + diff;
}
@@ -152,11 +147,12 @@
{
try
{
- return _index < _messageHandle.getBodyCount(getStoreContext(),
_messageId) - 1;
+ return _index <
(_messageHandle.getBodyCount(getStoreContext(), _messageId) - 1);
}
catch (AMQException e)
{
_log.error("Unable to get body count: " + e, e);
+
return false;
}
}
@@ -166,7 +162,10 @@
try
{
- AMQBody cb =
getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(),
_messageId, ++_index));
+ AMQBody cb =
+
getProtocolVersionMethodConverter().convertToBody(_messageHandle.getContentChunk(getStoreContext(),
+ _messageId, ++_index));
+
return new AMQFrame(_channel, cb);
}
catch (AMQException e)
@@ -202,11 +201,12 @@
{
try
{
- return _index < _messageHandle.getBodyCount(getStoreContext(),
_messageId) - 1;
+ return _index <
(_messageHandle.getBodyCount(getStoreContext(), _messageId) - 1);
}
catch (AMQException e)
{
_log.error("Error getting body count: " + e, e);
+
return false;
}
}
@@ -229,8 +229,7 @@
}
}
- public AMQMessage(Long messageId, MessagePublishInfo info,
- TransactionalContext txnContext)
+ public AMQMessage(Long messageId, MessagePublishInfo info,
TransactionalContext txnContext)
{
_messageId = messageId;
_txnContext = txnContext;
@@ -250,7 +249,8 @@
*
* @throws AMQException
*/
- public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory
factory, TransactionalContext txnConext) throws AMQException
+ public AMQMessage(Long messageId, MessageStore store, MessageHandleFactory
factory, TransactionalContext txnConext)
+ throws AMQException
{
_messageId = messageId;
_messageHandle = factory.createMessageHandle(messageId, store, true);
@@ -266,8 +266,8 @@
* @param txnContext
* @param contentHeader
*/
- public AMQMessage(Long messageId, MessagePublishInfo info,
- TransactionalContext txnContext, ContentHeaderBody
contentHeader) throws AMQException
+ public AMQMessage(Long messageId, MessagePublishInfo info,
TransactionalContext txnContext,
+ ContentHeaderBody contentHeader) throws AMQException
{
this(messageId, info, txnContext);
setContentHeaderBody(contentHeader);
@@ -285,11 +285,9 @@
*
* @throws AMQException
*/
- public AMQMessage(Long messageId, MessagePublishInfo info,
- TransactionalContext txnContext,
- ContentHeaderBody contentHeader, List<AMQQueue>
destinationQueues,
- List<ContentChunk> contentBodies, MessageStore
messageStore, StoreContext storeContext,
- MessageHandleFactory messageHandleFactory) throws
AMQException
+ public AMQMessage(Long messageId, MessagePublishInfo info,
TransactionalContext txnContext,
+ ContentHeaderBody contentHeader, List<AMQQueue> destinationQueues,
List<ContentChunk> contentBodies,
+ MessageStore messageStore, StoreContext storeContext,
MessageHandleFactory messageHandleFactory) throws AMQException
{
this(messageId, info, txnContext, contentHeader);
_transientMessageData.setDestinationQueues(destinationQueues);
@@ -331,13 +329,13 @@
}
}
- public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
- throws AMQException
+ public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
throws AMQException
{
_transientMessageData.setContentHeaderBody(contentHeaderBody);
}
- public void routingComplete(MessageStore store, StoreContext storeContext,
MessageHandleFactory factory) throws AMQException
+ public void routingComplete(MessageStore store, StoreContext storeContext,
MessageHandleFactory factory)
+ throws AMQException
{
final boolean persistent = isPersistent();
_messageHandle = factory.createMessageHandle(_messageId, store,
persistent);
@@ -368,6 +366,7 @@
if (allContentReceived)
{
deliver(storeContext);
+
return true;
}
else
@@ -392,7 +391,8 @@
*/
public AMQMessage takeReference()
{
- incrementReference();// _referenceCount.incrementAndGet();
+ incrementReference(); // _referenceCount.incrementAndGet();
+
return this;
}
@@ -400,10 +400,10 @@
protected void incrementReference()
{
_referenceCount.incrementAndGet();
-// if (_log.isDebugEnabled())
-// {
-// _log.debug("Ref count on message " + debugIdentity() + "
incremented " +
Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
-// }
+ // if (_log.isDebugEnabled())
+ // {
+ // _log.debug("Ref count on message " + debugIdentity() + "
incremented " +
Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+ // }
}
/**
@@ -427,10 +427,10 @@
{
try
{
-// if (_log.isDebugEnabled())
-// {
-// _log.debug("Decremented ref count on message " +
debugIdentity() + " is zero; removing message" +
Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
-// }
+ // if (_log.isDebugEnabled())
+ // {
+ // _log.debug("Decremented ref count on message " +
debugIdentity() + " is zero; removing message" +
Arrays.asList(Thread.currentThread().getStackTrace()).subList(3, 6));
+ // }
// must check if the handle is null since there may be cases
where we decide to throw away a message
// and the handle has not yet been constructed
@@ -441,7 +441,7 @@
}
catch (AMQException e)
{
- //to maintain consistency, we revert the count
+ // to maintain consistency, we revert the count
incrementReference();
throw new MessageCleanupException(_messageId, e);
}
@@ -450,7 +450,8 @@
{
if (count < 0)
{
- throw new MessageCleanupException("Reference count for message
id " + debugIdentity() + " has gone below 0.");
+ throw new MessageCleanupException("Reference count for message
id " + debugIdentity()
+ + " has gone below 0.");
}
}
}
@@ -477,7 +478,7 @@
public boolean isTaken(AMQQueue queue)
{
- //return _taken.get();
+ // return _taken.get();
synchronized (this)
{
@@ -494,15 +495,15 @@
public boolean taken(AMQQueue queue, Subscription sub)
{
-// if (_taken.getAndSet(true))
-// {
-// return true;
-// }
-// else
-// {
-// _takenBySubcription = sub;
-// return false;
-// }
+ // if (_taken.getAndSet(true))
+ // {
+ // return true;
+ // }
+ // else
+ // {
+ // _takenBySubcription = sub;
+ // return false;
+ // }
synchronized (this)
{
@@ -520,6 +521,7 @@
{
_takenMap.put(queue, taken);
_takenBySubcriptionMap.put(queue, sub);
+
return false;
}
}
@@ -532,9 +534,8 @@
_log.trace("Releasing Message:" + debugIdentity());
}
-// _taken.set(false);
-// _takenBySubcription = null;
-
+ // _taken.set(false);
+ // _takenBySubcription = null;
synchronized (this)
{
@@ -568,6 +569,7 @@
else
{
_tokens.add(token);
+
return false;
}
}
@@ -629,6 +631,7 @@
{
pb = _messageHandle.getMessagePublishInfo(getStoreContext(),
_messageId);
}
+
return pb;
}
@@ -659,7 +662,7 @@
*/
public boolean expired(StoreContext storecontext, AMQQueue queue) throws
AMQException
{
- //note: If the storecontext isn't need then we can remove the
getChannel() from Subscription.
+ // note: If the storecontext isn't need then we can remove the
getChannel() from Subscription.
if (_expiration != 0L)
{
@@ -668,6 +671,7 @@
if (now > _expiration)
{
dequeue(storecontext, queue);
+
return true;
}
}
@@ -690,12 +694,13 @@
{
_log.debug("Delivering message " + debugIdentity() + " to " +
destinationQueues);
}
+
try
{
// first we allow the handle to know that the message has been
fully received. This is useful if it is
// maintaining any calculated values based on content chunks
- _messageHandle.setPublishAndContentHeaderBody(storeContext,
_messageId, _transientMessageData.getMessagePublishInfo(),
-
_transientMessageData.getContentHeaderBody());
+ _messageHandle.setPublishAndContentHeaderBody(storeContext,
_messageId,
+ _transientMessageData.getMessagePublishInfo(),
_transientMessageData.getContentHeaderBody());
// we then allow the transactional context to do something with
the message content
// now that it has all been received, before we attempt delivery
@@ -705,9 +710,9 @@
for (AMQQueue q : destinationQueues)
{
- //Increment the references to this message for each queue
delivery.
+ // Increment the references to this message for each queue
delivery.
incrementReference();
- //normal deliver so add this message at the end.
+ // normal deliver so add this message at the end.
_txnContext.deliver(this, q, false);
}
}
@@ -719,182 +724,181 @@
}
}
-/*
- public void writeDeliver(AMQProtocolSession protocolSession, int
channelId, long deliveryTag, AMQShortString consumerTag)
- throws AMQException
- {
- ByteBuffer deliver = createEncodedDeliverFrame(protocolSession,
channelId, deliveryTag, consumerTag);
- AMQDataBlock contentHeader =
ContentHeaderBody.createAMQFrame(channelId,
-
getContentHeaderBody());
-
- final int bodyCount = _messageHandle.getBodyCount(getStoreContext(),
_messageId);
- if (bodyCount == 0)
+ /*
+ public void writeDeliver(AMQProtocolSession protocolSession, int
channelId, long deliveryTag, AMQShortString consumerTag)
+ throws AMQException
{
- SmallCompositeAMQDataBlock compositeBlock = new
SmallCompositeAMQDataBlock(deliver,
-
contentHeader);
+ ByteBuffer deliver = createEncodedDeliverFrame(protocolSession,
channelId, deliveryTag, consumerTag);
+ AMQDataBlock contentHeader =
ContentHeaderBody.createAMQFrame(channelId,
+
getContentHeaderBody());
+
+ final int bodyCount =
_messageHandle.getBodyCount(getStoreContext(), _messageId);
+ if (bodyCount == 0)
+ {
+ SmallCompositeAMQDataBlock compositeBlock = new
SmallCompositeAMQDataBlock(deliver,
+
contentHeader);
+
+ protocolSession.writeFrame(compositeBlock);
+ }
+ else
+ {
+
+ //
+ // Optimise the case where we have a single content body. In
that case we create a composite block
+ // so that we can writeDeliver out the deliver, header and
body with a single network writeDeliver.
+ //
+ ContentChunk cb =
_messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
+
+ AMQDataBlock firstContentBody = new AMQFrame(channelId,
protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+ AMQDataBlock[] headerAndFirstContent = new
AMQDataBlock[]{contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new
CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ protocolSession.writeFrame(compositeBlock);
+
+ //
+ // Now start writing out the other content bodies
+ //
+ for (int i = 1; i < bodyCount; i++)
+ {
+ cb = _messageHandle.getContentChunk(getStoreContext(),
_messageId, i);
+ protocolSession.writeFrame(new AMQFrame(channelId,
protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ }
+
+
+ }
+
- protocolSession.writeFrame(compositeBlock);
}
- else
+
+ public void writeGetOk(AMQProtocolSession protocolSession, int
channelId, long deliveryTag, int queueSize) throws AMQException
{
+ ByteBuffer deliver = createEncodedGetOkFrame(protocolSession,
channelId, deliveryTag, queueSize);
+ AMQDataBlock contentHeader =
ContentHeaderBody.createAMQFrame(channelId,
+
getContentHeaderBody());
+
+ final int bodyCount =
_messageHandle.getBodyCount(getStoreContext(), _messageId);
+ if (bodyCount == 0)
+ {
+ SmallCompositeAMQDataBlock compositeBlock = new
SmallCompositeAMQDataBlock(deliver,
+
contentHeader);
+ protocolSession.writeFrame(compositeBlock);
+ }
+ else
+ {
- //
- // Optimise the case where we have a single content body. In that
case we create a composite block
- // so that we can writeDeliver out the deliver, header and body
with a single network writeDeliver.
- //
- ContentChunk cb =
_messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
+ //
+ // Optimise the case where we have a single content body. In
that case we create a composite block
+ // so that we can writeDeliver out the deliver, header and
body with a single network writeDeliver.
+ //
+ ContentChunk cb =
_messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
+
+ AMQDataBlock firstContentBody = new AMQFrame(channelId,
protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
+ AMQDataBlock[] headerAndFirstContent = new
AMQDataBlock[]{contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new
CompositeAMQDataBlock(deliver, headerAndFirstContent);
+ protocolSession.writeFrame(compositeBlock);
+
+ //
+ // Now start writing out the other content bodies
+ //
+ for (int i = 1; i < bodyCount; i++)
+ {
+ cb = _messageHandle.getContentChunk(getStoreContext(),
_messageId, i);
+ protocolSession.writeFrame(new AMQFrame(channelId,
protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ }
- AMQDataBlock firstContentBody = new AMQFrame(channelId,
protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new
AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new
CompositeAMQDataBlock(deliver, headerAndFirstContent);
- protocolSession.writeFrame(compositeBlock);
- //
- // Now start writing out the other content bodies
- //
- for (int i = 1; i < bodyCount; i++)
- {
- cb = _messageHandle.getContentChunk(getStoreContext(),
_messageId, i);
- protocolSession.writeFrame(new AMQFrame(channelId,
protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
}
}
- }
-
- public void writeGetOk(AMQProtocolSession protocolSession, int channelId,
long deliveryTag, int queueSize) throws AMQException
- {
- ByteBuffer deliver = createEncodedGetOkFrame(protocolSession,
channelId, deliveryTag, queueSize);
- AMQDataBlock contentHeader =
ContentHeaderBody.createAMQFrame(channelId,
-
getContentHeaderBody());
+ private ByteBuffer createEncodedDeliverFrame(AMQProtocolSession
protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
+ throws AMQException
+ {
+ MessagePublishInfo pb = getMessagePublishInfo();
+ AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId,
protocolSession.getProtocolMajorVersion(), (byte) 0, consumerTag,
+
deliveryTag, pb.getExchange(), _messageHandle.isRedelivered(),
+
pb.getRoutingKey());
+ ByteBuffer buf = ByteBuffer.allocate((int)
deliverFrame.getSize()); // XXX: Could cast be a problem?
+ deliverFrame.writePayload(buf);
+ buf.flip();
+ return buf;
+ }
+
+ private ByteBuffer createEncodedGetOkFrame(AMQProtocolSession
protocolSession, int channelId, long deliveryTag, int queueSize)
+ throws AMQException
+ {
+ MessagePublishInfo pb = getMessagePublishInfo();
+ AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
+
protocolSession.getProtocolMajorVersion(),
+
protocolSession.getProtocolMinorVersion(),
+ deliveryTag,
pb.getExchange(),
+ queueSize,
+
_messageHandle.isRedelivered(),
+
pb.getRoutingKey());
+ ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize());
// XXX: Could cast be a problem?
+ getOkFrame.writePayload(buf);
+ buf.flip();
+ return buf;
+ }
- final int bodyCount = _messageHandle.getBodyCount(getStoreContext(),
_messageId);
- if (bodyCount == 0)
+ private ByteBuffer createEncodedReturnFrame(AMQProtocolSession
protocolSession, int channelId, int replyCode, AMQShortString replyText) throws
AMQException
{
- SmallCompositeAMQDataBlock compositeBlock = new
SmallCompositeAMQDataBlock(deliver,
-
contentHeader);
- protocolSession.writeFrame(compositeBlock);
+ AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
+
protocolSession.getProtocolMajorVersion(),
+
protocolSession.getProtocolMinorVersion(),
+
getMessagePublishInfo().getExchange(),
+ replyCode,
replyText,
+
getMessagePublishInfo().getRoutingKey());
+ ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize());
// XXX: Could cast be a problem?
+ returnFrame.writePayload(buf);
+ buf.flip();
+ return buf;
}
- else
+
+ public void writeReturn(AMQProtocolSession protocolSession, int
channelId, int replyCode, AMQShortString replyText)
+ throws AMQException
{
+ ByteBuffer returnFrame = createEncodedReturnFrame(protocolSession,
channelId, replyCode, replyText);
+ AMQDataBlock contentHeader =
ContentHeaderBody.createAMQFrame(channelId,
+
getContentHeaderBody());
+
+ Iterator<AMQDataBlock> bodyFrameIterator =
getBodyFrameIterator(protocolSession, channelId);
//
// Optimise the case where we have a single content body. In that
case we create a composite block
// so that we can writeDeliver out the deliver, header and body
with a single network writeDeliver.
//
- ContentChunk cb =
_messageHandle.getContentChunk(getStoreContext(), _messageId, 0);
-
- AMQDataBlock firstContentBody = new AMQFrame(channelId,
protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb));
- AMQDataBlock[] headerAndFirstContent = new
AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new
CompositeAMQDataBlock(deliver, headerAndFirstContent);
- protocolSession.writeFrame(compositeBlock);
+ if (bodyFrameIterator.hasNext())
+ {
+ AMQDataBlock firstContentBody = bodyFrameIterator.next();
+ AMQDataBlock[] headerAndFirstContent = new
AMQDataBlock[]{contentHeader, firstContentBody};
+ CompositeAMQDataBlock compositeBlock = new
CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
+ protocolSession.writeFrame(compositeBlock);
+ }
+ else
+ {
+ CompositeAMQDataBlock compositeBlock = new
CompositeAMQDataBlock(returnFrame,
+
new AMQDataBlock[]{contentHeader});
+ protocolSession.writeFrame(compositeBlock);
+ }
//
// Now start writing out the other content bodies
+ // TODO: MINA needs to be fixed so the the pending writes buffer
is not unbounded
//
- for (int i = 1; i < bodyCount; i++)
+ while (bodyFrameIterator.hasNext())
{
- cb = _messageHandle.getContentChunk(getStoreContext(),
_messageId, i);
- protocolSession.writeFrame(new AMQFrame(channelId,
protocolSession.getRegistry().getProtocolVersionMethodConverter().convertToBody(cb)));
+ protocolSession.writeFrame(bodyFrameIterator.next());
}
-
-
- }
-
-
- }
-
-
- private ByteBuffer createEncodedDeliverFrame(AMQProtocolSession
protocolSession, int channelId, long deliveryTag, AMQShortString consumerTag)
- throws AMQException
- {
- MessagePublishInfo pb = getMessagePublishInfo();
- AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channelId,
protocolSession.getProtocolMajorVersion(), (byte) 0, consumerTag,
- deliveryTag,
pb.getExchange(), _messageHandle.isRedelivered(),
-
pb.getRoutingKey());
- ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); //
XXX: Could cast be a problem?
- deliverFrame.writePayload(buf);
- buf.flip();
- return buf;
- }
-
- private ByteBuffer createEncodedGetOkFrame(AMQProtocolSession
protocolSession, int channelId, long deliveryTag, int queueSize)
- throws AMQException
- {
- MessagePublishInfo pb = getMessagePublishInfo();
- AMQFrame getOkFrame = BasicGetOkBody.createAMQFrame(channelId,
-
protocolSession.getProtocolMajorVersion(),
-
protocolSession.getProtocolMinorVersion(),
- deliveryTag,
pb.getExchange(),
- queueSize,
-
_messageHandle.isRedelivered(),
-
pb.getRoutingKey());
- ByteBuffer buf = ByteBuffer.allocate((int) getOkFrame.getSize()); //
XXX: Could cast be a problem?
- getOkFrame.writePayload(buf);
- buf.flip();
- return buf;
- }
-
- private ByteBuffer createEncodedReturnFrame(AMQProtocolSession
protocolSession, int channelId, int replyCode, AMQShortString replyText) throws
AMQException
- {
- AMQFrame returnFrame = BasicReturnBody.createAMQFrame(channelId,
-
protocolSession.getProtocolMajorVersion(),
-
protocolSession.getProtocolMinorVersion(),
-
getMessagePublishInfo().getExchange(),
- replyCode,
replyText,
-
getMessagePublishInfo().getRoutingKey());
- ByteBuffer buf = ByteBuffer.allocate((int) returnFrame.getSize()); //
XXX: Could cast be a problem?
- returnFrame.writePayload(buf);
- buf.flip();
- return buf;
- }
-
- public void writeReturn(AMQProtocolSession protocolSession, int channelId,
int replyCode, AMQShortString replyText)
- throws AMQException
- {
- ByteBuffer returnFrame = createEncodedReturnFrame(protocolSession,
channelId, replyCode, replyText);
-
- AMQDataBlock contentHeader =
ContentHeaderBody.createAMQFrame(channelId,
-
getContentHeaderBody());
-
- Iterator<AMQDataBlock> bodyFrameIterator =
getBodyFrameIterator(protocolSession, channelId);
- //
- // Optimise the case where we have a single content body. In that case
we create a composite block
- // so that we can writeDeliver out the deliver, header and body with a
single network writeDeliver.
- //
- if (bodyFrameIterator.hasNext())
- {
- AMQDataBlock firstContentBody = bodyFrameIterator.next();
- AMQDataBlock[] headerAndFirstContent = new
AMQDataBlock[]{contentHeader, firstContentBody};
- CompositeAMQDataBlock compositeBlock = new
CompositeAMQDataBlock(returnFrame, headerAndFirstContent);
- protocolSession.writeFrame(compositeBlock);
- }
- else
- {
- CompositeAMQDataBlock compositeBlock = new
CompositeAMQDataBlock(returnFrame,
-
new AMQDataBlock[]{contentHeader});
- protocolSession.writeFrame(compositeBlock);
- }
-
- //
- // Now start writing out the other content bodies
- // TODO: MINA needs to be fixed so the the pending writes buffer is
not unbounded
- //
- while (bodyFrameIterator.hasNext())
- {
- protocolSession.writeFrame(bodyFrameIterator.next());
}
- }
-*/
+ */
public AMQMessageHandle getMessageHandle()
{
return _messageHandle;
}
-
public long getSize()
{
try
@@ -906,12 +910,12 @@
catch (AMQException e)
{
_log.error(e.toString(), e);
+
return 0;
}
}
-
public void restoreTransientMessageData() throws AMQException
{
TransientMessageData transientMessageData = new TransientMessageData();
@@ -921,25 +925,23 @@
_transientMessageData = transientMessageData;
}
-
public void clearTransientMessageData()
{
_transientMessageData = null;
}
-
public String toString()
{
-// return "Message[" + debugIdentity() + "]: " + _messageId + "; ref
count: " + _referenceCount + "; taken : " +
-// _taken + " by :" + _takenBySubcription;
+ // return "Message[" + debugIdentity() + "]: " + _messageId + "; ref
count: " + _referenceCount + "; taken : " +
+ // _taken + " by :" + _takenBySubcription;
- return "Message[" + debugIdentity() + "]: " + _messageId + "; ref
count: " + _referenceCount + "; taken for queues: " +
- _takenMap.toString() + " by Subs:" +
_takenBySubcriptionMap.toString();
+ return "Message[" + debugIdentity() + "]: " + _messageId + "; ref
count: " + _referenceCount + "; taken for queues: "
+ + _takenMap.toString() + " by Subs:" +
_takenBySubcriptionMap.toString();
}
public Subscription getDeliveredSubscription(AMQQueue queue)
{
-// return _takenBySubcription;
+ // return _takenBySubcription;
synchronized (this)
{
return _takenBySubcriptionMap.get(queue);
@@ -967,7 +969,7 @@
{
boolean rejected = _rejectedBy != null;
- if (rejected) // We have subscriptions that rejected this message
+ if (rejected) // We have subscriptions that rejected this message
{
return _rejectedBy.contains(subscription);
}
Modified:
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java?view=diff&rev=552499&r1=552498&r2=552499
==============================================================================
---
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
(original)
+++
incubator/qpid/branches/M2/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
Mon Jul 2 07:17:45 2007
@@ -28,24 +28,144 @@
import org.apache.qpid.server.store.StoreContext;
/**
- * @author Robert Greig ([EMAIL PROTECTED])
+ * TransactionalContext provides a context in which transactional operations
on [EMAIL PROTECTED] AMQMessage}s are performed.
+ * Different levels of transactional support for the delivery of messages may
be provided by different implementations
+ * of this interface.
+ *
+ * <p/>The fundamental transactional operations that can be performed on a
message queue are 'enqueue' and 'dequeue'.
+ * In this interface, these have been recast as the [EMAIL PROTECTED]
#messageFullyReceived} and [EMAIL PROTECTED] #acknowledgeMessage}
+ * operations. This interface essentially provides a way to make enqueueing
and dequeuing transactional.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities
+ * <tr><td> Explicitly accept a transaction start notification.
+ * <tr><td> Commit all pending operations in a transaction.
+ * <tr><td> Rollback all pending operations in a transaction.
+ * <tr><td> Deliver a message to a queue as part of a transaction.
+ * <tr><td> Redeliver a message to a queue as part of a transaction.
+ * <tr><td> Mark a message as acknowledged as part of a transaction.
+ * <tr><td> Accept notification that a message has been completely received as
part of a transaction.
+ * <tr><td> Accept notification that a message has been fully processed as
part of a transaction.
+ * <tr><td> Associate a message store context with this transaction context.
+ * </table>
+ *
+ * @todo The 'fullyReceived' and 'messageProcessed' events sit uncomfortably
in the responsibilities of a transactional
+ * context. They are non-transactional operations, used to trigger other
side-effects. Consider moving them
+ * somewhere else, a seperate interface for example.
+ *
+ * @todo This transactional context could be written as a wrapper extension to
a Queue implementation, that provides
+ * transactional management of the enqueue and dequeue operations, with
added commit/rollback methods. Any
+ * queue implementation could be made transactional by wrapping it as a
transactional queue. This would mean
+ * that the enqueue/dequeue operations do not need to be recast as
deliver/acknowledge operations, which may be
+ * conceptually neater.
+ *
+ * For example:
+ * <pre>
+ * public interface Transactional
+ * {
+ * public void commit();
+ * public void rollback();
+ * }
+ *
+ * public interface TransactionalQueue<E> extends Transactional,
SizeableQueue<E>
+ * {}
+ *
+ * public class Queues
+ * {
+ * ...
+ * // For transactional messaging, take a transactional view onto the queue.
+ * public static <E> TransactionalQueue<E>
getTransactionalQueue(SizeableQueue<E> queue) { ... }
+ *
+ * // For non-transactional messaging, take a non-transactional view onto
the queue.
+ * public static <E> TransactionalQueue<E>
getNonTransactionalQueue(SizeableQueue<E> queue) { ... }
+ * }
+ * </pre>
*/
public interface TransactionalContext
{
+ /**
+ * Explicitly begins the transaction, if it has not already been started.
[EMAIL PROTECTED] #commit} or [EMAIL PROTECTED] #rollback}
+ * should automatically begin the next transaction in the chain.
+ *
+ * @throws AMQException If the transaction cannot be started for any
reason.
+ */
void beginTranIfNecessary() throws AMQException;
+ /**
+ * Makes all pending operations on the transaction permanent and visible.
+ *
+ * @throws AMQException If the transaction cannot be committed for any
reason.
+ */
void commit() throws AMQException;
+ /**
+ * Erases all pending operations on the transaction.
+ *
+ * @throws AMQException If the transaction cannot be committed for any
reason.
+ */
void rollback() throws AMQException;
+ /**
+ * Delivers the specified message to the specified queue. A 'deliverFirst'
flag may be set if the message is a
+ * redelivery, and should be placed on the front of the queue.
+ *
+ * <p/>This is an 'enqueue' operation.
+ *
+ * @param message The message to deliver.
+ * @param queue The queue to deliver the message to.
+ * @param deliverFirst <tt>true</tt> to place the message on the front of
the queue for redelivery, <tt>false</tt>
+ * for normal FIFO message ordering.
+ *
+ * @throws AMQException If the message cannot be delivered for any reason.
+ */
void deliver(AMQMessage message, AMQQueue queue, boolean deliverFirst)
throws AMQException;
+ /**
+ * Acknowledges a message or many messages as delivered. All messages up
to a specified one, may be acknowledged by
+ * setting the 'multiple' flag. It is also possible for the acknowledged
message id to be zero, when the 'multiple'
+ * flag is set, in which case an acknowledgement up to the latest
delivered message should be done.
+ *
+ * <p/>This is a 'dequeue' operation.
+ *
+ * @param deliveryTag The id of the message to acknowledge,
or zero, if using multiple acknowledgement
+ * up to the latest message.
+ * @param lastDeliveryTag The latest message delivered.
+ * @param multiple <tt>true</tt> if all message ids up the
acknowledged one or latest delivered, are
+ * to be acknowledged, <tt>false</tt>
otherwise.
+ * @param unacknowledgedMessageMap The unacknowledged messages in the
transaction, to remove the acknowledged message
+ * from.
+ *
+ * @throws AMQException If the message cannot be acknowledged for any
reason.
+ */
void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, boolean
multiple,
- UnacknowledgedMessageMap unacknowledgedMessageMap)
throws AMQException;
+ UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException;
+ /**
+ * Notifies the transactional context that a message has been fully
received. The actual message that was received
+ * is not specified. This event may be used to trigger a process related
to the receipt of the message, for example,
+ * flushing its data to disk.
+ *
+ * @param persistent <tt>true</tt> if the received message is persistent,
<tt>false</tt> otherwise.
+ *
+ * @throws AMQException If the fully received event cannot be processed
for any reason.
+ */
void messageFullyReceived(boolean persistent) throws AMQException;
+ /**
+ * Notifies the transactional context that a message has been delivered,
succesfully or otherwise. The actual
+ * message that was delivered is not specified. This event may be used to
trigger a process related to the
+ * outcome of the delivery of the message, for example, cleaning up failed
deliveries.
+ *
+ * @param protocolSession The protocol session of the deliverable message.
+ *
+ * @throws AMQException If the message processed event cannot be handled
for any reason.
+ */
void messageProcessed(AMQProtocolSession protocolSession) throws
AMQException;
+ /**
+ * Gets the message store context associated with this transactional
context.
+ *
+ * @return The message store context associated with this transactional
context.
+ */
StoreContext getStoreContext();
}
Modified: incubator/qpid/branches/M2/java/perftests/pom.xml
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/perftests/pom.xml?view=diff&rev=552499&r1=552498&r2=552499
==============================================================================
--- incubator/qpid/branches/M2/java/perftests/pom.xml (original)
+++ incubator/qpid/branches/M2/java/perftests/pom.xml Mon Jul 2 07:17:45 2007
@@ -201,8 +201,8 @@
<TQC-Qpid-02>-n TQC-Qpid-02 -d1M -s[1000]
-c[1,30],samples=30 -o $QPID_WORK/results -t testAsyncPingOk
org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false
transacted=false commitBatchSize=100 batchSize=1000 messageSize=256
destinationsCount=1 rate=1000 maxPending=1000000 </TQC-Qpid-02>
<TQC-Qpid-03>-n TQC-Qpid-03 -d10M -s[1000]
-c[10] -o $QPID_WORK/results -t testAsyncPingOk
org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false
transacted=true commitBatchSize=100 batchSize=1000 messageSize=256
destinationsCount=10 rate=0 maxPending=1000000 </TQC-Qpid-03>
<TQC-Qpid-04>-n TQC-Qpid-04 -d10M -s[1000]
-c[10] -o $QPID_WORK/results -t testAsyncPingOk
org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false
transacted=false commitBatchSize=100 batchSize=1000 messageSize=256
destinationsCount=10 rate=0 maxPending=1000000 </TQC-Qpid-04>
- <TQC-Qpid-05>-n TQC-Qpid-05 -d10M -s[1000]
-c[100] -o $QPID_WORK/results -t testAsyncPingOk
org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false
transacted=true commitBatchSize=100 batchSize=1000 messageSize=256
destinationsCount=10 rate=0 maxPending=100000 </TQC-Qpid-05>
- <TQC-Qpid-06>-n TQC-Qpid-06 -d10M -s[1000]
-c[100] -o $QPID_WORK/results -t testAsyncPingOk
org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false
transacted=false commitBatchSize=100 batchSize=1000 messageSize=256
destinationsCount=10 rate=0 maxPending=100000 </TQC-Qpid-06>
+ <TQC-Qpid-05>-n TQC-Qpid-05 -d10M -s[1000]
-c[100] -o $QPID_WORK/results -t testAsyncPingOk
org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false
transacted=true commitBatchSize=100 batchSize=1000 messageSize=256
destinationsCount=10 rate=0 maxPending=100000 </TQC-Qpid-05>
+ <TQC-Qpid-06>-n TQC-Qpid-06 -d10M -s[1000]
-c[100] -o $QPID_WORK/results -t testAsyncPingOk
org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false
transacted=false commitBatchSize=100 batchSize=1000 messageSize=256
destinationsCount=10 rate=0 maxPending=100000 </TQC-Qpid-06>
<TQM-Qpid-01-512b>-n TQM-Qpid-01-512b -d10M -s[1000]
-c[1] -o $QPID_WORK/results -t testAsyncPingOk
org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false
transacted=true commitBatchSize=10 batchSize=1000 messageSize=512
destinationsCount=1 rate=0 maxPending=20000000</TQM-Qpid-01-512b>
<TQM-Qpid-02-512b>-n TQM-Qpid-02-512b -d10M -s[1000]
-c[1] -o $QPID_WORK/results -t testAsyncPingOk
org.apache.qpid.ping.PingAsyncTestPerf persistent=false pubsub=false
transacted=false commitBatchSize=10 batchSize=1000 messageSize=512
destinationsCount=1 rate=0 maxPending=20000000</TQM-Qpid-02-512b>