On 08/21/2010 07:01 PM, Daniel Lundin wrote:
What would the equivalent of a queue.purge in AMQP 0.8/0.10 be using
qpid.messaging?

There is no direct method to purge a queue in the messaging API. However you can use the management protocol to send a purge request to the broker (as a map message).

Attached are basic examples of this approach for python and c++ (purges 1 message from my-queue, the details obviously can be changed).
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */


#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <qpid/types/Variant.h>
#include <iostream>

using namespace qpid::messaging;
using namespace qpid::types;

using std::string;

int main(int argc, char** argv)
{
    Connection c(argc > 1 ? argv[1] : "localhost");
    try {
        c.open();
        Session session = c.createSession();
        Address responses("qmf.default.direct/my-name; {node: {type: topic}}");
        Receiver r = session.createReceiver(responses);
        Sender s = session.createSender("qmf.default.direct/broker");
    	Message request;
        request.setReplyTo(responses);
        request.setContentType("amqp/map");
        request.setProperty("x-amqp-0-10.app-id", "qmf2");
        request.setProperty("qmf.opcode", "_method_request");
        Variant::Map oid;
        oid["_object_name"] = "org.apache.qpid.broker:queue:my-queue";
        Variant::Map content;
        content["_method_name"] = "purge";
        content["_object_id"] = oid;
        Variant::Map arguments;
        arguments["request"] = 1;
        content["_arguments"] = arguments;

        encode(content, request);
        s.send(request);
        Message response = r.fetch();
        Variant::Map contentIn;
        decode(response, contentIn);
        std::cout << response.getProperties() << ": "  << contentIn << std::endl;
        session.acknowledge();

    } catch(const std::exception& error) {
        std::cout << "ERROR: " << error.what() << std::endl;
    }
    c.close();
    return 0;
}


#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
# 
#   http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

import sys
from qpid.messaging import *

if len(sys.argv)<2:
  broker =  "localhost:5672" 
else:
  broker = sys.argv[1]

connection = Connection(broker)

try:
  connection.open()
  session = connection.session()

  receiver = session.receiver("qmf.default.direct/my-name; {node: {type: 
topic}}")
  sender = session.sender("qmf.default.direct/broker")

  request = {}
  request["_object_id"] = 
{"_object_name":"org.apache.qpid.broker:queue:my-queue"}
  request["_method_name"] = "purge"
  request["_arguments"] = {"request":1}
  msg = Message(reply_to=receiver.source,content=request)
  msg.properties["x-amqp-0-10.app-id"] = "qmf2"
  msg.properties["qmf.opcode"] = "_method_request"
  sender.send(msg);

  message = receiver.fetch()
  print message
  session.acknowledge()

except MessagingError,m:
  print m

connection.close()
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to