Hello!
I'm using the C# client to connect to a Qpid C++ broker to publish and
receive messages on a topic (amq.topic).
Unfortunately, both the publisher and the listener sessions/connections are
dropped after 3-5 minutes. No exceptions or events are raised by the C# client,
so I wonder how to handle this situation: I want the listener to stay
connected to its topic indefinitely. I've tried changing the connection
timeout, but without success.
I created a simple publisher and a listener to reproduce the problem. The
first C# console application publishes the lines read from the console,
while the second application reads from the topic.
After a few minutes the two programs are no longer connected, and using the
'list queue' command in the qpid-tool utility I can see that the queue has been
destroyed.
Any help is appreciated,
bye Fcolle
/*********************************************************************************/
//-----------------------------------------------
// PUBLISHER CODE
//-----------------------------------------------
using System;
using System.Configuration;
using System.IO;
using System.Text;
using System.Threading;
using org.apache.qpid.client;
using org.apache.qpid.transport;
namespace Publisher
{
/// <summary>
/// This program is one of two programs designed to be used
/// together. These programs use the topic exchange.
///
/// Publisher (this program):
///
/// Publishes each line read from the console to the "amq.topic"
/// exchange with the routing key "myqueue.1", then publishes a final
/// "End" message with the routing key "control".
///
/// Listener:
///
/// Reads from a queue on the broker using a message listener.
///
/// NOTE: the class is named Listener although it implements the
/// publisher; the name is kept so existing references keep working.
/// </summary>
class Listener
{
    // Not read or written by the publisher code in this class; kept because it is public.
    public static int _count = 1;

    /// <summary>
    /// Connects to the broker using the values from PublisherSettings,
    /// publishes console input, sends the termination message and closes
    /// the connection. Any exception is reported on the console.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    private static void Main(string[] args)
    {
        string host = PublisherSettings.Default.Host;
        int port = PublisherSettings.Default.Port;
        string virtualhost = PublisherSettings.Default.VirtualHost;
        string username = PublisherSettings.Default.Username;
        string password = PublisherSettings.Default.Password;

        Client connection = new Client();
        // Only attempt Close() when Connect() succeeded.
        bool connected = false;
        try
        {
            connection.Connect(host, port, virtualhost, username, password);
            connected = true;

            // NOTE(review): 50000 is presumably the session expiry; confirm the
            // unit against the client API documentation.
            IClientSession session = connection.CreateSession(50000);
            publishMessages(session, "myqueue.1");
            noMoreMessages(session);
        }
        catch (Exception e)
        {
            // Print the whole exception (type, message and stack trace),
            // not just the stack trace, so the cause is visible.
            Console.WriteLine("Error: \n" + e);
        }
        finally
        {
            // Close in a finally block so the connection is released even
            // when publishing fails part-way through.
            if (connected)
            {
                try
                {
                    connection.Close();
                }
                catch (Exception e)
                {
                    Console.WriteLine("Error: \n" + e);
                }
            }
        }
    }

    /// <summary>
    /// Publishes every line read from the console to "amq.topic" until the
    /// line "End" has been sent or the console input is exhausted.
    /// </summary>
    /// <param name="session">Session used to transfer the messages.</param>
    /// <param name="routing_key">Routing key attached to every message.</param>
    private static void publishMessages(IClientSession session, string routing_key)
    {
        string read;
        do
        {
            read = Console.ReadLine();
            if (read == null)
            {
                // End of input (e.g. redirected stdin closed): nothing left
                // to publish, and GetBytes(null) would throw.
                break;
            }

            IMessage message = new Message();
            message.ClearData();
            message.AppendData(Encoding.UTF8.GetBytes(read));
            session.MessageTransfer("amq.topic", routing_key, message);
        } while (read != "End");
    }

    /// <summary>
    /// Sends a final "End" message with the "control" routing key and then
    /// calls Sync() on the session.
    /// </summary>
    /// <param name="session">Session used to transfer the message.</param>
    private static void noMoreMessages(IClientSession session)
    {
        IMessage message = new Message();
        // And send a synchronous final message to indicate termination.
        message.ClearData();
        message.AppendData(Encoding.UTF8.GetBytes("End"));
        session.MessageTransfer("amq.topic", "control", message);
        session.Sync();
    }
}
}
/*********************************************************************************/
//-----------------------------------------------
// LISTENER CODE
//-----------------------------------------------
using System;
using System.Configuration;
using System.IO;
using System.Text;
using System.Threading;
using org.apache.qpid.client;
using org.apache.qpid.transport;
namespace Subscriber
{
/// <summary>
/// This program is one of two programs designed to be used
/// together. These programs use the topic exchange.
///
/// Publisher:
///
/// Publishes to a broker, specifying a routing key.
///
/// Listener (this program):
///
/// Declares the queue "myqueue", binds it to "amq.topic" and reads from
/// it using a message listener until <see cref="_count"/> drops to zero.
/// </summary>
class Listener
{
    // Number of termination messages still expected; decremented by
    // MessageListener when it receives "End".
    public static int _count = 1;

    /// <summary>
    /// Connects to the broker using the values from Settings1, subscribes a
    /// message listener and blocks until the listener signals termination.
    /// Any exception is reported on the console.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    private static void Main(string[] args)
    {
        string host = Settings1.Default.Host;
        int port = Settings1.Default.Port;
        string virtualhost = Settings1.Default.VirtualHost;
        string username = Settings1.Default.Username;
        string password = Settings1.Default.Password;

        Client connection = new Client();
        // Only attempt Close() when Connect() succeeded.
        bool connected = false;
        try
        {
            connection.Connect(host, port, virtualhost, username, password);
            connected = true;

            // NOTE(review): 50000 is presumably the session expiry; confirm the
            // unit against the client API documentation.
            IClientSession session = connection.CreateSession(50000);

            //--------- Main body of program --------------------------------------------
            // The session object doubles as the monitor shared with
            // MessageListener, which pulses it once "End" has been received.
            lock (session)
            {
                Console.WriteLine("Listening for messages ...");
                // Create a listener
                prepareQueue("myqueue", "myqueue.#", session);
                // Re-check the counter after every wake-up.
                while (_count > 0)
                {
                    Monitor.Wait(session);
                }
            }
            //---------------------------------------------------------------------------
        }
        catch (Exception e)
        {
            // Print the whole exception (type, message and stack trace),
            // not just the stack trace, so the cause is visible.
            Console.WriteLine("Error: \n" + e);
        }
        finally
        {
            // Close in a finally block so the connection is released even
            // when the main body fails.
            if (connected)
            {
                try
                {
                    connection.Close();
                }
                catch (Exception e)
                {
                    Console.WriteLine("Error: \n" + e);
                }
            }
        }
    }

    /// <summary>
    /// Declares the queue, binds it to "amq.topic" for the given routing key
    /// and for the "control" key, and subscribes a MessageListener to it.
    /// </summary>
    /// <param name="queue">Name of the queue to declare; used as given.</param>
    /// <param name="routing_key">Binding key for the regular messages.</param>
    /// <param name="session">Session on which all commands are issued.</param>
    private static void prepareQueue(string queue, string routing_key, IClientSession session)
    {
        // The queue is declared exclusive and auto-delete, so it belongs to
        // this session only.
        // NOTE(review): presumably the broker removes such a queue once the
        // session goes away - confirm against the broker documentation.
        Console.WriteLine("Declaring queue: " + queue);
        session.QueueDeclare(queue, Option.EXCLUSIVE, Option.AUTO_DELETE);

        // Route messages to the new queue if they match the routing key.
        // Also route any messages with the "control" routing key to
        // this queue so we know when it's time to stop. The publisher sends
        // a message with the content "End", using the
        // "control" routing key, when it is finished.
        session.ExchangeBind(queue, "amq.topic", routing_key);
        session.ExchangeBind(queue, "amq.topic", "control");

        // subscribe the listener to the queue
        IMessageListener listener = new MessageListener(session);
        session.AttachMessageListener(listener, queue);
        session.MessageSubscribe(queue);
    }
}
/// <summary>
/// Message listener that prints every received message to the console and,
/// when a message with the content "End" arrives, acknowledges all messages
/// received so far and wakes up the thread waiting on the session.
/// </summary>
public class MessageListener : IMessageListener
{
    private readonly IClientSession _session;
    // Ids of the messages received so far and not yet acknowledged.
    private readonly RangeSet _range = new RangeSet();

    /// <summary>
    /// Creates a listener bound to the given session.
    /// </summary>
    /// <param name="session">
    /// Session used to acknowledge messages; also used as the monitor object
    /// on which the waiting thread is pulsed.
    /// </param>
    public MessageListener(IClientSession session)
    {
        _session = session;
    }

    /// <summary>
    /// Handles one incoming message: decodes and prints its body, records its
    /// id for acknowledgement and triggers shutdown when the body is "End".
    /// </summary>
    /// <param name="m">The received message.</param>
    public void MessageTransfer(IMessage m)
    {
        // Read everything from the current position to the end of the body.
        // A single Read call is not guaranteed to fill the buffer, so loop
        // until it is full or the stream reports no more data.
        Stream bodyStream = m.Body;
        byte[] body = new byte[bodyStream.Length - bodyStream.Position];
        int filled = 0;
        while (filled < body.Length)
        {
            int got = bodyStream.Read(body, filled, body.Length - filled);
            if (got <= 0)
            {
                break;
            }
            filled += got;
        }

        // The publisher encodes with UTF-8, so decode with UTF-8 as well;
        // ASCII decoding would corrupt every non-ASCII character.
        string message = Encoding.UTF8.GetString(body, 0, filled);
        Console.WriteLine(DateTime.Now.ToString() + "\t-\tMessage:\t[" +
            message + "]\tfrom: " + m.Destination);

        // Add this message to the list of message to be acknowledged
        _range.Add(m.Id);

        if (message.Equals("End"))
        {
            Console.WriteLine("Shutting down listener for " +
                m.DeliveryProperties.GetRoutingKey());
            try
            {
                // Acknowledge all the received messages
                _session.MessageAccept(_range);
            }
            finally
            {
                // Update the counter under the same lock the main thread
                // holds when it reads it, and always wake the main thread,
                // even if the acknowledgement failed.
                lock (_session)
                {
                    Listener._count--;
                    Monitor.Pulse(_session);
                }
            }
        }
    }
}
}
--
View this message in context:
http://apache-qpid-users.2158936.n2.nabble.com/c-topic-publisher-and-listener-sessions-dropped-tp6272182p6272182.html
Sent from the Apache Qpid users mailing list archive at Nabble.com.
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]