Author: steshaw
Date: Tue Nov 28 21:51:43 2006
New Revision: 480423
URL: http://svn.apache.org/viewvc?view=rev&rev=480423
Log:
QPID-137. First stab at porting enough to get AutoAcknowledge mode working.
Modified:
incubator/qpid/trunk/qpid/dotnet/Qpid.Buffer/HeapByteBuffer.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Buffer/HeapByteBuffer.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Buffer/HeapByteBuffer.cs?view=diff&rev=480423&r1=480422&r2=480423
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Buffer/HeapByteBuffer.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Buffer/HeapByteBuffer.cs Tue Nov 28 21:51:43 2006
@@ -385,6 +385,11 @@
{
return new HeapByteBuffer(bytes, length);
}
+
+ public static HeapByteBuffer wrap(byte[] bytes)
+ {
+ return new HeapByteBuffer(bytes, bytes.Length);
+ }
}
}
Modified:
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs?view=diff&rev=480423&r1=480422&r2=480423
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs Tue Nov 28 21:51:43 2006
@@ -35,12 +35,12 @@
const int NUM_ITERATIONS = 10;
const int NUM_COMMITED_MESSAGES = 10;
- const int NUM_ROLLEDBACK_MESSAGES = 5;
+ const int NUM_ROLLEDBACK_MESSAGES = 3;
const int SLEEP_MILLIS = 500;
AMQConnection _connection;
- public void onMessage(IMessage message)
+ public void OnMessage(IMessage message)
{
try
{
@@ -48,7 +48,40 @@
}
catch (QpidException e)
{
- error(e);
+ Error(e);
+ }
+ }
+
+ class NoWaitConsumer
+ {
+ FailoverTxTest _failoverTxTest;
+ IMessageConsumer _consumer;
+
+ internal NoWaitConsumer(FailoverTxTest failoverTxTest, IMessageConsumer channel)
+ {
+ _failoverTxTest = failoverTxTest;
+ _consumer = channel;
+ }
+
+ internal void Run()
+ {
+ int messages = 0;
+ while (messages < NUM_COMMITED_MESSAGES)
+ {
+ IMessage msg = _consumer.ReceiveNoWait();
+ if (msg != null)
+ {
+ _log.Info("NoWait received message");
+ ++messages;
+ _failoverTxTest.OnMessage(msg);
+ }
+ else
+ {
+ Thread.Sleep(1);
+ }
+
+ }
+
}
}
@@ -60,7 +93,7 @@
_log.Info("connectionInfo = " + connectionInfo);
_log.Info("connection.asUrl = " + _connection.toURL());
- IChannel receivingChannel = _connection.CreateChannel(false, AcknowledgeMode.NoAcknowledge);
+ IChannel receivingChannel = _connection.CreateChannel(false, AcknowledgeMode.AutoAcknowledge);
string queueName = receivingChannel.GenerateUniqueName();
@@ -70,11 +103,23 @@
// No need to call Queue.Bind as automatically bound to default
direct exchange.
receivingChannel.Bind(queueName, "amq.direct", queueName);
- receivingChannel.CreateConsumerBuilder(queueName).Create().OnMessage = new MessageReceivedDelegate(onMessage);
+
+ IMessageConsumer consumer = receivingChannel.CreateConsumerBuilder(queueName).Create();
+ bool useThread = true;
+ if (useThread)
+ {
+ NoWaitConsumer noWaitConsumer = new NoWaitConsumer(this, consumer);
+ new Thread(noWaitConsumer.Run).Start();
+ }
+ else
+ {
+ //receivingChannel.CreateConsumerBuilder(queueName).Create().OnMessage = new MessageReceivedDelegate(onMessage);
+ consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
+ }
_connection.Start();
- publishInTx(queueName);
+ PublishInTx(queueName);
Thread.Sleep(2000); // Wait a while for last messages.
@@ -82,7 +127,7 @@
_log.Info("FailoverTxText complete");
}
- private void publishInTx(string routingKey)
+ private void PublishInTx(string routingKey)
{
_log.Info("sendInTx");
bool transacted = true;
@@ -113,13 +158,13 @@
}
}
- private void error(Exception e)
+ private void Error(Exception e)
{
_log.Fatal("Exception received. About to stop.", e);
- stop();
+ Stop();
}
- private void stop()
+ private void Stop()
{
_log.Info("Stopping...");
try
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs?view=diff&rev=480423&r1=480422&r2=480423
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs Tue Nov 28 21:51:43 2006
@@ -265,11 +265,11 @@
int _prefetch;
AMQConnection _connection;
- public CreateChannelFailoverSupport(AMQConnection connection, bool
transacted, AcknowledgeMode acknowledgementMode, int prefetch)
+ public CreateChannelFailoverSupport(AMQConnection connection, bool
transacted, AcknowledgeMode acknowledgeMode, int prefetch)
{
_connection = connection;
_transacted = transacted;
- _acknowledgeMode = acknowledgementMode;
+ _acknowledgeMode = acknowledgeMode;
_prefetch = prefetch;
}
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs?view=diff&rev=480423&r1=480422&r2=480423
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs Tue Nov 28 21:51:43 2006
@@ -124,7 +124,7 @@
}
else
{
- consumer.NotifyMessage(message,
_containingChannel.AcknowledgeMode, _containingChannel.ChannelId);
+ consumer.NotifyMessage(message,
_containingChannel.ChannelId);
}
}
else
@@ -595,22 +595,6 @@
}
}
- /// <summary>
- /// Send an acknowledgement for all messages up to a specified number
on this session.
- /// <param name="messageNbr">the message number up to an including
which all messages will be acknowledged.</param>
- /// </summary>
- public void SendAcknowledgement(ulong messageNbr)
- {
- /*if (_logger.IsDebugEnabled)
- {
- _logger.Debug("Channel Ack being sent for channel id " +
_channelId + " and message number " + messageNbr);
- }*/
- /*Channel.Ack frame = new Channel.Ack();
- frame.channelId = _channelId;
- frame.messageNbr = messageNbr;
- _connection.getProtocolHandler().writeFrame(frame);*/
- }
-
internal void Start()
{
_dispatcher = new Dispatcher(this);
@@ -815,7 +799,7 @@
currentTime = DateTime.UtcNow.Ticks;
message.Timestamp = currentTime;
}
- byte[] payload = message.Data;
+ byte[] payload = message.Data.ToByteArray();
BasicContentHeaderProperties contentHeaderProperties =
message.ContentHeaderProperties;
if (timeToLive > 0)
@@ -986,10 +970,10 @@
* @param multiple if true will acknowledge all messages up to and
including the one specified by the
* delivery tag
*/
- public void AcknowledgeMessage(long deliveryTag, bool multiple)
+ public void AcknowledgeMessage(ulong deliveryTag, bool multiple)
{
- // XXX: cast to ulong evil?
- AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId,
(ulong)deliveryTag, multiple);
+ AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId,
deliveryTag, multiple);
+ _logger.Info("XXX sending ack: " + ackFrame);
if (_logger.IsDebugEnabled)
{
_logger.Debug("Sending ack for delivery tag " + deliveryTag +
" on channel " + _channelId);
Modified:
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs?view=diff&rev=480423&r1=480422&r2=480423
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs Tue Nov 28 21:51:43 2006
@@ -50,15 +50,18 @@
set { _noLocal = value; }
}
- AcknowledgeMode _acknowledgeMode = AcknowledgeMode.NoAcknowledge;
-
public AcknowledgeMode AcknowledgeMode
{
- get { return _acknowledgeMode; }
+ get { return _channel.AcknowledgeMode; }
}
private MessageReceivedDelegate _messageListener;
+ private bool IsMessageListenerSet
+ {
+ get { return _messageListener != null; }
+ }
+
/// <summary>
/// The consumer tag allows us to close the consumer by sending a
jmsCancel method to the
/// broker
@@ -173,12 +176,7 @@
o = _synchronousQueue.DequeueBlocking();
}
- IMessage m = ReturnMessageOrThrow(o);
- if (m != null)
- {
- PostDeliver(m);
- }
- return m;
+ return ReturnMessageOrThrowAndPostDeliver(o);
}
finally
{
@@ -189,6 +187,16 @@
}
}
+ private IMessage ReturnMessageOrThrowAndPostDeliver(object o)
+ {
+ IMessage m = ReturnMessageOrThrow(o);
+ if (m != null)
+ {
+ PostDeliver(m);
+ }
+ return m;
+ }
+
public IMessage Receive()
{
return Receive(0);
@@ -211,8 +219,14 @@
try
{
- object o = _synchronousQueue.Dequeue();
- return ReturnMessageOrThrow(o);
+ if (_synchronousQueue.Count > 0)
+ {
+ return
ReturnMessageOrThrowAndPostDeliver(_synchronousQueue.Dequeue());
+ }
+ else
+ {
+ return null;
+ }
}
finally
{
@@ -285,14 +299,73 @@
}
}
- /// <summary>
- /// Called from the AmqChannel when a message has arrived for this
consumer. This methods handles both the case
- /// of a message listener or a synchronous receive() caller.
- /// </summary>
- /// <param name="messageFrame">the raw unprocessed mesage</param>
- /// <param name="acknowledgeMode">the acknowledge mode requested for
this message</param>
- /// <param name="channelId">channel on which this message was
sent</param>
- internal void NotifyMessage(UnprocessedMessage messageFrame,
AcknowledgeMode acknowledgeMode, ushort channelId)
+// /// <summary>
+// /// Called from the AmqChannel when a message has arrived for this
consumer. This methods handles both the case
+// /// of a message listener or a synchronous receive() caller.
+// /// </summary>
+// /// <param name="messageFrame">the raw unprocessed mesage</param>
+// /// <param name="acknowledgeMode">the acknowledge mode requested for
this message</param>
+// /// <param name="channelId">channel on which this message was
sent</param>
+// internal void NotifyMessage(UnprocessedMessage messageFrame,
AcknowledgeMode acknowledgeMode, ushort channelId)
+// {
+// _logger.Info("XXX notifyMessage called with message number " +
messageFrame.DeliverBody.DeliveryTag);
+// if (_logger.IsDebugEnabled)
+// {
+// _logger.Debug("notifyMessage called with message number " +
messageFrame.DeliverBody.DeliveryTag);
+// }
+// try
+// {
+// AbstractQmsMessage jmsMessage =
_messageFactory.CreateMessage((long)messageFrame.DeliverBody.DeliveryTag,
+//
messageFrame.DeliverBody.Redelivered,
+//
messageFrame.ContentHeader,
+//
messageFrame.Bodies);
+
+// /*if (acknowledgeMode == AcknowledgeMode.PreAcknowledge)
+// {
+//
_channel.sendAcknowledgement(messageFrame.deliverBody.deliveryTag);
+// }*/
+// if (acknowledgeMode == AcknowledgeMode.ClientAcknowledge)
+// {
+// // we set the session so that when the user calls
acknowledge() it can call the method on session
+// // to send out the appropriate frame
+// jmsMessage.Channel = _channel;
+// }
+
+// lock (_syncLock)
+// {
+// if (_messageListener != null)
+// {
+//#if __MonoCS__
+// _messageListener(jmsMessage);
+//#else
+// _messageListener.Invoke(jmsMessage);
+//#endif
+// }
+// else
+// {
+// _synchronousQueue.Enqueue(jmsMessage);
+// }
+// }
+// if (acknowledgeMode == AcknowledgeMode.AutoAcknowledge)
+// {
+//
_channel.SendAcknowledgement(messageFrame.DeliverBody.DeliveryTag);
+// }
+// }
+// catch (Exception e)
+// {
+// _logger.Error("Caught exception (dump follows) -
ignoring...", e);
+// }
+// }
+
+
+ /**
+ * Called from the AMQSession when a message has arrived for this
consumer. This methods handles both the case
+ * of a message listener or a synchronous receive() caller.
+ *
+ * @param messageFrame the raw unprocessed mesage
+ * @param channelId channel on which this message was sent
+ */
+ internal void NotifyMessage(UnprocessedMessage messageFrame, int
channelId)
{
if (_logger.IsDebugEnabled)
{
@@ -300,48 +373,38 @@
}
try
{
- AbstractQmsMessage jmsMessage =
_messageFactory.CreateMessage(messageFrame.DeliverBody.DeliveryTag,
+ AbstractQmsMessage jmsMessage =
_messageFactory.CreateMessage((long)messageFrame.DeliverBody.DeliveryTag,
messageFrame.DeliverBody.Redelivered,
messageFrame.ContentHeader,
messageFrame.Bodies);
- /*if (acknowledgeMode == AcknowledgeMode.PreAcknowledge)
- {
-
_channel.sendAcknowledgement(messageFrame.deliverBody.deliveryTag);
- }*/
- if (acknowledgeMode == AcknowledgeMode.ClientAcknowledge)
- {
- // we set the session so that when the user calls
acknowledge() it can call the method on session
- // to send out the appropriate frame
- jmsMessage.Channel = _channel;
- }
+ _logger.Debug("Message is of type: " +
jmsMessage.GetType().Name);
- lock (_syncLock)
+ PreDeliver(jmsMessage);
+
+ if (IsMessageListenerSet)
{
- if (_messageListener != null)
- {
+ // We do not need a lock around the test above, and the
dispatch below as it is invalid
+ // for an application to alter an installed listener while
the session is started.
#if __MonoCS__
_messageListener(jmsMessage);
#else
- _messageListener.Invoke(jmsMessage);
+ _messageListener.Invoke(jmsMessage);
#endif
- }
- else
- {
- _synchronousQueue.Enqueue(jmsMessage);
- }
+ PostDeliver(jmsMessage);
}
- if (acknowledgeMode == AcknowledgeMode.AutoAcknowledge)
+ else
{
-
_channel.SendAcknowledgement(messageFrame.DeliverBody.DeliveryTag);
+ _synchronousQueue.Enqueue(jmsMessage);
}
}
catch (Exception e)
{
- _logger.Error("Caught exception (dump follows) - ignoring...",
e);
+ _logger.Error("Caught exception (dump follows) - ignoring...",
e); // FIXME
}
}
+
internal void NotifyError(Exception cause)
{
lock (_syncLock)
@@ -416,15 +479,32 @@
{
if (_lastDeliveryTag > 0)
{
- _channel.AcknowledgeMessage(_lastDeliveryTag, true);
+ _channel.AcknowledgeMessage((ulong)_lastDeliveryTag, true); //
XXX evil cast
_lastDeliveryTag = -1;
}
}
+ private void PreDeliver(AbstractQmsMessage msg)
+ {
+ switch (AcknowledgeMode)
+ {
+ case AcknowledgeMode.PreAcknowledge:
+ _channel.AcknowledgeMessage((ulong)msg.DeliveryTag, false);
+ break;
+
+ case AcknowledgeMode.ClientAcknowledge:
+ // We set the session so that when the user calls
acknowledge() it can call the method on session
+ // to send out the appropriate frame.
+ //msg.setAMQSession(_session);
+ msg.Channel = _channel;
+ break;
+ }
+ }
+
private void PostDeliver(IMessage m)
{
AbstractQmsMessage msg = (AbstractQmsMessage) m;
- switch (_acknowledgeMode)
+ switch (AcknowledgeMode)
{
/* TODO
case AcknowledgeMode.DupsOkAcknowledge:
@@ -444,7 +524,7 @@
break;
*/
case AcknowledgeMode.AutoAcknowledge:
- _channel.AcknowledgeMessage(msg.DeliveryTag, false);
+ _channel.AcknowledgeMessage((ulong)msg.DeliveryTag, true);
break;
case AcknowledgeMode.SessionTransacted:
_lastDeliveryTag = msg.DeliveryTag;
Modified:
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs?view=diff&rev=480423&r1=480422&r2=480423
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs Tue Nov 28 21:51:43 2006
@@ -39,7 +39,8 @@
_deliveryTag = deliveryTag;
}
- public AMQMessage(IContentHeaderProperties properties) :
this(properties, -1)
+ public AMQMessage(IContentHeaderProperties properties)
+ : this(properties, -1)
{
}
Modified:
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs?view=diff&rev=480423&r1=480422&r2=480423
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs Tue Nov 28 21:51:43 2006
@@ -20,30 +20,73 @@
*/
using System.Collections;
using Qpid.Framing;
+using log4net;
+using Qpid.Buffer;
namespace Qpid.Client.Message
{
public abstract class AbstractQmsMessageFactory : IMessageFactory
{
- public AbstractQmsMessage CreateMessage(ulong messageNbr, bool
redelivered, ContentHeaderBody contentHeader, IList bodies)
+ //public AbstractQmsMessage CreateMessage(long messageNbr, bool
redelivered, ContentHeaderBody contentHeader, IList bodies)
+ //{
+ // AbstractQmsMessage msg = CreateMessageWithBody(messageNbr,
contentHeader, bodies);
+ // msg.Redelivered = redelivered;
+ // return msg;
+ //}
+
+ public abstract AbstractQmsMessage CreateMessage();
+
+ ///// <summary>
+ /////
+ ///// </summary>
+ ///// <param name="messageNbr"></param>
+ ///// <param name="contentHeader"></param>
+ ///// <param name="bodies"></param>
+ ///// <returns></returns>
+ ///// <exception cref="AMQException"></exception>
+ //protected abstract AbstractQmsMessage CreateMessageWithBody(long
messageNbr,
+ //
ContentHeaderBody contentHeader,
+ // IList
bodies);
+
+ private static readonly ILog _logger = LogManager.GetLogger(typeof
(AbstractQmsMessageFactory));
+
+ protected abstract AbstractQmsMessage CreateMessage(long messageNbr,
ByteBuffer data, ContentHeaderBody contentHeader);
+
+ protected AbstractQmsMessage CreateMessageWithBody(long messageNbr,
+ ContentHeaderBody
contentHeader,
+ IList bodies)
+ {
+ ByteBuffer data;
+
+ // we optimise the non-fragmented case to avoid copying
+ if (bodies != null && bodies.Count == 1)
+ {
+ _logger.Debug("Non-fragmented message body (bodySize=" +
contentHeader.BodySize +")");
+ data = HeapByteBuffer.wrap(((ContentBody)bodies[0]).Payload);
+ }
+ else
+ {
+ _logger.Debug("Fragmented message body (" + bodies.Count + "
frames, bodySize=" + contentHeader.BodySize + ")");
+ data = ByteBuffer.Allocate((int)contentHeader.BodySize); //
XXX: Is cast a problem?
+ foreach (ContentBody body in bodies) {
+ data.Put(body.Payload);
+ //body.Payload.Release();
+ }
+
+ data.Flip();
+ }
+ _logger.Debug("Creating message from buffer with position=" +
data.Position + " and remaining=" + data.Remaining);
+
+ return CreateMessage(messageNbr, data, contentHeader);
+ }
+
+ public AbstractQmsMessage CreateMessage(long messageNbr, bool
redelivered,
+ ContentHeaderBody
contentHeader,
+ IList bodies)
{
AbstractQmsMessage msg = CreateMessageWithBody(messageNbr,
contentHeader, bodies);
msg.Redelivered = redelivered;
return msg;
}
-
- public abstract AbstractQmsMessage CreateMessage();
-
- /// <summary>
- ///
- /// </summary>
- /// <param name="messageNbr"></param>
- /// <param name="contentHeader"></param>
- /// <param name="bodies"></param>
- /// <returns></returns>
- /// <exception cref="AMQException"></exception>
- protected abstract AbstractQmsMessage CreateMessageWithBody(ulong
messageNbr,
-
ContentHeaderBody contentHeader,
- IList
bodies);
}
}
Modified:
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs?view=diff&rev=480423&r1=480422&r2=480423
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs Tue Nov 28 21:51:43 2006
@@ -24,67 +24,71 @@
using log4net;
using Qpid.Framing;
using Qpid.Messaging;
+using Qpid.Buffer;
namespace Qpid.Client.Message
{
- public class SendOnlyDestination : AMQDestination
+ public abstract class AbstractQmsMessage : AMQMessage, IMessage
{
- private static readonly ILog _log =
LogManager.GetLogger(typeof(string));
+ private static readonly ILog _log =
LogManager.GetLogger(typeof(AbstractQmsMessage));
- public SendOnlyDestination(string exchangeName, string routingKey)
- : base(exchangeName, null, null, false, false, routingKey)
- {
- _log.Debug(
- string.Format("Creating SendOnlyDestination with
exchangeName={0} and routingKey={1}",
- exchangeName, routingKey));
- }
+// protected long _messageNbr;
- public override string EncodedName
- {
- get { return ExchangeName + ":" + QueueName; }
- }
+ protected bool _redelivered;
- public override string RoutingKey
- {
- get { return QueueName; }
- }
+ protected ByteBuffer _data;
- public override bool IsNameRequired
- {
- get { throw new NotImplementedException(); }
- }
- }
+ //protected AbstractQmsMessage() : base(new
BasicContentHeaderProperties())
+ //{
+ //}
- public abstract class AbstractQmsMessage : AMQMessage, IMessage
- {
- private static readonly ILog _log =
LogManager.GetLogger(typeof(AbstractQmsMessage));
+ //protected AbstractQmsMessage(ulong messageNbr,
BasicContentHeaderProperties contentHeader)
+ // : this(contentHeader)
+ //{
+ // _messageNbr = messageNbr;
+ //}
- protected ulong _messageNbr;
+ //protected AbstractQmsMessage(BasicContentHeaderProperties
contentHeader)
+ // : base(contentHeader)
+ //{
+ //}
- protected bool _redelivered;
- protected AbstractQmsMessage() : base(new
BasicContentHeaderProperties())
- {
+#region new_java_ctrs
+
+ protected AbstractQmsMessage(ByteBuffer data)
+ : base(new BasicContentHeaderProperties())
+ {
+ _data = data;
+ if (_data != null)
+ {
+ _data.Acquire();
+ }
}
- protected AbstractQmsMessage(ulong messageNbr,
BasicContentHeaderProperties contentHeader)
- : this(contentHeader)
- {
- _messageNbr = messageNbr;
+ protected AbstractQmsMessage(long deliveryTag,
BasicContentHeaderProperties contentHeader, ByteBuffer data)
+ : this(contentHeader, deliveryTag)
+ {
+ _data = data;
+ if (_data != null)
+ {
+ _data.Acquire();
+ }
}
- protected AbstractQmsMessage(BasicContentHeaderProperties
contentHeader)
- : base(contentHeader)
- {
+ protected AbstractQmsMessage(BasicContentHeaderProperties
contentHeader, long deliveryTag) : base(contentHeader, deliveryTag)
+ {
}
+#endregion
+
public string MessageId
{
get
{
if (ContentHeaderProperties.MessageId == null)
{
- ContentHeaderProperties.MessageId = "ID:" + _messageNbr;
+ ContentHeaderProperties.MessageId = "ID:" + DeliveryTag;
}
return ContentHeaderProperties.MessageId;
}
@@ -92,6 +96,8 @@
{
ContentHeaderProperties.MessageId = value;
}
+
+
}
public long Timestamp
@@ -321,8 +327,11 @@
// is not specified. In our case, we only set the session field
where client acknowledge mode is specified.
if (_channel != null)
{
- _channel.SendAcknowledgement(_messageNbr);
+ // we set multiple to true here since acknowledgement implies
acknowledge of all previous messages
+ // received on the session
+ _channel.AcknowledgeMessage((ulong)DeliveryTag, true);
}
+
}
public IHeaders Headers
@@ -344,10 +353,23 @@
/// the message.
/// </summary>
/// <value>a byte array of message data</value>
- public abstract byte[] Data
+ public ByteBuffer Data
{
- get;
- set;
+ get
+ {
+ // make sure we rewind the data just in case any method has
moved the
+ // position beyond the start
+ if (_data != null)
+ {
+ _data.Rewind();
+ }
+ return _data;
+ }
+
+ set
+ {
+ _data = value;
+ }
}
public abstract string MimeType
@@ -367,7 +389,7 @@
buf.Append("\nQmsDeliveryMode: ").Append(DeliveryMode);
buf.Append("\nReplyToExchangeName:
").Append(ReplyToExchangeName);
buf.Append("\nReplyToRoutingKey: ").Append(ReplyToRoutingKey);
- buf.Append("\nAMQ message number: ").Append(_messageNbr);
+ buf.Append("\nAMQ message number: ").Append(DeliveryTag);
buf.Append("\nProperties:");
if (ContentHeaderProperties.Headers == null)
{
@@ -430,17 +452,17 @@
/// Get the AMQ message number assigned to this message
/// </summary>
/// <returns>the message number</returns>
- public ulong MessageNbr
- {
- get
- {
- return _messageNbr;
- }
- set
- {
- _messageNbr = value;
- }
- }
+ //public ulong MessageNbr
+ //{
+ // get
+ // {
+ // return _messageNbr;
+ // }
+ // set
+ // {
+ // _messageNbr = value;
+ // }
+ //}
public BasicContentHeaderProperties ContentHeaderProperties
{
Modified:
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs?view=diff&rev=480423&r1=480422&r2=480423
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs Tue Nov 28 21:51:43 2006
@@ -34,7 +34,7 @@
/// <param name="bodies"></param>
/// <returns></returns>
/// <exception cref="QpidMessagingException">if the message cannot be
created</exception>
- AbstractQmsMessage CreateMessage(ulong messageNbr, bool redelivered,
+ AbstractQmsMessage CreateMessage(long deliverTag, bool redelivered,
ContentHeaderBody contentHeader,
IList bodies);
Modified:
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs?view=diff&rev=480423&r1=480422&r2=480423
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs Tue Nov 28 21:51:43 2006
@@ -58,7 +58,7 @@
/// <returns>the message.</returns>
/// <exception cref="AMQException"/>
/// <exception cref="QpidException"/>
- public AbstractQmsMessage CreateMessage(ulong messageNbr, bool
redelivered,
+ public AbstractQmsMessage CreateMessage(long messageNbr, bool
redelivered,
ContentHeaderBody
contentHeader,
IList bodies)
{
Modified:
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs?view=diff&rev=480423&r1=480422&r2=480423
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs Tue Nov 28 21:51:43 2006
@@ -23,6 +23,7 @@
using System.Text;
using Qpid.Framing;
using Qpid.Messaging;
+using Qpid.Buffer;
namespace Qpid.Client.Message
{
@@ -50,7 +51,7 @@
/// </summary>
/// <param name="data">if data is not null, the message is immediately
in read only mode. if data is null, it is in
/// write-only mode</param>
- QpidBytesMessage(byte[] data) : base()
+ QpidBytesMessage(ByteBuffer data) : base(data)
{
// superclass constructor has instantiated a content header at
this point
ContentHeaderProperties.ContentType = MIME_TYPE;
@@ -61,22 +62,23 @@
}
else
{
- _dataStream = new MemoryStream(data);
- _bodyLength = data.Length;
+ _dataStream = new MemoryStream(data.ToByteArray());
+ _bodyLength = data.ToByteArray().Length;
_reader = new BinaryReader(_dataStream);
}
}
- public QpidBytesMessage(ulong messageNbr, byte[] data,
ContentHeaderBody contentHeader)
+ internal QpidBytesMessage(long messageNbr, ContentHeaderBody
contentHeader, ByteBuffer data)
// TODO: this casting is ugly. Need to review whole
ContentHeaderBody idea
- : base(messageNbr, (BasicContentHeaderProperties)
contentHeader.Properties)
- {
+ : base(messageNbr,
(BasicContentHeaderProperties)contentHeader.Properties, data)
+ {
ContentHeaderProperties.ContentType = MIME_TYPE;
- _dataStream = new MemoryStream(data);
- _bodyLength = data.Length;
+ _dataStream = new MemoryStream(data.ToByteArray());
+ _bodyLength = data.ToByteArray().Length;
_reader = new BinaryReader(_dataStream);
}
+
public override void ClearBody()
{
if (_reader != null)
@@ -119,27 +121,27 @@
}
}
- public override byte[] Data
- {
- get
- {
- if (_dataStream == null)
- {
- return null;
- }
- else
- {
- byte[] data = new byte[_dataStream.Length];
- _dataStream.Position = 0;
- _dataStream.Read(data, 0, (int) _dataStream.Length);
- return data;
- }
- }
- set
- {
- throw new NotSupportedException("Cannot set data payload
except during construction");
- }
- }
+ //public override byte[] Data
+ //{
+ // get
+ // {
+ // if (_dataStream == null)
+ // {
+ // return null;
+ // }
+ // else
+ // {
+ // byte[] data = new byte[_dataStream.Length];
+ // _dataStream.Position = 0;
+ // _dataStream.Read(data, 0, (int) _dataStream.Length);
+ // return data;
+ // }
+ // }
+ // set
+ // {
+ // throw new NotSupportedException("Cannot set data payload
except during construction");
+ // }
+ //}
public override string MimeType
{
Modified:
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs?view=diff&rev=480423&r1=480422&r2=480423
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs Tue Nov 28 21:51:43 2006
@@ -21,40 +21,52 @@
using System;
using System.Collections;
using Qpid.Framing;
+using Qpid.Buffer;
namespace Qpid.Client.Message
{
public class QpidBytesMessageFactory : AbstractQmsMessageFactory
{
- protected override AbstractQmsMessage CreateMessageWithBody(ulong
messageNbr,
-
ContentHeaderBody contentHeader,
- IList
bodies)
- {
- byte[] data;
+ //protected override AbstractQmsMessage CreateMessageWithBody(long
messageNbr,
+ //
ContentHeaderBody contentHeader,
+ // IList
bodies)
+ //{
+ // byte[] data;
+
+ // // we optimise the non-fragmented case to avoid copying
+ // if (bodies != null && bodies.Count == 1)
+ // {
+ // data = ((ContentBody)bodies[0]).Payload;
+ // }
+ // else
+ // {
+ // data = new byte[(long)contentHeader.BodySize];
+ // int currentPosition = 0;
+ // foreach (ContentBody cb in bodies)
+ // {
+ // Array.Copy(cb.Payload, 0, data, currentPosition,
cb.Payload.Length);
+ // currentPosition += cb.Payload.Length;
+ // }
+ // }
+
+ // return new QpidBytesMessage(messageNbr, data, contentHeader);
+ //}
- // we optimise the non-fragmented case to avoid copying
- if (bodies != null && bodies.Count == 1)
- {
- data = ((ContentBody)bodies[0]).Payload;
- }
- else
- {
- data = new byte[(long)contentHeader.BodySize];
- int currentPosition = 0;
- foreach (ContentBody cb in bodies)
- {
- Array.Copy(cb.Payload, 0, data, currentPosition,
cb.Payload.Length);
- currentPosition += cb.Payload.Length;
- }
- }
+ //public override AbstractQmsMessage CreateMessage()
+ //{
+ // return new QpidBytesMessage();
+ //}
- return new QpidBytesMessage(messageNbr, data, contentHeader);
+ protected override AbstractQmsMessage CreateMessage(long deliveryTag,
ByteBuffer data, ContentHeaderBody contentHeader)
+ {
+ return new QpidBytesMessage(deliveryTag, contentHeader, data);
}
public override AbstractQmsMessage CreateMessage()
{
return new QpidBytesMessage();
}
+
}
}
Modified:
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs?view=diff&rev=480423&r1=480422&r2=480423
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs Tue Nov 28 21:51:43 2006
@@ -22,6 +22,7 @@
using System.Text;
using Qpid.Framing;
using Qpid.Messaging;
+using Qpid.Buffer;
namespace Qpid.Client.Message
{
@@ -29,34 +30,58 @@
{
private const string MIME_TYPE = "text/plain";
- private byte[] _data;
-
private string _decodedValue;
- public QpidTextMessage() : this(null, null)
- {
+ //public QpidTextMessage() : this(null, null)
+ //{
+ //}
+
+ //public QpidTextMessage(byte[] data, String encoding) : base()
+ //{
+ // // the superclass has instantied a content header at this point
+ // ContentHeaderProperties.ContentType= MIME_TYPE;
+ // _data = data;
+ // ContentHeaderProperties.Encoding = encoding;
+ //}
+
+ //public QpidTextMessage(ulong messageNbr, byte[] data,
BasicContentHeaderProperties contentHeader)
+ // : base(messageNbr, contentHeader)
+ //{
+ // contentHeader.ContentType = MIME_TYPE;
+ // _data = data;
+ //}
+
+ //public QpidTextMessage(byte[] data) : this(data, null)
+ //{
+ //}
+
+ //public QpidTextMessage(string text)
+ //{
+ // Text = text;
+ //}
+
+ internal QpidTextMessage() : this(null, null)
+ {
}
- public QpidTextMessage(byte[] data, String encoding) : base()
+ QpidTextMessage(ByteBuffer data, String encoding) : base(data)
{
- // the superclass has instantied a content header at this point
- ContentHeaderProperties.ContentType= MIME_TYPE;
- _data = data;
+ ContentHeaderProperties.ContentType = MIME_TYPE;
ContentHeaderProperties.Encoding = encoding;
}
- public QpidTextMessage(ulong messageNbr, byte[] data,
BasicContentHeaderProperties contentHeader)
- : base(messageNbr, contentHeader)
- {
+ internal QpidTextMessage(long deliveryTag,
BasicContentHeaderProperties contentHeader, ByteBuffer data)
+ :base(deliveryTag, contentHeader, data)
+ {
contentHeader.ContentType = MIME_TYPE;
_data = data;
}
- public QpidTextMessage(byte[] data) : this(data, null)
- {
+ QpidTextMessage(ByteBuffer data) : this(data, null)
+ {
}
- public QpidTextMessage(string text)
+ QpidTextMessage(String text) : base((ByteBuffer)null)
{
Text = text;
}
@@ -72,18 +97,6 @@
return Text;
}
- public override byte[] Data
- {
- get
- {
- return _data;
- }
- set
- {
- _data = value;
- }
- }
-
public override string MimeType
{
get
@@ -109,27 +122,29 @@
if (ContentHeaderProperties.Encoding != null)
{
// throw ArgumentException if the encoding is not supported
- _decodedValue =
Encoding.GetEncoding(ContentHeaderProperties.Encoding).GetString(_data);
+ _decodedValue =
Encoding.GetEncoding(ContentHeaderProperties.Encoding).GetString(_data.ToByteArray());
}
else
{
- _decodedValue = Encoding.Default.GetString(_data);
+ _decodedValue =
Encoding.Default.GetString(_data.ToByteArray());
}
return _decodedValue;
}
}
set
- {
+ {
+ byte[] bytes;
if (ContentHeaderProperties.Encoding == null)
{
- _data = Encoding.Default.GetBytes(value);
+ bytes = Encoding.Default.GetBytes(value);
}
else
{
// throw ArgumentException if the encoding is not supported
- _data =
Encoding.GetEncoding(ContentHeaderProperties.Encoding).GetBytes(value);
+ bytes =
Encoding.GetEncoding(ContentHeaderProperties.Encoding).GetBytes(value);
}
+ _data = HeapByteBuffer.wrap(bytes, bytes.Length);
_decodedValue = value;
}
}
Modified:
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs?view=diff&rev=480423&r1=480422&r2=480423
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs Tue Nov 28 21:51:43 2006
@@ -21,40 +21,55 @@
using System;
using System.Collections;
using Qpid.Framing;
+using Qpid.Buffer;
namespace Qpid.Client.Message
{
public class QpidTextMessageFactory : AbstractQmsMessageFactory
{
- protected override AbstractQmsMessage CreateMessageWithBody(ulong
messageNbr, ContentHeaderBody contentHeader,
- IList
bodies)
- {
- byte[] data;
- // we optimise the non-fragmented case to avoid copying
- if (bodies != null && bodies.Count == 1)
- {
- data = ((ContentBody)bodies[0]).Payload;
- }
- else
- {
- data = new byte[(int)contentHeader.BodySize];
- int currentPosition = 0;
- foreach (ContentBody cb in bodies)
- {
- Array.Copy(cb.Payload, 0, data, currentPosition,
cb.Payload.Length);
- currentPosition += cb.Payload.Length;
- }
- }
+ // protected override AbstractQmsMessage CreateMessageWithBody(long
messageNbr, ContentHeaderBody contentHeader,
+ // IList
bodies)
+ // {
+ // byte[] data;
- return new QpidTextMessage(messageNbr, data,
(BasicContentHeaderProperties)contentHeader.Properties);
- }
+ // // we optimise the non-fragmented case to avoid copying
+ // if (bodies != null && bodies.Count == 1)
+ // {
+ // data = ((ContentBody)bodies[0]).Payload;
+ // }
+ // else
+ // {
+ // data = new byte[(int)contentHeader.BodySize];
+ // int currentPosition = 0;
+ // foreach (ContentBody cb in bodies)
+ // {
+ // Array.Copy(cb.Payload, 0, data, currentPosition,
cb.Payload.Length);
+ // currentPosition += cb.Payload.Length;
+ // }
+ // }
+
+ // return new QpidTextMessage(messageNbr, data,
(BasicContentHeaderProperties)contentHeader.Properties);
+ // }
+ // public override AbstractQmsMessage CreateMessage()
+ // {
+ // return new QpidTextMessage();
+ // }
+
+
+
public override AbstractQmsMessage CreateMessage()
{
return new QpidTextMessage();
- }
+ }
+
+ protected override AbstractQmsMessage CreateMessage(long deliveryTag,
ByteBuffer data, ContentHeaderBody contentHeader)
+ {
+ return new QpidTextMessage(deliveryTag,
(BasicContentHeaderProperties) contentHeader.Properties, data);
+ }
+
}
}