On 27/08/16 04:53, lucas wrote:
 Does "QMF framework" means qpid-config and qpid-route and so on?

QMF is a management protocol layered on top of AMQP. Those tools use that protocol, but you can also sends and receive QMF messages directly from any client API.

I never use anything about QMF and i cant even use qpid-route because it
report i lack module qmf.console.
But i have already install QPID-broker

I'm assuming it is the c++ broker you have, right? (There is a plugin for QMF support for the java broker, but the recommended approach for the java broker I believe is the REST API, i.e. HTTP).

、C++MessageAPI、PythonMessagAPI.
I am also looking for the solution above.If QMF framework have connection
with qmf.console,I think i cant use it temporarilly.
Could you please write me an example code that use QMF framework to
dinamically bind/unbind topics to
queues ?

See attached.

/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */


#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <qpid/types/Variant.h>
#include <iostream>
#include <sstream>

using namespace qpid::messaging;
using namespace qpid::types;

using std::string;

class Controller
{
  public:
    Controller(Session&);
    void bind(const std::string& exchange, const std::string& queue, const std::string& key);
    void unbind(const std::string& exchange, const std::string& queue, const std::string& key);
  private:
    Address replyTo;
    Sender sender;
    Receiver receiver;

    Variant::Map getBindingArguments(const std::string& exchange, const std::string& queue, const std::string& key);
    void invoke(const std::string& method, const Variant::Map& args);
};

int main(int argc, char** argv)
{
    Connection c(argc > 1 ? argv[1] : "localhost");
    try {
        c.open();
        Session session = c.createSession();
        Receiver r = session.createReceiver("my-queue; {create: always, node:{x-declare:{auto-delete:True}}}");
        Controller controller(session);
        while (true) {
            Message message = r.fetch();
            if (message.getSubject() == "control") {
                Variant::Map content;
                decode(message, content);
                controller.bind(content["exchange"], "my-queue", content["key"]);
            } else {
                std::cout << "Received: " << message.getContent() << std::endl;
            }
            session.acknowledge();
        }
    } catch(const std::exception& error) {
        std::cout << "ERROR: " << error.what() << std::endl;
    }
    c.close();
    return 0;
}

Controller::Controller(Session& session) :replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"),
                                          sender(session.createSender("qmf.default.direct/broker")),
                                          receiver(session.createReceiver(replyTo)) {}

void Controller::bind(const std::string& exchange, const std::string& queue, const std::string& key)
{
    invoke("create", getBindingArguments(exchange, queue, key));
}

void Controller::unbind(const std::string& exchange, const std::string& queue, const std::string& key)
{
    invoke("delete", getBindingArguments(exchange, queue, key));
}

Variant::Map Controller::getBindingArguments(const std::string& exchange, const std::string& queue, const std::string& key)
{
    std::stringstream name;
    name << exchange << "/" << queue << "/" << key;
    Variant::Map args;
    args["type"] = "binding";
    args["name"] = name.str();
    return args;
}

void Controller::invoke(const std::string& method, const Variant::Map& args)
{
    Variant::Map content;
    Variant::Map objectId;
    objectId["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker";
    content["_object_id"] = objectId;
    content["_method_name"] = method;
    content["_arguments"] = args;

    Message request;
    request.setReplyTo(replyTo);
    request.getProperties()["x-amqp-0-10.app-id"] = "qmf2";
    request.getProperties()["qmf.opcode"] = "_method_request";
    encode(content, request);

    sender.send(request);

    Message response;
    if (receiver.fetch(response, Duration::SECOND*5)) {
        if (response.getProperties()["x-amqp-0-10.app-id"] == "qmf2") {
            std::string opcode = response.getProperties()["qmf.opcode"];
            if (opcode == "_exception") {
                Variant::Map m;
                decode(response, m);
                std::cout << "Error: " << m["_values"] << std::endl;
            }
        } else {
            std::cout << "Invalid response received, not a qmfv2 message: app-id="
                      << response.getProperties()["x-amqp-0-10.app-id"] << std::endl;
        }
    } else {
        std::cout << "No response received" << std::endl;
    }
}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to