Author: gsim
Date: Thu May 31 09:31:20 2007
New Revision: 543182
URL: http://svn.apache.org/viewvc?view=rev&rev=543182
Log:
Updates to dtx support.
Added:
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp (with props)
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h (with props)
Modified:
incubator/qpid/trunk/qpid/cpp/src/Makefile.am
incubator/qpid/trunk/qpid/cpp/src/generate.sh
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp
incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/LFSessionContext.cpp
Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?view=diff&rev=543182&r1=543181&r2=543182
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Thu May 31 09:31:20 2007
@@ -181,6 +181,7 @@
qpid/broker/DeliverableMessage.cpp \
qpid/broker/DeliveryRecord.cpp \
qpid/broker/DirectExchange.cpp \
+ qpid/broker/DtxAck.cpp \
qpid/broker/DtxBuffer.cpp \
qpid/broker/DtxHandlerImpl.cpp \
qpid/broker/DtxManager.cpp \
@@ -234,6 +235,7 @@
qpid/broker/Deliverable.h \
qpid/broker/DeliverableMessage.h \
qpid/broker/DirectExchange.h \
+ qpid/broker/DtxAck.h \
qpid/broker/DtxBuffer.h \
qpid/broker/DtxHandlerImpl.h \
qpid/broker/DtxManager.h \
Modified: incubator/qpid/trunk/qpid/cpp/src/generate.sh
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/generate.sh?view=diff&rev=543182&r1=543181&r2=543182
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/generate.sh (original)
+++ incubator/qpid/trunk/qpid/cpp/src/generate.sh Thu May 31 09:31:20 2007
@@ -7,7 +7,7 @@
gentools_dir="$srcdir/../gentools"
specs_dir="$srcdir/../../specs"
-specs="$specs_dir/amqp.0-9.xml $specs_dir/amqp-errata.0-9.xml
$specs_dir/amqp-dtx-preview.xml"
+specs="$specs_dir/amqp.0-9.xml $specs_dir/amqp-errata.0-9.xml
$specs_dir/amqp-dtx-preview.0-9.xml"
test -z "$JAVA" && JAVA=java ;
test -z "$JAVAC" && JAVAC=javac ;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp?view=diff&rev=543182&r1=543181&r2=543182
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp Thu May 31
09:31:20 2007
@@ -28,17 +28,19 @@
#include <boost/bind.hpp>
#include <boost/format.hpp>
-#include "BrokerChannel.h"
#include "qpid/framing/ChannelAdapter.h"
#include "qpid/QpidError.h"
-#include "DeliverableMessage.h"
-#include "BrokerQueue.h"
+
+#include "BrokerAdapter.h"
+#include "BrokerChannel.h"
#include "BrokerMessage.h"
+#include "BrokerQueue.h"
+#include "Connection.h"
+#include "DeliverableMessage.h"
+#include "DtxAck.h"
#include "MessageStore.h"
#include "TxAck.h"
#include "TxPublish.h"
-#include "BrokerAdapter.h"
-#include "Connection.h"
using std::mem_fun_ref;
using std::bind2nd;
@@ -133,7 +135,8 @@
% dtxBuffer->getXid() % xid);
}
- TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked));
+ TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
+ accumulatedAck.clear();
dtxBuffer->enlist(txAck);
dtxBuffer->markEnded();
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h?view=diff&rev=543182&r1=543181&r2=543182
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DeliveryRecord.h Thu May 31
09:31:20 2007
@@ -38,8 +38,8 @@
class DeliveryRecord{
mutable Message::shared_ptr msg;
mutable Queue::shared_ptr queue;
- std::string consumerTag;
- uint64_t deliveryTag;
+ const std::string consumerTag;
+ const uint64_t deliveryTag;
bool pull;
public:
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp?view=auto&rev=543182
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp Thu May 31
09:31:20 2007
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "DtxAck.h"
+#include "qpid/log/Statement.h"
+
+using std::bind1st;
+using std::bind2nd;
+using std::mem_fun_ref;
+using namespace qpid::broker;
+
+DtxAck::DtxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked)
+{
+ remove_copy_if(unacked.begin(), unacked.end(), inserter(pending,
pending.end()),
+ not1(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy),
&acked)));
+ unacked.remove_if(bind2nd(mem_fun_ref(&DeliveryRecord::coveredBy),
&acked));
+}
+
+bool DtxAck::prepare(TransactionContext* ctxt) throw()
+{
+ try{
+ //record dequeue in the store
+ for (ack_iterator i = pending.begin(); i != pending.end(); i++) {
+ i->discard(ctxt);
+ }
+ return true;
+ }catch(...){
+ QPID_LOG(error, "Failed to prepare");
+ return false;
+ }
+}
+
+void DtxAck::commit() throw()
+{
+ pending.clear();
+}
+
+void DtxAck::rollback() throw()
+{
+ for_each(pending.begin(), pending.end(),
mem_fun_ref(&DeliveryRecord::requeue));
+ pending.clear();
+}
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.cpp
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h?view=auto&rev=543182
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h Thu May 31 09:31:20
2007
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _DtxAck_
+#define _DtxAck_
+
+#include <algorithm>
+#include <functional>
+#include <list>
+#include "AccumulatedAck.h"
+#include "DeliveryRecord.h"
+#include "TxOp.h"
+
+namespace qpid {
+ namespace broker {
+ class DtxAck : public TxOp{
+ std::list<DeliveryRecord> pending;
+
+ public:
+ DtxAck(AccumulatedAck& acked, std::list<DeliveryRecord>& unacked);
+ virtual bool prepare(TransactionContext* ctxt) throw();
+ virtual void commit() throw();
+ virtual void rollback() throw();
+ virtual ~DtxAck(){}
+ };
+ }
+}
+
+
+#endif
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxAck.h
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp?view=diff&rev=543182&r1=543181&r2=543182
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp Thu May 31
09:31:20 2007
@@ -22,23 +22,29 @@
#include "BrokerChannel.h"
using namespace qpid::broker;
+using qpid::framing::AMQP_ClientProxy;
using qpid::framing::FieldTable;
using qpid::framing::MethodContext;
using std::string;
-DtxHandlerImpl::DtxHandlerImpl(CoreRefs& parent) : CoreRefs(parent) {}
+DtxHandlerImpl::DtxHandlerImpl(CoreRefs& parent) :
+ CoreRefs(parent),
+ dClient(AMQP_ClientProxy::DtxDemarcation::get(proxy)),
+ cClient(AMQP_ClientProxy::DtxCoordination::get(proxy))
+
+{
+}
// DtxDemarcationHandler:
-void DtxHandlerImpl::select(const MethodContext& /*context*/ )
+void DtxHandlerImpl::select(const MethodContext& context )
{
- //don't need to do anything here really
- //send select-ok
+ dClient.selectOk(context.getRequestId());
}
-void DtxHandlerImpl::end(const MethodContext& /*context*/,
+void DtxHandlerImpl::end(const MethodContext& context,
u_int16_t /*ticket*/,
const string& xid,
bool fail,
@@ -54,10 +60,10 @@
} else {
channel.endDtx(xid);
}
- //send end-ok
+ dClient.endOk(0/*TODO - set flags*/, context.getRequestId());
}
-void DtxHandlerImpl::start(const MethodContext& /*context*/,
+void DtxHandlerImpl::start(const MethodContext& context,
u_int16_t /*ticket*/,
const string& xid,
bool /*join*/,
@@ -69,36 +75,36 @@
} else {
channel.startDtx(xid, broker.getDtxManager());
}
- //send start-ok
+ dClient.startOk(0/*TODO - set flags*/, context.getRequestId());
}
// DtxCoordinationHandler:
-void DtxHandlerImpl::prepare(const MethodContext& /*context*/,
+void DtxHandlerImpl::prepare(const MethodContext& context,
u_int16_t /*ticket*/,
const string& xid )
{
broker.getDtxManager().prepare(xid);
- //send prepare-ok
+ cClient.prepareOk(0/*TODO - set flags*/, context.getRequestId());
}
-void DtxHandlerImpl::commit(const MethodContext& /*context*/,
+void DtxHandlerImpl::commit(const MethodContext& context,
u_int16_t /*ticket*/,
const string& xid,
bool /*onePhase*/ )
{
- broker.getDtxManager().commit(xid);
- //send commit-ok
//TODO use onePhase flag to validate correct sequence
+ broker.getDtxManager().commit(xid);
+ cClient.commitOk(0/*TODO - set flags*/, context.getRequestId());
}
-void DtxHandlerImpl::rollback(const MethodContext& /*context*/,
+void DtxHandlerImpl::rollback(const MethodContext& context,
u_int16_t /*ticket*/,
const string& xid )
{
broker.getDtxManager().rollback(xid);
- //send rollback-ok
+ cClient.rollbackOk(0/*TODO - set flags*/, context.getRequestId());
}
void DtxHandlerImpl::recover(const MethodContext& /*context*/,
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h?view=diff&rev=543182&r1=543181&r2=543182
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.h Thu May 31
09:31:20 2007
@@ -31,6 +31,8 @@
public framing::AMQP_ServerOperations::DtxCoordinationHandler,
public framing::AMQP_ServerOperations::DtxDemarcationHandler
{
+ framing::AMQP_ClientProxy::DtxDemarcation dClient;
+ framing::AMQP_ClientProxy::DtxCoordination cClient;
public:
DtxHandlerImpl(CoreRefs& parent);
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp?view=diff&rev=543182&r1=543181&r2=543182
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp Thu May 31
09:31:20 2007
@@ -20,6 +20,7 @@
*/
#include "DtxManager.h"
#include <boost/format.hpp>
+#include <iostream>
using namespace qpid::broker;
Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/LFSessionContext.cpp
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/LFSessionContext.cpp?view=diff&rev=543182&r1=543181&r2=543182
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/LFSessionContext.cpp
(original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/apr/LFSessionContext.cpp Thu May
31 09:31:20 2007
@@ -98,7 +98,7 @@
while(frame && out.available() >= frame->size()){
encoded = true;
frame->encode(out);
- QPID_LOG(debug, "SENT: " << frame);
+ QPID_LOG(debug, "SENT: " << *frame);
delete frame;
framesToWrite.pop();
frame = framesToWrite.empty() ? 0 : framesToWrite.front();