Hi Filip,
I think I might have had the same problem as you in the past, because the
methods that (I think) are required are not available on the JMS interface. My
solution was a rather hacky approach of casting the JMS instance to the
actual Qpid type, as in the following examples:
private void method(String method, Map arguments, String address)
throws JMSException { Destination reply_to =
session.createTemporaryQueue(); MessageConsumer receiver =
session.createConsumer(reply_to); JmsMapMessage request =
(JmsMapMessage) session.createMapMessage();
request.setValidatePropertyNames(false);
request.setStringProperty("x-amqp-0-10.app-id", "qmf2");
request.setStringProperty("qmf.opcode", "_method_request");
request.setStringProperty("method", "request"); Map
object_id_map = new HashMap();
object_id_map.put("_object_name", address);
request.setString("_method_name", method); // Can't set map
directly on object, so we'll hack it here JmsMapMessageFacade
facade = (JmsMapMessageFacade)request.getFacade();
facade.put("_object_id", object_id_map);
facade.put("_arguments", arguments); int correlation_id =
sendMessage(request, reply_to); List<Message> response =
awaitResponse(receiver, correlation_id, 10 * 1000); if
(response != null && response.size() > 0) { new
ResponseDecoder().decodeResponse(response); } else {
throw new JMSException("No response received"); } }
private int sendRequest(String opcode, Map<String, ?> query,
Destination replyAddress) throws JMSException { JmsMapMessage
request = (JmsMapMessage) session.createMapMessage();
request.setValidatePropertyNames(false);
request.setStringProperty("x-amqp-0-10.app-id", "qmf2");
request.setStringProperty("qmf.opcode", opcode); // Can't set
map directly on object, so we'll hack it here
JmsMapMessageFacade facade = (JmsMapMessageFacade)request.getFacade();
for (Map.Entry<String, ?> entry : query.entrySet()) {
facade.put(entry.getKey(), entry.getValue()); } return
sendMessage(request, replyAddress); }
Unfortunately, I haven't structured our Java implementation of the QMF2
interface as well as I could have done, and it's rather entrenched in
another of our libraries, but I'm sure no one will mind if I share the
attached implementation (based on the Qpid Python implementation) for
reference.
/Chris
On 19 December 2016 at 14:14, Filip Nguyen <[email protected]> wrote:
> We currently use Java JMS qpid-client 0.32. It is quite easy to use QMF
> [1] just by using MapMessage etc.
>
> With new Java Qpid JMS client org.apache.qpid:qpid-jms-client:jar:0.10.0
> I couldn't find a way how to use Qpid QMF. Is there any existing usage or
> example how to do that?
>
> [1]
>
> MapMessage request = session.createMapMessage();
> request.setJMSReplyTo(responseQueue);
> request.setStringProperty("x-amqp-0-10.app-id", "qmf2");
> request.setStringProperty("qmf.opcode", "_query_request");
>
>
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [email protected]
> For additional commands, e-mail: [email protected]
>
>
--
*Chris Richardson*, System Architect
[email protected]
*FourC AS, Vestre Rosten 81, Trekanten, NO-7075 Tiller, Norway www.fourc.eu
<http://www.fourc.eu/>*
*Follow us on LinkedIn <http://bit.ly/fourcli>, Facebook
<http://bit.ly/fourcfb>, Google+ <http://bit.ly/fourcgp> and Twitter
<http://bit.ly/fourctw>!*
/*
* Copyright (C) 2015 FourC AS, http://www.fourc.eu/
* All Rights Reserved.
*/
package eu.fourc.messenger.fmf.impl.java;
import com.google.common.base.MoreObjects;
import eu.fourc.messenger.fmf.Exchange;
import eu.fourc.messenger.fmf.Node;
import eu.fourc.messenger.fmf.Queue;
import eu.fourc.messenger.impl.ConnectionImpl;
import org.apache.qpid.jms.JmsTopic;
import org.apache.qpid.jms.message.JmsMapMessage;
import org.apache.qpid.jms.message.JmsMessage;
import org.apache.qpid.jms.message.facade.JmsMapMessageFacade;
import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFacade;
import javax.jms.*;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
 * Small QMF2 management client for a Qpid broker, driven over JMS.
 *
 * <p>Requests are map messages published to {@code qmf.default.direct} with
 * the AMQP subject {@code broker}; replies are collected from a temporary
 * queue that is created for each request and released afterwards.
 *
 * <p>Instances are not thread-safe: all calls share one {@link Session} and
 * the correlation counter is a plain {@code int}.
 */
public class BrokerAgent {
    private static final String TARGET_ADDRESS = "qmf.default.direct";
    private static final String TARGET_SUBJECT = "broker";
    /** QMF2 object name used when a method is invoked without an explicit address. */
    private static final String BROKER_OBJECT_ADDRESS = "org.apache.qpid.broker:broker:amqp-broker";
    /** Prefix put in front of the numeric correlator to form the JMS correlation ID. */
    private static final String CORRELATION_ID_PREFIX = "ID:";
    /** How long a request waits for its (possibly multi-part) response. */
    private static final long RESPONSE_TIMEOUT_MILLIS = 10 * 1000;

    private final Session session;
    private int correlator = 1; // Maybe use atomic int?

    /**
     * Creates an agent with its own non-transacted, auto-acknowledging session.
     *
     * @param connection a JMS connection; a {@code ConnectionImpl} wrapper is
     *                   unwrapped to the connection it holds
     * @throws JMSException if the session cannot be created
     */
    public BrokerAgent(Connection connection) throws JMSException {
        Connection conn = connection instanceof ConnectionImpl
                ? ((ConnectionImpl) connection).getConnection()
                : connection;
        session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    /** Returns the session this agent sends and receives on. */
    Session getSession() {
        return session;
    }

    /**
     * Stamps {@code message} with a reply-to address, a fresh correlation ID
     * and the routing subject, then publishes it to the QMF topic.
     *
     * @return the numeric correlator assigned to the message
     */
    private int sendMessage(Message message, Destination replyAddress) throws JMSException {
        MessageProducer sender = session.createProducer(new JmsTopic(TARGET_ADDRESS));
        try {
            message.setJMSReplyTo(replyAddress);
            int localCorrelator = correlator++;
            message.setJMSCorrelationID(CORRELATION_ID_PREFIX + localCorrelator);
            // Routing key
            ((AmqpJmsMessageFacade) ((JmsMessage) message).getFacade()).getAmqpMessage().setSubject(TARGET_SUBJECT);
            sender.send(message);
            return localCorrelator;
        } finally {
            // A producer is created per call, so release it per call as well.
            sender.close();
        }
    }

    /**
     * Collects every message on {@code receiver} whose correlation ID matches
     * {@code correlationId}, stopping at the first matching message without a
     * "partial" property or once {@code timeout} milliseconds have elapsed
     * in total.
     *
     * @return the matching messages in arrival order; empty if none arrived in time
     */
    private List<Message> awaitResponse(MessageConsumer receiver, int correlationId, long timeout)
            throws JMSException {
        String expectedId = CORRELATION_ID_PREFIX + correlationId;
        List<Message> messages = new LinkedList<>();
        long start = System.currentTimeMillis();
        boolean finished = false;
        while (!finished) {
            // Make sure we don't wait longer than "timeout" in total, even when receiving multiple messages
            long remaining = timeout - (System.currentTimeMillis() - start);
            // "<= 0", not "< 0": receive(0) means "wait forever" in JMS.
            if (remaining <= 0) {
                break;
            }
            Message message = receiver.receive(remaining);
            // Constant-first equals: a reply without a correlation ID is skipped, not an NPE.
            if (message != null && expectedId.equals(message.getJMSCorrelationID())) {
                messages.add(message);
                // If the message contains the "partial" flag, there should be more message to read.
                // Empirical data suggests the broker transmits a maximum of 100 records per message.
                finished = !message.propertyExists("partial");
            }
            // Nothing is acknowledged explicitly: the session is AUTO_ACKNOWLEDGE.
            // Messages with a different correlation ID are simply ignored.
        }
        return messages;
    }

    /**
     * Creates an empty QMF2 map message carrying the given opcode.
     * The concrete Qpid type is returned because callers need its facade to
     * store nested maps in the body.
     */
    private JmsMapMessage newQmfRequest(String opcode) throws JMSException {
        JmsMapMessage request = (JmsMapMessage) session.createMapMessage();
        // NOTE(review): presumably needed because the property names below are
        // not valid JMS identifiers -- confirm against the Qpid JMS client docs.
        request.setValidatePropertyNames(false);
        request.setStringProperty("x-amqp-0-10.app-id", "qmf2");
        request.setStringProperty("qmf.opcode", opcode);
        return request;
    }

    /**
     * Sends {@code request} with a fresh temporary reply queue, waits for the
     * response and then releases the consumer and the queue.
     *
     * @return the response messages; empty if nothing arrived within the timeout
     */
    private List<Message> requestAndAwait(Message request) throws JMSException {
        TemporaryQueue replyTo = session.createTemporaryQueue();
        try {
            MessageConsumer receiver = session.createConsumer(replyTo);
            try {
                int correlationId = sendMessage(request, replyTo);
                return awaitResponse(receiver, correlationId, RESPONSE_TIMEOUT_MILLIS);
            } finally {
                // Must be closed before the queue can be deleted.
                receiver.close();
            }
        } finally {
            replyTo.delete();
        }
    }

    /** Invokes {@code method} on the default broker object. */
    private void method(String method, Map<String, ?> arguments) throws JMSException {
        method(method, arguments, BROKER_OBJECT_ADDRESS);
    }

    /**
     * Invokes a QMF2 method on the object named {@code address}.
     *
     * @throws JMSException if sending fails or no response is received
     */
    private void method(String method, Map<String, ?> arguments, String address) throws JMSException {
        JmsMapMessage request = newQmfRequest("_method_request");
        request.setStringProperty("method", "request");
        request.setString("_method_name", method);

        Map<String, Object> objectId = new HashMap<>();
        objectId.put("_object_name", address);

        // Can't set map directly on object, so we'll hack it here
        JmsMapMessageFacade facade = (JmsMapMessageFacade) request.getFacade();
        facade.put("_object_id", objectId);
        facade.put("_arguments", arguments);

        List<Message> response = requestAndAwait(request);
        if (response.isEmpty()) {
            throw new JMSException("No response received");
        }
        new ResponseDecoder().decodeResponse(response);
    }

    /**
     * Runs a QMF2 "_query_request" for objects and decodes the reply.
     *
     * @param schemaMetaId body key under which the selector map is stored
     * @param objectMetaId key inside the selector map
     * @param objectId     value inside the selector map
     * @return whatever {@code ResponseDecoder.decodeResponse} produces for the reply
     */
    private List objectQuery(String schemaMetaId, String objectMetaId, String objectId) throws JMSException {
        Map<String, Object> selector = new HashMap<>();
        selector.put(objectMetaId, objectId);

        JmsMapMessage request = newQmfRequest("_query_request");
        // Can't set map directly on object, so we'll hack it here
        JmsMapMessageFacade facade = (JmsMapMessageFacade) request.getFacade();
        facade.put("_what", "OBJECT");
        facade.put(schemaMetaId, selector);

        return new ResponseDecoder().decodeResponse(requestAndAwait(request));
    }

    /** Queries all objects of the given QMF class name. */
    private List classQuery(String className) throws JMSException {
        return objectQuery("_schema_id", "_class_name", className);
    }

    /** Queries the object with the given QMF object name. */
    private List nameQuery(String objectId) throws JMSException {
        return objectQuery("_object_id", "_object_name", objectId);
    }

    /**
     * Looks up a single broker node by type and name.
     *
     * @return the first decoded node, or {@code null} if the query returned nothing
     */
    private Node getNode(String nodeType, String name) throws JMSException {
        String qpidName = String.format("org.apache.qpid.broker:%s:%s", nodeType, name);
        List<?> nodes = nameQuery(qpidName);
        if (nodes == null || nodes.isEmpty()) {
            return null;
        }
        return (Node) nodes.get(0);
    }

    /**
     * Looks up a queue by name.
     *
     * @return the queue, or {@code null} if the broker reported none
     * @throws JMSException if the query cannot be sent
     */
    public Queue getQueue(String name) throws JMSException {
        return (Queue) getNode("queue", name);
    }

    /**
     * Looks up an exchange by name.
     *
     * @return the exchange, or {@code null} if the broker reported none
     * @throws JMSException if the query cannot be sent
     */
    public Exchange getExchange(String name) throws JMSException {
        return (Exchange) getNode("exchange", name);
    }

    /**
     * Queries all objects of class "queue".
     * The raw return type is kept so existing callers continue to compile.
     *
     * @throws JMSException if the query cannot be sent
     */
    public List getQueues() throws JMSException {
        return classQuery("queue");
    }

    /**
     * Queries all objects of class "exchange".
     * The raw return type is kept so existing callers continue to compile.
     *
     * @throws JMSException if the query cannot be sent
     */
    public List getExchanges() throws JMSException {
        return classQuery("exchange");
    }

    /**
     * Asks the broker to create an exchange.
     *
     * @param type       stored as the "exchange-type" property; an entry with
     *                   the same key in {@code properties} overrides it
     * @param name       exchange name
     * @param properties extra properties, may be {@code null}
     * @throws JMSException if sending fails or no response is received
     */
    public void createExchange(String type, String name, Map<String, String> properties) throws JMSException {
        Map<String, String> props = new HashMap<>();
        props.put("exchange-type", type);
        if (properties != null) {
            props.putAll(properties);
        }
        Map<String, Object> args = new HashMap<>();
        args.put("type", "exchange");
        args.put("name", name);
        args.put("properties", props);
        args.put("strict", true);
        method("create", args);
    }

    /**
     * Asks the broker to delete the named exchange.
     *
     * @throws JMSException if sending fails or no response is received
     */
    public void deleteExchange(String name) throws JMSException {
        Map<String, Object> args = new HashMap<>();
        args.put("type", "exchange");
        args.put("name", name);
        method("delete", args);
    }

    /**
     * Asks the broker to create a queue.
     *
     * @param name       queue name
     * @param properties queue properties, may be {@code null}
     * @throws JMSException if sending fails or no response is received
     */
    public void createQueue(String name, Map<String, String> properties) throws JMSException {
        Map<String, String> props = new HashMap<>();
        if (properties != null) {
            props.putAll(properties);
        }
        Map<String, Object> args = new HashMap<>();
        args.put("type", "queue");
        args.put("name", name);
        args.put("properties", props);
        args.put("strict", true);
        method("create", args);
    }

    /**
     * Asks the broker to delete the named queue.
     *
     * @param if_empty  sent as the string-valued "if_empty" option
     * @param if_unused sent as the string-valued "if_unused" option
     * @throws JMSException if sending fails or no response is received
     */
    public void deleteQueue(String name, boolean if_empty, boolean if_unused) throws JMSException {
        Map<String, String> options = new HashMap<>();
        options.put("if_empty", String.valueOf(if_empty));
        options.put("if_unused", String.valueOf(if_unused));
        Map<String, Object> args = new HashMap<>();
        args.put("type", "queue");
        args.put("name", name);
        args.put("options", options);
        method("delete", args);
    }

    /**
     * Asks the broker to create a binding named {@code exchange/queue/key}.
     *
     * @param options binding properties, may be {@code null}
     * @throws JMSException if sending fails or no response is received
     */
    public void bind(String exchange, String queue, String key, Map<String, String> options) throws JMSException {
        Map<String, String> properties = options == null ? new HashMap<String, String>() : new HashMap<>(options);
        Map<String, Object> args = new HashMap<>();
        args.put("type", "binding");
        args.put("name", String.format("%s/%s/%s", exchange, queue, key));
        args.put("properties", properties);
        args.put("strict", true);
        method("create", args);
    }

    /**
     * Asks the broker to delete the binding named {@code exchange/queue/key}.
     *
     * @throws JMSException if sending fails or no response is received
     */
    public void unbind(String exchange, String queue, String key) throws JMSException {
        Map<String, Object> args = new HashMap<>();
        args.put("type", "binding");
        args.put("name", String.format("%s/%s/%s", exchange, queue, key));
        args.put("strict", true);
        method("delete", args);
    }

    /**
     * Invokes "reloadACLFile" on the broker's ACL object with no arguments.
     *
     * @throws JMSException if sending fails or no response is received
     */
    public void reloadACLFile() throws JMSException {
        Map<String, Object> arguments = new HashMap<>();
        String address = "org.apache.qpid.acl:acl:org.apache.qpid.broker:broker:amqp-broker";
        method("reloadACLFile", arguments, address);
    }

    @Override
    public String toString() {
        return MoreObjects.toStringHelper(this)
                .add("session", session)
                .add("correlator", correlator)
                .toString();
    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]