On 08/21/2010 07:01 PM, Daniel Lundin wrote:
What would the equivalent of a queue.purge in AMQP 0.8/0.10 be using qpid.messaging?
There is no direct method to purge a queue in the messaging API. However you can use the management protocol to send a purge request to the broker (as a map message).
Attached are basic examples of this approach for Python and C++. Each one purges 1 message from my-queue; the queue name and message count can of course be changed.
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
#include <qpid/messaging/Address.h>
#include <qpid/messaging/Connection.h>
#include <qpid/messaging/Message.h>
#include <qpid/messaging/Receiver.h>
#include <qpid/messaging/Sender.h>
#include <qpid/messaging/Session.h>
#include <qpid/types/Variant.h>

#include <iostream>

using namespace qpid::messaging;
using namespace qpid::types;

using std::string;

/**
 * Sends a QMF2 "purge" method request for the queue "my-queue" to the broker
 * and prints the broker's reply.
 *
 * Usage: <program> [broker-url]
 *   broker-url defaults to "localhost" when no argument is given.
 *
 * The request is a map message sent to "qmf.default.direct/broker"; the reply
 * is received on "qmf.default.direct/my-name".
 *
 * @return 0 when the request/response exchange completed, 1 when any
 *         std::exception was raised along the way (the message is written
 *         to stderr).
 */
int main(int argc, char** argv)
{
    Connection c(argc > 1 ? argv[1] : "localhost");
    int status = 0;
    try {
        c.open();
        Session session = c.createSession();

        // Address on which the reply is expected; it is also used as the
        // reply-to of the request below.
        Address responses("qmf.default.direct/my-name; {node: {type: topic}}");
        Receiver r = session.createReceiver(responses);
        Sender s = session.createSender("qmf.default.direct/broker");

        // Message properties marking this as a QMF2 method request.
        Message request;
        request.setReplyTo(responses);
        request.setContentType("amqp/map");
        request.setProperty("x-amqp-0-10.app-id", "qmf2");
        request.setProperty("qmf.opcode", "_method_request");

        // Identify the target object: the queue named "my-queue".
        Variant::Map oid;
        oid["_object_name"] = "org.apache.qpid.broker:queue:my-queue";

        Variant::Map content;
        content["_method_name"] = "purge";
        content["_object_id"] = oid;

        // "request" = 1 asks for a single message to be purged; change the
        // value (and the queue name above) to suit.
        Variant::Map arguments;
        arguments["request"] = 1;
        content["_arguments"] = arguments;

        encode(content, request);
        s.send(request);

        // NOTE(review): fetch() is called without a timeout, so this
        // presumably waits until the broker replies -- confirm before
        // using this outside of an example.
        Message response = r.fetch();
        Variant::Map contentIn;
        decode(response, contentIn);
        std::cout << response.getProperties() << ": " << contentIn << std::endl;
        session.acknowledge();
    } catch (const std::exception& error) {
        // Report failures on stderr and through the exit status so that
        // callers and scripts can tell a failed purge from a successful one.
        std::cerr << "ERROR: " << error.what() << std::endl;
        status = 1;
    }
    c.close();
    return status;
}
#!/usr/bin/env python
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Sends a QMF2 "purge" method request for the queue "my-queue" to the broker
# and prints the broker's reply.
#
# Usage: <script> [broker]
#   broker defaults to "localhost:5672" when no argument is given.
#
# Exit status is 0 when the exchange completed and 1 when a MessagingError
# was raised (the error is written to stderr).

import sys
from qpid.messaging import *

if len(sys.argv) < 2:
    broker = "localhost:5672"
else:
    broker = sys.argv[1]

status = 0
connection = Connection(broker)
try:
    connection.open()
    session = connection.session()

    # The reply is received on this address; its source is used as the
    # reply_to of the request below.
    receiver = session.receiver("qmf.default.direct/my-name; {node: {type: topic}}")
    sender = session.sender("qmf.default.direct/broker")

    # Map content of the method request: target object, method name and
    # arguments. "request": 1 asks for a single message to be purged; change
    # the value (and the queue name) to suit.
    request = {}
    request["_object_id"] = {"_object_name": "org.apache.qpid.broker:queue:my-queue"}
    request["_method_name"] = "purge"
    request["_arguments"] = {"request": 1}

    # Message properties marking this as a QMF2 method request.
    msg = Message(reply_to=receiver.source, content=request)
    msg.properties["x-amqp-0-10.app-id"] = "qmf2"
    msg.properties["qmf.opcode"] = "_method_request"
    sender.send(msg)

    # NOTE(review): fetch() is called without a timeout, so this presumably
    # waits until the broker replies -- confirm before using this outside of
    # an example.
    message = receiver.fetch()
    print(message)
    session.acknowledge()
except MessagingError as m:
    # Report failures on stderr and through the exit status so that callers
    # and scripts can tell a failed purge from a successful one.
    sys.stderr.write("%s\n" % (m,))
    status = 1
finally:
    # Close the connection even when an unexpected exception propagates.
    connection.close()

sys.exit(status)
--------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:[email protected]
