On 06/10/2013 08:35 PM, CLIVE wrote:
The application needs to dynamically create/remove bindings over time,
so I was initially creating individual receivers as I thought the
messages would get routed to the correct receiver based on the subject
key of each delivered message. Providing a list of bindings at start up
is not possible in this case.

If you want the messages to go to different receivers, is there a reason you want them to share the same queue? i.e. could you just create receivers as needed for each exchange/routing-key pair, closing them when not needed?

Thanks for the info on the link binding, that would solve my initial
problem. Would the message routing also get fixed using this address
string i.e. Receiver1 created with rxer1, receives just the rxer1
messages from the queue?

No. The x-bindings don't affect which messages are selected from the queue; they only control which of the messages delivered to the exchange get enqueued.
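
To make the difference concrete, here is a minimal sketch of the two kinds of address string involved. The helper names are hypothetical, and the x-bindings layout reflects my understanding of the address syntax rather than anything quoted in this thread. The first form adds a binding to a shared queue for the lifetime of the link, but every receiver on that queue still competes for the same messages. The second subscribes directly to an exchange/key pair, so that receiver only sees messages matching its key.

```cpp
#include <string>

// Hypothetical helper: address for a shared queue whose link adds one binding.
// The binding only decides what gets enqueued; it does not filter per receiver.
std::string sharedQueueAddress(const std::string& queue,
                               const std::string& exchange,
                               const std::string& key)
{
    return queue + "; {link: {x-bindings: [{exchange: '" + exchange +
           "', queue: '" + queue + "', key: '" + key + "'}]}}";
}

// Hypothetical helper: address for a receiver dedicated to one
// exchange/routing-key pair (the "exchange/subject" address form).
std::string dedicatedAddress(const std::string& exchange,
                             const std::string& key)
{
    return exchange + "/" + key;
}
```

Either string would then be passed to session.createReceiver(); closing a receiver created from the second form ends that subscription.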

If you could send the detail that would be good. I can see that I will
probably re-design my implementation to use just a single receiver with
the bindings added/removed using the QMF commands. This would also
simplify my issues with the session going invalid.

The basic pattern is to send a special 'create' request map message to qmf.default.direct/broker (the broker agent). This message should have an 'x-amqp-0-10.app-id' property set to 'qmf2' and a 'qmf.opcode' property set to '_method_request'. The content is a map of the form:

{
   _object_id : {_object_name:org.apache.qpid.broker:broker:amqp-broker},
   _method_name : create,
   _arguments : {type:binding, name:my_exchange/my_queue/my_key}
}

The name is constructed from the exchange name, the queue name and the routing key, each separated by a forward slash.

To unbind you send a very similar 'delete' message (all that changes from the create is the value for the '_method_name' entry, which should be delete instead of create).
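
As a broker-free illustration of the two request bodies, the sketch below models the content with standard-library types only. BindingRequest, bindingName and makeBindingRequest are hypothetical names chosen for this sketch; a real client would put the same values into a qpid::types::Variant::Map.

```cpp
#include <map>
#include <string>

// Hypothetical stand-in for the QMF2 request content described above.
struct BindingRequest {
    std::string objectName;                        // _object_id / _object_name
    std::string methodName;                        // _method_name
    std::map<std::string, std::string> arguments;  // _arguments
};

// Binding names take the form "<exchange>/<queue>/<key>".
std::string bindingName(const std::string& exchange,
                        const std::string& queue,
                        const std::string& key)
{
    return exchange + "/" + queue + "/" + key;
}

// method is "create" to bind or "delete" to unbind; nothing else differs.
BindingRequest makeBindingRequest(const std::string& method,
                                  const std::string& exchange,
                                  const std::string& queue,
                                  const std::string& key)
{
    BindingRequest request;
    request.objectName = "org.apache.qpid.broker:broker:amqp-broker";
    request.methodName = method;
    request.arguments["type"] = "binding";
    request.arguments["name"] = bindingName(exchange, queue, key);
    return request;
}
```

Calling makeBindingRequest("create", ...) and makeBindingRequest("delete", ...) with the same exchange, queue and key yields two requests that differ only in methodName.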

Attached is a little example that does something similar.

/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */


#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <qpid/types/Variant.h>
#include <iostream>
#include <sstream>

using namespace qpid::messaging;
using namespace qpid::types;

using std::string;

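// Sends QMF2 'create'/'delete' binding requests to the broker agent
// and reports any error returned in the reply.
class Controller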
{
  public:
    Controller(Session&);
    void bind(const std::string& exchange, const std::string& queue, const std::string& key);
    void unbind(const std::string& exchange, const std::string& queue, const std::string& key);
  private:
    Address replyTo;
    Sender sender;
    Receiver receiver;

    Variant::Map getBindingArguments(const std::string& exchange, const std::string& queue, const std::string& key);
    void invoke(const std::string& method, const Variant::Map& args);
};

int main(int argc, char** argv)
{
    Connection c(argc > 1 ? argv[1] : "localhost");
    try {
        c.open();
        Session session = c.createSession();
        Receiver r = session.createReceiver("my-queue; {create: always, node:{x-declare:{auto-delete:True}}}");
        Controller controller(session);
        while (true) {
            Message message = r.fetch();
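            // A message with subject "control" carries a map naming the
            // exchange and key to bind to my-queue; anything else is data.
            if (message.getSubject() == "control") {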
                Variant::Map content;
                decode(message, content);
                controller.bind(content["exchange"], "my-queue", content["key"]);
            } else {
                std::cout << "Received: " << message.getContent() << std::endl;
            }
            session.acknowledge();
        }
    } catch(const std::exception& error) {
        std::cout << "ERROR: " << error.what() << std::endl;
    }
    c.close();
    return 0;
}

Controller::Controller(Session& session) :replyTo("#; {create:always, node:{x-declare:{auto-delete:true}}}"),
                                          sender(session.createSender("qmf.default.direct/broker")),
                                          receiver(session.createReceiver(replyTo)) {}

void Controller::bind(const std::string& exchange, const std::string& queue, const std::string& key)
{
    invoke("create", getBindingArguments(exchange, queue, key));
}

void Controller::unbind(const std::string& exchange, const std::string& queue, const std::string& key)
{
    invoke("delete", getBindingArguments(exchange, queue, key));
}

Variant::Map Controller::getBindingArguments(const std::string& exchange, const std::string& queue, const std::string& key)
{
    std::stringstream name;
    name << exchange << "/" << queue << "/" << key;
    Variant::Map args;
    args["type"] = "binding";
    args["name"] = name.str();
    return args;
}

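// Builds the QMF2 method request, sends it to the broker agent and waits
// up to five seconds for the reply on the replyTo address.
void Controller::invoke(const std::string& method, const Variant::Map& args)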
{
    Variant::Map content;
    Variant::Map objectId;
    objectId["_object_name"] = "org.apache.qpid.broker:broker:amqp-broker";
    content["_object_id"] = objectId;
    content["_method_name"] = method;
    content["_arguments"] = args;

    Message request;
    request.setReplyTo(replyTo);
    request.getProperties()["x-amqp-0-10.app-id"] = "qmf2";
    request.getProperties()["qmf.opcode"] = "_method_request";
    encode(content, request);

    sender.send(request);

    Message response;
    if (receiver.fetch(response, Duration::SECOND*5)) {
        if (response.getProperties()["x-amqp-0-10.app-id"] == "qmf2") {
            std::string opcode = response.getProperties()["qmf.opcode"];
            if (opcode == "_exception") {
                Variant::Map m;
                decode(response, m);
                std::cout << "Error: " << m["_values"] << std::endl;
            }
        } else {
            std::cout << "Invalid response received, not a qmfv2 message: app-id="
                      << response.getProperties()["x-amqp-0-10.app-id"] << std::endl;
        }
    } else {
        std::cout << "No response received" << std::endl;
    }
}

