Author: aconway
Date: Fri Oct 31 09:56:04 2008
New Revision: 709474
URL: http://svn.apache.org/viewvc?rev=709474&view=rev
Log:
Cluster returns connection exception for un-supported AMQP features.
Modified:
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=709474&r1=709473&r2=709474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Fri Oct 31 09:56:04 2008
@@ -24,8 +24,10 @@
#include "qpid/broker/SessionState.h"
#include "qpid/broker/SemanticState.h"
+#include "qpid/framing/enum.h"
#include "qpid/framing/AMQFrame.h"
#include "qpid/framing/AllInvoker.h"
+#include "qpid/framing/DeliveryProperties.h"
#include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
#include "qpid/framing/ConnectionCloseBody.h"
#include "qpid/framing/ConnectionCloseOkBody.h"
@@ -98,14 +100,33 @@
}
}
+bool Connection::checkUnsupported(const AMQBody& body) {
+ std::string message;
+ if (body.getMethod()) {
+ switch (body.getMethod()->amqpClassId()) {
+ case TX_CLASS_ID: message = "TX transactions are not currently supported by cluster."; break;
+ case DTX_CLASS_ID: message = "DTX transactions are not currently supported by cluster."; break;
+ }
+ }
+ else if (body.type() == HEADER_BODY) {
+ const DeliveryProperties* dp = static_cast<const AMQHeaderBody&>(body).get<DeliveryProperties>();
+ if (dp && dp->getTtl()) message = "Message TTL is not currently supported by cluster.";
+ }
+ if (!message.empty())
+ connection.close(execution::ERROR_CODE_INTERNAL_ERROR, message, 0, 0);
+ return !message.empty();
+}
+
// Delivered from cluster.
void Connection::delivered(framing::AMQFrame& f) {
QPID_LOG(trace, cluster << "DLVR " << *this << ": " << f);
assert(!catchUp);
- // Handle connection controls, deliver other frames to connection.
- currentChannel = f.getChannel();
- if (!framing::invoke(*this, *f.getBody()).wasHandled())
- connection.received(f);
+ currentChannel = f.getChannel();
+ if (!framing::invoke(*this, *f.getBody()).wasHandled() // Connection control.
+ && !checkUnsupported(*f.getBody())) // Unsupported operation.
+ {
+ connection.received(f); // Pass to broker connection.
+ }
}
void Connection::closed() {
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=709474&r1=709473&r2=709474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Fri Oct 31 09:56:04 2008
@@ -121,6 +121,7 @@
private:
bool catchUp;
+ bool checkUnsupported(const framing::AMQBody& body);
void deliverClose();
void deliverDoOutput(uint32_t requested);
void sendDoOutput();
Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=709474&r1=709473&r2=709474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Fri Oct 31 09:56:04 2008
@@ -220,6 +220,20 @@
uint16_t channel;
};
+QPID_AUTO_TEST_CASE(testUnsupported) {
+ ScopedSuppressLogging sl;
+ ClusterFixture cluster(1);
+ Client c0(cluster[0], "c0");
+ BOOST_CHECK_THROW(c0.session.txSelect(), Exception);
+ BOOST_CHECK(!c0.connection.isOpen());
+ Client c1(cluster[0], "c1");
+ BOOST_CHECK_THROW(c1.session.dtxCommit(), Exception);
+ Client c2(cluster[0], "c2");
+ Message m;
+ m.getDeliveryProperties().setTtl(1);
+ BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=m), Exception);
+}
+
QPID_AUTO_TEST_CASE(testUnacked) {
// Verify replication of unacknowledged messages.
ClusterFixture cluster(1);