Author: aconway
Date: Wed Nov 5 07:22:47 2008
New Revision: 711587
URL: http://svn.apache.org/viewvc?rev=711587&view=rev
Log:
Cluster: replicate transaction state to newcomers.
constants.rb: generate type code constants for AMQP types. Useful with Array.
framing/Array:
- added some std:::vector like functions & typedefs.
- use TypeCode enums, human readable ostream << operator.
rubygen - fixed error in generation of exceptions for bad codes.
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOpVisitor.h (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h
incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.h
incubator/qpid/trunk/qpid/cpp/src/tests/TxMocks.h
incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
Modified: incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb (original)
+++ incubator/qpid/trunk/qpid/cpp/rubygen/framing.0-10/constants.rb Wed Nov 5
07:22:47 2008
@@ -152,7 +152,7 @@
assign = "holder = new #{c.name.caps}Exception(text); " unless
c.name == "normal"
genl "case #{c.value}: #{assign}break;"
}
- genl " holder = new #{invalid}(QPID_MSG(\"Bad exception code: \" <<
code << \": \" << text));"
+ genl "default: holder = new #{invalid}(QPID_MSG(\"Bad
#{enum.parent.name}: \" << code << \": \" << text));"
}
genl "return holder;"
}
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Wed Nov 5 07:22:47 2008
@@ -506,6 +506,7 @@
qpid/broker/TxAccept.h \
qpid/broker/TxBuffer.h \
qpid/broker/TxOp.h \
+ qpid/broker/TxOpVisitor.h \
qpid/broker/TxPublish.h \
qpid/broker/Vhost.h \
qpid/client/AckMode.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h Wed Nov 5 07:22:47
2008
@@ -39,6 +39,7 @@
virtual void commit() throw();
virtual void rollback() throw();
virtual ~DtxAck(){}
+ virtual void accept(TxOpConstVisitor& visitor) const {
visitor(*this); }
};
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.h?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredDequeue.h Wed Nov 5
07:22:47 2008
@@ -45,6 +45,10 @@
virtual void commit() throw();
virtual void rollback() throw();
virtual ~RecoveredDequeue(){}
+ virtual void accept(TxOpConstVisitor& visitor) const {
visitor(*this); }
+
+ Queue::shared_ptr getQueue() const { return queue; }
+ boost::intrusive_ptr<Message> getMessage() const { return msg; }
};
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/RecoveredEnqueue.h Wed Nov 5
07:22:47 2008
@@ -45,6 +45,11 @@
virtual void commit() throw();
virtual void rollback() throw();
virtual ~RecoveredEnqueue(){}
+ virtual void accept(TxOpConstVisitor& visitor) const {
visitor(*this); }
+
+ Queue::shared_ptr getQueue() const { return queue; }
+ boost::intrusive_ptr<Message> getMessage() const { return msg; }
+
};
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Wed Nov 5
07:22:47 2008
@@ -436,7 +436,7 @@
if(requeue){
//take copy and clear unacked as requeue may result in redelivery to
this session
//which will in turn result in additions to unacked
- std::list<DeliveryRecord> copy = unacked;
+ DeliveryRecords copy = unacked;
unacked.clear();
for_each(copy.rbegin(), copy.rend(),
mem_fun_ref(&DeliveryRecord::requeue));
}else{
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Wed Nov 5
07:22:47 2008
@@ -134,7 +134,7 @@
DeliveryAdapter& deliveryAdapter;
ConsumerImplMap consumers;
NameGenerator tagGenerator;
- std::list<DeliveryRecord> unacked;
+ DeliveryRecords unacked;
TxBuffer::shared_ptr txBuffer;
DtxBuffer::shared_ptr dtxBuffer;
bool dtxSelected;
@@ -216,8 +216,11 @@
static ConsumerImpl* castToConsumerImpl(OutputTask* p) { return
boost::polymorphic_downcast<ConsumerImpl*>(p); }
template <class F> void eachConsumer(F f) {
outputTasks.eachOutput(boost::bind(f, boost::bind(castToConsumerImpl, _1))); }
- template <class F> void eachUnacked(F f) { std::for_each(unacked.begin(),
unacked.end(), f); }
-
+ DeliveryRecords& getUnacked() { return unacked; }
+ framing::SequenceSet getAccumulatedAck() const { return accumulatedAck; }
+ TxBuffer::shared_ptr getTxBuffer() const { return txBuffer; }
+ void setTxBuffer(const TxBuffer::shared_ptr& txb) { txBuffer = txb; }
+ void setAccumulatedAck(const framing::SequenceSet& s) { accumulatedAck =
s; }
void record(const DeliveryRecord& delivery);
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.cpp Wed Nov 5
07:22:47 2008
@@ -69,7 +69,7 @@
}
}
-TxAccept::TxAccept(SequenceSet& _acked, std::list<DeliveryRecord>& _unacked) :
+TxAccept::TxAccept(const SequenceSet& _acked, DeliveryRecords& _unacked) :
acked(_acked), unacked(_unacked), ops(unacked)
{
//populate the ops
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxAccept.h Wed Nov 5
07:22:47 2008
@@ -56,8 +56,8 @@
void commit();
};
- framing::SequenceSet& acked;
- std::list<DeliveryRecord>& unacked;
+ framing::SequenceSet acked;
+ DeliveryRecords& unacked;
RangeOps ops;
public:
@@ -66,11 +66,15 @@
* acks received
* @param unacked the record of delivered messages
*/
- TxAccept(framing::SequenceSet& acked, std::list<DeliveryRecord>&
unacked);
+ TxAccept(const framing::SequenceSet& acked, DeliveryRecords&
unacked);
virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
virtual ~TxAccept(){}
+ virtual void accept(TxOpConstVisitor& visitor) const {
visitor(*this); }
+
+ // Used by cluster replication.
+ const framing::SequenceSet& getAcked() const { return acked; }
};
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.cpp Wed Nov 5
07:22:47 2008
@@ -22,6 +22,7 @@
#include "qpid/log/Statement.h"
#include <boost/mem_fn.hpp>
+#include <boost/bind.hpp>
using boost::mem_fn;
using namespace qpid::broker;
@@ -73,3 +74,7 @@
}
return false;
}
+
+void TxBuffer::accept(TxOpConstVisitor& v) const {
+ std::for_each(ops.begin(), ops.end(), boost::bind(&TxOp::accept, _1,
boost::ref(v)));
+}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxBuffer.h Wed Nov 5
07:22:47 2008
@@ -107,6 +107,9 @@
* commit
*/
bool commitLocal(TransactionalStore* const store);
+
+ // Used by cluster to replicate transaction status.
+ void accept(TxOpConstVisitor& v) const;
};
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOp.h Wed Nov 5 07:22:47
2008
@@ -21,11 +21,15 @@
#ifndef _TxOp_
#define _TxOp_
+#include "TxOpVisitor.h"
#include "TransactionalStore.h"
#include <boost/shared_ptr.hpp>
namespace qpid {
namespace broker {
+
+class TxOpConstVisitor;
+
class TxOp{
public:
typedef boost::shared_ptr<TxOp> shared_ptr;
@@ -34,9 +38,11 @@
virtual void commit() throw() = 0;
virtual void rollback() throw() = 0;
virtual ~TxOp(){}
+
+ virtual void accept(TxOpConstVisitor&) const = 0;
};
- }
-}
+
+}} // namespace qpid::broker
#endif
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOpVisitor.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOpVisitor.h?rev=711587&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOpVisitor.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOpVisitor.h Wed Nov 5
07:22:47 2008
@@ -0,0 +1,100 @@
+#ifndef QPID_BROKER_TXOPVISITOR_H
+#define QPID_BROKER_TXOPVISITOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/shared_ptr.h"
+
+namespace qpid {
+namespace broker {
+
+class DtxAck;
+class RecoveredDequeue;
+class RecoveredEnqueue;
+class TxAccept;
+class TxPublish;
+
+/**
+ * Visitor for TxOp familly of classes.
+ */
+struct TxOpConstVisitor
+{
+ virtual ~TxOpConstVisitor() {}
+ virtual void operator()(const DtxAck&) = 0;
+ virtual void operator()(const RecoveredDequeue&) = 0;
+ virtual void operator()(const RecoveredEnqueue&) = 0;
+ virtual void operator()(const TxAccept&) = 0;
+ virtual void operator()(const TxPublish&) = 0;
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_TXOPVISITOR_H*/
+#ifndef QPID_BROKER_TXOPVISITOR_H
+#define QPID_BROKER_TXOPVISITOR_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/shared_ptr.h"
+
+namespace qpid {
+namespace broker {
+
+class DtxAck;
+class RecoveredDequeue;
+class RecoveredEnqueue;
+class TxAccept;
+class TxPublish;
+
+/**
+ * Visitor for TxOp familly of classes.
+ */
+struct TxOpConstVisitor
+{
+ virtual ~TxOpConstVisitor() {}
+ virtual void operator()(const DtxAck&) = 0;
+ virtual void operator()(const RecoveredDequeue&) = 0;
+ virtual void operator()(const RecoveredEnqueue&) = 0;
+ virtual void operator()(const TxAccept&) = 0;
+ virtual void operator()(const TxPublish&) = 0;
+};
+
+}} // namespace qpid::broker
+
+#endif /*!QPID_BROKER_TXOPVISITOR_H*/
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOpVisitor.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxOpVisitor.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/TxPublish.h Wed Nov 5
07:22:47 2008
@@ -75,8 +75,12 @@
virtual void deliverTo(const boost::shared_ptr<Queue>& queue);
virtual ~TxPublish(){}
+ virtual void accept(TxOpConstVisitor& visitor) const {
visitor(*this); }
uint64_t contentSize();
+
+ boost::intrusive_ptr<Message> getMessage() const { return msg; }
+ const std::list<Queue::shared_ptr> getQueues() const { return
queues; }
};
}
}
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/ClusterPlugin.cpp Wed Nov 5
07:22:47 2008
@@ -90,7 +90,4 @@
static ClusterPlugin instance; // Static initialization.
-// For test purposes.
-Cluster& getGlobalCluster() { assert(instance.cluster); return
*instance.cluster; }
-
}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Wed Nov 5
07:22:47 2008
@@ -24,7 +24,11 @@
#include "qpid/broker/SessionState.h"
#include "qpid/broker/SemanticState.h"
+#include "qpid/broker/TxBuffer.h"
#include "qpid/broker/TxPublish.h"
+#include "qpid/broker/TxAccept.h"
+#include "qpid/broker/RecoveredEnqueue.h"
+#include "qpid/broker/RecoveredDequeue.h"
#include "qpid/framing/enum.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/AllInvoker.h"
@@ -36,7 +40,7 @@
#include <boost/current_function.hpp>
-// FIXME aconway 2008-11-03:
+// TODO aconway 2008-11-03:
//
// Disproportionate amount of code here is dedicated to receiving a
// brain-dump when joining a cluster and building initial
@@ -113,7 +117,6 @@
std::string message;
if (body.getMethod()) {
switch (body.getMethod()->amqpClassId()) {
- case TX_CLASS_ID: message = "TX transactions are not currently
supported by cluster."; break;
case DTX_CLASS_ID: message = "DTX transactions are not currently
supported by cluster."; break;
}
}
@@ -122,13 +125,13 @@
if (dp && dp->getTtl()) message = "Message TTL is not currently
supported by cluster.";
}
if (!message.empty())
- connection.close(execution::ERROR_CODE_INTERNAL_ERROR, message, 0, 0);
+ connection.close(connection::CLOSE_CODE_FRAMING_ERROR, message, 0, 0);
return !message.empty();
}
// Delivered from cluster.
void Connection::delivered(framing::AMQFrame& f) {
- QPID_LOG(trace, cluster << "DLVR " << *this << ": " << f);
+ QPID_LOG(trace, cluster << " RECV: " << *this << ": " << f);
assert(!catchUp);
currentChannel = f.getChannel();
if (!framing::invoke(*this, *f.getBody()).wasHandled() // Connection
contol.
@@ -247,11 +250,15 @@
return self.first == cluster.getId() && self.second == 0;
}
+
+shared_ptr<broker::Queue> Connection::findQueue(const std::string& qname) {
+ shared_ptr<broker::Queue> queue =
cluster.getBroker().getQueues().find(qname);
+ if (!queue) throw Exception(QPID_MSG(cluster << " can't find queue " <<
qname));
+ return queue;
+}
+
broker::QueuedMessage Connection::getDumpMessage() {
- // Get a message from the DUMP queue.
- broker::Queue::shared_ptr dumpQueue =
cluster.getBroker().getQueues().find(DumpClient::DUMP);
- if (!dumpQueue) throw Exception(QPID_MSG(cluster << " missing dump
queue"));
- broker::QueuedMessage m = dumpQueue->get();
+ broker::QueuedMessage m = findQueue(DumpClient::DUMP)->get();
if (!m.payload) throw Exception(QPID_MSG(cluster << " empty dump queue"));
return m;
}
@@ -267,14 +274,11 @@
bool ended,
bool windowing)
{
- broker::Queue::shared_ptr queue =
cluster.getBroker().getQueues().find(qname);
- if (!queue) throw Exception(QPID_MSG(cluster << " bad deliveryRecord queue
" << qname));
broker::QueuedMessage m;
+ broker::Queue::shared_ptr queue = findQueue(qname);
if (!ended) { // Has a message
- if (acquired) { // Message at front of dump queue
- broker::Queue::shared_ptr dumpQueue =
cluster.getBroker().getQueues().find(DumpClient::DUMP);
- m = dumpQueue->get();
- }
+ if (acquired) // Message is on the dump queue
+ m = getDumpMessage();
else // Message at original position in original
queue
m = queue->find(position);
if (!m.payload)
@@ -286,8 +290,7 @@
if (cancelled) dr.cancel(dr.getTag());
if (completed) dr.complete();
if (ended) dr.setEnded(); // Exsitance of message
-
- semanticState().record(dr);
+ semanticState().record(dr); // Part of the session's unacked list.
}
void Connection::queuePosition(const string& qname, const SequenceNumber&
position) {
@@ -304,6 +307,36 @@
return o << c.getId() << "(" << type << (c.isCatchUp() ? ",catchup" : "")
<< ")";
}
+void Connection::txStart() {
+ txBuffer = make_shared_ptr(new broker::TxBuffer());
+}
+void Connection::txAccept(const framing::SequenceSet& acked) {
+ txBuffer->enlist(make_shared_ptr(new broker::TxAccept(acked,
semanticState().getUnacked())));
+}
+
+void Connection::txDequeue(const std::string& queue) {
+ txBuffer->enlist(make_shared_ptr(new
broker::RecoveredDequeue(findQueue(queue), getDumpMessage().payload)));
+}
+
+void Connection::txEnqueue(const std::string& queue) {
+ txBuffer->enlist(make_shared_ptr(new
broker::RecoveredEnqueue(findQueue(queue), getDumpMessage().payload)));
+}
+
+void Connection::txPublish(const framing::Array& queues, bool delivered) {
+ boost::shared_ptr<broker::TxPublish> txPub(new
broker::TxPublish(getDumpMessage().payload));
+ for (framing::Array::const_iterator i = queues.begin(); i != queues.end();
++i)
+ txPub->deliverTo(findQueue((*i)->get<std::string>()));
+ txPub->delivered = delivered;
+ txBuffer->enlist(txPub);
+}
+
+void Connection::txEnd() {
+ semanticState().setTxBuffer(txBuffer);
+}
+
+void Connection::accumulatedAck(const qpid::framing::SequenceSet& s) {
+ semanticState().setAccumulatedAck(s);
+}
}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Wed Nov 5
07:22:47 2008
@@ -125,12 +125,21 @@
void queuePosition(const std::string&, const framing::SequenceNumber&);
+ void txStart();
+ void txAccept(const framing::SequenceSet&);
+ void txDequeue(const std::string&);
+ void txEnqueue(const std::string&);
+ void txPublish(const qpid::framing::Array&, bool);
+ void txEnd();
+ void accumulatedAck(const qpid::framing::SequenceSet&);
+
private:
bool checkUnsupported(const framing::AMQBody& body);
void deliverClose();
void deliverDoOutput(uint32_t requested);
void sendDoOutput();
+ boost::shared_ptr<broker::Queue> findQueue(const std::string& qname);
broker::SessionState& sessionState();
broker::SemanticState& semanticState();
broker::QueuedMessage getDumpMessage();
@@ -148,6 +157,7 @@
framing::SequenceNumber mcastSeq;
framing::SequenceNumber deliverSeq;
framing::ChannelId currentChannel;
+ boost::shared_ptr<broker::TxBuffer> txBuffer;
friend std::ostream& operator<<(std::ostream&, const Connection&);
};
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.cpp Wed Nov 5
07:22:47 2008
@@ -32,6 +32,12 @@
#include "qpid/broker/ExchangeRegistry.h"
#include "qpid/broker/SessionHandler.h"
#include "qpid/broker/SessionState.h"
+#include "qpid/broker/TxOpVisitor.h"
+#include "qpid/broker/DtxAck.h"
+#include "qpid/broker/TxAccept.h"
+#include "qpid/broker/TxPublish.h"
+#include "qpid/broker/RecoveredDequeue.h"
+#include "qpid/broker/RecoveredEnqueue.h"
#include "qpid/framing/MessageTransferBody.h"
#include "qpid/framing/ClusterConnectionMembershipBody.h"
#include "qpid/framing/ClusterConnectionShadowReadyBody.h"
@@ -43,7 +49,7 @@
#include "qpid/log/Statement.h"
#include "qpid/Url.h"
#include <boost/bind.hpp>
-
+#include <algorithm>
namespace qpid {
namespace cluster {
@@ -198,7 +204,7 @@
shadowConnection = catchUpConnection();
broker::Connection& bc = dumpConnection->getBrokerConnection();
- // FIXME aconway 2008-10-20: What authentication info to reconnect?
+ // FIXME aconway 2008-10-20: What authentication info to use on reconnect?
shadowConnection.open(dumpeeUrl, bc.getUserId(), ""/*password*/,
"/"/*vhost*/, bc.getFrameMax());
bc.eachSessionHandler(boost::bind(&DumpClient::dumpSession, this, _1));
ClusterConnectionProxy(shadowConnection).shadowReady(
@@ -227,7 +233,10 @@
ss->getSemanticState().eachConsumer(std::bind1st(std::mem_fun(&DumpClient::dumpConsumer),this));
QPID_LOG(debug, dumperId << " dumping unacknowledged messages.");
- ss->getSemanticState().eachUnacked(boost::bind(&DumpClient::dumpUnacked,
this, _1));
+ broker::DeliveryRecords& drs = ss->getSemanticState().getUnacked();
+ std::for_each(drs.begin(), drs.end(),
boost::bind(&DumpClient::dumpUnacked, this, _1));
+
+ dumpTxState(ss->getSemanticState()); // Tx transaction state.
// Adjust for command counter for message in progress, will be sent after
state update.
boost::intrusive_ptr<Message> inProgress = ss->getMessageInProgress();
@@ -283,22 +292,12 @@
}
void DumpClient::dumpUnacked(const broker::DeliveryRecord& dr) {
- dumpDeliveryRecordMessage(dr);
- dumpDeliveryRecord(dr);
-}
-
-void DumpClient::dumpDeliveryRecordMessage(const broker::DeliveryRecord& dr) {
- // Dump the message associated with a dr if need be.
if (!dr.isEnded() && dr.isAcquired() && dr.getMessage().payload) {
// If the message is acquired then it is no longer on the
// dumpees queue, put it on the dump queue for dumpee to pick up.
//
MessageDumper(DUMP, shadowSession).dumpQueuedMessage(dr.getMessage());
}
-}
-
-void DumpClient::dumpDeliveryRecord(const broker::DeliveryRecord& dr) {
- // Assumes the associated message has already been dumped (if needed)
ClusterConnectionProxy(shadowSession).deliveryRecord(
dr.getQueue()->getName(),
dr.getMessage().position,
@@ -312,4 +311,56 @@
dr.isWindowing());
}
+class TxOpDumper : public broker::TxOpConstVisitor, public MessageDumper {
+ public:
+ TxOpDumper(DumpClient& dc, client::AsyncSession s)
+ : MessageDumper(DumpClient::DUMP, s), parent(dc), session(s), proxy(s)
{}
+
+ void operator()(const broker::DtxAck& ) {
+ throw InternalErrorException("DTX transactions not currently supported
by cluster.");
+ }
+
+ void operator()(const broker::RecoveredDequeue& rdeq) {
+ dumpMessage(rdeq.getMessage());
+ proxy.txEnqueue(rdeq.getQueue()->getName());
+ }
+
+ void operator()(const broker::RecoveredEnqueue& renq) {
+ dumpMessage(renq.getMessage());
+ proxy.txEnqueue(renq.getQueue()->getName());
+ }
+
+ void operator()(const broker::TxAccept& txAccept) {
+ proxy.txAccept(txAccept.getAcked());
+ }
+
+ void operator()(const broker::TxPublish& txPub) {
+ dumpMessage(txPub.getMessage());
+ typedef std::list<Queue::shared_ptr> QueueList;
+ const QueueList& qlist = txPub.getQueues();
+ Array qarray(TYPE_CODE_STR8);
+ for (QueueList::const_iterator i = qlist.begin(); i != qlist.end();
++i)
+ qarray.push_back(Array::ValuePtr(new Str8Value((*i)->getName())));
+ proxy.txPublish(qarray, txPub.delivered);
+ }
+
+ private:
+ DumpClient& parent;
+ client::AsyncSession session;
+ ClusterConnectionProxy proxy;
+};
+
+void DumpClient::dumpTxState(broker::SemanticState& s) {
+ QPID_LOG(debug, dumperId << " dumping TX transaction state.");
+ ClusterConnectionProxy proxy(shadowSession);
+ proxy.accumulatedAck(s.getAccumulatedAck());
+ broker::TxBuffer::shared_ptr txBuffer = s.getTxBuffer();
+ if (txBuffer) {
+ proxy.txStart();
+ TxOpDumper dumper(*this, shadowSession);
+ txBuffer->accept(dumper);
+ proxy.txEnd();
+ }
+}
+
}} // namespace qpid::cluster
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/DumpClient.h Wed Nov 5
07:22:47 2008
@@ -71,6 +71,8 @@
void dump();
void run(); // Will delete this when finished.
+ void dumpUnacked(const broker::DeliveryRecord&);
+
private:
void dumpQueue(const boost::shared_ptr<broker::Queue>&);
void dumpExchange(const boost::shared_ptr<broker::Exchange>&);
@@ -79,10 +81,8 @@
void dumpBinding(const std::string& queue, const broker::QueueBinding&
binding);
void dumpConnection(const boost::intrusive_ptr<Connection>& connection);
void dumpSession(broker::SessionHandler& s);
+ void dumpTxState(broker::SemanticState& s);
void dumpConsumer(const broker::SemanticState::ConsumerImpl*);
- void dumpUnacked(const broker::DeliveryRecord&);
- void dumpDeliveryRecord(const broker::DeliveryRecord&);
- void dumpDeliveryRecordMessage(const broker::DeliveryRecord&);
MemberId dumperId;
MemberId dumpeeId;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.h?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/framing/SequenceSet.h Wed Nov 5
07:22:47 2008
@@ -31,9 +31,9 @@
class SequenceSet : public RangeSet<SequenceNumber> {
public:
SequenceSet() {}
- explicit SequenceSet(const RangeSet<SequenceNumber>& r)
+ SequenceSet(const RangeSet<SequenceNumber>& r)
: RangeSet<SequenceNumber>(r) {}
- explicit SequenceSet(const SequenceNumber& s) { add(s); }
+ SequenceSet(const SequenceNumber& s) { add(s); }
SequenceSet(const SequenceNumber& start, const SequenceNumber finish) {
add(start,finish); }
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/TxMocks.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/TxMocks.h?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/TxMocks.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/TxMocks.h Wed Nov 5 07:22:47 2008
@@ -114,6 +114,9 @@
void check(){
assertEqualVector(expected, actual);
}
+
+ void accept(TxOpConstVisitor&) const {}
+
~MockTxOp(){}
};
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Wed Nov 5
07:22:47 2008
@@ -45,13 +45,6 @@
#include <algorithm>
#include <iterator>
-namespace qpid {
-namespace cluster {
-// FIXME aconway 2008-11-04: remove.
-Cluster& getGlobalCluster(); // Defined in qpid/cluster/ClusterPlugin.cpp
-}} // namespace qpid::cluster
-
-
namespace std { // ostream operators in std:: namespace
template <class T>
ostream& operator<<(ostream& o, const std::set<T>& s) { return seqPrint(o, s);
}
@@ -69,7 +62,6 @@
using qpid::broker::Broker;
using boost::shared_ptr;
using qpid::cluster::Cluster;
-using qpid::cluster::getGlobalCluster;
/** Parse broker & cluster options */
Broker::Options parseOpts(size_t argc, const char* argv[]) {
@@ -216,8 +208,19 @@
uint16_t channel;
};
-QPID_AUTO_TEST_CASE_EXPECTED_FAILURES(testTxTransaction, 1) {
- ClusterFixture cluster(1, 1); // FIXME aconway 2008-11-04: local broker at
index 1
+QPID_AUTO_TEST_CASE(testUnsupported) {
+ ScopedSuppressLogging sl;
+ ClusterFixture cluster(1);
+ Client c1(cluster[0], "c1");
+ BOOST_CHECK_THROW(c1.session.dtxSelect(), FramingErrorException);
+ Client c2(cluster[0], "c2");
+ Message m;
+ m.getDeliveryProperties().setTtl(1);
+ BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=m), Exception);
+}
+
+QPID_AUTO_TEST_CASE(testTxTransaction) {
+ ClusterFixture cluster(1);
Client c0(cluster[0], "c0");
c0.session.queueDeclare(arg::queue="q");
c0.session.messageTransfer(arg::content=Message("A", "q"));
@@ -236,7 +239,8 @@
SubscriptionManager rollbackSubs(rollbackSession);
rollbackSession.txSelect();
rollbackSession.messageTransfer(arg::content=Message("1", "q"));
- BOOST_CHECK_EQUAL(rollbackSubs.get("q", TIME_SEC).getData(), "B");
+ Message rollbackMessage = rollbackSubs.get("q", TIME_SEC);
+ BOOST_CHECK_EQUAL(rollbackMessage.getData(), "B");
BOOST_CHECK_EQUAL(c0.session.queueQuery("q").getMessageCount(), 0u);
// Add new member mid transaction.
@@ -250,10 +254,14 @@
rollbackSession.messageTransfer(arg::content=Message("3", "q"));
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 0u);
+
// Commit/roll back.
commitSession.txCommit();
rollbackSession.txRollback();
- // Verify queue status: just the comitted messages
+ rollbackSession.messageRelease(rollbackMessage.getId());
+
+
+ // Verify queue status: just the comitted messages and dequeues should
remain.
BOOST_CHECK_EQUAL(c1.session.queueQuery("q").getMessageCount(), 4u);
BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "B");
BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "a");
@@ -261,20 +269,6 @@
BOOST_CHECK_EQUAL(c1.subs.get("q", TIME_SEC).getData(), "c");
}
-QPID_AUTO_TEST_CASE(testUnsupported) {
- ScopedSuppressLogging sl;
- ClusterFixture cluster(1);
- Client c0(cluster[0], "c0");
- BOOST_CHECK_THROW(c0.session.txSelect(), Exception);
- BOOST_CHECK(!c0.connection.isOpen());
- Client c1(cluster[0], "c1");
- BOOST_CHECK_THROW(c1.session.dtxCommit(), Exception);
- Client c2(cluster[0], "c2");
- Message m;
- m.getDeliveryProperties().setTtl(1);
- BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=m), Exception);
-}
-
QPID_AUTO_TEST_CASE(testUnacked) {
// Verify replication of unacknowledged messages.
ClusterFixture cluster(1);
@@ -388,8 +382,7 @@
Client c1(cluster[1], "c1");
BOOST_CHECK(c1.subs.get(m, "q", TIME_SEC));
BOOST_CHECK_EQUAL(m.getData(), "abcd");
-
- BOOST_CHECK_EQUAL(2u, getGlobalCluster().getUrls().size());
+ BOOST_CHECK_EQUAL(2u, knownBrokerPorts(c1.connection).size());
}
QPID_AUTO_TEST_CASE(testConnectionKnownHosts) {
Modified: incubator/qpid/trunk/qpid/cpp/xml/cluster.xml
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=711587&r1=711586&r2=711587&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ incubator/qpid/trunk/qpid/cpp/xml/cluster.xml Wed Nov 5 07:22:47 2008
@@ -96,6 +96,18 @@
<field name="windowing" type="bit"/>
</control>
+ <!-- Tx transaction state. -->
+ <control name="tx-start" code="0x12"/>
+ <control name="tx-accept" code="0x13"> <field name="commands"
type="sequence-set"/> </control>
+ <control name="tx-dequeue" code="0x14"> <field name="queue" type="str8"/>
</control>
+ <control name="tx-enqueue" code="0x15"> <field name="queue" type="str8"/>
</control>
+ <control name="tx-publish" code="0x16">
+ <field name="queues" type="array"/> <!--Array of str8 -->
+ <field name="delivered" type="bit"/>
+ </control>
+ <control name="tx-end" code="0x17"/>
+ <control name="accumulated-ack" code="0x18"> <field name="commands"
type="sequence-set"/> </control>
+
<!-- Complete a session state dump. -->
<control name="session-state" code="0x1F" label="Set session state during
a brain dump.">
<!-- Target session deduced from channel number. -->