Author: aconway Date: Thu May 14 15:14:32 2009 New Revision: 774809 URL: http://svn.apache.org/viewvc?rev=774809&view=rev Log: Fix for unpredictable enqueues by timer-triggered management code in a cluster.
ManagementAgent uses Broker::getClusterMessageHandler() (if non-0) to enqueue timer-triggered messages. Cluster provides handler that enqueues via cluster multicast. Added: qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandler.h (with props) Modified: qpid/trunk/qpid/cpp/src/Makefile.am qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h Modified: qpid/trunk/qpid/cpp/src/Makefile.am URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=774809&r1=774808&r2=774809&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/Makefile.am (original) +++ qpid/trunk/qpid/cpp/src/Makefile.am Thu May 14 15:14:32 2009 @@ -391,6 +391,7 @@ qpid/broker/Link.cpp \ qpid/broker/LinkRegistry.cpp \ qpid/broker/Message.cpp \ + qpid/broker/MessageHandler.h \ qpid/broker/MessageAdapter.cpp \ qpid/broker/MessageBuilder.cpp \ qpid/broker/MessageStoreModule.cpp \ Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=774809&r1=774808&r2=774809&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Thu May 14 15:14:32 2009 @@ -154,6 +154,7 @@ queueEvents(poller), recovery(true), expiryPolicy(new ExpiryPolicy), + clusterMessageHandler(0), getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this)) { if (conf.enableMgmt) { @@ -264,7 +265,7 @@ queueCleaner.start(conf.queueCleanInterval * qpid::sys::TIME_SEC); } - //initialize known broker urls (TODO: add support for urls for other transports (SSL, RDMA)): + // Initialize known broker urls (TODO: add support for urls SSL, RDMA, etc.) if (conf.knownHosts.empty()) { boost::shared_ptr<ProtocolFactory> factory = getProtocolFactory(TCP_TRANSPORT); if (factory) { Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=774809&r1=774808&r2=774809&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Thu May 14 15:14:32 2009 @@ -1,5 +1,5 @@ -#ifndef _Broker_ -#define _Broker_ +#ifndef QPID_BROKER_BROKER_H +#define QPID_BROKER_BROKER_H /* * @@ -68,6 +68,7 @@ namespace broker { class ExpiryPolicy; +class MessageHandler; static const uint16_t DEFAULT_PORT=5672; @@ -143,6 +144,7 @@ std::string federationTag; bool recovery; boost::intrusive_ptr<ExpiryPolicy> expiryPolicy; + MessageHandler* clusterMessageHandler; public: @@ -236,10 +238,19 @@ bool getRecovery() const { return recovery; } management::ManagementAgent* getManagementAgent() { return managementAgent; } + + /** Handler to route messages to queues with replication. + * Required for messages that are generated in a way the cluster + * cannot predict, e.g. as a result of a timer firing. + * + * @return 0 if not in a cluster. + */ + MessageHandler* getClusterMessageHandler() { return clusterMessageHandler; } + void setClusterMessageHandler(MessageHandler& h) { clusterMessageHandler = &h; } }; }} -#endif /*!_Broker_*/ +#endif /*!QPID_BROKER_BROKER_H*/ Added: qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandler.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandler.h?rev=774809&view=auto ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandler.h (added) +++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandler.h Thu May 14 15:14:32 2009 @@ -0,0 +1,43 @@ +#ifndef QPID_BROKER_MESSAGEHANDLER_H +#define QPID_BROKER_MESSAGEHANDLER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <boost/intrusive_ptr.hpp> + +namespace qpid { +namespace broker { + +class Message; + +/** + * Handler for messages. + */ +class MessageHandler +{ + public: + virtual ~MessageHandler() {} + virtual void handle(const boost::intrusive_ptr<Message>&) = 0; +}; +}} // namespace qpid::broker + +#endif /*!QPID_BROKER_MESSAGEHANDLER_H*/ Propchange: qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandler.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandler.h ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=774809&r1=774808&r2=774809&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu May 14 15:14:32 2009 @@ -16,6 +16,74 @@ * */ +/** CLUSTER IMPLEMENTATION OVERVIEW + * + * The cluster works on the principle that if all members of the + * cluster receive identical input, they will all produce identical + * results. cluster::Connections intercept data received from clients + * and multicast it via CPG. The data is processed (passed to the + * broker::Connection) only when it is received from CPG in cluster + * order. Each cluster member has Connection objects for directly + * connected clients and "shadow" Connection objects for connections + * to other members. + * + * This assumes that all broker actions occur deterministically in + * response to data arriving on client connections. There are two + * situations where this assumption fails: + * - sending data in response to polling local connections for writabiliy. + * - taking actions based on a timer or timestamp comparison. + * + * IMPORTANT NOTE: any time code is added to the broker that uses timers, + * the cluster may need to be updated to take account of this. + * + * + * USE OF TIMESTAMPS IN THE BROKER + * + * The following are the current areas where broker uses timers or timestamps: + * + * - Producer flow control: broker::SemanticState uses connection::getClusterOrderOutput. + * a FrameHandler that sends frames to the client via the cluster. Used by broker::SessionState + * + * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is implemented by cluster::ExpiryPolicy. + * + * - Connection heartbeat: sends connection controls, not part of session command counting so OK to ignore. + * + * - LinkRegistry: only cluster elder is ever active for links. + * + * - management::ManagementBroker: uses MessageHandler supplied by cluster + * to send messages to the broker via the cluster. + * + * - Dtx: not yet supported with cluster. + * + * cluster::ExpiryPolicy implements the strategy for message expiry. + * + * CLUSTER PROTOCOL OVERVIEW + * + * Messages sent to/from CPG are called Events. + * + * An Event carries a ConnectionId, which includes a MemberId and a + * connection number. + * + * Events are either + * - Connection events: non-0 connection number and are associated with a connection. + * - Cluster Events: 0 connection number, are not associated with a connectin. + * + * Events are further categorized as: + * - Control: carries method frame(s) that affect cluster behavior. + * - Data: carries raw data received from a client connection. + * + * The cluster defines extensions to the AMQP command set in ../../../xml/cluster.xml + * which defines two classes: + * - cluster: cluster control information. + * - cluster.connection: control information for a specific connection. + * + * The following combinations are legal: + * - Data frames carrying connection data. + * - Cluster control events carrying cluster commands. + * - Connection control events carrying cluster.connection commands. + * - Connection control events carrying non-cluster frames: frames sent to the client. + * e.g. flow-control frames generated on a timer. + */ #include "Cluster.h" #include "ClusterSettings.h" #include "Connection.h" @@ -30,6 +98,7 @@ #include "qpid/broker/Connection.h" #include "qpid/broker/QueueRegistry.h" #include "qpid/broker/SessionState.h" +#include "qpid/framing/frame_functors.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/AMQP_AllOperations.h" #include "qpid/framing/AllInvoker.h" @@ -41,6 +110,15 @@ #include "qpid/framing/ClusterShutdownBody.h" #include "qpid/framing/ClusterUpdateOfferBody.h" #include "qpid/framing/ClusterUpdateRequestBody.h" + +#include "qpid/framing/ConnectionStartOkBody.h" +#include "qpid/framing/ConnectionTuneBody.h" +#include "qpid/framing/ConnectionOpenBody.h" +#include "qpid/framing/SessionAttachBody.h" +#include "qpid/framing/SessionRequestTimeoutBody.h" +#include "qpid/framing/SessionCommandPointBody.h" +#include "qpid/framing/AMQP_ClientProxy.h" + #include "qpid/log/Helpers.h" #include "qpid/log/Statement.h" #include "qpid/management/IdAllocator.h" @@ -57,6 +135,7 @@ #include <iterator> #include <map> #include <ostream> +#include <sstream> namespace qpid { namespace cluster { @@ -127,11 +206,10 @@ // Failover exchange provides membership updates to clients. failoverExchange.reset(new FailoverExchange(this)); broker.getExchanges().registerExchange(failoverExchange); - - // Update exchange is used during updates to replicate messages without modifying delivery-properties.exchange. broker.getExchanges().registerExchange(boost::shared_ptr<broker::Exchange>(new UpdateExchange(this))); - + broker.setClusterMessageHandler(*this); if (settings.quorum) quorum.init(); + cpg.join(name); // pump the CPG dispatch manually till we get initialized. while (!initialized) @@ -666,6 +744,7 @@ } void Cluster::errorCheck(const MemberId& , uint8_t type, uint64_t frameSeq, Lock&) { + if (state == LEFT) return; // If we receive an errorCheck here, it's because we have processed past the point // of the error so respond with ERROR_TYPE_NONE assert(map.getFrameSeq() >= frameSeq); @@ -674,4 +753,53 @@ ClusterErrorCheckBody(ProtocolVersion(), framing::cluster::ERROR_TYPE_NONE, frameSeq), self); } +size_t accumulateEncodedSize(size_t total, const AMQFrame& f) { return total + f.encodedSize(); } + +// +// If the broker needs to send messages to itself in an +// unpredictable context (e.g. management messages generated when +// a timer expires) it uses "selfConnection" +// +// selfConnection behaves as a local client connection, with +// respect to replication. However instead of mcasting data from a +// client, data for the selfConnection is mcast directly from +// Cluster::handle. +// +void Cluster::handle(const boost::intrusive_ptr<broker::Message>& msg) { + // NOTE: don't take the lock here. We don't need to as mcast is thread safe, + // and locking here can cause deadlock with management locks. + // + + // Create self-connection on demand + if (selfConnection == ConnectionId()) { + QPID_LOG(debug, "Initialize self-connection"); + ostringstream name; + name << "qpid.cluster-self." << self; + ConnectionPtr selfc = new Connection(*this, shadowOut, name.str(), self, false, false); + selfConnection = selfc->getId(); + vector<AMQFrame> frames; + frames.push_back(AMQFrame((ConnectionStartOkBody()))); + frames.push_back(AMQFrame((ConnectionTuneBody(ProtocolVersion(),32767,65535,0,120)))); + frames.push_back(AMQFrame((ConnectionOpenBody()))); + frames.push_back(AMQFrame((SessionAttachBody(ProtocolVersion(), name.str(), false)))); + frames.push_back(AMQFrame(SessionRequestTimeoutBody(ProtocolVersion(), 0))); + frames.push_back(AMQFrame(SessionCommandPointBody(ProtocolVersion(), 0, 0))); + size_t size = accumulate(frames.begin(), frames.end(), 0, accumulateEncodedSize); + vector<char> store(size); + Buffer buf(store.data(), size); + for_each(frames.begin(), frames.end(), boost::bind(&AMQFrame::encode, _1, boost::ref(buf))); + assert(buf.available() == 0); + selfc->decode(store.data(), size); // Multicast + } + + QPID_LOG(trace, "Message to self on " << selfConnection << ": " << *msg->getFrames().getMethod()); + const FrameSet& frames = msg->getFrames(); + size_t size = accumulate(frames.begin(), frames.end(), 0, accumulateEncodedSize); + Event e(DATA, selfConnection, size); + Buffer buf(e.getData(), e.getSize()); + EncodeFrame encoder(buf); + msg->getFrames().map(encoder); + mcast.mcast(e); +} + }} // namespace qpid::cluster Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=774809&r1=774808&r2=774809&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Thu May 14 15:14:32 2009 @@ -38,6 +38,7 @@ #include "qmf/org/apache/qpid/cluster/Cluster.h" #include "qpid/Url.h" #include "qpid/broker/Broker.h" +#include "qpid/broker/MessageHandler.h" #include "qpid/management/Manageable.h" #include "qpid/sys/Monitor.h" @@ -64,7 +65,7 @@ /** * Connection to the cluster */ -class Cluster : private Cpg::Handler, public management::Manageable { +class Cluster : private Cpg::Handler, public management::Manageable, public broker::MessageHandler { public: typedef boost::intrusive_ptr<Connection> ConnectionPtr; typedef std::vector<ConnectionPtr> ConnectionVector; @@ -113,6 +114,9 @@ Decoder& getDecoder() { return decoder; } ExpiryPolicy& getExpiryPolicy() { return *expiryPolicy; } + + // Called in timer threads by management to replicate messages. + void handle(const boost::intrusive_ptr<broker::Message>&); private: typedef sys::Monitor::ScopedLock Lock; @@ -199,6 +203,7 @@ const std::string name; Url myUrl; const MemberId self; + ConnectionId selfConnection; framing::Uuid clusterId; NoOpConnectionOutputHandler shadowOut; qpid::management::ManagementAgent* mAgent; Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h?rev=774809&r1=774808&r2=774809&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h Thu May 14 15:14:32 2009 @@ -43,6 +43,7 @@ bool isCluster() const { return connectionId.getNumber() == 0; } bool isConnection() const { return connectionId.getNumber() != 0; } + bool isControl() const { return type == CONTROL; } bool isLastInEvent() const { return readCredit; } MemberId getMemberId() const { return connectionId.getMember(); } Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=774809&r1=774808&r2=774809&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original) +++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Thu May 14 15:14:32 2009 @@ -25,6 +25,7 @@ #include "qpid/broker/DeliverableMessage.h" #include "qpid/log/Statement.h" #include <qpid/broker/Message.h> +#include <qpid/broker/MessageHandler.h> #include "qpid/framing/MessageTransferBody.h" #include "qpid/sys/Time.h" #include "qpid/broker/ConnectionState.h" @@ -264,9 +265,11 @@ } void ManagementAgent::sendBuffer(Buffer& buf, - uint32_t length, - qpid::broker::Exchange::shared_ptr exchange, - string routingKey) + uint32_t length, + qpid::broker::Exchange::shared_ptr exchange, + string routingKey, + bool isPredictable) + { if (exchange.get() == 0) return; @@ -286,14 +289,21 @@ msg->getFrames().append(method); msg->getFrames().append(header); - MessageProperties* props = - msg->getFrames().getHeaders()->get<MessageProperties>(true); + DeliveryProperties* delivery = msg->getFrames().getHeaders()->get<DeliveryProperties>(true); + delivery->setRoutingKey(routingKey); + + MessageProperties* props = msg->getFrames().getHeaders()->get<MessageProperties>(true); props->setContentLength(length); msg->getFrames().append(content); - DeliverableMessage deliverable (msg); try { - exchange->route(deliverable, routingKey, 0); + if (!isPredictable && broker->getClusterMessageHandler()) { + broker->getClusterMessageHandler()->handle(msg); + } + else { + DeliverableMessage deliverable (msg); + exchange->route(deliverable, routingKey, 0); + } } catch(exception&) {} } @@ -347,7 +357,7 @@ contentSize = BUFSIZE - msgBuffer.available (); msgBuffer.reset (); routingKey = "console.obj.1.0." + object->getPackageName() + "." + object->getClassName(); - sendBuffer (msgBuffer, contentSize, mExchange, routingKey); + sendBuffer (msgBuffer, contentSize, mExchange, routingKey, false); } if (object->hasInst() && (object->getInstChanged() || object->getForcePublish())) { @@ -358,7 +368,7 @@ contentSize = BUFSIZE - msgBuffer.available (); msgBuffer.reset (); routingKey = "console.obj.1.0." + object->getPackageName() + "." + object->getClassName(); - sendBuffer (msgBuffer, contentSize, mExchange, routingKey); + sendBuffer (msgBuffer, contentSize, mExchange, routingKey, false); } if (object->isDeleted()) @@ -387,7 +397,7 @@ contentSize = BUFSIZE - msgBuffer.available (); msgBuffer.reset (); routingKey = "console.heartbeat.1.0"; - sendBuffer (msgBuffer, contentSize, mExchange, routingKey); + sendBuffer (msgBuffer, contentSize, mExchange, routingKey, false); } } Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=774809&r1=774808&r2=774809&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h (original) +++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h Thu May 14 15:14:32 2009 @@ -204,7 +204,8 @@ void sendBuffer (framing::Buffer& buf, uint32_t length, qpid::broker::Exchange::shared_ptr exchange, - std::string routingKey); + std::string routingKey, + bool isPredictable=true); void moveNewObjectsLH(); bool authorizeAgentMessageLH(qpid::broker::Message& msg); --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org