Hi,
working with Artemis, I am trying to understand how to reuse a session
and corresponding producer and consumer in a request-reply scenario. I
have started with the RequestReplyExample and tried to make it work with
multiple requests/responses. After all, the example's comments say:
"Of course, in a real world example you would re-use the session,
producer, consumer and temporary queue and not create a new one for each
message!"
Since I know that a session should not be used by multiple threads, I
make sure that all messages are sent from the same thread.
(My code is at the very bottom of this message.)
However, I still get exceptions:
Nov 17, 2021 6:42:58 PM org.apache.activemq.artemis.core.client.impl.ClientSessionImpl startCall
WARN: AMQ212051: Invalid concurrent session usage. Sessions are not supposed to be used by more than one thread concurrently.
java.lang.Exception: trace
    at org.apache.activemq.artemis.core.client.impl.ClientSessionImpl.startCall(ClientSessionImpl.java:1587)
    at org.apache.activemq.artemis.core.client.impl.ClientSessionImpl.acknowledge(ClientSessionImpl.java:1209)
    at org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.doAck(ClientConsumerImpl.java:1117)
    at org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.acknowledge(ClientConsumerImpl.java:788)
    at org.apache.activemq.artemis.core.client.impl.ClientMessageImpl.acknowledge(ClientMessageImpl.java:136)
    at org.apache.activemq.artemis.core.client.impl.ClientMessageImpl.acknowledge(ClientMessageImpl.java:38)
    at org.apache.activemq.artemis.jms.client.JMSMessageListenerWrapper.onMessage(JMSMessageListenerWrapper.java:136)
    at org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl.callOnMessage(ClientConsumerImpl.java:1013)
    at org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl$Runner.run(ClientConsumerImpl.java:1133)
    at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:42)
    at org.apache.activemq.artemis.utils.actors.OrderedExecutor.doTask(OrderedExecutor.java:31)
    at org.apache.activemq.artemis.utils.actors.ProcessorBase.executePendingTasks(ProcessorBase.java:65)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at org.apache.activemq.artemis.utils.ActiveMQThreadFactory$1.run(ActiveMQThreadFactory.java:118)
As soon as I remove the client's MessageListener, the exceptions are
gone. So I assume that sending the ACK (see stack trace) is done by
another thread and that might be the problem. However, I don't know how
to change this.
Can anybody give me a hint on how to proceed?
Btw. the comment also says:
"Or better still use the correlation id, and just store the requests in
a map, then you don't need a temporary queue at all"
I am very interested but have no idea how this is supposed to work.
Which queue should the responses be sent through?
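For reference, the "store the requests in a map" part on its own might look like the sketch below. This is only my guess at what the comment means: the class name PendingRequests and its methods are made up for illustration and are not part of Artemis or JMS. The idea would be to register a request under its correlation id before sending it, and to have the reply listener look up and complete the matching entry. The sketch says nothing about which queue carries the replies, which is the part I am missing.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not an Artemis or JMS class): tracks requests that
// are still waiting for a reply, keyed by correlation id.
class PendingRequests {

    // Thread-safe map, since the sending thread and the listener thread
    // would both touch it.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    // Call before sending a request; the returned future completes when
    // a reply with the same correlation id arrives.
    CompletableFuture<String> register(String correlationId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(correlationId, future);
        return future;
    }

    // Call from the reply listener with the reply's correlation id and body.
    // Returns false if no request is waiting under that id.
    boolean complete(String correlationId, String replyBody) {
        CompletableFuture<String> future = pending.remove(correlationId);
        if (future == null) {
            return false;
        }
        future.complete(replyBody);
        return true;
    }

    // Number of requests still waiting for a reply.
    int size() {
        return pending.size();
    }
}
```

In my client, register(...) would presumably be called in sendJob and complete(...) in the reply listener, using whatever id the server copies into the reply's JMSCorrelationID.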
Any help is greatly appreciated!
Thank you!
Yann
package org.apache.activemq.artemis.jms.example;

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;

import javax.jms.*;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

public class RequestReplyExample {

    public static void main(final String[] args) throws Exception {
        new Server().start();
        Client client = new Client();
        for (int i = 0; i < 100; i++) {
            client.sendJob("message-" + i);
        }
    }
}

class Client {

    private final Executor executor = Executors.newSingleThreadExecutor();
    private final Session session;
    private final TemporaryQueue replyQueue;
    private final MessageProducer producer;

    public Client() throws JMSException {
        var cf = new ActiveMQConnectionFactory("tcp://localhost:61616?ha=true&retryInterval=1000&retryIntervalMultiplier=1.0&reconnectAttempts=-1");
        Connection connection = cf.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue requestQueue = session.createQueue("queue1");
        producer = session.createProducer(requestQueue);
        replyQueue = session.createTemporaryQueue();
        MessageConsumer replyConsumer = session.createConsumer(replyQueue);
        replyConsumer.setMessageListener(replyMessage -> {
            try {
                TextMessage reply = (TextMessage) replyMessage;
                System.out.println("Got reply: " + reply.getText());
                // use correlationId to correlate with request...
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
    }

    public void sendJob(String json) {
        executor.execute(() -> { // make sure the session is only used by a single thread!
            try {
                TextMessage msg = session.createTextMessage(json);
                msg.setJMSReplyTo(replyQueue);
                System.out.println("Sending message: " + json + " with replyTo: " + replyQueue);
                producer.send(msg);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
    }
}

class Server {

    private Connection connection;

    public void start() throws Exception {
        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616?ha=true&retryInterval=1000&retryIntervalMultiplier=1.0&reconnectAttempts=-1");
        connection = cf.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue requestQueue = session.createQueue("queue1");
        MessageProducer replyProducer = session.createProducer(null);
        MessageConsumer requestConsumer = session.createConsumer(requestQueue);
        requestConsumer.setMessageListener(request -> {
            try {
                System.out.println("Received request message: " + ((TextMessage) request).getText());
                Destination replyDestination = request.getJMSReplyTo();
                System.out.println("Reply to queue: " + replyDestination);
                TextMessage replyMessage = session.createTextMessage("A reply message");
                replyMessage.setJMSCorrelationID(request.getJMSMessageID());
                replyProducer.send(replyDestination, replyMessage);
                System.out.println("Reply sent");
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
    }

    public void shutdown() throws JMSException {
        connection.close();
    }
}