Still no success on this one...
I was able to create the following small test case.
First, make sure DLQs are enabled in the activemq.xml configuration
file:
<policyEntry queue=">" memoryLimit="5mb">
  <deadLetterStrategy>
    <individualDeadLetterStrategy queuePrefix="DLQ."
                                  useQueueForQueueMessages="true"/>
  </deadLetterStrategy>
</policyEntry>
(I use individual DLQs only for simplicity; I get the same results with
a shared DLQ.)
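To make the naming concrete: with the configuration above, the dead
letter queue for MY.QUEUE is DLQ.MY.QUEUE (prefix plus queue name),
while the shared strategy uses the single queue ActiveMQ.DLQ. A minimal
sketch of that mapping follows. The class and method names are
hypothetical and only mirror the names seen in this thread; they are not
part of the ActiveMQ API.

```java
// Hypothetical helper, not an ActiveMQ class: it only mirrors the DLQ
// naming used in this thread.
final class DlqNames {
    // Name of the shared dead letter queue referred to in this thread.
    static final String SHARED_DLQ = "ActiveMQ.DLQ";

    // With individualDeadLetterStrategy, the DLQ name is
    // queuePrefix + original queue name.
    static String individual(String queuePrefix, String queueName) {
        return queuePrefix + queueName;
    }

    public static void main(String[] args) {
        System.out.println(individual("DLQ.", "MY.QUEUE")); // DLQ.MY.QUEUE
        System.out.println(SHARED_DLQ);                     // ActiveMQ.DLQ
    }
}
```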
When I run the test with CREATE_CONSUMER = false, a message is placed on
the queue MY.QUEUE and it stays there forever.
When I run the test with CREATE_CONSUMER = true, the expired message is
moved to DLQ.MY.QUEUE the moment the consumer is created (not when the
message actually expires).
Also, when you browse the queue MY.QUEUE while it contains an expired
message (e.g. via http://localhost:8161/admin/queues.jsp), the expired
message is moved to the DLQ the moment the queue is browsed.
I already tried to create a plugin for ActiveMQ which iterates over all
messages in all queues every second to force the expiration of messages.
No success on this one either. It works when there are no consumers at
all listening on the queue; the moment another consumer is listening on
the queue, the expired messages stay on the queue.
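One way to read these observations is that expiry is only checked when
something touches the queue (a consumer is created, the queue is
browsed) and not on a timer. The following is a toy in-memory model of
that difference. It assumes nothing about ActiveMQ's internals: ToyQueue
and its methods are hypothetical names, and time is passed in explicitly
instead of being read from a clock.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Toy model only; it does not reproduce ActiveMQ's implementation.
final class ToyQueue {
    // Each message is represented by its absolute expiration time (ms).
    final List<Long> pending = new ArrayList<>();
    final List<Long> deadLetters = new ArrayList<>();

    void send(long nowMillis, long ttlMillis) {
        pending.add(nowMillis + ttlMillis);
    }

    // Moves every message whose expiration time has passed to the dead
    // letter list and returns how many were moved.
    int expire(long nowMillis) {
        int moved = 0;
        for (Iterator<Long> it = pending.iterator(); it.hasNext();) {
            long expiration = it.next();
            if (nowMillis > expiration) {
                it.remove();
                deadLetters.add(expiration);
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue();
        queue.send(0, 1000);
        // Lazy model: nothing has called expire(), so the message is
        // still pending long after its TTL has passed.
        System.out.println("pending without a check: " + queue.pending.size());
        // A browse, a new consumer or a periodic sweep would call expire().
        queue.expire(5000);
        System.out.println("dead letters after a check: " + queue.deadLetters.size());
    }
}
```

In this model a periodic sweep is simply a timer that calls expire() with
the current time, which is what the plugin described above tries to do.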
Does anyone have suggestions for what to try next?
import java.util.Properties;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class TestCase {

    public final static String INITIAL_CONTEXT_FACTORY =
            "org.apache.activemq.jndi.ActiveMQInitialContextFactory";
    public final static String PROVIDER_URL = "tcp://localhost:61616";
    public static final String CONNECTION_FACTORY = "ConnectionFactory";
    public final static String QUEUE_NAME = "MY.QUEUE";
    public final static int MESSAGE_TTL = 1000; // milliseconds
    public final static boolean CREATE_CONSUMER = false;

    public static void main(String[] args) {
        QueueConnection queueConnection = null;
        QueueSession queueSession = null;
        Queue queue = null;
        try {
            QueueConnectionFactory queueConnectionFactory =
                    (QueueConnectionFactory) jndiLookup(CONNECTION_FACTORY);
            queueConnection = queueConnectionFactory.createQueueConnection();
            queueConnection.start();
            // The second argument is an acknowledge mode, not a delivery
            // mode (the original passed DeliveryMode.NON_PERSISTENT, which
            // has the same numeric value as Session.AUTO_ACKNOWLEDGE).
            queueSession = queueConnection.createQueueSession(false,
                    Session.AUTO_ACKNOWLEDGE);
            queue = queueSession.createQueue(QUEUE_NAME);

            // Send a message
            System.out.println("Creating sender to " + queue.getQueueName());
            QueueSender queueSender = queueSession.createSender(queue);
            Message message = queueSession.createTextMessage("This is a test");
            message.setJMSCorrelationID("CID:1");
            queueSender.send(message, DeliveryMode.PERSISTENT, 5, MESSAGE_TTL);
            System.out.println("Message sent to " + queue.getQueueName());

            if (CREATE_CONSUMER) {
                // Wait until message is expired
                Thread.sleep(MESSAGE_TTL + 100);
                // Receive that message --> this should hang the process since
                // the message will be expired at this moment.
                // Check out the queue (http://localhost:8161/admin/queues.jsp)
                System.out.println("Creating receiver on "
                        + queue.getQueueName());
                System.out.println(
                        "It will hang because no message will be available");
                MessageConsumer consumer = queueSession.createConsumer(queue);
                Message received = consumer.receive();
                if (received != null) {
                    System.out.println("Received message unexpectedly!!");
                }
            } else {
                // When no consumer is defined, the expired message will stay
                // forever on the queue instead of being moved to the DLQ
            }
        } catch (Exception e) {
            System.err.println("Problem occurred: " + e.getMessage());
            e.printStackTrace();
        } finally {
            // Close the session before the connection that owns it
            if (queueSession != null) {
                try {
                    queueSession.close();
                } catch (JMSException e) {
                    // ignored during cleanup
                }
            }
            if (queueConnection != null) {
                try {
                    queueConnection.stop();
                    queueConnection.close();
                } catch (JMSException e) {
                    // ignored during cleanup
                }
            }
        }
    }

    public static Object jndiLookup(String name) throws NamingException {
        Context ctxt = getJndiContext();
        return ctxt.lookup(name);
    }

    public static Context getJndiContext() throws NamingException {
        Properties props = new Properties();
        props.setProperty(Context.INITIAL_CONTEXT_FACTORY,
                INITIAL_CONTEXT_FACTORY);
        props.setProperty(Context.PROVIDER_URL, PROVIDER_URL);
        return new InitialContext(props);
    }
}
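For reference, the timing the test case relies on: per the JMS
specification, the expiration time of a message is the send time plus
the time-to-live, and a time-to-live of 0 means the message never
expires. A minimal sketch of that arithmetic follows, using a
hypothetical ExpiryMath helper and made-up timestamps; it says nothing
about when a broker actually acts on the expiry.

```java
// Hypothetical helper illustrating the JMS expiration rule; not part of
// any API.
final class ExpiryMath {
    // Expiration is send time + TTL; a TTL of 0 yields 0 (never expires).
    static long expiration(long sendTimeMillis, long ttlMillis) {
        return ttlMillis == 0 ? 0 : sendTimeMillis + ttlMillis;
    }

    // A message with expiration 0 never expires.
    static boolean isExpired(long expirationMillis, long nowMillis) {
        return expirationMillis != 0 && nowMillis > expirationMillis;
    }

    public static void main(String[] args) {
        long sendTime = 10_000; // made-up send timestamp
        long ttl = 1000;        // MESSAGE_TTL in the test case
        long exp = expiration(sendTime, ttl);
        // The test sleeps MESSAGE_TTL + 100 ms before creating the consumer.
        System.out.println(isExpired(exp, sendTime + ttl + 100)); // true
        System.out.println(isExpired(exp, sendTime + 500));       // false
    }
}
```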
-----Original Message-----
From: Rob Davies [mailto:[EMAIL PROTECTED]
Sent: Thursday, 15 May 2008 19:02
To: users@activemq.apache.org
Subject: Re: Expired messages
will look into it!
cheers,
Rob
http://open.iona.com/products/enterprise-activemq
http://rajdavies.blogspot.com/
On 15 May 2008, at 13:04, Steven Van Loon wrote:
Thanks for the reply, Rob.
The DLQ does indeed seem to be the queue to look at, but it seems that
expired messages are not sent to this queue the moment they actually
expire.
I have created the following test scenario:
- I create a receiver for queue://ActiveMQ.DLQ and start it up
- I create a receiver for queue://MY.QUEUE.C which waits 1000 ms
  between two receives (Thread.sleep(1000) after each message reception)
- I create a producer for queue://MY.QUEUE.C with a TTL of 100 ms which
  will send 10 messages
- I start the receiver for queue://MY.QUEUE.C
- I start the producer for queue://MY.QUEUE.C
After running this, I see this:
ActiveMQ.DLQ
Number Of Pending Messages : 0
Number Of Consumers: 1
Messages Sent: 0
Messages Received: 0
MY.QUEUE.C
Number Of Pending Messages : 9
Number Of Consumers: 1
Messages Sent: 10
Messages Received: 1
queue://MY.QUEUE.C contains 9 expired messages after execution; the DLQ
has received nothing.
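As a sanity check on what one would expect from this scenario, here is
a small simulation. It assumes, for simplicity, that all 10 messages are
sent at time 0, that the consumer attempts a receive at 0, 1000, 2000,
... ms, and that an expired message goes to the DLQ instead of being
delivered. The class name and the timing model are illustrative
assumptions, not measurements from the broker.

```java
// Hypothetical simulation of the scenario above; not broker code.
final class ScenarioSim {
    // Returns {received, deadLettered} for `count` messages sent at time 0
    // with the given TTL, consumed one at a time with a pause after each
    // successful reception.
    static int[] run(int count, long ttlMillis, long pauseMillis) {
        int received = 0;
        int deadLettered = 0;
        long now = 0;
        for (int i = 0; i < count; i++) {
            if (now > ttlMillis) {
                deadLettered++;     // expired before the consumer got to it
            } else {
                received++;
                now += pauseMillis; // Thread.sleep(1000) after a reception
            }
        }
        return new int[] {received, deadLettered};
    }

    public static void main(String[] args) {
        int[] result = run(10, 100, 1000);
        System.out.println("received=" + result[0]
                + " expected in DLQ=" + result[1]); // received=1 expected in DLQ=9
    }
}
```

Under these assumptions, 1 message is received and 9 should end up in
the DLQ, which matches the 9 messages that instead remain pending on
MY.QUEUE.C.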
When I run my test a second time, I get the following stats:
ActiveMQ.DLQ
Number Of Pending Messages : 0
Number Of Consumers: 1
Messages Sent: 9
Messages Received: 9
MY.QUEUE.C
Number Of Pending Messages : 9
Number Of Consumers: 1
Messages Sent: 20
Messages Received: 2
This means the DLQ consumer has received the expired messages of my
first run, and the expired messages of the second run are not sent to
the DLQ (confirmed by my logging).
Now, how can this be avoided? I want the expired messages to be sent to
the DLQ immediately.
Does anybody have any ideas?
Thanks!
Steven.
-----Original Message-----
From: Rob Davies [mailto:[EMAIL PROTECTED]
Sent: Monday, 12 May 2008 9:21
To: users@activemq.apache.org
Subject: Re: Expired messages
On 9 May 2008, at 10:35, Steven Van Loon wrote:
Hi,
Does anybody know whether and how it is possible to act on expired
messages?
Thanks!
Steven.
Expired messages have taken too long to deliver to a consumer - they
are sent to a dead letter queue - see
http://activemq.apache.org/message-redelivery-and-dlq-handling.html
or you can listen for advisories - see
http://activemq.apache.org/advisory-message.html
cheers,
Rob
http://open.iona.com/ -Enterprise Open Integration
http://rajdavies.blogspot.com/