On 30/08/16 10:13, lucas wrote:
Thank you very much, Sim!
I have compiled and run your code, and I find that it works when I call
the function controller.bind(),
but it doesn't seem to work when I call controller.unbind() (I used
qpid-config to add a queue and an exchange, then bound them. I wanted to see
if controller.unbind() would unbind them, but it does not).

Works for me. I modified the example to allow you to trigger the unbind also (see attached). I did a quick test as follows:

$ qpid-config queues -r
Queue '46f35f54-10b6-44e2-9e36-74a6b4fa8f91:0.0'
    bind [46f35f54-10b6-44e2-9e36-74a6b4fa8f91:0.0] => ''
    bind [direct.c1249134-6c8e-454a-b973-9a2b48c71962] => qmf.default.topic
Queue 'db2c36e2-1818-4792-b4a4-a5b63fccf636#'
    bind [db2c36e2-1818-4792-b4a4-a5b63fccf636#] => ''
Queue 'my-queue'
    bind [my-queue] => ''
$ qpid-send -a my-queue/control -M exchange=amq.direct -M key=my-key
$ qpid-config queues -r
Queue '0d2f8688-6a79-4ab7-9abb-a648f3584060:0.0'
    bind [0d2f8688-6a79-4ab7-9abb-a648f3584060:0.0] => ''
    bind [direct.fd79576d-5378-4088-a812-645d7ba0fb33] => qmf.default.topic
Queue 'db2c36e2-1818-4792-b4a4-a5b63fccf636#'
    bind [db2c36e2-1818-4792-b4a4-a5b63fccf636#] => ''
Queue 'my-queue'
    bind [my-queue] => ''
    bind [my-key] => amq.direct
$ qpid-send -a my-queue/control -M exchange=amq.direct -M key=my-key -M unbind=true
$ qpid-config queues -r
Queue 'ab23bbb5-23f9-47d5-a5e8-d6dea168f7c2:0.0'
    bind [ab23bbb5-23f9-47d5-a5e8-d6dea168f7c2:0.0] => ''
    bind [direct.01106841-1373-4e5a-9351-ae266b16a39e] => qmf.default.topic
Queue 'db2c36e2-1818-4792-b4a4-a5b63fccf636#'
    bind [db2c36e2-1818-4792-b4a4-a5b63fccf636#] => ''
Queue 'my-queue'
    bind [my-queue] => ''
$ qpid-config bind amq.topic my-queue some-key
$ qpid-config queues -r
Queue '49f64c5d-6f1c-4052-9539-981c4badf067:0.0'
    bind [49f64c5d-6f1c-4052-9539-981c4badf067:0.0] => ''
    bind [direct.19776178-a495-4dba-b099-6441ed870074] => qmf.default.topic
Queue 'db2c36e2-1818-4792-b4a4-a5b63fccf636#'
    bind [db2c36e2-1818-4792-b4a4-a5b63fccf636#] => ''
Queue 'my-queue'
    bind [my-queue] => ''
    bind [some-key] => amq.topic
$ qpid-send -a my-queue/control -M exchange=amq.topic -M key=some-key -M unbind=true
$ qpid-config queues -r
Queue 'db2c36e2-1818-4792-b4a4-a5b63fccf636#'
    bind [db2c36e2-1818-4792-b4a4-a5b63fccf636#] => ''
Queue 'f7361e1d-44eb-4954-b733-a05315d0c8a3:0.0'
    bind [f7361e1d-44eb-4954-b733-a05315d0c8a3:0.0] => ''
    bind [direct.f81355a8-a136-4956-bf42-c3f65a953e02] => qmf.default.topic
Queue 'my-queue'
    bind [my-queue] => ''

(Note that the example as attached always uses my-queue as the queue name. You would have to change it to allow a different queue name to be passed in).

/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */


#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <qpid/types/Variant.h>
#include <iostream>
#include <sstream>

using namespace qpid::messaging;
using namespace qpid::types;

using std::string;

/**
 * Issues QMFv2 method requests to the broker in order to create or delete
 * bindings between an exchange and a queue.
 *
 * Requests are sent to "qmf.default.direct/broker" and replies are read from
 * a private, auto-deleted reply queue; both links live on the session handed
 * to the constructor.
 */
class Controller
{
  public:
    /**
     * Creates the request sender and the reply receiver on the given session.
     * Marked explicit so a Session is never silently converted to a Controller.
     */
    explicit Controller(Session&);

    /** Asks the broker to create the binding "exchange/queue/key". */
    void bind(const std::string& exchange, const std::string& queue, const std::string& key);

    /** Asks the broker to delete the binding "exchange/queue/key". */
    void unbind(const std::string& exchange, const std::string& queue, const std::string& key);
  private:
    // Declaration order matters: members are initialised in this order and
    // the constructor builds `receiver` from `replyTo`.
    Address replyTo;
    Sender sender;
    Receiver receiver;

    /** Builds the argument map (type and name) identifying a binding. */
    Variant::Map getBindingArguments(const std::string& exchange, const std::string& queue, const std::string& key);

    /** Sends the named broker method with the given arguments and reports the outcome. */
    void invoke(const std::string& method, const Variant::Map& args);
};

/**
 * Listens on "my-queue" and reacts to control messages by changing that
 * queue's bindings.
 *
 * argv[1], when present, is passed to Connection as the broker address;
 * otherwise "localhost" is used.
 *
 * A message whose subject is "control" is decoded as a map with the entries
 * "exchange" and "key", plus an optional "unbind" flag: when the flag
 * evaluates to true the binding is removed, otherwise it is added. Any other
 * message has its content printed. Each message is acknowledged after it has
 * been handled.
 *
 * Returns 0 on a clean exit and 1 if an exception terminated the loop, so
 * that callers (scripts, test harnesses) can detect the failure.
 */
int main(int argc, char** argv)
{
    // Single source of truth for the queue that is both consumed from and
    // (un)bound; previously this literal was repeated in three places.
    const std::string queue("my-queue");
    int status = 0;

    Connection c(argc > 1 ? argv[1] : "localhost");
    try {
        c.open();
        Session session = c.createSession();
        Receiver r = session.createReceiver(queue + "; {create: always, node:{x-declare:{auto-delete:True}}}");
        Controller controller(session);
        while (true) {
            Message message = r.fetch();
            if (message.getSubject() == "control") {
                Variant::Map content;
                decode(message, content);
                if (content["unbind"]) controller.unbind(content["exchange"], queue, content["key"]);
                else controller.bind(content["exchange"], queue, content["key"]);
            } else {
                std::cout << "Received: " << message.getContent() << std::endl;
            }
            session.acknowledge();
        }
    } catch(const std::exception& error) {
        std::cout << "ERROR: " << error.what() << std::endl;
        // The loop above never ends normally, so reaching this point always
        // means failure; report it through the exit status.
        status = 1;
    }
    c.close();
    return status;
}

// Wires up the request/response links on the supplied session:
//  - replyTo : a "#" address created on demand and declared auto-delete,
//  - sender  : targets the broker's QMF address "qmf.default.direct/broker",
//  - receiver: reads replies from the replyTo address.
Controller::Controller(Session& session)
    : replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"),
      sender(session.createSender("qmf.default.direct/broker")),
      receiver(session.createReceiver(replyTo))
{
}

void Controller::bind(const std::string& exchange, const std::string& queue, const std::string& key)
{
    invoke("create", getBindingArguments(exchange, queue, key));
}

void Controller::unbind(const std::string& exchange, const std::string& queue, const std::string& key)
{
    invoke("delete", getBindingArguments(exchange, queue, key));
}

// Produces the argument map describing one binding: "type" is always
// "binding" and "name" is the three parts joined as exchange/queue/key.
Variant::Map Controller::getBindingArguments(const std::string& exchange, const std::string& queue, const std::string& key)
{
    const std::string separator("/");
    const std::string bindingName = exchange + separator + queue + separator + key;

    Variant::Map arguments;
    arguments["name"] = bindingName;
    arguments["type"] = "binding";
    return arguments;
}

// Sends a QMFv2 method request for `method` with `args` to the broker object
// "org.apache.qpid.broker:broker:amqp-broker", then waits up to five seconds
// for a reply on the reply queue.
//
// Outcomes are reported on stdout only: a missing reply, a reply whose
// app-id is not "qmf2", or a reply with opcode "_exception" each print a
// message; any other reply is silently accepted.
void Controller::invoke(const std::string& method, const Variant::Map& args)
{
    // Address of the broker object the method is invoked on.
    Variant::Map target;
    target["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker";

    // Body of the method request.
    Variant::Map body;
    body["_object_id"] = target;
    body["_method_name"] = method;
    body["_arguments"] = args;

    Message request;
    request.setReplyTo(replyTo);
    request.getProperties()["x-amqp-0-10.app-id"] = "qmf2";
    request.getProperties()["qmf.opcode"] = "_method_request";
    encode(body, request);

    sender.send(request);

    Message response;
    if (!receiver.fetch(response, Duration::SECOND*5)) {
        std::cout << "No response received" << std::endl;
        return;
    }

    Variant& appId = response.getProperties()["x-amqp-0-10.app-id"];
    if (!(appId == "qmf2")) {
        std::cout << "Invalid response received, not a qmfv2 message: app-id="
                  << appId << std::endl;
        return;
    }

    const std::string opcode = response.getProperties()["qmf.opcode"];
    if (opcode != "_exception") {
        return;
    }

    // The broker rejected the request; show the details it sent back.
    Variant::Map details;
    decode(response, details);
    std::cout << "Error: " << details["_values"] << std::endl;
}

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to