Author: aconway
Date: Fri Oct 31 09:56:04 2008
New Revision: 709474

URL: http://svn.apache.org/viewvc?rev=709474&view=rev
Log:
Cluster returns connection exception for un-supported AMQP features.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=709474&r1=709473&r2=709474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Fri Oct 31 
09:56:04 2008
@@ -24,8 +24,10 @@
 
 #include "qpid/broker/SessionState.h"
 #include "qpid/broker/SemanticState.h"
+#include "qpid/framing/enum.h"
 #include "qpid/framing/AMQFrame.h"
 #include "qpid/framing/AllInvoker.h"
+#include "qpid/framing/DeliveryProperties.h"
 #include "qpid/framing/ClusterConnectionDeliverCloseBody.h"
 #include "qpid/framing/ConnectionCloseBody.h"
 #include "qpid/framing/ConnectionCloseOkBody.h"
@@ -98,14 +100,33 @@
     }
 }
 
+bool Connection::checkUnsupported(const AMQBody& body) {
+    std::string message;
+    if (body.getMethod()) {
+        switch (body.getMethod()->amqpClassId()) {
+          case TX_CLASS_ID: message = "TX transactions are not currently 
supported by cluster."; break;
+          case DTX_CLASS_ID: message = "DTX transactions are not currently 
supported by cluster."; break;
+        }
+    }
+    else if (body.type() == HEADER_BODY) {
+        const DeliveryProperties* dp = static_cast<const 
AMQHeaderBody&>(body).get<DeliveryProperties>();
+        if (dp && dp->getTtl()) message = "Message TTL is not currently 
supported by cluster.";
+    }
+    if (!message.empty())
+        connection.close(execution::ERROR_CODE_INTERNAL_ERROR, message, 0, 0);
+    return !message.empty();
+}
+
 // Delivered from cluster.
 void Connection::delivered(framing::AMQFrame& f) {
     QPID_LOG(trace, cluster << "DLVR " << *this << ": " << f);
     assert(!catchUp);
-    // Handle connection controls, deliver other frames to connection.
-    currentChannel = f.getChannel();
-    if (!framing::invoke(*this, *f.getBody()).wasHandled())
-        connection.received(f);
+    currentChannel = f.getChannel(); 
+    if (!framing::invoke(*this, *f.getBody()).wasHandled() // Connection 
contol.
+        && !checkUnsupported(*f.getBody())) // Unsupported operation.
+    {
+        connection.received(f); // Pass to broker connection.
+    }
 }
 
 void Connection::closed() {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=709474&r1=709473&r2=709474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Fri Oct 31 
09:56:04 2008
@@ -121,6 +121,7 @@
   private:
     bool catcUp;
 
+    bool checkUnsupported(const framing::AMQBody& body);
     void deliverClose();
     void deliverDoOutput(uint32_t requested);
     void sendDoOutput();

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp?rev=709474&r1=709473&r2=709474&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/cluster_test.cpp Fri Oct 31 
09:56:04 2008
@@ -220,6 +220,20 @@
     uint16_t channel;
 };
 
+QPID_AUTO_TEST_CASE(testUnsupported) {
+    ScopedSuppressLogging sl;
+    ClusterFixture cluster(1);
+    Client c0(cluster[0], "c0");
+    BOOST_CHECK_THROW(c0.session.txSelect(), Exception);
+    BOOST_CHECK(!c0.connection.isOpen());
+    Client c1(cluster[0], "c1");
+    BOOST_CHECK_THROW(c1.session.dtxCommit(), Exception);    
+    Client c2(cluster[0], "c2");
+    Message  m;
+    m.getDeliveryProperties().setTtl(1);
+    BOOST_CHECK_THROW(c2.session.messageTransfer(arg::content=m), Exception);  
  
+}
+
 QPID_AUTO_TEST_CASE(testUnacked) {
     // Verify replication of unacknowledged messages.
     ClusterFixture cluster(1);


Reply via email to