Author: steshaw
Date: Tue Nov 28 21:51:43 2006
New Revision: 480423

URL: http://svn.apache.org/viewvc?view=rev&rev=480423
Log:
QPID-137. First stab at porting enough to get AutoAcknowledge mode working.

Modified:
    incubator/qpid/trunk/qpid/dotnet/Qpid.Buffer/HeapByteBuffer.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs
    incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Buffer/HeapByteBuffer.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Buffer/HeapByteBuffer.cs?view=diff&rev=480423&r1=480422&r2=480423
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Buffer/HeapByteBuffer.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Buffer/HeapByteBuffer.cs Tue Nov 28 21:51:43 2006
@@ -385,6 +385,11 @@
         {
             return new HeapByteBuffer(bytes, length);
         }
+
+        public static HeapByteBuffer wrap(byte[] bytes)
+        {
+            return new HeapByteBuffer(bytes, bytes.Length);
+        }
     }
 }
 

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs?view=diff&rev=480423&r1=480422&r2=480423
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs Tue Nov 28 21:51:43 2006
@@ -35,12 +35,12 @@
 
         const int NUM_ITERATIONS = 10;
         const int NUM_COMMITED_MESSAGES = 10;
-        const int NUM_ROLLEDBACK_MESSAGES = 5;
+        const int NUM_ROLLEDBACK_MESSAGES = 3;
         const int SLEEP_MILLIS = 500;
 
         AMQConnection _connection;
 
-        public void onMessage(IMessage message)
+        public void OnMessage(IMessage message)
         {
             try
             {
@@ -48,7 +48,40 @@
             }
             catch (QpidException e)
             {
-               error(e);
+               Error(e);
+            }
+        }
+
+        class NoWaitConsumer
+        {
+            FailoverTxTest _failoverTxTest;
+            IMessageConsumer _consumer;
+
+            internal NoWaitConsumer(FailoverTxTest failoverTxTest, 
IMessageConsumer channel)
+            {
+                _failoverTxTest = failoverTxTest;
+                _consumer = channel;
+            }
+
+            internal void Run()
+            {
+                int messages = 0;
+                while (messages < NUM_COMMITED_MESSAGES)
+                {
+                    IMessage msg = _consumer.ReceiveNoWait();
+                    if (msg != null)
+                    {
+                        _log.Info("NoWait received message");
+                        ++messages;
+                        _failoverTxTest.OnMessage(msg);
+                    }
+                    else
+                    {
+                        Thread.Sleep(1);
+                    }
+
+                }
+
             }
         }
 
@@ -60,7 +93,7 @@
             _log.Info("connectionInfo = " + connectionInfo);
             _log.Info("connection.asUrl = " + _connection.toURL());
 
-            IChannel receivingChannel = _connection.CreateChannel(false, 
AcknowledgeMode.NoAcknowledge);
+            IChannel receivingChannel = _connection.CreateChannel(false, 
AcknowledgeMode.AutoAcknowledge);
 
             string queueName = receivingChannel.GenerateUniqueName();
 
@@ -70,11 +103,23 @@
             // No need to call Queue.Bind as automatically bound to default 
direct exchange.
             receivingChannel.Bind(queueName, "amq.direct", queueName);
 
-            
receivingChannel.CreateConsumerBuilder(queueName).Create().OnMessage = new 
MessageReceivedDelegate(onMessage);
+
+            IMessageConsumer consumer = 
receivingChannel.CreateConsumerBuilder(queueName).Create();
+            bool useThread = true;
+            if (useThread)
+            {
+                NoWaitConsumer noWaitConsumer = new NoWaitConsumer(this, 
consumer);
+                new Thread(noWaitConsumer.Run).Start();
+            }
+            else
+            {
+                
//receivingChannel.CreateConsumerBuilder(queueName).Create().OnMessage = new 
MessageReceivedDelegate(onMessage);
+                consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
+            }
 
             _connection.Start();
 
-            publishInTx(queueName);
+            PublishInTx(queueName);
 
             Thread.Sleep(2000); // Wait a while for last messages.
 
@@ -82,7 +127,7 @@
             _log.Info("FailoverTxText complete");
         }
 
-        private void publishInTx(string routingKey)
+        private void PublishInTx(string routingKey)
         {
             _log.Info("sendInTx");
             bool transacted = true;
@@ -113,13 +158,13 @@
             }
         }
 
-        private void error(Exception e)
+        private void Error(Exception e)
         {
             _log.Fatal("Exception received. About to stop.", e);
-            stop();
+            Stop();
         }
 
-        private void stop()
+        private void Stop()
         {
             _log.Info("Stopping...");
             try

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs?view=diff&rev=480423&r1=480422&r2=480423
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AMQConnection.cs Tue Nov 28 21:51:43 2006
@@ -265,11 +265,11 @@
             int _prefetch;
             AMQConnection _connection;
             
-            public CreateChannelFailoverSupport(AMQConnection connection, bool 
transacted, AcknowledgeMode acknowledgementMode, int prefetch)
+            public CreateChannelFailoverSupport(AMQConnection connection, bool 
transacted, AcknowledgeMode acknowledgeMode, int prefetch)
             {
                 _connection = connection;
                 _transacted = transacted;
-                _acknowledgeMode = acknowledgementMode;
+                _acknowledgeMode = acknowledgeMode;
                 _prefetch = prefetch;
             }
 

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs?view=diff&rev=480423&r1=480422&r2=480423
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs Tue Nov 28 21:51:43 2006
@@ -124,7 +124,7 @@
                     }
                     else
                     {
-                        consumer.NotifyMessage(message, 
_containingChannel.AcknowledgeMode, _containingChannel.ChannelId);
+                        consumer.NotifyMessage(message, 
_containingChannel.ChannelId);
                     }
                 }
                 else
@@ -595,22 +595,6 @@
             }
         }
         
-        /// <summary>
-        /// Send an acknowledgement for all messages up to a specified number 
on this session.
-        /// <param name="messageNbr">the message number up to an including 
which all messages will be acknowledged.</param>
-        /// </summary>
-        public void SendAcknowledgement(ulong messageNbr)
-        {
-            /*if (_logger.IsDebugEnabled)
-            {
-                _logger.Debug("Channel Ack being sent for channel id " + 
_channelId + " and message number " + messageNbr);
-            }*/
-            /*Channel.Ack frame = new Channel.Ack();
-            frame.channelId = _channelId;
-            frame.messageNbr = messageNbr;
-            _connection.getProtocolHandler().writeFrame(frame);*/
-        }
-
         internal void Start()
         {
             _dispatcher = new Dispatcher(this);
@@ -815,7 +799,7 @@
                 currentTime = DateTime.UtcNow.Ticks;
                 message.Timestamp = currentTime;
             }
-            byte[] payload = message.Data;
+            byte[] payload = message.Data.ToByteArray();
             BasicContentHeaderProperties contentHeaderProperties = 
message.ContentHeaderProperties;
 
             if (timeToLive > 0)
@@ -986,10 +970,10 @@
          * @param multiple    if true will acknowledge all messages up to and 
including the one specified by the
          *                    delivery tag
          */
-        public void AcknowledgeMessage(long deliveryTag, bool multiple)
+        public void AcknowledgeMessage(ulong deliveryTag, bool multiple)
         {
-            // XXX: cast to ulong evil?
-            AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId, 
(ulong)deliveryTag, multiple);
+            AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId, 
deliveryTag, multiple);
+            _logger.Info("XXX sending ack: " + ackFrame);
             if (_logger.IsDebugEnabled)
             {
                 _logger.Debug("Sending ack for delivery tag " + deliveryTag + 
" on channel " + _channelId);

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs?view=diff&rev=480423&r1=480422&r2=480423
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs Tue Nov 28 21:51:43 2006
@@ -50,15 +50,18 @@
             set { _noLocal = value; }
         }
 
-        AcknowledgeMode _acknowledgeMode = AcknowledgeMode.NoAcknowledge;
-
         public AcknowledgeMode AcknowledgeMode
         {
-            get { return _acknowledgeMode; }
+            get { return _channel.AcknowledgeMode; }
         }
 
         private MessageReceivedDelegate _messageListener;
 
+        private bool IsMessageListenerSet
+        {
+            get { return _messageListener != null; }
+        }
+
         /// <summary>
         /// The consumer tag allows us to close the consumer by sending a 
jmsCancel method to the
         /// broker
@@ -173,12 +176,7 @@
                     o = _synchronousQueue.DequeueBlocking();
                 }
 
-                IMessage m = ReturnMessageOrThrow(o);
-                if (m != null)
-                {
-                    PostDeliver(m);
-                }
-                return m;
+                return ReturnMessageOrThrowAndPostDeliver(o);
             }
             finally
             {
@@ -189,6 +187,16 @@
             }
         }
 
+        private IMessage ReturnMessageOrThrowAndPostDeliver(object o)
+        {
+            IMessage m = ReturnMessageOrThrow(o);
+            if (m != null)
+            {
+                PostDeliver(m);
+            }
+            return m;
+        }
+
         public IMessage Receive()
         {
             return Receive(0);
@@ -211,8 +219,14 @@
 
             try
             {
-                object o = _synchronousQueue.Dequeue();
-                return ReturnMessageOrThrow(o);
+                if (_synchronousQueue.Count > 0)
+                {
+                    return 
ReturnMessageOrThrowAndPostDeliver(_synchronousQueue.Dequeue());
+                }
+                else
+                {
+                    return null;
+                }
             }
             finally
             {
@@ -285,14 +299,73 @@
             }
         }
 
-        /// <summary>
-        /// Called from the AmqChannel when a message has arrived for this 
consumer. This methods handles both the case
-        /// of a message listener or a synchronous receive() caller.
-        /// </summary>
-        /// <param name="messageFrame">the raw unprocessed mesage</param>
-        /// <param name="acknowledgeMode">the acknowledge mode requested for 
this message</param>
-        /// <param name="channelId">channel on which this message was 
sent</param>       
-        internal void NotifyMessage(UnprocessedMessage messageFrame, 
AcknowledgeMode acknowledgeMode, ushort channelId)
+//        /// <summary>
+//        /// Called from the AmqChannel when a message has arrived for this 
consumer. This methods handles both the case
+//        /// of a message listener or a synchronous receive() caller.
+//        /// </summary>
+//        /// <param name="messageFrame">the raw unprocessed mesage</param>
+//        /// <param name="acknowledgeMode">the acknowledge mode requested for 
this message</param>
+//        /// <param name="channelId">channel on which this message was 
sent</param>       
+//        internal void NotifyMessage(UnprocessedMessage messageFrame, 
AcknowledgeMode acknowledgeMode, ushort channelId)
+//        {
+//            _logger.Info("XXX notifyMessage called with message number " + 
messageFrame.DeliverBody.DeliveryTag);
+//            if (_logger.IsDebugEnabled)
+//            {
+//                _logger.Debug("notifyMessage called with message number " + 
messageFrame.DeliverBody.DeliveryTag);
+//            }
+//            try
+//            {
+//                AbstractQmsMessage jmsMessage = 
_messageFactory.CreateMessage((long)messageFrame.DeliverBody.DeliveryTag,
+//                                                                             
 messageFrame.DeliverBody.Redelivered,
+//                                                                             
 messageFrame.ContentHeader,
+//                                                                             
 messageFrame.Bodies);
+
+//                /*if (acknowledgeMode == AcknowledgeMode.PreAcknowledge)
+//                {
+//                    
_channel.sendAcknowledgement(messageFrame.deliverBody.deliveryTag);
+//                }*/
+//                if (acknowledgeMode == AcknowledgeMode.ClientAcknowledge)
+//                {
+//                    // we set the session so that when the user calls 
acknowledge() it can call the method on session
+//                    // to send out the appropriate frame
+//                    jmsMessage.Channel = _channel;
+//                }
+
+//                lock (_syncLock)
+//                {
+//                    if (_messageListener != null)
+//                    {
+//#if __MonoCS__
+//                        _messageListener(jmsMessage);
+//#else
+//                        _messageListener.Invoke(jmsMessage);
+//#endif
+//                    }
+//                    else
+//                    {
+//                        _synchronousQueue.Enqueue(jmsMessage);
+//                    }
+//                }
+//                if (acknowledgeMode == AcknowledgeMode.AutoAcknowledge)
+//                {
+//                    
_channel.SendAcknowledgement(messageFrame.DeliverBody.DeliveryTag);
+//                }
+//            }
+//            catch (Exception e)
+//            {
+//                _logger.Error("Caught exception (dump follows) - 
ignoring...", e);
+//            }
+//        }
+
+
+        /**
+         * Called from the AMQSession when a message has arrived for this 
consumer. This methods handles both the case
+         * of a message listener or a synchronous receive() caller.
+         *
+         * @param messageFrame the raw unprocessed mesage
+         * @param channelId    channel on which this message was sent
+         */
+        internal void NotifyMessage(UnprocessedMessage messageFrame, int 
channelId)
         {
             if (_logger.IsDebugEnabled)
             {
@@ -300,48 +373,38 @@
             }
             try
             {
-                AbstractQmsMessage jmsMessage = 
_messageFactory.CreateMessage(messageFrame.DeliverBody.DeliveryTag,
+                AbstractQmsMessage jmsMessage = 
_messageFactory.CreateMessage((long)messageFrame.DeliverBody.DeliveryTag,
                                                                               
messageFrame.DeliverBody.Redelivered,
                                                                               
messageFrame.ContentHeader,
                                                                               
messageFrame.Bodies);
 
-                /*if (acknowledgeMode == AcknowledgeMode.PreAcknowledge)
-                {
-                    
_channel.sendAcknowledgement(messageFrame.deliverBody.deliveryTag);
-                }*/
-                if (acknowledgeMode == AcknowledgeMode.ClientAcknowledge)
-                {
-                    // we set the session so that when the user calls 
acknowledge() it can call the method on session
-                    // to send out the appropriate frame
-                    jmsMessage.Channel = _channel;
-                }
+                _logger.Debug("Message is of type: " + 
jmsMessage.GetType().Name);
 
-                lock (_syncLock)
+                PreDeliver(jmsMessage);
+
+                if (IsMessageListenerSet)
                 {
-                    if (_messageListener != null)
-                    {
+                    // We do not need a lock around the test above, and the 
dispatch below as it is invalid
+                    // for an application to alter an installed listener while 
the session is started.
 #if __MonoCS__
                         _messageListener(jmsMessage);
 #else
-                        _messageListener.Invoke(jmsMessage);
+                    _messageListener.Invoke(jmsMessage);
 #endif
-                    }
-                    else
-                    {
-                        _synchronousQueue.Enqueue(jmsMessage);
-                    }
+                    PostDeliver(jmsMessage);
                 }
-                if (acknowledgeMode == AcknowledgeMode.AutoAcknowledge)
+                else
                 {
-                    
_channel.SendAcknowledgement(messageFrame.DeliverBody.DeliveryTag);
+                    _synchronousQueue.Enqueue(jmsMessage);
                 }
             }
             catch (Exception e)
             {
-                _logger.Error("Caught exception (dump follows) - ignoring...", 
e);
+                _logger.Error("Caught exception (dump follows) - ignoring...", 
e); // FIXME
             }
         }
 
+
         internal void NotifyError(Exception cause)
         {
             lock (_syncLock)
@@ -416,15 +479,32 @@
         {
             if (_lastDeliveryTag > 0)
             {
-                _channel.AcknowledgeMessage(_lastDeliveryTag, true);
+                _channel.AcknowledgeMessage((ulong)_lastDeliveryTag, true); // 
XXX evil cast
                 _lastDeliveryTag = -1;
             }
         }
 
+        private void PreDeliver(AbstractQmsMessage msg)
+        {
+            switch (AcknowledgeMode)
+            {
+                case AcknowledgeMode.PreAcknowledge:
+                    _channel.AcknowledgeMessage((ulong)msg.DeliveryTag, false);
+                    break;
+
+                case AcknowledgeMode.ClientAcknowledge:
+                    // We set the session so that when the user calls 
acknowledge() it can call the method on session
+                    // to send out the appropriate frame.
+                    //msg.setAMQSession(_session);
+                    msg.Channel = _channel;
+                    break;
+            }
+        }
+
         private void PostDeliver(IMessage m)
         {
             AbstractQmsMessage msg = (AbstractQmsMessage) m;
-            switch (_acknowledgeMode)
+            switch (AcknowledgeMode)
             {
 /* TODO
                 case AcknowledgeMode.DupsOkAcknowledge:
@@ -444,7 +524,7 @@
                     break;
  */
                 case AcknowledgeMode.AutoAcknowledge:
-                    _channel.AcknowledgeMessage(msg.DeliveryTag, false);
+                    _channel.AcknowledgeMessage((ulong)msg.DeliveryTag, true);
                     break;
                 case AcknowledgeMode.SessionTransacted:
                     _lastDeliveryTag = msg.DeliveryTag;

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs?view=diff&rev=480423&r1=480422&r2=480423
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessage.cs Tue Nov 28 21:51:43 2006
@@ -39,7 +39,8 @@
             _deliveryTag = deliveryTag;
         }
 
-        public AMQMessage(IContentHeaderProperties properties) : 
this(properties, -1)
+        public AMQMessage(IContentHeaderProperties properties)
+            : this(properties, -1)
         {
         }
 

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs?view=diff&rev=480423&r1=480422&r2=480423
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AMQMessageFactory.cs Tue Nov 28 21:51:43 2006
@@ -20,30 +20,73 @@
  */
 using System.Collections;
 using Qpid.Framing;
+using log4net;
+using Qpid.Buffer;
 
 namespace Qpid.Client.Message
 {
     public abstract class AbstractQmsMessageFactory : IMessageFactory
     {
-        public AbstractQmsMessage CreateMessage(ulong messageNbr, bool 
redelivered, ContentHeaderBody contentHeader, IList bodies)
+        //public AbstractQmsMessage CreateMessage(long messageNbr, bool 
redelivered, ContentHeaderBody contentHeader, IList bodies)
+        //{
+        //    AbstractQmsMessage msg = CreateMessageWithBody(messageNbr, 
contentHeader, bodies);
+        //    msg.Redelivered = redelivered;
+        //    return msg;
+        //}
+
+        public abstract AbstractQmsMessage CreateMessage();
+
+        ///// <summary>
+        ///// 
+        ///// </summary>
+        ///// <param name="messageNbr"></param>
+        ///// <param name="contentHeader"></param>
+        ///// <param name="bodies"></param>
+        ///// <returns></returns>
+        ///// <exception cref="AMQException"></exception>
+        //protected abstract AbstractQmsMessage CreateMessageWithBody(long 
messageNbr,
+        //                                                            
ContentHeaderBody contentHeader,
+        //                                                            IList 
bodies);
+
+        private static readonly ILog _logger = LogManager.GetLogger(typeof 
(AbstractQmsMessageFactory));
+
+        protected abstract AbstractQmsMessage CreateMessage(long messageNbr, 
ByteBuffer data, ContentHeaderBody contentHeader);
+
+        protected AbstractQmsMessage CreateMessageWithBody(long messageNbr,
+                                                           ContentHeaderBody 
contentHeader,
+                                                           IList bodies)
+        {
+            ByteBuffer data;
+
+            // we optimise the non-fragmented case to avoid copying
+            if (bodies != null && bodies.Count == 1)
+            {
+                _logger.Debug("Non-fragmented message body (bodySize=" + 
contentHeader.BodySize +")");
+                data = HeapByteBuffer.wrap(((ContentBody)bodies[0]).Payload);
+            }
+            else
+            {
+                _logger.Debug("Fragmented message body (" + bodies.Count + " 
frames, bodySize=" + contentHeader.BodySize + ")");
+                data = ByteBuffer.Allocate((int)contentHeader.BodySize); // 
XXX: Is cast a problem?
+                foreach (ContentBody body in bodies) {
+                    data.Put(body.Payload);
+                    //body.Payload.Release();
+                }
+
+                data.Flip();
+            }
+            _logger.Debug("Creating message from buffer with position=" + 
data.Position + " and remaining=" + data.Remaining);
+
+            return CreateMessage(messageNbr, data, contentHeader);
+        }
+
+        public AbstractQmsMessage CreateMessage(long messageNbr, bool 
redelivered,
+                                                ContentHeaderBody 
contentHeader,
+                                                IList bodies)
         {
             AbstractQmsMessage msg = CreateMessageWithBody(messageNbr, 
contentHeader, bodies);
             msg.Redelivered = redelivered;
             return msg;
         }
-
-        public abstract AbstractQmsMessage CreateMessage();
-
-        /// <summary>
-        /// 
-        /// </summary>
-        /// <param name="messageNbr"></param>
-        /// <param name="contentHeader"></param>
-        /// <param name="bodies"></param>
-        /// <returns></returns>
-        /// <exception cref="AMQException"></exception>
-        protected abstract AbstractQmsMessage CreateMessageWithBody(ulong 
messageNbr,
-                                                                    
ContentHeaderBody contentHeader,
-                                                                    IList 
bodies);
     }
 }

Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs?view=diff&rev=480423&r1=480422&r2=480423
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/AbstractQmsMessage.cs Tue Nov 28 21:51:43 2006
@@ -24,67 +24,71 @@
 using log4net;
 using Qpid.Framing;
 using Qpid.Messaging;
+using Qpid.Buffer;
 
 namespace Qpid.Client.Message
 {
-    public class SendOnlyDestination : AMQDestination
+    public abstract class AbstractQmsMessage : AMQMessage, IMessage
     {
-        private static readonly ILog _log = 
LogManager.GetLogger(typeof(string));
+        private static readonly ILog _log = 
LogManager.GetLogger(typeof(AbstractQmsMessage));
 
-        public SendOnlyDestination(string exchangeName, string routingKey)
-            : base(exchangeName, null, null, false, false, routingKey)
-        {
-            _log.Debug(
-                string.Format("Creating SendOnlyDestination with 
exchangeName={0} and routingKey={1}",
-                    exchangeName, routingKey));
-        }
+//        protected long _messageNbr;
 
-        public override string EncodedName
-        {
-            get { return ExchangeName + ":" + QueueName; }
-        }
+        protected bool _redelivered;
 
-        public override string RoutingKey
-        {
-            get { return QueueName; }
-        }
+        protected ByteBuffer _data;
 
-        public override bool IsNameRequired
-        {
-            get { throw new NotImplementedException(); }
-        }
-    }
+        //protected AbstractQmsMessage() : base(new 
BasicContentHeaderProperties())
+        //{           
+        //}
 
-    public abstract class AbstractQmsMessage : AMQMessage, IMessage
-    {
-        private static readonly ILog _log = 
LogManager.GetLogger(typeof(AbstractQmsMessage));
+        //protected AbstractQmsMessage(ulong messageNbr, 
BasicContentHeaderProperties contentHeader)
+        //    : this(contentHeader)
+        //{            
+        //    _messageNbr = messageNbr;
+        //}
 
-        protected ulong _messageNbr;
+        //protected AbstractQmsMessage(BasicContentHeaderProperties 
contentHeader) 
+        //    : base(contentHeader)
+        //{            
+        //}
 
-        protected bool _redelivered;
 
-        protected AbstractQmsMessage() : base(new 
BasicContentHeaderProperties())
-        {           
+#region new_java_ctrs
+
+        protected AbstractQmsMessage(ByteBuffer data)
+            : base(new BasicContentHeaderProperties())
+        {
+            _data = data;
+            if (_data != null)
+            {
+                _data.Acquire();
+            }
         }
 
-        protected AbstractQmsMessage(ulong messageNbr, 
BasicContentHeaderProperties contentHeader)
-            : this(contentHeader)
-        {            
-            _messageNbr = messageNbr;
+        protected AbstractQmsMessage(long deliveryTag, 
BasicContentHeaderProperties contentHeader, ByteBuffer data)
+            : this(contentHeader, deliveryTag)
+        {
+            _data = data;
+            if (_data != null)
+            {
+                _data.Acquire();
+            }
         }
 
-        protected AbstractQmsMessage(BasicContentHeaderProperties 
contentHeader) 
-            : base(contentHeader)
-        {            
+        protected AbstractQmsMessage(BasicContentHeaderProperties 
contentHeader, long deliveryTag) : base(contentHeader, deliveryTag)
+        {
         }
 
+#endregion
+
         public string MessageId
         {
             get
             {
                 if (ContentHeaderProperties.MessageId == null)
                 {
-                    ContentHeaderProperties.MessageId = "ID:" + _messageNbr;
+                    ContentHeaderProperties.MessageId = "ID:" + DeliveryTag;
                 }
                 return ContentHeaderProperties.MessageId;
             }
@@ -92,6 +96,8 @@
             {
                 ContentHeaderProperties.MessageId = value;
             }
+
+
         }        
 
         public long Timestamp
@@ -321,8 +327,11 @@
             // is not specified. In our case, we only set the session field 
where client acknowledge mode is specified.
             if (_channel != null)
             {
-                _channel.SendAcknowledgement(_messageNbr);
+                // we set multiple to true here since acknowledgement implies 
acknowledge of all previous messages
+                // received on the session
+                _channel.AcknowledgeMessage((ulong)DeliveryTag, true);
             }
+
         }
 
         public IHeaders Headers
@@ -344,10 +353,23 @@
         /// the message.
         /// </summary>
         /// <value>a byte array of message data</value>                
-        public abstract byte[] Data
+        public ByteBuffer Data
         {
-            get;
-            set;
+            get
+            {
+                // make sure we rewind the data just in case any method has moved the
+                // position beyond the start
+                if (_data != null)
+                {
+                    _data.Rewind();
+                }
+                return _data;
+            }
+
+            set
+            {
+                _data = value;
+            }
         }
 
         public abstract string MimeType
@@ -367,7 +389,7 @@
                 buf.Append("\nQmsDeliveryMode: ").Append(DeliveryMode);
                 buf.Append("\nReplyToExchangeName: 
").Append(ReplyToExchangeName);
                 buf.Append("\nReplyToRoutingKey: ").Append(ReplyToRoutingKey);
-                buf.Append("\nAMQ message number: ").Append(_messageNbr);
+                buf.Append("\nAMQ message number: ").Append(DeliveryTag);
                 buf.Append("\nProperties:");
                 if (ContentHeaderProperties.Headers == null)
                 {
@@ -430,17 +452,17 @@
         /// Get the AMQ message number assigned to this message
         /// </summary>
         /// <returns>the message number</returns>
-        public ulong MessageNbr
-        {
-            get
-            {
-                return _messageNbr;
-            }
-            set
-            {
-                _messageNbr = value;
-            }
-        }        
+        //public ulong MessageNbr
+        //{
+        //    get
+        //    {
+        //        return _messageNbr;
+        //    }
+        //    set
+        //    {
+        //        _messageNbr = value;
+        //    }
+        //}        
 
         public BasicContentHeaderProperties ContentHeaderProperties
         {

Modified: 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs?view=diff&rev=480423&r1=480422&r2=480423
==============================================================================
--- 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs 
(original)
+++ 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/IMessageFactory.cs 
Tue Nov 28 21:51:43 2006
@@ -34,7 +34,7 @@
         /// <param name="bodies"></param>
         /// <returns></returns>
         /// <exception cref="QpidMessagingException">if the message cannot be created</exception>
-        AbstractQmsMessage CreateMessage(ulong messageNbr, bool redelivered,
+        AbstractQmsMessage CreateMessage(long deliverTag, bool redelivered,
                                          ContentHeaderBody contentHeader,
                                          IList bodies);
 

Modified: 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs?view=diff&rev=480423&r1=480422&r2=480423
==============================================================================
--- 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs
 (original)
+++ 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/MessageFactoryRegistry.cs
 Tue Nov 28 21:51:43 2006
@@ -58,7 +58,7 @@
         /// <returns>the message.</returns>
         /// <exception cref="AMQException"/>
         /// <exception cref="QpidException"/>
-        public AbstractQmsMessage CreateMessage(ulong messageNbr, bool 
redelivered,
+        public AbstractQmsMessage CreateMessage(long messageNbr, bool 
redelivered,
                                                 ContentHeaderBody 
contentHeader,
                                                 IList bodies)
         {

Modified: 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs?view=diff&rev=480423&r1=480422&r2=480423
==============================================================================
--- 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs 
(original)
+++ 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessage.cs 
Tue Nov 28 21:51:43 2006
@@ -23,6 +23,7 @@
 using System.Text;
 using Qpid.Framing;
 using Qpid.Messaging;
+using Qpid.Buffer;
 
 namespace Qpid.Client.Message
 {
@@ -50,7 +51,7 @@
         /// </summary>
         /// <param name="data">if data is not null, the message is immediately in read only mode. if data is null, it is in
         /// write-only mode</param>        
-        QpidBytesMessage(byte[] data) : base()
+        QpidBytesMessage(ByteBuffer data) : base(data)
         {
             // superclass constructor has instantiated a content header at this point
             ContentHeaderProperties.ContentType = MIME_TYPE;
@@ -61,22 +62,23 @@
             }
             else
             {
-                _dataStream = new MemoryStream(data);
-                _bodyLength = data.Length;
+                _dataStream = new MemoryStream(data.ToByteArray());
+                _bodyLength = data.ToByteArray().Length;
                 _reader = new BinaryReader(_dataStream);
             }
         }
 
-        public QpidBytesMessage(ulong messageNbr, byte[] data, 
ContentHeaderBody contentHeader)
+        internal QpidBytesMessage(long messageNbr, ContentHeaderBody 
contentHeader, ByteBuffer data)
             // TODO: this casting is ugly. Need to review whole ContentHeaderBody idea
-            : base(messageNbr, (BasicContentHeaderProperties) 
contentHeader.Properties)
-        {                        
+            : base(messageNbr, 
(BasicContentHeaderProperties)contentHeader.Properties, data)
+        {
             ContentHeaderProperties.ContentType = MIME_TYPE;
-            _dataStream = new MemoryStream(data);
-            _bodyLength = data.Length;
+            _dataStream = new MemoryStream(data.ToByteArray());
+            _bodyLength = data.ToByteArray().Length;
             _reader = new BinaryReader(_dataStream);
         }
 
+
         public override void ClearBody()
         {
             if (_reader != null)
@@ -119,27 +121,27 @@
             }
         }
 
-        public override byte[] Data
-        {
-            get
-            {
-                if (_dataStream == null)
-                {
-                    return null;
-                }
-                else
-                {
-                    byte[] data = new byte[_dataStream.Length];
-                    _dataStream.Position = 0;
-                    _dataStream.Read(data, 0, (int) _dataStream.Length);
-                    return data;
-                }
-            }
-            set
-            {
-                throw new NotSupportedException("Cannot set data payload 
except during construction");
-            }
-        }
+        //public override byte[] Data
+        //{
+        //    get
+        //    {
+        //        if (_dataStream == null)
+        //        {
+        //            return null;
+        //        }
+        //        else
+        //        {
+        //            byte[] data = new byte[_dataStream.Length];
+        //            _dataStream.Position = 0;
+        //            _dataStream.Read(data, 0, (int) _dataStream.Length);
+        //            return data;
+        //        }
+        //    }
+        //    set
+        //    {
+        //        throw new NotSupportedException("Cannot set data payload 
except during construction");
+        //    }
+        //}
 
         public override string MimeType
         {

Modified: 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs?view=diff&rev=480423&r1=480422&r2=480423
==============================================================================
--- 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs
 (original)
+++ 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidBytesMessageFactory.cs
 Tue Nov 28 21:51:43 2006
@@ -21,40 +21,52 @@
 using System;
 using System.Collections;
 using Qpid.Framing;
+using Qpid.Buffer;
 
 namespace Qpid.Client.Message
 {
     public class QpidBytesMessageFactory : AbstractQmsMessageFactory
     {
-        protected override AbstractQmsMessage CreateMessageWithBody(ulong 
messageNbr,
-                                                                    
ContentHeaderBody contentHeader,
-                                                                    IList 
bodies)
-        {
-            byte[] data;
+        //protected override AbstractQmsMessage CreateMessageWithBody(long 
messageNbr,
+        //                                                            
ContentHeaderBody contentHeader,
+        //                                                            IList 
bodies)
+        //{
+        //    byte[] data;
+
+        //    // we optimise the non-fragmented case to avoid copying
+        //    if (bodies != null && bodies.Count == 1)
+        //    {
+        //        data = ((ContentBody)bodies[0]).Payload;
+        //    }
+        //    else
+        //    {
+        //        data = new byte[(long)contentHeader.BodySize];
+        //        int currentPosition = 0;
+        //        foreach (ContentBody cb in bodies)
+        //        {
+        //            Array.Copy(cb.Payload, 0, data, currentPosition, 
cb.Payload.Length);
+        //            currentPosition += cb.Payload.Length;
+        //        }
+        //    }
+
+        //    return new QpidBytesMessage(messageNbr, data, contentHeader);
+        //}
 
-            // we optimise the non-fragmented case to avoid copying
-            if (bodies != null && bodies.Count == 1)
-            {
-                data = ((ContentBody)bodies[0]).Payload;
-            }
-            else
-            {
-                data = new byte[(long)contentHeader.BodySize];
-                int currentPosition = 0;
-                foreach (ContentBody cb in bodies)
-                {
-                    Array.Copy(cb.Payload, 0, data, currentPosition, 
cb.Payload.Length);
-                    currentPosition += cb.Payload.Length;
-                }
-            }
+        //public override AbstractQmsMessage CreateMessage()
+        //{
+        //    return new QpidBytesMessage();
+        //}
 
-            return new QpidBytesMessage(messageNbr, data, contentHeader);
+        protected override AbstractQmsMessage CreateMessage(long deliveryTag, 
ByteBuffer data, ContentHeaderBody contentHeader)
+        {
+            return new QpidBytesMessage(deliveryTag, contentHeader, data);
         }
 
         public override AbstractQmsMessage CreateMessage()
         {
             return new QpidBytesMessage();
         }
+
     }
 }
 

Modified: 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs?view=diff&rev=480423&r1=480422&r2=480423
==============================================================================
--- 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs 
(original)
+++ 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessage.cs 
Tue Nov 28 21:51:43 2006
@@ -22,6 +22,7 @@
 using System.Text;
 using Qpid.Framing;
 using Qpid.Messaging;
+using Qpid.Buffer;
 
 namespace Qpid.Client.Message
 {
@@ -29,34 +30,58 @@
     {
         private const string MIME_TYPE = "text/plain";
 
-        private byte[] _data;
-
         private string _decodedValue;
 
-        public QpidTextMessage() : this(null, null)
-        {        
+        //public QpidTextMessage() : this(null, null)
+        //{        
+        //}
+
+        //public QpidTextMessage(byte[] data, String encoding) : base()
+        //{
+        //    // the superclass has instantied a content header at this point
+        //    ContentHeaderProperties.ContentType= MIME_TYPE;
+        //    _data = data;
+        //    ContentHeaderProperties.Encoding = encoding;
+        //}
+
+        //public QpidTextMessage(ulong messageNbr, byte[] data, 
BasicContentHeaderProperties contentHeader)
+        //    : base(messageNbr, contentHeader)
+        //{            
+        //    contentHeader.ContentType = MIME_TYPE;
+        //    _data = data;
+        //}
+
+        //public QpidTextMessage(byte[] data) : this(data, null)
+        //{            
+        //}
+
+        //public QpidTextMessage(string text)
+        //{
+        //    Text = text;
+        //}
+
+        internal QpidTextMessage() : this(null, null)
+        {
         }
 
-        public QpidTextMessage(byte[] data, String encoding) : base()
+        QpidTextMessage(ByteBuffer data, String encoding) : base(data)
         {
-            // the superclass has instantied a content header at this point
-            ContentHeaderProperties.ContentType= MIME_TYPE;
-            _data = data;
+            ContentHeaderProperties.ContentType = MIME_TYPE;
             ContentHeaderProperties.Encoding = encoding;
         }
 
-        public QpidTextMessage(ulong messageNbr, byte[] data, 
BasicContentHeaderProperties contentHeader)
-            : base(messageNbr, contentHeader)
-        {            
+        internal QpidTextMessage(long deliveryTag, 
BasicContentHeaderProperties contentHeader, ByteBuffer data)
+            :base(deliveryTag, contentHeader, data)
+        {
             contentHeader.ContentType = MIME_TYPE;
             _data = data;
         }
 
-        public QpidTextMessage(byte[] data) : this(data, null)
-        {            
+        QpidTextMessage(ByteBuffer data) : this(data, null)
+        {
         }
 
-        public QpidTextMessage(string text)
+        QpidTextMessage(String text) : base((ByteBuffer)null)
         {
             Text = text;
         }
@@ -72,18 +97,6 @@
             return Text;
         }
 
-        public override byte[] Data
-        {
-            get
-            {
-                return _data;
-            }
-            set
-            {
-                _data = value;
-            }
-        }
-
         public override string MimeType
         {
             get
@@ -109,27 +122,29 @@
                     if (ContentHeaderProperties.Encoding != null)
                     {
                         // throw ArgumentException if the encoding is not supported
-                        _decodedValue = 
Encoding.GetEncoding(ContentHeaderProperties.Encoding).GetString(_data);
+                        _decodedValue = 
Encoding.GetEncoding(ContentHeaderProperties.Encoding).GetString(_data.ToByteArray());
                     }
                     else
                     {
-                        _decodedValue = Encoding.Default.GetString(_data);
+                        _decodedValue = 
Encoding.Default.GetString(_data.ToByteArray());
                     }
                     return _decodedValue;                    
                 }
             }
 
             set
-            {            
+            {
+                byte[] bytes;
                 if (ContentHeaderProperties.Encoding == null)
                 {
-                    _data = Encoding.Default.GetBytes(value);
+                    bytes = Encoding.Default.GetBytes(value);
                 }
                 else
                 {
                     // throw ArgumentException if the encoding is not supported
-                    _data = 
Encoding.GetEncoding(ContentHeaderProperties.Encoding).GetBytes(value);
+                    bytes = 
Encoding.GetEncoding(ContentHeaderProperties.Encoding).GetBytes(value);
                 }
+                _data = HeapByteBuffer.wrap(bytes, bytes.Length);
                 _decodedValue = value;
             }
         }

Modified: 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs?view=diff&rev=480423&r1=480422&r2=480423
==============================================================================
--- 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs
 (original)
+++ 
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/Message/QpidTextMessageFactory.cs
 Tue Nov 28 21:51:43 2006
@@ -21,40 +21,55 @@
 using System;
 using System.Collections;
 using Qpid.Framing;
+using Qpid.Buffer;
 
 namespace Qpid.Client.Message
 {
     public class QpidTextMessageFactory : AbstractQmsMessageFactory
     {
-        protected override AbstractQmsMessage CreateMessageWithBody(ulong 
messageNbr, ContentHeaderBody contentHeader,
-                                                                    IList 
bodies)
-        {
-            byte[] data;
 
-            // we optimise the non-fragmented case to avoid copying
-            if (bodies != null && bodies.Count == 1)
-            {
-                data = ((ContentBody)bodies[0]).Payload;
-            }
-            else
-            {
-                data = new byte[(int)contentHeader.BodySize];
-                int currentPosition = 0;
-                foreach (ContentBody cb in bodies)
-                {                
-                    Array.Copy(cb.Payload, 0, data, currentPosition, 
cb.Payload.Length);
-                    currentPosition += cb.Payload.Length;
-                }
-            }
+    //    protected override AbstractQmsMessage CreateMessageWithBody(long 
messageNbr, ContentHeaderBody contentHeader,
+    //                                                                IList 
bodies)
+    //    {
+    //        byte[] data;
 
-            return new QpidTextMessage(messageNbr, data, 
(BasicContentHeaderProperties)contentHeader.Properties);
-        }
+    //        // we optimise the non-fragmented case to avoid copying
+    //        if (bodies != null && bodies.Count == 1)
+    //        {
+    //            data = ((ContentBody)bodies[0]).Payload;
+    //        }
+    //        else
+    //        {
+    //            data = new byte[(int)contentHeader.BodySize];
+    //            int currentPosition = 0;
+    //            foreach (ContentBody cb in bodies)
+    //            {                
+    //                Array.Copy(cb.Payload, 0, data, currentPosition, 
cb.Payload.Length);
+    //                currentPosition += cb.Payload.Length;
+    //            }
+    //        }
+
+    //        return new QpidTextMessage(messageNbr, data, 
(BasicContentHeaderProperties)contentHeader.Properties);
+    //    }
         
      
+    //    public override AbstractQmsMessage CreateMessage()
+    //    {
+    //        return new QpidTextMessage();
+    //    }      
+
+
+        
         public override AbstractQmsMessage CreateMessage()
         {
             return new QpidTextMessage();
-        }      
+        }
+
+        protected override AbstractQmsMessage CreateMessage(long deliveryTag, 
ByteBuffer data, ContentHeaderBody contentHeader)
+        {
+            return new QpidTextMessage(deliveryTag, 
(BasicContentHeaderProperties) contentHeader.Properties, data);
+        }
+
     }
 }
 


Reply via email to