Author: aconway
Date: Thu May 14 15:14:32 2009
New Revision: 774809

URL: http://svn.apache.org/viewvc?rev=774809&view=rev
Log:
Fix for unpredictable enqueues by timer-triggered management code in a cluster.

ManagementAgent uses Broker::getClusterMessageHandler() (if non-0) to
enqueue timer-triggered messages. Cluster provides handler that enqueues
via cluster multicast.

Added:
    qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandler.h   (with props)
Modified:
    qpid/trunk/qpid/cpp/src/Makefile.am
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
    qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h

Modified: qpid/trunk/qpid/cpp/src/Makefile.am
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/Makefile.am?rev=774809&r1=774808&r2=774809&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ qpid/trunk/qpid/cpp/src/Makefile.am Thu May 14 15:14:32 2009
@@ -391,6 +391,7 @@
   qpid/broker/Link.cpp \
   qpid/broker/LinkRegistry.cpp \
   qpid/broker/Message.cpp \
+  qpid/broker/MessageHandler.h \
   qpid/broker/MessageAdapter.cpp \
   qpid/broker/MessageBuilder.cpp \
   qpid/broker/MessageStoreModule.cpp \

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp?rev=774809&r1=774808&r2=774809&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.cpp Thu May 14 15:14:32 2009
@@ -154,6 +154,7 @@
     queueEvents(poller),
     recovery(true),
     expiryPolicy(new ExpiryPolicy),
+    clusterMessageHandler(0),
     getKnownBrokers(boost::bind(&Broker::getKnownBrokersImpl, this))
 {
     if (conf.enableMgmt) {
@@ -264,7 +265,7 @@
         queueCleaner.start(conf.queueCleanInterval * qpid::sys::TIME_SEC);
     }
 
-    //initialize known broker urls (TODO: add support for urls for other 
transports (SSL, RDMA)):
+    // Initialize known broker urls (TODO: add support for SSL, RDMA, etc.)
     if (conf.knownHosts.empty()) {
         boost::shared_ptr<ProtocolFactory> factory = 
getProtocolFactory(TCP_TRANSPORT);
         if (factory) {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h?rev=774809&r1=774808&r2=774809&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/Broker.h Thu May 14 15:14:32 2009
@@ -1,5 +1,5 @@
-#ifndef _Broker_
-#define _Broker_
+#ifndef QPID_BROKER_BROKER_H
+#define QPID_BROKER_BROKER_H
 
 /*
  *
@@ -68,6 +68,7 @@
 namespace broker {
 
 class ExpiryPolicy;
+class MessageHandler;
 
 static const  uint16_t DEFAULT_PORT=5672;
 
@@ -143,6 +144,7 @@
     std::string federationTag;
     bool recovery;
     boost::intrusive_ptr<ExpiryPolicy> expiryPolicy;
+    MessageHandler* clusterMessageHandler;
 
   public:
 
@@ -236,10 +238,19 @@
     bool getRecovery() const { return recovery; }
 
     management::ManagementAgent* getManagementAgent() { return 
managementAgent; }
+
+    /** Handler to route messages to queues with replication.
+     * Required for messages that are generated in a way the cluster
+     * cannot predict, e.g. as a result of a timer firing.
+     * 
+     * @return the pointer stored by setClusterMessageHandler(), or 0 if
+     * none has been set (i.e. not in a cluster).
+     */
+    MessageHandler* getClusterMessageHandler() { return clusterMessageHandler; 
}
+    /** Install the handler returned by getClusterMessageHandler().
+     * Only the address of h is stored (no copy is made), so h must remain
+     * valid for as long as the broker may call it.
+     */
+    void setClusterMessageHandler(MessageHandler& h) { clusterMessageHandler = 
&h; }
 };
 
 }}
             
 
 
-#endif  /*!_Broker_*/
+#endif  /*!QPID_BROKER_BROKER_H*/

Added: qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandler.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandler.h?rev=774809&view=auto
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandler.h (added)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandler.h Thu May 14 15:14:32 
2009
@@ -0,0 +1,43 @@
+#ifndef QPID_BROKER_MESSAGEHANDLER_H
+#define QPID_BROKER_MESSAGEHANDLER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/intrusive_ptr.hpp>
+
+namespace qpid {
+namespace broker {
+
+class Message;
+
+/**
+ * Abstract interface for an object that accepts broker messages.
+ * Implementations override handle(), which is given a
+ * boost::intrusive_ptr to the Message to process.
+ */
+class MessageHandler
+{
+  public:
+    virtual ~MessageHandler() {}
+    /** Process one message; must be implemented by the concrete handler. */
+    virtual void handle(const boost::intrusive_ptr<Message>&) = 0;
+};
+}} // namespace qpid::broker
+
+#endif  /*!QPID_BROKER_MESSAGEHANDLER_H*/

Propchange: qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandler.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: qpid/trunk/qpid/cpp/src/qpid/broker/MessageHandler.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp?rev=774809&r1=774808&r2=774809&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.cpp Thu May 14 15:14:32 2009
@@ -16,6 +16,74 @@
  *
  */
 
+/** CLUSTER IMPLEMENTATION OVERVIEW
+ *
+ * The cluster works on the principle that if all members of the
+ * cluster receive identical input, they will all produce identical
+ * results. cluster::Connections intercept data received from clients
+ * and multicast it via CPG. The data is processed (passed to the
+ * broker::Connection) only when it is received from CPG in cluster
+ * order. Each cluster member has Connection objects for directly
+ * connected clients and "shadow" Connection objects for connections
+ * to other members.
+ *
+ * This assumes that all broker actions occur deterministically in
+ * response to data arriving on client connections. There are two
+ * situations where this assumption fails:
+ *  - sending data in response to polling local connections for writability.
+ *  - taking actions based on a timer or timestamp comparison.
+ *
+ * IMPORTANT NOTE: any time code is added to the broker that uses timers,
+ * the cluster may need to be updated to take account of this.
+ * 
+ *
+ * USE OF TIMESTAMPS IN THE BROKER
+ *  
+ * The following are the current areas where broker uses timers or timestamps:
+ * 
+ * - Producer flow control: broker::SemanticState uses 
connection::getClusterOrderOutput,
+ *   a FrameHandler that sends frames to the client via the cluster. Used by 
broker::SessionState.
+ *   
+ * - QueueCleaner, Message TTL: uses ExpiryPolicy, which is implemented by 
cluster::ExpiryPolicy.
+ * 
+ * - Connection heartbeat: sends connection controls, not part of session 
command counting so OK to ignore.
+ * 
+ * - LinkRegistry: only cluster elder is ever active for links.
+ * 
+ * - management::ManagementBroker: uses MessageHandler supplied by  cluster
+ *   to send messages to the broker via the cluster.
+ *   
+ * - Dtx: not yet supported with cluster.  
+ *
+ * cluster::ExpiryPolicy implements the strategy for message expiry.
+ *
+ * CLUSTER PROTOCOL OVERVIEW
+ * 
+ * Messages sent to/from CPG are called Events.
+ *
+ * An Event carries a ConnectionId, which includes a MemberId and a
+ * connection number.
+ * 
+ * Events are either
+ *  - Connection events: non-0 connection number and are associated with a 
connection.
+ *  - Cluster Events: 0 connection number, are not associated with a connection.
+ * 
+ * Events are further categorized as:
+ *  - Control: carries method frame(s) that affect cluster behavior.
+ *  - Data: carries raw data received from a client connection.
+ *
+ * The cluster defines extensions to the AMQP command set in 
../../../xml/cluster.xml
+ * which defines two classes:
+ *  - cluster: cluster control information.
+ *  - cluster.connection: control information for a specific connection.
+ *
+ * The following combinations are legal:
+ *  - Data frames carrying connection data.
+ *  - Cluster control events carrying cluster commands.
+ *  - Connection control events carrying cluster.connection commands.
+ *  - Connection control events carrying non-cluster frames: frames sent to 
the client.
+ *    e.g. flow-control frames generated on a timer.
+ */
 #include "Cluster.h"
 #include "ClusterSettings.h"
 #include "Connection.h"
@@ -30,6 +98,7 @@
 #include "qpid/broker/Connection.h"
 #include "qpid/broker/QueueRegistry.h"
 #include "qpid/broker/SessionState.h"
+#include "qpid/framing/frame_functors.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/AMQP_AllOperations.h"
 #include "qpid/framing/AllInvoker.h"
@@ -41,6 +110,15 @@
 #include "qpid/framing/ClusterShutdownBody.h"
 #include "qpid/framing/ClusterUpdateOfferBody.h"
 #include "qpid/framing/ClusterUpdateRequestBody.h"
+
+#include "qpid/framing/ConnectionStartOkBody.h"
+#include "qpid/framing/ConnectionTuneBody.h"
+#include "qpid/framing/ConnectionOpenBody.h"
+#include "qpid/framing/SessionAttachBody.h"
+#include "qpid/framing/SessionRequestTimeoutBody.h"
+#include "qpid/framing/SessionCommandPointBody.h"
+#include "qpid/framing/AMQP_ClientProxy.h"
+
 #include "qpid/log/Helpers.h"
 #include "qpid/log/Statement.h"
 #include "qpid/management/IdAllocator.h"
@@ -57,6 +135,7 @@
 #include <iterator>
 #include <map>
 #include <ostream>
+#include <sstream>
 
 namespace qpid {
 namespace cluster {
@@ -127,11 +206,10 @@
     // Failover exchange provides membership updates to clients.
     failoverExchange.reset(new FailoverExchange(this));
     broker.getExchanges().registerExchange(failoverExchange);
-
-    // Update exchange is used during updates to replicate messages without 
modifying delivery-properties.exchange.
     
broker.getExchanges().registerExchange(boost::shared_ptr<broker::Exchange>(new 
UpdateExchange(this)));
-
+    broker.setClusterMessageHandler(*this);
     if (settings.quorum) quorum.init();
+
     cpg.join(name);
     // pump the CPG dispatch manually till we get initialized. 
     while (!initialized)
@@ -666,6 +744,7 @@
 }
 
 void Cluster::errorCheck(const MemberId& , uint8_t type, uint64_t frameSeq, 
Lock&) {
+    if (state == LEFT) return;
     // If we receive an errorCheck here, it's because we  have processed past 
the point
     // of the error so respond with ERROR_TYPE_NONE
     assert(map.getFrameSeq() >= frameSeq);
@@ -674,4 +753,53 @@
             ClusterErrorCheckBody(ProtocolVersion(), 
framing::cluster::ERROR_TYPE_NONE, frameSeq), self);
 }
 
+// Binary operation for accumulate(): returns total plus the encoded size
+// of frame f, so folding it over a range of frames yields their combined
+// encoded size.
+size_t accumulateEncodedSize(size_t total, const AMQFrame& f) { return total + 
f.encodedSize(); }
+
+//
+// Cluster::handle: multicast a broker-generated message to the cluster.
+//
+// If the broker needs to send messages to itself in an
+// unpredictable context (e.g. management messages generated when
+// a timer expires) it uses "selfConnection"
+//
+// selfConnection behaves as a local client connection, with
+// respect to replication. However instead of mcasting data from a
+// client, data for the selfConnection is mcast directly from
+// Cluster::handle.
+//
+// msg: the message to send; its frames are encoded into a DATA Event
+// addressed to selfConnection and handed to mcast.
+//
+void Cluster::handle(const boost::intrusive_ptr<broker::Message>& msg) {
+    // NOTE: don't take the lock here. We don't need to as mcast is thread
+    // safe, and locking here can cause deadlock with management locks.
+    //
+    // NOTE(review): the on-demand set-up below tests and assigns
+    // selfConnection without any lock -- confirm that handle() can never be
+    // entered by two threads at the same time.
+
+    // Create self-connection on demand
+    // (a default-constructed ConnectionId means "not created yet").
+    if (selfConnection == ConnectionId()) {
+        QPID_LOG(debug, "Initialize self-connection");
+        ostringstream name;
+        name << "qpid.cluster-self." << self;
+        ConnectionPtr selfc = new Connection(*this, shadowOut, name.str(), 
self, false, false);
+        selfConnection = selfc->getId();
+        // Set-up frames for the new connection, plus a session that is
+        // attached under the same name as the connection.
+        vector<AMQFrame> frames;
+        frames.push_back(AMQFrame((ConnectionStartOkBody())));
+        
frames.push_back(AMQFrame((ConnectionTuneBody(ProtocolVersion(),32767,65535,0,120))));
+        frames.push_back(AMQFrame((ConnectionOpenBody())));
+        frames.push_back(AMQFrame((SessionAttachBody(ProtocolVersion(), 
name.str(), false))));
+        frames.push_back(AMQFrame(SessionRequestTimeoutBody(ProtocolVersion(), 
0)));
+        frames.push_back(AMQFrame(SessionCommandPointBody(ProtocolVersion(), 
0, 0)));
+        // Encode all set-up frames back to back into one exactly-sized buffer.
+        // NOTE(review): the seed value 0 is an int; if this is
+        // std::accumulate the running total is held in an int, not a size_t.
+        size_t size = accumulate(frames.begin(), frames.end(), 0, 
accumulateEncodedSize);
+        vector<char> store(size);
+        Buffer buf(store.data(), size);
+        for_each(frames.begin(), frames.end(), boost::bind(&AMQFrame::encode, 
_1, boost::ref(buf)));
+        // The buffer was sized from the frames, so encoding must fill it.
+        assert(buf.available() == 0);
+        selfc->decode(store.data(), size);         // Multicast
+    }
+
+    QPID_LOG(trace, "Message to self on " << selfConnection << ": " << 
*msg->getFrames().getMethod());
+    // Encode the message's frames into a DATA event for selfConnection
+    // and multicast it. (Same int-seed remark as above applies here.)
+    const FrameSet& frames = msg->getFrames();
+    size_t size = accumulate(frames.begin(), frames.end(), 0, 
accumulateEncodedSize);
+    Event e(DATA, selfConnection, size);
+    Buffer buf(e.getData(), e.getSize());
+    EncodeFrame encoder(buf);
+    msg->getFrames().map(encoder);
+    mcast.mcast(e);
+}
+
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h?rev=774809&r1=774808&r2=774809&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cluster.h Thu May 14 15:14:32 2009
@@ -38,6 +38,7 @@
 #include "qmf/org/apache/qpid/cluster/Cluster.h"
 #include "qpid/Url.h"
 #include "qpid/broker/Broker.h"
+#include "qpid/broker/MessageHandler.h"
 #include "qpid/management/Manageable.h"
 #include "qpid/sys/Monitor.h"
 
@@ -64,7 +65,7 @@
 /**
  * Connection to the cluster
  */
-class Cluster : private Cpg::Handler, public management::Manageable {
+class Cluster : private Cpg::Handler, public management::Manageable, public 
broker::MessageHandler {
   public:
     typedef boost::intrusive_ptr<Connection> ConnectionPtr;
     typedef std::vector<ConnectionPtr> ConnectionVector;
@@ -113,6 +114,9 @@
     Decoder& getDecoder() { return decoder; }
 
     ExpiryPolicy& getExpiryPolicy() { return *expiryPolicy; }
+
+    // Called in timer threads by management to replicate messages.
+    void handle(const boost::intrusive_ptr<broker::Message>&);
     
   private:
     typedef sys::Monitor::ScopedLock Lock;
@@ -199,6 +203,7 @@
     const std::string name;
     Url myUrl;
     const MemberId self;
+    ConnectionId selfConnection;
     framing::Uuid clusterId;
     NoOpConnectionOutputHandler shadowOut;
     qpid::management::ManagementAgent* mAgent;

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h?rev=774809&r1=774808&r2=774809&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/EventFrame.h Thu May 14 15:14:32 2009
@@ -43,6 +43,7 @@
 
     bool isCluster() const { return connectionId.getNumber() == 0; }
     bool isConnection() const { return connectionId.getNumber() != 0; }
+    /** True when this frame's type is CONTROL. */
+    bool isControl() const { return type == CONTROL; }
     bool isLastInEvent() const { return readCredit; }
     MemberId getMemberId() const { return connectionId.getMember(); }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp?rev=774809&r1=774808&r2=774809&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.cpp Thu May 14 
15:14:32 2009
@@ -25,6 +25,7 @@
 #include "qpid/broker/DeliverableMessage.h"
 #include "qpid/log/Statement.h"
 #include <qpid/broker/Message.h>
+#include <qpid/broker/MessageHandler.h>
 #include "qpid/framing/MessageTransferBody.h"
 #include "qpid/sys/Time.h"
 #include "qpid/broker/ConnectionState.h"
@@ -264,9 +265,11 @@
 }
 
 void ManagementAgent::sendBuffer(Buffer&  buf,
-                                  uint32_t length,
-                                  qpid::broker::Exchange::shared_ptr exchange,
-                                  string   routingKey)
+                                 uint32_t length,
+                                 qpid::broker::Exchange::shared_ptr exchange,
+                                 string   routingKey,
+                                 bool isPredictable)
+
 {
     if (exchange.get() == 0)
         return;
@@ -286,14 +289,21 @@
     msg->getFrames().append(method);
     msg->getFrames().append(header);
 
-    MessageProperties* props =
-        msg->getFrames().getHeaders()->get<MessageProperties>(true);
+    DeliveryProperties* delivery = 
msg->getFrames().getHeaders()->get<DeliveryProperties>(true);
+    delivery->setRoutingKey(routingKey);
+
+    MessageProperties* props = 
msg->getFrames().getHeaders()->get<MessageProperties>(true);
     props->setContentLength(length);
     msg->getFrames().append(content);
 
-    DeliverableMessage deliverable (msg);
     try {
-        exchange->route(deliverable, routingKey, 0);
+        if (!isPredictable && broker->getClusterMessageHandler()) {
+            broker->getClusterMessageHandler()->handle(msg);
+        }
+        else {
+            DeliverableMessage deliverable (msg);
+            exchange->route(deliverable, routingKey, 0);
+        }
     } catch(exception&) {}
 }
 
@@ -347,7 +357,7 @@
             contentSize = BUFSIZE - msgBuffer.available ();
             msgBuffer.reset ();
             routingKey = "console.obj.1.0." + object->getPackageName() + "." + 
object->getClassName();
-            sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
+            sendBuffer (msgBuffer, contentSize, mExchange, routingKey, false);
         }
         
         if (object->hasInst() && (object->getInstChanged() || 
object->getForcePublish())) {
@@ -358,7 +368,7 @@
             contentSize = BUFSIZE - msgBuffer.available ();
             msgBuffer.reset ();
             routingKey = "console.obj.1.0." + object->getPackageName() + "." + 
object->getClassName();
-            sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
+            sendBuffer (msgBuffer, contentSize, mExchange, routingKey, false);
         }
 
         if (object->isDeleted())
@@ -387,7 +397,7 @@
         contentSize = BUFSIZE - msgBuffer.available ();
         msgBuffer.reset ();
         routingKey = "console.heartbeat.1.0";
-        sendBuffer (msgBuffer, contentSize, mExchange, routingKey);
+        sendBuffer (msgBuffer, contentSize, mExchange, routingKey, false);
     }
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h?rev=774809&r1=774808&r2=774809&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/management/ManagementAgent.h Thu May 14 
15:14:32 2009
@@ -204,7 +204,8 @@
     void sendBuffer         (framing::Buffer&             buf,
                              uint32_t                     length,
                              qpid::broker::Exchange::shared_ptr exchange,
-                             std::string                  routingKey);
+                             std::string                  routingKey,
+                             bool isPredictable=true);
     void moveNewObjectsLH();
 
     bool authorizeAgentMessageLH(qpid::broker::Message& msg);



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscr...@qpid.apache.org

Reply via email to