On 27/08/16 04:53, lucas wrote:
Does "QMF framework" means qpid-config and qpid-route and so on?
QMF is a management protocol layered on top of AMQP. Those tools use that protocol, but you can also sends and receive QMF messages directly from any client API.
I never use anything about QMF and i cant even use qpid-route because it report i lack module qmf.console. But i have already install QPID-broker
I'm assuming it is the c++ broker you have, right? (There is a plugin for QMF support for the java broker, but the recommended approach for the java broker I believe is the REST API, i.e. HTTP).
、C++MessageAPI、PythonMessagAPI. I am also looking for the solution above.If QMF framework have connection with qmf.console,I think i cant use it temporarilly. Could you please write me an example code that use QMF framework to dinamically bind/unbind topics to queues ?
See attached.
/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include <qpid/messaging/Address.h> #include <qpid/messaging/Connection.h> #include <qpid/messaging/Message.h> #include <qpid/messaging/Receiver.h> #include <qpid/messaging/Sender.h> #include <qpid/messaging/Session.h> #include <qpid/types/Variant.h> #include <iostream> #include <sstream> using namespace qpid::messaging; using namespace qpid::types; using std::string; class Controller { public: Controller(Session&); void bind(const std::string& exchange, const std::string& queue, const std::string& key); void unbind(const std::string& exchange, const std::string& queue, const std::string& key); private: Address replyTo; Sender sender; Receiver receiver; Variant::Map getBindingArguments(const std::string& exchange, const std::string& queue, const std::string& key); void invoke(const std::string& method, const Variant::Map& args); }; int main(int argc, char** argv) { Connection c(argc > 1 ? argv[1] : "localhost"); try { c.open(); Session session = c.createSession(); Receiver r = session.createReceiver("my-queue; {create: always, node:{x-declare:{auto-delete:True}}}"); Controller controller(session); while (true) { Message message = r.fetch(); if (message.getSubject() == "control") { Variant::Map content; decode(message, content); controller.bind(content["exchange"], "my-queue", content["key"]); } else { std::cout << "Received: " << message.getContent() << std::endl; } session.acknowledge(); } } catch(const std::exception& error) { std::cout << "ERROR: " << error.what() << std::endl; } c.close(); return 0; } Controller::Controller(Session& session) :replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"), sender(session.createSender("qmf.default.direct/broker")), receiver(session.createReceiver(replyTo)) {} void Controller::bind(const std::string& exchange, const std::string& queue, const std::string& key) { invoke("create", getBindingArguments(exchange, queue, key)); } void Controller::unbind(const std::string& exchange, const std::string& queue, const std::string& key) { invoke("delete", getBindingArguments(exchange, queue, key)); } Variant::Map Controller::getBindingArguments(const std::string& exchange, const std::string& queue, const std::string& key) { std::stringstream name; name << exchange << "/" << queue << "/" << key; Variant::Map args; args["type"] = "binding"; args["name"] = name.str(); return args; } void Controller::invoke(const std::string& method, const Variant::Map& args) { Variant::Map content; Variant::Map objectId; objectId["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker"; content["_object_id"] = objectId; content["_method_name"] = method; content["_arguments"] = args; Message request; request.setReplyTo(replyTo); request.getProperties()["x-amqp-0-10.app-id"] = "qmf2"; request.getProperties()["qmf.opcode"] = "_method_request"; encode(content, request); sender.send(request); Message response; if (receiver.fetch(response, Duration::SECOND*5)) { if (response.getProperties()["x-amqp-0-10.app-id"] == "qmf2") { std::string opcode = response.getProperties()["qmf.opcode"]; if (opcode == "_exception") { Variant::Map m; decode(response, m); std::cout << "Error: " << m["_values"] << std::endl; } } else { std::cout << "Invalid response received, not a qmfv2 message: app-id=" << response.getProperties()["x-amqp-0-10.app-id"] << std::endl; } } else { std::cout << "No response received" << std::endl; } }
--------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
