Author: steshaw
Date: Thu Nov 30 10:54:48 2006
New Revision: 481035
URL: http://svn.apache.org/viewvc?view=rev&rev=481035
Log:
QPID-136 Ported Prefetch with PrefetchHigh and PrefetchLow
QPID-137 Ported AcknowledgeModes
Modified:
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingConsumer.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingProducer.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/MultiConsumer/ProducerMultiConsumer.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Collections/BlockingQueue.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Collections/SynchronousQueue.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IChannel.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/MessageConsumerBuilder.cs
incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/MessagePublisherBuilder.cs
incubator/qpid/trunk/qpid/dotnet/TODO.txt
Modified:
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingConsumer.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingConsumer.cs?view=diff&rev=481035&r1=481034&r2=481035
==============================================================================
---
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingConsumer.cs
(original)
+++
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingConsumer.cs
Thu Nov 30 10:54:48 2006
@@ -58,8 +58,9 @@
_channel.Bind(queueName, _serviceName, null,
CreatePatternAsFieldTable());
IMessageConsumer consumer =
_channel.CreateConsumerBuilder(queueName)
- .withPrefetch(100)
- .withNoLocal(true)
+ .WithPrefetchLow(100)
+ .WithPrefetchHigh(500)
+ .WithNoLocal(true)
.Create();
consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
Modified:
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingProducer.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingProducer.cs?view=diff&rev=481035&r1=481034&r2=481035
==============================================================================
---
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingProducer.cs
(original)
+++
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/HeadersExchange/HeadersMatchingProducer.cs
Thu Nov 30 10:54:48 2006
@@ -43,8 +43,8 @@
try
{
_publisher = _channel.CreatePublisherBuilder()
- .withExchangeName(_commandExchangeName)
- .withMandatory(true)
+ .WithExchangeName(_commandExchangeName)
+ .WithMandatory(true)
.Create();
// Disabling timestamps - a performance optimisation where
timestamps and TTL/expiration
Modified:
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/MultiConsumer/ProducerMultiConsumer.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/MultiConsumer/ProducerMultiConsumer.cs?view=diff&rev=481035&r1=481034&r2=481035
==============================================================================
---
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/MultiConsumer/ProducerMultiConsumer.cs
(original)
+++
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/MultiConsumer/ProducerMultiConsumer.cs
Thu Nov 30 10:54:48 2006
@@ -70,8 +70,8 @@
{
base.Init();
_publisher = _channel.CreatePublisherBuilder()
- .withRoutingKey(_commandQueueName)
- .withExchangeName(ExchangeNameDefaults.TOPIC)
+ .WithRoutingKey(_commandQueueName)
+ .WithExchangeName(ExchangeNameDefaults.TOPIC)
.Create();
_publisher.DisableMessageTimestamp = true;
@@ -85,7 +85,7 @@
_channel.Bind(queueName, ExchangeNameDefaults.TOPIC,
_commandQueueName);
_consumers[i] = _channel.CreateConsumerBuilder(queueName)
- .withPrefetch(100).Create();
+ .WithPrefetchLow(100).Create();
_consumers[i].OnMessage = new
MessageReceivedDelegate(OnMessage);
}
_connection.Start();
Modified:
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs?view=diff&rev=481035&r1=481034&r2=481035
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs
(original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTest.cs
Thu Nov 30 10:54:48 2006
@@ -78,8 +78,8 @@
// _publisher = _channel.CreatePublisher(exchangeName,
exchangeClass, routingKey);
_publisher = _channel.CreatePublisherBuilder()
- .withRoutingKey(routingKey)
- .withExchangeName(exchangeName)
+ .WithRoutingKey(routingKey)
+ .WithExchangeName(exchangeName)
.Create();
_publisher.Send(msg);
}
@@ -206,8 +206,8 @@
//return _channel.CreatePublisher(exchangeName, exchangeClass,
routingKey);
return _session.CreatePublisherBuilder()
- .withExchangeName(exchangeName)
- .withRoutingKey(routingKey)
+ .WithExchangeName(exchangeName)
+ .WithRoutingKey(routingKey)
.Create();
}
}
Modified:
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs?view=diff&rev=481035&r1=481034&r2=481035
==============================================================================
---
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs
(original)
+++
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/failover/FailoverTxTest.cs
Thu Nov 30 10:54:48 2006
@@ -36,8 +36,11 @@
const int NUM_ITERATIONS = 10;
const int NUM_COMMITED_MESSAGES = 10;
const int NUM_ROLLEDBACK_MESSAGES = 3;
- const int SLEEP_MILLIS = 500;
+ const int SLEEP_MILLIS = 50;
+ // AutoAcknowledge, ClientAcknowledge, DupsOkAcknowledge,
NoAcknowledge, PreAcknowledge
+ AcknowledgeMode _acknowledgeMode = AcknowledgeMode.DupsOkAcknowledge;
+ const bool _noWait = true; // use Receive or ReceiveNoWait
AMQConnection _connection;
public void OnMessage(IMessage message)
@@ -45,6 +48,11 @@
try
{
_log.Info("Received: " + ((ITextMessage) message).Text);
+ if (_acknowledgeMode == AcknowledgeMode.ClientAcknowledge)
+ {
+ _log.Info("client acknowledging");
+ message.Acknowledge();
+ }
}
catch (QpidException e)
{
@@ -56,11 +64,13 @@
{
FailoverTxTest _failoverTxTest;
IMessageConsumer _consumer;
+ private bool _noWait;
- internal NoWaitConsumer(FailoverTxTest failoverTxTest,
IMessageConsumer channel)
+ internal NoWaitConsumer(FailoverTxTest failoverTxTest,
IMessageConsumer channel, bool noWait)
{
_failoverTxTest = failoverTxTest;
_consumer = channel;
+ _noWait = noWait;
}
internal void Run()
@@ -68,7 +78,9 @@
int messages = 0;
while (messages < NUM_COMMITED_MESSAGES)
{
- IMessage msg = _consumer.ReceiveNoWait();
+ IMessage msg;
+ if (_noWait) msg = _consumer.ReceiveNoWait();
+ else msg = _consumer.Receive();
if (msg != null)
{
_log.Info("NoWait received message");
@@ -93,7 +105,8 @@
_log.Info("connectionInfo = " + connectionInfo);
_log.Info("connection.asUrl = " + _connection.toURL());
- IChannel receivingChannel = _connection.CreateChannel(false,
AcknowledgeMode.AutoAcknowledge);
+ _log.Info("AcknowledgeMode is " + _acknowledgeMode);
+ IChannel receivingChannel = _connection.CreateChannel(false,
_acknowledgeMode);
string queueName = receivingChannel.GenerateUniqueName();
@@ -103,17 +116,17 @@
// No need to call Queue.Bind as automatically bound to default
direct exchange.
receivingChannel.Bind(queueName, "amq.direct", queueName);
-
- IMessageConsumer consumer =
receivingChannel.CreateConsumerBuilder(queueName).Create();
- bool useThread = true;
+ IMessageConsumer consumer =
receivingChannel.CreateConsumerBuilder(queueName)
+ .WithPrefetchLow(30)
+ .WithPrefetchHigh(60).Create();
+ bool useThread = false;
if (useThread)
{
- NoWaitConsumer noWaitConsumer = new NoWaitConsumer(this,
consumer);
+ NoWaitConsumer noWaitConsumer = new NoWaitConsumer(this,
consumer, _noWait);
new Thread(noWaitConsumer.Run).Start();
}
else
{
-
//receivingChannel.CreateConsumerBuilder(queueName).Create().OnMessage = new
MessageReceivedDelegate(onMessage);
consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
}
@@ -133,7 +146,7 @@
bool transacted = true;
IChannel publishingChannel = _connection.CreateChannel(transacted,
AcknowledgeMode.NoAcknowledge);
IMessagePublisher publisher =
publishingChannel.CreatePublisherBuilder()
- .withRoutingKey(routingKey)
+ .WithRoutingKey(routingKey)
.Create();
for (int i = 1; i <= NUM_ITERATIONS; ++i)
Modified:
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs?view=diff&rev=481035&r1=481034&r2=481035
==============================================================================
---
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs
(original)
+++
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceProvidingClient.cs
Thu Nov 30 10:54:48 2006
@@ -60,8 +60,9 @@
_channel.DeclareQueue(_serviceName, false, false, false);
IMessageConsumer consumer =
_channel.CreateConsumerBuilder(_serviceName)
- .withPrefetch(100)
- .withNoLocal(true)
+ .WithPrefetchLow(100)
+ .WithPrefetchHigh(500)
+ .WithNoLocal(true)
.Create();
consumer.OnMessage = new MessageReceivedDelegate(OnMessage);
}
@@ -100,8 +101,8 @@
// Console.WriteLine("ReplyTo.RoutingKey = " +
_replyToRoutingKey);
_destinationPublisher = _channel.CreatePublisherBuilder()
- .withExchangeName(_replyToExchangeName)
- .withRoutingKey(_replyToRoutingKey)
+ .WithExchangeName(_replyToExchangeName)
+ .WithRoutingKey(_replyToRoutingKey)
.Create();
_destinationPublisher.DisableMessageTimestamp = true;
_destinationPublisher.DeliveryMode =
DeliveryMode.NonPersistent;
Modified:
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs?view=diff&rev=481035&r1=481034&r2=481035
==============================================================================
---
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs
(original)
+++
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/requestreply1/ServiceRequestingClient.cs
Thu Nov 30 10:54:48 2006
@@ -53,7 +53,7 @@
try
{
_publisher = _channel.CreatePublisherBuilder()
- .withRoutingKey(_commandQueueName)
+ .WithRoutingKey(_commandQueueName)
.Create();
_publisher.DisableMessageTimestamp = true; // XXX: need a
"with" for this in builder?
_publisher.DeliveryMode = DeliveryMode.NonPersistent; // XXX:
need a "with" for this in builder?
@@ -74,9 +74,10 @@
_channel.DeclareQueue(replyQueueName, false, true, true);
IMessageConsumer messageConsumer =
_channel.CreateConsumerBuilder(replyQueueName)
- .withPrefetch(100)
- .withNoLocal(true)
- .withExclusive(true).Create();
+ .WithPrefetchLow(100)
+ .WithPrefetchHigh(200)
+ .WithNoLocal(true)
+ .WithExclusive(true).Create();
_startTime = DateTime.Now.Ticks;
Modified:
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs?view=diff&rev=481035&r1=481034&r2=481035
==============================================================================
---
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs
(original)
+++
incubator/qpid/trunk/qpid/dotnet/Qpid.Client.Tests/undeliverable/UndeliverableTest.cs
Thu Nov 30 10:54:48 2006
@@ -81,11 +81,11 @@
// Send a test message to a non-existant queue on the default
exchange. See if message is returned!
MessagePublisherBuilder builder = _channel.CreatePublisherBuilder()
- .withRoutingKey("Non-existant route key!")
- .withMandatory(true);
+ .WithRoutingKey("Non-existant route key!")
+ .WithMandatory(true);
if (exchangeName != null)
{
- builder.withExchangeName(exchangeName);
+ builder.WithExchangeName(exchangeName);
}
IMessagePublisher publisher = builder.Create();
publisher.Send(_channel.CreateTextMessage("Hiya!"));
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs?view=diff&rev=481035&r1=481034&r2=481035
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/AmqChannel.cs Thu Nov
30 10:54:48 2006
@@ -43,7 +43,7 @@
// Used in the consume method. We generate the consume tag on the
client so that we can use the nowait feature.
private int _nextConsumerNumber = 1;
- internal const int DEFAULT_PREFETCH = 5000;
+ internal const int DEFAULT_PREFETCH =
MessageConsumerBuilder.DEFAULT_PREFETCH_HIGH;
private AMQConnection _connection;
@@ -273,6 +273,7 @@
public void Commit()
{
+ // FIXME: Fail over safety. Needs FailoverSupport?
CheckNotClosed();
CheckTransacted(); // throws IllegalOperationException if not a
transacted session
@@ -297,6 +298,7 @@
public void Rollback()
{
+ // FIXME: Fail over safety. Needs FailoverSupport?
CheckNotClosed();
CheckTransacted(); // throws IllegalOperationException if not a
transacted session
@@ -489,25 +491,26 @@
}
public IMessageConsumer CreateConsumer(string queueName,
- int prefetch,
+ int prefetchLow,
+ int prefetchHigh,
bool noLocal,
bool exclusive,
bool durable,
string subscriptionName)
{
- _logger.Debug(String.Format("CreateConsumer queueName={0}
prefetch={1} noLocal={2} exclusive={3} durable={4} subscriptionName={5}",
- queueName, prefetch, noLocal, exclusive,
durable, subscriptionName));
- return CreateConsumerImpl(queueName, prefetch, noLocal, exclusive,
durable, subscriptionName);
+ _logger.Debug(String.Format("CreateConsumer queueName={0}
prefetchLow={1} prefetchHigh={2} noLocal={3} exclusive={4} durable={5}
subscriptionName={6}",
+ queueName, prefetchLow, prefetchHigh,
noLocal, exclusive, durable, subscriptionName));
+ return CreateConsumerImpl(queueName, prefetchLow, prefetchHigh,
noLocal, exclusive, durable, subscriptionName);
}
private IMessageConsumer CreateConsumerImpl(string queueName,
- int prefetch,
- bool noLocal,
- bool exclusive,
- bool durable,
- string subscriptionName)
+ int prefetchLow,
+ int prefetchHigh,
+ bool noLocal,
+ bool exclusive,
+ bool durable,
+ string subscriptionName)
{
-
if (durable || subscriptionName != null)
{
throw new NotImplementedException(); // TODO: durable
subscriptions.
@@ -518,7 +521,8 @@
CheckNotClosed();
BasicMessageConsumer consumer = new
BasicMessageConsumer(_channelId, queueName, noLocal,
-
_messageFactoryRegistry, this);
+
_messageFactoryRegistry, this,
+
prefetchHigh, prefetchLow, exclusive);
try
{
RegisterConsumer(consumer);
@@ -710,9 +714,8 @@
/// <param name="consumer"></param>
void RegisterConsumer(BasicMessageConsumer consumer)
{
- String consumerTag = ConsumeFromQueue(consumer.QueueName,
consumer.Prefetch, consumer.NoLocal,
+ String consumerTag = ConsumeFromQueue(consumer.QueueName,
consumer.NoLocal,
consumer.Exclusive,
consumer.AcknowledgeMode);
-
consumer.ConsumerTag = consumerTag;
_consumers.Add(consumerTag, consumer);
}
@@ -744,8 +747,7 @@
}
}
- private String ConsumeFromQueue(String queueName, int prefetch,
- bool noLocal, bool exclusive,
AcknowledgeMode acknowledgeMode)
+ private String ConsumeFromQueue(String queueName, bool noLocal, bool
exclusive, AcknowledgeMode acknowledgeMode)
{
// Need to generate a consumer tag on the client so we can exploit
the nowait flag.
String tag = string.Format("{0}-{1}", _sessionNumber,
_nextConsumerNumber++);
@@ -973,7 +975,6 @@
public void AcknowledgeMessage(ulong deliveryTag, bool multiple)
{
AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId,
deliveryTag, multiple);
- _logger.Info("XXX sending ack: " + ackFrame);
if (_logger.IsDebugEnabled)
{
_logger.Debug("Sending ack for delivery tag " + deliveryTag +
" on channel " + _channelId);
Modified:
incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs?view=diff&rev=481035&r1=481034&r2=481035
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
(original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Client/Client/BasicMessageConsumer.cs
Thu Nov 30 10:54:48 2006
@@ -82,10 +82,15 @@
/// </summary>
private readonly object _syncLock = new object();
- /**
- * We store the prefetch field in order to be able to reuse it when
resubscribing in the event of failover
- */
- private int _prefetch;
+ /// <summary>
+ /// We store the high water prefetch field in order to be able to
reuse it when resubscribing in the event of failover
+ /// </summary>
+ private int _prefetchHigh;
+
+ /// <summary>
+ /// We store the low water prefetch field in order to be able to reuse
it when resubscribing in the event of failover
+ /// </summary>
+ private int _prefetchLow;
/// <summary>
/// When true indicates that either a message listener is set or that
@@ -108,8 +113,20 @@
/// </summary>
private long _lastDeliveryTag;
- public BasicMessageConsumer(ushort channelId, string queueName, bool
noLocal,
- MessageFactoryRegistry messageFactory,
AmqChannel channel)
+ /// <summary>
+ /// Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
+ /// </summary>
+ private int _outstanding;
+
+ /// <summary>
+ /// Switch to enable sending of acknowledgements when using
DUPS_OK_ACKNOWLEDGE mode.
+ /// Enabled when _outstannding number of msgs >= _prefetchHigh and
disabled at < _prefetchLow
+ /// </summary>
+ private bool _dups_ok_acknowledge_send;
+
+ internal BasicMessageConsumer(ushort channelId, string queueName, bool
noLocal,
+ MessageFactoryRegistry messageFactory,
AmqChannel channel,
+ int prefetchHigh, int prefetchLow, bool
exclusive)
{
_channelId = channelId;
_queueName = queueName;
@@ -117,6 +134,9 @@
_messageFactory = messageFactory;
_channel = channel;
_acknowledgeMode = _channel.AcknowledgeMode;
+ _prefetchHigh = prefetchHigh;
+ _prefetchLow = prefetchLow;
+ _exclusive = exclusive;
}
#region IMessageConsumer Members
@@ -302,65 +322,6 @@
}
}
-// /// <summary>
-// /// Called from the AmqChannel when a message has arrived for this
consumer. This methods handles both the case
-// /// of a message listener or a synchronous receive() caller.
-// /// </summary>
-// /// <param name="messageFrame">the raw unprocessed mesage</param>
-// /// <param name="acknowledgeMode">the acknowledge mode requested for
this message</param>
-// /// <param name="channelId">channel on which this message was
sent</param>
-// internal void NotifyMessage(UnprocessedMessage messageFrame,
AcknowledgeMode acknowledgeMode, ushort channelId)
-// {
-// _logger.Info("XXX notifyMessage called with message number " +
messageFrame.DeliverBody.DeliveryTag);
-// if (_logger.IsDebugEnabled)
-// {
-// _logger.Debug("notifyMessage called with message number " +
messageFrame.DeliverBody.DeliveryTag);
-// }
-// try
-// {
-// AbstractQmsMessage jmsMessage =
_messageFactory.CreateMessage((long)messageFrame.DeliverBody.DeliveryTag,
-//
messageFrame.DeliverBody.Redelivered,
-//
messageFrame.ContentHeader,
-//
messageFrame.Bodies);
-
-// /*if (acknowledgeMode == AcknowledgeMode.PreAcknowledge)
-// {
-//
_channel.sendAcknowledgement(messageFrame.deliverBody.deliveryTag);
-// }*/
-// if (acknowledgeMode == AcknowledgeMode.ClientAcknowledge)
-// {
-// // we set the session so that when the user calls
acknowledge() it can call the method on session
-// // to send out the appropriate frame
-// jmsMessage.Channel = _channel;
-// }
-
-// lock (_syncLock)
-// {
-// if (_messageListener != null)
-// {
-//#if __MonoCS__
-// _messageListener(jmsMessage);
-//#else
-// _messageListener.Invoke(jmsMessage);
-//#endif
-// }
-// else
-// {
-// _synchronousQueue.Enqueue(jmsMessage);
-// }
-// }
-// if (acknowledgeMode == AcknowledgeMode.AutoAcknowledge)
-// {
-//
_channel.SendAcknowledgement(messageFrame.DeliverBody.DeliveryTag);
-// }
-// }
-// catch (Exception e)
-// {
-// _logger.Error("Caught exception (dump follows) -
ignoring...", e);
-// }
-// }
-
-
/**
* Called from the AMQSession when a message has arrived for this
consumer. This methods handles both the case
* of a message listener or a synchronous receive() caller.
@@ -465,11 +426,6 @@
DeregisterConsumer();
}
- public int Prefetch
- {
- get { return _prefetch; }
- }
-
public string QueueName
{
get { return _queueName; }
@@ -509,7 +465,6 @@
AbstractQmsMessage msg = (AbstractQmsMessage) m;
switch (AcknowledgeMode)
{
-/* TODO
case AcknowledgeMode.DupsOkAcknowledge:
if (++_outstanding >= _prefetchHigh)
{
@@ -519,16 +474,16 @@
{
_dups_ok_acknowledge_send = false;
}
-
if (_dups_ok_acknowledge_send)
{
- _channel.AcknowledgeMessage(msg.getDeliveryTag(),
true);
+ _channel.AcknowledgeMessage((ulong)msg.DeliveryTag,
true);
}
break;
- */
+
case AcknowledgeMode.AutoAcknowledge:
_channel.AcknowledgeMessage((ulong)msg.DeliveryTag, true);
break;
+
case AcknowledgeMode.SessionTransacted:
_lastDeliveryTag = msg.DeliveryTag;
break;
Modified:
incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Collections/BlockingQueue.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Collections/BlockingQueue.cs?view=diff&rev=481035&r1=481034&r2=481035
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Collections/BlockingQueue.cs
(original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Collections/BlockingQueue.cs
Thu Nov 30 10:54:48 2006
@@ -24,7 +24,7 @@
namespace Qpid.Collections
{
public abstract class BlockingQueue : Queue
- {
+ {
/**
* Inserts the specified element into this queue if it is possible to
do
* so immediately without violating capacity restrictions, returning
Modified:
incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Collections/SynchronousQueue.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Collections/SynchronousQueue.cs?view=diff&rev=481035&r1=481034&r2=481035
==============================================================================
---
incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Collections/SynchronousQueue.cs
(original)
+++
incubator/qpid/trunk/qpid/dotnet/Qpid.Common/Collections/SynchronousQueue.cs
Thu Nov 30 10:54:48 2006
@@ -373,4 +373,3 @@
}
}
}
-
Modified: incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IChannel.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IChannel.cs?view=diff&rev=481035&r1=481034&r2=481035
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IChannel.cs (original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/IChannel.cs Thu Nov 30
10:54:48 2006
@@ -60,7 +60,8 @@
MessageConsumerBuilder CreateConsumerBuilder(string queueName);
IMessageConsumer CreateConsumer(string queueName,
- int prefetch,
+ int prefetchLow,
+ int prefetchHigh,
bool noLocal,
bool exclusive,
bool durable,
Modified:
incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/MessageConsumerBuilder.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/MessageConsumerBuilder.cs?view=diff&rev=481035&r1=481034&r2=481035
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/MessageConsumerBuilder.cs
(original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/MessageConsumerBuilder.cs
Thu Nov 30 10:54:48 2006
@@ -22,13 +22,16 @@
{
public class MessageConsumerBuilder
{
- private int _prefetch = 0;
+ public const int DEFAULT_PREFETCH_HIGH = 5000;
+
private bool _noLocal = false;
private bool _exclusive = false;
private bool _durable = false;
private string _subscriptionName = null;
private IChannel _channel;
private readonly string _queueName;
+ private int _prefetchLow = 2500;
+ private int _prefetchHigh = DEFAULT_PREFETCH_HIGH;
public MessageConsumerBuilder(IChannel channel, string queueName)
{
@@ -36,31 +39,37 @@
_queueName = queueName;
}
- public MessageConsumerBuilder withPrefetch(int prefetch)
+ public MessageConsumerBuilder WithPrefetchLow(int prefetchLow)
+ {
+ _prefetchLow = prefetchLow;
+ return this;
+ }
+
+ public MessageConsumerBuilder WithPrefetchHigh(int prefetchHigh)
{
- _prefetch = prefetch;
+ _prefetchHigh = prefetchHigh;
return this;
}
- public MessageConsumerBuilder withNoLocal(bool noLocal)
+ public MessageConsumerBuilder WithNoLocal(bool noLocal)
{
_noLocal = noLocal;
return this;
}
- public MessageConsumerBuilder withExclusive(bool exclusive)
+ public MessageConsumerBuilder WithExclusive(bool exclusive)
{
_exclusive = exclusive;
return this;
}
- public MessageConsumerBuilder withDurable(bool durable)
+ public MessageConsumerBuilder WithDurable(bool durable)
{
_durable = durable;
return this;
}
- public MessageConsumerBuilder withSubscriptionName(string
subscriptionName)
+ public MessageConsumerBuilder WithSubscriptionName(string
subscriptionName)
{
_subscriptionName = subscriptionName;
return this;
@@ -68,7 +77,7 @@
public IMessageConsumer Create()
{
- return _channel.CreateConsumer(_queueName, _prefetch, _noLocal,
_exclusive, _durable, _subscriptionName);
+ return _channel.CreateConsumer(_queueName, _prefetchLow,
_prefetchHigh, _noLocal, _exclusive, _durable, _subscriptionName);
}
}
}
Modified:
incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/MessagePublisherBuilder.cs
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/MessagePublisherBuilder.cs?view=diff&rev=481035&r1=481034&r2=481035
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/MessagePublisherBuilder.cs
(original)
+++ incubator/qpid/trunk/qpid/dotnet/Qpid.Messaging/MessagePublisherBuilder.cs
Thu Nov 30 10:54:48 2006
@@ -47,37 +47,37 @@
_channel = channel;
}
- public MessagePublisherBuilder withRoutingKey(string routingKey)
+ public MessagePublisherBuilder WithRoutingKey(string routingKey)
{
_routingKey = routingKey;
return this;
}
- public MessagePublisherBuilder withExchangeName(string exchangeName)
+ public MessagePublisherBuilder WithExchangeName(string exchangeName)
{
_exchangeName = exchangeName;
return this;
}
- public MessagePublisherBuilder withDeliveryMode(DeliveryMode
deliveryMode)
+ public MessagePublisherBuilder WithDeliveryMode(DeliveryMode
deliveryMode)
{
_deliveryMode = deliveryMode;
return this;
}
- public MessagePublisherBuilder withTimeToLive(long timeToLive)
+ public MessagePublisherBuilder WithTimeToLive(long timeToLive)
{
_timeToLive = timeToLive;
return this;
}
- public MessagePublisherBuilder withImmediate(bool immediate)
+ public MessagePublisherBuilder WithImmediate(bool immediate)
{
_immediate = immediate;
return this;
}
- public MessagePublisherBuilder withMandatory(bool mandatory)
+ public MessagePublisherBuilder WithMandatory(bool mandatory)
{
_mandatory = mandatory;
return this;
Modified: incubator/qpid/trunk/qpid/dotnet/TODO.txt
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/dotnet/TODO.txt?view=diff&rev=481035&r1=481034&r2=481035
==============================================================================
--- incubator/qpid/trunk/qpid/dotnet/TODO.txt (original)
+++ incubator/qpid/trunk/qpid/dotnet/TODO.txt Thu Nov 30 10:54:48 2006
@@ -1,13 +1,4 @@
-https://issues.apache.org/jira/browse/QPID-136
-* createSession with prefetch (warning: prefetch partly added)
- * Do the BasicQos message after opening channel (sets up prefetch).
-
-https://issues.apache.org/jira/browse/QPID-137
-* .NET currently only supports no-ack mode. Allow acknowledgement support.
- * Implement the PreAcknowledge ack mode. Add preDeliver/postDeliver methods
in AmqSession like the Java client.
- * Implement Recover() with Basic.Recover.
-
* Port Connection URL support.
* Implement durable subscriptions.