Thanks for answering! The question was about ActiveMQ *Classic*; sorry for not mentioning that.
Clebert: Test added at the end.

1. AFAIU, useQueueForTopicMessages=true is the default.

2. The question about rollback/"nacking": Say 100 consumers subscribe to the topic. One message is published, and all of them get it. Then 50 of them eventually nack the delivery. Will there then be 50 DLQ entries for this message?

Retrying: As far as I know, redelivery in ActiveMQ happens on the client. That is, the broker delivers the message only once to the client, and the client then performs the redeliveries. Once the client has exhausted the redelivery policy, it sends a "nack" to the broker, which promptly executes the dead letter action.

There is a "plugin" to enable broker-side redelivery, but I found it less than ideal. If you keep the client-side redelivery policy in place, you get the standard 6 client-side attempts multiplied by whatever number of attempts you have configured on the broker. It felt very much like a tacked-on solution, and that the client-side solution is how it was really designed.

An aside: It took me some time to understand this client-side logic, but I finally understood that it works this way to preserve message ordering. There is a method to disable this, but it doesn't work with exponential backoff combined with a string of poison messages. I've created a bug, with a PR, for this: https://issues.apache.org/jira/browse/AMQ-8617

AFAIU, the following code should receive the DLQ message and exit? It does not. Where is my mistake?

public class Test_TopicDLQ {
    private static final Logger log = LoggerFactory.getLogger(Test_TopicDLQ.class);

    private static final String DLQ_PREFIX = "DLQ.";

    @Test
    public void topicDlq() throws Exception {
        BrokerService broker = createInVmActiveMqBroker();
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                "vm://" + broker.getBrokerName() + "?create=false");
        Connection connection = connectionFactory.createConnection();
        connection.start();

        String topicName = "Test";

        CountDownLatch latch = new CountDownLatch(1);
        Thread thread = new Thread(() -> {
            try {
                Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
                Topic topic = session.createTopic(topicName);
                MessageConsumer consumer = session.createConsumer(topic);
                latch.countDown();
                while (true) {
                    log.info("Going into consume");
                    Message receive = consumer.receive();
                    log.info("Got message!\n ###" + receive);
                    log.info("Rolling back!");
                    session.rollback();
                }
            }
            catch (Throwable t) {
                log.error("Consumer threw!", t);
            }
        }, "Consumer");
        thread.start();

        latch.await();
        log.info("Receiver reached consumer.");

        // :: Send message to topic
        log.info("Sending message to topic [" + topicName + "]");
        Session sessionProduce = connection.createSession(true, Session.SESSION_TRANSACTED);
        Topic topic = sessionProduce.createTopic(topicName);
        MessageProducer producer = sessionProduce.createProducer(topic);
        TextMessage textMessage = sessionProduce.createTextMessage("Text!");
        producer.send(textMessage);
        sessionProduce.commit();
        sessionProduce.close();

        // :: Wait for DLQ
        String dlqName = DLQ_PREFIX + topicName;
        log.info("Receiving DLQ from [" + dlqName + "]");
        Session sessionDlqReceive = connection.createSession(true, Session.SESSION_TRANSACTED);
        Queue queue = sessionDlqReceive.createQueue(dlqName);
        MessageConsumer consumer = sessionDlqReceive.createConsumer(queue);
        Message receive = consumer.receive();
        log.info("Got DLQ message!\n" + receive);

        connection.stop();
        broker.stop();
    }

    protected static BrokerService createInVmActiveMqBroker() {
        BrokerService broker = new BrokerService();
        broker.setBrokerName("testbroker");

        // :: Disable a bit of stuff for testing:
        // No need for persistence; No need for persistence across reboots, and don't want KahaDB dirs and files.
        broker.setPersistent(false);

        // :: Set Individual DLQ - which you most definitely should do in production.
        // Hear, hear: https://users.activemq.apache.narkive.com/H7400Mn1/policymap-api-is-really-bad
        IndividualDeadLetterStrategy individualDeadLetterStrategy = new IndividualDeadLetterStrategy();
        individualDeadLetterStrategy.setQueuePrefix(DLQ_PREFIX);
        individualDeadLetterStrategy.setTopicPrefix(DLQ_PREFIX);
        individualDeadLetterStrategy.setProcessExpired(true);
        individualDeadLetterStrategy.setProcessNonPersistent(true);
        individualDeadLetterStrategy.setUseQueueForTopicMessages(true); // true is also default
        individualDeadLetterStrategy.setUseQueueForQueueMessages(true); // true is also default.

        // :: Create policy entry for TOPICS:
        PolicyEntry allTopicsPolicy = new PolicyEntry();
        allTopicsPolicy.setDestination(new ActiveMQTopic(">")); // all topics
        allTopicsPolicy.setDeadLetterStrategy(individualDeadLetterStrategy);

        // .. create the PolicyMap containing the two destination policies
        PolicyMap policyMap = new PolicyMap();
        policyMap.put(allTopicsPolicy.getDestination(), allTopicsPolicy);
        broker.setDestinationPolicy(policyMap);

        // :: Start the broker.
        try {
            broker.start();
        }
        catch (Exception e) {
            throw new AssertionError("Could not start ActiveMQ BrokerService '" + broker.getBrokerName() + "'.", e);
        }
        return broker;
    }
}

On Thu, Oct 6, 2022 at 4:51 PM Matt Pavlovich <mattr...@gmail.com> wrote:

> Hi Endre-
>
> 1. Yes, topics can support being DLQ’d to a queue.
>
> Set useQueueForTopicMessages=“true” in the deadLetterStrategy for the
> destinationPolicy you are using.
>
> ref: https://activemq.apache.org/message-redelivery-and-dlq-handling
>
> 2. When a consumer is subscribed to a topic there is an implied
> ’subscription’ for each consumer. This subscription tracks the flow of
> messages for each consumer. For a non-durable (default) topic subscription,
> when a consumer rolls back messages the broker will attempt a redelivery
> _back_ to the same consumer for _n_ number of attempts based on the
> redelivery strategy. Once the max redelivery attempts is exhausted, the
> _broker_ moves the messages to the DLQ and processes the next available.
>
> Since every consumer on a topic has their own subscription messages are
> not redelivered to other consumers.
>
> ref: same link as above ;-)
>
> Pro-tip - consuming from topics can get.. weird. Especially, if you plan
> on trying to cluster brokers and are managing expiration, DLQ, eviction,
> slow consumers, etc. If this is a new message flow, consider using Virtual
> Topics which allow pub-sub but the consumer apps read from queues instead
> of topics. Queues are much easier for developer teams to rationalize all
> the flows (error handling, etc) and Virtual Topics are easier for admins to
> be explicit with the flow of data across multiple brokers - clustering
> consumers vs replicate to another zone, etc.
>
> ref: https://activemq.apache.org/virtual-destinations
>
> Thanks,
> Matt Pavlovich
>
> > On Oct 6, 2022, at 4:41 AM, Endre Stølsvik <en...@stolsvik.com> wrote:
> >
> > Hi!
> > Is DLQ supported for plain Topics? I can't seem to get that to work.
> >
> > Also, I wonder what would happen if a topic was subscribed to by 100
> > consumers, and then 50 of them rolled back ("Nack'ed") the delivery?
> >
> > Thanks,
> > Kind regards,
> > Endre.
> >
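
PS: To make the retry arithmetic from the top of this mail concrete (client-side attempts multiplied by broker-side attempts, plus the exponential backoff schedule), here is a minimal sketch in plain Java. RedeliveryMath and its methods are hypothetical helpers made up for illustration, not ActiveMQ API, and the multiplication simply assumes that every broker-side delivery triggers a full round of client-side attempts, as I described it.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of ActiveMQ): models the retry arithmetic described in this mail. */
final class RedeliveryMath {

    private RedeliveryMath() {
    }

    /**
     * Total processing attempts, ASSUMING each broker-side delivery triggers a
     * full round of client-side attempts before the client "nacks".
     */
    static long totalAttempts(int clientAttemptsPerDelivery, int brokerDeliveries) {
        return (long) clientAttemptsPerDelivery * brokerDeliveries;
    }

    /**
     * Delays in milliseconds before each redelivery under exponential backoff:
     * initial, initial * m, initial * m^2, ...
     */
    static List<Long> backoffDelaysMillis(long initialDelayMillis, double multiplier, int redeliveries) {
        List<Long> delays = new ArrayList<>();
        double delay = initialDelayMillis;
        for (int i = 0; i < redeliveries; i++) {
            delays.add(Math.round(delay));
            delay *= multiplier;
        }
        return delays;
    }
}
```

For example, RedeliveryMath.totalAttempts(6, 3) gives 18, which is why keeping both the client-side policy and the broker-side plugin active adds up quickly.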