Author: gsim
Date: Thu May 31 09:31:20 2007
New Revision: 543182

URL: http://svn.apache.org/viewvc?view=rev&rev=543182
Log:
Updates to dtx support.


Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/generate.sh
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/LFSessionContext.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?view=diff&rev=543182&r1=543181&r2=543182
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Thu May 31 09:31:20 2007
@@ -181,6 +181,7 @@
   qpid/broker/DeliverableMessage.cpp \
   qpid/broker/DeliveryRecord.cpp \
   qpid/broker/DirectExchange.cpp \
+  qpid/broker/DtxAck.cpp \
   qpid/broker/DtxBuffer.cpp \
   qpid/broker/DtxHandlerImpl.cpp \
   qpid/broker/DtxManager.cpp \
@@ -234,6 +235,7 @@
   qpid/broker/Deliverable.h \
   qpid/broker/DeliverableMessage.h \
   qpid/broker/DirectExchange.h \
+  qpid/broker/DtxAck.h \
   qpid/broker/DtxBuffer.h \
   qpid/broker/DtxHandlerImpl.h \
   qpid/broker/DtxManager.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/generate.sh
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/generate.sh?view=diff&rev=543182&r1=543181&r2=543182
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/generate.sh (original)
+++ incubator/qpid/trunk/qpid/cpp/src/generate.sh Thu May 31 09:31:20 2007
@@ -7,7 +7,7 @@
 
 gentools_dir="$srcdir/../gentools"
 specs_dir="$srcdir/../../specs"
-specs="$specs_dir/amqp.0-9.xml $specs_dir/amqp-errata.0-9.xml  $specs_dir/amqp-dtx-preview.xml"
+specs="$specs_dir/amqp.0-9.xml $specs_dir/amqp-errata.0-9.xml  $specs_dir/amqp-dtx-preview.0-9.xml"
 test -z "$JAVA" && JAVA=java ; 
 test -z "$JAVAC" && JAVAC=javac ; 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp?view=diff&rev=543182&r1=543181&r2=543182
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp Thu May 31 09:31:20 2007
@@ -28,17 +28,19 @@
 #include <boost/bind.hpp>
 #include <boost/format.hpp>
 
-#include "BrokerChannel.h"
 #include "qpid/framing/ChannelAdapter.h"
 #include "qpid/QpidError.h"
-#include "DeliverableMessage.h"
-#include "BrokerQueue.h"
+
+#include "BrokerAdapter.h"
+#include "BrokerChannel.h"
 #include "BrokerMessage.h"
+#include "BrokerQueue.h"
+#include "Connection.h"
+#include "DeliverableMessage.h"
+#include "DtxAck.h"
 #include "MessageStore.h"
 #include "TxAck.h"
 #include "TxPublish.h"
-#include "BrokerAdapter.h"
-#include "Connection.h"
 
 using std::mem_fun_ref;
 using std::bind2nd;
@@ -133,7 +135,8 @@
                                   % dtxBuffer->getXid() % xid);
     }
 
-    TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked));
+    TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
+    accumulatedAck.clear();
     dtxBuffer->enlist(txAck);    
     dtxBuffer->markEnded();
     

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?view=diff&rev=543182&r1=543181&r2=543182
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Thu May 31 09:31:20 2007
@@ -38,8 +38,8 @@
         class DeliveryRecord{
             mutable Message::shared_ptr msg;
             mutable Queue::shared_ptr queue;
-            std::string consumerTag;
-            uint64_t deliveryTag;
+            const std::string consumerTag;
+            const uint64_t deliveryTag;
             bool pull;
 
         public:

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp?view=auto&rev=543182
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp Thu May 31 09:31:20 2007
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "DtxAck.h"
+#include "qpid/log/Statement.h"
+
+using std::bind1st;
+using std::bind2nd;
+using std::mem_fun_ref;
+using namespace qpid::broker;
+
+DtxAck::DtxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked)
+{
+    remove_copy_if(unacked.begin(), unacked.end(), inserter(pending, pending.end()),
+                   not1(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked)));
+    unacked.remove_if(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy), &acked));
+}
+
+bool DtxAck::prepare(TransactionContext* ctxt) throw()
+{
+    try{
+        //record dequeue in the store
+        for (ack_iterator i = pending.begin(); i != pending.end(); i++) {
+            i->discard(ctxt);
+        }
+        return true;
+    }catch(...){
+        QPID_LOG(error, "Failed to prepare");
+        return false;
+    }
+}
+
+void DtxAck::commit() throw()
+{
+    pending.clear();
+}
+
+void DtxAck::rollback() throw()
+{
+    for_each(pending.begin(), pending.end(), mem_fun_ref(&DeliveryRecord::requeue));
+    pending.clear();
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h?view=auto&rev=543182
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h Thu May 31 09:31:20 2007
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _DtxAck_
+#define _DtxAck_
+
+#include <algorithm>
+#include <functional>
+#include <list>
+#include "AccumulatedAck.h"
+#include "DeliveryRecord.h"
+#include "TxOp.h"
+
+namespace qpid {
+    namespace broker {
+        class DtxAck : public TxOp{
+            std::list<DeliveryRecord> pending;
+
+        public:
+            DtxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
+            virtual bool prepare(TransactionContext* ctxt) throw();
+            virtual void commit() throw();
+            virtual void rollback() throw();
+            virtual ~DtxAck(){}
+        };
+    }
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp?view=diff&rev=543182&r1=543181&r2=543182
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp Thu May 31 09:31:20 2007
@@ -22,23 +22,29 @@
 #include "BrokerChannel.h"
 
 using namespace qpid::broker;
+using qpid::framing::AMQP_ClientProxy;
 using qpid::framing::FieldTable;
 using qpid::framing::MethodContext;
 using std::string;
 
-DtxHandlerImpl::DtxHandlerImpl(CoreRefs& parent) : CoreRefs(parent) {}
+DtxHandlerImpl::DtxHandlerImpl(CoreRefs& parent) : 
+    CoreRefs(parent),
+    dClient(AMQP_ClientProxy::DtxDemarcation::get(proxy)),
+    cClient(AMQP_ClientProxy::DtxCoordination::get(proxy))
+
+{
+}
 
 
 // DtxDemarcationHandler:
 
 
-void DtxHandlerImpl::select(const MethodContext& /*context*/ )
+void DtxHandlerImpl::select(const MethodContext& context )
 {
-    //don't need to do anything here really
-    //send select-ok
+    dClient.selectOk(context.getRequestId());
 }
 
-void DtxHandlerImpl::end(const MethodContext& /*context*/,
+void DtxHandlerImpl::end(const MethodContext& context,
                          u_int16_t /*ticket*/,
                          const string& xid,
                          bool fail,
@@ -54,10 +60,10 @@
     } else {
         channel.endDtx(xid);
     }
-    //send end-ok
+    dClient.endOk(0/*TODO - set flags*/, context.getRequestId());
 }
 
-void DtxHandlerImpl::start(const MethodContext& /*context*/,
+void DtxHandlerImpl::start(const MethodContext& context,
                            u_int16_t /*ticket*/,
                            const string& xid,
                            bool /*join*/,
@@ -69,36 +75,36 @@
     } else {
         channel.startDtx(xid, broker.getDtxManager());
     }
-    //send start-ok
+    dClient.startOk(0/*TODO - set flags*/, context.getRequestId());
 }
 
 // DtxCoordinationHandler:
 
-void DtxHandlerImpl::prepare(const MethodContext& /*context*/,
+void DtxHandlerImpl::prepare(const MethodContext& context,
                              u_int16_t /*ticket*/,
                              const string& xid )
 {
     broker.getDtxManager().prepare(xid);
-    //send prepare-ok
+    cClient.prepareOk(0/*TODO - set flags*/, context.getRequestId());
 }
 
-void DtxHandlerImpl::commit(const MethodContext& /*context*/,
+void DtxHandlerImpl::commit(const MethodContext& context,
                             u_int16_t /*ticket*/,
                             const string& xid,
                             bool /*onePhase*/ )
 {
-    broker.getDtxManager().commit(xid);
-    //send commit-ok
     //TODO use onePhase flag to validate correct sequence
+    broker.getDtxManager().commit(xid);
+    cClient.commitOk(0/*TODO - set flags*/, context.getRequestId());
 }
 
 
-void DtxHandlerImpl::rollback(const MethodContext& /*context*/,
+void DtxHandlerImpl::rollback(const MethodContext& context,
                               u_int16_t /*ticket*/,
                               const string& xid )
 {
     broker.getDtxManager().rollback(xid);
-    //send rollback-ok
+    cClient.rollbackOk(0/*TODO - set flags*/, context.getRequestId());
 }
 
 void DtxHandlerImpl::recover(const MethodContext& /*context*/,

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h?view=diff&rev=543182&r1=543181&r2=543182
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h Thu May 31 09:31:20 2007
@@ -31,6 +31,8 @@
       public framing::AMQP_ServerOperations::DtxCoordinationHandler,
       public framing::AMQP_ServerOperations::DtxDemarcationHandler
 {    
+    framing::AMQP_ClientProxy::DtxDemarcation dClient;
+    framing::AMQP_ClientProxy::DtxCoordination cClient;
 public:
     DtxHandlerImpl(CoreRefs& parent);
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp?view=diff&rev=543182&r1=543181&r2=543182
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp Thu May 31 09:31:20 2007
@@ -20,6 +20,7 @@
  */
 #include "DtxManager.h"
 #include <boost/format.hpp>
+#include <iostream>
 
 using namespace qpid::broker;
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/LFSessionContext.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/LFSessionContext.cpp?view=diff&rev=543182&r1=543181&r2=543182
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/LFSessionContext.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/LFSessionContext.cpp Thu May 31 09:31:20 2007
@@ -98,7 +98,7 @@
                 while(frame && out.available() >= frame->size()){
                     encoded = true;
                     frame->encode(out);
-                    QPID_LOG(debug, "SENT: " << frame);
+                    QPID_LOG(debug, "SENT: " << *frame);
                     delete frame;
                     framesToWrite.pop();
                     frame = framesToWrite.empty() ? 0 : framesToWrite.front();


Reply via email to