Hello All,
I have just started to do some tests with Apache Qpid and i have run into a
problem with maximum number of messages, which can be received by the
subscriber.
I am using a C++ broker under Windows 7 and C# client libraries in my test
application. If I am sending small amount of messages, everything is working
fine, but when i tried to increase the number of messages to more than 9999, I
receive only 9999 messages and no more. After the consumer is restarted I
receive another 9999 messages and it stops receiving again. Can someone tell
me, what I am doing wrong? It seems that there is some limit, but I can not
find where. I have found properties like qpid.max_size and qpid.max_count,
which can be set on QueueDeclare method, but whether I put there some big
number or not, the behavior is still same, only 9999 messages at maximum are
received. However if I set qpid.max_count=10, it behaves correctly, only 10
messages are queued and next messages are rejected. So it seems, that this
parameter does not influence the strange behavior.
Here is the main part of implementation of my publisher:
public void Start()
{
session = connection.CreateSession(500000);
session.ExchangeDeclare(configuration.ExchangeName,
configuration.ExchangeType, String.Empty, null, Option.DURABLE);
}
public void SendMessage(string routingKey, byte[] message)
{
IMessage messageToSent = new Message();
messageToSent.DeliveryProperties.SetDeliveryMode(MessageDeliveryMode.PERSISTENT);
messageToSent.DeliveryProperties.SetRoutingKey(routingKey);
messageToSent.AppendData(message);
session.MessageTransfer(configuration.ExchangeName, messageToSent);
}
public void Stop()
{
session.Close();
}
And here is the main part of implementation of my subsriber:
public void Start()
{
session = connection.CreateSession(50000);
session.ExchangeDeclare(configuration.ExchangeName,
configuration.ExchangeType, String.Empty, null , Option.DURABLE);
session.QueueDeclare(configuration.QueueName, String.Empty, new
Dictionary<string, object>() {{ "qpid.policy_type", "reject" }},
Option.DURABLE);
session.ExchangeBind(configuration.QueueName,
configuration.ExchangeName, configuration.RoutingKey);
session.AttachMessageListener(this, configuration.QueueName);
session.MessageSubscribe(configuration.QueueName);
}
public void Stop()
{
session.Close();
}
#region Implementation of IMessageListener
public void MessageTransfer(IMessage message)
{
BinaryReader reader = new BinaryReader(message.Body, Encoding.UTF8);
byte[] messageBody = new byte[message.Body.Length -
message.Body.Position];
reader.Read(messageBody, 0, messageBody.Length);
Console.WriteLine(message.Id);
session.MessageAccept(new RangeSet() { message.Id });
if (OnMessageReceived != null)
OnMessageReceived(identifier, messageBody);
}
I would be very glad if someone can point me to right direction how to remove
the 9999 message limit.
Thanks a lot in advance.
Kind regards,
Petr
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]