Greetings!

I have recently started enumerating messages in a queue from time to time, 
because we’re trying to figure out if a unit of work is still pending.  But I’m 
getting this error occasionally:

2023-07-27T21:38:30.007 [CanvasState-Abandoned] 
RpdmQueueUtils.browseStartMessages:159 [] ERROR - Error enumerating messages 
for pod 
{"message_type":"pod_specifier","pod_type":"project_execution","memory_mb":2048,"jvm_memory_mb":250}
 java.lang.RuntimeException: AMQ219023: The large message lost connection with 
its session, either because of a rollback or a closed session
at 
org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl.getBodyBuffer(ClientLargeMessageImpl.java:91)
at 
org.apache.activemq.artemis.jms.client.ActiveMQBytesMessage.readBytes(ActiveMQBytesMessage.java:217)
at 
net.redpoint.rpdm.services.RpdmQueueUtils.browseStartMessages(RpdmQueueUtils.java:155)
at 
net.redpoint.rpdm.services.RpdmQueueUtils.getAllQueuedCanvasIds(RpdmQueueUtils.java:76)
at 
net.redpoint.rpdm.canvas_state_query.CanvasStateQueryServerImpl.doCanvasAbandoned(CanvasStateQueryServerImpl.java:282)
at 
net.redpoint.rpdm.canvas_state_query.CanvasStateQueryServerImpl$Abandoned.action(CanvasStateQueryServerImpl.java:265)
at 
net.redpoint.rpdm.services.LazyStartPeriodicThread.run(LazyStartPeriodicThread.java:91)
at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: ActiveMQIllegalStateException[errorType=ILLEGAL_STATE 
message=AMQ219023: The large message lost connection with its session, either 
because of a rollback or a closed session]
at 
org.apache.activemq.artemis.core.client.impl.LargeMessageControllerImpl.saveBuffer(LargeMessageControllerImpl.java:265)
at 
org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl.checkBuffer(ClientLargeMessageImpl.java:157)
at 
org.apache.activemq.artemis.core.client.impl.ClientLargeMessageImpl.getBodyBuffer(ClientLargeMessageImpl.java:89)


This happens in two parts using JMS.  The first enumerates the queue using 
browseMessages() to collect a List<Message>, and the second processes the 
Messages, reading their bodies and examining them.  The problem occurs on this 
line in browseStartMessages when we read the body bytes
      bm.readBytes(requestBytes);

Despite the error text, the session isn’t closed, or at least this error 
doesn’t always happen despite my not doing anything to re-create the session.  
I suspect that, given the ClientLargeMessageImpl.getBodyBuffer on the stack, 
something is going wrong that is specific to “large” messages.  IIRC the large 
messages use a coat-check pattern to store the message on disk and bypass the 
in-memory queue.

Can you suggest a solution?  I’ve though of two potential changes:
- Raise minLargeMessageSize to something big enough that our messages are never 
“large”
- Read the body bytes immediately, while enumerating the queue, instead of 
later when looping over the List<Message>

I was also trying to see if there is a way to attach “metadata” to a message 
and only read that?  These messages are large, and presumably it is expensive 
to scan them, and I’m really only looking for a UUID embedded in all of that.


JDK: Temurin 17

AMQ broker version: 2.28.0

AMQ JMS client dependency:
<dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>artemis-jms-client-all</artifactId>
                <version>2.28.0</version>
</dependency>


Thanks
John



public List<Message> browseMessages(String queueName) {
  // This helps avoid a known issue with queue message enumeration (in ActiveMQ 
"classic"), where after getting the final message
  // it hangs for a significant time, up to 20 seconds, before the Enumeration 
returns false.
  int count = getQueueEntryCount(queueName);
  List<Message> result = new ArrayList<>();
  synchronized (lock) {
    try {
      Queue queue = getSession().createQueue(queueName);
      try (QueueBrowser browser = getSession().createBrowser(queue)) {
        Enumeration e = browser.getEnumeration();
        while (count-- > 0 && e.hasMoreElements()) {
          try {
            result.add((Message) e.nextElement());
          } catch (Exception ex) {
            LOG.warn("Error browsing queue '" + queueName + "'", ex);
          }
        }
      }
    } catch (Exception ex) {
      LOG.warn("Error browsing queue '" + queueName + "'", ex);
    }
  }
  return result;
}


The second processes the Messages:
public List<RpcRequestPacket> browseStartMessages(PodSpecifier podSpecifier) {
  List<RpcRequestPacket> result = new ArrayList<>();
  for (var message : browseMessages(getStartQueueName(podSpecifier))) {
    var bm = (BytesMessage)message;
    try {
      var requestBytes = new byte[(int) bm.getBodyLength()];
      bm.readBytes(requestBytes);
      result.add(new RpcRequestPacket(requestBytes, new 
JmsRpcMessageMetadata(message.getJMSMessageID(), "")));
    }
    catch (Exception ex) {
      LOG.error("Error enumerating messages for pod " + podSpecifier, ex);
    }
  }
  return result;
}




[rg] <https://www.redpointglobal.com/>

John Lilley

Data Management Chief Architect, Redpoint Global Inc.

888 Worcester Street, Suite 200 Wellesley, MA 02482

M: +1 7209385761<tel:+1%207209385761> | 
john.lil...@redpointglobal.com<mailto:john.lil...@redpointglobal.com>

PLEASE NOTE: This e-mail from Redpoint Global Inc. (“Redpoint”) is confidential 
and is intended solely for the use of the individual(s) to whom it is 
addressed. If you believe you received this e-mail in error, please notify the 
sender immediately, delete the e-mail from your computer and do not copy, print 
or disclose it to anyone else. If you properly received this e-mail as a 
customer, partner or vendor of Redpoint, you should maintain its contents in 
confidence subject to the terms and conditions of your agreement(s) with 
Redpoint.

Reply via email to