I'm getting the following exception on the broker side (5.3 and 5.1):
2009-11-11 21:12:25 org.apache.activemq.broker.TransportConnection serviceException
WARNING: Async error occurred: javax.jms.JMSException: Could not correlate acknowledgment with dispatched message: MessageAck {commandId = 14, responseRequired = false, ackType = 0, consumerId = ID:MAIN-50081-1257970345670-0:0:1:1, firstMessageId = ID:MAIN-50081-1257970345670-0:0:3:1:1, lastMessageId = ID:MAIN-50081-1257970345670-0:0:3:1:1, destination = queue://unordered.ack.asynch.q, transactionId = null, messageCount = 1}
javax.jms.JMSException: Could not correlate acknowledgment with dispatched message: MessageAck {commandId = 14, responseRequired = false, ackType = 0, consumerId = ID:MAIN-50081-1257970345670-0:0:1:1, firstMessageId = ID:MAIN-50081-1257970345670-0:0:3:1:1, lastMessageId = ID:MAIN-50081-1257970345670-0:0:3:1:1, destination = queue://unordered.ack.asynch.q, transactionId = null, messageCount = 1}
    at org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:315)
    at org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:369)
    at org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:470)
    at org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:194)
    at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
    at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
    at org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:85)
    at org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:449)
    at org.apache.activemq.command.MessageAck.visit(MessageAck.java:205)
    at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:297)
    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:175)
    at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:68)
    at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
    at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:210)
    at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:203)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:185)
    at java.lang.Thread.run(Thread.java:619)

It happens when I have two or more asynchronous (onMessage) consumers on one queue, using either CLIENT_ACKNOWLEDGE or INDIVIDUAL_ACKNOWLEDGE mode, and acknowledge messages out of order. By "out of order" I mean that message.acknowledge() is called inside the onMessage() method of the second consumer first, and only afterwards in the first consumer. In the real world this could happen when the second consumer is much faster than the first one.

I leafed through the JMS spec, and out-of-order ACK-ing does not seem to be forbidden. At least, I didn't find any statement that prohibits it.

I'm attaching sample client code that causes the exception on the broker side. Is there anything wrong with my code, or is it an AMQ bug?

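To make the "out of order" timing concrete, here is a minimal broker-free sketch of just the ordering. It uses plain threads only, with no JMS or ActiveMQ calls, so it says nothing about how the broker reacts. The class name OutOfOrderAckSketch and the method runOnce are hypothetical names made up for this illustration; the numbers 0 and 1 stand in for the seqNum values of the two messages.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

// Hypothetical illustration only: models the order of two acknowledgments,
// not ActiveMQ's behaviour.
public class OutOfOrderAckSketch {

    /** Returns the order in which the two simulated consumers "acknowledge". */
    static List<Integer> runOnce() {
        final List<Integer> ackOrder = new CopyOnWriteArrayList<>();
        final CountDownLatch secondAcked = new CountDownLatch(1);

        // "First" consumer: received seqNum 0, but is slow and only
        // acknowledges after the second consumer has done so.
        Thread first = new Thread(() -> {
            try {
                secondAcked.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            ackOrder.add(0);
        });

        // "Second" consumer: received seqNum 1 and acknowledges immediately.
        Thread second = new Thread(() -> {
            ackOrder.add(1);
            secondAcked.countDown();
        });

        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ackOrder;
    }

    public static void main(String[] args) {
        // prints "ack order by seqNum: [1, 0]"
        System.out.println("ack order by seqNum: " + runOnce());
    }
}
```

The message dispatched second is acknowledged first, which is the situation the attached client forces with the one-second sleep.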
==== client code ===
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;

public class UnorderedAckAsynch {

    private final static String QUEUE_NAME = "unordered.ack.asynch.q";
    private final static String SEQ_NUM_PROPERTY = "seqNum";
    private final static int TOTAL_MESSAGES_CNT = 2;
    private final static int CONSUMERS_CNT = 2;
    private final static int MESSAGE_LENGTH_BYTES = 75000;
    private final static CountDownLatch LATCH = new CountDownLatch(TOTAL_MESSAGES_CNT);
    private static Connection connection;

    /**
     * AMQ 5.1 - SEVERE: Async error occurred: javax.jms.JMSException: Could not correlate acknowledgment with dispatched message: MessageAck
     * AMQ 5.3 - WARNING: Async error occurred: javax.jms.JMSException: Could not correlate acknowledgment with dispatched message: MessageAck
     */
    public static void main(String[] args) throws Exception {
        final ConnectionFactory fac = new ActiveMQConnectionFactory("tcp://0.0.0.0:61616");
        List<Consumer> consumers = null;
        Session producerSession = null;
        try {
            connection = fac.createConnection();
            connection.start();

            // Each consumer gets its own session on the shared connection.
            consumers = new ArrayList<Consumer>();
            for (int i = 0; i < CONSUMERS_CNT; i++) {
                consumers.add(new Consumer());
            }

            producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            final MessageProducer producer = producerSession.createProducer(new ActiveMQQueue(QUEUE_NAME));
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            for (int i = 0; i < TOTAL_MESSAGES_CNT; i++) {
                final Message message = producerSession.createTextMessage(buildLongString());
                message.setIntProperty(SEQ_NUM_PROPERTY, i);
                producer.send(message);
            }

            // Wait until every message has gone through onMessage().
            LATCH.await();
        } finally {
            if (producerSession != null)
                producerSession.close();
            if (consumers != null) {
                for (Consumer c : consumers) {
                    c.close();
                }
            }
            if (connection != null)
                connection.close();
        }
    }

    private static String buildLongString() {
        final StringBuilder stringBuilder = new StringBuilder(MESSAGE_LENGTH_BYTES);
        for (int i = 0; i < MESSAGE_LENGTH_BYTES; ++i) {
            stringBuilder.append((int) (Math.random() * 10));
        }
        return stringBuilder.toString();
    }

    private final static class Consumer implements MessageListener {

        final Session session;
        private static final AtomicInteger nextExpectedSeqNum = new AtomicInteger();

        private Consumer() {
            try {
                session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                //session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
                final Queue queue = session.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1");
                final MessageConsumer consumer = session.createConsumer(queue);
                consumer.setMessageListener(this);
            } catch (JMSException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            }
        }

        @Override
        public void onMessage(Message message) {
            try {
                final int seqNum = message.getIntProperty(SEQ_NUM_PROPERTY);
                // Delay the ack when the message matches the shared counter,
                // so that another consumer can acknowledge a later message first.
                if (seqNum == nextExpectedSeqNum.getAndIncrement()) {
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                message.acknowledge();
            } catch (JMSException e) {
                e.printStackTrace();
                throw new RuntimeException(e);
            } finally {
                LATCH.countDown();
            }
        }

        private void close() {
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    e.printStackTrace();
                    throw new RuntimeException(e);
                }
            }
        }
    }
}
====== END ====