On 30/08/16 10:13, lucas wrote:
Thank you very much! Sim.
I have compiled and run your code, and I find that it works when I call
the function controller.bind(),
but it doesn't seem to work when I call controller.unbind() (I used qpid-config to add a
queue and an exchange, then bound them. I wanted to see whether controller.unbind() would
unbind them, but it does not).
Works for me. I modified the example to allow you to trigger the unbind
also (see attached). I did a quick test as follows:
$ qpid-config queues -r
Queue '46f35f54-10b6-44e2-9e36-74a6b4fa8f91:0.0'
bind [46f35f54-10b6-44e2-9e36-74a6b4fa8f91:0.0] => ''
bind [direct.c1249134-6c8e-454a-b973-9a2b48c71962] => qmf.default.topic
Queue 'db2c36e2-1818-4792-b4a4-a5b63fccf636#'
bind [db2c36e2-1818-4792-b4a4-a5b63fccf636#] => ''
Queue 'my-queue'
bind [my-queue] => ''
$ qpid-send -a my-queue/control -M exchange=amq.direct -M key=my-key
$ qpid-config queues -r
Queue '0d2f8688-6a79-4ab7-9abb-a648f3584060:0.0'
bind [0d2f8688-6a79-4ab7-9abb-a648f3584060:0.0] => ''
bind [direct.fd79576d-5378-4088-a812-645d7ba0fb33] => qmf.default.topic
Queue 'db2c36e2-1818-4792-b4a4-a5b63fccf636#'
bind [db2c36e2-1818-4792-b4a4-a5b63fccf636#] => ''
Queue 'my-queue'
bind [my-queue] => ''
bind [my-key] => amq.direct
$ qpid-send -a my-queue/control -M exchange=amq.direct -M key=my-key -M \
unbind=true
$ qpid-config queues -r
Queue 'ab23bbb5-23f9-47d5-a5e8-d6dea168f7c2:0.0'
bind [ab23bbb5-23f9-47d5-a5e8-d6dea168f7c2:0.0] => ''
bind [direct.01106841-1373-4e5a-9351-ae266b16a39e] => qmf.default.topic
Queue 'db2c36e2-1818-4792-b4a4-a5b63fccf636#'
bind [db2c36e2-1818-4792-b4a4-a5b63fccf636#] => ''
Queue 'my-queue'
bind [my-queue] => ''
$ qpid-config bind amq.topic my-queue some-key
$ qpid-config queues -r
Queue '49f64c5d-6f1c-4052-9539-981c4badf067:0.0'
bind [49f64c5d-6f1c-4052-9539-981c4badf067:0.0] => ''
bind [direct.19776178-a495-4dba-b099-6441ed870074] => qmf.default.topic
Queue 'db2c36e2-1818-4792-b4a4-a5b63fccf636#'
bind [db2c36e2-1818-4792-b4a4-a5b63fccf636#] => ''
Queue 'my-queue'
bind [my-queue] => ''
bind [some-key] => amq.topic
$ qpid-send -a my-queue/control -M exchange=amq.topic -M key=some-key -M \
unbind=true
$ qpid-config queues -r
Queue 'db2c36e2-1818-4792-b4a4-a5b63fccf636#'
bind [db2c36e2-1818-4792-b4a4-a5b63fccf636#] => ''
Queue 'f7361e1d-44eb-4954-b733-a05315d0c8a3:0.0'
bind [f7361e1d-44eb-4954-b733-a05315d0c8a3:0.0] => ''
bind [direct.f81355a8-a136-4956-bf42-c3f65a953e02] => qmf.default.topic
Queue 'my-queue'
bind [my-queue] => ''
(Note that the example as attached always uses my-queue as the queue
name. You would have to change it to allow a different queue name to be
passed in).
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <qpid/types/Variant.h>
#include <iostream>
#include <sstream>
using namespace qpid::messaging;
using namespace qpid::types;
using std::string;
// Sends QMF2 method requests to the broker to create or delete bindings,
// and prints any error or unexpected reply to stdout.
class Controller
{
public:
// Creates the request sender and the reply receiver on the given session.
Controller(Session&);
// Requests creation of the binding named "<exchange>/<queue>/<key>".
void bind(const std::string& exchange, const std::string& queue, const std::string& key);
// Requests deletion of the binding named "<exchange>/<queue>/<key>".
void unbind(const std::string& exchange, const std::string& queue, const std::string& key);
private:
// NOTE: member order matters. The constructor initialises `receiver` from
// `replyTo`, and members are initialised in declaration order, so `replyTo`
// must stay declared before `receiver`.
// Address of the reply queue; set as reply-to on every request.
Address replyTo;
// Sender used to deliver method requests to the broker.
Sender sender;
// Receiver on `replyTo` from which method responses are fetched.
Receiver receiver;
// Builds the argument map (type "binding" plus the composed binding name)
// shared by bind() and unbind().
Variant::Map getBindingArguments(const std::string& exchange, const std::string& queue, const std::string& key);
// Sends the named method with the given arguments to the broker object and
// waits up to five seconds for the response.
void invoke(const std::string& method, const Variant::Map& args);
};
// Example entry point.
//
// Connects to the broker given as the first command-line argument (default
// "localhost"), consumes from "my-queue" and handles each message:
//   - subject "control": the map content's "exchange" and "key" entries name a
//     binding for "my-queue"; it is removed when the "unbind" entry is true,
//     otherwise it is created;
//   - any other subject: the message content is printed.
//
// Returns 1 if an exception terminated the processing loop, 0 otherwise.
int main(int argc, char** argv)
{
    // Single definition of the queue that is both consumed from and (un)bound,
    // so changing the queue name only requires touching this line.
    const std::string queueName("my-queue");

    Connection c(argc > 1 ? argv[1] : "localhost");
    try {
        c.open();
        Session session = c.createSession();
        Receiver r = session.createReceiver(queueName + "; {create: always, node:{x-declare:{auto-delete:True}}}");
        Controller controller(session);
        while (true) {
            Message message = r.fetch();
            if (message.getSubject() == "control") {
                Variant::Map content;
                decode(message, content);
                if (content["unbind"]) controller.unbind(content["exchange"], queueName, content["key"]);
                else controller.bind(content["exchange"], queueName, content["key"]);
            } else {
                std::cout << "Received: " << message.getContent() << std::endl;
            }
            // Acknowledges everything received on the session so far,
            // including any management responses fetched by the controller.
            session.acknowledge();
        }
    } catch(const std::exception& error) {
        std::cout << "ERROR: " << error.what() << std::endl;
        c.close();
        // Report the failure to the caller instead of exiting with success.
        return 1;
    }
    c.close();
    return 0;
}
// Wires up the management request/response channel on the supplied session:
// requests go to the broker via "qmf.default.direct/broker", and responses
// arrive on an auto-deleted reply queue described by replyTo.
Controller::Controller(Session& session)
    : replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"),
      sender(session.createSender("qmf.default.direct/broker")),
      // replyTo is already initialised here because it is declared first.
      receiver(session.createReceiver(replyTo))
{
}
void Controller::bind(const std::string& exchange, const std::string& queue, const std::string& key)
{
invoke("create", getBindingArguments(exchange, queue, key));
}
void Controller::unbind(const std::string& exchange, const std::string& queue, const std::string& key)
{
invoke("delete", getBindingArguments(exchange, queue, key));
}
// Builds the argument map identifying a binding in a create/delete request:
// "type" is always "binding" and "name" is "<exchange>/<queue>/<key>".
Variant::Map Controller::getBindingArguments(const std::string& exchange, const std::string& queue, const std::string& key)
{
    Variant::Map bindingArgs;
    bindingArgs["type"] = "binding";
    bindingArgs["name"] = exchange + "/" + queue + "/" + key;
    return bindingArgs;
}
// Invokes `method` with `args` on the broker object and reports the outcome.
//
// A QMF2 "_method_request" is sent to the broker with replyTo as the
// reply address, then the response is awaited for up to five seconds.
// Nothing is printed on success; a message is written to stdout when the
// broker replies with an exception, when the reply is not a qmf2 message, or
// when no reply arrives in time.
void Controller::invoke(const std::string& method, const Variant::Map& args)
{
    // Identify the target of the call: the broker object itself.
    Variant::Map brokerId;
    brokerId["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker";

    Variant::Map body;
    body["_object_id"] = brokerId;
    body["_method_name"] = method;
    body["_arguments"] = args;

    Message request;
    request.setReplyTo(replyTo);
    Variant::Map& requestProperties = request.getProperties();
    requestProperties["x-amqp-0-10.app-id"] = "qmf2";
    requestProperties["qmf.opcode"] = "_method_request";
    encode(body, request);
    sender.send(request);

    Message response;
    if (!receiver.fetch(response, Duration::SECOND*5)) {
        std::cout << "No response received" << std::endl;
        return;
    }

    Variant::Map& responseProperties = response.getProperties();
    if (!(responseProperties["x-amqp-0-10.app-id"] == "qmf2")) {
        std::cout << "Invalid response received, not a qmfv2 message: app-id="
                  << responseProperties["x-amqp-0-10.app-id"] << std::endl;
        return;
    }

    // Only an exception reply is reported; any other opcode is ignored.
    std::string opcode = responseProperties["qmf.opcode"];
    if (opcode == "_exception") {
        Variant::Map details;
        decode(response, details);
        std::cout << "Error: " << details["_values"] << std::endl;
    }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]