Hi,

working with Artemis, I am trying to understand how to reuse a session and corresponding producer and consumer in a request-reply scenario. I have started with the RequestReplyExample and tried to make it work with multiple requests/responses. After all, the example's comments say:

"Of course, in a real world example you would re-use the session, producer, consumer and temporary queue and not create a new one for each message!"

Since I know that sessions should not be used by multiple threads, I am making sure, all messages are sent through the same thread.

(My code is at the very bottom of this message.)

However, I still get exceptions:

Nov 17, 2021 6:42:58 PM org.apache.activemq.artemis.core.client.impl.ClientSessionImpl startCall WARN: AMQ212051: Invalid concurrent session usage. Sessions are not supposed to be used by more than one thread concurrently.
java.lang.Exception: trace
    at org.apache.activemq.artemis.core.client.impl.ClientSessionImpl.startCall(ClientSessionImpl.java:1587)     at org.apache.activemq.artemis.core.client.impl.ClientSessionImpl.acknowledge(ClientSessionImpl.java:1209)     at org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.doAck(ClientConsumerImpl.java:1117)     at org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.acknowledge(ClientConsumerImpl.java:788)     at org.apache.activemq.artemis.core.client.impl.ClientMessageImpl.acknowledge(ClientMessageImpl.java:136)     at org.apache.activemq.artemis.core.client.impl.ClientMessageImpl.acknowledge(ClientMessageImpl.java:38)     at org.apache.activemq.artemis.jms.client.JMSMessageListenerWrapper.onMessage(JMSMessageListenerWrapper.java:136)     at org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.callOnMessage(ClientConsumerImpl.java:1013)     at org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl$Runner.run(ClientConsumerImpl.java:1133)     at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42)     at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31)     at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:65)     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)     at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118)


As soon as I remove the client's MessageListener, the exceptions are gone. So I assume that sending the ACK (see stack trace) is done by another thread and that might be the problem. However, I don't know how to change this.

Can anybody give me a hint how to procede?

Btw. the comment also says:

"Or better still use the correlation id, and just store the requests in a map, then you don't need a temporary queue at all"

I am very interested but have no idea how this is supposed to work. Which queue should the responses be sent through?

Any help is greatly appreciated!

Thank you!

Yann


package org.apache.activemq.artemis.jms.example;

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;

import javax.jms.*;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class RequestReplyExample {

   public static void main(final String[] args)throws Exception {
      new Server().start();
      Client client =new Client();
      for (int i =0; i <100; i++) {
         client.sendJob("message-" + i);
      }
   }
}

class Client {

   private final Executor executor =Executors.newSingleThreadExecutor();
   private final Session session;
   private final TemporaryQueue replyQueue;
   private final MessageProducer producer;

   public Client()throws JMSException {
      var cf =new 
ActiveMQConnectionFactory("tcp://localhost:61616?ha=true&retryInterval=1000&retryIntervalMultiplier=1.0&reconnectAttempts=-1");
      Connection connection =cf.createConnection();
      connection.start();
      session =connection.createSession(false,Session.AUTO_ACKNOWLEDGE);

      Queue requestQueue =session.createQueue("queue1");
      producer =session.createProducer(requestQueue);

      replyQueue =session.createTemporaryQueue();
      MessageConsumer replyConsumer =session.createConsumer(replyQueue);
      replyConsumer.setMessageListener(replyMessage -> {
         try {
            TextMessage reply = (TextMessage) replyMessage;
            System.out.println("Got reply: " +reply.getText());
            // use correlationId to correlate with request...
         }catch (JMSException e) {
            e.printStackTrace();
         }
      });
   }

   public void sendJob(String json) {
      executor.execute(() -> {// make sure the session is only used by a single 
thread! try {
            TextMessage msg =session.createTextMessage(json);
            msg.setJMSReplyTo(replyQueue);
            System.out.println("Sending message: " +json +" with replyTo: " 
+replyQueue);
            producer.send(msg);
         }catch (JMSException e) {
            e.printStackTrace();
         }
      });
   }

}

class Server {
   private Connection connection;

   public void start()throws Exception {
      ActiveMQConnectionFactory cf =new 
ActiveMQConnectionFactory("tcp://localhost:61616?ha=true&retryInterval=1000&retryIntervalMultiplier=1.0&reconnectAttempts=-1");
      connection =cf.createConnection();

      connection.start();
      Session session =connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
      Queue requestQueue =session.createQueue("queue1");
      MessageProducer replyProducer =session.createProducer(null);
      MessageConsumer requestConsumer =session.createConsumer(requestQueue);
      requestConsumer.setMessageListener(request -> {
         try {
            System.out.println("Received request message: " + ((TextMessage) 
request).getText());
            Destination replyDestination = request.getJMSReplyTo();
            System.out.println("Reply to queue: " +replyDestination);
            TextMessage replyMessage =session.createTextMessage("A reply 
message");
            replyMessage.setJMSCorrelationID(request.getJMSMessageID());
            replyProducer.send(replyDestination,replyMessage);
            System.out.println("Reply sent");
         }catch (JMSException e) {
            e.printStackTrace();
         }
      });
   }

   public void shutdown()throws JMSException {
      connection.close();
   }
}


Reply via email to